style(database): Refactor repository name

This commit is contained in:
Kita Trofimov
2025-10-26 18:52:54 +03:00
parent 4e3b21f31e
commit c117ceb85e
8 changed files with 10 additions and 13 deletions
+57
View File
@@ -0,0 +1,57 @@
# db/connection.py
import psycopg2
import os
from contextlib import contextmanager
from typing import Generator
from utils.loadDotEnv import initializeENV
initializeENV()
def get_db_config() -> dict:
return {
'host': os.getenv('DB_HOST', 'localhost'),
'port': int(os.getenv('DB_PORT', 5432)),
'db': os.getenv('DB_NAME', 'warehouse_db'),
'user': os.getenv('DB_USER', 'postgres'),
'password': os.getenv('DB_PASSWORD', '')
}
@contextmanager
def get_connection() -> Generator[psycopg2.extensions.connection, None, None]:
conn = None
try:
config = get_db_config()
conn = psycopg2.connect(**config)
print("Подключение к БД установлено")
yield conn
except psycopg2.OperationalError as e:
print(f"Ошибка подключения к БД: {e}")
raise
except Exception as e:
print(f"Неожиданная ошибка: {e}")
if conn:
conn.rollback()
raise
finally:
if conn:
conn.close()
print("БД закрыта")
def test_connection() -> bool:
try:
with get_connection() as conn:
with conn.cursor() as cur:
cur.execute("SELECT version();")
version = cur.fetchone()
print(f" Версия PostgreSQL: {version[0]}")
return True
except Exception as e:
print(f"Тест подключения бд провален: {e}")
return False
print(test_connection())
+7
View File
@@ -0,0 +1,7 @@
from .user_repository import UserRepository
from .robot_repository import RobotRepository
from .product_repository import ProductRepository
from .inventory_repository import InventoryRepository
from .ai_prediction_repository import AIPredictionsRepository
__all__ = ['UserRepository', 'RobotRepository', 'ProductRepository', 'InventoryRepository', 'AIPredictionsRepository']
+102
View File
@@ -0,0 +1,102 @@
from typing import List, Optional
from datetime import datetime, date
from db.connection import get_connection
from model.ai_prediction import AIPrediction
class AIPredictionsRepository:
def get_all(self) -> List[AIPrediction]:
try:
with get_connection() as conn:
with conn.cursor() as cur:
cur.execute("SELECT * FROM ai_predictions ORDER BY prediction_date DESC, product_id")
return [
AIPrediction(
id=row[0],
product_id=row[1],
prediction_date=row[2],
days_until_stockout=row[3],
recommended_order=row[4],
confidence_score=row[5],
created_at=row[6]
) for row in cur.fetchall()
]
except Exception as e:
print(f"Ошибка получения прогнозов: {e}")
return []
def get_by_id(self, prediction_id: int) -> Optional[AIPrediction]:
try:
with get_connection() as conn:
with conn.cursor() as cur:
cur.execute("SELECT * FROM ai_predictions WHERE id = %s", (prediction_id,))
row = cur.fetchone()
return AIPrediction(*row) if row else None
except Exception as e:
print(f"Ошибка получения прогноза {prediction_id}: {e}")
return None
def get_by_product(self, product_id: str, limit: int = 10) -> List[AIPrediction]:
try:
with get_connection() as conn:
with conn.cursor() as cur:
cur.execute("""
SELECT * FROM ai_predictions
WHERE product_id = %s
ORDER BY prediction_date DESC
LIMIT %s
""", (product_id, limit))
return [AIPrediction(*row) for row in cur.fetchall()]
except Exception as e:
print(f"Ошибка получения прогноза по товару {product_id}: {e}")
return []
def get_latest_predictions(self) -> List[AIPrediction]:
try:
with get_connection() as conn:
with conn.cursor() as cur:
cur.execute("""
SELECT DISTINCT ON (product_id) *
FROM ai_predictions
ORDER BY product_id, prediction_date DESC
""")
return [AIPrediction(*row) for row in cur.fetchall()]
except Exception as e:
print(f"Ошибка получения последних прогнозов по товарам: {e}")
return []
def create_prediction(self, product_id: str, prediction_date: date,
days_until_stockout: int, recommended_order: int,
confidence_score: float) -> Optional[int]:
try:
with get_connection() as conn:
with conn.cursor() as cur:
cur.execute("""
INSERT INTO ai_predictions
(product_id, prediction_date, days_until_stockout, recommended_order, confidence_score)
VALUES (%s, %s, %s, %s, %s)
RETURNING id
""", (product_id, prediction_date, days_until_stockout, recommended_order, confidence_score))
prediction_id = cur.fetchone()[0]
conn.commit()
return prediction_id
except Exception as e:
print(f"Ошибка создания прогноза: {e}")
return None
def delete_old_predictions(self, older_than_days: int = 90) -> int:
try:
with get_connection() as conn:
with conn.cursor() as cur:
cur.execute("""
DELETE FROM ai_predictions
WHERE prediction_date < CURRENT_DATE - INTERVAL '%s days'
""", (older_than_days,))
deleted_count = cur.rowcount
conn.commit()
return deleted_count
except Exception as e:
print(f"Ошибка удаления старых прогнозов: {e}")
return 0
+135
View File
@@ -0,0 +1,135 @@
# db/repositories/inventory_repository.py
from typing import List, Optional, Tuple
from datetime import datetime
from db.connection import get_connection
from model.inventory import InventoryRecord
class InventoryRepository:
def get_all(self) -> List[InventoryRecord]:
try:
with get_connection() as conn:
with conn.cursor() as cur:
cur.execute("SELECT * FROM inventory_history ORDER BY scanned_at DESC")
return [
InventoryRecord(
id=row[0],
robot_id=row[1],
product_id=row[2],
quantity=row[3],
zone=row[4],
row_number=row[5],
shelf_number=row[6],
status=row[7],
scanned_at=row[8],
created_at=row[9]
) for row in cur.fetchall()
]
except Exception as e:
print(f"Ошибка получения истории инвентаризации: {e}")
return []
def get_by_id(self, record_id: int) -> Optional[InventoryRecord]:
try:
with get_connection() as conn:
with conn.cursor() as cur:
cur.execute("SELECT * FROM inventory_history WHERE id = %s", (record_id,))
row = cur.fetchone()
return InventoryRecord(*row) if row else None
except Exception as e:
print(f"Ошибка получения записи инвентаризации {record_id}: {e}")
return None
def create_record(self, robot_id: str, product_id: str, quantity: int, zone: str,
row_number: int, shelf_number: int, status: str, scanned_at: datetime) -> Optional[int]:
try:
with get_connection() as conn:
with conn.cursor() as cur:
cur.execute("""
INSERT INTO inventory_history
(robot_id, product_id, quantity, zone, row_number, shelf_number, status, scanned_at)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
RETURNING id
""", (robot_id, product_id, quantity, zone, row_number, shelf_number, status, scanned_at))
record_id = cur.fetchone()[0]
conn.commit()
return record_id
except Exception as e:
print(f"Ошибка создания записи инвентаризации: {e}")
return None
def get_latest_inventory(self) -> List[InventoryRecord]:
try:
with get_connection() as conn:
with conn.cursor() as cur:
cur.execute("""
SELECT DISTINCT ON (product_id) *
FROM inventory_history
ORDER BY product_id, scanned_at DESC
""")
return [InventoryRecord(*row) for row in cur.fetchall()]
except Exception as e:
print(f"Ошибка получения последней записи инвентаризации по каждому товару: {e}")
return []
def get_records_by_product(self, product_id: str, limit: int = 100) -> List[InventoryRecord]:
try:
with get_connection() as conn:
with conn.cursor() as cur:
cur.execute("""
SELECT * FROM inventory_history
WHERE product_id = %s
ORDER BY scanned_at DESC
LIMIT %s
""", (product_id, limit))
return [InventoryRecord(*row) for row in cur.fetchall()]
except Exception as e:
print(f"Ошибка получения записи инвентаризации по продукту {product_id}: {e}")
return []
def get_records_by_robot(self, robot_id: str, limit: int = 100) -> List[InventoryRecord]:
try:
with get_connection() as conn:
with conn.cursor() as cur:
cur.execute("""
SELECT * FROM inventory_history
WHERE robot_id = %s
ORDER BY scanned_at DESC
LIMIT %s
""", (robot_id, limit))
return [InventoryRecord(*row) for row in cur.fetchall()]
except Exception as e:
print(f"Ошибка получения записи инвентаризации по роботу {robot_id}: {e}")
return []
def get_records_by_zone(self, zone: str, limit: int = 100) -> List[InventoryRecord]:
try:
with get_connection() as conn:
with conn.cursor() as cur:
cur.execute("""
SELECT * FROM inventory_history
WHERE zone = %s
ORDER BY scanned_at DESC
LIMIT %s
""", (zone, limit))
return [InventoryRecord(*row) for row in cur.fetchall()]
except Exception as e:
print(f"Ошибка получения записи инвентаризации по зоне {zone}: {e}")
return []
def get_critical_items(self, hours: int = 24) -> List[InventoryRecord]:
try:
with get_connection() as conn:
with conn.cursor() as cur:
cur.execute("""
SELECT * FROM inventory_history
WHERE status IN ('LOW_STOCK', 'CRITICAL')
AND scanned_at >= NOW() - INTERVAL '%s hours'
ORDER BY scanned_at DESC
""", (hours,))
return [InventoryRecord(*row) for row in cur.fetchall()]
except Exception as e:
print(f"Ошибка получения товаров с критическим статусом: {e}")
return []
+139
View File
@@ -0,0 +1,139 @@
# db/repositories/product_repository.py
from typing import List, Optional
from db.connection import get_connection
from model.product import Product
class ProductRepository:
def get_all(self) -> List[Product]:
try:
with get_connection() as conn:
with conn.cursor() as cur:
cur.execute("SELECT * FROM products ORDER BY id")
return [
Product(
id=row[0],
name=row[1],
category=row[2],
min_stock=row[3],
optimal_stock=row[4]
) for row in cur.fetchall()
]
except Exception as e:
print(f"Ошибка получения продуктов: {e}")
return []
def get_by_id(self, product_id: str) -> Optional[Product]:
try:
with get_connection() as conn:
with conn.cursor() as cur:
cur.execute("SELECT * FROM products WHERE id = %s", (product_id,))
row = cur.fetchone()
return Product(*row) if row else None
except Exception as e:
print(f"Ошибка получения товара {product_id}: {e}")
return None
def get_by_category(self, category: str) -> List[Product]:
try:
with get_connection() as conn:
with conn.cursor() as cur:
cur.execute("SELECT * FROM products WHERE category = %s ORDER BY name", (category,))
return [Product(*row) for row in cur.fetchall()]
except Exception as e:
print(f"Ошибка получения товаров по категории {category}: {e}")
return []
def create_product(self, product_id: str, name: str, category: str,
min_stock: int = 10, optimal_stock: int = 100) -> bool:
try:
with get_connection() as conn:
with conn.cursor() as cur:
cur.execute("""
INSERT INTO products (id, name, category, min_stock, optimal_stock)
VALUES (%s, %s, %s, %s, %s)
""", (product_id, name, category, min_stock, optimal_stock))
conn.commit()
return cur.rowcount > 0
except Exception as e:
print(f"Ошибка создания товара {product_id}: {e}")
return False
def update_product(self, product_id: str, name: str = None, category: str = None,
min_stock: int = None, optimal_stock: int = None) -> bool:
try:
with get_connection() as conn:
with conn.cursor() as cur:
updates = []
params = []
if name is not None:
updates.append("name = %s")
params.append(name)
if category is not None:
updates.append("category = %s")
params.append(category)
if min_stock is not None:
updates.append("min_stock = %s")
params.append(min_stock)
if optimal_stock is not None:
updates.append("optimal_stock = %s")
params.append(optimal_stock)
if not updates:
return False
params.append(product_id)
query = f"UPDATE products SET {', '.join(updates)} WHERE id = %s"
cur.execute(query, params)
conn.commit()
return cur.rowcount > 0
except Exception as e:
print(f"Ошибка обновления товара {product_id}: {e}")
return False
def delete_product(self, product_id: str) -> bool:
try:
with get_connection() as conn:
with conn.cursor() as cur:
cur.execute("DELETE FROM products WHERE id = %s", (product_id,))
conn.commit()
return cur.rowcount > 0
except Exception as e:
print(f"Ошибка удаления товара {product_id}: {e}")
return False
def search_products(self, search_term: str) -> List[Product]:
try:
with get_connection() as conn:
with conn.cursor() as cur:
cur.execute("""
SELECT * FROM products
WHERE name ILIKE %s OR id ILIKE %s
ORDER BY name
""", (f'%{search_term}%', f'%{search_term}%'))
return [Product(*row) for row in cur.fetchall()]
except Exception as e:
print(f"Ошибка поиска товров по названию '{search_term}': {e}")
return []
def get_low_stock_products(self) -> List[Product]:
try:
with get_connection() as conn:
with conn.cursor() as cur:
cur.execute("""
SELECT DISTINCT p.*
FROM products p
JOIN inventory_history ih ON p.id = ih.product_id
WHERE ih.status IN ('LOW_STOCK', 'CRITICAL')
AND ih.scanned_at >= NOW() - INTERVAL '1 day'
ORDER BY p.name
""")
return [Product(*row) for row in cur.fetchall()]
except Exception as e:
print(f"Ошибка получения товаров с низким запасом: {e}")
return []
+133
View File
@@ -0,0 +1,133 @@
# db/repositories/robot_repository.py
from typing import List, Optional
from db.connection import get_connection
from model.robot import Robot
class RobotRepository:
def get_all(self) -> List[Robot]:
try:
with get_connection() as conn:
with conn.cursor() as cur:
cur.execute("SELECT * FROM robots ORDER BY id")
return [
Robot(
id=row[0],
status=row[1],
battery_level=row[2],
last_update=row[3],
current_zone=row[4],
current_row=row[5],
current_shelf=row[6]
) for row in cur.fetchall()
]
except Exception as e:
print(f"Ошика получения всех роботов: {e}")
return []
def get_by_id(self, robot_id: str) -> Optional[Robot]:
try:
with get_connection() as conn:
with conn.cursor() as cur:
cur.execute("SELECT * FROM robots WHERE id = %s", (robot_id,))
row = cur.fetchone()
return Robot(*row) if row else None
except Exception as e:
print(f"Ошибка получения роботов {robot_id}: {e}")
return None
def update_robot(self, robot_id: str, status: str = None, battery_level: int = None,
current_zone: str = None, current_row: int = None, current_shelf: int = None) -> bool:
try:
with get_connection() as conn:
with conn.cursor() as cur:
updates = ["last_update = CURRENT_TIMESTAMP"]
params = []
if status is not None:
updates.append("status = %s")
params.append(status)
if battery_level is not None:
updates.append("battery_level = %s")
params.append(battery_level)
if current_zone is not None:
updates.append("current_zone = %s")
params.append(current_zone)
if current_row is not None:
updates.append("current_row = %s")
params.append(current_row)
if current_shelf is not None:
updates.append("current_shelf = %s")
params.append(current_shelf)
params.append(robot_id)
query = f"UPDATE robots SET {', '.join(updates)} WHERE id = %s"
cur.execute(query, params)
conn.commit()
return cur.rowcount > 0
except Exception as e:
print(f"Ошибка обновления робота {robot_id}: {e}")
return False
def get_robots_by_status(self, status: str) -> List[Robot]:
try:
with get_connection() as conn:
with conn.cursor() as cur:
cur.execute("SELECT * FROM robots WHERE status = %s ORDER BY id", (status,))
return [Robot(*row) for row in cur.fetchall()]
except Exception as e:
print(f"Ошибка получения роботов по статусу {status}: {e}")
return []
def get_low_battery_robots(self, threshold: int = 20) -> List[Robot]:
try:
with get_connection() as conn:
with conn.cursor() as cur:
cur.execute("""
SELECT * FROM robots
WHERE battery_level < %s AND status = 'active'
ORDER BY battery_level ASC
""", (threshold,))
return [Robot(*row) for row in cur.fetchall()]
except Exception as e:
print(f"Ошибка получения роботов с низким зарядом: {e}")
return []
def get_robots_in_zone(self, zone: str) -> List[Robot]:
try:
with get_connection() as conn:
with conn.cursor() as cur:
cur.execute("SELECT * FROM robots WHERE current_zone = %s ORDER BY id", (zone,))
return [Robot(*row) for row in cur.fetchall()]
except Exception as e:
print(f"Ошибка получения роботов в зоне {zone}: {e}")
return []
def create_robot(self, robot_id: str, status: str = 'active', battery_level: int = 100) -> bool:
try:
with get_connection() as conn:
with conn.cursor() as cur:
cur.execute("""
INSERT INTO robots (id, status, battery_level, last_update)
VALUES (%s, %s, %s, CURRENT_TIMESTAMP)
""", (robot_id, status, battery_level))
conn.commit()
return cur.rowcount > 0
except Exception as e:
print(f"Ошибка создания робота {robot_id}: {e}")
return False
def delete_robot(self, robot_id: str) -> bool:
try:
with get_connection() as conn:
with conn.cursor() as cur:
cur.execute("DELETE FROM robots WHERE id = %s", (robot_id,))
conn.commit()
return cur.rowcount > 0
except Exception as e:
print(f"Ошибка удаления робота {robot_id}: {e}")
return False
+130
View File
@@ -0,0 +1,130 @@
from typing import List, Optional
from model.user import User
from db.connection import get_connection
class UserRepository:
def get_all(self) -> List[User]:
with get_connection() as conn:
with conn.cursor() as cur:
cur.execute("SELECT * FROM users ORDER BY id")
return [
User(
id=row[0],
email=row[1],
password_hash=row[2],
name=row[3],
role=row[4],
created_at=row[5]
) for row in cur.fetchall()
]
def get_by_id(self, user_id: int) -> Optional[User]:
with get_connection() as conn:
with conn.cursor() as cur:
cur.execute("SELECT * FROM users WHERE id = %s", (user_id,))
row = cur.fetchone()
if row:
return User(*row)
return None
def get_by_email(self, email: str) -> Optional[User]:
with get_connection() as conn:
with conn.cursor() as cur:
cur.execute("SELECT * FROM users WHERE email = %s", (email,))
row = cur.fetchone()
if row:
return User(*row)
return None
def create_user(self, email: str, password_hash: str, name: str, role: str) -> Optional[User]:
with get_connection() as conn:
with conn.cursor() as cur:
cur.execute("""
INSERT INTO users (email, password_hash, name, role)
VALUES (%s, %s, %s, %s)
RETURNING id, email, password_hash, name, role, created_at
""", (email, password_hash, name, role))
row = cur.fetchone()
conn.commit()
if row:
return User(*row)
return None
def update_user(self, user_id: int, name: str = None, role: str = None) -> bool:
with get_connection() as conn:
with conn.cursor() as cur:
updates = []
params = []
if name is not None:
updates.append("name = %s")
params.append(name)
if role is not None:
updates.append("role = %s")
params.append(role)
if not updates:
return False
params.append(user_id)
query = f"UPDATE users SET {', '.join(updates)} WHERE id = %s"
cur.execute(query, params)
conn.commit()
return cur.rowcount > 0
def delete_user(self, user_id: int) -> bool:
with get_connection() as conn:
with conn.cursor() as cur:
cur.execute("DELETE FROM users WHERE id = %s", (user_id,))
conn.commit()
return cur.rowcount > 0
def get_users_by_role(self, role: str) -> List[User]:
with get_connection() as conn:
with conn.cursor() as cur:
cur.execute("SELECT * FROM users WHERE role = %s ORDER BY name", (role,))
return [
User(*row) for row in cur.fetchall()
]
def change_password(self, user_id: int, new_password_hash: str) -> bool:
with get_connection() as conn:
with conn.cursor() as cur:
cur.execute("""
UPDATE users
SET password_hash = %s
WHERE id = %s
""", (new_password_hash, user_id))
conn.commit()
return cur.rowcount > 0
def authenticate_user(self, email: str, password_hash: str) -> Optional[User]:
with get_connection() as conn:
with conn.cursor() as cur:
cur.execute("""
SELECT * FROM users
WHERE email = %s AND password_hash = %s
""", (email, password_hash))
row = cur.fetchone()
if row:
return User(*row)
return None
def is_valid_authenticate(self, email: str, password_hash: str) -> bool:
with get_connection() as conn:
with conn.cursor() as cur:
cur.execute("""
SELECT 1 FROM users
WHERE email = %s AND password_hash = %s
""", (email, password_hash))
return cur.fetchone() is not None
def user_exists(self, email: str) -> bool:
with get_connection() as conn:
with conn.cursor() as cur:
cur.execute("SELECT 1 FROM users WHERE email = %s", (email,))
return cur.fetchone() is not None