Add Steam info

This commit is contained in:
2026-02-04 00:34:16 +03:00
parent c17bf8ca83
commit 0aa2477f35
5 changed files with 177 additions and 1 deletions
+71
View File
@@ -0,0 +1,71 @@
from os import environ
from flask import Flask, jsonify
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.interval import IntervalTrigger
import requests
from time import time
import atexit
import re
from urllib.parse import urlparse, parse_qs
TOKEN = environ.get("STEAM_TOKEN")
MY_ID = 76561198826355942
recent = {}
owned = {}
def modify_game_list(json: dict) -> dict:
if 'games' in json.keys():
apps = (3301060, 404790, 1281930, 1920960, 1325960, 431960)
new_games = []
for i, g in enumerate(json['games']):
if g['appid'] not in apps:
json['games'][i]['h_cover'] = f"https://shared.fastly.steamstatic.com/store_item_assets//steam/apps/{g['appid']}/header.jpg"
json['games'][i]['v_cover'] = f"https://shared.fastly.steamstatic.com/store_item_assets//steam/apps/{g['appid']}/library_600x900.jpg"
new_games.append(json['games'][i])
json['games'] = new_games
return json
def steam_request(interface: str, method: str, v: int = 1, **kwargs) -> requests.Response:
return requests.get(
f"https://api.steampowered.com/{interface}/{method}/v{v:04}/",
params=dict({"key": TOKEN}, **kwargs),
timeout=10
)
def api_request(cache, *args, **kwargs):
try:
response = steam_request(*args, **kwargs)
if response.status_code == 200:
cache.update({
'data': modify_game_list(response.json().get("response")),
'last_updated': time(),
'status': 'success'
})
else:
cache['status'] = f'error: {response.status_code}'
except Exception as e:
cache['status'] = f'error: {str(e)}'
if TOKEN:
scheduler = BackgroundScheduler()
scheduler.add_job(
func=lambda: api_request(recent, "IPlayerService", "GetRecentlyPlayedGames", steamid=76561198826355942),
trigger=IntervalTrigger(minutes=15),
id='risdeveau.steam.recent',
replace_existing=True
)
scheduler.add_job(
func=lambda: api_request(owned, "IPlayerService", "GetOwnedGames", steamid=76561198826355942, include_appinfo=1, include_played_free_games=1),
trigger=IntervalTrigger(minutes=60),
id='risdeveau.steam.owned',
replace_existing=True
)
scheduler.start()
api_request(recent, "IPlayerService", "GetRecentlyPlayedGames", steamid=76561198826355942)
api_request(owned, "IPlayerService", "GetOwnedGames", steamid=76561198826355942, include_appinfo=1, include_played_free_games=1)
atexit.register(lambda: scheduler.shutdown())
else:
print("STEAM_TOKEN is not defined")