goon/app/scheduler/bulk_dedup.py
jtrzupek 4922646011 feat(dedup): merge exact-phash + same-duration + shared-performer duplicates
bug-report 2026-06-03 ("ten sam czas, ta sama miniaturka, czemu się nie mergują"):
duplicate scenes not merged at ingest. Exact phash alone is noisy here (95% are
collisions on shared thumbnails/intro frames — different scenes; bulk_dedup scorer
correctly gives 0 auto-merge). The safe subset is exact-phash AND same duration
(±3s) AND shared performer/title — near-certain same scene. Same-duration is key:
it excludes the false-merge pattern (short-clip-vs-full has DIFFERING durations).

- scripts/merge_phash_exact_dupes.py: one-off, dry-run by default, per-pair re-fetch
  (handles clusters). Applied: 30 merged.
- bulk_dedup: add `_pairs_exact_phash` (SQL O(N log N), not the O(N²) Hamming scan)
  + strategy "phash_exact" — gated by the normal scorer (surfaces review candidates,
  no risky auto-merge), schedulable for ongoing exact-collision review.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-07 20:08:06 +02:00

502 lines
19 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Bulk dedup pass dla istniejących scen kanonicznych.
Scenes ingested before `duration_proximity` + `duration_perf_strong_match_bump` were
added (2026-05-03) never went through the new scoring, so duplicates were left side
by side as separate canonical entries. This module re-scores pairs of scenes that
SHARE a phash or a performer and creates auto_merge / pending candidates according
to the current scoring.

Strategies:
    phash       — scene pairs whose phashes are within the Hamming limit (≤5). Strong signal.
    phash_exact — scene pairs with an IDENTICAL phash value, found with a SQL self-join.
                  Not part of "all"; it has to be requested explicitly.
    performers  — for every performer, pair-wise scenes; hits the duration±6s + perf≥0.5 bump.

`dry_run=True` only logs what would be done and writes nothing.
"""
from __future__ import annotations
import itertools
import logging
import uuid
from collections.abc import Iterable
from dataclasses import dataclass, field
from sqlalchemy import func, select
from sqlalchemy.orm import Session
from app.config import get_settings
from app.db import session_scope
from app.models.merge_candidate import MergeCandidate, MergeKind, MergeStatus
from app.models.scene import Scene, SceneFingerprint, ScenePerformer
from app.resolve.scene_merge import merge_scenes
from app.resolve.scoring import (
ScoreBreakdown,
composite_score,
date_proximity,
duration_proximity,
hamming_distance_hex,
performer_set_similarity,
phash_similarity,
series_mismatch_strength,
title_similarity,
triage,
)
log = logging.getLogger(__name__)
@dataclass
class BulkCounters:
    """Running tallies for one bulk-dedup run; every counter starts at zero."""

    # Pairs that reached scoring (incremented right after `score_scene_pair`).
    pairs_scored: int = 0
    # Pairs whose composite score met the auto-merge threshold (counted in dry-run too).
    auto_merged: int = 0
    # Pairs whose score met only the review threshold (counted in dry-run too).
    pending_added: int = 0
    # Pairs scoring below the review threshold.
    rejected_low: int = 0
    # Pairs skipped because a pending/merged/rejected MergeCandidate already exists.
    skipped_already_pending: int = 0
    # Pairs skipped because both ids are equal OR either scene could not be loaded.
    skipped_self: int = 0
    # Pairs skipped by the cross-source filter when `cross_source_only` is set.
    skipped_not_cross_source: int = 0
# ---- pair scoring -----------------------------------------------------
def _scene_phashes(session: Session, scene_id: uuid.UUID) -> list[str]:
    """Return every fingerprint value of kind ``"phash"`` stored for ``scene_id``."""
    stmt = select(SceneFingerprint.value).where(
        SceneFingerprint.scene_id == scene_id,
        SceneFingerprint.kind == "phash",
    )
    values = session.execute(stmt).scalars().all()
    return list(values)
def _scene_performers(session: Session, scene_id: uuid.UUID) -> list[uuid.UUID]:
    """Return the performer ids linked to ``scene_id`` through ``ScenePerformer``."""
    stmt = select(ScenePerformer.performer_id).where(
        ScenePerformer.scene_id == scene_id
    )
    performer_ids = session.execute(stmt).scalars().all()
    return list(performer_ids)
def _best_phash(left_phashes: list[str], right_phashes: list[str]) -> float | None:
"""Najlepsza similarity między dwoma listami phashy (równa długość)."""
if not left_phashes or not right_phashes:
return None
best = 0.0
for a in left_phashes:
for b in right_phashes:
if len(a) != len(b):
continue
try:
sim = phash_similarity(a, b)
except ValueError:
continue
if sim > best:
best = sim
return best if best > 0 else None
def score_scene_pair(session: Session, a: Scene, b: Scene) -> ScoreBreakdown:
    """Score two stored scenes against each other.

    Counterpart of `score_candidate`, but working directly on ``Scene`` rows instead
    of wrapping one side in a NormalizedScene. Each signal that cannot be computed
    (no comparable phash, no performers on either side, a missing release date,
    an unknown studio) is passed to ``composite_score`` as ``None``.
    """
    fp = _best_phash(_scene_phashes(session, a.id), _scene_phashes(session, b.id))
    title = title_similarity(a.title_normalized, b.title_normalized)

    perfs_a = _scene_performers(session, a.id)
    perfs_b = _scene_performers(session, b.id)
    perf = None
    if perfs_a or perfs_b:
        perf = performer_set_similarity(perfs_a, perfs_b)

    date_score = None
    if a.release_date and b.release_date:
        date_score = date_proximity(a.release_date, b.release_date)

    duration_score = duration_proximity(a.duration_sec, b.duration_sec)

    # The studio signal only counts when both scenes know their studio.
    studio_match: bool | None = None
    if a.studio_id is not None and b.studio_id is not None:
        studio_match = a.studio_id == b.studio_id

    series_mismatch = series_mismatch_strength(a.title_normalized, b.title_normalized)

    # Bulk dedup is not an aggregator: both sides are canonical scenes, so the
    # studio is the real studio. Aggregator mode is reserved for the resolver
    # when ingesting from a tube site.
    composite, reasons = composite_score(
        fp=fp,
        title=title,
        performers=perf,
        date_score=date_score,
        duration_score=duration_score,
        studio_match=studio_match,
        aggregator_mode=False,
        series_mismatch=series_mismatch,
    )
    return ScoreBreakdown(
        fp=fp,
        title=title,
        performers=perf,
        date=date_score,
        duration=duration_score,
        studio_match=studio_match,
        composite=composite,
        reasons=reasons,
    )
# ---- candidate selection ---------------------------------------------
def _pairs_sharing_phash(session: Session, max_hamming: int) -> Iterable[tuple[uuid.UUID, uuid.UUID]]:
    """Yield scene-id pairs owning phashes within ``max_hamming`` of each other.

    Every phash fingerprint row is loaded and compared against every other one in
    Python (quadratic). The original author deliberately skipped pgvector / BK-tree
    because the table held only a couple of hundred phashes at the time.
    Each pair is emitted once, ordered as (smaller id, larger id).
    """
    stmt = select(SceneFingerprint.scene_id, SceneFingerprint.value).where(
        SceneFingerprint.kind == "phash"
    )
    fingerprints = session.execute(stmt).all()

    emitted: set[tuple[uuid.UUID, uuid.UUID]] = set()
    for left, right in itertools.combinations(fingerprints, 2):
        left_scene, left_hash = left
        right_scene, right_hash = right
        # Two hashes of the same scene, or hashes of different width, are not comparable.
        if left_scene == right_scene or len(left_hash) != len(right_hash):
            continue
        try:
            distance = hamming_distance_hex(left_hash, right_hash)
        except ValueError:
            continue
        if distance > max_hamming:
            continue
        if left_scene < right_scene:
            ordered = (left_scene, right_scene)
        else:
            ordered = (right_scene, left_scene)
        if ordered not in emitted:
            emitted.add(ordered)
            yield ordered
def _pairs_exact_phash(session: Session) -> Iterable[tuple[uuid.UUID, uuid.UUID]]:
    """Yield scene-id pairs whose phash values are IDENTICAL (exact value match).

    The pairing is delegated to the database through a self-join on
    ``scene_fingerprints`` instead of the Python-side quadratic Hamming scan done
    by `_pairs_sharing_phash`, which is what makes it usable on very large
    fingerprint tables. The ``a.scene_id < b.scene_id`` join condition emits every
    pair once and never pairs a scene with itself.

    An identical phash means an identical thumbnail, which is only *probably* the
    same scene: callers still run the pair through the normal scorer, so chance
    collisions with different performers/durations end up in review.
    """
    from sqlalchemy import text

    pair_query = text("""
        SELECT DISTINCT a.scene_id AS sa, b.scene_id AS sb
        FROM scene_fingerprints a
        JOIN scene_fingerprints b
        ON a.value = b.value AND a.scene_id < b.scene_id
        WHERE a.kind = 'phash' AND b.kind = 'phash'
    """)
    result = session.execute(pair_query)
    yield from ((left_id, right_id) for left_id, right_id in result)
def _pairs_sharing_performer(
    session: Session,
    *,
    cross_source_only: bool = False,
) -> Iterable[tuple[uuid.UUID, uuid.UUID]]:
    """Yield scene-id pairs that share at least one performer, each pair once.

    Pairs are ordered as (smaller id, larger id).

    With ``cross_source_only=True`` only (tpdb-only ↔ stashdb-only) pairs are
    produced, and only those passing a cheap metadata pre-filter (same studio,
    release dates within 7 days, or durations within 30 s). According to the
    original author's notes this removes the vast majority of pairs generated by
    prolific performers; without it the job ran out of memory on 41k+ scenes.
    """
    # Group scene ids by performer.
    scenes_by_performer: dict[uuid.UUID, list[uuid.UUID]] = {}
    membership = session.execute(
        select(ScenePerformer.performer_id, ScenePerformer.scene_id)
    )
    for performer_id, scene_id in membership:
        scenes_by_performer.setdefault(performer_id, []).append(scene_id)

    emitted: set[tuple[uuid.UUID, uuid.UUID]] = set()

    if not cross_source_only:
        for members in scenes_by_performer.values():
            if len(members) < 2:
                continue
            for first, second in itertools.combinations(members, 2):
                key = (first, second) if first < second else (second, first)
                if key not in emitted:
                    emitted.add(key)
                    yield key
        return

    from app.models.scene import SceneExternalRef
    from app.models.source import Source, SourceKind

    # scene id -> set of canonical source kinds (tpdb / stashdb) referencing it.
    kinds_by_scene: dict[uuid.UUID, set[str]] = {}
    ref_rows = session.execute(
        select(SceneExternalRef.scene_id, Source.kind)
        .join(Source, Source.id == SceneExternalRef.source_id)
        .where(Source.kind.in_([SourceKind.tpdb, SourceKind.stashdb]))
    )
    for ref_scene_id, source_kind in ref_rows:
        kinds_by_scene.setdefault(ref_scene_id, set()).add(source_kind.value)

    # scene id -> (studio_id, release_date, duration_sec). Used to discard pairs
    # with no realistic chance of being duplicates before the expensive scoring;
    # the author measured ~22h of scoring at a 0% hit rate without this.
    meta_by_scene: dict[uuid.UUID, tuple] = {}
    meta_rows = session.execute(
        select(Scene.id, Scene.studio_id, Scene.release_date, Scene.duration_sec)
    )
    for meta_scene_id, studio_id, release_date, duration_sec in meta_rows:
        meta_by_scene[meta_scene_id] = (studio_id, release_date, duration_sec)

    def _candidate(a_id: uuid.UUID, b_id: uuid.UUID) -> bool:
        """True when the two scenes look similar enough to be worth scoring."""
        left = meta_by_scene.get(a_id)
        right = meta_by_scene.get(b_id)
        if not left or not right:
            return False
        left_studio, left_date, left_dur = left
        right_studio, right_date, right_dur = right
        return bool(
            # Same known studio: a very strong signal.
            (left_studio is not None and left_studio == right_studio)
            # Both dates known and at most 7 days apart.
            or (left_date and right_date and abs((left_date - right_date).days) <= 7)
            # Both durations known and at most 30 s apart; the margin covers
            # intro/outro differences between TPDB and StashDB metadata.
            or (left_dur and right_dur and abs(left_dur - right_dur) <= 30)
        )

    for members in scenes_by_performer.values():
        tpdb_only = [sid for sid in members if kinds_by_scene.get(sid) == {"tpdb"}]
        stash_only = [sid for sid in members if kinds_by_scene.get(sid) == {"stashdb"}]
        for first, second in itertools.product(tpdb_only, stash_only):
            key = (first, second) if first < second else (second, first)
            if key in emitted:
                continue
            if not _candidate(first, second):
                continue
            emitted.add(key)
            yield key
# ---- main entry --------------------------------------------------------
def run_bulk_dedup(
    *,
    strategy: str = "all",  # 'phash' | 'phash_exact' | 'performers' | 'all'
    dry_run: bool = False,
    max_hamming: int | None = None,
    auto_merge_threshold: float | None = None,
    review_threshold: float | None = None,
    cross_source_only: bool = False,
) -> BulkCounters:
    """Collect candidate scene pairs for ``strategy`` and score/merge each of them.

    Args:
        strategy: ``"phash"`` (Hamming scan), ``"phash_exact"`` (SQL exact-value
            join), ``"performers"`` (shared performer) or ``"all"`` (``phash`` plus
            ``performers``; ``phash_exact`` has to be requested explicitly). Any
            other value selects no pairs and only logs a warning.
        dry_run: forwarded to ``_process_pair``; decisions are logged, not written.
        max_hamming: Hamming limit for the ``phash`` strategy; ``None`` falls back
            to ``settings.fingerprint_hamming_max``. ``0`` is honoured.
        auto_merge_threshold: composite score at/above which a pair is auto-merged;
            ``None`` falls back to ``settings.auto_merge_threshold``.
        review_threshold: composite score at/above which a pair becomes a pending
            candidate; ``None`` falls back to ``settings.review_threshold``.
        cross_source_only: restrict performer pairs (and per-pair processing) to
            tpdb-only ↔ stashdb-only scenes.

    Returns:
        The ``BulkCounters`` accumulated over all processed pairs. A failure on one
        pair is logged and does not abort the run.
    """
    settings = get_settings()
    # Fall back to settings only when an override was omitted. A plain `or` would
    # also throw away explicit zeros (e.g. max_hamming=0 meaning "exact only").
    auto_t = (
        settings.auto_merge_threshold
        if auto_merge_threshold is None
        else auto_merge_threshold
    )
    review_t = settings.review_threshold if review_threshold is None else review_threshold
    max_h = settings.fingerprint_hamming_max if max_hamming is None else max_hamming
    counters = BulkCounters()

    if strategy not in ("phash", "phash_exact", "performers", "all"):
        log.warning("bulk_dedup: unknown strategy %r, no pairs will be selected", strategy)

    # Stage 1: collect all candidate pairs (nothing is written to the DB here).
    log.info("bulk_dedup start strategy=%s dry_run=%s", strategy, dry_run)
    pairs: list[tuple[uuid.UUID, uuid.UUID]] = []
    with session_scope() as session:
        if strategy in ("phash", "all"):
            phash_pairs = list(_pairs_sharing_phash(session, max_hamming=max_h))
            log.info("bulk_dedup: %d phash-shared pairs", len(phash_pairs))
            pairs.extend(phash_pairs)
        if strategy == "phash_exact":
            # EXACT phash collisions found in SQL rather than by the quadratic
            # Hamming scan, so this stays usable on very large fingerprint tables.
            # Addresses the missed merges from bug report 2026-06-03 (same
            # duration, same thumbnail, yet not merged).
            exact_pairs = list(_pairs_exact_phash(session))
            log.info("bulk_dedup: %d exact-phash pairs", len(exact_pairs))
            pairs.extend(exact_pairs)
        if strategy in ("performers", "all"):
            perf_pairs = list(
                _pairs_sharing_performer(session, cross_source_only=cross_source_only)
            )
            log.info(
                "bulk_dedup: %d performer-shared pairs (cross_source_only=%s)",
                len(perf_pairs), cross_source_only,
            )
            pairs.extend(perf_pairs)

    # Dedup across strategies (order-normalised so (a, b) and (b, a) collapse).
    unique_pairs = list({(a, b) if a < b else (b, a) for a, b in pairs})
    log.info("bulk_dedup: %d unique pairs total", len(unique_pairs))

    # Stage 2: score + decide per pair. `_process_pair` opens its own session
    # scope, so partial progress survives if a later pair fails.
    for sid_a, sid_b in unique_pairs:
        try:
            _process_pair(
                sid_a, sid_b,
                auto_t=auto_t, review_t=review_t,
                dry_run=dry_run, counters=counters,
                cross_source_only=cross_source_only,
            )
        except Exception as e:
            log.exception("bulk_dedup pair %s%s failed: %s", sid_a, sid_b, e)

    log.info("bulk_dedup done: %s", counters)
    return counters
def _scene_source_kinds(session: Session, scene_id: uuid.UUID) -> set[str]:
    """Return the set of source kinds (e.g. 'tpdb', 'stashdb', 'scraper') among the
    external refs of ``scene_id``."""
    from app.models.scene import SceneExternalRef
    from app.models.source import Source

    stmt = (
        select(Source.kind)
        .join(SceneExternalRef, SceneExternalRef.source_id == Source.id)
        .where(SceneExternalRef.scene_id == scene_id)
        .distinct()
    )
    kinds = session.execute(stmt).scalars().all()
    return {kind.value for kind in kinds}
def _is_cross_source_pair(session: Session, sid_a: uuid.UUID, sid_b: uuid.UUID) -> bool:
    """True when the pair is (tpdb-only ↔ stashdb-only), i.e. a merge would expose a
    cross-source overlap.

    Pairs where either scene already carries BOTH canonical refs (merged earlier),
    carries none, or where both scenes come from the same single source are
    rejected.
    """
    canonical_kinds = {"tpdb", "stashdb"}
    left = _scene_source_kinds(session, sid_a) & canonical_kinds
    right = _scene_source_kinds(session, sid_b) & canonical_kinds
    # Each side must have exactly one canonical source, and the two must differ.
    return len(left) == 1 and len(right) == 1 and left != right
def _process_pair(
    sid_a: uuid.UUID,
    sid_b: uuid.UUID,
    *,
    auto_t: float,
    review_t: float,
    dry_run: bool,
    counters: BulkCounters,
    cross_source_only: bool = False,
) -> None:
    """Score one scene pair inside its own session scope and act on the result.

    Args:
        sid_a, sid_b: ids of the two scenes to compare.
        auto_t: composite score at/above which the pair is merged automatically.
        review_t: composite score at/above which (but below ``auto_t``) a pending
            MergeCandidate is created.
        dry_run: when true the decision is only logged; nothing is added or merged.
            The decision counters are still incremented.
        counters: mutated in place with the outcome for this pair.
        cross_source_only: when true, pairs failing `_is_cross_source_pair` are skipped.
    """
    with session_scope() as session:
        a = session.get(Scene, sid_a)
        b = session.get(Scene, sid_b)
        # NOTE(review): a scene that no longer exists is tallied under
        # `skipped_self`, the same counter as a genuine self-pair.
        if a is None or b is None:
            counters.skipped_self += 1
            return
        if a.id == b.id:
            counters.skipped_self += 1
            return
        if cross_source_only and not _is_cross_source_pair(session, sid_a, sid_b):
            counters.skipped_not_cross_source += 1
            return
        # Skip when a candidate already exists for this pair, in either
        # orientation, with status pending / merged / rejected.
        # NOTE(review): auto-merges below are stored with `MergeStatus.auto_merged`,
        # which is not in this list — confirm that is intended.
        existing = session.execute(
            select(MergeCandidate).where(
                MergeCandidate.kind == MergeKind.scene,
                (
                    ((MergeCandidate.left_id == a.id) & (MergeCandidate.right_id == b.id))
                    | ((MergeCandidate.left_id == b.id) & (MergeCandidate.right_id == a.id))
                ),
                MergeCandidate.status.in_(
                    [MergeStatus.pending, MergeStatus.merged, MergeStatus.rejected]
                ),
            ).limit(1)
        ).scalar_one_or_none()
        if existing is not None:
            counters.skipped_already_pending += 1
            return
        breakdown = score_scene_pair(session, a, b)
        counters.pairs_scored += 1
        # Progress heartbeat every 500 scored pairs.
        if counters.pairs_scored % 500 == 0:
            log.info(
                "bulk_dedup progress: %d pairs scored (auto=%d pending=%d rej=%d)",
                counters.pairs_scored,
                counters.auto_merged,
                counters.pending_added,
                counters.rejected_low,
            )
        decision = "auto" if breakdown.composite >= auto_t else (
            "review" if breakdown.composite >= review_t else "reject"
        )
        if decision == "auto":
            # Counted before the merge runs, so dry-runs report the same totals.
            counters.auto_merged += 1
            if dry_run:
                # assumes `title` is never None — TODO confirm against the Scene model
                log.info(
                    "[dry] AUTO %s '%s' (dur=%s) ↔ '%s' (dur=%s) score=%.3f",
                    a.id, a.title[:40], a.duration_sec, b.title[:40], b.duration_sec,
                    breakdown.composite,
                )
                return
            # Keep the scene with more external refs; tie-break: the older one
            # (lower created_at).
            keep, drop = _pick_keep_drop(session, a, b)
            log.info(
                "AUTO merge keep=%s drop=%s score=%.3f",
                keep.id, drop.id, breakdown.composite,
            )
            # Audit row is added before the merge; both ids point at the surviving
            # scene because the dropped one is about to be merged away.
            session.add(
                MergeCandidate(
                    kind=MergeKind.scene,
                    left_id=keep.id,
                    right_id=keep.id,  # audit: post-merge scene_id
                    score=breakdown.composite,
                    reasons={"path": "bulk_dedup", **breakdown.to_dict()},
                    status=MergeStatus.auto_merged,
                )
            )
            merge_scenes(
                session, keep_id=keep.id, drop_id=drop.id, resolved_by="bulk_dedup"
            )
        elif decision == "review":
            counters.pending_added += 1
            if dry_run:
                log.info(
                    "[dry] PENDING '%s''%s' score=%.3f",
                    a.title[:40], b.title[:40], breakdown.composite,
                )
                return
            # Queue the pair for manual review.
            session.add(
                MergeCandidate(
                    kind=MergeKind.scene,
                    left_id=a.id,
                    right_id=b.id,
                    score=breakdown.composite,
                    reasons={"path": "bulk_dedup", **breakdown.to_dict()},
                    status=MergeStatus.pending,
                )
            )
        else:
            # Below the review threshold: nothing is persisted for this pair.
            counters.rejected_low += 1
def _pick_keep_drop(session: Session, a: Scene, b: Scene) -> tuple[Scene, Scene]:
    """Return ``(keep, drop)``: keep the scene with more external refs; on a tie
    keep the older one (lower ``created_at``)."""
    from app.models.scene import SceneExternalRef

    def _ref_count(scene: Scene) -> int:
        """Number of SceneExternalRef rows pointing at ``scene``."""
        stmt = (
            select(func.count())
            .select_from(SceneExternalRef)
            .where(SceneExternalRef.scene_id == scene.id)
        )
        return session.execute(stmt).scalar_one()

    refs_a = _ref_count(a)
    refs_b = _ref_count(b)
    if refs_a > refs_b:
        return a, b
    if refs_b > refs_a:
        return b, a
    # Tie: the older scene wins.
    if a.created_at <= b.created_at:
        return a, b
    return b, a