41 Commits

Author SHA1 Message Date
Kita Trofimov 7fdd3c56cb Merge remote-tracking branch 'origin/feat/db-init' into feat/db-init
# Conflicts:
#	api/loginapi.py
#	db/connection.py
#	db/repositories/ai_prediction_repository.py
#	db/repositories/inventory_repository.py
#	db/repositories/product_repository.py
#	db/repositories/robot_repository.py
#	db/repositories/user_repository.py
2025-10-27 23:04:10 +03:00
Kita Trofimov 052c93928c feat(db): Database initialization has been added 2025-10-27 23:02:19 +03:00
Sweetbread e44696ce04 wip 2025-10-26 23:19:51 +03:00
Sweetbread 72f766ab46 fix(log): replace logger with loguru (again!) 2025-10-26 23:09:42 +03:00
Kita Trofimov 9ecb6a83ba style(database): Fixed the issue with long function declarations 2025-10-26 21:25:59 +03:00
Kita Trofimov 951e4c7e45 style(database): Changed logging 2025-10-26 21:25:59 +03:00
Kita Trofimov 7786680f43 fix(database): Fixed the database connection 2025-10-26 21:16:23 +03:00
Kita Trofimov 33c3b81f6e style(database): Refactor repository name 2025-10-26 21:16:23 +03:00
Kita Trofimov 6006e074d8 fix(model/user.py): Removed unnecesary methods 2025-10-26 21:16:23 +03:00
Kita Trofimov 771f7e0549 feat(database/repositories/user_repository.py): A new login verification method has been added 2025-10-26 21:16:23 +03:00
Kita Trofimov fd1b7df93a feat(database/repositories/ai_prediction_repository.py): Added new repository AIPredictionsRepository 2025-10-26 21:16:23 +03:00
Kita Trofimov 88c4460da9 feat(model/ai_prediction.py): Added new model AIPrediction 2025-10-26 21:16:23 +03:00
Kita Trofimov 51bf856434 feat(database/repositories/inventory_repository.py): Added inventory repository 2025-10-26 21:16:23 +03:00
Kita Trofimov 0ded143c55 feat(database/repositories/product_repository.py): Added product repository 2025-10-26 21:16:23 +03:00
Kita Trofimov 056d800c4f feat(database/repositories/robot_repository.py): Added robot_repository 2025-10-26 21:16:23 +03:00
Kita Trofimov 3be4556844 feat(database/repositories/user_repository.py): Added user repository 2025-10-26 21:16:23 +03:00
Kita Trofimov 928677fea8 feat(database): Implemented database connection 2025-10-26 21:16:23 +03:00
Kita Trofimov e80c5645f1 feat(model): Added correct data models 2025-10-26 21:16:23 +03:00
Sweetbread 70041fa58e feat(jwt): add iat field 2025-10-26 21:16:23 +03:00
Sweetbread 6f478d717a feat(log): use loguru 2025-10-26 21:16:23 +03:00
Sweetbread be5732bc32 refactor(auth): remove /api/auth/* handlers to api/auth.py 2025-10-26 21:16:23 +03:00
Sweetbread 4ad7869e27 refactor(user): rename to toJson back 2025-10-26 21:16:23 +03:00
Sweetbread 67c06a0203 feat(auth): add data validation 2025-10-26 21:16:23 +03:00
Sweetbread df5317432d refactor(user): change class name to uppercase 2025-10-26 21:16:23 +03:00
Sweetbread 96a310a80d fix(env): ensure the .env is loaded 2025-10-26 21:16:22 +03:00
Kirill ec985a808d fix(api/auth/loginapi.py, app.py, utils/PostgressConnect.py, utils/createLogger.py, utils/loadDotEnv.py): Review fix
https://g.codrs.ru/Hackaton/Backend/pulls/2#issuecomment-23
https://g.codrs.ru/Hackaton/Backend/pulls/2#issuecomment-31
2025-10-26 21:16:17 +03:00
Kita Trofimov c33e8798ed style(database): Fixed type of logger on the db sucsess operations 2025-10-26 20:31:51 +03:00
Kita Trofimov 401205aea2 style(database): Fixed the issue with long function declarations 2025-10-26 19:41:11 +03:00
Kita Trofimov 8c07b1febc style(database): Changed logging 2025-10-26 19:32:34 +03:00
Kita Trofimov 3e4755cca1 fix(database): Fixed the database connection 2025-10-26 19:14:51 +03:00
Kita Trofimov c117ceb85e style(database): Refactor repository name 2025-10-26 18:52:54 +03:00
Kita Trofimov 4e3b21f31e fix(model/user.py): Removed unnecesary methods 2025-10-26 18:48:54 +03:00
Kita Trofimov 6483afa0d4 feat(database/repositories/user_repository.py): A new login verification method has been added 2025-10-26 16:32:02 +03:00
Kita Trofimov 251e1b6e57 feat(database/repositories/ai_prediction_repository.py): Added new repository AIPredictionsRepository 2025-10-26 16:09:33 +03:00
Kita Trofimov 64e81aec69 feat(model/ai_prediction.py): Added new model AIPrediction 2025-10-26 15:52:48 +03:00
Kita Trofimov 6d5abeb88d feat(database/repositories/inventory_repository.py): Added inventory repository 2025-10-26 15:42:20 +03:00
Kita Trofimov fa00e27015 feat(database/repositories/product_repository.py): Added product repository 2025-10-26 15:04:47 +03:00
Kita Trofimov 2f6782ab9d feat(database/repositories/robot_repository.py): Added robot_repository 2025-10-26 14:09:21 +03:00
Kita Trofimov c1bab1b304 feat(database/repositories/user_repository.py): Added user repository 2025-10-26 13:14:51 +03:00
Kita Trofimov 506f2f3f39 feat(database): Implemented database connection 2025-10-26 00:43:32 +03:00
Kita Trofimov 79cac3d8cb feat(model): Added correct data models 2025-10-26 00:11:20 +03:00
22 changed files with 1165 additions and 50 deletions
+2
View File
@@ -0,0 +1,2 @@
KEY= # Key for JWT token
POSTGRES_URL=postgresql://
+35
View File
@@ -0,0 +1,35 @@
from flask import Blueprint, request, jsonify
from model.user import User
from db.repositories.user_repository import UserRepository # FIXME: authenticate_user as get_user
from utils.token import generateKey as getToken
auth = Blueprint("auth", __name__)
@auth.route('/login', methods = ['POST'])
def login():
if request.is_json:
req = request.json
email = req.get('email')
password = req.get('password')
if not email or not password:
return "Request must have email and password", 400
if len(email.strip()) < 4 or '@' not in email or '.' not in email:
return "Email is incorrect", 400
if len(password.strip()) < 8:
return "Password is too short", 400
user = UserRepository().authenticate_user(email, password)
if not user:
return "Wrong credentials", 400
token = getToken(user)
return jsonify({'token': token, 'user': {'id': user.id, 'name': user.name, 'role': user.role}})
else:
return "Request is not a json", 400
-12
View File
@@ -1,12 +0,0 @@
from flask import Blueprint, request, jsonify
from model.user import user
loginBP = Blueprint("loginapi", __name__)
@loginBP.route('/api/login', methods = ['POST'])
def login():
email = request.form['email']
password = request.form['password']
#if(isvalid(email, password)):
us = user(email, password)
return jsonify(us.toDictionary())
+5 -3
View File
@@ -1,9 +1,11 @@
from sys import exit
from flask import Flask
from api.auth.loginapi import loginBP
from api.auth import auth
from utils.loadDotEnv import initializeENV
state = initializeENV()
if not initializeENV():
exit(-1)
app = Flask(__name__)
app.register_blueprint(loginBP)
app.register_blueprint(auth, url_prefix='/api/auth')
+65
View File
@@ -0,0 +1,65 @@
import psycopg2
import os
from contextlib import contextmanager
from typing import Generator
from loguru import logger
from utils.loadDotEnv import initializeENV
initializeENV()
def PSQLConnect():
conn_str = os.getenv('POSTGRES_CONNECTION')
if not conn_str:
logger.error("POSTGRES_CONNECTION не найден в .env файле")
raise ValueError("POSTGRES_CONNECTION не найден в .env файле")
conn = psycopg2.connect(conn_str)
logger.debug("Подключение к БД установлено")
return conn
def PSQLCursor(conn):
cur = conn.cursor()
logger.debug("Курсор БД создан")
return cur
@contextmanager
def get_connection() -> Generator[psycopg2.extensions.connection, None, None]:
conn = None
try:
conn = PSQLConnect()
logger.debug("Контекст подключения к БД открыт")
yield conn
except Exception as e:
logger.error(f"Ошибка в контексте подключения: {e}")
if conn:
conn.rollback()
logger.debug("Откат транзакции выполнен")
raise
finally:
if conn:
conn.close()
logger.debug("Подключение к БД закрыто")
def test_connection() -> bool:
try:
with get_connection() as conn:
cur = PSQLCursor(conn)
cur.execute("SELECT version();")
version = cur.fetchone()
logger.info(f"Подключение к БД успешно: {version[0]}")
cur.close()
logger.debug("Курсор БД закрыт")
return True
except Exception as e:
logger.error(f"Ошибка подключения к БД: {e}")
return False
if __name__ == "__main__":
test_connection()
+147
View File
@@ -0,0 +1,147 @@
from db.connection import get_connection
from loguru import logger
def create_tables():
try:
with get_connection() as conn:
with conn.cursor() as cur:
# Пользователи
cur.execute("""
CREATE TABLE IF NOT EXISTS users (
id SERIAL PRIMARY KEY,
email VARCHAR(255) UNIQUE NOT NULL,
password_hash VARCHAR(255) NOT NULL,
name VARCHAR(255) NOT NULL,
role VARCHAR(50) NOT NULL DEFAULT 'viewer',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
# Роботы
cur.execute("""
CREATE TABLE IF NOT EXISTS robots (
id VARCHAR(50) PRIMARY KEY,
status VARCHAR(50) DEFAULT 'active',
battery_level INTEGER DEFAULT 100,
last_update TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
current_zone VARCHAR(10),
current_row INTEGER,
current_shelf INTEGER
)
""")
# Товары
cur.execute("""
CREATE TABLE IF NOT EXISTS products (
id VARCHAR(50) PRIMARY KEY,
name VARCHAR(255) NOT NULL,
category VARCHAR(100),
min_stock INTEGER DEFAULT 10,
optimal_stock INTEGER DEFAULT 100
)
""")
# История инвентаризации
cur.execute("""
CREATE TABLE IF NOT EXISTS inventory_history (
id SERIAL PRIMARY KEY,
robot_id VARCHAR(50) REFERENCES robots(id),
product_id VARCHAR(50) REFERENCES products(id),
quantity INTEGER NOT NULL,
zone VARCHAR(10) NOT NULL,
row_number INTEGER,
shelf_number INTEGER,
status VARCHAR(50),
scanned_at TIMESTAMP NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
# Прогнозы ИИ
cur.execute("""
CREATE TABLE IF NOT EXISTS ai_predictions (
id SERIAL PRIMARY KEY,
product_id VARCHAR(50) REFERENCES products(id),
prediction_date DATE NOT NULL,
days_until_stockout INTEGER,
recommended_order INTEGER,
confidence_score DECIMAL(3,2),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
conn.commit()
logger.debug("Все таблицы успешно созданы")
except Exception as e:
logger.error(f"Ошибка создания таблиц: {e}")
raise
def create_indexes():
try:
with get_connection() as conn:
with conn.cursor() as cur:
cur.execute("CREATE INDEX IF NOT EXISTS idx_inventory_scanned ON inventory_history(scanned_at DESC)")
cur.execute("CREATE INDEX IF NOT EXISTS idx_inventory_product ON inventory_history(product_id)")
cur.execute("CREATE INDEX IF NOT EXISTS idx_inventory_zone ON inventory_history(zone)")
conn.commit()
logger.debug("Индексы созданы")
except Exception as e:
logger.error(f"Ошибка создания индексов: {e}")
raise
def insert_sample_data():
try:
with get_connection() as conn:
with conn.cursor() as cur:
cur.execute("""
INSERT INTO users (email, password_hash, name, role)
VALUES
('admin@warehouse.com', 'hash1', 'Администратор', 'admin'),
('operator@warehouse.com', 'hash2', 'Оператор Иванов', 'operator'),
('viewer@warehouse.com', 'hash3', 'Наблюдатель Петров', 'viewer')
ON CONFLICT (email) DO NOTHING
""")
cur.execute("""
INSERT INTO robots (id, status, battery_level, current_zone)
VALUES
('RB-001', 'active', 85, 'A'),
('RB-002', 'active', 45, 'B'),
('RB-003', 'maintenance', 100, NULL)
ON CONFLICT (id) DO NOTHING
""")
cur.execute("""
INSERT INTO products (id, name, category, min_stock, optimal_stock)
VALUES
('TEL-1234', 'Смартфон X', 'Электроника', 5, 50),
('NOTE-567', 'Ноутбук Pro', 'Электроника', 3, 20),
('ACC-999', 'Чехол для телефона', 'Аксессуары', 10, 100)
ON CONFLICT (id) DO NOTHING
""")
conn.commit()
logger.debug("Тестовые данные добавлены")
except Exception as e:
logger.error(f"Ошибка добавления тестовых данных: {e}")
raise
def initialize_database():
logger.info("Начинаем инициализацию базы данных...")
create_tables()
create_indexes()
insert_sample_data()
logger.debug("База данных успешно инициализирована!")
if __name__ == "__main__":
initialize_database()
+7
View File
@@ -0,0 +1,7 @@
from .user_repository import UserRepository
from .robot_repository import RobotRepository
from .product_repository import ProductRepository
from .inventory_repository import InventoryRepository
from .ai_prediction_repository import AIPredictionsRepository
__all__ = ['UserRepository', 'RobotRepository', 'ProductRepository', 'InventoryRepository', 'AIPredictionsRepository']
+118
View File
@@ -0,0 +1,118 @@
from typing import List, Optional
from datetime import datetime, date
from loguru import logger
from db.connection import get_connection
from model.ai_prediction import AIPrediction
class AIPredictionsRepository:
def get_all(self) -> List[AIPrediction]:
try:
with get_connection() as conn:
with conn.cursor() as cur:
cur.execute("SELECT * FROM ai_predictions ORDER BY prediction_date DESC, product_id")
predictions = [
AIPrediction(
id=row[0],
product_id=row[1],
prediction_date=row[2],
days_until_stockout=row[3],
recommended_order=row[4],
confidence_score=row[5],
created_at=row[6]
) for row in cur.fetchall()
]
logger.debug(f"Получено {len(predictions)} прогнозов")
return predictions
except Exception as e:
logger.error(f"Ошибка получения прогнозов: {e}")
return []
def get_by_id(self, prediction_id: int) -> Optional[AIPrediction]:
try:
with get_connection() as conn:
with conn.cursor() as cur:
cur.execute("SELECT * FROM ai_predictions WHERE id = %s", (prediction_id,))
row = cur.fetchone()
if row:
logger.debug(f"Прогноз {prediction_id} найден")
return AIPrediction(*row)
logger.warning(f"Прогноз {prediction_id} не найден")
return None
except Exception as e:
logger.error(f"Ошибка получения прогноза {prediction_id}: {e}")
return None
def get_by_product(self, product_id: str, limit: int = 10) -> List[AIPrediction]:
try:
with get_connection() as conn:
with conn.cursor() as cur:
cur.execute("""
SELECT * FROM ai_predictions
WHERE product_id = %s
ORDER BY prediction_date DESC
LIMIT %s
""", (product_id, limit))
predictions = [AIPrediction(*row) for row in cur.fetchall()]
logger.debug(f"Получено {len(predictions)} прогнозов для товара {product_id}")
return predictions
except Exception as e:
logger.error(f"Ошибка получения прогноза по товару {product_id}: {e}")
return []
def get_latest_predictions(self) -> List[AIPrediction]:
try:
with get_connection() as conn:
with conn.cursor() as cur:
cur.execute("""
SELECT DISTINCT ON (product_id) *
FROM ai_predictions
ORDER BY product_id, prediction_date DESC
""")
predictions = [AIPrediction(*row) for row in cur.fetchall()]
logger.debug(f"Получено {len(predictions)} последних прогнозов по товарам")
return predictions
except Exception as e:
logger.error(f"Ошибка получения последних прогнозов по товарам: {e}")
return []
def create_prediction(
self,
product_id: str,
prediction_date: date,
days_until_stockout: int,
recommended_order: int,
confidence_score: float
) -> Optional[int]:
try:
with get_connection() as conn:
with conn.cursor() as cur:
cur.execute("""
INSERT INTO ai_predictions
(product_id, prediction_date, days_until_stockout, recommended_order, confidence_score)
VALUES (%s, %s, %s, %s, %s)
RETURNING id
""", (product_id, prediction_date, days_until_stockout, recommended_order, confidence_score))
prediction_id = cur.fetchone()[0]
conn.commit()
logger.debug(f"Создан новый прогноз ID: {prediction_id} для товара {product_id}")
return prediction_id
except Exception as e:
logger.error(f"Ошибка создания прогноза: {e}")
return None
def delete_old_predictions(self, older_than_days: int = 90) -> int:
try:
with get_connection() as conn:
with conn.cursor() as cur:
cur.execute("""
DELETE FROM ai_predictions
WHERE prediction_date < CURRENT_DATE - INTERVAL '%s days'
""", (older_than_days,))
deleted_count = cur.rowcount
conn.commit()
logger.debug(f"Удалено {deleted_count} старых прогнозов старше {older_than_days} дней")
return deleted_count
except Exception as e:
logger.error(f"Ошибка удаления старых прогнозов: {e}")
return 0
+160
View File
@@ -0,0 +1,160 @@
# db/repositories/inventory_repository.py
from typing import List, Optional, Tuple
from datetime import datetime
from loguru import logger
from db.connection import get_connection
from model.inventory import InventoryRecord
class InventoryRepository:
def get_all(self) -> List[InventoryRecord]:
try:
with get_connection() as conn:
with conn.cursor() as cur:
cur.execute("SELECT * FROM inventory_history ORDER BY scanned_at DESC")
records = [
InventoryRecord(
id=row[0],
robot_id=row[1],
product_id=row[2],
quantity=row[3],
zone=row[4],
row_number=row[5],
shelf_number=row[6],
status=row[7],
scanned_at=row[8],
created_at=row[9]
) for row in cur.fetchall()
]
logger.debug(f"Получено {len(records)} записей инвентаризации")
return records
except Exception as e:
logger.error(f"Ошибка получения истории инвентаризации: {e}")
return []
def get_by_id(self, record_id: int) -> Optional[InventoryRecord]:
try:
with get_connection() as conn:
with conn.cursor() as cur:
cur.execute("SELECT * FROM inventory_history WHERE id = %s", (record_id,))
row = cur.fetchone()
if row:
logger.debug(f"Запись инвентаризации {record_id} найдена")
return InventoryRecord(*row)
logger.warning(f"Запись инвентаризации {record_id} не найдена")
return None
except Exception as e:
logger.error(f"Ошибка получения записи инвентаризации {record_id}: {e}")
return None
def create_record(
self,
robot_id: str,
product_id: str,
quantity: int,
zone: str,
row_number: int,
shelf_number: int,
status: str,
scanned_at: datetime
) -> Optional[int]:
try:
with get_connection() as conn:
with conn.cursor() as cur:
cur.execute("""
INSERT INTO inventory_history
(robot_id, product_id, quantity, zone, row_number, shelf_number, status, scanned_at)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
RETURNING id
""", (robot_id, product_id, quantity, zone, row_number, shelf_number, status, scanned_at))
record_id = cur.fetchone()[0]
conn.commit()
logger.debug(f"Создана новая запись инвентаризации ID: {record_id}")
return record_id
except Exception as e:
logger.error(f"Ошибка создания записи инвентаризации: {e}")
return None
def get_latest_inventory(self) -> List[InventoryRecord]:
try:
with get_connection() as conn:
with conn.cursor() as cur:
cur.execute("""
SELECT DISTINCT ON (product_id) *
FROM inventory_history
ORDER BY product_id, scanned_at DESC
""")
records = [InventoryRecord(*row) for row in cur.fetchall()]
logger.debug(f"Получено {len(records)} последних записей инвентаризации по товарам")
return records
except Exception as e:
logger.error(f"Ошибка получения последней записи инвентаризации по каждому товару: {e}")
return []
def get_records_by_product(self, product_id: str, limit: int = 100) -> List[InventoryRecord]:
try:
with get_connection() as conn:
with conn.cursor() as cur:
cur.execute("""
SELECT * FROM inventory_history
WHERE product_id = %s
ORDER BY scanned_at DESC
LIMIT %s
""", (product_id, limit))
records = [InventoryRecord(*row) for row in cur.fetchall()]
logger.debug(f"Получено {len(records)} записей инвентаризации для товара {product_id}")
return records
except Exception as e:
logger.error(f"Ошибка получения записи инвентаризации по продукту {product_id}: {e}")
return []
def get_records_by_robot(self, robot_id: str, limit: int = 100) -> List[InventoryRecord]:
try:
with get_connection() as conn:
with conn.cursor() as cur:
cur.execute("""
SELECT * FROM inventory_history
WHERE robot_id = %s
ORDER BY scanned_at DESC
LIMIT %s
""", (robot_id, limit))
records = [InventoryRecord(*row) for row in cur.fetchall()]
logger.debug(f"Получено {len(records)} записей инвентаризации для робота {robot_id}")
return records
except Exception as e:
logger.error(f"Ошибка получения записи инвентаризации по роботу {robot_id}: {e}")
return []
def get_records_by_zone(self, zone: str, limit: int = 100) -> List[InventoryRecord]:
try:
with get_connection() as conn:
with conn.cursor() as cur:
cur.execute("""
SELECT * FROM inventory_history
WHERE zone = %s
ORDER BY scanned_at DESC
LIMIT %s
""", (zone, limit))
records = [InventoryRecord(*row) for row in cur.fetchall()]
logger.debug(f"Получено {len(records)} записей инвентаризации для зоны {zone}")
return records
except Exception as e:
logger.error(f"Ошибка получения записи инвентаризации по зоне {zone}: {e}")
return []
def get_critical_items(self, hours: int = 24) -> List[InventoryRecord]:
try:
with get_connection() as conn:
with conn.cursor() as cur:
cur.execute("""
SELECT * FROM inventory_history
WHERE status IN ('LOW_STOCK', 'CRITICAL')
AND scanned_at >= NOW() - INTERVAL '%s hours'
ORDER BY scanned_at DESC
""", (hours,))
records = [InventoryRecord(*row) for row in cur.fetchall()]
logger.debug(f"Получено {len(records)} критических товаров за последние {hours} часов")
return records
except Exception as e:
logger.error(f"Ошибка получения товаров с критическим статусом: {e}")
return []
+166
View File
@@ -0,0 +1,166 @@
from typing import List, Optional
from loguru import logger
from db.connection import get_connection
from model.product import Product
class ProductRepository:
def get_all(self) -> List[Product]:
try:
with get_connection() as conn:
with conn.cursor() as cur:
cur.execute("SELECT * FROM products ORDER BY id")
products = [
Product(
id=row[0],
name=row[1],
category=row[2],
min_stock=row[3],
optimal_stock=row[4]
) for row in cur.fetchall()
]
logger.debug(f"Получено {len(products)} товаров")
return products
except Exception as e:
logger.error(f"Ошибка получения продуктов: {e}")
return []
def get_by_id(self, product_id: str) -> Optional[Product]:
try:
with get_connection() as conn:
with conn.cursor() as cur:
cur.execute("SELECT * FROM products WHERE id = %s", (product_id,))
row = cur.fetchone()
if row:
logger.debug(f"Товар {product_id} найден")
return Product(*row)
logger.warning(f"Товар {product_id} не найден")
return None
except Exception as e:
logger.error(f"Ошибка получения товара {product_id}: {e}")
return None
def get_by_category(self, category: str) -> List[Product]:
try:
with get_connection() as conn:
with conn.cursor() as cur:
cur.execute("SELECT * FROM products WHERE category = %s ORDER BY name", (category,))
products = [Product(*row) for row in cur.fetchall()]
logger.debug(f"Получено {len(products)} товаров категории {category}")
return products
except Exception as e:
logger.error(f"Ошибка получения товаров по категории {category}: {e}")
return []
def create_product(self, product_id: str, name: str, category: str,
min_stock: int = 10, optimal_stock: int = 100) -> bool:
try:
with get_connection() as conn:
with conn.cursor() as cur:
cur.execute("""
INSERT INTO products (id, name, category, min_stock, optimal_stock)
VALUES (%s, %s, %s, %s, %s)
""", (product_id, name, category, min_stock, optimal_stock))
conn.commit()
success = cur.rowcount > 0
if success:
logger.debug(f"Создан новый товар {product_id}: {name}")
else:
logger.warning(f"Не удалось создать товар {product_id}")
return success
except Exception as e:
logger.error(f"Ошибка создания товара {product_id}: {e}")
return False
def update_product(self, product_id: str, name: str = None, category: str = None,
min_stock: int = None, optimal_stock: int = None) -> bool:
try:
with get_connection() as conn:
with conn.cursor() as cur:
updates = []
params = []
if name is not None:
updates.append("name = %s")
params.append(name)
if category is not None:
updates.append("category = %s")
params.append(category)
if min_stock is not None:
updates.append("min_stock = %s")
params.append(min_stock)
if optimal_stock is not None:
updates.append("optimal_stock = %s")
params.append(optimal_stock)
if not updates:
logger.warning(f"Нет данных для обновления товара {product_id}")
return False
params.append(product_id)
query = f"UPDATE products SET {', '.join(updates)} WHERE id = %s"
cur.execute(query, params)
conn.commit()
success = cur.rowcount > 0
if success:
logger.debug(f"Товар {product_id} успешно обновлен")
else:
logger.warning(f"Товар {product_id} не найден для обновления")
return success
except Exception as e:
logger.error(f"Ошибка обновления товара {product_id}: {e}")
return False
def delete_product(self, product_id: str) -> bool:
try:
with get_connection() as conn:
with conn.cursor() as cur:
cur.execute("DELETE FROM products WHERE id = %s", (product_id,))
conn.commit()
success = cur.rowcount > 0
if success:
logger.debug(f"Товар {product_id} удален")
else:
logger.warning(f"Товар {product_id} не найден для удаления")
return success
except Exception as e:
logger.error(f"Ошибка удаления товара {product_id}: {e}")
return False
def search_products(self, search_term: str) -> List[Product]:
try:
with get_connection() as conn:
with conn.cursor() as cur:
cur.execute("""
SELECT * FROM products
WHERE name ILIKE %s OR id ILIKE %s
ORDER BY name
""", (f'%{search_term}%', f'%{search_term}%'))
products = [Product(*row) for row in cur.fetchall()]
logger.debug(f"Найдено {len(products)} товаров по запросу '{search_term}'")
return products
except Exception as e:
logger.error(f"Ошибка поиска товаров по названию '{search_term}': {e}")
return []
def get_low_stock_products(self) -> List[Product]:
try:
with get_connection() as conn:
with conn.cursor() as cur:
cur.execute("""
SELECT DISTINCT p.*
FROM products p
JOIN inventory_history ih ON p.id = ih.product_id
WHERE ih.status IN ('LOW_STOCK', 'CRITICAL')
AND ih.scanned_at >= NOW() - INTERVAL '1 day'
ORDER BY p.name
""")
products = [Product(*row) for row in cur.fetchall()]
logger.debug(f"Получено {len(products)} товаров с низким запасом")
return products
except Exception as e:
logger.error(f"Ошибка получения товаров с низким запасом: {e}")
return []
+172
View File
@@ -0,0 +1,172 @@
from typing import List, Optional
from loguru import logger
from db.connection import get_connection
from model.robot import Robot
class RobotRepository:
def get_all(self) -> List[Robot]:
try:
with get_connection() as conn:
with conn.cursor() as cur:
cur.execute("SELECT * FROM robots ORDER BY id")
robots = [
Robot(
id=row[0],
status=row[1],
battery_level=row[2],
last_update=row[3],
current_zone=row[4],
current_row=row[5],
current_shelf=row[6]
) for row in cur.fetchall()
]
logger.debug(f"Получено {len(robots)} роботов")
return robots
except Exception as e:
logger.error(f"Ошибка получения всех роботов: {e}")
return []
def get_by_id(self, robot_id: str) -> Optional[Robot]:
try:
with get_connection() as conn:
with conn.cursor() as cur:
cur.execute("SELECT * FROM robots WHERE id = %s", (robot_id,))
row = cur.fetchone()
if row:
logger.debug(f"Робот {robot_id} найден")
return Robot(*row)
logger.warning(f"Робот {robot_id} не найден")
return None
except Exception as e:
logger.error(f"Ошибка получения робота {robot_id}: {e}")
return None
def update_robot(
self,
robot_id: str,
status: str = None,
battery_level: int = None,
current_zone: str = None,
current_row: int = None,
current_shelf: int = None
) -> bool:
try:
with get_connection() as conn:
with conn.cursor() as cur:
updates = ["last_update = CURRENT_TIMESTAMP"]
params = []
if status is not None:
updates.append("status = %s")
params.append(status)
if battery_level is not None:
updates.append("battery_level = %s")
params.append(battery_level)
if current_zone is not None:
updates.append("current_zone = %s")
params.append(current_zone)
if current_row is not None:
updates.append("current_row = %s")
params.append(current_row)
if current_shelf is not None:
updates.append("current_shelf = %s")
params.append(current_shelf)
params.append(robot_id)
query = f"UPDATE robots SET {', '.join(updates)} WHERE id = %s"
cur.execute(query, params)
conn.commit()
success = cur.rowcount > 0
if success:
logger.debug(f"Робот {robot_id} успешно обновлен")
else:
logger.warning(f"Робот {robot_id} не найден для обновления")
return success
except Exception as e:
logger.error(f"Ошибка обновления робота {robot_id}: {e}")
return False
def get_robots_by_status(self, status: str) -> List[Robot]:
try:
with get_connection() as conn:
with conn.cursor() as cur:
cur.execute("SELECT * FROM robots WHERE status = %s ORDER BY id", (status,))
robots = [Robot(*row) for row in cur.fetchall()]
logger.debug(f"Получено {len(robots)} роботов со статусом {status}")
return robots
except Exception as e:
logger.error(f"Ошибка получения роботов по статусу {status}: {e}")
return []
def get_low_battery_robots(self, threshold: int = 20) -> List[Robot]:
try:
with get_connection() as conn:
with conn.cursor() as cur:
cur.execute("""
SELECT * FROM robots
WHERE battery_level < %s AND status = 'active'
ORDER BY battery_level ASC
""", (threshold,))
robots = [Robot(*row) for row in cur.fetchall()]
logger.debug(f"Получено {len(robots)} роботов с низким зарядом (<{threshold}%)")
return robots
except Exception as e:
logger.error(f"Ошибка получения роботов с низким зарядом: {e}")
return []
def get_robots_in_zone(self, zone: str) -> List[Robot]:
try:
with get_connection() as conn:
with conn.cursor() as cur:
cur.execute("SELECT * FROM robots WHERE current_zone = %s ORDER BY id", (zone,))
robots = [Robot(*row) for row in cur.fetchall()]
logger.debug(f"Получено {len(robots)} роботов в зоне {zone}")
return robots
except Exception as e:
logger.error(f"Ошибка получения роботов в зоне {zone}: {e}")
return []
def create_robot(
self,
robot_id: str,
status: str = 'active',
battery_level: int = 100
) -> bool:
try:
with get_connection() as conn:
with conn.cursor() as cur:
cur.execute("""
INSERT INTO robots (id, status, battery_level, last_update)
VALUES (%s, %s, %s, CURRENT_TIMESTAMP)
""", (robot_id, status, battery_level))
conn.commit()
success = cur.rowcount > 0
if success:
logger.debug(f"Создан новый робот {robot_id}")
else:
logger.warning(f"Не удалось создать робота {robot_id}")
return success
except Exception as e:
logger.error(f"Ошибка создания робота {robot_id}: {e}")
return False
def delete_robot(self, robot_id: str) -> bool:
try:
with get_connection() as conn:
with conn.cursor() as cur:
cur.execute("DELETE FROM robots WHERE id = %s", (robot_id,))
conn.commit()
success = cur.rowcount > 0
if success:
logger.debug(f"Робот {robot_id} удален")
else:
logger.warning(f"Робот {robot_id} не найден для удаления")
return success
except Exception as e:
logger.error(f"Ошибка удаления робота {robot_id}: {e}")
return False
+202
View File
@@ -0,0 +1,202 @@
from typing import List, Optional
from loguru import logger
from model.user import User
from db.connection import get_connection
class UserRepository:
def get_all(self) -> List[User]:
try:
with get_connection() as conn:
with conn.cursor() as cur:
cur.execute("SELECT * FROM users ORDER BY id")
users = [
User(
id=row[0],
email=row[1],
password_hash=row[2],
name=row[3],
role=row[4],
created_at=row[5]
) for row in cur.fetchall()
]
logger.debug(f"Получено {len(users)} пользователей")
return users
except Exception as e:
logger.error(f"Ошибка получения пользователей: {e}")
return []
def get_by_id(self, user_id: int) -> Optional[User]:
try:
with get_connection() as conn:
with conn.cursor() as cur:
cur.execute("SELECT * FROM users WHERE id = %s", (user_id,))
row = cur.fetchone()
if row:
logger.debug(f"Пользователь {user_id} найден")
return User(*row)
logger.warning(f"Пользователь {user_id} не найден")
return None
except Exception as e:
logger.error(f"Ошибка получения пользователя {user_id}: {e}")
return None
def get_by_email(self, email: str) -> Optional[User]:
try:
with get_connection() as conn:
with conn.cursor() as cur:
cur.execute("SELECT * FROM users WHERE email = %s", (email,))
row = cur.fetchone()
if row:
logger.debug(f"Пользователь с email {email} найден")
return User(*row)
logger.warning(f"Пользователь с email {email} не найден")
return None
except Exception as e:
logger.error(f"Ошибка получения пользователя по email {email}: {e}")
return None
def create_user(
self,
email: str,
password_hash: str,
name: str,
role: str
) -> Optional[User]:
try:
with get_connection() as conn:
with conn.cursor() as cur:
cur.execute("""
INSERT INTO users (email, password_hash, name, role)
VALUES (%s, %s, %s, %s)
RETURNING id, email, password_hash, name, role, created_at
""", (email, password_hash, name, role))
row = cur.fetchone()
conn.commit()
if row:
logger.debug(f"Создан новый пользователь {email} с ID {row[0]}")
return User(*row)
logger.warning(f"Не удалось создать пользователя {email}")
return None
except Exception as e:
logger.error(f"Ошибка создания пользователя {email}: {e}")
return None
def update_user(self, user_id: int, name: str = None, role: str = None) -> bool:
try:
with get_connection() as conn:
with conn.cursor() as cur:
updates = []
params = []
if name is not None:
updates.append("name = %s")
params.append(name)
if role is not None:
updates.append("role = %s")
params.append(role)
if not updates:
logger.warning(f"Нет данных для обновления пользователя {user_id}")
return False
params.append(user_id)
query = f"UPDATE users SET {', '.join(updates)} WHERE id = %s"
cur.execute(query, params)
conn.commit()
success = cur.rowcount > 0
if success:
logger.debug(f"Пользователь {user_id} успешно обновлен")
else:
logger.warning(f"Пользователь {user_id} не найден для обновления")
return success
except Exception as e:
logger.error(f"Ошибка обновления пользователя {user_id}: {e}")
return False
def delete_user(self, user_id: int) -> bool:
try:
with get_connection() as conn:
with conn.cursor() as cur:
cur.execute("DELETE FROM users WHERE id = %s", (user_id,))
conn.commit()
success = cur.rowcount > 0
if success:
logger.debug(f"Пользователь {user_id} удален")
else:
logger.warning(f"Пользователь {user_id} не найден для удаления")
return success
except Exception as e:
logger.error(f"Ошибка удаления пользователя {user_id}: {e}")
return False
def get_users_by_role(self, role: str) -> List[User]:
try:
with get_connection() as conn:
with conn.cursor() as cur:
cur.execute("SELECT * FROM users WHERE role = %s ORDER BY name", (role,))
users = [User(*row) for row in cur.fetchall()]
logger.debug(f"Получено {len(users)} пользователей с ролью {role}")
return users
except Exception as e:
logger.error(f"Ошибка получения пользователей по роли {role}: {e}")
return []
def change_password(self, user_id: int, new_password_hash: str) -> bool:
try:
with get_connection() as conn:
with conn.cursor() as cur:
cur.execute("""
UPDATE users
SET password_hash = %s
WHERE id = %s
""", (new_password_hash, user_id))
conn.commit()
success = cur.rowcount > 0
if success:
logger.debug(f"Пароль пользователя {user_id} изменен")
else:
logger.warning(f"Пользователь {user_id} не найден для смены пароля")
return success
except Exception as e:
logger.error(f"Ошибка смены пароля пользователя {user_id}: {e}")
return False
def authenticate_user(self, email: str, password_hash: str) -> Optional[User]:
if not self.user_exists(email):
return
try:
with get_connection() as conn:
with conn.cursor() as cur:
cur.execute("""
SELECT * FROM users
WHERE email = %s AND password_hash = %s
""", (email, password_hash))
row = cur.fetchone()
if row:
logger.debug(f"Успешная аутентификация пользователя {email}")
return User(*row)
logger.warning(f"Неудачная аутентификация пользователя {email}")
return None
except Exception as e:
logger.error(f"Ошибка аутентификации пользователя {email}: {e}")
return None
def user_exists(self, email: str) -> bool:
try:
with get_connection() as conn:
with conn.cursor() as cur:
cur.execute("SELECT 1 FROM users WHERE email = %s", (email,))
exists = cur.fetchone() is not None
if exists:
logger.debug(f"Пользователь с email {email} существует")
else:
logger.debug(f"Пользователь с email {email} не существует")
return exists
except Exception as e:
logger.error(f"Ошибка проверки существования пользователя {email}: {e}")
return False
+2 -1
View File
@@ -1,4 +1,5 @@
flask==3.1.2
python-dotenv
psycopg-binary
psycopg2-binary
pyjwt
loguru
+7
View File
@@ -0,0 +1,7 @@
from .robot import Robot
from .product import Product
from .inventory import InventoryRecord
from .user import User
from .ai_prediction import AIPrediction
__all__ = ['Robot', 'Product', 'InventoryRecord', 'User', 'AIPrediction']
+12
View File
@@ -0,0 +1,12 @@
from dataclasses import dataclass
from datetime import datetime, date
@dataclass
class AIPrediction:
id: int
product_id: str
prediction_date: date
days_until_stockout: int
recommended_order: int
confidence_score: float
created_at: datetime
+14
View File
@@ -0,0 +1,14 @@
from dataclasses import dataclass
from datetime import datetime
@dataclass
class InventoryRecord:
id: int
robot_id: str
product_id: str
quantity: int
zone: str
row_number: int
shelf_number: int
status: str
scanned_at: datetime
created_at: datetime
+9
View File
@@ -0,0 +1,9 @@
from dataclasses import dataclass
@dataclass
class Product:
id: str
name: str
category: str
min_stock: int
optimal_stock: int
+13
View File
@@ -0,0 +1,13 @@
from dataclasses import dataclass
from datetime import datetime
from typing import Optional
@dataclass
class Robot:
id: str
status: str
battery_level: int
last_update: datetime
current_zone: Optional[str] = None
current_row: Optional[int] = None
current_shelf: Optional[int] = None
+6 -14
View File
@@ -1,20 +1,12 @@
from dataclasses import dataclass
import json
from utils.token import generateKey
from datetime import datetime
@dataclass
class user:
class User:
id: int
email: str
password_hash: str
name: str
role: str
token: str
def __init__(self, email: str, passwd: str):
#us = getUsModel() #возвращает словарь
self.id = 1#us['id']
self.name = 'Bob'#us['name']
self.role = 'Backend'#us['role']
self.token = generateKey(email, passwd)
def toDictionary(self):
return {"user": {"id": self.id, "name": self.name, "role": self.role}, "token": self.token}
created_at: datetime
-10
View File
@@ -1,10 +0,0 @@
import psycopg
import os
def PSQLConnect():
conn = psycopg.connect(os.getenv('POSTDRESS_CONNECTION'))
return conn
def PSQLCursor(conn):
cur = conn.cursor()
return cur
+10 -8
View File
@@ -1,12 +1,14 @@
import os
from dotenv import load_dotenv
from loguru import logger as log
def initializeENV():
dotenv_path = '../.env'
if os.path.exists(dotenv_path):
load_dotenv(dotenv_path)
print('.env is loaded')
return 1
DOTENV_PATH = '.env'
def initializeENV() -> bool:
if os.path.exists(DOTENV_PATH):
load_dotenv(DOTENV_PATH)
log.info('.env is loaded')
return True
else:
print('.env isn`t loaded')
return 0
log.error('.env isn`t loaded')
return False
+13 -2
View File
@@ -1,7 +1,18 @@
import jwt
import os
from time import time
from model.user import User
def generateKey(email, passwd):
def generateKey(user: User) -> dict:
key = os.getenv('KEY')
encoded = jwt.encode({email: passwd}, key, algorithm="HS256")
encoded = jwt.encode(
{
'id': user.id,
'name': user.name,
'role': user.role,
'iat': time()
},
key,
algorithm="HS256"
)
return encoded