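# Steam library cache: when STEAM_TOKEN is set, polls the Steam Web API every
# 15 minutes for one account's recently played and owned games and keeps the
# latest responses in module-level dicts.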
from os import environ
|
|
from flask import Flask, jsonify
|
|
from apscheduler.schedulers.background import BackgroundScheduler
|
|
from apscheduler.triggers.interval import IntervalTrigger
|
|
import requests
|
|
from datetime import datetime
|
|
import atexit
|
|
import re
|
|
from urllib.parse import urlparse, parse_qs
|
|
|
|
# Steam Web API key taken from the environment; None when STEAM_TOKEN is unset.
TOKEN = environ.get("STEAM_TOKEN")
# Steam account ID; same value the polling jobs pass as `steamid`.
MY_ID: int = 76561198826355942

# Module-level caches filled in place by api_request(): one for the
# recently-played response, one for the owned-games response.
recent: dict = {}
owned: dict = {}
|
|
|
|
def modify_game_list(json: dict) -> dict:
    """Drop hidden apps from a game list and attach cover-art URLs.

    When *json* contains a ``'games'`` list, entries whose ``'appid'`` is in
    the hidden set are removed, and every remaining entry gains an
    ``'h_cover'`` and a ``'v_cover'`` URL built from its app ID. A payload
    without a ``'games'`` key is returned untouched.

    Args:
        json: Mapping that may hold a ``'games'`` list of dicts, each with an
            ``'appid'`` key. It is modified in place, as are the kept entries.

    Returns:
        The same object that was passed in as *json*.

    Raises:
        KeyError: If a game entry has no ``'appid'`` key.
    """
    if 'games' not in json:
        return json

    # App IDs that are filtered out of the resulting list.
    hidden_appids = (3301060, 404790, 1281930, 1920960, 1325960, 431960)
    # The double slash after "store_item_assets" is kept exactly as the
    # original URLs had it.
    asset_root = "https://shared.fastly.steamstatic.com/store_item_assets//steam/apps"

    visible_games = []
    for game in json['games']:
        appid = game['appid']
        if appid in hidden_appids:
            continue
        game['h_cover'] = f"{asset_root}/{appid}/header.jpg"
        game['v_cover'] = f"{asset_root}/{appid}/library_600x900.jpg"
        visible_games.append(game)

    json['games'] = visible_games
    return json
|
|
|
|
def steam_request(interface: str, method: str, v: int = 1, **kwargs) -> requests.Response:
    """Send a GET request to one Steam Web API method.

    Args:
        interface: Interface segment of the URL path.
        method: Method segment of the URL path.
        v: Version number; rendered zero-padded to four digits after ``v``.
        **kwargs: Extra query-string parameters sent alongside the API key.

    Returns:
        The ``requests.Response`` produced by the GET call (10-second timeout).
    """
    endpoint = f"https://api.steampowered.com/{interface}/{method}/v{v:04}/"
    # The API key is set first; caller-supplied parameters are merged on top.
    query = {"key": TOKEN, **kwargs}
    return requests.get(endpoint, params=query, timeout=10)
|
|
|
|
def api_request(cache, *args, **kwargs):
    """Refresh *cache* from one Steam Web API call and record the outcome.

    On an HTTP 200 response, *cache* receives ``'data'`` (the ``"response"``
    member of the JSON body, passed through ``modify_game_list``),
    ``'last_updated'`` (local ISO-8601 timestamp) and ``'status'`` set to
    ``'success'``. On any other status code, or if an exception is raised,
    only ``'status'`` is overwritten with an ``'error: ...'`` string, so
    previously cached data stays in place.

    Args:
        cache: Mutable mapping updated in place.
        *args: Positional arguments forwarded to ``steam_request``.
        **kwargs: Keyword arguments forwarded to ``steam_request``.
    """
    try:
        response = steam_request(*args, **kwargs)
        if response.status_code != 200:
            cache['status'] = f'error: {response.status_code}'
            return

        games = modify_game_list(response.json().get("response"))
        cache.update({
            'data': games,
            'last_updated': datetime.now().isoformat(),
            'status': 'success',
        })
    except Exception as e:
        # Deliberately broad: a failed refresh is recorded in the cache
        # instead of propagating to the caller.
        detail = str(e)
        if TOKEN:
            # Connection errors from requests/urllib3 can embed the request
            # URL, whose query string carries the API key. Keep the key out
            # of the status text, which is stored in the shared cache.
            detail = detail.replace(TOKEN, '***')
        cache['status'] = f'error: {detail}'
|
|
|
|
def _refresh_recent():
    """Refresh the `recent` cache from IPlayerService/GetRecentlyPlayedGames."""
    api_request(recent, "IPlayerService", "GetRecentlyPlayedGames", steamid=MY_ID)


def _refresh_owned():
    """Refresh the `owned` cache from IPlayerService/GetOwnedGames."""
    api_request(
        owned,
        "IPlayerService",
        "GetOwnedGames",
        steamid=MY_ID,
        include_appinfo=1,
        include_played_free_games=1,
    )


if TOKEN:
    # Re-run both refreshes every 15 minutes from a background scheduler.
    scheduler = BackgroundScheduler()
    scheduler.add_job(
        func=_refresh_recent,
        trigger=IntervalTrigger(minutes=15),
        id='risdeveau.steam.recent',
        replace_existing=True,
    )
    scheduler.add_job(
        func=_refresh_owned,
        trigger=IntervalTrigger(minutes=15),
        id='risdeveau.steam.owned',
        replace_existing=True,
    )
    scheduler.start()

    # Fill both caches right away instead of waiting for the first interval.
    _refresh_recent()
    _refresh_owned()

    # Stop the scheduler when the interpreter exits.
    atexit.register(scheduler.shutdown)
else:
    print("STEAM_TOKEN is not defined")
|