From 4922646011ef1aaa8034fba5977d32e0d16bd755 Mon Sep 17 00:00:00 2001
From: jtrzupek
Date: Sun, 7 Jun 2026 20:08:06 +0200
Subject: [PATCH] feat(dedup): merge exact-phash + same-duration + shared-performer duplicates
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

bug-report 2026-06-03 ("ten sam czas, ta sama miniaturka, czemu się nie
mergują"): duplicate scenes not merged at ingest. Exact phash alone is noisy
here (95% are collisions on shared thumbnails/intro frames — different scenes;
bulk_dedup scorer correctly gives 0 auto-merge). The safe subset is exact-phash
AND same duration (±3s) AND shared performer/title — near-certain same scene.
Same-duration is key: it excludes the false-merge pattern
(short-clip-vs-full has DIFFERING durations).

- scripts/merge_phash_exact_dupes.py: one-off, dry-run by default, per-pair
  re-fetch (handles clusters). Applied: 30 merged.
- bulk_dedup: add `_pairs_exact_phash` (SQL O(N log N), not the O(N²) Hamming
  scan) + strategy "phash_exact" — gated by the normal scorer (surfaces review
  candidates, no risky auto-merge), schedulable for ongoing exact-collision
  review.

Co-Authored-By: Claude Opus 4.8
---
 app/scheduler/bulk_dedup.py        | 26 +++++++++
 scripts/merge_phash_exact_dupes.py | 85 ++++++++++++++++++++++++++++++
 2 files changed, 111 insertions(+)
 create mode 100644 scripts/merge_phash_exact_dupes.py

diff --git a/app/scheduler/bulk_dedup.py b/app/scheduler/bulk_dedup.py
index 209683c..696882a 100644
--- a/app/scheduler/bulk_dedup.py
+++ b/app/scheduler/bulk_dedup.py
@@ -180,6 +180,24 @@ def _pairs_sharing_phash(session: Session, max_hamming: int) -> Iterable[tuple[u
             yield pair
 
 
+def _pairs_exact_phash(session: Session) -> Iterable[tuple[uuid.UUID, uuid.UUID]]:
+    """Yield scene pairs that share an IDENTICAL phash value, found with one SQL self-join
+    (intended as ~O(N log N) instead of the O(N²) scan in `_pairs_sharing_phash`). An identical
+    phash means an identical thumbnail, which is only a duplicate *candidate*: per the original
+    note, `_process_pair` then scores the pair and auto-merges only at >= auto_t, so chance
+    collisions with a different performer/duration land in review. Meant for large phash tables."""
+    from sqlalchemy import text
+    rows = session.execute(text("""
+        SELECT DISTINCT a.scene_id AS sa, b.scene_id AS sb
+        FROM scene_fingerprints a
+        JOIN scene_fingerprints b
+          ON a.value = b.value AND a.scene_id < b.scene_id
+        WHERE a.kind = 'phash' AND b.kind = 'phash'
+    """))
+    for sa, sb in rows:
+        yield (sa, sb)
+
+
 def _pairs_sharing_performer(
     session: Session,
     *,
@@ -294,6 +312,14 @@ def run_bulk_dedup(
         phash_pairs = list(_pairs_sharing_phash(session, max_hamming=max_h))
         log.info("bulk_dedup: %d phash-shared pairs", len(phash_pairs))
         pairs.extend(phash_pairs)
+    if strategy == "phash_exact":
+        # Exact phash collisions, found in SQL instead of the O(N²) Hamming scan in
+        # _pairs_sharing_phash. An identical phash is an identical thumbnail — a candidate
+        # for the missed merges from bug-report 2026-06-03, not proof of a duplicate on its
+        # own. The original note cites 441k+ phashes as the scale that rules out the scan.
+        exact_pairs = list(_pairs_exact_phash(session))
+        log.info("bulk_dedup: %d exact-phash pairs", len(exact_pairs))
+        pairs.extend(exact_pairs)
     if strategy in ("performers", "all"):
         perf_pairs = list(
             _pairs_sharing_performer(session, cross_source_only=cross_source_only)
diff --git a/scripts/merge_phash_exact_dupes.py b/scripts/merge_phash_exact_dupes.py
new file mode 100644
index 0000000..a163a44
--- /dev/null
+++ b/scripts/merge_phash_exact_dupes.py
@@ -0,0 +1,85 @@
+"""Merge high-confidence missed-merge duplicates (exact phash + same duration + shared
+performer/title).
+
+Problem (bug-report 2026-06-03, "same duration, same thumbnail, why are they not
+merged"): duplicate scenes were not merged at ingest. Exact phash on its own is
+noisy (95% are collisions on shared thumbnails/intros — different scenes), so the bulk_dedup
+scorer rightly does not auto-merge them. BUT the subset that ALSO has the same length (±3s)
+AND a shared performer or an equal, non-blank title is almost certainly the same scene.
+
+Same duration is the key: it excludes the false-merge pattern (there the lengths DIFFER —
+short clip vs full scene; see audit_false_merges). Here the lengths match → a real dup.
+
+WARNING: the merge DELETES the duplicated scene (refs/sources/tags folded into the keeper) —
+IRREVERSIBLE. Dry-run by default; --yes executes. Per-pair re-fetch → handles clusters
+(a scene disappears after an earlier merge within the same cluster).
+
+Usage:
+    python -m scripts.merge_phash_exact_dupes # dry-run
+    python -m scripts.merge_phash_exact_dupes --yes # execute
+"""
+from __future__ import annotations
+
+import argparse
+
+from sqlalchemy import text
+
+from app.db import session_scope
+from app.models.scene import Scene
+from app.resolve.scene_merge import merge_scenes
+from app.scheduler.bulk_dedup import _pick_keep_drop
+
+_SAFE_PAIRS_SQL = """
+SELECT DISTINCT a.scene_id AS sa, b.scene_id AS sb
+FROM scene_fingerprints a
+JOIN scene_fingerprints b ON a.value = b.value AND a.scene_id < b.scene_id
+JOIN scenes sca ON sca.id = a.scene_id
+JOIN scenes scb ON scb.id = b.scene_id
+WHERE a.kind = 'phash' AND b.kind = 'phash'
+  AND sca.duration_sec IS NOT NULL AND scb.duration_sec IS NOT NULL
+  AND abs(sca.duration_sec - scb.duration_sec) <= 3
+  AND (
+    (trim(sca.title) <> '' AND lower(sca.title) = lower(scb.title))
+    OR EXISTS (
+      SELECT 1 FROM scene_performers pa
+      JOIN scene_performers pb ON pa.performer_id = pb.performer_id
+      WHERE pa.scene_id = a.scene_id AND pb.scene_id = b.scene_id
+    )
+  )
+"""
+
+
+def main() -> None:
+    ap = argparse.ArgumentParser(description=__doc__)
+    ap.add_argument("--yes", action="store_true", help="wykonaj merge (bez tego dry-run)")
+    args = ap.parse_args()
+
+    with session_scope() as s:  # candidate ids are read once, before any merge runs
+        pairs = [(r[0], r[1]) for r in s.execute(text(_SAFE_PAIRS_SQL))]
+    print(f"safe high-confidence duplicate pairs: {len(pairs)}")
+
+    merged = skipped = 0
+    for sa, sb in pairs:
+        with session_scope() as s:  # new session per pair: both scenes are re-fetched
+            a = s.get(Scene, sa)
+            b = s.get(Scene, sb)
+            if a is None or b is None or a.id == b.id:
+                skipped += 1 # already merged earlier in this cluster
+                continue
+            keep, drop = _pick_keep_drop(s, a, b)
+            print(f" {'MERGE' if args.yes else '[dry] MERGE'} keep={str(keep.id)[:8]} "
+                  f"'{(keep.title or '')[:32]}' ({keep.duration_sec}s) <- drop={str(drop.id)[:8]} "
+                  f"'{(drop.title or '')[:32]}'")
+            if args.yes:  # without --yes nothing is written; the pair is only printed
+                merge_scenes(s, keep_id=keep.id, drop_id=drop.id, resolved_by="phash_exact_safe")
+                s.commit()
+                merged += 1
+
+    if args.yes:
+        print(f"\nAPPLIED: merged={merged} skipped(already-gone)={skipped}")
+    else:
+        print(f"\n(dry-run — {len(pairs)} par; uruchom z --yes aby scalić. NIEODWRACALNE)")
+
+
+if __name__ == "__main__":
+    main()