Fix API calls by all workers
Quite a complex commit by GPT. I'll rewrite it somewhen
This commit is contained in:
@@ -15,8 +15,8 @@ import magic
|
|||||||
from htmlmin import minify
|
from htmlmin import minify
|
||||||
from musicbrainzngs import get_image_front
|
from musicbrainzngs import get_image_front
|
||||||
|
|
||||||
from .modules.api.lb import data as lb_data
|
from .modules.api.lb import data as lb_data, refresh_cache as lb_refresh
|
||||||
from .modules.api.steam import data as steam_data
|
from .modules.api.steam import data as steam_data, refresh_cache as steam_refresh
|
||||||
|
|
||||||
def tmsmp(sec: int) -> str:
|
def tmsmp(sec: int) -> str:
|
||||||
if sec == 0:
|
if sec == 0:
|
||||||
@@ -83,10 +83,16 @@ args = {
|
|||||||
|
|
||||||
@bp.route("/")
|
@bp.route("/")
|
||||||
def index():
|
def index():
|
||||||
|
lb_refresh()
|
||||||
|
steam_refresh()
|
||||||
return render_tmpl('index.html', **args)
|
return render_tmpl('index.html', **args)
|
||||||
|
|
||||||
@bp.route("/m/<module>")
|
@bp.route("/m/<module>")
|
||||||
def module(module):
|
def module(module):
|
||||||
|
if module == "listenbrainz":
|
||||||
|
lb_refresh()
|
||||||
|
elif module == "steam":
|
||||||
|
steam_refresh()
|
||||||
if modified_since := request.headers.get('if-modified-since'):
|
if modified_since := request.headers.get('if-modified-since'):
|
||||||
modified_since = int(modified_since)
|
modified_since = int(modified_since)
|
||||||
none_match = request.headers.get('if-none-match')
|
none_match = request.headers.get('if-none-match')
|
||||||
|
|||||||
@@ -11,6 +11,9 @@ from apscheduler.triggers.interval import IntervalTrigger
|
|||||||
import requests
|
import requests
|
||||||
from flask import Flask, jsonify
|
from flask import Flask, jsonify
|
||||||
|
|
||||||
|
from .scheduler_guard import should_start_scheduler
|
||||||
|
from .shared_cache import atomic_write_json, cache_file, load_json_if_newer
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class Cache:
|
class Cache:
|
||||||
@@ -18,6 +21,7 @@ class Cache:
|
|||||||
last_updated = time()
|
last_updated = time()
|
||||||
status = None
|
status = None
|
||||||
|
|
||||||
|
|
||||||
data = {
|
data = {
|
||||||
"caches": {
|
"caches": {
|
||||||
"now": Cache(),
|
"now": Cache(),
|
||||||
@@ -27,6 +31,55 @@ data = {
|
|||||||
"etag": ""
|
"etag": ""
|
||||||
}
|
}
|
||||||
|
|
||||||
|
_IS_WRITER = should_start_scheduler()
|
||||||
|
_CACHE_PATH = cache_file("listenbrainz")
|
||||||
|
_CACHE_MTIME = 0.0
|
||||||
|
|
||||||
|
|
||||||
|
def refresh_cache() -> None:
|
||||||
|
"""Refresh in-memory cache from the shared JSON file (for non-writer workers)."""
|
||||||
|
global _CACHE_MTIME
|
||||||
|
if _IS_WRITER:
|
||||||
|
return
|
||||||
|
|
||||||
|
payload, mtime = load_json_if_newer(_CACHE_PATH, _CACHE_MTIME)
|
||||||
|
if payload is None:
|
||||||
|
return
|
||||||
|
|
||||||
|
try:
|
||||||
|
data["etag"] = payload.get("etag", data.get("etag", ""))
|
||||||
|
data["last_updated"] = payload.get("last_updated", data.get("last_updated", time()))
|
||||||
|
caches = payload.get("caches", {})
|
||||||
|
for key, cache in data.get("caches", {}).items():
|
||||||
|
if isinstance(caches, dict) and key in caches and isinstance(caches[key], dict):
|
||||||
|
c = caches[key]
|
||||||
|
cache.data = c.get("data", cache.data)
|
||||||
|
cache.last_updated = c.get("last_updated", cache.last_updated)
|
||||||
|
cache.status = c.get("status", cache.status)
|
||||||
|
finally:
|
||||||
|
_CACHE_MTIME = mtime
|
||||||
|
|
||||||
|
|
||||||
|
def _persist_cache() -> None:
|
||||||
|
"""Persist current cache state to the shared JSON file (writer worker only)."""
|
||||||
|
if not _IS_WRITER:
|
||||||
|
return
|
||||||
|
|
||||||
|
payload = {
|
||||||
|
"etag": data.get("etag", ""),
|
||||||
|
"last_updated": data.get("last_updated", time()),
|
||||||
|
"caches": {
|
||||||
|
k: {
|
||||||
|
"data": v.data,
|
||||||
|
"last_updated": v.last_updated,
|
||||||
|
"status": v.status,
|
||||||
|
}
|
||||||
|
for k, v in data.get("caches", {}).items()
|
||||||
|
},
|
||||||
|
}
|
||||||
|
atomic_write_json(_CACHE_PATH, payload)
|
||||||
|
|
||||||
|
|
||||||
def yt_cover(youtube_url):
|
def yt_cover(youtube_url):
|
||||||
parsed_url = urlparse(youtube_url)
|
parsed_url = urlparse(youtube_url)
|
||||||
|
|
||||||
@@ -42,6 +95,7 @@ def yt_cover(youtube_url):
|
|||||||
|
|
||||||
return f"https://img.youtube.com/vi/{video_id}/2.jpg"
|
return f"https://img.youtube.com/vi/{video_id}/2.jpg"
|
||||||
|
|
||||||
|
|
||||||
def parse_listens(json: dict) -> dict:
|
def parse_listens(json: dict) -> dict:
|
||||||
cover_replacing = {
|
cover_replacing = {
|
||||||
"1e699948-c7c8-4bb2-9f8b-62e14b882a5d": "ca464c1d-5848-45bb-b92d-b1e4b00f9d65",
|
"1e699948-c7c8-4bb2-9f8b-62e14b882a5d": "ca464c1d-5848-45bb-b92d-b1e4b00f9d65",
|
||||||
@@ -86,7 +140,11 @@ def parse_listens(json: dict) -> dict:
|
|||||||
|
|
||||||
return new_json
|
return new_json
|
||||||
|
|
||||||
|
|
||||||
def api_request(url: str, cache: Cache):
|
def api_request(url: str, cache: Cache):
|
||||||
|
changed = False
|
||||||
|
prev_status = cache.status
|
||||||
|
|
||||||
try:
|
try:
|
||||||
response = requests.get(url, timeout=10)
|
response = requests.get(url, timeout=10)
|
||||||
if response.status_code == 200:
|
if response.status_code == 200:
|
||||||
@@ -100,12 +158,22 @@ def api_request(url: str, cache: Cache):
|
|||||||
data['etag'] = md5(''.join(
|
data['etag'] = md5(''.join(
|
||||||
( dumps(data['caches'][x].data) for x in data['caches'] )
|
( dumps(data['caches'][x].data) for x in data['caches'] )
|
||||||
).encode()).hexdigest()
|
).encode()).hexdigest()
|
||||||
|
changed = True
|
||||||
else:
|
else:
|
||||||
cache.status = f'error: {response.status_code}'
|
cache.status = f'error: {response.status_code}'
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
cache.status = f'error: {str(e)}'
|
cache.status = f'error: {str(e)}'
|
||||||
|
|
||||||
|
if prev_status != cache.status:
|
||||||
|
changed = True
|
||||||
|
|
||||||
|
if changed:
|
||||||
|
_persist_cache()
|
||||||
|
|
||||||
|
|
||||||
scheduler = BackgroundScheduler()
|
scheduler = BackgroundScheduler()
|
||||||
|
|
||||||
|
if _IS_WRITER:
|
||||||
scheduler.add_job(
|
scheduler.add_job(
|
||||||
func=lambda: api_request("https://api.listenbrainz.org/1/user/risdeveau/listens?count=5", data['caches']['listens']),
|
func=lambda: api_request("https://api.listenbrainz.org/1/user/risdeveau/listens?count=5", data['caches']['listens']),
|
||||||
trigger=IntervalTrigger(minutes=1),
|
trigger=IntervalTrigger(minutes=1),
|
||||||
@@ -118,6 +186,8 @@ scheduler.add_job(
|
|||||||
id='risdeveau.listenbrainz.playing-now',
|
id='risdeveau.listenbrainz.playing-now',
|
||||||
replace_existing=True
|
replace_existing=True
|
||||||
)
|
)
|
||||||
|
_persist_cache()
|
||||||
scheduler.start()
|
scheduler.start()
|
||||||
|
|
||||||
atexit.register(lambda: scheduler.shutdown())
|
atexit.register(lambda: scheduler.shutdown())
|
||||||
|
else:
|
||||||
|
refresh_cache()
|
||||||
|
|||||||
@@ -0,0 +1,35 @@
|
|||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import os
|
||||||
|
import fcntl
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
_LOCK_FD: Optional[int] = None # keep fd open for process lifetime
|
||||||
|
|
||||||
|
|
||||||
|
def should_start_scheduler() -> bool:
|
||||||
|
"""Return True in exactly one process (the scheduler owner).
|
||||||
|
|
||||||
|
Under gunicorn --workers N, code is imported in N processes. If APScheduler
|
||||||
|
starts at import time, you get N schedulers => N× API calls.
|
||||||
|
|
||||||
|
We prevent that by taking a non-blocking exclusive lock on a lockfile.
|
||||||
|
"""
|
||||||
|
global _LOCK_FD
|
||||||
|
if _LOCK_FD is not None:
|
||||||
|
return True
|
||||||
|
|
||||||
|
lock_path = os.environ.get("LAIR_SCHED_LOCK", "/tmp/lair-scheduler.lock")
|
||||||
|
fd = os.open(lock_path, os.O_CREAT | os.O_RDWR, 0o644)
|
||||||
|
try:
|
||||||
|
fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
|
||||||
|
_LOCK_FD = fd
|
||||||
|
return True
|
||||||
|
except BlockingIOError:
|
||||||
|
os.close(fd)
|
||||||
|
return False
|
||||||
|
except Exception:
|
||||||
|
try:
|
||||||
|
os.close(fd)
|
||||||
|
finally:
|
||||||
|
return False
|
||||||
@@ -0,0 +1,53 @@
|
|||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import json
|
||||||
|
import os
|
||||||
|
import tempfile
|
||||||
|
from typing import Any, Optional, Tuple
|
||||||
|
|
||||||
|
|
||||||
|
def cache_dir() -> str:
|
||||||
|
"""Directory where the scheduler owner writes shared cache JSON files."""
|
||||||
|
d = os.environ.get("LAIR_CACHE_DIR", "/tmp/lair-cache")
|
||||||
|
os.makedirs(d, exist_ok=True)
|
||||||
|
return d
|
||||||
|
|
||||||
|
|
||||||
|
def cache_file(name: str) -> str:
|
||||||
|
return os.path.join(cache_dir(), f"{name}.json")
|
||||||
|
|
||||||
|
|
||||||
|
def atomic_write_json(path: str, payload: dict) -> None:
|
||||||
|
"""Write JSON atomically so readers never observe partial content."""
|
||||||
|
parent = os.path.dirname(path) or "."
|
||||||
|
fd, tmp = tempfile.mkstemp(prefix=".tmp-", dir=parent)
|
||||||
|
try:
|
||||||
|
with os.fdopen(fd, "w", encoding="utf-8") as f:
|
||||||
|
json.dump(payload, f, ensure_ascii=False, separators=(",", ":"))
|
||||||
|
os.replace(tmp, path)
|
||||||
|
finally:
|
||||||
|
try:
|
||||||
|
if os.path.exists(tmp):
|
||||||
|
os.unlink(tmp)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
def load_json_if_newer(path: str, last_mtime: float) -> Tuple[Optional[dict], float]:
|
||||||
|
"""Load JSON only if the file exists and has mtime newer than last_mtime."""
|
||||||
|
try:
|
||||||
|
st = os.stat(path)
|
||||||
|
except FileNotFoundError:
|
||||||
|
return None, last_mtime
|
||||||
|
except Exception:
|
||||||
|
return None, last_mtime
|
||||||
|
|
||||||
|
mtime = float(st.st_mtime)
|
||||||
|
if mtime <= float(last_mtime):
|
||||||
|
return None, last_mtime
|
||||||
|
|
||||||
|
try:
|
||||||
|
with open(path, "r", encoding="utf-8") as f:
|
||||||
|
return json.load(f), mtime
|
||||||
|
except Exception:
|
||||||
|
return None, last_mtime
|
||||||
@@ -12,16 +12,21 @@ from apscheduler.triggers.interval import IntervalTrigger
|
|||||||
import requests
|
import requests
|
||||||
from flask import Flask
|
from flask import Flask
|
||||||
|
|
||||||
|
from .scheduler_guard import should_start_scheduler
|
||||||
|
from .shared_cache import atomic_write_json, cache_file, load_json_if_newer
|
||||||
|
|
||||||
|
|
||||||
TOKEN = environ.get("STEAM_TOKEN")
|
TOKEN = environ.get("STEAM_TOKEN")
|
||||||
MY_ID = 76561198826355942
|
MY_ID = 76561198826355942
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class Cache:
|
class Cache:
|
||||||
data = {}
|
data = {}
|
||||||
last_updated = time()
|
last_updated = time()
|
||||||
status = None
|
status = None
|
||||||
|
|
||||||
|
|
||||||
data = {
|
data = {
|
||||||
"caches": {
|
"caches": {
|
||||||
"recent": Cache(),
|
"recent": Cache(),
|
||||||
@@ -31,6 +36,55 @@ data = {
|
|||||||
"etag": ""
|
"etag": ""
|
||||||
}
|
}
|
||||||
|
|
||||||
|
_IS_WRITER = should_start_scheduler()
|
||||||
|
_CACHE_PATH = cache_file("steam")
|
||||||
|
_CACHE_MTIME = 0.0
|
||||||
|
|
||||||
|
|
||||||
|
def refresh_cache() -> None:
|
||||||
|
"""Refresh in-memory cache from the shared JSON file (for non-writer workers)."""
|
||||||
|
global _CACHE_MTIME
|
||||||
|
if _IS_WRITER:
|
||||||
|
return
|
||||||
|
|
||||||
|
payload, mtime = load_json_if_newer(_CACHE_PATH, _CACHE_MTIME)
|
||||||
|
if payload is None:
|
||||||
|
return
|
||||||
|
|
||||||
|
try:
|
||||||
|
data["etag"] = payload.get("etag", data.get("etag", ""))
|
||||||
|
data["last_updated"] = payload.get("last_updated", data.get("last_updated", time()))
|
||||||
|
caches = payload.get("caches", {})
|
||||||
|
for key, cache in data.get("caches", {}).items():
|
||||||
|
if isinstance(caches, dict) and key in caches and isinstance(caches[key], dict):
|
||||||
|
c = caches[key]
|
||||||
|
cache.data = c.get("data", cache.data)
|
||||||
|
cache.last_updated = c.get("last_updated", cache.last_updated)
|
||||||
|
cache.status = c.get("status", cache.status)
|
||||||
|
finally:
|
||||||
|
_CACHE_MTIME = mtime
|
||||||
|
|
||||||
|
|
||||||
|
def _persist_cache() -> None:
|
||||||
|
"""Persist current cache state to the shared JSON file (writer worker only)."""
|
||||||
|
if not _IS_WRITER:
|
||||||
|
return
|
||||||
|
|
||||||
|
payload = {
|
||||||
|
"etag": data.get("etag", ""),
|
||||||
|
"last_updated": data.get("last_updated", time()),
|
||||||
|
"caches": {
|
||||||
|
k: {
|
||||||
|
"data": v.data,
|
||||||
|
"last_updated": v.last_updated,
|
||||||
|
"status": v.status,
|
||||||
|
}
|
||||||
|
for k, v in data.get("caches", {}).items()
|
||||||
|
},
|
||||||
|
}
|
||||||
|
atomic_write_json(_CACHE_PATH, payload)
|
||||||
|
|
||||||
|
|
||||||
def modify_game_list(json: dict) -> dict:
|
def modify_game_list(json: dict) -> dict:
|
||||||
if 'games' in json.keys():
|
if 'games' in json.keys():
|
||||||
apps = (3301060, 404790, 1281930, 1920960, 1325960, 431960)
|
apps = (3301060, 404790, 1281930, 1920960, 1325960, 431960)
|
||||||
@@ -43,6 +97,7 @@ def modify_game_list(json: dict) -> dict:
|
|||||||
json['games'] = new_games
|
json['games'] = new_games
|
||||||
return json
|
return json
|
||||||
|
|
||||||
|
|
||||||
def steam_request(interface: str, method: str, v: int = 1, **kwargs) -> requests.Response:
|
def steam_request(interface: str, method: str, v: int = 1, **kwargs) -> requests.Response:
|
||||||
return requests.get(
|
return requests.get(
|
||||||
f"https://api.steampowered.com/{interface}/{method}/v{v:04}/",
|
f"https://api.steampowered.com/{interface}/{method}/v{v:04}/",
|
||||||
@@ -50,7 +105,11 @@ def steam_request(interface: str, method: str, v: int = 1, **kwargs) -> requests
|
|||||||
timeout=10
|
timeout=10
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def api_request(cache, *args, **kwargs):
|
def api_request(cache, *args, **kwargs):
|
||||||
|
changed = False
|
||||||
|
prev_status = cache.status
|
||||||
|
|
||||||
try:
|
try:
|
||||||
response = steam_request(*args, **kwargs)
|
response = steam_request(*args, **kwargs)
|
||||||
if response.status_code == 200:
|
if response.status_code == 200:
|
||||||
@@ -64,31 +123,51 @@ def api_request(cache, *args, **kwargs):
|
|||||||
data['etag'] = md5(''.join(
|
data['etag'] = md5(''.join(
|
||||||
( dumps(data['caches'][x].data) for x in data['caches'] )
|
( dumps(data['caches'][x].data) for x in data['caches'] )
|
||||||
).encode()).hexdigest()
|
).encode()).hexdigest()
|
||||||
|
changed = True
|
||||||
else:
|
else:
|
||||||
cache.status = f'error: {response.status_code}'
|
cache.status = f'error: {response.status_code}'
|
||||||
print("x")
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
cache.status = f'error: {str(e)}'
|
cache.status = f'error: {str(e)}'
|
||||||
|
|
||||||
|
if prev_status != cache.status:
|
||||||
|
changed = True
|
||||||
|
|
||||||
|
if changed:
|
||||||
|
_persist_cache()
|
||||||
|
|
||||||
|
|
||||||
if TOKEN:
|
if TOKEN:
|
||||||
scheduler = BackgroundScheduler()
|
scheduler = BackgroundScheduler()
|
||||||
|
|
||||||
|
if _IS_WRITER:
|
||||||
scheduler.add_job(
|
scheduler.add_job(
|
||||||
func=lambda: api_request(data['caches']['recent'], "IPlayerService", "GetRecentlyPlayedGames", steamid=76561198826355942),
|
func=lambda: api_request(data['caches']['recent'], "IPlayerService", "GetRecentlyPlayedGames", steamid=MY_ID),
|
||||||
trigger=IntervalTrigger(minutes=15),
|
trigger=IntervalTrigger(minutes=15),
|
||||||
id='risdeveau.steam.recent',
|
id='risdeveau.steam.recent',
|
||||||
replace_existing=True
|
replace_existing=True
|
||||||
)
|
)
|
||||||
scheduler.add_job(
|
scheduler.add_job(
|
||||||
func=lambda: api_request(data['caches']['owned'], "IPlayerService", "GetOwnedGames", steamid=76561198826355942, include_appinfo=1, include_played_free_games=1),
|
func=lambda: api_request(data['caches']['owned'], "IPlayerService", "GetOwnedGames", steamid=MY_ID, include_appinfo=1, include_played_free_games=1),
|
||||||
trigger=IntervalTrigger(minutes=60),
|
trigger=IntervalTrigger(minutes=60),
|
||||||
id='risdeveau.steam.owned',
|
id='risdeveau.steam.owned',
|
||||||
replace_existing=True
|
replace_existing=True
|
||||||
)
|
)
|
||||||
|
|
||||||
|
_persist_cache()
|
||||||
scheduler.start()
|
scheduler.start()
|
||||||
|
|
||||||
api_request(data['caches']['recent'], "IPlayerService", "GetRecentlyPlayedGames", steamid=76561198826355942)
|
api_request(data['caches']['recent'], "IPlayerService", "GetRecentlyPlayedGames", steamid=MY_ID)
|
||||||
api_request(data['caches']['owned'], "IPlayerService", "GetOwnedGames", steamid=76561198826355942, include_appinfo=1, include_played_free_games=1)
|
api_request(data['caches']['owned'], "IPlayerService", "GetOwnedGames", steamid=MY_ID, include_appinfo=1, include_played_free_games=1)
|
||||||
|
|
||||||
atexit.register(lambda: scheduler.shutdown())
|
atexit.register(lambda: scheduler.shutdown())
|
||||||
else:
|
else:
|
||||||
print("STEAM_TOKEN is not defined")
|
refresh_cache()
|
||||||
|
else:
|
||||||
|
msg = "STEAM_TOKEN is not defined"
|
||||||
|
print(msg)
|
||||||
|
for c in data["caches"].values():
|
||||||
|
c.status = msg
|
||||||
|
if _IS_WRITER:
|
||||||
|
_persist_cache()
|
||||||
|
else:
|
||||||
|
refresh_cache()
|
||||||
|
|||||||
Reference in New Issue
Block a user