Files
lair.moe/blueprints/risdeveau/modules/api/steam.py
T
Sweetbread 4844cdb7b6 Fix API calls by all workers
Quite a complex commit by GPT. I'll rewrite it somewhen
2026-04-08 22:55:27 +03:00

174 lines
5.3 KiB
Python

import atexit
import re
from dataclasses import dataclass
from hashlib import md5
from json import dumps
from os import environ
from time import time
from urllib.parse import parse_qs, urlparse
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.interval import IntervalTrigger
import requests
from flask import Flask
from .scheduler_guard import should_start_scheduler
from .shared_cache import atomic_write_json, cache_file, load_json_if_newer
TOKEN = environ.get("STEAM_TOKEN")
MY_ID = 76561198826355942
@dataclass
class Cache:
data = {}
last_updated = time()
status = None
data = {
"caches": {
"recent": Cache(),
"owned": Cache()
},
"last_updated": time(),
"etag": ""
}
_IS_WRITER = should_start_scheduler()
_CACHE_PATH = cache_file("steam")
_CACHE_MTIME = 0.0
def refresh_cache() -> None:
"""Refresh in-memory cache from the shared JSON file (for non-writer workers)."""
global _CACHE_MTIME
if _IS_WRITER:
return
payload, mtime = load_json_if_newer(_CACHE_PATH, _CACHE_MTIME)
if payload is None:
return
try:
data["etag"] = payload.get("etag", data.get("etag", ""))
data["last_updated"] = payload.get("last_updated", data.get("last_updated", time()))
caches = payload.get("caches", {})
for key, cache in data.get("caches", {}).items():
if isinstance(caches, dict) and key in caches and isinstance(caches[key], dict):
c = caches[key]
cache.data = c.get("data", cache.data)
cache.last_updated = c.get("last_updated", cache.last_updated)
cache.status = c.get("status", cache.status)
finally:
_CACHE_MTIME = mtime
def _persist_cache() -> None:
"""Persist current cache state to the shared JSON file (writer worker only)."""
if not _IS_WRITER:
return
payload = {
"etag": data.get("etag", ""),
"last_updated": data.get("last_updated", time()),
"caches": {
k: {
"data": v.data,
"last_updated": v.last_updated,
"status": v.status,
}
for k, v in data.get("caches", {}).items()
},
}
atomic_write_json(_CACHE_PATH, payload)
def modify_game_list(json: dict) -> dict:
if 'games' in json.keys():
apps = (3301060, 404790, 1281930, 1920960, 1325960, 431960)
new_games = []
for i, g in enumerate(json['games']):
if g['appid'] not in apps:
json['games'][i]['h_cover'] = f"https://shared.fastly.steamstatic.com/store_item_assets//steam/apps/{g['appid']}/header.jpg"
json['games'][i]['v_cover'] = f"https://shared.fastly.steamstatic.com/store_item_assets//steam/apps/{g['appid']}/library_600x900.jpg"
new_games.append(json['games'][i])
json['games'] = new_games
return json
def steam_request(interface: str, method: str, v: int = 1, **kwargs) -> requests.Response:
return requests.get(
f"https://api.steampowered.com/{interface}/{method}/v{v:04}/",
params=dict({"key": TOKEN}, **kwargs),
timeout=10
)
def api_request(cache, *args, **kwargs):
changed = False
prev_status = cache.status
try:
response = steam_request(*args, **kwargs)
if response.status_code == 200:
json = modify_game_list(response.json().get("response"))
cache.status = 'success'
if cache.data != json:
cache.data = json
cache.last_updated = time()
data['last_updated'] = time()
data['etag'] = md5(''.join(
( dumps(data['caches'][x].data) for x in data['caches'] )
).encode()).hexdigest()
changed = True
else:
cache.status = f'error: {response.status_code}'
except Exception as e:
cache.status = f'error: {str(e)}'
if prev_status != cache.status:
changed = True
if changed:
_persist_cache()
if TOKEN:
scheduler = BackgroundScheduler()
if _IS_WRITER:
scheduler.add_job(
func=lambda: api_request(data['caches']['recent'], "IPlayerService", "GetRecentlyPlayedGames", steamid=MY_ID),
trigger=IntervalTrigger(minutes=15),
id='risdeveau.steam.recent',
replace_existing=True
)
scheduler.add_job(
func=lambda: api_request(data['caches']['owned'], "IPlayerService", "GetOwnedGames", steamid=MY_ID, include_appinfo=1, include_played_free_games=1),
trigger=IntervalTrigger(minutes=60),
id='risdeveau.steam.owned',
replace_existing=True
)
_persist_cache()
scheduler.start()
api_request(data['caches']['recent'], "IPlayerService", "GetRecentlyPlayedGames", steamid=MY_ID)
api_request(data['caches']['owned'], "IPlayerService", "GetOwnedGames", steamid=MY_ID, include_appinfo=1, include_played_free_games=1)
atexit.register(lambda: scheduler.shutdown())
else:
refresh_cache()
else:
msg = "STEAM_TOKEN is not defined"
print(msg)
for c in data["caches"].values():
c.status = msg
if _IS_WRITER:
_persist_cache()
else:
refresh_cache()