Files
lair.moe/blueprints/risdeveau/modules/api/steam.py
T

96 lines
3.2 KiB
Python
Raw Normal View History

2026-02-06 23:24:41 +03:00
import atexit
import re
from dataclasses import dataclass, field
from hashlib import md5
from json import dumps
from os import environ
from time import time
from typing import Optional
from urllib.parse import parse_qs, urlparse

import requests
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.interval import IntervalTrigger
from flask import Flask
2026-02-04 00:34:16 +03:00
# Steam Web API key, read from the STEAM_TOKEN environment variable
# (None when the variable is not set).
TOKEN = environ.get("STEAM_TOKEN")
# Numeric Steam ID; the same value is sent as the `steamid` parameter of the
# Steam API requests made by this module.
# NOTE(review): presumably the site owner's account - confirm.
MY_ID = 76561198826355942
2026-02-06 23:24:41 +03:00
@dataclass
class Cache:
    """Cached payload of one Steam API endpoint plus refresh bookkeeping.

    The attributes are annotated so that ``@dataclass`` treats them as real
    fields. Without annotations the decorator generates no fields at all,
    which left ``data`` as a single dict shared by every instance and
    ``last_updated`` frozen at class-definition time.
    """

    # Most recent payload stored for the endpoint; every instance gets its
    # own empty dict.
    data: dict = field(default_factory=dict)
    # Unix timestamp (seconds) of the last time ``data`` was replaced;
    # initialised when the instance is created.
    last_updated: float = field(default_factory=time)
    # Result of the latest refresh attempt ('success' or 'error: ...');
    # None until a refresh has been attempted.
    status: Optional[str] = None
# Shared module state: one Cache per Steam endpoint, the time of the most
# recent payload change across all caches, and an ETag for the cached payloads
# (empty until the first successful refresh).
data = {
    "caches": {name: Cache() for name in ("recent", "owned")},
    "last_updated": time(),
    "etag": "",
}
2026-02-04 00:34:16 +03:00
# App IDs that are left out of every game list returned by modify_game_list.
_EXCLUDED_APP_IDS = frozenset(
    {3301060, 404790, 1281930, 1920960, 1325960, 431960}
)


def modify_game_list(json: dict) -> dict:
    """Re-key the ``games`` list of a Steam response by app ID, adding covers.

    When *json* has a ``games`` entry, every game whose ``appid`` is not in
    the exclusion set receives ``h_cover`` (header image) and ``v_cover``
    (600x900 library image) URLs, and ``json['games']`` is replaced by a dict
    mapping ``appid`` to the game entry. Excluded games are dropped.
    A payload without ``games`` is returned untouched.

    The payload and the kept game dicts are modified in place; the same
    *json* object is returned.

    Raises:
        KeyError: if a game entry has no ``appid``.
    """
    if 'games' not in json:
        return json

    games_by_id = {}
    for game in json['games']:
        appid = game['appid']
        if appid in _EXCLUDED_APP_IDS:
            continue
        # NOTE(review): the double slash after "store_item_assets" is kept
        # exactly as in the original URLs - confirm it is intentional.
        game['h_cover'] = f"https://shared.fastly.steamstatic.com/store_item_assets//steam/apps/{appid}/header.jpg"
        game['v_cover'] = f"https://shared.fastly.steamstatic.com/store_item_assets//steam/apps/{appid}/library_600x900.jpg"
        games_by_id[appid] = game

    json['games'] = games_by_id
    return json
def steam_request(interface: str, method: str, v: int = 1, **kwargs) -> requests.Response:
    """Send a GET request to one method of the Steam Web API.

    Args:
        interface: API interface name, used as the first path segment.
        method: Method name within the interface, the second path segment.
        v: Method version; rendered zero-padded to four digits (1 -> v0001).
        **kwargs: Extra query parameters, sent alongside the API key. A
            ``key`` entry here overrides the module-level TOKEN.

    Returns:
        The ``requests.Response`` of the call (10-second timeout).
    """
    endpoint = f"https://api.steampowered.com/{interface}/{method}/v{v:04}/"
    query = {"key": TOKEN, **kwargs}
    return requests.get(endpoint, params=query, timeout=10)
def api_request(cache, *args, **kwargs):
    """Refresh *cache* from the Steam Web API and keep the shared ETag current.

    Args:
        cache: Cache entry to update; its ``status`` is always set, and its
            ``data`` / ``last_updated`` only when the payload changed.
        *args, **kwargs: Forwarded unchanged to ``steam_request``.

    On an HTTP 200 the ``response`` member of the JSON body is passed through
    ``modify_game_list``. If the result differs from the cached payload, the
    cache, the module-wide ``last_updated`` and the ``etag`` (MD5 over the
    JSON dump of every cache's data) are updated. Any other status code, or
    any exception, is recorded in ``cache.status`` and never propagated.
    """
    try:
        response = steam_request(*args, **kwargs)
        if response.status_code != 200:
            cache.status = f'error: {response.status_code}'
            # Replaces a leftover debug print("x") with a meaningful message.
            print(f"Steam API request failed with HTTP {response.status_code}")
            return

        payload = modify_game_list(response.json().get("response"))
        cache.status = 'success'
        if cache.data != payload:
            # One timestamp so the per-cache and global values agree exactly.
            now = time()
            cache.data = payload
            cache.last_updated = now
            data['last_updated'] = now
            combined = ''.join(
                dumps(entry.data) for entry in data['caches'].values()
            )
            data['etag'] = md5(combined.encode()).hexdigest()
    except Exception as e:
        # Deliberately broad: a failed refresh is only recorded in the cache
        # status and must not escape to the caller.
        cache.status = f'error: {str(e)}'
2026-02-04 00:34:16 +03:00
# Start periodic cache refreshes only when an API key is configured.
if TOKEN:
    def _refresh_recent() -> None:
        """Refresh the 'recent' cache from GetRecentlyPlayedGames."""
        api_request(
            data['caches']['recent'],
            "IPlayerService",
            "GetRecentlyPlayedGames",
            steamid=MY_ID,
        )

    def _refresh_owned() -> None:
        """Refresh the 'owned' cache from GetOwnedGames."""
        api_request(
            data['caches']['owned'],
            "IPlayerService",
            "GetOwnedGames",
            steamid=MY_ID,
            include_appinfo=1,
            include_played_free_games=1,
        )

    scheduler = BackgroundScheduler()
    scheduler.add_job(
        func=_refresh_recent,
        trigger=IntervalTrigger(minutes=15),
        id='risdeveau.steam.recent',
        replace_existing=True,
    )
    scheduler.add_job(
        func=_refresh_owned,
        trigger=IntervalTrigger(minutes=60),
        id='risdeveau.steam.owned',
        replace_existing=True,
    )
    scheduler.start()

    # Fill both caches right away instead of waiting for the first interval.
    _refresh_recent()
    _refresh_owned()

    # Stop the background scheduler when the interpreter exits.
    atexit.register(scheduler.shutdown)
else:
    print("STEAM_TOKEN is not defined")