diff --git a/blueprints/risdeveau/__init__.py b/blueprints/risdeveau/__init__.py index 765d0f3..209afe4 100644 --- a/blueprints/risdeveau/__init__.py +++ b/blueprints/risdeveau/__init__.py @@ -15,8 +15,8 @@ import magic from htmlmin import minify from musicbrainzngs import get_image_front -from .modules.api.lb import data as lb_data -from .modules.api.steam import data as steam_data +from .modules.api.lb import data as lb_data, refresh_cache as lb_refresh +from .modules.api.steam import data as steam_data, refresh_cache as steam_refresh def tmsmp(sec: int) -> str: if sec == 0: @@ -83,10 +83,16 @@ args = { @bp.route("/") def index(): + lb_refresh() + steam_refresh() return render_tmpl('index.html', **args) @bp.route("/m/") def module(module): + if module == "listenbrainz": + lb_refresh() + elif module == "steam": + steam_refresh() if modified_since := request.headers.get('if-modified-since'): modified_since = int(modified_since) none_match = request.headers.get('if-none-match') diff --git a/blueprints/risdeveau/modules/api/lb.py b/blueprints/risdeveau/modules/api/lb.py index 4e9af6f..384edea 100644 --- a/blueprints/risdeveau/modules/api/lb.py +++ b/blueprints/risdeveau/modules/api/lb.py @@ -11,6 +11,9 @@ from apscheduler.triggers.interval import IntervalTrigger import requests from flask import Flask, jsonify +from .scheduler_guard import should_start_scheduler +from .shared_cache import atomic_write_json, cache_file, load_json_if_newer + @dataclass class Cache: @@ -18,6 +21,7 @@ class Cache: last_updated = time() status = None + data = { "caches": { "now": Cache(), @@ -27,6 +31,55 @@ data = { "etag": "" } +_IS_WRITER = should_start_scheduler() +_CACHE_PATH = cache_file("listenbrainz") +_CACHE_MTIME = 0.0 + + +def refresh_cache() -> None: + """Refresh in-memory cache from the shared JSON file (for non-writer workers).""" + global _CACHE_MTIME + if _IS_WRITER: + return + + payload, mtime = load_json_if_newer(_CACHE_PATH, _CACHE_MTIME) + if payload is None: + return + + try: + data["etag"] = payload.get("etag", data.get("etag", "")) + data["last_updated"] = payload.get("last_updated", data.get("last_updated", time())) + caches = payload.get("caches", {}) + for key, cache in data.get("caches", {}).items(): + if isinstance(caches, dict) and key in caches and isinstance(caches[key], dict): + c = caches[key] + cache.data = c.get("data", cache.data) + cache.last_updated = c.get("last_updated", cache.last_updated) + cache.status = c.get("status", cache.status) + finally: + _CACHE_MTIME = mtime + + +def _persist_cache() -> None: + """Persist current cache state to the shared JSON file (writer worker only).""" + if not _IS_WRITER: + return + + payload = { + "etag": data.get("etag", ""), + "last_updated": data.get("last_updated", time()), + "caches": { + k: { + "data": v.data, + "last_updated": v.last_updated, + "status": v.status, + } + for k, v in data.get("caches", {}).items() + }, + } + atomic_write_json(_CACHE_PATH, payload) + + def yt_cover(youtube_url): parsed_url = urlparse(youtube_url) @@ -42,6 +95,7 @@ def yt_cover(youtube_url): return f"https://img.youtube.com/vi/{video_id}/2.jpg" + def parse_listens(json: dict) -> dict: cover_replacing = { "1e699948-c7c8-4bb2-9f8b-62e14b882a5d": "ca464c1d-5848-45bb-b92d-b1e4b00f9d65", @@ -86,7 +140,11 @@ def parse_listens(json: dict) -> dict: return new_json + def api_request(url: str, cache: Cache): + changed = False + prev_status = cache.status + try: response = requests.get(url, timeout=10) if response.status_code == 200: @@ -100,24 +158,36 @@ def api_request(url: str, cache: Cache): data['etag'] = md5(''.join( ( dumps(data['caches'][x].data) for x in data['caches'] ) ).encode()).hexdigest() + changed = True else: cache.status = f'error: {response.status_code}' except Exception as e: cache.status = f'error: {str(e)}' -scheduler = BackgroundScheduler() -scheduler.add_job( - func=lambda: api_request("https://api.listenbrainz.org/1/user/risdeveau/listens?count=5", data['caches']['listens']), - trigger=IntervalTrigger(minutes=1), - id='risdeveau.listenbrainz.listens', - replace_existing=True -) -scheduler.add_job( - func=lambda: api_request("https://api.listenbrainz.org/1/user/risdeveau/playing-now", data['caches']['now']), - trigger=IntervalTrigger(seconds=15), - id='risdeveau.listenbrainz.playing-now', - replace_existing=True -) -scheduler.start() + if prev_status != cache.status: + changed = True -atexit.register(lambda: scheduler.shutdown()) + if changed: + _persist_cache() + + +scheduler = BackgroundScheduler() + +if _IS_WRITER: + scheduler.add_job( + func=lambda: api_request("https://api.listenbrainz.org/1/user/risdeveau/listens?count=5", data['caches']['listens']), + trigger=IntervalTrigger(minutes=1), + id='risdeveau.listenbrainz.listens', + replace_existing=True + ) + scheduler.add_job( + func=lambda: api_request("https://api.listenbrainz.org/1/user/risdeveau/playing-now", data['caches']['now']), + trigger=IntervalTrigger(seconds=15), + id='risdeveau.listenbrainz.playing-now', + replace_existing=True + ) + _persist_cache() + scheduler.start() + atexit.register(lambda: scheduler.shutdown()) +else: + refresh_cache() diff --git a/blueprints/risdeveau/modules/api/scheduler_guard.py b/blueprints/risdeveau/modules/api/scheduler_guard.py new file mode 100644 index 0000000..b3ffdb1 --- /dev/null +++ b/blueprints/risdeveau/modules/api/scheduler_guard.py @@ -0,0 +1,35 @@ +from __future__ import annotations + +import os +import fcntl +from typing import Optional + +_LOCK_FD: Optional[int] = None # keep fd open for process lifetime + + +def should_start_scheduler() -> bool: + """Return True in exactly one process (the scheduler owner). + + Under gunicorn --workers N, code is imported in N processes. If APScheduler + starts at import time, you get N schedulers => NĂ— API calls. + + We prevent that by taking a non-blocking exclusive lock on a lockfile. + """ + global _LOCK_FD + if _LOCK_FD is not None: + return True + + lock_path = os.environ.get("LAIR_SCHED_LOCK", "/tmp/lair-scheduler.lock") + fd = os.open(lock_path, os.O_CREAT | os.O_RDWR, 0o644) + try: + fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB) + _LOCK_FD = fd + return True + except BlockingIOError: + os.close(fd) + return False + except Exception: + try: + os.close(fd) + finally: + return False diff --git a/blueprints/risdeveau/modules/api/shared_cache.py b/blueprints/risdeveau/modules/api/shared_cache.py new file mode 100644 index 0000000..855f37d --- /dev/null +++ b/blueprints/risdeveau/modules/api/shared_cache.py @@ -0,0 +1,53 @@ +from __future__ import annotations + +import json +import os +import tempfile +from typing import Any, Optional, Tuple + + +def cache_dir() -> str: + """Directory where the scheduler owner writes shared cache JSON files.""" + d = os.environ.get("LAIR_CACHE_DIR", "/tmp/lair-cache") + os.makedirs(d, exist_ok=True) + return d + + +def cache_file(name: str) -> str: + return os.path.join(cache_dir(), f"{name}.json") + + +def atomic_write_json(path: str, payload: dict) -> None: + """Write JSON atomically so readers never observe partial content.""" + parent = os.path.dirname(path) or "." + fd, tmp = tempfile.mkstemp(prefix=".tmp-", dir=parent) + try: + with os.fdopen(fd, "w", encoding="utf-8") as f: + json.dump(payload, f, ensure_ascii=False, separators=(",", ":")) + os.replace(tmp, path) + finally: + try: + if os.path.exists(tmp): + os.unlink(tmp) + except Exception: + pass + + +def load_json_if_newer(path: str, last_mtime: float) -> Tuple[Optional[dict], float]: + """Load JSON only if the file exists and has mtime newer than last_mtime.""" + try: + st = os.stat(path) + except FileNotFoundError: + return None, last_mtime + except Exception: + return None, last_mtime + + mtime = float(st.st_mtime) + if mtime <= float(last_mtime): + return None, last_mtime + + try: + with open(path, "r", encoding="utf-8") as f: + return json.load(f), mtime + except Exception: + return None, last_mtime diff --git a/blueprints/risdeveau/modules/api/steam.py b/blueprints/risdeveau/modules/api/steam.py index 5061d33..b88d8ce 100644 --- a/blueprints/risdeveau/modules/api/steam.py +++ b/blueprints/risdeveau/modules/api/steam.py @@ -12,16 +12,21 @@ from apscheduler.triggers.interval import IntervalTrigger import requests from flask import Flask +from .scheduler_guard import should_start_scheduler +from .shared_cache import atomic_write_json, cache_file, load_json_if_newer + TOKEN = environ.get("STEAM_TOKEN") MY_ID = 76561198826355942 + @dataclass class Cache: data = {} last_updated = time() status = None + data = { "caches": { "recent": Cache(), @@ -31,6 +36,55 @@ data = { "etag": "" } +_IS_WRITER = should_start_scheduler() +_CACHE_PATH = cache_file("steam") +_CACHE_MTIME = 0.0 + + +def refresh_cache() -> None: + """Refresh in-memory cache from the shared JSON file (for non-writer workers).""" + global _CACHE_MTIME + if _IS_WRITER: + return + + payload, mtime = load_json_if_newer(_CACHE_PATH, _CACHE_MTIME) + if payload is None: + return + + try: + data["etag"] = payload.get("etag", data.get("etag", "")) + data["last_updated"] = payload.get("last_updated", data.get("last_updated", time())) + caches = payload.get("caches", {}) + for key, cache in data.get("caches", {}).items(): + if isinstance(caches, dict) and key in caches and isinstance(caches[key], dict): + c = caches[key] + cache.data = c.get("data", cache.data) + cache.last_updated = c.get("last_updated", cache.last_updated) + cache.status = c.get("status", cache.status) + finally: + _CACHE_MTIME = mtime + + +def _persist_cache() -> None: + """Persist current cache state to the shared JSON file (writer worker only).""" + if not _IS_WRITER: + return + + payload = { + "etag": data.get("etag", ""), + "last_updated": data.get("last_updated", time()), + "caches": { + k: { + "data": v.data, + "last_updated": v.last_updated, + "status": v.status, + } + for k, v in data.get("caches", {}).items() + }, + } + atomic_write_json(_CACHE_PATH, payload) + + def modify_game_list(json: dict) -> dict: if 'games' in json.keys(): apps = (3301060, 404790, 1281930, 1920960, 1325960, 431960) @@ -43,6 +97,7 @@ def modify_game_list(json: dict) -> dict: json['games'] = new_games return json + def steam_request(interface: str, method: str, v: int = 1, **kwargs) -> requests.Response: return requests.get( f"https://api.steampowered.com/{interface}/{method}/v{v:04}/", @@ -50,7 +105,11 @@ def steam_request(interface: str, method: str, v: int = 1, **kwargs) -> requests timeout=10 ) + def api_request(cache, *args, **kwargs): + changed = False + prev_status = cache.status + try: response = steam_request(*args, **kwargs) if response.status_code == 200: @@ -64,31 +123,51 @@ def api_request(cache, *args, **kwargs): data['etag'] = md5(''.join( ( dumps(data['caches'][x].data) for x in data['caches'] ) ).encode()).hexdigest() + changed = True else: cache.status = f'error: {response.status_code}' - print("x") except Exception as e: cache.status = f'error: {str(e)}' + if prev_status != cache.status: + changed = True + + if changed: + _persist_cache() + + if TOKEN: scheduler = BackgroundScheduler() - scheduler.add_job( - func=lambda: api_request(data['caches']['recent'], "IPlayerService", "GetRecentlyPlayedGames", steamid=76561198826355942), - trigger=IntervalTrigger(minutes=15), - id='risdeveau.steam.recent', - replace_existing=True - ) - scheduler.add_job( - func=lambda: api_request(data['caches']['owned'], "IPlayerService", "GetOwnedGames", steamid=76561198826355942, include_appinfo=1, include_played_free_games=1), - trigger=IntervalTrigger(minutes=60), - id='risdeveau.steam.owned', - replace_existing=True - ) - scheduler.start() - api_request(data['caches']['recent'], "IPlayerService", "GetRecentlyPlayedGames", steamid=76561198826355942) - api_request(data['caches']['owned'], "IPlayerService", "GetOwnedGames", steamid=76561198826355942, include_appinfo=1, include_played_free_games=1) + if _IS_WRITER: + scheduler.add_job( + func=lambda: api_request(data['caches']['recent'], "IPlayerService", "GetRecentlyPlayedGames", steamid=MY_ID), + trigger=IntervalTrigger(minutes=15), + id='risdeveau.steam.recent', + replace_existing=True + ) + scheduler.add_job( + func=lambda: api_request(data['caches']['owned'], "IPlayerService", "GetOwnedGames", steamid=MY_ID, include_appinfo=1, include_played_free_games=1), + trigger=IntervalTrigger(minutes=60), + id='risdeveau.steam.owned', + replace_existing=True + ) - atexit.register(lambda: scheduler.shutdown()) + _persist_cache() + scheduler.start() + + api_request(data['caches']['recent'], "IPlayerService", "GetRecentlyPlayedGames", steamid=MY_ID) + api_request(data['caches']['owned'], "IPlayerService", "GetOwnedGames", steamid=MY_ID, include_appinfo=1, include_played_free_games=1) + + atexit.register(lambda: scheduler.shutdown()) + else: + refresh_cache() else: - print("STEAM_TOKEN is not defined") + msg = "STEAM_TOKEN is not defined" + print(msg) + for c in data["caches"].values(): + c.status = msg + if _IS_WRITER: + _persist_cache() + else: + refresh_cache()