from typing import List, Optional from model.user import User from database.connection import get_connection class UserRepository: def get_all(self) -> List[User]: with get_connection() as conn: with conn.cursor() as cur: cur.execute("SELECT * FROM users ORDER BY id") return [ User( id=row[0], email=row[1], password_hash=row[2], name=row[3], role=row[4], created_at=row[5] ) for row in cur.fetchall() ] def get_by_id(self, user_id: int) -> Optional[User]: with get_connection() as conn: with conn.cursor() as cur: cur.execute("SELECT * FROM users WHERE id = %s", (user_id,)) row = cur.fetchone() if row: return User(*row) return None def get_by_email(self, email: str) -> Optional[User]: with get_connection() as conn: with conn.cursor() as cur: cur.execute("SELECT * FROM users WHERE email = %s", (email,)) row = cur.fetchone() if row: return User(*row) return None def create_user(self, email: str, password_hash: str, name: str, role: str) -> Optional[User]: with get_connection() as conn: with conn.cursor() as cur: cur.execute(""" INSERT INTO users (email, password_hash, name, role) VALUES (%s, %s, %s, %s) RETURNING id, email, password_hash, name, role, created_at """, (email, password_hash, name, role)) row = cur.fetchone() conn.commit() if row: return User(*row) return None def update_user(self, user_id: int, name: str = None, role: str = None) -> bool: with get_connection() as conn: with conn.cursor() as cur: updates = [] params = [] if name is not None: updates.append("name = %s") params.append(name) if role is not None: updates.append("role = %s") params.append(role) if not updates: return False params.append(user_id) query = f"UPDATE users SET {', '.join(updates)} WHERE id = %s" cur.execute(query, params) conn.commit() return cur.rowcount > 0 def delete_user(self, user_id: int) -> bool: with get_connection() as conn: with conn.cursor() as cur: cur.execute("DELETE FROM users WHERE id = %s", (user_id,)) conn.commit() return cur.rowcount > 0 def get_users_by_role(self, role: str) -> List[User]: with get_connection() as conn: with conn.cursor() as cur: cur.execute("SELECT * FROM users WHERE role = %s ORDER BY name", (role,)) return [ User(*row) for row in cur.fetchall() ] def change_password(self, user_id: int, new_password_hash: str) -> bool: with get_connection() as conn: with conn.cursor() as cur: cur.execute(""" UPDATE users SET password_hash = %s WHERE id = %s """, (new_password_hash, user_id)) conn.commit() return cur.rowcount > 0 def authenticate_user(self, email: str, password_hash: str) -> Optional[User]: with get_connection() as conn: with conn.cursor() as cur: cur.execute(""" SELECT * FROM users WHERE email = %s AND password_hash = %s """, (email, password_hash)) row = cur.fetchone() if row: return User(*row) return None def user_exists(self, email: str) -> bool: with get_connection() as conn: with conn.cursor() as cur: cur.execute("SELECT 1 FROM users WHERE email = %s", (email,)) return cur.fetchone() is not None