feat(dedup): merge exact-phash + same-duration + shared-performer duplicates

bug-report 2026-06-03 ("ten sam czas, ta sama miniaturka, czemu się nie mergują"):
duplicate scenes not merged at ingest. Exact phash alone is noisy here (95% are
collisions on shared thumbnails/intro frames — different scenes; bulk_dedup scorer
correctly gives 0 auto-merge). The safe subset is exact-phash AND same duration
(±3s) AND shared performer/title — near-certain same scene. Same-duration is key:
it excludes the false-merge pattern (short-clip-vs-full has DIFFERING durations).

- scripts/merge_phash_exact_dupes.py: one-off, dry-run by default, per-pair re-fetch
  (handles clusters). Applied: 30 merged.
- bulk_dedup: add `_pairs_exact_phash` (SQL O(N log N), not the O(N²) Hamming scan)
  + strategy "phash_exact" — gated by the normal scorer (surfaces review candidates,
  no risky auto-merge), schedulable for ongoing exact-collision review.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
jtrzupek 2026-06-07 20:08:06 +02:00
parent d5409d01ce
commit 4922646011
2 changed files with 111 additions and 0 deletions

View file

@ -180,6 +180,24 @@ def _pairs_sharing_phash(session: Session, max_hamming: int) -> Iterable[tuple[u
yield pair yield pair
def _pairs_exact_phash(session: Session) -> Iterable[tuple[uuid.UUID, uuid.UUID]]:
"""Yield pary scen mających IDENTYCZNY phash (exact value match) — przez SQL self-join,
O(N log N) zamiast O() z `_pairs_sharing_phash`. Identyczny phash = identyczna
miniaturka = niemal pewnie ten sam scene (potem `_process_pair` scoruje i auto-merguje
tylko auto_t, więc przypadkowe kolizje z innym performerem/duration wpadną w review).
Skalowalne dla setek tys. phashy gdzie Hamming-fuzzy O() jest nierealne."""
from sqlalchemy import text
rows = session.execute(text("""
SELECT DISTINCT a.scene_id AS sa, b.scene_id AS sb
FROM scene_fingerprints a
JOIN scene_fingerprints b
ON a.value = b.value AND a.scene_id < b.scene_id
WHERE a.kind = 'phash' AND b.kind = 'phash'
"""))
for sa, sb in rows:
yield (sa, sb)
def _pairs_sharing_performer( def _pairs_sharing_performer(
session: Session, session: Session,
*, *,
@ -294,6 +312,14 @@ def run_bulk_dedup(
phash_pairs = list(_pairs_sharing_phash(session, max_hamming=max_h)) phash_pairs = list(_pairs_sharing_phash(session, max_hamming=max_h))
log.info("bulk_dedup: %d phash-shared pairs", len(phash_pairs)) log.info("bulk_dedup: %d phash-shared pairs", len(phash_pairs))
pairs.extend(phash_pairs) pairs.extend(phash_pairs)
if strategy == "phash_exact":
# EXACT phash collisions via SQL (O(N log N), nie O(N²) jak _pairs_sharing_phash).
# Identyczny phash = identyczna miniaturka = ten sam scene → missing-merge
# (bug-report 2026-06-03 "ten sam czas, ta sama miniaturka, czemu się nie
# mergują"). Skalowalne dla 441k+ phashy gdzie Hamming-O(N²) jest nierealne.
exact_pairs = list(_pairs_exact_phash(session))
log.info("bulk_dedup: %d exact-phash pairs", len(exact_pairs))
pairs.extend(exact_pairs)
if strategy in ("performers", "all"): if strategy in ("performers", "all"):
perf_pairs = list( perf_pairs = list(
_pairs_sharing_performer(session, cross_source_only=cross_source_only) _pairs_sharing_performer(session, cross_source_only=cross_source_only)

View file

@ -0,0 +1,85 @@
"""Merge wysoko-pewnych missing-merge duplikatów (exact phash + same duration + shared
performer/title).
Problem (bug-report 2026-06-03 "ten sam czas, ta sama miniaturka, czemu się nie
mergują"): duplikaty scen nie zmergowane przy ingeście. Exact-phash sam w sobie jest
hałaśliwy (95% to kolizje na wspólnych miniaturkach/intro różne sceny), więc bulk_dedup
scorer słusznie ich nie auto-merguje. ALE podzbiór z DODATKOWO samą długością (±3s)
ORAZ wspólnym performerem lub tytułem to niemal pewny ten sam scene.
Same-duration jest kluczowe: wyklucza wzorzec false-merge (tam długości się RÓŻNIĄ
krótki klip vs pełna scena; patrz audit_false_merges). Tu długości równe realny dup.
UWAGA: merge KASUJE zdublowaną scenę (refs/sources/tags łączone do keepera)
NIEODWRACALNE. Domyślnie dry-run; --yes wykonuje. Per-para re-fetch obsługuje klastry
(scena znika po wcześniejszym merge'u w tym samym klastrze).
Uruchomienie:
python -m scripts.merge_phash_exact_dupes # dry-run
python -m scripts.merge_phash_exact_dupes --yes # wykonaj
"""
from __future__ import annotations
import argparse
from sqlalchemy import text
from app.db import session_scope
from app.models.scene import Scene
from app.resolve.scene_merge import merge_scenes
from app.scheduler.bulk_dedup import _pick_keep_drop
_SAFE_PAIRS_SQL = """
SELECT DISTINCT a.scene_id AS sa, b.scene_id AS sb
FROM scene_fingerprints a
JOIN scene_fingerprints b ON a.value = b.value AND a.scene_id < b.scene_id
JOIN scenes sca ON sca.id = a.scene_id
JOIN scenes scb ON scb.id = b.scene_id
WHERE a.kind = 'phash' AND b.kind = 'phash'
AND sca.duration_sec IS NOT NULL AND scb.duration_sec IS NOT NULL
AND abs(sca.duration_sec - scb.duration_sec) <= 3
AND (
lower(coalesce(sca.title, 'x')) = lower(coalesce(scb.title, 'y'))
OR EXISTS (
SELECT 1 FROM scene_performers pa
JOIN scene_performers pb ON pa.performer_id = pb.performer_id
WHERE pa.scene_id = a.scene_id AND pb.scene_id = b.scene_id
)
)
"""
def main() -> None:
ap = argparse.ArgumentParser(description=__doc__)
ap.add_argument("--yes", action="store_true", help="wykonaj merge (bez tego dry-run)")
args = ap.parse_args()
with session_scope() as s:
pairs = [(r[0], r[1]) for r in s.execute(text(_SAFE_PAIRS_SQL))]
print(f"safe high-confidence duplicate pairs: {len(pairs)}")
merged = skipped = 0
for sa, sb in pairs:
with session_scope() as s:
a = s.get(Scene, sa)
b = s.get(Scene, sb)
if a is None or b is None or a.id == b.id:
skipped += 1 # już zmergowane w tym klastrze
continue
keep, drop = _pick_keep_drop(s, a, b)
print(f" {'MERGE' if args.yes else '[dry] MERGE'} keep={str(keep.id)[:8]} "
f"'{(keep.title or '')[:32]}' ({keep.duration_sec}s) <- drop={str(drop.id)[:8]} "
f"'{(drop.title or '')[:32]}'")
if args.yes:
merge_scenes(s, keep_id=keep.id, drop_id=drop.id, resolved_by="phash_exact_safe")
s.commit()
merged += 1
if args.yes:
print(f"\nAPPLIED: merged={merged} skipped(already-gone)={skipped}")
else:
print(f"\n(dry-run — {len(pairs)} par; uruchom z --yes aby scalić. NIEODWRACALNE)")
if __name__ == "__main__":
main()