import atexit
import re
from datetime import datetime
from urllib.parse import parse_qs, urlparse

import requests
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.interval import IntervalTrigger
from flask import Flask, jsonify

# In-memory caches, mutated in place by the scheduled jobs below.
# Each ends up with the keys 'data', 'last_updated' and 'status'.
listens = {}
listening = {}

# Hosts whose video id lives in the ``v`` query parameter.
_YOUTUBE_WATCH_HOSTS = frozenset({
    "youtube.com",
    "www.youtube.com",
    "m.youtube.com",
    "music.youtube.com",
})
# Hosts whose video id is the URL path itself.
_YOUTUBE_SHORT_HOSTS = frozenset({"youtu.be"})


def yt_cover(youtube_url):
    """Return the thumbnail URL for a YouTube video link, or None.

    The video id is read from the ``v`` query parameter for youtube.com
    style hosts and from the path for youtu.be links. Any other host, or
    a link without a video id, yields None instead of raising.
    """
    parsed_url = urlparse(youtube_url)
    host = parsed_url.netloc.lower()

    # Bind a default first: an unrecognised host must fall through to the
    # "no cover" result rather than hit an unbound local.
    video_id = None
    if host in _YOUTUBE_WATCH_HOSTS:
        video_id = parse_qs(parsed_url.query).get("v", [None])[0]
    elif host in _YOUTUBE_SHORT_HOSTS:
        video_id = parsed_url.path[1:]

    if not video_id:
        return None
    return f"http://img.youtube.com/vi/{video_id}/sddefault.jpg"


def parse_listens(data: dict) -> dict:
    """Reduce a listens payload to the fields this module keeps.

    Args:
        data: Mapping with a "count" entry and a "listens" list whose
            items each carry a "track_metadata" dict.

    Returns:
        A dict with the original "count" and a "listens" list. Every
        entry has "artist_name" and "track_name"; "id" and "cover_url"
        are added when they can be derived.

    Raises:
        KeyError: If "count", "listens", "track_metadata", "artist_name"
            or "track_name" is missing.
    """
    new_data = {
        "count": data["count"],
        "listens": [],
    }

    for listen in data["listens"]:
        track = listen["track_metadata"]
        new_track = {
            "artist_name": track["artist_name"],
            "track_name": track["track_name"],
        }

        if mb := track.get("mbid_mapping"):
            # Prefer the cover-art release id and fall back to the plain
            # release id. Both lookups are lazy so a mapping that has
            # only one of them does not raise.
            release_id = mb.get("caa_release_mbid") or mb.get("release_mbid")
            if release_id:
                new_track["id"] = release_id

            # The mapped names override the submitted ones when present;
            # otherwise the submitted values are kept.
            artists = mb.get("artists") or []
            if artists and (credit := artists[0].get("artist_credit_name")):
                new_track["artist_name"] = credit
            if recording := mb.get("recording_name"):
                new_track["track_name"] = recording
        elif info := track.get("additional_info"):
            # Only consulted when there is no mapping at all.
            service = (info.get("music_service_name") or "").lower()
            origin_url = info.get("origin_url")
            if service in ("youtube", "youtube music") and origin_url:
                if cover := yt_cover(origin_url):
                    new_track["cover_url"] = cover

        # Without an explicit cover, point at the asset path for the id.
        if "cover_url" not in new_track and "id" in new_track:
            new_track["cover_url"] = "/asset/mb/" + new_track["id"]

        new_data["listens"].append(new_track)

    return new_data


def api_request(url: str, cache):
    """Fetch ``url`` and refresh ``cache`` in place with the outcome.

    On HTTP 200 the cache receives 'data' (the parsed "payload"),
    'last_updated' (ISO timestamp of the local time) and
    'status' = 'success'. On any other status code or on an exception
    only 'status' is overwritten, so previously cached data stays
    available.

    Args:
        url: Endpoint to request with a 10 second timeout.
        cache: Mutable mapping updated in place.
    """
    try:
        response = requests.get(url, timeout=10)
        if response.status_code == 200:
            cache.update({
                'data': parse_listens(response.json().get("payload")),
                'last_updated': datetime.now().isoformat(),
                'status': 'success'
            })
        else:
            cache['status'] = f'error: {response.status_code}'
    except Exception as e:
        # Job boundary: record the failure in the cache instead of
        # letting it propagate out of the scheduled call.
        cache['status'] = f'error: {str(e)}'


scheduler = BackgroundScheduler()

# Recent listens: refreshed once a minute.
scheduler.add_job(
    func=lambda: api_request(
        "https://api.listenbrainz.org/1/user/risdeveau/listens?count=5",
        listens,
    ),
    trigger=IntervalTrigger(minutes=1),
    id='risdeveau.listenbrainz.listens',
    replace_existing=True
)

# Currently playing track: refreshed every 15 seconds.
scheduler.add_job(
    func=lambda: api_request(
        "https://api.listenbrainz.org/1/user/risdeveau/playing-now",
        listening,
    ),
    trigger=IntervalTrigger(seconds=15),
    id='risdeveau.listenbrainz.playing-now',
    replace_existing=True
)

scheduler.start()

# Shut the scheduler down when the interpreter exits.
atexit.register(lambda: scheduler.shutdown())