Files
lair.moe/blueprints/risdeveau/modules/api/steam.py
T
2026-02-04 14:44:11 +03:00

64 lines
2.2 KiB
Python

from os import environ
from flask import Flask, jsonify
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.interval import IntervalTrigger
import requests
from datetime import datetime
import atexit
import re
from urllib.parse import urlparse, parse_qs
TOKEN = environ.get("STEAM_TOKEN")
MY_ID = 76561198826355942
recent = {}
owned = {}
def get_cover_url(appid: int) -> str:
return f"https://shared.fastly.steamstatic.com/store_item_assets//steam/apps/{appid}/header.jpg"
def inject_cover_url(json: dict) -> dict:
if 'games' in json.keys():
for i, g in enumerate(json['games']):
json['games'][i]['h_cover'] = f"https://shared.fastly.steamstatic.com/store_item_assets//steam/apps/{g['appid']}/header.jpg"
json['games'][i]['v_cover'] = f"https://shared.fastly.steamstatic.com/store_item_assets//steam/apps/{g['appid']}/library_600x900.jpg"
return json
def steam_request(interface: str, method: str, v: int = 1, **kwargs) -> requests.Response:
return requests.get(
f"https://api.steampowered.com/{interface}/{method}/v{v:04}/",
params=dict({"key": TOKEN}, **kwargs),
timeout=10
)
def api_request(cache, *args, **kwargs):
try:
response = steam_request(*args, **kwargs)
if response.status_code == 200:
cache.update({
'data': inject_cover_url(response.json().get("response")),
'last_updated': datetime.now().isoformat(),
'status': 'success'
})
else:
cache['status'] = f'error: {response.status_code}'
except Exception as e:
cache['status'] = f'error: {str(e)}'
scheduler = BackgroundScheduler()
scheduler.add_job(
func=lambda: api_request(recent, "IPlayerService", "GetRecentlyPlayedGames", steamid=76561198826355942),
trigger=IntervalTrigger(minutes=15),
id='risdeveau.steam.recent',
replace_existing=True
)
scheduler.add_job(
func=lambda: api_request(owned, "IPlayerService", "GetOwnedGames", steamid=76561198826355942, include_appinfo=1, include_played_free_games=1),
trigger=IntervalTrigger(minutes=15),
id='risdeveau.steam.recent',
replace_existing=True
)
scheduler.start()
atexit.register(lambda: scheduler.shutdown())