Files
Backend/db/repositories/inventory_repository.py
T

163 lines
7.7 KiB
Python

# db/repositories/inventory_repository.py
from typing import List, Optional, Tuple
from datetime import datetime
import logging
from db.connection import get_connection
from model.inventory import InventoryRecord
logger = logging.getLogger(__name__)
class InventoryRepository:
def get_all(self) -> List[InventoryRecord]:
try:
with get_connection() as conn:
with conn.cursor() as cur:
cur.execute("SELECT * FROM inventory_history ORDER BY scanned_at DESC")
records = [
InventoryRecord(
id=row[0],
robot_id=row[1],
product_id=row[2],
quantity=row[3],
zone=row[4],
row_number=row[5],
shelf_number=row[6],
status=row[7],
scanned_at=row[8],
created_at=row[9]
) for row in cur.fetchall()
]
logger.debug(f"Получено {len(records)} записей инвентаризации")
return records
except Exception as e:
logger.error(f"Ошибка получения истории инвентаризации: {e}")
return []
def get_by_id(self, record_id: int) -> Optional[InventoryRecord]:
try:
with get_connection() as conn:
with conn.cursor() as cur:
cur.execute("SELECT * FROM inventory_history WHERE id = %s", (record_id,))
row = cur.fetchone()
if row:
logger.debug(f"Запись инвентаризации {record_id} найдена")
return InventoryRecord(*row)
logger.warning(f"Запись инвентаризации {record_id} не найдена")
return None
except Exception as e:
logger.error(f"Ошибка получения записи инвентаризации {record_id}: {e}")
return None
def create_record(
self,
robot_id: str,
product_id: str,
quantity: int,
zone: str,
row_number: int,
shelf_number: int,
status: str,
scanned_at: datetime
) -> Optional[int]:
try:
with get_connection() as conn:
with conn.cursor() as cur:
cur.execute("""
INSERT INTO inventory_history
(robot_id, product_id, quantity, zone, row_number, shelf_number, status, scanned_at)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
RETURNING id
""", (robot_id, product_id, quantity, zone, row_number, shelf_number, status, scanned_at))
record_id = cur.fetchone()[0]
conn.commit()
logger.debug(f"Создана новая запись инвентаризации ID: {record_id}")
return record_id
except Exception as e:
logger.error(f"Ошибка создания записи инвентаризации: {e}")
return None
def get_latest_inventory(self) -> List[InventoryRecord]:
try:
with get_connection() as conn:
with conn.cursor() as cur:
cur.execute("""
SELECT DISTINCT ON (product_id) *
FROM inventory_history
ORDER BY product_id, scanned_at DESC
""")
records = [InventoryRecord(*row) for row in cur.fetchall()]
logger.debug(f"Получено {len(records)} последних записей инвентаризации по товарам")
return records
except Exception as e:
logger.error(f"Ошибка получения последней записи инвентаризации по каждому товару: {e}")
return []
def get_records_by_product(self, product_id: str, limit: int = 100) -> List[InventoryRecord]:
try:
with get_connection() as conn:
with conn.cursor() as cur:
cur.execute("""
SELECT * FROM inventory_history
WHERE product_id = %s
ORDER BY scanned_at DESC
LIMIT %s
""", (product_id, limit))
records = [InventoryRecord(*row) for row in cur.fetchall()]
logger.debug(f"Получено {len(records)} записей инвентаризации для товара {product_id}")
return records
except Exception as e:
logger.error(f"Ошибка получения записи инвентаризации по продукту {product_id}: {e}")
return []
def get_records_by_robot(self, robot_id: str, limit: int = 100) -> List[InventoryRecord]:
try:
with get_connection() as conn:
with conn.cursor() as cur:
cur.execute("""
SELECT * FROM inventory_history
WHERE robot_id = %s
ORDER BY scanned_at DESC
LIMIT %s
""", (robot_id, limit))
records = [InventoryRecord(*row) for row in cur.fetchall()]
logger.debug(f"Получено {len(records)} записей инвентаризации для робота {robot_id}")
return records
except Exception as e:
logger.error(f"Ошибка получения записи инвентаризации по роботу {robot_id}: {e}")
return []
def get_records_by_zone(self, zone: str, limit: int = 100) -> List[InventoryRecord]:
try:
with get_connection() as conn:
with conn.cursor() as cur:
cur.execute("""
SELECT * FROM inventory_history
WHERE zone = %s
ORDER BY scanned_at DESC
LIMIT %s
""", (zone, limit))
records = [InventoryRecord(*row) for row in cur.fetchall()]
logger.debug(f"Получено {len(records)} записей инвентаризации для зоны {zone}")
return records
except Exception as e:
logger.error(f"Ошибка получения записи инвентаризации по зоне {zone}: {e}")
return []
def get_critical_items(self, hours: int = 24) -> List[InventoryRecord]:
try:
with get_connection() as conn:
with conn.cursor() as cur:
cur.execute("""
SELECT * FROM inventory_history
WHERE status IN ('LOW_STOCK', 'CRITICAL')
AND scanned_at >= NOW() - INTERVAL '%s hours'
ORDER BY scanned_at DESC
""", (hours,))
records = [InventoryRecord(*row) for row in cur.fetchall()]
logger.debug(f"Получено {len(records)} критических товаров за последние {hours} часов")
return records
except Exception as e:
logger.error(f"Ошибка получения товаров с критическим статусом: {e}")
return []