from typing import List, Optional import logging from model.user import User from db.connection import get_connection logger = logging.getLogger(__name__) class UserRepository: def get_all(self) -> List[User]: try: with get_connection() as conn: with conn.cursor() as cur: cur.execute("SELECT * FROM users ORDER BY id") users = [ User( id=row[0], email=row[1], password_hash=row[2], name=row[3], role=row[4], created_at=row[5] ) for row in cur.fetchall() ] logger.info(f"Получено {len(users)} пользователей") return users except Exception as e: logger.error(f"Ошибка получения пользователей: {e}") return [] def get_by_id(self, user_id: int) -> Optional[User]: try: with get_connection() as conn: with conn.cursor() as cur: cur.execute("SELECT * FROM users WHERE id = %s", (user_id,)) row = cur.fetchone() if row: logger.info(f"Пользователь {user_id} найден") return User(*row) logger.warning(f"Пользователь {user_id} не найден") return None except Exception as e: logger.error(f"Ошибка получения пользователя {user_id}: {e}") return None def get_by_email(self, email: str) -> Optional[User]: try: with get_connection() as conn: with conn.cursor() as cur: cur.execute("SELECT * FROM users WHERE email = %s", (email,)) row = cur.fetchone() if row: logger.info(f"Пользователь с email {email} найден") return User(*row) logger.warning(f"Пользователь с email {email} не найден") return None except Exception as e: logger.error(f"Ошибка получения пользователя по email {email}: {e}") return None def create_user(self, email: str, password_hash: str, name: str, role: str) -> Optional[User]: try: with get_connection() as conn: with conn.cursor() as cur: cur.execute(""" INSERT INTO users (email, password_hash, name, role) VALUES (%s, %s, %s, %s) RETURNING id, email, password_hash, name, role, created_at """, (email, password_hash, name, role)) row = cur.fetchone() conn.commit() if row: logger.info(f"Создан новый пользователь {email} с ID {row[0]}") return User(*row) logger.warning(f"Не удалось создать пользователя {email}") return None except Exception as e: logger.error(f"Ошибка создания пользователя {email}: {e}") return None def update_user(self, user_id: int, name: str = None, role: str = None) -> bool: try: with get_connection() as conn: with conn.cursor() as cur: updates = [] params = [] if name is not None: updates.append("name = %s") params.append(name) if role is not None: updates.append("role = %s") params.append(role) if not updates: logger.warning(f"Нет данных для обновления пользователя {user_id}") return False params.append(user_id) query = f"UPDATE users SET {', '.join(updates)} WHERE id = %s" cur.execute(query, params) conn.commit() success = cur.rowcount > 0 if success: logger.info(f"Пользователь {user_id} успешно обновлен") else: logger.warning(f"Пользователь {user_id} не найден для обновления") return success except Exception as e: logger.error(f"Ошибка обновления пользователя {user_id}: {e}") return False def delete_user(self, user_id: int) -> bool: try: with get_connection() as conn: with conn.cursor() as cur: cur.execute("DELETE FROM users WHERE id = %s", (user_id,)) conn.commit() success = cur.rowcount > 0 if success: logger.info(f"Пользователь {user_id} удален") else: logger.warning(f"Пользователь {user_id} не найден для удаления") return success except Exception as e: logger.error(f"Ошибка удаления пользователя {user_id}: {e}") return False def get_users_by_role(self, role: str) -> List[User]: try: with get_connection() as conn: with conn.cursor() as cur: cur.execute("SELECT * FROM users WHERE role = %s ORDER BY name", (role,)) users = [User(*row) for row in cur.fetchall()] logger.info(f"Получено {len(users)} пользователей с ролью {role}") return users except Exception as e: logger.error(f"Ошибка получения пользователей по роли {role}: {e}") return [] def change_password(self, user_id: int, new_password_hash: str) -> bool: try: with get_connection() as conn: with conn.cursor() as cur: cur.execute(""" UPDATE users SET password_hash = %s WHERE id = %s """, (new_password_hash, user_id)) conn.commit() success = cur.rowcount > 0 if success: logger.info(f"Пароль пользователя {user_id} изменен") else: logger.warning(f"Пользователь {user_id} не найден для смены пароля") return success except Exception as e: logger.error(f"Ошибка смены пароля пользователя {user_id}: {e}") return False def authenticate_user(self, email: str, password_hash: str) -> Optional[User]: try: with get_connection() as conn: with conn.cursor() as cur: cur.execute(""" SELECT * FROM users WHERE email = %s AND password_hash = %s """, (email, password_hash)) row = cur.fetchone() if row: logger.info(f"Успешная аутентификация пользователя {email}") return User(*row) logger.warning(f"Неудачная аутентификация пользователя {email}") return None except Exception as e: logger.error(f"Ошибка аутентификации пользователя {email}: {e}") return None def is_valid_authenticate(self, email: str, password_hash: str) -> bool: try: with get_connection() as conn: with conn.cursor() as cur: cur.execute(""" SELECT 1 FROM users WHERE email = %s AND password_hash = %s """, (email, password_hash)) is_valid = cur.fetchone() is not None if is_valid: logger.info(f"Валидные учетные данные для пользователя {email}") else: logger.warning(f"Невалидные учетные данные для пользователя {email}") return is_valid except Exception as e: logger.error(f"Ошибка проверки учетных данных пользователя {email}: {e}") return False def user_exists(self, email: str) -> bool: try: with get_connection() as conn: with conn.cursor() as cur: cur.execute("SELECT 1 FROM users WHERE email = %s", (email,)) exists = cur.fetchone() is not None if exists: logger.info(f"Пользователь с email {email} существует") else: logger.info(f"Пользователь с email {email} не существует") return exists except Exception as e: logger.error(f"Ошибка проверки существования пользователя {email}: {e}") return False