Files
lair.moe/blueprints/risdeveau/modules/api/lb.py
T
Sweetbread a0cfa765bd
Docker Build and Push / build-and-push (push) Successful in 20s
Cover replacing for LB
2026-02-07 19:25:12 +03:00

124 lines
3.8 KiB
Python

import atexit
import re
from dataclasses import dataclass
from hashlib import md5
from json import dumps
from time import time
from urllib.parse import parse_qs, urlparse
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.interval import IntervalTrigger
import requests
from flask import Flask, jsonify
@dataclass
class Cache:
data = {}
last_updated = time()
status = None
data = {
"caches": {
"now": Cache(),
"listens": Cache()
},
"last_updated": time(),
"etag": ""
}
def yt_cover(youtube_url):
parsed_url = urlparse(youtube_url)
if parsed_url.netloc in ("youtube.com", "music.youtube.com"):
query_params = parse_qs(parsed_url.query)
video_id = query_params.get('v', [None])[0]
elif parsed_url.netloc == 'youtu.be':
video_id = parsed_url.path[1:]
if not video_id:
return
return f"https://img.youtube.com/vi/{video_id}/2.jpg"
def parse_listens(json: dict) -> dict:
cover_replacing = {
"1e699948-c7c8-4bb2-9f8b-62e14b882a5d": "ca464c1d-5848-45bb-b92d-b1e4b00f9d65",
"0d516a93-061e-4a27-9cf7-f36e3a96f888": "5cc0c0c7-22f9-4a4b-a24c-f1a6732f813b",
"92ea5cc8-80e0-4da0-a10b-1bc2f8e8781e": "e8f3e14a-4794-4bab-b403-d562cdad4c2f",
}
new_json = {
"count": json["count"],
"listens": []
}
for track in json["listens"]:
listened_at = track.get("listened_at", 0)
track = track["track_metadata"]
new_track = {
"artist_name": track["artist_name"],
"track_name": track["track_name"],
"listened_at": listened_at
}
if mb := track.get("mbid_mapping"):
new_track["id"] = \
cover_replacing.get(mb["release_mbid"],
mb.get("caa_release_mbid",
mb["release_mbid"]
))
new_track["artist_name"] = mb["artists"][0]["artist_credit_name"]
new_track["track_name"] = mb["recording_name"]
elif info := track.get("additional_info"):
if info \
.get("music_service_name", "") \
.lower() in ("youtube", "youtube music"):
if cover := yt_cover(track["additional_info"]["origin_url"]):
new_track["cover_url"] = cover
if "cover_url" not in new_track.keys() and "id" in new_track.keys():
new_track["cover_url"] = "/asset/mb/" + new_track["id"]
new_json["listens"].append(new_track)
return new_json
def api_request(url: str, cache: Cache):
try:
response = requests.get(url, timeout=10)
if response.status_code == 200:
json = parse_listens(response.json().get("payload"))
cache.status = 'success'
if cache.data != json:
cache.data = json
cache.last_updated = time()
data['last_updated'] = time()
data['etag'] = md5(''.join(
( dumps(data['caches'][x].data) for x in data['caches'] )
).encode()).hexdigest()
else:
cache.status = f'error: {response.status_code}'
except Exception as e:
cache.status = f'error: {str(e)}'
scheduler = BackgroundScheduler()
scheduler.add_job(
func=lambda: api_request("https://api.listenbrainz.org/1/user/risdeveau/listens?count=5", data['caches']['listens']),
trigger=IntervalTrigger(minutes=1),
id='risdeveau.listenbrainz.listens',
replace_existing=True
)
scheduler.add_job(
func=lambda: api_request("https://api.listenbrainz.org/1/user/risdeveau/playing-now", data['caches']['now']),
trigger=IntervalTrigger(seconds=15),
id='risdeveau.listenbrainz.playing-now',
replace_existing=True
)
scheduler.start()
atexit.register(lambda: scheduler.shutdown())