goon/app/scheduler/source_stats.py
jtrzupek c154deab37 feat(sources): 0-5★ ranking on Sites (freshness/metadata/plays) + playback telemetry
Rates each source on three axes the user asked for:
- freshness: how recently/often new content arrives (newest age + 7d volume)
- richness: metadata coverage (thumbnail/tags/performers/description/studio/duration)
- plays: does it actually play — from real playback telemetry when available,
  else a proxy from the resolve mechanism. 0★ = offline (gates the overall stars,
  so a fresh+rich source that doesn't play still ranks bottom — the hqfap/4k69 case)

Backend:
- playback_events: fire-and-forget telemetry POST from the app per playback attempt
  (origin + success/error + time-to-first-frame), append-only, 30d retention
- source_stats: per-origin computed scores, refreshed by a scheduler job (6h);
  /sources joins it and sorts by stars
- models + local migration 0025; new GOON_SCHED_SOURCE_STATS_HOURS setting

Mobile:
- Sites rows show ★ rating; tap the stars for a breakdown (axes + metadata %, plus
  whether "plays" is measured or estimated)
- PlayerScreen reports playback success/failure per source (native path only —
  symmetric, conservative); origin threaded through Scene/Movie play callsites

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-22 10:00:59 +02:00

274 lines
11 KiB
Python

"""run_source_stats — policz ocenę 0-5★ per origin (tube źródło) do source_stats.
Trzy osie (user request):
1. freshness — częstotliwość odświeżania (wiek najnowszej sceny + wolumen 7d),
2. richness — bogactwo metadanych (% scen z thumb/tag/perf/desc/studio/dur),
3. health — czy realnie gra. Z TELEMETRII (playback_events 7d: success rate
+ ttff) gdy jest dość prób; inaczej PROXY z typu ekstraktora
(natywny mp4 > hoster/WebView) — z adnotacją basis.
stars = ważona średnia osi, ale health==0 (offline) GATE'uje stars do 0 — źródło
świeże i bogate, które nie gra, ma być na dnie (to cały sens, casus hqfap/4k69).
Liczone w workerze (scheduler), bo richness to agregat po ~2M live playback_sources.
Retencja telemetrii: kasuje playback_events starsze niż _EVENT_RETENTION_DAYS.
"""
from __future__ import annotations
import json
import logging
from datetime import datetime, timedelta, timezone
from sqlalchemy import text
from app.db import session_scope
# Module-level logger named after this module.
log = logging.getLogger(__name__)
# Look-back window, in days, for the "new in window" count and for playback telemetry.
_TELEMETRY_WINDOW_DAYS = 7
_TELEMETRY_MIN_ATTEMPTS = 10  # below this many attempts telemetry is not representative → use the proxy
# playback_events rows older than this many days are deleted by run_source_stats.
_EVENT_RETENTION_DAYS = 30
# Richness: weights of the individual coverage components (they sum to 1.0).
# A thumbnail is only basic hygiene and carries the least weight (0.12); tags and
# performers weigh the most (0.20 each); description, studio and duration sit in
# between (0.16 each).
_RICHNESS_WEIGHTS = {
    "thumb": 0.12,
    "tag": 0.20,
    "perf": 0.20,
    "desc": 0.16,
    "studio": 0.16,
    "dur": 0.16,
}
def _health_proxy_for(sitetag: str) -> tuple[int, str]:
"""Proxy health gdy brak telemetrii: z mechanizmu resolve. Nie może dać 5★ —
bez realnego sygnału nie wiemy że gra, max 4. Zwraca (score, mechanizm)."""
try:
from app.extractors import _REGISTRY # type: ignore
fn = _REGISTRY.get(sitetag)
except Exception:
fn = None
if fn is None:
return 2, "unknown" # nie ma ekstraktora → niepewna grywalność
mod = getattr(fn, "__module__", "") or ""
if mod.endswith("_vps_blocked_fallback"):
return 3, "webview" # gra w WebView (residential IP), ale wolniej/ciężej
if mod.endswith("_embed_iframe"):
return 3, "hoster" # phone-side resolve hostera (dood/luluvid) — średnio
if mod.endswith("_ytdlp"):
return 4, "ytdlp" # server-side, zwykle szybki/pewny
return 4, "native" # dedykowany natywny ekstraktor (direct mp4/HLS)
def _freshness_score(newest_at: datetime | None, new_7d: int, scenes: int) -> int:
if not scenes or newest_at is None:
return 0
now = datetime.now(timezone.utc)
age_days = (now - newest_at).total_seconds() / 86400.0
if age_days <= 2:
base = 5
elif age_days <= 4:
base = 4
elif age_days <= 10:
base = 3
elif age_days <= 30:
base = 2
else:
base = 1
# Zamrożone źródło (świeżość tylko z dawnego importu) — brak nowych w 7d ścina do 1.
if new_7d == 0 and age_days > 4:
base = 1
return base
def _richness_score(pcts: dict[str, float]) -> int:
    """Turn per-field metadata coverage into a 1-5 star richness score.

    Args:
        pcts: Coverage percentage (0..100) per component, keyed like
            ``_RICHNESS_WEIGHTS``; missing components count as 0.

    Returns:
        The weighted coverage divided by 20 (0..100% → 0..5), rounded with the
        built-in ``round`` and clamped to the range 1..5, so even zero coverage
        yields 1.
    """
    contributions = [
        pcts.get(component, 0.0) * weight
        for component, weight in _RICHNESS_WEIGHTS.items()
    ]
    coverage = sum(contributions)
    rounded = round(coverage / 20.0)
    if rounded < 1:
        return 1
    if rounded > 5:
        return 5
    return rounded
def _health_score_from_telemetry(attempts: int, successes: int, p50_ttff_ms: int | None) -> int:
if attempts <= 0:
return 0
rate = 100.0 * successes / attempts
if rate >= 90:
s = 5
elif rate >= 75:
s = 4
elif rate >= 50:
s = 3
elif rate >= 25:
s = 2
elif rate > 0:
s = 1
else:
return 0 # ~0% sukcesów = offline
# Składowa "szybkość": wolny start (>8s mediana) ścina o 1 (nie poniżej 1).
if p50_ttff_ms is not None and p50_ttff_ms > 8000 and s > 1:
s -= 1
return s
def _overall_stars(freshness: int, richness: int, health: int) -> int:
if health == 0:
return 0 # offline gate — nieważne jak świeże/bogate
raw = 0.40 * freshness + 0.30 * richness + 0.30 * health
return max(1, min(5, round(raw)))
def run_source_stats() -> dict:
    """Recompute ``source_stats`` for every live tube origin.

    Within a single ``session_scope()`` session this:

    1. aggregates freshness and metadata coverage per origin from live
       ``playback_sources`` rows (``dead_at IS NULL``, origin ``tube:%``),
    2. aggregates playback telemetry per origin over the telemetry window,
    3. scores each origin and upserts one ``source_stats`` row for it,
    4. deletes ``source_stats`` rows of origins that no longer have live
       sources and ``playback_events`` older than the retention period.

    Returns:
        ``{"computed": n}`` where ``n`` is the number of origins upserted.
    """
    started_at = datetime.now(timezone.utc)
    window_start = started_at - timedelta(days=_TELEMETRY_WINDOW_DAYS)
    retention_cutoff = started_at - timedelta(days=_EVENT_RETENTION_DAYS)

    # Freshness + richness: aggregate over DISTINCT (origin, scene) of live
    # sources, joined to per-scene flags (has_tag/perf/desc/studio/dur computed once).
    # NOTE(review): DISTINCT ON is used without ORDER BY, so which row of each
    # (origin, scene_id) group supplies has_thumb/created_at is unspecified —
    # confirm that this is acceptable.
    aggregate_sql = """
WITH scene_flags AS (
SELECT s.id,
(s.description IS NOT NULL AND length(btrim(s.description))>0) AS has_desc,
(s.studio_id IS NOT NULL) AS has_studio,
(s.duration_sec IS NOT NULL) AS has_dur,
EXISTS(SELECT 1 FROM scene_tags st WHERE st.scene_id=s.id) AS has_tag,
EXISTS(SELECT 1 FROM scene_performers sp WHERE sp.scene_id=s.id) AS has_perf
FROM scenes s
),
live AS (
SELECT DISTINCT ON (origin, scene_id) origin, scene_id,
(thumbnail_url IS NOT NULL) AS has_thumb, created_at
FROM playback_sources
WHERE dead_at IS NULL AND origin LIKE 'tube:%'
)
SELECT l.origin,
count(*) AS scenes,
count(*) FILTER (WHERE l.created_at > :win) AS new_7d,
max(l.created_at) AS newest_at,
100.0*avg(l.has_thumb::int) AS thumb,
100.0*avg(f.has_tag::int) AS tag,
100.0*avg(f.has_perf::int) AS perf,
100.0*avg(f.has_desc::int) AS descr,
100.0*avg(f.has_studio::int) AS studio,
100.0*avg(f.has_dur::int) AS dur
FROM live l JOIN scene_flags f ON f.id = l.scene_id
GROUP BY l.origin
"""

    # Health telemetry per origin (same window).
    telemetry_sql = """
SELECT origin,
count(*) AS attempts,
count(*) FILTER (WHERE status='success') AS successes,
percentile_disc(0.5) WITHIN GROUP (ORDER BY ttff_ms)
FILTER (WHERE status='success' AND ttff_ms IS NOT NULL) AS p50_ttff
FROM playback_events
WHERE created_at > :win
GROUP BY origin
"""

    upsert_sql = """
INSERT INTO source_stats
(origin, stars, freshness, richness, health, scenes, new_7d,
newest_at, components, computed_at)
VALUES
(:origin, :stars, :freshness, :richness, :health, :scenes, :new_7d,
:newest_at, CAST(:components AS jsonb), now())
ON CONFLICT (origin) DO UPDATE SET
stars=EXCLUDED.stars, freshness=EXCLUDED.freshness,
richness=EXCLUDED.richness, health=EXCLUDED.health,
scenes=EXCLUDED.scenes, new_7d=EXCLUDED.new_7d,
newest_at=EXCLUDED.newest_at, components=EXCLUDED.components,
computed_at=now()
"""

    # Coverage key (as consumed by _richness_score and stored in components)
    # paired with the column name it has in the aggregate query.
    pct_columns = (
        ("thumb", "thumb"),
        ("tag", "tag"),
        ("perf", "perf"),
        ("desc", "descr"),
        ("studio", "studio"),
        ("dur", "dur"),
    )

    with session_scope() as session:
        origin_rows = session.execute(text(aggregate_sql), {"win": window_start}).all()
        telemetry_rows = session.execute(text(telemetry_sql), {"win": window_start}).all()
        telemetry_by_origin = {row.origin: row for row in telemetry_rows}

        upsert_stmt = text(upsert_sql)
        computed = 0
        for row in origin_rows:
            origin = row.origin
            # The site tag is whatever follows the first ":"; the whole origin if there is none.
            _prefix, colon, remainder = origin.partition(":")
            sitetag = remainder if colon else origin

            pcts = {key: float(getattr(row, column) or 0) for key, column in pct_columns}
            scene_count = int(row.scenes)
            recent_count = int(row.new_7d)

            freshness = _freshness_score(row.newest_at, recent_count, scene_count)
            richness = _richness_score(pcts)

            sample = telemetry_by_origin.get(origin)
            pb_attempts = int(sample.attempts) if sample else 0
            pb_success_rate = (
                round(100.0 * int(sample.successes) / pb_attempts) if pb_attempts else None
            )

            if sample and pb_attempts >= _TELEMETRY_MIN_ATTEMPTS:
                # Enough real attempts: health is measured.
                p50_ttff = int(sample.p50_ttff) if sample.p50_ttff is not None else None
                health = _health_score_from_telemetry(
                    pb_attempts, int(sample.successes), p50_ttff
                )
                health_basis = "telemetry"
                mechanism = None
            else:
                # Too little telemetry: estimate health from the resolve mechanism.
                health, mechanism = _health_proxy_for(sitetag)
                health_basis = "proxy"
                p50_ttff = None

            stars = _overall_stars(freshness, richness, health)
            components = {
                "pct": {k: round(v) for k, v in pcts.items()},
                "health_basis": health_basis,
                "mechanism": mechanism,
                "pb_attempts_7d": pb_attempts,
                "pb_success_rate": pb_success_rate,
                "p50_ttff_ms": p50_ttff,
            }
            params = {
                "origin": origin,
                "stars": stars,
                "freshness": freshness,
                "richness": richness,
                "health": health,
                "scenes": scene_count,
                "new_7d": recent_count,
                "newest_at": row.newest_at,
                "components": json.dumps(components),
            }
            session.execute(upsert_stmt, params)
            computed += 1

        # Cleanup: drop source_stats rows of origins that no longer have live
        # sources (e.g. hqfap/4k69 after being hidden), then apply telemetry retention.
        live_origins = {row.origin for row in origin_rows}
        stored_origins = session.execute(text("SELECT origin FROM source_stats")).scalars().all()
        for stored in stored_origins:
            if stored in live_origins:
                continue
            session.execute(
                text("DELETE FROM source_stats WHERE origin=:o"), {"o": stored}
            )
        session.execute(
            text("DELETE FROM playback_events WHERE created_at < :cut"),
            {"cut": retention_cutoff},
        )

    log.info("[source-stats] computed %d origins", computed)
    return {"computed": computed}