import atexit import re from dataclasses import dataclass from hashlib import md5 from json import dumps from time import time from urllib.parse import parse_qs, urlparse from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.triggers.interval import IntervalTrigger import requests from flask import Flask, jsonify @dataclass class Cache: data = {} last_updated = time() status = None data = { "caches": { "now": Cache(), "listens": Cache() }, "last_updated": time(), "etag": "" } def yt_cover(youtube_url): parsed_url = urlparse(youtube_url) if parsed_url.netloc in ("youtube.com", "music.youtube.com"): query_params = parse_qs(parsed_url.query) video_id = query_params.get('v', [None])[0] elif parsed_url.netloc == 'youtu.be': video_id = parsed_url.path[1:] if not video_id: return return f"https://img.youtube.com/vi/{video_id}/sddefault.jpg" def parse_listens(json: dict) -> dict: new_json = { "count": json["count"], "listens": [] } for track in json["listens"]: listened_at = track.get("listened_at", 0) track = track["track_metadata"] new_track = { "artist_name": track["artist_name"], "track_name": track["track_name"], "listened_at": listened_at } if mb := track.get("mbid_mapping"): new_track["id"] = mb.get("caa_release_mbid", mb["release_mbid"]) new_track["artist_name"] = mb["artists"][0]["artist_credit_name"] new_track["track_name"] = mb["recording_name"] elif info := track.get("additional_info"): if info \ .get("music_service_name", "") \ .lower() in ("youtube", "youtube music"): if cover := yt_cover(track["additional_info"]["origin_url"]): new_track["cover_url"] = cover if "cover_url" not in new_track.keys() and "id" in new_track.keys(): new_track["cover_url"] = "/asset/mb/" + new_track["id"] new_json["listens"].append(new_track) return new_json def api_request(url: str, cache: Cache): try: response = requests.get(url, timeout=10) if response.status_code == 200: json = parse_listens(response.json().get("payload")) cache.status = 'success' if cache.data != json: cache.data = json cache.last_updated = time() data['last_updated'] = time() data['etag'] = md5(''.join( ( dumps(data['caches'][x].data) for x in data['caches'] ) ).encode()).hexdigest() else: cache.status = f'error: {response.status_code}' except Exception as e: cache.status = f'error: {str(e)}' scheduler = BackgroundScheduler() scheduler.add_job( func=lambda: api_request("https://api.listenbrainz.org/1/user/risdeveau/listens?count=5", data['caches']['listens']), trigger=IntervalTrigger(minutes=1), id='risdeveau.listenbrainz.listens', replace_existing=True ) scheduler.add_job( func=lambda: api_request("https://api.listenbrainz.org/1/user/risdeveau/playing-now", data['caches']['now']), trigger=IntervalTrigger(seconds=15), id='risdeveau.listenbrainz.playing-now', replace_existing=True ) scheduler.start() atexit.register(lambda: scheduler.shutdown())