feat(database/repositories/user_repository.py): Added user repository

This commit is contained in:
Kita Trofimov
2025-10-26 13:14:51 +03:00
parent 506f2f3f39
commit c1bab1b304
3 changed files with 128 additions and 0 deletions
+3
View File
@@ -52,3 +52,6 @@ def test_connection() -> bool:
except Exception as e:
print(f"Тест подключения бд провален: {e}")
return False
print(test_connection())
+3
View File
@@ -0,0 +1,3 @@
from .user_repository import UserRepository
__all__ = ['UserRepository']
+122
View File
@@ -0,0 +1,122 @@
from typing import List, Optional
from model.user import User
from database.connection import get_connection
class UserRepository:
def get_all(self) -> List[User]:
with get_connection() as conn:
with conn.cursor() as cur:
cur.execute("SELECT * FROM users ORDER BY id")
return [
User(
id=row[0],
email=row[1],
password_hash=row[2],
name=row[3],
role=row[4],
created_at=row[5]
) for row in cur.fetchall()
]
def get_by_id(self, user_id: int) -> Optional[User]:
with get_connection() as conn:
with conn.cursor() as cur:
cur.execute("SELECT * FROM users WHERE id = %s", (user_id,))
row = cur.fetchone()
if row:
return User(*row)
return None
def get_by_email(self, email: str) -> Optional[User]:
with get_connection() as conn:
with conn.cursor() as cur:
cur.execute("SELECT * FROM users WHERE email = %s", (email,))
row = cur.fetchone()
if row:
return User(*row)
return None
def create_user(self, email: str, password_hash: str, name: str, role: str) -> Optional[User]:
with get_connection() as conn:
with conn.cursor() as cur:
cur.execute("""
INSERT INTO users (email, password_hash, name, role)
VALUES (%s, %s, %s, %s)
RETURNING id, email, password_hash, name, role, created_at
""", (email, password_hash, name, role))
row = cur.fetchone()
conn.commit()
if row:
return User(*row)
return None
def update_user(self, user_id: int, name: str = None, role: str = None) -> bool:
with get_connection() as conn:
with conn.cursor() as cur:
updates = []
params = []
if name is not None:
updates.append("name = %s")
params.append(name)
if role is not None:
updates.append("role = %s")
params.append(role)
if not updates:
return False
params.append(user_id)
query = f"UPDATE users SET {', '.join(updates)} WHERE id = %s"
cur.execute(query, params)
conn.commit()
return cur.rowcount > 0
def delete_user(self, user_id: int) -> bool:
with get_connection() as conn:
with conn.cursor() as cur:
cur.execute("DELETE FROM users WHERE id = %s", (user_id,))
conn.commit()
return cur.rowcount > 0
def get_users_by_role(self, role: str) -> List[User]:
with get_connection() as conn:
with conn.cursor() as cur:
cur.execute("SELECT * FROM users WHERE role = %s ORDER BY name", (role,))
return [
User(*row) for row in cur.fetchall()
]
def change_password(self, user_id: int, new_password_hash: str) -> bool:
with get_connection() as conn:
with conn.cursor() as cur:
cur.execute("""
UPDATE users
SET password_hash = %s
WHERE id = %s
""", (new_password_hash, user_id))
conn.commit()
return cur.rowcount > 0
def authenticate_user(self, email: str, password_hash: str) -> Optional[User]:
with get_connection() as conn:
with conn.cursor() as cur:
cur.execute("""
SELECT * FROM users
WHERE email = %s AND password_hash = %s
""", (email, password_hash))
row = cur.fetchone()
if row:
return User(*row)
return None
def user_exists(self, email: str) -> bool:
with get_connection() as conn:
with conn.cursor() as cur:
cur.execute("SELECT 1 FROM users WHERE email = %s", (email,))
return cur.fetchone() is not None