"""run_source_stats — policz ocenę 0-5★ per origin (tube źródło) do source_stats. Trzy osie (user request): 1. freshness — częstotliwość odświeżania (wiek najnowszej sceny + wolumen 7d), 2. richness — bogactwo metadanych (% scen z thumb/tag/perf/desc/studio/dur), 3. health — czy realnie gra. Z TELEMETRII (playback_events 7d: success rate + ttff) gdy jest dość prób; inaczej PROXY z typu ekstraktora (natywny mp4 > hoster/WebView) — z adnotacją basis. stars = ważona średnia osi, ale health==0 (offline) GATE'uje stars do 0 — źródło świeże i bogate, które nie gra, ma być na dnie (to cały sens, casus hqfap/4k69). Liczone w workerze (scheduler), bo richness to agregat po ~2M live playback_sources. Retencja telemetrii: kasuje playback_events starsze niż _EVENT_RETENTION_DAYS. """ from __future__ import annotations import json import logging from datetime import datetime, timedelta, timezone from sqlalchemy import text from app.db import session_scope log = logging.getLogger(__name__) _TELEMETRY_WINDOW_DAYS = 7 _TELEMETRY_MIN_ATTEMPTS = 10 # poniżej tego telemetria niemiarodajna → proxy _EVENT_RETENTION_DAYS = 30 # Richness: wagi składowych (suma=1.0). thumb to minimum higieny, canonical-bogactwo # (desc/studio/tag/perf) waży więcej. dur średnio. _RICHNESS_WEIGHTS = { "thumb": 0.12, "tag": 0.20, "perf": 0.20, "desc": 0.16, "studio": 0.16, "dur": 0.16, } def _health_proxy_for(sitetag: str) -> tuple[int, str]: """Proxy health gdy brak telemetrii: z mechanizmu resolve. Nie może dać 5★ — bez realnego sygnału nie wiemy że gra, max 4. Zwraca (score, mechanizm).""" try: from app.extractors import _REGISTRY # type: ignore fn = _REGISTRY.get(sitetag) except Exception: fn = None if fn is None: return 2, "unknown" # nie ma ekstraktora → niepewna grywalność mod = getattr(fn, "__module__", "") or "" if mod.endswith("_vps_blocked_fallback"): return 3, "webview" # gra w WebView (residential IP), ale wolniej/ciężej if mod.endswith("_embed_iframe"): return 3, "hoster" # phone-side resolve hostera (dood/luluvid) — średnio if mod.endswith("_ytdlp"): return 4, "ytdlp" # server-side, zwykle szybki/pewny return 4, "native" # dedykowany natywny ekstraktor (direct mp4/HLS) def _freshness_score(newest_at: datetime | None, new_7d: int, scenes: int) -> int: if not scenes or newest_at is None: return 0 now = datetime.now(timezone.utc) age_days = (now - newest_at).total_seconds() / 86400.0 if age_days <= 2: base = 5 elif age_days <= 4: base = 4 elif age_days <= 10: base = 3 elif age_days <= 30: base = 2 else: base = 1 # Zamrożone źródło (świeżość tylko z dawnego importu) — brak nowych w 7d ścina do 1. if new_7d == 0 and age_days > 4: base = 1 return base def _richness_score(pcts: dict[str, float]) -> int: weighted = sum(pcts.get(k, 0.0) * w for k, w in _RICHNESS_WEIGHTS.items()) stars = round(weighted / 20.0) # 0..100% → 0..5 return max(1, min(5, stars)) def _health_score_from_telemetry(attempts: int, successes: int, p50_ttff_ms: int | None) -> int: if attempts <= 0: return 0 rate = 100.0 * successes / attempts if rate >= 90: s = 5 elif rate >= 75: s = 4 elif rate >= 50: s = 3 elif rate >= 25: s = 2 elif rate > 0: s = 1 else: return 0 # ~0% sukcesów = offline # Składowa "szybkość": wolny start (>8s mediana) ścina o 1 (nie poniżej 1). if p50_ttff_ms is not None and p50_ttff_ms > 8000 and s > 1: s -= 1 return s def _overall_stars(freshness: int, richness: int, health: int) -> int: if health == 0: return 0 # offline gate — nieważne jak świeże/bogate raw = 0.40 * freshness + 0.30 * richness + 0.30 * health return max(1, min(5, round(raw))) def run_source_stats() -> dict: """Przelicz source_stats dla wszystkich live tube origins. Zwraca podsumowanie.""" now = datetime.now(timezone.utc) win_start = now - timedelta(days=_TELEMETRY_WINDOW_DAYS) with session_scope() as session: # 1) Freshness + richness: agregat po DISTINCT (origin, scene) z live sources, # join do flag scen (has_tag/perf/desc/studio/dur policzone raz). rows = session.execute( text( """ WITH scene_flags AS ( SELECT s.id, (s.description IS NOT NULL AND length(btrim(s.description))>0) AS has_desc, (s.studio_id IS NOT NULL) AS has_studio, (s.duration_sec IS NOT NULL) AS has_dur, EXISTS(SELECT 1 FROM scene_tags st WHERE st.scene_id=s.id) AS has_tag, EXISTS(SELECT 1 FROM scene_performers sp WHERE sp.scene_id=s.id) AS has_perf FROM scenes s ), live AS ( SELECT DISTINCT ON (origin, scene_id) origin, scene_id, (thumbnail_url IS NOT NULL) AS has_thumb, created_at FROM playback_sources WHERE dead_at IS NULL AND origin LIKE 'tube:%' ) SELECT l.origin, count(*) AS scenes, count(*) FILTER (WHERE l.created_at > :win) AS new_7d, max(l.created_at) AS newest_at, 100.0*avg(l.has_thumb::int) AS thumb, 100.0*avg(f.has_tag::int) AS tag, 100.0*avg(f.has_perf::int) AS perf, 100.0*avg(f.has_desc::int) AS descr, 100.0*avg(f.has_studio::int) AS studio, 100.0*avg(f.has_dur::int) AS dur FROM live l JOIN scene_flags f ON f.id = l.scene_id GROUP BY l.origin """ ), {"win": win_start}, ).all() # 2) Telemetria health per origin (okno 7d). tele = { r.origin: r for r in session.execute( text( """ SELECT origin, count(*) AS attempts, count(*) FILTER (WHERE status='success') AS successes, percentile_disc(0.5) WITHIN GROUP (ORDER BY ttff_ms) FILTER (WHERE status='success' AND ttff_ms IS NOT NULL) AS p50_ttff FROM playback_events WHERE created_at > :win GROUP BY origin """ ), {"win": win_start}, ).all() } computed = 0 for r in rows: origin = r.origin sitetag = origin.split(":", 1)[1] if ":" in origin else origin pcts = { "thumb": float(r.thumb or 0), "tag": float(r.tag or 0), "perf": float(r.perf or 0), "desc": float(r.descr or 0), "studio": float(r.studio or 0), "dur": float(r.dur or 0), } freshness = _freshness_score(r.newest_at, int(r.new_7d), int(r.scenes)) richness = _richness_score(pcts) t = tele.get(origin) if t and int(t.attempts) >= _TELEMETRY_MIN_ATTEMPTS: health = _health_score_from_telemetry( int(t.attempts), int(t.successes), int(t.p50_ttff) if t.p50_ttff is not None else None, ) health_basis = "telemetry" pb_attempts = int(t.attempts) pb_success_rate = round(100.0 * int(t.successes) / int(t.attempts)) p50_ttff = int(t.p50_ttff) if t.p50_ttff is not None else None mechanism = None else: health, mechanism = _health_proxy_for(sitetag) health_basis = "proxy" pb_attempts = int(t.attempts) if t else 0 pb_success_rate = ( round(100.0 * int(t.successes) / int(t.attempts)) if t and int(t.attempts) else None ) p50_ttff = None stars = _overall_stars(freshness, richness, health) components = { "pct": {k: round(v) for k, v in pcts.items()}, "health_basis": health_basis, "mechanism": mechanism, "pb_attempts_7d": pb_attempts, "pb_success_rate": pb_success_rate, "p50_ttff_ms": p50_ttff, } session.execute( text( """ INSERT INTO source_stats (origin, stars, freshness, richness, health, scenes, new_7d, newest_at, components, computed_at) VALUES (:origin, :stars, :freshness, :richness, :health, :scenes, :new_7d, :newest_at, CAST(:components AS jsonb), now()) ON CONFLICT (origin) DO UPDATE SET stars=EXCLUDED.stars, freshness=EXCLUDED.freshness, richness=EXCLUDED.richness, health=EXCLUDED.health, scenes=EXCLUDED.scenes, new_7d=EXCLUDED.new_7d, newest_at=EXCLUDED.newest_at, components=EXCLUDED.components, computed_at=now() """ ), { "origin": origin, "stars": stars, "freshness": freshness, "richness": richness, "health": health, "scenes": int(r.scenes), "new_7d": int(r.new_7d), "newest_at": r.newest_at, "components": json.dumps(components), }, ) computed += 1 # 3) Sprzątanie: usuń source_stats dla origins bez już-live sources (np. hqfap/4k69 # po ukryciu) + retencja telemetrii. live_origins = {r.origin for r in rows} stale = session.execute(text("SELECT origin FROM source_stats")).scalars().all() for o in stale: if o not in live_origins: session.execute( text("DELETE FROM source_stats WHERE origin=:o"), {"o": o} ) session.execute( text("DELETE FROM playback_events WHERE created_at < :cut"), {"cut": now - timedelta(days=_EVENT_RETENTION_DAYS)}, ) log.info("[source-stats] computed %d origins", computed) return {"computed": computed}