Files
lair.moe/blueprints/risdeveau/modules/api/steam.py
T
2026-02-05 23:04:19 +03:00

72 lines
2.6 KiB
Python

from os import environ
from flask import Flask, jsonify
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.interval import IntervalTrigger
import requests
from time import time
import atexit
import re
from urllib.parse import urlparse, parse_qs
# Steam Web API key, read from the STEAM_TOKEN environment variable
# (None when the variable is unset). Sent as the `key` query parameter.
TOKEN = environ.get("STEAM_TOKEN")
# Steam ID (the `steamid` request parameter) of the profile being queried.
MY_ID = 76561198826355942
# Cache dicts filled in place by api_request(); each ends up holding
# 'data', 'last_updated' and 'status' keys once a request has been attempted.
# `recent` backs GetRecentlyPlayedGames, `owned` backs GetOwnedGames.
recent = {}
owned = {}
def modify_game_list(json: dict) -> dict:
    """Filter a Steam game-list payload and attach cover-art URLs.

    When the payload has a ``'games'`` list, entries whose ``appid`` is in the
    hard-coded exclusion set are dropped, and every remaining entry gains two
    keys built from its ``appid``:

    * ``'h_cover'`` -- URL of the ``header.jpg`` asset.
    * ``'v_cover'`` -- URL of the ``library_600x900.jpg`` asset.

    The payload is modified in place (both the ``'games'`` list and the
    surviving game dicts) and the same object is returned. A payload without
    a ``'games'`` key is returned untouched.

    Args:
        json: The ``response`` object of a Steam ``IPlayerService`` reply.

    Returns:
        The same ``json`` dict, after filtering/decoration.

    Raises:
        KeyError: If a game entry has no ``'appid'`` key.
    """
    if 'games' not in json:
        return json

    # App IDs that must never appear in the published list.
    excluded = frozenset((3301060, 404790, 1281930, 1920960, 1325960, 431960))

    kept = []
    for game in json['games']:
        appid = game['appid']
        if appid in excluded:
            continue
        game['h_cover'] = f"https://shared.fastly.steamstatic.com/store_item_assets//steam/apps/{appid}/header.jpg"
        game['v_cover'] = f"https://shared.fastly.steamstatic.com/store_item_assets//steam/apps/{appid}/library_600x900.jpg"
        kept.append(game)

    json['games'] = kept
    return json
def steam_request(interface: str, method: str, v: int = 1, **kwargs) -> requests.Response:
    """Send a GET request to a single Steam Web API method.

    Args:
        interface: API interface name, e.g. ``"IPlayerService"``.
        method: Method name within that interface.
        v: Method version; rendered zero-padded to four digits (``v0001``).
        **kwargs: Extra query parameters. They are merged over the default
            ``key`` parameter, so a ``key`` keyword would replace ``TOKEN``.

    Returns:
        The ``requests.Response`` for the call (10 second timeout).
    """
    endpoint = f"https://api.steampowered.com/{interface}/{method}/v{v:04}/"
    query = {"key": TOKEN}
    query.update(kwargs)
    return requests.get(endpoint, params=query, timeout=10)
def api_request(cache, *args, **kwargs):
    """Call the Steam API and record the outcome in ``cache``.

    On an HTTP 200 reply carrying a ``response`` object, ``cache`` is updated
    with:

    * ``'data'`` -- the payload after ``modify_game_list``;
    * ``'last_updated'`` -- ``time()`` at the moment of the update;
    * ``'status'`` -- ``'success'``.

    On any failure only ``cache['status']`` is overwritten with an
    ``'error: ...'`` string; previously cached ``'data'`` is left in place.
    This function never raises, so it is safe to run as a scheduler job.

    Args:
        cache: Mutable dict that receives the result.
        *args: Positional arguments forwarded to ``steam_request``.
        **kwargs: Keyword arguments forwarded to ``steam_request``.
    """
    try:
        response = steam_request(*args, **kwargs)
        if response.status_code != 200:
            cache['status'] = f'error: {response.status_code}'
            return

        payload = response.json().get("response")
        if payload is None:
            # Report the malformed reply explicitly instead of letting
            # modify_game_list fail with an opaque AttributeError.
            cache['status'] = 'error: missing response payload'
            return

        cache.update({
            'data': modify_game_list(payload),
            'last_updated': time(),
            'status': 'success'
        })
    except Exception as e:
        # Deliberately broad: this is the boundary of a background job.
        message = str(e)
        if TOKEN:
            # requests/urllib3 error text includes the request URL with its
            # query string (and therefore the API key); keep the secret out
            # of a status value that may be shown to clients.
            message = message.replace(TOKEN, '<redacted>')
        cache['status'] = f'error: {message}'
if TOKEN:
    # One refresh function per cache, so the scheduled job and the initial
    # fetch below cannot drift apart; both use MY_ID rather than a repeated
    # literal.
    def _refresh_recent() -> None:
        """Refresh the `recent` cache from GetRecentlyPlayedGames."""
        api_request(recent, "IPlayerService", "GetRecentlyPlayedGames", steamid=MY_ID)

    def _refresh_owned() -> None:
        """Refresh the `owned` cache from GetOwnedGames."""
        api_request(
            owned,
            "IPlayerService",
            "GetOwnedGames",
            steamid=MY_ID,
            include_appinfo=1,
            include_played_free_games=1,
        )

    scheduler = BackgroundScheduler()
    # Recently played games: refreshed every 15 minutes.
    scheduler.add_job(
        func=_refresh_recent,
        trigger=IntervalTrigger(minutes=15),
        id='risdeveau.steam.recent',
        replace_existing=True
    )
    # Owned games: refreshed every 60 minutes.
    scheduler.add_job(
        func=_refresh_owned,
        trigger=IntervalTrigger(minutes=60),
        id='risdeveau.steam.owned',
        replace_existing=True
    )
    scheduler.start()
    # Fetch once immediately so the caches are filled without waiting for
    # the first interval to elapse.
    _refresh_recent()
    _refresh_owned()
    # Stop the scheduler when the interpreter exits.
    atexit.register(lambda: scheduler.shutdown())
else:
    print("STEAM_TOKEN is not defined")