"""Benchmark report dla phash backfill per performer. Pokazuje: 1. Coverage: ile scen ma phash / ile mogłoby (z thumb) 2. Distribution: Hamming distance histogram across all pairs (within performer) 3. Clusters: groups scen z phash Hamming ≤ THRESHOLD (likely-duplicate) 4. Sample par: title/duration/source dla 5 najlepszych match candidates Run: python /srv/scripts/phash_benchmark.py [--threshold N] """ from __future__ import annotations import argparse import sys from collections import defaultdict from sqlalchemy import select sys.path.insert(0, "/srv") from app.db import SessionLocal from app.models.performer import Performer from app.models.scene import Scene, ScenePerformer, SceneFingerprint from app.models.playback_source import PlaybackSource from app.resolve.scoring import hamming_distance_hex def main() -> None: ap = argparse.ArgumentParser() ap.add_argument("performer_id", type=str) ap.add_argument("--threshold", type=int, default=5, help="Hamming threshold for likely-dup") args = ap.parse_args() import uuid as _uuid perf_id = _uuid.UUID(args.performer_id) with SessionLocal() as session: perf = session.get(Performer, perf_id) if not perf: print(f"performer {perf_id} not found") return print(f"=== {perf.canonical_name} ({perf_id}) ===\n") # 1) Scenes + phash scenes = session.execute( select(Scene.id, Scene.title, Scene.duration_sec) .join(ScenePerformer, ScenePerformer.scene_id == Scene.id) .where(ScenePerformer.performer_id == perf_id) ).all() scene_ids = {s.id for s in scenes} title_by_id = {s.id: s.title for s in scenes} dur_by_id = {s.id: s.duration_sec for s in scenes} phashes = session.execute( select(SceneFingerprint.scene_id, SceneFingerprint.value) .where(SceneFingerprint.kind == "phash") .where(SceneFingerprint.scene_id.in_(scene_ids)) ).all() ph_by_scene: dict = defaultdict(list) for sid, val in phashes: # Skip garbage entries (length≠16 = failed extract, "0" placeholder from # legacy TPDB ingest, etc.). if val and len(val) == 16: ph_by_scene[sid].append(val) n_total = len(scene_ids) n_with_phash = len(ph_by_scene) print(f"Coverage: {n_with_phash}/{n_total} scenes have phash ({100*n_with_phash/max(n_total,1):.0f}%)") print() # 2) Pairwise distance distribution scene_list = list(ph_by_scene.items()) histogram: dict = defaultdict(int) pairs: list = [] # (dist, sid_a, sid_b) for i in range(len(scene_list)): sid_a, hashes_a = scene_list[i] for j in range(i + 1, len(scene_list)): sid_b, hashes_b = scene_list[j] # Min distance across all (a_hash, b_hash) — sceny mogą mieć multiple phash min_d = min(hamming_distance_hex(ha, hb) for ha in hashes_a for hb in hashes_b) histogram[min_d] += 1 pairs.append((min_d, sid_a, sid_b)) if not pairs: print("No pairs to compare (need ≥2 scenes with phash).") return print("Hamming distance distribution (all pairs):") for d in sorted(histogram): if d > 20: break bar = "#" * min(50, histogram[d]) print(f" d={d:2d} : {histogram[d]:4d} {bar}") n_unique = sum(c for d, c in histogram.items() if d > 20) print(f" d>20: {n_unique:4d} (effectively unrelated)") print() # 3) Pairs within threshold likely_dup = sorted([p for p in pairs if p[0] <= args.threshold]) print(f"Likely-duplicate pairs (Hamming ≤ {args.threshold}): {len(likely_dup)}") print() if not likely_dup: print("No likely duplicates found.") return # 4) Group into clusters (union-find) parent: dict = {sid: sid for sid in ph_by_scene} def find(x): while parent[x] != x: parent[x] = parent[parent[x]] x = parent[x] return x def union(a, b): ra, rb = find(a), find(b) if ra != rb: parent[ra] = rb for d, a, b in likely_dup: union(a, b) clusters: dict = defaultdict(list) for sid in ph_by_scene: clusters[find(sid)].append(sid) dup_clusters = [c for c in clusters.values() if len(c) > 1] print(f"Duplicate clusters: {len(dup_clusters)}") total_redundant = sum(len(c) - 1 for c in dup_clusters) print(f"Total redundant scenes: {total_redundant} (would merge to {len(dup_clusters)} canonical)") print() # 5) Print sample clusters print(f"=== Sample top {min(5, len(dup_clusters))} clusters ===\n") # Resolve sources per scene ps_by_scene: dict = defaultdict(list) for row in session.execute( select(PlaybackSource.scene_id, PlaybackSource.origin) .where(PlaybackSource.scene_id.in_([s for c in dup_clusters for s in c])) .where(PlaybackSource.dead_at.is_(None)) ).all(): origin = row.origin.split(":", 1)[1] if ":" in row.origin else row.origin ps_by_scene[row.scene_id].append(origin) for i, cluster in enumerate(sorted(dup_clusters, key=len, reverse=True)[:5], 1): print(f"--- Cluster {i} ({len(cluster)} scenes) ---") for sid in cluster: title = title_by_id.get(sid, "?")[:65] dur = dur_by_id.get(sid) dur_s = f"{dur}s" if dur else "no-dur" sources = ",".join(sorted(set(ps_by_scene.get(sid, [])))) or "no-source" ph = ph_by_scene[sid][0] print(f" {sid} {ph} {dur_s:>7s} [{sources}] {title}") print() if __name__ == "__main__": main()