Rates each source on three axes the user asked for: - freshness: how recently/often new content arrives (newest age + 7d volume) - richness: metadata coverage (thumbnail/tags/performers/description/studio/duration) - plays: does it actually play — from real playback telemetry when available, else a proxy from the resolve mechanism. 0★ = offline (gates the overall stars, so a fresh+rich source that doesn't play still ranks bottom — the hqfap/4k69 case) Backend: - playback_events: fire-and-forget telemetry POST from the app per playback attempt (origin + success/error + time-to-first-frame), append-only, 30d retention - source_stats: per-origin computed scores, refreshed by a scheduler job (6h); /sources joins it and sorts by stars - models + local migration 0025; new GOON_SCHED_SOURCE_STATS_HOURS setting Mobile: - Sites rows show ★ rating; tap the stars for a breakdown (axes + metadata %, plus whether "plays" is measured or estimated) - PlayerScreen reports playback success/failure per source (native path only — symmetric, conservative); origin threaded through Scene/Movie play callsites Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
274 lines
11 KiB
Python
274 lines
11 KiB
Python
"""run_source_stats — policz ocenę 0-5★ per origin (tube źródło) do source_stats.
|
|
|
|
Trzy osie (user request):
|
|
1. freshness — częstotliwość odświeżania (wiek najnowszej sceny + wolumen 7d),
|
|
2. richness — bogactwo metadanych (% scen z thumb/tag/perf/desc/studio/dur),
|
|
3. health — czy realnie gra. Z TELEMETRII (playback_events 7d: success rate
|
|
+ ttff) gdy jest dość prób; inaczej PROXY z typu ekstraktora
|
|
(natywny mp4 > hoster/WebView) — z adnotacją basis.
|
|
|
|
stars = ważona średnia osi, ale health==0 (offline) GATE'uje stars do 0 — źródło
|
|
świeże i bogate, które nie gra, ma być na dnie (to cały sens, casus hqfap/4k69).
|
|
|
|
Liczone w workerze (scheduler), bo richness to agregat po ~2M live playback_sources.
|
|
Retencja telemetrii: kasuje playback_events starsze niż _EVENT_RETENTION_DAYS.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import logging
|
|
from datetime import datetime, timedelta, timezone
|
|
|
|
from sqlalchemy import text
|
|
|
|
from app.db import session_scope
|
|
|
|
log = logging.getLogger(__name__)

# Look-back window, in days, for playback telemetry; run_source_stats also uses it
# as the window for the "new scenes" (new_7d) count.
_TELEMETRY_WINDOW_DAYS = 7

# Minimum playback attempts inside the window before telemetry is trusted; with
# fewer attempts health falls back to the extractor-based proxy.
_TELEMETRY_MIN_ATTEMPTS = 10

# playback_events older than this many days are purged on every run.
_EVENT_RETENTION_DAYS = 30

# Richness: per-component weights (they sum to 1.0). tag/perf weigh the most
# (0.20 each), desc/studio/dur 0.16 each, and thumb — baseline hygiene — the
# least (0.12). Key order matters only for float summation order.
_RICHNESS_WEIGHTS = dict(
    thumb=0.12,
    tag=0.20,
    perf=0.20,
    desc=0.16,
    studio=0.16,
    dur=0.16,
)
|
|
|
|
|
|
def _health_proxy_for(sitetag: str) -> tuple[int, str]:
    """Estimate health from the resolve mechanism when telemetry is missing.

    Without a real playback signal we cannot know that a source plays, so the
    estimate never reaches 5★ (4 at most). The extractor registered for
    ``sitetag`` is classified by the suffix of its defining module.

    Returns:
        (score, mechanism) — mechanism is one of "unknown", "webview",
        "hoster", "ytdlp", "native".
    """
    try:
        from app.extractors import _REGISTRY  # type: ignore

        extractor = _REGISTRY.get(sitetag)
    except Exception:
        # Best effort: any failure to look up the registry is treated as "no extractor".
        extractor = None

    if extractor is None:
        return 2, "unknown"  # no extractor -> playability uncertain

    module_name = getattr(extractor, "__module__", "") or ""
    suffix_verdicts = (
        # Plays in a WebView (residential IP), but slower/heavier.
        ("_vps_blocked_fallback", (3, "webview")),
        # Phone-side resolve of a hoster (dood/luluvid) — middling.
        ("_embed_iframe", (3, "hoster")),
        # Server-side, usually fast/reliable.
        ("_ytdlp", (4, "ytdlp")),
    )
    for suffix, verdict in suffix_verdicts:
        if module_name.endswith(suffix):
            return verdict
    # Dedicated native extractor (direct mp4/HLS).
    return 4, "native"
|
|
|
|
|
|
def _freshness_score(newest_at: datetime | None, new_7d: int, scenes: int) -> int:
|
|
if not scenes or newest_at is None:
|
|
return 0
|
|
now = datetime.now(timezone.utc)
|
|
age_days = (now - newest_at).total_seconds() / 86400.0
|
|
if age_days <= 2:
|
|
base = 5
|
|
elif age_days <= 4:
|
|
base = 4
|
|
elif age_days <= 10:
|
|
base = 3
|
|
elif age_days <= 30:
|
|
base = 2
|
|
else:
|
|
base = 1
|
|
# Zamrożone źródło (świeżość tylko z dawnego importu) — brak nowych w 7d ścina do 1.
|
|
if new_7d == 0 and age_days > 4:
|
|
base = 1
|
|
return base
|
|
|
|
|
|
def _richness_score(pcts: dict[str, float]) -> int:
    """Turn per-field metadata coverage into a 1-5 richness rating.

    ``pcts`` maps component keys ("thumb", "tag", "perf", "desc", "studio",
    "dur") to coverage percentages (0-100); a missing key counts as 0. The
    weighted mean (weights from _RICHNESS_WEIGHTS) maps to stars at 20
    percentage points per star and is clamped to 1-5, so an origin never
    scores below 1 on this axis.
    """
    coverage = 0.0
    for field, weight in _RICHNESS_WEIGHTS.items():
        coverage += pcts.get(field, 0.0) * weight
    rating = round(coverage / 20.0)  # 0..100 % -> 0..5
    return min(max(rating, 1), 5)
|
|
|
|
|
|
def _health_score_from_telemetry(attempts: int, successes: int, p50_ttff_ms: int | None) -> int:
|
|
if attempts <= 0:
|
|
return 0
|
|
rate = 100.0 * successes / attempts
|
|
if rate >= 90:
|
|
s = 5
|
|
elif rate >= 75:
|
|
s = 4
|
|
elif rate >= 50:
|
|
s = 3
|
|
elif rate >= 25:
|
|
s = 2
|
|
elif rate > 0:
|
|
s = 1
|
|
else:
|
|
return 0 # ~0% sukcesów = offline
|
|
# Składowa "szybkość": wolny start (>8s mediana) ścina o 1 (nie poniżej 1).
|
|
if p50_ttff_ms is not None and p50_ttff_ms > 8000 and s > 1:
|
|
s -= 1
|
|
return s
|
|
|
|
|
|
def _overall_stars(freshness: int, richness: int, health: int) -> int:
    """Blend the three axes into the overall 0-5 rating.

    An offline source (health == 0) is pinned to 0 however fresh or rich it
    is. Otherwise the result is the 40/30/30 weighted mean of freshness,
    richness and health, rounded and clamped to 1-5.
    """
    is_offline = health == 0
    if is_offline:
        return 0
    blended = sum((0.40 * freshness, 0.30 * richness, 0.30 * health))
    return min(5, max(1, round(blended)))
|
|
|
|
|
|
# SQL used by run_source_stats, kept as plain strings and wrapped in text() at call time.

# Freshness + richness aggregate per live tube origin.
# The `live` CTE collapses the (possibly several) live sources one origin has for a
# scene into a single row:
#   - has_thumb  = any of them has a thumbnail,
#   - created_at = the earliest one (when the origin first offered the scene).
# This replaces a DISTINCT ON (origin, scene_id) without ORDER BY. PostgreSQL picks an
# unpredictable row per group in that case, which made thumb %, new_7d and newest_at
# nondeterministic between runs.
_SOURCE_STATS_ORIGIN_SQL = """
WITH scene_flags AS (
    SELECT s.id,
           (s.description IS NOT NULL AND length(btrim(s.description)) > 0) AS has_desc,
           (s.studio_id IS NOT NULL) AS has_studio,
           (s.duration_sec IS NOT NULL) AS has_dur,
           EXISTS(SELECT 1 FROM scene_tags st WHERE st.scene_id = s.id) AS has_tag,
           EXISTS(SELECT 1 FROM scene_performers sp WHERE sp.scene_id = s.id) AS has_perf
    FROM scenes s
),
live AS (
    SELECT origin, scene_id,
           bool_or(thumbnail_url IS NOT NULL) AS has_thumb,
           min(created_at) AS created_at
    FROM playback_sources
    WHERE dead_at IS NULL AND origin LIKE 'tube:%'
    GROUP BY origin, scene_id
)
SELECT l.origin,
       count(*) AS scenes,
       count(*) FILTER (WHERE l.created_at > :win) AS new_7d,
       max(l.created_at) AS newest_at,
       100.0 * avg(l.has_thumb::int) AS thumb,
       100.0 * avg(f.has_tag::int) AS tag,
       100.0 * avg(f.has_perf::int) AS perf,
       100.0 * avg(f.has_desc::int) AS descr,
       100.0 * avg(f.has_studio::int) AS studio,
       100.0 * avg(f.has_dur::int) AS dur
FROM live l JOIN scene_flags f ON f.id = l.scene_id
GROUP BY l.origin
"""

# Playback telemetry per origin since :win: attempts, successes, and the median
# time-to-first-frame of successful attempts.
_SOURCE_STATS_TELEMETRY_SQL = """
SELECT origin,
       count(*) AS attempts,
       count(*) FILTER (WHERE status='success') AS successes,
       percentile_disc(0.5) WITHIN GROUP (ORDER BY ttff_ms)
           FILTER (WHERE status='success' AND ttff_ms IS NOT NULL) AS p50_ttff
FROM playback_events
WHERE created_at > :win
GROUP BY origin
"""

# Insert-or-replace one origin's scores.
_SOURCE_STATS_UPSERT_SQL = """
INSERT INTO source_stats
    (origin, stars, freshness, richness, health, scenes, new_7d,
     newest_at, components, computed_at)
VALUES
    (:origin, :stars, :freshness, :richness, :health, :scenes, :new_7d,
     :newest_at, CAST(:components AS jsonb), now())
ON CONFLICT (origin) DO UPDATE SET
    stars=EXCLUDED.stars, freshness=EXCLUDED.freshness,
    richness=EXCLUDED.richness, health=EXCLUDED.health,
    scenes=EXCLUDED.scenes, new_7d=EXCLUDED.new_7d,
    newest_at=EXCLUDED.newest_at, components=EXCLUDED.components,
    computed_at=now()
"""


def _fetch_origin_aggregates(session, win_start: datetime) -> list:
    """Return one freshness/richness aggregate row per live tube origin."""
    return session.execute(text(_SOURCE_STATS_ORIGIN_SQL), {"win": win_start}).all()


def _fetch_telemetry(session, win_start: datetime) -> dict:
    """Return playback telemetry aggregate rows keyed by origin (events after win_start)."""
    result = session.execute(text(_SOURCE_STATS_TELEMETRY_SQL), {"win": win_start})
    return {row.origin: row for row in result.all()}


def _assess_health(sitetag: str, tele_row) -> tuple[int, dict]:
    """Score health from telemetry when it has enough attempts, else from the proxy.

    ``tele_row`` is the telemetry row for the origin, or None when there is none.

    Returns:
        (health, fields). ``fields`` holds the health-related entries of the
        components JSON: basis, mechanism, attempts, success rate, p50 TTFF.
    """
    attempts = int(tele_row.attempts) if tele_row is not None else 0
    successes = int(tele_row.successes) if tele_row is not None else 0
    success_rate = round(100.0 * successes / attempts) if attempts else None

    if tele_row is not None and attempts >= _TELEMETRY_MIN_ATTEMPTS:
        p50_ttff = int(tele_row.p50_ttff) if tele_row.p50_ttff is not None else None
        health = _health_score_from_telemetry(attempts, successes, p50_ttff)
        basis, mechanism = "telemetry", None
    else:
        # Too few attempts to trust the measured rate: estimate from the resolve
        # mechanism. The observed attempts/rate are still reported for the UI.
        health, mechanism = _health_proxy_for(sitetag)
        basis, p50_ttff = "proxy", None

    return health, {
        "health_basis": basis,
        "mechanism": mechanism,
        "pb_attempts_7d": attempts,
        "pb_success_rate": success_rate,
        "p50_ttff_ms": p50_ttff,
    }


def run_source_stats() -> dict:
    """Recompute source_stats for every live tube origin and return a summary.

    Inside a single session_scope():
      1. aggregate freshness + richness per origin from live playback_sources,
      2. aggregate playback telemetry over the last _TELEMETRY_WINDOW_DAYS days,
      3. score each origin and upsert it into source_stats,
      4. delete source_stats rows for origins that no longer have live sources
         (e.g. hqfap/4k69 after being hidden), and purge playback_events older
         than _EVENT_RETENTION_DAYS.

    Returns:
        {"computed": <number of origins scored and upserted>}
    """
    now = datetime.now(timezone.utc)
    win_start = now - timedelta(days=_TELEMETRY_WINDOW_DAYS)

    with session_scope() as session:
        rows = _fetch_origin_aggregates(session, win_start)
        tele = _fetch_telemetry(session, win_start)

        upserts = []
        for r in rows:
            origin = r.origin
            _, sep, tail = origin.partition(":")
            sitetag = tail if sep else origin  # "tube:<sitetag>" -> "<sitetag>"

            pcts = {
                "thumb": float(r.thumb or 0),
                "tag": float(r.tag or 0),
                "perf": float(r.perf or 0),
                "desc": float(r.descr or 0),
                "studio": float(r.studio or 0),
                "dur": float(r.dur or 0),
            }
            scenes = int(r.scenes)
            new_7d = int(r.new_7d)

            freshness = _freshness_score(r.newest_at, new_7d, scenes)
            richness = _richness_score(pcts)
            health, health_fields = _assess_health(sitetag, tele.get(origin))

            components = {"pct": {k: round(v) for k, v in pcts.items()}, **health_fields}
            upserts.append(
                {
                    "origin": origin,
                    "stars": _overall_stars(freshness, richness, health),
                    "freshness": freshness,
                    "richness": richness,
                    "health": health,
                    "scenes": scenes,
                    "new_7d": new_7d,
                    "newest_at": r.newest_at,
                    "components": json.dumps(components),
                }
            )

        if upserts:
            # A list of parameter dicts executes as one executemany instead of
            # one statement per origin.
            session.execute(text(_SOURCE_STATS_UPSERT_SQL), upserts)

        # Cleanup: drop stats for origins without live sources any more.
        live_origins = {r.origin for r in rows}
        existing = session.execute(text("SELECT origin FROM source_stats")).scalars().all()
        stale = [{"o": o} for o in existing if o not in live_origins]
        if stale:
            session.execute(text("DELETE FROM source_stats WHERE origin=:o"), stale)

        # Telemetry retention.
        session.execute(
            text("DELETE FROM playback_events WHERE created_at < :cut"),
            {"cut": now - timedelta(days=_EVENT_RETENTION_DAYS)},
        )

    computed = len(upserts)
    log.info("[source-stats] computed %d origins", computed)
    return {"computed": computed}
|