feat/db-init #3
@@ -1,4 +1,5 @@
|
||||
from .user_repository import UserRepository
|
||||
from .robot_repository import RobotRepository
|
||||
from .product_repository import ProductRepository
|
||||
|
||||
__all__ = ['UserRepository', 'RobotRepository']
|
||||
__all__ = ['UserRepository', 'RobotRepository', 'ProductRepository']
|
||||
|
||||
@@ -0,0 +1,139 @@
|
||||
# database/repositories/product_repository.py
|
||||
from typing import List, Optional
|
||||
from database.connection import get_connection
|
||||
from model.product import Product
|
||||
|
||||
|
||||
class ProductRepository:
|
||||
def get_all(self) -> List[Product]:
|
||||
try:
|
||||
with get_connection() as conn:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute("SELECT * FROM products ORDER BY id")
|
||||
return [
|
||||
Product(
|
||||
id=row[0],
|
||||
name=row[1],
|
||||
category=row[2],
|
||||
min_stock=row[3],
|
||||
optimal_stock=row[4]
|
||||
) for row in cur.fetchall()
|
||||
]
|
||||
except Exception as e:
|
||||
print(f"Ошибка получения продуктов: {e}")
|
||||
return []
|
||||
|
||||
def get_by_id(self, product_id: str) -> Optional[Product]:
|
||||
try:
|
||||
with get_connection() as conn:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute("SELECT * FROM products WHERE id = %s", (product_id,))
|
||||
row = cur.fetchone()
|
||||
return Product(*row) if row else None
|
||||
except Exception as e:
|
||||
print(f"Ошибка получения товара {product_id}: {e}")
|
||||
return None
|
||||
|
||||
def get_by_category(self, category: str) -> List[Product]:
|
||||
try:
|
||||
with get_connection() as conn:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute("SELECT * FROM products WHERE category = %s ORDER BY name", (category,))
|
||||
return [Product(*row) for row in cur.fetchall()]
|
||||
except Exception as e:
|
||||
print(f"Ошибка получения товаров по категории {category}: {e}")
|
||||
return []
|
||||
|
||||
def create_product(self, product_id: str, name: str, category: str,
|
||||
min_stock: int = 10, optimal_stock: int = 100) -> bool:
|
||||
try:
|
||||
with get_connection() as conn:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute("""
|
||||
INSERT INTO products (id, name, category, min_stock, optimal_stock)
|
||||
VALUES (%s, %s, %s, %s, %s)
|
||||
""", (product_id, name, category, min_stock, optimal_stock))
|
||||
conn.commit()
|
||||
return cur.rowcount > 0
|
||||
except Exception as e:
|
||||
print(f"Ошибка создания товара {product_id}: {e}")
|
||||
return False
|
||||
|
||||
def update_product(self, product_id: str, name: str = None, category: str = None,
|
||||
min_stock: int = None, optimal_stock: int = None) -> bool:
|
||||
try:
|
||||
with get_connection() as conn:
|
||||
with conn.cursor() as cur:
|
||||
updates = []
|
||||
params = []
|
||||
|
||||
if name is not None:
|
||||
updates.append("name = %s")
|
||||
params.append(name)
|
||||
|
||||
if category is not None:
|
||||
updates.append("category = %s")
|
||||
params.append(category)
|
||||
|
||||
if min_stock is not None:
|
||||
updates.append("min_stock = %s")
|
||||
params.append(min_stock)
|
||||
|
||||
if optimal_stock is not None:
|
||||
updates.append("optimal_stock = %s")
|
||||
params.append(optimal_stock)
|
||||
|
||||
if not updates:
|
||||
return False
|
||||
|
||||
params.append(product_id)
|
||||
query = f"UPDATE products SET {', '.join(updates)} WHERE id = %s"
|
||||
|
||||
cur.execute(query, params)
|
||||
conn.commit()
|
||||
return cur.rowcount > 0
|
||||
except Exception as e:
|
||||
print(f"Ошибка обновления товара {product_id}: {e}")
|
||||
return False
|
||||
|
||||
def delete_product(self, product_id: str) -> bool:
|
||||
try:
|
||||
with get_connection() as conn:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute("DELETE FROM products WHERE id = %s", (product_id,))
|
||||
conn.commit()
|
||||
return cur.rowcount > 0
|
||||
except Exception as e:
|
||||
print(f"Ошибка удаления товара {product_id}: {e}")
|
||||
return False
|
||||
|
||||
def search_products(self, search_term: str) -> List[Product]:
|
||||
try:
|
||||
with get_connection() as conn:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute("""
|
||||
SELECT * FROM products
|
||||
WHERE name ILIKE %s OR id ILIKE %s
|
||||
ORDER BY name
|
||||
""", (f'%{search_term}%', f'%{search_term}%'))
|
||||
return [Product(*row) for row in cur.fetchall()]
|
||||
except Exception as e:
|
||||
print(f"Ошибка поиска товров по названию '{search_term}': {e}")
|
||||
return []
|
||||
|
||||
def get_low_stock_products(self) -> List[Product]:
|
||||
try:
|
||||
with get_connection() as conn:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute("""
|
||||
SELECT DISTINCT p.*
|
||||
FROM products p
|
||||
JOIN inventory_history ih ON p.id = ih.product_id
|
||||
WHERE ih.status IN ('LOW_STOCK', 'CRITICAL')
|
||||
AND ih.scanned_at >= NOW() - INTERVAL '1 day'
|
||||
ORDER BY p.name
|
||||
""")
|
||||
return [Product(*row) for row in cur.fetchall()]
|
||||
except Exception as e:
|
||||
print(f"Ошибка получения товаров с низким запасом: {e}")
|
||||
return []
|
||||
@@ -2,7 +2,6 @@
|
||||
from typing import List, Optional
|
||||
from database.connection import get_connection
|
||||
from model.robot import Robot
|
||||
import logging
|
||||
|
||||
class RobotRepository:
|
||||
def get_all(self) -> List[Robot]:
|
||||
|
||||
Reference in New Issue
Block a user