diff --git a/database/repositories/__init__.py b/database/repositories/__init__.py index 7ba25ec..fc2093e 100644 --- a/database/repositories/__init__.py +++ b/database/repositories/__init__.py @@ -1,5 +1,6 @@ from .user_repository import UserRepository from .robot_repository import RobotRepository from .product_repository import ProductRepository +from .inventory_repository import InventoryRepository -__all__ = ['UserRepository', 'RobotRepository', 'ProductRepository'] +__all__ = ['UserRepository', 'RobotRepository', 'ProductRepository', 'InventoryRepository'] diff --git a/database/repositories/inventory_repository.py b/database/repositories/inventory_repository.py new file mode 100644 index 0000000..bc101b5 --- /dev/null +++ b/database/repositories/inventory_repository.py @@ -0,0 +1,135 @@ +# database/repositories/inventory_repository.py +from typing import List, Optional, Tuple +from datetime import datetime +from database.connection import get_connection +from model.inventory import InventoryRecord + + + +class InventoryRepository: + def get_all(self) -> List[InventoryRecord]: + try: + with get_connection() as conn: + with conn.cursor() as cur: + cur.execute("SELECT * FROM inventory_history ORDER BY scanned_at DESC") + return [ + InventoryRecord( + id=row[0], + robot_id=row[1], + product_id=row[2], + quantity=row[3], + zone=row[4], + row_number=row[5], + shelf_number=row[6], + status=row[7], + scanned_at=row[8], + created_at=row[9] + ) for row in cur.fetchall() + ] + except Exception as e: + print(f"Ошибка получения истории инвентаризации: {e}") + return [] + + def get_by_id(self, record_id: int) -> Optional[InventoryRecord]: + try: + with get_connection() as conn: + with conn.cursor() as cur: + cur.execute("SELECT * FROM inventory_history WHERE id = %s", (record_id,)) + row = cur.fetchone() + return InventoryRecord(*row) if row else None + except Exception as e: + print(f"Ошибка получения записи инвентаризации {record_id}: {e}") + return None + + def create_record(self, robot_id: str, product_id: str, quantity: int, zone: str, + row_number: int, shelf_number: int, status: str, scanned_at: datetime) -> Optional[int]: + try: + with get_connection() as conn: + with conn.cursor() as cur: + cur.execute(""" + INSERT INTO inventory_history + (robot_id, product_id, quantity, zone, row_number, shelf_number, status, scanned_at) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s) + RETURNING id + """, (robot_id, product_id, quantity, zone, row_number, shelf_number, status, scanned_at)) + + record_id = cur.fetchone()[0] + conn.commit() + return record_id + except Exception as e: + print(f"Ошибка создания записи инвентаризации: {e}") + return None + + def get_latest_inventory(self) -> List[InventoryRecord]: + try: + with get_connection() as conn: + with conn.cursor() as cur: + cur.execute(""" + SELECT DISTINCT ON (product_id) * + FROM inventory_history + ORDER BY product_id, scanned_at DESC + """) + return [InventoryRecord(*row) for row in cur.fetchall()] + except Exception as e: + print(f"Ошибка получения последней записи инвентаризации по каждому товару: {e}") + return [] + + def get_records_by_product(self, product_id: str, limit: int = 100) -> List[InventoryRecord]: + try: + with get_connection() as conn: + with conn.cursor() as cur: + cur.execute(""" + SELECT * FROM inventory_history + WHERE product_id = %s + ORDER BY scanned_at DESC + LIMIT %s + """, (product_id, limit)) + return [InventoryRecord(*row) for row in cur.fetchall()] + except Exception as e: + print(f"Ошибка получения записи инвентаризации по продукту {product_id}: {e}") + return [] + + def get_records_by_robot(self, robot_id: str, limit: int = 100) -> List[InventoryRecord]: + try: + with get_connection() as conn: + with conn.cursor() as cur: + cur.execute(""" + SELECT * FROM inventory_history + WHERE robot_id = %s + ORDER BY scanned_at DESC + LIMIT %s + """, (robot_id, limit)) + return [InventoryRecord(*row) for row in cur.fetchall()] + except Exception as e: + print(f"Ошибка получения записи инвентаризации по роботу {robot_id}: {e}") + return [] + + def get_records_by_zone(self, zone: str, limit: int = 100) -> List[InventoryRecord]: + try: + with get_connection() as conn: + with conn.cursor() as cur: + cur.execute(""" + SELECT * FROM inventory_history + WHERE zone = %s + ORDER BY scanned_at DESC + LIMIT %s + """, (zone, limit)) + return [InventoryRecord(*row) for row in cur.fetchall()] + except Exception as e: + print(f"Ошибка получения записи инвентаризации по зоне {zone}: {e}") + return [] + + def get_critical_items(self, hours: int = 24) -> List[InventoryRecord]: + try: + with get_connection() as conn: + with conn.cursor() as cur: + cur.execute(""" + SELECT * FROM inventory_history + WHERE status IN ('LOW_STOCK', 'CRITICAL') + AND scanned_at >= NOW() - INTERVAL '%s hours' + ORDER BY scanned_at DESC + """, (hours,)) + return [InventoryRecord(*row) for row in cur.fetchall()] + except Exception as e: + print(f"Ошибка получения товаров с критическим статусом: {e}") + return []