diff --git a/scripts/audit_false_merges.py b/scripts/audit_false_merges.py new file mode 100644 index 0000000..a24607a --- /dev/null +++ b/scripts/audit_false_merges.py @@ -0,0 +1,126 @@ +"""Audyt potencjalnych false-merge'y scen (data quality). + +Kontekst: przed 2026-05-12 scoring auto-mergeował sceny w aggregator mode bez +wymaganego strong-signal (fp/duration/date). Hardening z 2026-05-12 (scoring.py: +`aggregator_weak_signal_cap` = 0.85 + duration bump zacieśniony do ≤3s, bug +ef090842) zatrzymuje NOWE false-merge'y, ale starsze zostały w danych. + +Candidate-log (`merge_candidates`, status=auto_merged) NIE zapisuje który +external_ref dopięto, więc precyzyjne cofnięcie z samego logu jest niemożliwe. +Zamiast tego wykrywamy false-merge po SKUTKU: scena, do której dopięto źródło +innego wideo, ma `playback_sources` o **niespójnych długościach**. To sygnał +wykrywalny z bieżących danych i daje konkretny target (outlier source). + +Read-only — NIC nie mutuje. Remediacja (detach/mark-dead outliera) to osobna +decyzja: krótka długość bywa legit (preview/trailer) albo błąd metadanych, więc +n=2 jest niejednoznaczne (nie wiadomo które źródło jest „prawdziwe”). Do oceny +ludzkiej / osobnego skryptu z jawnym progiem. + +Uruchomienie: + python -m scripts.audit_false_merges # podsumowanie + buckety + python -m scripts.audit_false_merges --list 50 # 50 najgorszych z rozpiską + python -m scripts.audit_false_merges --min-ratio 3 # tylko ratio >= 3x + python -m scripts.audit_false_merges --json out.json # eksport suspektów +""" +from __future__ import annotations + +import argparse +import json + +from sqlalchemy import text + +from app.db import engine + +# Suspekt: scena z ≥2 żywymi źródłami mającymi duration, gdzie najdłuższe vs +# najkrótsze różni się o > MIN_ABS_GAP sekund ORAZ ratio >= MIN_RATIO. 
# A scene is a suspect when it has at least two live sources with a usable
# duration, and the longest and shortest of those durations differ by more
# than MIN_ABS_GAP seconds AND by a factor of at least the requested ratio.
MIN_ABS_GAP = 90  # seconds; filters out small encoding/trim differences
DEFAULT_MIN_RATIO = 1.25
# Sources must be strictly longer than this many seconds to be considered;
# anything at or below it is treated as a junk entry.
MIN_DUR = 30

# Bind parameters: :min_dur, :gap, :ratio.
# Result columns: scene_id, n, mn, mx, ratio (max/min as float), worst first.
_SUSPECTS_SQL = """
WITH d AS (
    SELECT scene_id, duration_sec
    FROM playback_sources
    WHERE dead_at IS NULL AND duration_sec IS NOT NULL AND duration_sec > :min_dur
),
agg AS (
    SELECT scene_id, count(*) AS n, min(duration_sec) AS mn, max(duration_sec) AS mx
    FROM d GROUP BY scene_id HAVING count(*) >= 2
)
SELECT a.scene_id, a.n, a.mn, a.mx, (a.mx::float / NULLIF(a.mn, 0)) AS ratio
FROM agg a
WHERE (a.mx - a.mn) > :gap AND (a.mx::float / NULLIF(a.mn, 0)) >= :ratio
ORDER BY ratio DESC
"""

# (inclusive lower bound, exclusive upper bound, label) for the summary
# histogram. The first lower bound mirrors DEFAULT_MIN_RATIO; the last bucket
# is open-ended so the bucket counts always add up to the reported total.
_SEVERITY_BUCKETS = (
    (1.25, 1.5, "1.25-1.5x"),
    (1.5, 2.0, "1.5-2x"),
    (2.0, 3.0, "2-3x"),
    (3.0, float("inf"), ">3x"),
)


def _bucket_counts(ratios):
    """Return how many of *ratios* fall into each entry of _SEVERITY_BUCKETS.

    The result is a list of counts in bucket order. A ratio belongs to a
    bucket when ``lo <= ratio < hi``.
    """
    ratios = list(ratios)  # iterated once per bucket, so materialize first
    return [
        sum(1 for ratio in ratios if lo <= ratio < hi)
        for lo, hi, _ in _SEVERITY_BUCKETS
    ]


def _summary(conn) -> None:
    """Print duration coverage and the suspect histogram at default thresholds.

    Always uses MIN_DUR, MIN_ABS_GAP and DEFAULT_MIN_RATIO, independently of
    any ``--min-ratio`` passed on the command line. Issues only SELECTs.
    """
    cov = conn.execute(text("""
        SELECT count(*) FILTER (WHERE duration_sec IS NOT NULL AND duration_sec > 0), count(*)
        FROM playback_sources WHERE dead_at IS NULL
    """)).fetchone()
    # max(..., 1) guards the percentage against an empty table.
    print(f"playback_sources with duration: {cov[0]}/{cov[1]} "
          f"({100*cov[0]/max(cov[1],1):.0f}%)")

    # Run the suspect aggregate once and bucket client-side, instead of
    # re-executing the same CTE for the total and for every bucket.
    ratios = [row[4] for row in _rows(conn, DEFAULT_MIN_RATIO)]
    print(f"\nduration-inconsistent scenes (gap>{MIN_ABS_GAP}s, ratio>={DEFAULT_MIN_RATIO}x): {len(ratios)}")
    print("severity buckets (by max/min ratio):")
    for (_, _, lbl), n in zip(_SEVERITY_BUCKETS, _bucket_counts(ratios)):
        print(f" {lbl:>10}: {n}")
    print("\nNOTE: read-only audit. >3x ratio = strong false-merge signal "
          "(short clip attached to full scene). n>=3 disambiguates the outlier; "
          "n=2 needs human review. Remediation is a separate, explicit decision.")


def _rows(conn, min_ratio: float):
    """Return all suspect rows with max/min ratio >= *min_ratio*, worst first.

    Each row is (scene_id, n, mn, mx, ratio) as selected by _SUSPECTS_SQL.
    """
    return conn.execute(text(_SUSPECTS_SQL),
                        {"min_dur": MIN_DUR, "gap": MIN_ABS_GAP, "ratio": min_ratio}).fetchall()


def _source_breakdown(conn, scene_id):
    """Return every playback source of *scene_id*, shortest duration first.

    Rows are (origin, duration_sec, dead, page_url). Unlike the suspect query
    this includes dead sources and sources without a duration (sorted last).
    """
    return conn.execute(text("""
        SELECT origin, duration_sec, dead_at IS NOT NULL AS dead, page_url
        FROM playback_sources WHERE scene_id = :sid
        ORDER BY duration_sec NULLS LAST
    """), {"sid": scene_id}).fetchall()


def _print_top(conn, rows, min_ratio: float) -> None:
    """Print each suspect in *rows* with its title and per-source breakdown."""
    print(f"\n=== top {len(rows)} suspects (ratio>={min_ratio}x) ===")
    for r in rows:
        title = conn.execute(text("SELECT title FROM scenes WHERE id=:i"),
                             {"i": r[0]}).scalar()
        print(f"\n{r[0]} n={r[1]} {r[2]}s..{r[3]}s ratio={r[4]:.1f}x {(title or '')[:60]}")
        for b in _source_breakdown(conn, r[0]):
            flag = " DEAD" if b[2] else ""
            print(f" {b[0]:<28} {str(b[1]) + 's':>8}{flag} {(b[3] or '')[:50]}")


def _export_json(rows, path) -> None:
    """Write *rows* to *path* as a JSON list of suspect records."""
    out = [{"scene_id": str(r[0]), "n_sources": r[1], "min_dur": r[2],
            "max_dur": r[3], "ratio": round(r[4], 3)} for r in rows]
    with open(path, "w", encoding="utf-8") as f:
        json.dump(out, f, indent=2)
    print(f"\nwrote {len(out)} suspects -> {path}")


def main() -> None:
    """Command-line entry point: print the summary, then optional list/export.

    Options:
        --list N       print the N worst suspects with a source breakdown
        --min-ratio R  minimum max/min ratio for --list and --json
        --json PATH    export all suspects matching --min-ratio to PATH

    Exits with status 2 (via argparse) when --list is negative.
    """
    ap = argparse.ArgumentParser(
        description=__doc__,
        # Keep the module docstring's line breaks so the usage examples in it
        # stay readable in --help instead of being reflowed into a paragraph.
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    ap.add_argument("--list", type=int, default=0, metavar="N",
                    help="wypisz N najgorszych scen z rozpiską źródeł")
    ap.add_argument("--min-ratio", type=float, default=DEFAULT_MIN_RATIO,
                    help=f"minimalny max/min ratio (default {DEFAULT_MIN_RATIO})")
    ap.add_argument("--json", metavar="PATH", help="eksportuj wszystkich suspektów do JSON")
    args = ap.parse_args()
    # A negative N is truthy and would make rows[:N] silently drop the last
    # |N| rows rather than limit the listing, so reject it up front.
    if args.list < 0:
        ap.error("--list must be >= 0")

    with engine.connect() as conn:
        _summary(conn)
        # Fetch the suspect rows once and share them between --list and --json.
        rows = _rows(conn, args.min_ratio) if (args.list or args.json) else []
        if args.list:
            _print_top(conn, rows[: args.list], args.min_ratio)

    # The export needs no database access, so it runs after the connection
    # has been released.
    if args.json:
        _export_json(rows, args.json)


if __name__ == "__main__":
    main()