"""Webring navigation data, shared between processes through a JSON cache.

At import time the module tries to take a non-blocking exclusive ``flock`` on
a lock file.  The process that obtains it becomes the *writer*: it downloads
the ring definition script, computes the previous/next links for this site,
stores them in the module-level ``data`` dict and persists that dict to the
cache file on a daily schedule.  Processes that do not obtain the lock only
read the cache file (see ``refresh_cache``).
"""
from __future__ import annotations

import atexit
import fcntl
import json
import os
import re
import tempfile
from time import time
from urllib.parse import urlsplit

import requests
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.interval import IntervalTrigger


def _cache_dir() -> str:
    """Return the cache directory, creating it if it does not exist.

    The location comes from ``LAIR_CACHE_DIR`` and defaults to
    ``/tmp/lair-cache``.
    """
    path = os.environ.get("LAIR_CACHE_DIR", "/tmp/lair-cache")
    os.makedirs(path, exist_ok=True)
    return path


def _cache_file(name: str) -> str:
    """Return the path of the JSON cache file called *name*."""
    return os.path.join(_cache_dir(), f"{name}.json")


def _atomic_write_json(path: str, payload: dict) -> None:
    """Serialize *payload* as compact JSON and atomically install it at *path*.

    The data is written to a temporary file in the same directory and then
    moved over *path* with ``os.replace``, so readers never observe a
    partially written file.
    """
    parent = os.path.dirname(path) or "."
    fd, tmp = tempfile.mkstemp(prefix=".tmp-", dir=parent)
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            json.dump(payload, f, ensure_ascii=False, separators=(",", ":"))
        os.replace(tmp, path)
    finally:
        # After a successful replace the temp name is gone; it only still
        # exists when writing failed.  Cleanup is best-effort either way.
        try:
            os.unlink(tmp)
        except OSError:
            pass


# Modification time of the cache file contents last merged into ``data``.
_CACHE_MTIME = 0.0
# Descriptor holding the scheduler lock; kept open so the lock stays held.
_LOCK_FD: int | None = None


def _load_json_if_newer(
    path: str, last_mtime: float
) -> tuple[dict | None, float]:
    """Load the JSON file at *path* if it changed since *last_mtime*.

    Returns ``(payload, mtime)`` when the file is newer and could be parsed.
    Returns ``(None, last_mtime)`` when the file is missing, unreadable,
    not newer, or does not contain valid JSON.
    """
    try:
        stat = os.stat(path)
    except OSError:
        return None, last_mtime

    mtime = float(stat.st_mtime)
    if mtime <= float(last_mtime):
        return None, last_mtime

    try:
        with open(path, "r", encoding="utf-8") as f:
            return json.load(f), mtime
    except (OSError, ValueError):
        # ValueError covers both JSON syntax errors and undecodable bytes.
        return None, last_mtime


def _should_start_scheduler() -> bool:
    """Return True if this process holds the scheduler lock.

    The first call tries to take an exclusive, non-blocking ``flock`` on the
    file named by ``LAIR_SCHED_LOCK`` (default ``/tmp/lair-scheduler.lock``).
    On success the descriptor is stored in ``_LOCK_FD`` and left open; later
    calls return True immediately.  If the lock cannot be taken the
    descriptor is closed and False is returned.
    """
    global _LOCK_FD
    if _LOCK_FD is not None:
        return True

    lock_path = os.environ.get("LAIR_SCHED_LOCK", "/tmp/lair-scheduler.lock")
    fd = os.open(lock_path, os.O_CREAT | os.O_RDWR, 0o644)
    try:
        fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
    except OSError:
        # BlockingIOError (lock held elsewhere) is an OSError as well.  The
        # close is best-effort, but must not hide signals such as
        # KeyboardInterrupt, so no ``return`` inside a ``finally`` here.
        try:
            os.close(fd)
        except OSError:
            pass
        return False

    _LOCK_FD = fd
    return True


EURORING_SOURCE_URL = (
    "https://euroring.neocities.org/scripts/onionring-variables.js"
)
EURORING_SITE_URL = "https://lair.moe"
EURORING_TIMEOUT = float(os.environ.get("EURORING_TIMEOUT", "10"))

# Current webring state; mutated in place by fetch_webring / refresh_cache.
data = {
    "enabled": bool(EURORING_SOURCE_URL and EURORING_SITE_URL),
    "status": "disabled",
    "source_url": EURORING_SOURCE_URL,
    "site_url": EURORING_SITE_URL,
    "ring_name": "",
    "index_url": None,
    "prev_url": None,
    "next_url": None,
    "count": 0,
    "last_updated": 0,
}

_IS_WRITER = _should_start_scheduler()
_CACHE_PATH = _cache_file("webring")

# One entry of the ``sites`` array: a quoted URL, an optional trailing comma
# and an optional ``//`` comment.
_SITE_LINE_RE = re.compile(
    r"(?P<quote>['\"])(?P<url>.*?)(?P=quote)\s*,?\s*(?://.*)?$"
)


def _site_key(url: str) -> tuple[str, int | None, str, str]:
    """Return a comparison key for *url*.

    The key is ``(lower-cased hostname, port, path without trailing slashes,
    query)``.  The scheme and fragment are not part of the key.
    """
    parts = urlsplit(url.strip())
    hostname = (parts.hostname or "").lower()
    port = parts.port
    path = re.sub(r"/+$", "", parts.path or "")
    query = parts.query or ""
    return hostname, port, path, query


def _parse_js_string_value(text: str, variable: str) -> str | None:
    """Return the stripped string assigned by ``var <variable> = '...';``.

    Returns None when *text* contains no such assignment.
    """
    match = re.search(
        rf"var\s+{re.escape(variable)}\s*=\s*(['\"])(.*?)\1\s*;",
        text,
        re.S,
    )
    if not match:
        return None
    return match.group(2).strip()


def _parse_bool_value(text: str, variable: str) -> bool | None:
    """Return the boolean assigned by ``var <variable> = true|false;``.

    Returns None when *text* contains no such assignment.
    """
    match = re.search(
        rf"var\s+{re.escape(variable)}\s*=\s*(true|false)\s*;", text
    )
    if not match:
        return None
    return match.group(1) == "true"


def _parse_sites(text: str) -> list[str]:
    """Extract the URLs listed in the ``var sites = [ ... ];`` array.

    Entries are read one per line; blank lines, ``//`` comment lines and
    lines that do not start with a quoted string are skipped.

    Raises:
        ValueError: if the array is missing or yields no URLs.
    """
    match = re.search(r"var\s+sites\s*=\s*\[(.*?)\]\s*;", text, re.S)
    if not match:
        raise ValueError("sites array not found")

    sites: list[str] = []
    for raw_line in match.group(1).splitlines():
        line = raw_line.strip()
        if not line or line.startswith("//"):
            continue
        site_match = _SITE_LINE_RE.match(line)
        if site_match:
            sites.append(site_match.group("url").strip())

    if not sites:
        raise ValueError("sites array is empty")
    return sites


def _compute_ring_payload(text: str) -> dict:
    """Build the webring state dict from the ring definition script *text*.

    Locates ``EURORING_SITE_URL`` in the site list (compared via
    ``_site_key``) and records its neighbours, wrapping around at both ends.

    Raises:
        ValueError: if the site list cannot be parsed or does not contain
            this site.
    """
    sites = _parse_sites(text)
    target_key = _site_key(EURORING_SITE_URL)

    current_index = None
    for i, site in enumerate(sites):
        if _site_key(site) == target_key:
            current_index = i
            break
    if current_index is None:
        raise ValueError(f"site not found in ring: {EURORING_SITE_URL}")

    ring_name = _parse_js_string_value(text, "ringName") or "Webring"
    use_index = _parse_bool_value(text, "useIndex")
    # The index page is only reported when the script sets useIndex = true.
    index_url = (
        _parse_js_string_value(text, "indexPage") if use_index else None
    )

    return {
        "enabled": True,
        "status": "success",
        "source_url": EURORING_SOURCE_URL,
        "site_url": EURORING_SITE_URL,
        "ring_name": ring_name,
        "index_url": index_url,
        "prev_url": sites[(current_index - 1) % len(sites)],
        "next_url": sites[(current_index + 1) % len(sites)],
        "count": len(sites),
        "last_updated": time(),
    }


def refresh_cache() -> None:
    """Merge a newer cache file into ``data`` (non-writer processes only).

    Does nothing in the writer process, or when the cache file is missing,
    unchanged since the last merge, or unreadable.
    """
    global _CACHE_MTIME
    if _IS_WRITER:
        return
    payload, mtime = _load_json_if_newer(_CACHE_PATH, _CACHE_MTIME)
    if payload is None:
        return
    data.update(payload)
    _CACHE_MTIME = mtime


def _persist_cache() -> None:
    """Write ``data`` to the cache file; a no-op outside the writer."""
    if not _IS_WRITER:
        return
    _atomic_write_json(_CACHE_PATH, data)


def fetch_webring() -> None:
    """Download the ring definition and update ``data`` in place.

    On failure the previously known links are kept and only ``status`` is
    replaced with an ``error: ...`` message.  The cache file is rewritten
    whenever ``data`` changed.
    """
    if not data["enabled"]:
        return

    previous_state = dict(data)
    try:
        response = requests.get(
            EURORING_SOURCE_URL, timeout=EURORING_TIMEOUT
        )
        response.raise_for_status()
        payload = _compute_ring_payload(response.text)
        data.update(payload)
    except Exception as exc:
        # Job boundary: any failure (network, HTTP status, parsing) is
        # recorded in ``status`` instead of propagating into the scheduler.
        data.update({
            "enabled": True,
            "status": f"error: {exc}",
            "source_url": EURORING_SOURCE_URL,
            "site_url": EURORING_SITE_URL,
            "ring_name": data.get("ring_name") or "Webring",
            "index_url": data.get("index_url"),
            "prev_url": data.get("prev_url"),
            "next_url": data.get("next_url"),
            "count": data.get("count", 0),
            "last_updated": data.get("last_updated", 0),
        })

    if data != previous_state:
        _persist_cache()


if data["enabled"]:
    scheduler = BackgroundScheduler()
    if _IS_WRITER:
        scheduler.add_job(
            func=fetch_webring,
            trigger=IntervalTrigger(days=1),
            id="root.webring.refresh",
            replace_existing=True,
        )
        # Publish the initial state before the first fetch completes.
        _persist_cache()
        scheduler.start()
        fetch_webring()
        atexit.register(lambda: scheduler.shutdown())
    else:
        refresh_cache()