Refactoring

This commit is contained in:
2026-02-06 23:24:41 +03:00
parent 0f5d1b5221
commit db11fabe1a
5 changed files with 120 additions and 89 deletions
+19 -31
View File
@@ -1,24 +1,22 @@
import os import os
import magic
from pathlib import Path
from htmlmin import minify
from time import time
from datetime import datetime, timedelta from datetime import datetime, timedelta
from hashlib import md5 from pathlib import Path
from json import dumps from time import time
from musicbrainzngs import get_image_front
from flask import ( from flask import (
Blueprint, Blueprint,
request,
render_template,
send_file,
send_from_directory,
make_response,
abort, abort,
make_response,
render_template,
request,
send_file,
) )
import magic
from htmlmin import minify
from musicbrainzngs import get_image_front
from .modules.api.lb import listens, listening from .modules.api.lb import data as lb_data
from .modules.api.steam import recent, owned from .modules.api.steam import data as steam_data
def tmsmp(sec: int) -> str: def tmsmp(sec: int) -> str:
if sec == 0: if sec == 0:
@@ -43,12 +41,6 @@ def utmsmp(unix: int) -> str:
def rtmsmp(unix: int) -> str: def rtmsmp(unix: int) -> str:
return tmsmp(int(time() - unix)) return tmsmp(int(time() - unix))
def lb_etag():
return md5((dumps(listens.get('data')) + dumps(listening.get('data'))).encode()).hexdigest()
def steam_etag():
return md5((dumps(recent.get('data')) + dumps(owned.get('data'))).encode()).hexdigest()
bp = Blueprint( bp = Blueprint(
"risdeveau", "risdeveau",
__name__, __name__,
@@ -82,12 +74,8 @@ def mb_cover(mbid):
return r return r
args = { args = {
"lb_etag": lb_etag, "lb": lb_data,
"steam_etag": steam_etag, "steam": steam_data,
"lb": listens,
"lb_now": listening,
"recent": recent,
"owned": owned,
"tmsmp": tmsmp, "tmsmp": tmsmp,
"utmsmp": utmsmp, "utmsmp": utmsmp,
"rtmsmp": rtmsmp "rtmsmp": rtmsmp
@@ -106,15 +94,15 @@ def module(module):
if any((modified_since, none_match)): if any((modified_since, none_match)):
match module: match module:
case "listenbrainz": case "listenbrainz":
if modified_since >= int(max((x.get('last_updated', 0) for x in (listens, listening)))): if modified_since >= int(lb_data['last_updated']):
return '', 304 return '', 304
if none_match == lb_etag(): if none_match == lb_data['etag']:
return '', 304 return '', 304
case "steam": case "steam":
if modified_since >= int(max((x.get('last_updated', 0) for x in (recent, owned)))): if modified_since >= int(steam_data['last_updated']):
return "Not updated yet", 304 return '', 304
if none_match == steam_etag(): if none_match == steam_data['etag']:
return '', 304 return '', 304
return render_tmpl(f'{module}.htm', **args) return render_tmpl(f'{module}.htm', **args)
+46 -24
View File
@@ -1,14 +1,31 @@
from flask import Flask, jsonify import atexit
import re
from dataclasses import dataclass
from hashlib import md5
from json import dumps
from time import time
from urllib.parse import parse_qs, urlparse
from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.interval import IntervalTrigger from apscheduler.triggers.interval import IntervalTrigger
import requests import requests
from time import time from flask import Flask, jsonify
import atexit
import re
from urllib.parse import urlparse, parse_qs
listens = {}
listening = {} @dataclass
class Cache:
data = {}
last_updated = time()
status = None
data = {
"caches": {
"now": Cache(),
"listens": Cache()
},
"last_updated": time(),
"etag": ""
}
def yt_cover(youtube_url): def yt_cover(youtube_url):
parsed_url = urlparse(youtube_url) parsed_url = urlparse(youtube_url)
@@ -23,15 +40,15 @@ def yt_cover(youtube_url):
if not video_id: if not video_id:
return return
return f"http://img.youtube.com/vi/{video_id}/sddefault.jpg" return f"https://img.youtube.com/vi/{video_id}/sddefault.jpg"
def parse_listens(data: dict) -> dict: def parse_listens(json: dict) -> dict:
new_data = { new_json = {
"count": data["count"], "count": json["count"],
"listens": [] "listens": []
} }
for track in data["listens"]: for track in json["listens"]:
track = track["track_metadata"] track = track["track_metadata"]
new_track = { new_track = {
@@ -53,33 +70,38 @@ def parse_listens(data: dict) -> dict:
if "cover_url" not in new_track.keys() and "id" in new_track.keys(): if "cover_url" not in new_track.keys() and "id" in new_track.keys():
new_track["cover_url"] = "/asset/mb/" + new_track["id"] new_track["cover_url"] = "/asset/mb/" + new_track["id"]
new_data["listens"].append(new_track) new_json["listens"].append(new_track)
return new_data return new_json
def api_request(url: str, cache): def api_request(url: str, cache: Cache):
try: try:
response = requests.get(url, timeout=10) response = requests.get(url, timeout=10)
if response.status_code == 200: if response.status_code == 200:
cache.update({ json = parse_listens(response.json().get("payload"))
'data': parse_listens(response.json().get("payload")), cache.status = 'success'
'last_updated': time(),
'status': 'success' if cache.data != json:
}) cache.data = json
cache.last_updated = time()
data['last_updated'] = time()
data['etag'] = md5(''.join(
( dumps(data['caches'][x].data) for x in data['caches'] )
).encode()).hexdigest()
else: else:
cache['status'] = f'error: {response.status_code}' cache.status = f'error: {response.status_code}'
except Exception as e: except Exception as e:
cache['status'] = f'error: {str(e)}' cache.status = f'error: {str(e)}'
scheduler = BackgroundScheduler() scheduler = BackgroundScheduler()
scheduler.add_job( scheduler.add_job(
func=lambda: api_request("https://api.listenbrainz.org/1/user/risdeveau/listens?count=5", listens), func=lambda: api_request("https://api.listenbrainz.org/1/user/risdeveau/listens?count=5", data['caches']['listens']),
trigger=IntervalTrigger(minutes=1), trigger=IntervalTrigger(minutes=1),
id='risdeveau.listenbrainz.listens', id='risdeveau.listenbrainz.listens',
replace_existing=True replace_existing=True
) )
scheduler.add_job( scheduler.add_job(
func=lambda: api_request("https://api.listenbrainz.org/1/user/risdeveau/playing-now", listening), func=lambda: api_request("https://api.listenbrainz.org/1/user/risdeveau/playing-now", data['caches']['now']),
trigger=IntervalTrigger(seconds=15), trigger=IntervalTrigger(seconds=15),
id='risdeveau.listenbrainz.playing-now', id='risdeveau.listenbrainz.playing-now',
replace_existing=True replace_existing=True
+41 -18
View File
@@ -1,18 +1,35 @@
import atexit
import re
from dataclasses import dataclass
from hashlib import md5
from json import dumps
from os import environ from os import environ
from flask import Flask, jsonify from time import time
from urllib.parse import parse_qs, urlparse
from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.interval import IntervalTrigger from apscheduler.triggers.interval import IntervalTrigger
import requests import requests
from time import time from flask import Flask
import atexit
import re
from urllib.parse import urlparse, parse_qs
TOKEN = environ.get("STEAM_TOKEN") TOKEN = environ.get("STEAM_TOKEN")
MY_ID = 76561198826355942 MY_ID = 76561198826355942
recent = {} @dataclass
owned = {} class Cache:
data = {}
last_updated = time()
status = None
data = {
"caches": {
"recent": Cache(),
"owned": Cache()
},
"last_updated": time(),
"etag": ""
}
def modify_game_list(json: dict) -> dict: def modify_game_list(json: dict) -> dict:
if 'games' in json.keys(): if 'games' in json.keys():
@@ -37,34 +54,40 @@ def api_request(cache, *args, **kwargs):
try: try:
response = steam_request(*args, **kwargs) response = steam_request(*args, **kwargs)
if response.status_code == 200: if response.status_code == 200:
cache.update({ json = modify_game_list(response.json().get("response"))
'data': modify_game_list(response.json().get("response")), cache.status = 'success'
'last_updated': time(),
'status': 'success' if cache.data != json:
}) cache.data = json
cache.last_updated = time()
data['last_updated'] = time()
data['etag'] = md5(''.join(
( dumps(data['caches'][x].data) for x in data['caches'] )
).encode()).hexdigest()
else: else:
cache['status'] = f'error: {response.status_code}' cache.status = f'error: {response.status_code}'
print("x")
except Exception as e: except Exception as e:
cache['status'] = f'error: {str(e)}' cache.status = f'error: {str(e)}'
if TOKEN: if TOKEN:
scheduler = BackgroundScheduler() scheduler = BackgroundScheduler()
scheduler.add_job( scheduler.add_job(
func=lambda: api_request(recent, "IPlayerService", "GetRecentlyPlayedGames", steamid=76561198826355942), func=lambda: api_request(data['caches']['recent'], "IPlayerService", "GetRecentlyPlayedGames", steamid=76561198826355942),
trigger=IntervalTrigger(minutes=15), trigger=IntervalTrigger(minutes=15),
id='risdeveau.steam.recent', id='risdeveau.steam.recent',
replace_existing=True replace_existing=True
) )
scheduler.add_job( scheduler.add_job(
func=lambda: api_request(owned, "IPlayerService", "GetOwnedGames", steamid=76561198826355942, include_appinfo=1, include_played_free_games=1), func=lambda: api_request(data['caches']['owned'], "IPlayerService", "GetOwnedGames", steamid=76561198826355942, include_appinfo=1, include_played_free_games=1),
trigger=IntervalTrigger(minutes=60), trigger=IntervalTrigger(minutes=60),
id='risdeveau.steam.owned', id='risdeveau.steam.owned',
replace_existing=True replace_existing=True
) )
scheduler.start() scheduler.start()
api_request(recent, "IPlayerService", "GetRecentlyPlayedGames", steamid=76561198826355942) api_request(data['caches']['recent'], "IPlayerService", "GetRecentlyPlayedGames", steamid=76561198826355942)
api_request(owned, "IPlayerService", "GetOwnedGames", steamid=76561198826355942, include_appinfo=1, include_played_free_games=1) api_request(data['caches']['owned'], "IPlayerService", "GetOwnedGames", steamid=76561198826355942, include_appinfo=1, include_played_free_games=1)
atexit.register(lambda: scheduler.shutdown()) atexit.register(lambda: scheduler.shutdown())
else: else:
@@ -10,25 +10,23 @@
</div> </div>
{% endmacro %} {% endmacro %}
{% if lb_now.data and lb.data %}
<div <div
class="block" class="block"
hx-get="/m/listenbrainz" hx-get="/m/listenbrainz"
hx-trigger="every 15s" hx-trigger="every 15s"
hx-swap="outerHTML" hx-swap="outerHTML"
hx-headers='{ hx-headers='{
"If-Modified-Since": {{ (lb_now.last_updated, lb.last_updated) | max | int }}, "If-Modified-Since": {{ lb.last_updated | int }},
"If-None-Match": "{{ lb_etag() }}" "If-None-Match": "{{ lb.etag }}"
}' }'
> >
<h2><a href="https://listenbrainz.org/user/risdeveau/">Listenbrainz</a></h2> <h2><a href="https://listenbrainz.org/user/risdeveau/">Listenbrainz</a></h2>
{% if lb_now.data and lb_now.data.listens.0 %} {% if lb.caches.now.data and lb.caches.now.data.listens.0 %}
{{ track_block(lb_now.data.listens.0, is_active=true) }} {{ track_block(lb.caches.now.data.listens.0, is_active=true) }}
{% endif %} {% endif %}
{% if lb.data and lb.data.listens %} {% if lb.caches.listens.data and lb.caches.listens.data.listens %}
{% for track in lb.data.listens %} {% for track in lb.caches.listens.data.listens %}
{{ track_block(track) }} {{ track_block(track) }}
{% endfor %} {% endfor %}
{% endif %} {% endif %}
</div> </div>
{% endif %}
+8 -8
View File
@@ -4,15 +4,15 @@
hx-trigger="every 1m" hx-trigger="every 1m"
hx-swap="outerHTML" hx-swap="outerHTML"
hx-headers='{ hx-headers='{
"If-Modified-Since": {{ (recent.last_updated, owned.last_updated) | max | int }}, "If-Modified-Since": {{ steam.last_updated | int }},
"If-None-Match": "{{ steam_etag() }}" "If-None-Match": "{{ steam.etag }}"
}' }'
> >
<h2><a href="https://steamcommunity.com/id/risdeveau">Steam</a></h2> <h2><a href="https://steamcommunity.com/id/risdeveau">Steam</a></h2>
{% if recent.data.games %} {% if steam.caches.recent.data.games %}
<h3>Recently played:</h3> <h3>Recently played:</h3>
{% for g in recent.data.games %} {% for g in steam.caches.recent.data.games %}
<a href="https://store.steampowered.com/app/{{ g.appid }}" class="block"> <a href="https://store.steampowered.com/app/{{ g.appid }}" class="block">
<picture> <picture>
<source media="(max-width: 45rem)" srcset="{{ g.v_cover }}"> <source media="(max-width: 45rem)" srcset="{{ g.v_cover }}">
@@ -31,12 +31,12 @@
</div> </div>
</a> </a>
{% endfor %} {% endfor %}
<p>Last updated: {{ rtmsmp(recent.last_updated) }} ago</p> <p>Last updated: {{ rtmsmp(steam.caches.recent.last_updated) }} ago</p>
{% endif %} {% endif %}
{% if owned.data.games %} {% if steam.caches.owned.data.games %}
<h3>Top played games:</h3> <h3>Top played games:</h3>
{% set owned_games = owned.data.games | sort(attribute="playtime_forever", reverse=true) %} {% set owned_games = steam.caches.owned.data.games | sort(attribute="playtime_forever", reverse=true) %}
{% for g in owned_games[:5] %} {% for g in owned_games[:5] %}
<a href="https://store.steampowered.com/app/{{ g.appid }}" class="block"> <a href="https://store.steampowered.com/app/{{ g.appid }}" class="block">
<picture> <picture>
@@ -58,6 +58,6 @@
</div> </div>
</a> </a>
{% endfor %} {% endfor %}
<p>Last updated: {{ rtmsmp(owned.last_updated) }} ago</p> <p>Last updated: {{ rtmsmp(steam.caches.owned.last_updated) }} ago</p>
{% endif %} {% endif %}
</div> </div>