Files
lair.moe/blueprints/root/modules/euroring.py
T

257 lines
6.6 KiB
Python
Raw Normal View History

2026-04-09 04:17:57 +03:00
from __future__ import annotations
import atexit
import json
import os
import re
import tempfile
from time import time
from urllib.parse import urlsplit
import fcntl
import requests
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.interval import IntervalTrigger
def _cache_dir() -> str:
path = os.environ.get("LAIR_CACHE_DIR", "/tmp/lair-cache")
os.makedirs(path, exist_ok=True)
return path
def _cache_file(name: str) -> str:
    """Return the path of the ``<name>.json`` file inside the cache directory."""
    directory = _cache_dir()
    return os.path.join(directory, f"{name}.json")
def _atomic_write_json(path: str, payload: dict) -> None:
parent = os.path.dirname(path) or "."
fd, tmp = tempfile.mkstemp(prefix=".tmp-", dir=parent)
try:
with os.fdopen(fd, "w", encoding="utf-8") as f:
json.dump(payload, f, ensure_ascii=False, separators=(",", ":"))
os.replace(tmp, path)
finally:
try:
if os.path.exists(tmp):
os.unlink(tmp)
except Exception:
pass
# Modification time of the cache file most recently merged into `data` by
# refresh_cache(); starts at 0.0 so the first file found counts as newer.
_CACHE_MTIME = 0.0
# File descriptor on which _should_start_scheduler() holds the scheduler
# lock; stays None until that lock has been acquired by this process.
_LOCK_FD: int | None = None
def _load_json_if_newer(path: str, last_mtime: float) -> tuple[dict | None, float]:
try:
stat = os.stat(path)
except FileNotFoundError:
return None, last_mtime
except Exception:
return None, last_mtime
mtime = float(stat.st_mtime)
if mtime <= float(last_mtime):
return None, last_mtime
try:
with open(path, "r", encoding="utf-8") as f:
return json.load(f), mtime
except Exception:
return None, last_mtime
def _should_start_scheduler() -> bool:
    """Try to take the exclusive scheduler lock for this process.

    The lock file path comes from ``LAIR_SCHED_LOCK`` (default
    ``/tmp/lair-scheduler.lock``). On success the descriptor is kept open in
    ``_LOCK_FD`` so the lock stays held, and later calls return ``True``
    without touching the file again.

    Returns:
        ``True`` if this process holds the lock, ``False`` if the
        non-blocking ``flock`` attempt failed (for example because another
        open descriptor already holds it).

    Raises:
        OSError: If the lock file itself cannot be opened or created.
    """
    global _LOCK_FD
    if _LOCK_FD is not None:
        return True
    lock_path = os.environ.get("LAIR_SCHED_LOCK", "/tmp/lair-scheduler.lock")
    fd = os.open(lock_path, os.O_CREAT | os.O_RDWR, 0o644)
    try:
        fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
    except Exception:
        # BlockingIOError means the lock is already held elsewhere; any other
        # failure is treated the same way. Close without a `return` inside
        # `finally`, which would also swallow KeyboardInterrupt/SystemExit.
        try:
            os.close(fd)
        except OSError:
            pass
        return False
    _LOCK_FD = fd
    return True
# Remote JavaScript file that is downloaded by fetch_webring() and parsed for
# the ring definition (`sites`, `ringName`, `useIndex`, `indexPage`).
EURORING_SOURCE_URL = "https://euroring.neocities.org/scripts/onionring-variables.js"
# This site's own URL; its position among the ring's sites determines the
# previous/next links.
EURORING_SITE_URL = "https://lair.moe"
# Value passed as `timeout=` to requests.get(); overridable through the
# EURORING_TIMEOUT environment variable (must parse as a float).
EURORING_TIMEOUT = float(os.environ.get("EURORING_TIMEOUT", "10"))
# Module-level state describing the ring. It is updated in place by
# fetch_webring() and refresh_cache(), and written to disk by _persist_cache().
data = {
"enabled": bool(EURORING_SOURCE_URL and EURORING_SITE_URL),
"status": "disabled",
"source_url": EURORING_SOURCE_URL,
"site_url": EURORING_SITE_URL,
"ring_name": "",
"index_url": None,
"prev_url": None,
"next_url": None,
"count": 0,
"last_updated": 0,
}
# Evaluated at import time: attempts to take the scheduler lock file. Only the
# process for which this is True writes the cache; refresh_cache() is a no-op
# for it, and _persist_cache() is a no-op for everyone else.
_IS_WRITER = _should_start_scheduler()
# Also evaluated at import time; _cache_file() creates the cache directory.
_CACHE_PATH = _cache_file("webring")
def _site_key(url: str) -> tuple[str, int | None, str, str]:
parts = urlsplit(url.strip())
hostname = (parts.hostname or "").lower()
port = parts.port
path = re.sub(r"/+$", "", parts.path or "")
query = parts.query or ""
return hostname, port, path, query
def _parse_js_string_value(text: str, variable: str) -> str | None:
match = re.search(
rf"var\s+{re.escape(variable)}\s*=\s*(['\"])(.*?)\1\s*;",
text,
re.S,
)
if not match:
return None
return match.group(2).strip()
def _parse_bool_value(text: str, variable: str) -> bool | None:
match = re.search(rf"var\s+{re.escape(variable)}\s*=\s*(true|false)\s*;", text)
if not match:
return None
return match.group(1) == "true"
def _parse_sites(text: str) -> list[str]:
match = re.search(r"var\s+sites\s*=\s*\[(.*?)\]\s*;", text, re.S)
if not match:
raise ValueError("sites array not found")
sites_block = match.group(1)
sites: list[str] = []
for raw_line in sites_block.splitlines():
line = raw_line.strip()
if not line or line.startswith("//"):
continue
site_match = re.match(
r"(?P<quote>['\"])(?P<url>.*?)(?P=quote)\s*,?\s*(?://.*)?$",
line,
)
if site_match:
sites.append(site_match.group("url").strip())
if not sites:
raise ValueError("sites array is empty")
return sites
def _compute_ring_payload(text: str) -> dict:
    """Build the ring state for this site from the ring's JavaScript source.

    Args:
        text: Contents of the ring's variables script.

    Returns:
        A dict with the same keys as the module-level ``data``: the ring
        name (``"Webring"`` if unset), the index page (only when ``useIndex``
        is true), the neighbouring sites with wrap-around, the number of
        sites, and the current time as ``last_updated``.

    Raises:
        ValueError: If the sites array cannot be parsed or this site is not
            listed in it.
    """
    sites = _parse_sites(text)
    target_key = _site_key(EURORING_SITE_URL)
    current_index = None
    for i, site in enumerate(sites):
        try:
            key = _site_key(site)
        except ValueError:
            # urlsplit() and .port raise ValueError for malformed entries
            # (bad port, unmatched brackets). Such an entry cannot be us, so
            # skip it instead of failing the whole refresh; it still counts
            # as a ring member for prev/next and the total.
            continue
        if key == target_key:
            current_index = i
            break
    if current_index is None:
        raise ValueError(f"site not found in ring: {EURORING_SITE_URL}")
    ring_name = _parse_js_string_value(text, "ringName") or "Webring"
    use_index = _parse_bool_value(text, "useIndex")
    index_url = _parse_js_string_value(text, "indexPage") if use_index else None
    return {
        "enabled": True,
        "status": "success",
        "source_url": EURORING_SOURCE_URL,
        "site_url": EURORING_SITE_URL,
        "ring_name": ring_name,
        "index_url": index_url,
        # Modulo makes the first and last entries neighbours of each other.
        "prev_url": sites[(current_index - 1) % len(sites)],
        "next_url": sites[(current_index + 1) % len(sites)],
        "count": len(sites),
        "last_updated": time(),
    }
def refresh_cache() -> None:
    """Merge a newer on-disk cache into ``data`` (non-writer processes only).

    Does nothing in the writer process, or when the cache file is missing,
    unreadable, or not newer than the version already loaded.
    """
    global _CACHE_MTIME
    if not _IS_WRITER:
        fresh, seen_mtime = _load_json_if_newer(_CACHE_PATH, _CACHE_MTIME)
        if fresh is not None:
            data.update(fresh)
            # Remember which file version was merged so it is not re-read.
            _CACHE_MTIME = seen_mtime
def _persist_cache() -> None:
    """Write ``data`` to the cache file; a no-op unless this is the writer."""
    if _IS_WRITER:
        _atomic_write_json(_CACHE_PATH, data)
def fetch_webring() -> None:
    """Download the ring definition and update ``data`` from it.

    On success ``data`` takes the freshly computed payload. On any failure
    the previously known ring values are kept and only ``status`` records the
    error. The cache is persisted whenever ``data`` ended up different from
    what it was on entry. Does nothing when the ring is disabled.
    """
    if not data["enabled"]:
        return
    snapshot = dict(data)
    try:
        resp = requests.get(EURORING_SOURCE_URL, timeout=EURORING_TIMEOUT)
        resp.raise_for_status()
        data.update(_compute_ring_payload(resp.text))
    except Exception as exc:
        # Keep the last known ring values; only flag the failure in "status".
        fallback = {
            "enabled": True,
            "status": f"error: {exc}",
            "source_url": EURORING_SOURCE_URL,
            "site_url": EURORING_SITE_URL,
            "ring_name": data.get("ring_name") or "Webring",
        }
        for key in ("index_url", "prev_url", "next_url"):
            fallback[key] = data.get(key)
        fallback["count"] = data.get("count", 0)
        fallback["last_updated"] = data.get("last_updated", 0)
        data.update(fallback)
    if data != snapshot:
        _persist_cache()
# Import-time startup: when the ring is enabled, set up the background
# scheduler / initial fetch for the writer process and the cache read for
# the other processes.
# NOTE(review): indentation was lost in this copy of the file, so the nesting
# below (which `if` the `else` belongs to, and which of the calls after
# add_job() are guarded by `_IS_WRITER`) cannot be determined from here.
# Confirm against the original source before changing this section.
if data["enabled"]:
scheduler = BackgroundScheduler()
if _IS_WRITER:
# Re-fetch the ring definition once a day; the fixed job id together with
# replace_existing=True replaces any job already registered under that id.
scheduler.add_job(
func=fetch_webring,
trigger=IntervalTrigger(days=1),
id="root.webring.refresh",
replace_existing=True,
)
# Write the current `data` to the cache file (no-op unless writer).
_persist_cache()
scheduler.start()
# Run one fetch right away.
fetch_webring()
# Shut the scheduler down when the interpreter exits.
atexit.register(lambda: scheduler.shutdown())
else:
# Load whatever cache file already exists (no-op in the writer process).
refresh_cache()