feat(database/repositories/ai_prediction_repository.py): Added new repository AIPredictionsRepository

This commit is contained in:
Kita Trofimov
2025-10-26 16:09:33 +03:00
parent 64e81aec69
commit 251e1b6e57
2 changed files with 104 additions and 1 deletions
+2 -1
View File
@@ -2,5 +2,6 @@ from .user_repository import UserRepository
from .robot_repository import RobotRepository
from .product_repository import ProductRepository
from .inventory_repository import InventoryRepository
from .ai_prediction_repository import AIPredictionsRepository
__all__ = ['UserRepository', 'RobotRepository', 'ProductRepository', 'InventoryRepository']
__all__ = ['UserRepository', 'RobotRepository', 'ProductRepository', 'InventoryRepository', 'AIPredictionsRepository']
@@ -0,0 +1,102 @@
from typing import List, Optional
from datetime import datetime, date
from database.connection import get_connection
from model.ai_prediction import AIPrediction
class AIPredictionsRepository:
def get_all(self) -> List[AIPrediction]:
try:
with get_connection() as conn:
with conn.cursor() as cur:
cur.execute("SELECT * FROM ai_predictions ORDER BY prediction_date DESC, product_id")
return [
AIPrediction(
id=row[0],
product_id=row[1],
prediction_date=row[2],
days_until_stockout=row[3],
recommended_order=row[4],
confidence_score=row[5],
created_at=row[6]
) for row in cur.fetchall()
]
except Exception as e:
print(f"Ошибка получения прогнозов: {e}")
return []
def get_by_id(self, prediction_id: int) -> Optional[AIPrediction]:
try:
with get_connection() as conn:
with conn.cursor() as cur:
cur.execute("SELECT * FROM ai_predictions WHERE id = %s", (prediction_id,))
row = cur.fetchone()
return AIPrediction(*row) if row else None
except Exception as e:
print(f"Ошибка получения прогноза {prediction_id}: {e}")
return None
def get_by_product(self, product_id: str, limit: int = 10) -> List[AIPrediction]:
try:
with get_connection() as conn:
with conn.cursor() as cur:
cur.execute("""
SELECT * FROM ai_predictions
WHERE product_id = %s
ORDER BY prediction_date DESC
LIMIT %s
""", (product_id, limit))
return [AIPrediction(*row) for row in cur.fetchall()]
except Exception as e:
print(f"Ошибка получения прогноза по товару {product_id}: {e}")
return []
def get_latest_predictions(self) -> List[AIPrediction]:
try:
with get_connection() as conn:
with conn.cursor() as cur:
cur.execute("""
SELECT DISTINCT ON (product_id) *
FROM ai_predictions
ORDER BY product_id, prediction_date DESC
""")
return [AIPrediction(*row) for row in cur.fetchall()]
except Exception as e:
print(f"Ошибка получения последних прогнозов по товарам: {e}")
return []
def create_prediction(self, product_id: str, prediction_date: date,
days_until_stockout: int, recommended_order: int,
confidence_score: float) -> Optional[int]:
try:
with get_connection() as conn:
with conn.cursor() as cur:
cur.execute("""
INSERT INTO ai_predictions
(product_id, prediction_date, days_until_stockout, recommended_order, confidence_score)
VALUES (%s, %s, %s, %s, %s)
RETURNING id
""", (product_id, prediction_date, days_until_stockout, recommended_order, confidence_score))
prediction_id = cur.fetchone()[0]
conn.commit()
return prediction_id
except Exception as e:
print(f"Ошибка создания прогноза: {e}")
return None
def delete_old_predictions(self, older_than_days: int = 90) -> int:
try:
with get_connection() as conn:
with conn.cursor() as cur:
cur.execute("""
DELETE FROM ai_predictions
WHERE prediction_date < CURRENT_DATE - INTERVAL '%s days'
""", (older_than_days,))
deleted_count = cur.rowcount
conn.commit()
return deleted_count
except Exception as e:
print(f"Ошибка удаления старых прогнозов: {e}")
return 0