"""Cache Steam Web API game lists in memory and refresh them on a schedule."""

import atexit
import re
from os import environ
from time import time
from urllib.parse import urlparse, parse_qs

import requests
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.interval import IntervalTrigger
from flask import Flask, jsonify

TOKEN = environ.get("STEAM_TOKEN")
MY_ID = 76561198826355942

# In-memory caches filled by api_request(); on success they hold the keys
# 'data', 'last_updated' and 'status', on failure only 'status' is touched.
recent = {}
owned = {}

# App ids that are dropped from every game list before it is cached.
_EXCLUDED_APP_IDS = frozenset(
    (3301060, 404790, 1281930, 1920960, 1325960, 431960)
)

# Prefix shared by both cover image URLs (kept exactly as the original string).
_COVER_BASE_URL = "https://shared.fastly.steamstatic.com/store_item_assets//steam/apps"


def modify_game_list(json: dict) -> dict:
    """Drop excluded games and attach cover image URLs to the remaining ones.

    Args:
        json: The ``response`` object of a Steam API reply. When it is empty,
            ``None``, or has no ``'games'`` key it is returned untouched.

    Returns:
        The same object that was passed in. If it had a ``'games'`` list, that
        list is replaced by a filtered list whose entries carry two extra
        keys, ``'h_cover'`` and ``'v_cover'``.
    """
    if not json or 'games' not in json:
        return json

    kept = []
    for game in json['games']:
        appid = game['appid']
        if appid in _EXCLUDED_APP_IDS:
            continue
        game['h_cover'] = f"{_COVER_BASE_URL}/{appid}/header.jpg"
        game['v_cover'] = f"{_COVER_BASE_URL}/{appid}/library_600x900.jpg"
        kept.append(game)

    json['games'] = kept
    return json


def steam_request(interface: str, method: str, v: int = 1, **kwargs) -> requests.Response:
    """Send a GET request to one Steam Web API method.

    Args:
        interface: API interface name, used as the first URL path segment.
        method: Method name, used as the second URL path segment.
        v: Method version; rendered zero-padded to four digits (``v0001``).
        **kwargs: Extra query parameters, sent alongside the API key.

    Returns:
        The ``requests.Response`` of the call (10 second timeout).
    """
    return requests.get(
        f"https://api.steampowered.com/{interface}/{method}/v{v:04}/",
        params={"key": TOKEN, **kwargs},
        timeout=10,
    )


def api_request(cache: dict, *args, **kwargs) -> None:
    """Call the Steam API and store the outcome in ``cache``.

    On an HTTP 200 reply with a ``response`` payload, ``cache`` receives
    ``'data'``, ``'last_updated'`` and ``'status' == 'success'``. On any
    failure only ``cache['status']`` is overwritten with an ``'error: ...'``
    message, so previously cached data stays available.

    Args:
        cache: Dict updated in place with the result.
        *args: Positional arguments forwarded to ``steam_request``.
        **kwargs: Keyword arguments forwarded to ``steam_request``.
    """
    try:
        response = steam_request(*args, **kwargs)
        if response.status_code != 200:
            cache['status'] = f'error: {response.status_code}'
            return

        payload = response.json().get("response")
        if payload is None:
            # Report the real problem instead of failing later with an
            # incidental AttributeError on None.
            cache['status'] = 'error: missing "response" payload'
            return

        cache.update({
            'data': modify_game_list(payload),
            'last_updated': time(),
            'status': 'success',
        })
    except Exception as e:
        # This runs inside a scheduler job: never let a failure propagate,
        # record it in the cache status instead.
        cache['status'] = f'error: {str(e)}'


def _refresh_recent() -> None:
    """Refresh the ``recent`` cache via GetRecentlyPlayedGames for MY_ID."""
    api_request(recent, "IPlayerService", "GetRecentlyPlayedGames", steamid=MY_ID)


def _refresh_owned() -> None:
    """Refresh the ``owned`` cache via GetOwnedGames for MY_ID."""
    api_request(
        owned,
        "IPlayerService",
        "GetOwnedGames",
        steamid=MY_ID,
        include_appinfo=1,
        include_played_free_games=1,
    )


if TOKEN:
    scheduler = BackgroundScheduler()
    scheduler.add_job(
        func=_refresh_recent,
        trigger=IntervalTrigger(minutes=15),
        id='risdeveau.steam.recent',
        replace_existing=True,
    )
    scheduler.add_job(
        func=_refresh_owned,
        trigger=IntervalTrigger(minutes=60),
        id='risdeveau.steam.owned',
        replace_existing=True,
    )
    scheduler.start()

    # Interval jobs first fire after one full interval, so fill both caches
    # once right away.
    _refresh_recent()
    _refresh_owned()

    atexit.register(scheduler.shutdown)
else:
    print("STEAM_TOKEN is not defined")