import atexit
import re
from dataclasses import dataclass, field
from hashlib import md5
from json import dumps
from time import time
from typing import Optional
from urllib.parse import parse_qs, urlparse

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.interval import IntervalTrigger
import requests
from flask import Flask, jsonify


@dataclass
class Cache:
    """Last known result of polling one upstream endpoint."""

    # Parsed payload stored by ``api_request``; empty until the first
    # successful poll. A default factory gives each instance its own dict.
    data: dict = field(default_factory=dict)
    # Unix timestamp of the last time ``data`` changed (creation time at first).
    last_updated: float = field(default_factory=time)
    # ``None`` before the first poll, then ``'success'`` or ``'error: ...'``.
    status: Optional[str] = None


# Shared state written by the scheduled polling jobs below.
data = {
    "caches": {
        "now": Cache(),
        "listens": Cache(),
    },
    "last_updated": time(),
    "etag": "",
}

# Hosts whose URLs carry the video id in the ``v`` query parameter.
_YOUTUBE_WATCH_HOSTS = (
    "youtube.com",
    "www.youtube.com",
    "m.youtube.com",
    "music.youtube.com",
)

# Release MBIDs whose cover should be taken from a different release.
_COVER_REPLACEMENTS = {
    "1e699948-c7c8-4bb2-9f8b-62e14b882a5d": "ca464c1d-5848-45bb-b92d-b1e4b00f9d65",
    "0d516a93-061e-4a27-9cf7-f36e3a96f888": "5cc0c0c7-22f9-4a4b-a24c-f1a6732f813b",
    "92ea5cc8-80e0-4da0-a10b-1bc2f8e8781e": "e8f3e14a-4794-4bab-b403-d562cdad4c2f",
}


def yt_cover(youtube_url: str) -> Optional[str]:
    """Return a thumbnail URL for a YouTube link, or ``None``.

    Args:
        youtube_url: A ``youtube.com`` / ``music.youtube.com`` watch URL
            (video id in the ``v`` query parameter) or a ``youtu.be`` short
            URL (video id in the path).

    Returns:
        The ``img.youtube.com`` thumbnail URL, or ``None`` when the host is
        not recognised or no video id can be extracted.
    """
    parsed_url = urlparse(youtube_url)

    # Start from "no id" so that unrecognised hosts fall through to the
    # ``None`` return instead of hitting an unbound local variable.
    video_id = None
    if parsed_url.netloc in _YOUTUBE_WATCH_HOSTS:
        video_id = parse_qs(parsed_url.query).get("v", [None])[0]
    elif parsed_url.netloc == "youtu.be":
        video_id = parsed_url.path[1:]  # drop the leading "/"

    if not video_id:
        return None
    return f"https://img.youtube.com/vi/{video_id}/2.jpg"


def parse_listens(json: dict) -> dict:
    """Reduce a listens payload to the fields this service exposes.

    Args:
        json: Payload dict with a ``"count"`` entry and a ``"listens"`` list
            whose items each contain ``"track_metadata"``.

    Returns:
        ``{"count": ..., "listens": [...]}`` where each listen has
        ``artist_name``, ``track_name`` and ``listened_at`` (0 when absent),
        plus ``id`` and/or ``cover_url`` when they can be determined.

    Raises:
        KeyError: If ``count``, ``listens``, ``track_metadata``,
            ``artist_name`` or ``track_name`` is missing.
    """
    new_json = {"count": json["count"], "listens": []}

    for listen in json["listens"]:
        metadata = listen["track_metadata"]
        new_track = {
            "artist_name": metadata["artist_name"],
            "track_name": metadata["track_name"],
            "listened_at": listen.get("listened_at", 0),
        }

        if mb := metadata.get("mbid_mapping"):
            # Prefer a manual override, then the mapping's cover-art release,
            # then the release itself. Every lookup is optional so that one
            # partially mapped listen cannot fail the whole payload.
            release_mbid = mb.get("release_mbid")
            release_id = (
                _COVER_REPLACEMENTS.get(release_mbid)
                or mb.get("caa_release_mbid")
                or release_mbid
            )
            if release_id:
                new_track["id"] = release_id

            # Use the mapped names when present; otherwise keep the
            # submitted ones set above.
            artists = mb.get("artists")
            if artists and (credit := artists[0].get("artist_credit_name")):
                new_track["artist_name"] = credit
            if recording_name := mb.get("recording_name"):
                new_track["track_name"] = recording_name
        elif info := metadata.get("additional_info"):
            service = (info.get("music_service_name") or "").lower()
            if service in ("youtube", "youtube music"):
                origin_url = info.get("origin_url")
                if origin_url and (cover := yt_cover(origin_url)):
                    new_track["cover_url"] = cover

        # Without an explicit cover, point at the local asset path for the
        # chosen release id.
        if "cover_url" not in new_track and "id" in new_track:
            new_track["cover_url"] = "/asset/mb/" + new_track["id"]

        new_json["listens"].append(new_track)

    return new_json


def api_request(url: str, cache: Cache):
    """Fetch ``url``, parse its payload and refresh ``cache`` if it changed.

    On a changed payload this also updates the module-level
    ``data['last_updated']`` and recomputes ``data['etag']`` over all caches.
    Failures never propagate: they are recorded in ``cache.status`` as
    ``'error: ...'`` so that a scheduled run cannot raise.

    Args:
        url: Endpoint to request (10 second timeout).
        cache: The ``Cache`` instance to update.
    """
    try:
        response = requests.get(url, timeout=10)
        if response.status_code != 200:
            cache.status = f'error: {response.status_code}'
            return

        parsed = parse_listens(response.json().get("payload"))
        cache.status = 'success'
        if cache.data == parsed:
            return

        # One timestamp for both fields so they cannot drift apart.
        now = time()
        cache.data = parsed
        cache.last_updated = now
        data['last_updated'] = now
        data['etag'] = md5(
            ''.join(dumps(c.data) for c in data['caches'].values()).encode()
        ).hexdigest()
    except Exception as e:
        # Deliberate catch-all at the job boundary: report via status.
        cache.status = f'error: {e}'


# Poll both endpoints in the background: listens every minute,
# playing-now every 15 seconds.
scheduler = BackgroundScheduler()
scheduler.add_job(
    func=lambda: api_request(
        "https://api.listenbrainz.org/1/user/risdeveau/listens?count=5",
        data['caches']['listens'],
    ),
    trigger=IntervalTrigger(minutes=1),
    id='risdeveau.listenbrainz.listens',
    replace_existing=True,
)
scheduler.add_job(
    func=lambda: api_request(
        "https://api.listenbrainz.org/1/user/risdeveau/playing-now",
        data['caches']['now'],
    ),
    trigger=IntervalTrigger(seconds=15),
    id='risdeveau.listenbrainz.playing-now',
    replace_existing=True,
)
scheduler.start()

# Stop the scheduler when the interpreter exits.
atexit.register(scheduler.shutdown)