2026-01-22 23:55:58 +03:00
|
|
|
from flask import Flask, jsonify
|
|
|
|
|
from apscheduler.schedulers.background import BackgroundScheduler
|
|
|
|
|
from apscheduler.triggers.interval import IntervalTrigger
|
|
|
|
|
import requests
|
2026-02-06 18:52:55 +03:00
|
|
|
from time import time
|
2026-01-22 23:55:58 +03:00
|
|
|
import atexit
|
|
|
|
|
import re
|
|
|
|
|
from urllib.parse import urlparse, parse_qs
|
|
|
|
|
|
|
|
|
|
# In-memory caches handed to api_request() by the scheduler jobs further down
# in this file; api_request() fills them in place with 'data', 'last_updated'
# and 'status' keys.
listens: dict = {}    # target of the ".../listens?count=5" poll
listening: dict = {}  # target of the ".../playing-now" poll
|
|
|
|
|
|
|
|
|
|
def yt_cover(youtube_url):
    """Return the thumbnail URL for a YouTube / YouTube Music link.

    Args:
        youtube_url: Link to a video, either a ``watch?v=<id>`` style URL on
            a YouTube host or a ``youtu.be/<id>`` short link.

    Returns:
        The ``sddefault.jpg`` thumbnail URL for the video, or ``None`` when
        no video id can be extracted (empty input, unrecognised host,
        missing ``v`` query parameter, or empty short-link path).
    """
    if not youtube_url:
        return None

    parsed_url = urlparse(youtube_url)
    # ``hostname`` is lower-cased and has any port stripped, so the
    # comparison does not depend on how the link was written.
    host = parsed_url.hostname or ""

    # Start from "no id" so an unrecognised host yields None instead of
    # leaving the name unbound.
    video_id = None

    if host in ("youtube.com", "www.youtube.com", "m.youtube.com",
                "music.youtube.com"):
        query_params = parse_qs(parsed_url.query)
        video_id = query_params.get('v', [None])[0]
    elif host in ("youtu.be", "www.youtu.be"):
        # The id is the first path segment of a short link.
        video_id = parsed_url.path.strip("/").split("/")[0]

    if not video_id:
        return None

    return f"http://img.youtube.com/vi/{video_id}/sddefault.jpg"
|
|
|
|
|
|
|
|
|
|
def parse_listens(data: dict) -> dict:
    """Reduce a listens payload to the fields this module exposes.

    Args:
        data: Mapping with a ``"count"`` value and a ``"listens"`` list whose
            items each carry a ``"track_metadata"`` mapping containing at
            least ``"artist_name"`` and ``"track_name"``.

    Returns:
        ``{"count": ..., "listens": [...]}`` where every listen has
        ``artist_name`` and ``track_name`` and, when they can be derived,
        ``id`` (release MBID) and ``cover_url``.

    Raises:
        KeyError: If ``count``, ``listens``, ``track_metadata``,
            ``artist_name`` or ``track_name`` is missing.
        TypeError: If ``data`` is not subscriptable (e.g. ``None``).
    """
    new_data = {
        "count": data["count"],
        "listens": [],
    }

    for listen in data["listens"]:
        track = listen["track_metadata"]

        new_track = {
            "artist_name": track["artist_name"],
            "track_name": track["track_name"],
        }

        if mb := track.get("mbid_mapping"):
            # Prefer the cover-art release id and fall back lazily, so a
            # mapping that lacks "release_mbid" does not raise.
            release_id = mb.get("caa_release_mbid") or mb.get("release_mbid")
            if release_id:
                new_track["id"] = release_id

            # Mapped names override the submitted ones only when present.
            artists = mb.get("artists") or []
            if artists and artists[0].get("artist_credit_name"):
                new_track["artist_name"] = artists[0]["artist_credit_name"]
            if mb.get("recording_name"):
                new_track["track_name"] = mb["recording_name"]
        elif info := track.get("additional_info"):
            # The key may be present with a null value, hence ``or ""``.
            service = (info.get("music_service_name") or "").lower()
            origin_url = info.get("origin_url")
            if service in ("youtube", "youtube music") and origin_url:
                if cover := yt_cover(origin_url):
                    new_track["cover_url"] = cover

        # No explicit cover: point at the local cover route for the release.
        if "cover_url" not in new_track and "id" in new_track:
            new_track["cover_url"] = "/asset/mb/" + new_track["id"]

        new_data["listens"].append(new_track)

    return new_data
|
|
|
|
|
|
|
|
|
|
def api_request(url: str, cache):
    """Fetch *url* and record the outcome in *cache*.

    On HTTP 200 the response JSON's ``"payload"`` is run through
    ``parse_listens`` and stored under ``'data'`` together with
    ``'last_updated'`` (current ``time()``) and ``'status': 'success'``.
    On any other status code, or on any exception, only ``'status'`` is
    overwritten with an ``'error: ...'`` message; other keys are left as
    they were.

    Args:
        url: Endpoint to request (10 second timeout).
        cache: Mutable mapping updated in place.
    """
    try:
        resp = requests.get(url, timeout=10)

        # Anything but 200 is reported and the cached data kept.
        if resp.status_code != 200:
            cache['status'] = f'error: {resp.status_code}'
            return

        payload = resp.json().get("payload")
        parsed = parse_listens(payload)
        fetched_at = time()
        # Single update so the three keys change together.
        cache.update({
            'data': parsed,
            'last_updated': fetched_at,
            'status': 'success',
        })
    except Exception as e:
        # Runs as a scheduler job: record the failure rather than raise.
        cache['status'] = f'error: {str(e)}'
|
|
|
|
|
|
|
|
|
|
# Background polling: each job calls api_request() with one endpoint and the
# module-level cache it should refresh.
scheduler = BackgroundScheduler()

# Last five listens, refreshed every minute into ``listens``.
scheduler.add_job(
    lambda: api_request(
        "https://api.listenbrainz.org/1/user/risdeveau/listens?count=5",
        listens,
    ),
    trigger=IntervalTrigger(minutes=1),
    id='risdeveau.listenbrainz.listens',
    replace_existing=True,
)

# Currently playing track, refreshed every 15 seconds into ``listening``.
scheduler.add_job(
    lambda: api_request(
        "https://api.listenbrainz.org/1/user/risdeveau/playing-now",
        listening,
    ),
    trigger=IntervalTrigger(seconds=15),
    id='risdeveau.listenbrainz.playing-now',
    replace_existing=True,
)

scheduler.start()

# Stop the scheduler when the interpreter exits.
atexit.register(scheduler.shutdown)
|