Files
lair.moe/blueprints/root/modules/euroring.py
T
Sweetbread fdbb4c6a26
Docker Build and Push / build-and-push (push) Successful in 53s
fixup! fixup! Add some webrings
2026-04-13 22:07:41 +03:00

189 lines
4.9 KiB
Python

from __future__ import annotations
import atexit
import os
import re
from time import time
from urllib.parse import urlsplit
import requests
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.interval import IntervalTrigger
from blueprints.risdeveau.modules.api.scheduler_guard import should_start_scheduler
from blueprints.risdeveau.modules.api.shared_cache import (
atomic_write_json,
cache_file,
load_json_if_newer,
)
EURORING_SOURCE_URL = os.environ.get(
"EURORING_SOURCE_URL",
"https://euroring.neocities.org/scripts/onionring-variables.js",
)
EURORING_SITE_URL = os.environ.get("EURORING_SITE_URL", "https://lair.moe")
EURORING_TIMEOUT = float(os.environ.get("EURORING_TIMEOUT", "10"))
data = {
"enabled": bool(EURORING_SOURCE_URL and EURORING_SITE_URL),
"status": "disabled",
"source_url": EURORING_SOURCE_URL,
"site_url": EURORING_SITE_URL,
"ring_name": "",
"index_url": None,
"prev_url": None,
"next_url": None,
"count": 0,
"last_updated": 0,
}
_IS_WRITER = should_start_scheduler() if data["enabled"] else False
_CACHE_PATH = cache_file("webring")
_CACHE_MTIME = 0.0
def _site_key(url: str) -> tuple[str, int | None, str, str]:
parts = urlsplit(url.strip())
hostname = (parts.hostname or "").lower()
port = parts.port
path = re.sub(r"/+$", "", parts.path or "")
query = parts.query or ""
return hostname, port, path, query
def _parse_js_string_value(text: str, variable: str) -> str | None:
match = re.search(
rf"var\s+{re.escape(variable)}\s*=\s*(['\"])(.*?)\1\s*;",
text,
re.S,
)
if not match:
return None
return match.group(2).strip()
def _parse_sites(text: str) -> list[str]:
match = re.search(r"var\s+sites\s*=\s*\[(.*?)\]\s*;", text, re.S)
if not match:
raise ValueError("sites array not found")
sites_block = match.group(1)
sites: list[str] = []
for raw_line in sites_block.splitlines():
line = raw_line.strip()
if not line or line.startswith("//"):
continue
site_match = re.match(
r"(?P<quote>['\"])(?P<url>.*?)(?P=quote)\s*,?\s*(?://.*)?$",
line,
)
if site_match:
sites.append(site_match.group("url").strip())
if not sites:
raise ValueError("sites array is empty")
return sites
def _compute_ring_payload(text: str) -> dict:
sites = _parse_sites(text)
target_key = _site_key(EURORING_SITE_URL)
current_index = None
for i, site in enumerate(sites):
if _site_key(site) == target_key:
current_index = i
break
if current_index is None:
raise ValueError(f"site not found in ring: {EURORING_SITE_URL}")
ring_name = _parse_js_string_value(text, "ringName") or "Webring"
index_url = _parse_js_string_value(text, "indexPage")
return {
"enabled": True,
"status": "success",
"source_url": EURORING_SOURCE_URL,
"site_url": EURORING_SITE_URL,
"ring_name": ring_name,
"index_url": index_url,
"prev_url": sites[(current_index - 1) % len(sites)],
"next_url": sites[(current_index + 1) % len(sites)],
"count": len(sites),
"last_updated": time(),
}
def refresh_cache() -> None:
global _CACHE_MTIME
if _IS_WRITER:
return
payload, mtime = load_json_if_newer(_CACHE_PATH, _CACHE_MTIME)
if payload is None:
return
data.update(payload)
_CACHE_MTIME = mtime
def _persist_cache() -> None:
if not _IS_WRITER:
return
atomic_write_json(_CACHE_PATH, data)
def fetch_webring() -> None:
if not data["enabled"]:
return
previous_state = dict(data)
try:
response = requests.get(EURORING_SOURCE_URL, timeout=EURORING_TIMEOUT)
response.raise_for_status()
payload = _compute_ring_payload(response.text)
data.update(payload)
except Exception as exc:
data.update({
"enabled": True,
"status": f"error: {exc}",
"source_url": EURORING_SOURCE_URL,
"site_url": EURORING_SITE_URL,
"ring_name": data.get("ring_name") or "Webring",
"index_url": data.get("index_url"),
"prev_url": data.get("prev_url"),
"next_url": data.get("next_url"),
"count": data.get("count", 0),
"last_updated": data.get("last_updated", 0),
})
if data != previous_state:
_persist_cache()
if data["enabled"]:
scheduler = BackgroundScheduler()
if _IS_WRITER:
scheduler.add_job(
func=fetch_webring,
trigger=IntervalTrigger(days=1),
id="root.webring.refresh",
replace_existing=True,
)
_persist_cache()
scheduler.start()
fetch_webring()
atexit.register(lambda: scheduler.shutdown())
else:
refresh_cache()