goon/app/scheduler/bulk_dedup.py
jtrzupek 2b602beea5 fix(dedup): tighten cross-source candidate prefilter — kill 1800s hang (GOON-V)
_candidate used OR logic (studio OR date±7d OR dur±30s) → 938,950 pairs;
Etap-2 scoring at ~110/s never finished in 1800s → bulk_dedup_performers HUNG
every run, orphan thread leaked until restart. Require AND: same studio plus
(date±2d OR dur±30s). 939k→16k pairs, full run 213s. Real cross-source dup of
one master shares studio + near date/duration; rare studio_id-mismatch pairs
skipped on purpose — a job that COMPLETES beats one that times out merging nothing.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-08 10:03:33 +02:00

507 lines
19 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Bulk dedup pass over existing canonical scenes.

Scenes ingested before `duration_proximity` + `duration_perf_strong_match_bump`
were introduced (2026-05-03) were never scored with the new rules, so they were
left side by side as separate canonical entries. This module re-scores pairs of
scenes that share a phash / performer and creates auto_merge / pending
candidates according to the current scoring.

Strategies (``run_bulk_dedup(strategy=...)``):
  phash       — pairs of scenes whose phashes are within the Hamming-distance
                limit (``max_hamming``, falling back to the
                ``fingerprint_hamming_max`` setting). Strong signal; brute-force
                pairwise comparison over all phashes.
  phash_exact — pairs of scenes with an identical phash value, found via an SQL
                self-join (scales to large fingerprint tables).
  performers  — for each performer, pairwise combinations of their scenes;
                meant to surface pairs that hit the duration±6s + perf≥0.5 bump.
                ``cross_source_only=True`` restricts this to tpdb-only ↔
                stashdb-only pairs passing a studio + date/duration prefilter.
  all         — phash + performers (does not include phash_exact).

``dry_run=True`` only logs what would be done and writes nothing.
"""
from __future__ import annotations
import itertools
import logging
import uuid
from collections.abc import Iterable
from dataclasses import dataclass, field
from sqlalchemy import func, select
from sqlalchemy.orm import Session
from app.config import get_settings
from app.db import session_scope
from app.models.merge_candidate import MergeCandidate, MergeKind, MergeStatus
from app.models.scene import Scene, SceneFingerprint, ScenePerformer
from app.resolve.scene_merge import merge_scenes
from app.resolve.scoring import (
ScoreBreakdown,
composite_score,
date_proximity,
duration_proximity,
hamming_distance_hex,
performer_set_similarity,
phash_similarity,
series_mismatch_strength,
title_similarity,
triage,
)
log = logging.getLogger(__name__)


@dataclass
class BulkCounters:
    """Per-run tallies of what the bulk dedup pass did with each candidate pair."""

    # Pairs that were actually scored (passed every skip check).
    pairs_scored: int = field(default=0)
    # Pairs scoring at/above the auto-merge threshold (also counted in dry runs).
    auto_merged: int = field(default=0)
    # Pairs scoring at/above the review threshold but below auto-merge.
    pending_added: int = field(default=0)
    # Pairs scoring below the review threshold.
    rejected_low: int = field(default=0)
    # Pairs that already had a pending / merged / rejected candidate.
    skipped_already_pending: int = field(default=0)
    # Pairs where a scene was missing or both ids were the same scene.
    skipped_self: int = field(default=0)
    # Pairs dropped by the cross-source-only filter.
    skipped_not_cross_source: int = field(default=0)
# ---- pair scoring -----------------------------------------------------
def _scene_phashes(session: Session, scene_id: uuid.UUID) -> list[str]:
    """Return every stored phash value for ``scene_id`` (empty list if none)."""
    stmt = select(SceneFingerprint.value).where(
        SceneFingerprint.scene_id == scene_id,
        SceneFingerprint.kind == "phash",
    )
    values = session.execute(stmt).scalars().all()
    return list(values)
def _scene_performers(session: Session, scene_id: uuid.UUID) -> list[uuid.UUID]:
    """Return the performer ids linked to ``scene_id`` (empty list if none)."""
    stmt = select(ScenePerformer.performer_id).where(
        ScenePerformer.scene_id == scene_id
    )
    performer_ids = session.execute(stmt).scalars().all()
    return list(performer_ids)
def _best_phash(left_phashes: list[str], right_phashes: list[str]) -> float | None:
"""Najlepsza similarity między dwoma listami phashy (równa długość)."""
if not left_phashes or not right_phashes:
return None
best = 0.0
for a in left_phashes:
for b in right_phashes:
if len(a) != len(b):
continue
try:
sim = phash_similarity(a, b)
except ValueError:
continue
if sim > best:
best = sim
return best if best > 0 else None
def score_scene_pair(session: Session, a: Scene, b: Scene) -> ScoreBreakdown:
    """Score two canonical scenes against each other.

    Counterpart of `score_candidate` that works on `Scene` rows directly
    instead of wrapping them in `NormalizedScene`. Individual signals are None
    when the data needed to compute them is missing on either side.
    """
    fp = _best_phash(_scene_phashes(session, a.id), _scene_phashes(session, b.id))
    title = title_similarity(a.title_normalized, b.title_normalized)

    perfs_a = _scene_performers(session, a.id)
    perfs_b = _scene_performers(session, b.id)
    perf = None
    if perfs_a or perfs_b:
        perf = performer_set_similarity(perfs_a, perfs_b)

    date_score = None
    if a.release_date and b.release_date:
        date_score = date_proximity(a.release_date, b.release_date)
    duration_score = duration_proximity(a.duration_sec, b.duration_sec)

    studio_match: bool | None = (
        None
        if a.studio_id is None or b.studio_id is None
        else a.studio_id == b.studio_id
    )
    series_mismatch = series_mismatch_strength(a.title_normalized, b.title_normalized)

    # Bulk dedup is not an aggregator: both sides are canonical scenes, so the
    # studio is the real studio. Aggregator mode is only used by the resolver
    # when ingesting from a tube site.
    composite, reasons = composite_score(
        fp=fp,
        title=title,
        performers=perf,
        date_score=date_score,
        duration_score=duration_score,
        studio_match=studio_match,
        aggregator_mode=False,
        series_mismatch=series_mismatch,
    )
    return ScoreBreakdown(
        fp=fp,
        title=title,
        performers=perf,
        date=date_score,
        duration=duration_score,
        studio_match=studio_match,
        composite=composite,
        reasons=reasons,
    )
# ---- candidate selection ---------------------------------------------
def _pairs_sharing_phash(session: Session, max_hamming: int) -> Iterable[tuple[uuid.UUID, uuid.UUID]]:
    """Yield (low, high) scene-id pairs whose phashes are within ``max_hamming``.

    Brute-force comparison of every phash row against every other one, with no
    pgvector / BK-tree index. The original note considers this fine for ~215
    phashes (~46k comparisons). For large fingerprint tables,
    `_pairs_exact_phash` is the scalable alternative. Each pair is yielded once.
    """
    fingerprints = session.execute(
        select(SceneFingerprint.scene_id, SceneFingerprint.value).where(
            SceneFingerprint.kind == "phash"
        )
    ).all()
    emitted: set[tuple[uuid.UUID, uuid.UUID]] = set()
    for (left_scene, left_hash), (right_scene, right_hash) in itertools.combinations(fingerprints, 2):
        # Same scene, or hashes of different bit widths: not comparable.
        if left_scene == right_scene or len(left_hash) != len(right_hash):
            continue
        try:
            distance = hamming_distance_hex(left_hash, right_hash)
        except ValueError:
            continue
        if distance > max_hamming:
            continue
        if right_scene < left_scene:
            left_scene, right_scene = right_scene, left_scene
        key = (left_scene, right_scene)
        if key in emitted:
            continue
        emitted.add(key)
        yield key
def _pairs_exact_phash(session: Session) -> Iterable[tuple[uuid.UUID, uuid.UUID]]:
    """Yield (low, high) scene-id pairs that share an IDENTICAL phash value.

    Uses an SQL self-join on the fingerprint value instead of the pairwise
    Python comparison in `_pairs_sharing_phash`, so it scales to hundreds of
    thousands of phashes where fuzzy Hamming matching is impractical. An
    identical phash means an identical thumbnail and is very likely the same
    scene. `_process_pair` still scores each pair and auto-merges only at or
    above ``auto_t``, so accidental collisions with different performers or
    durations end up in review instead.
    """
    from sqlalchemy import text

    exact_match_sql = """
SELECT DISTINCT a.scene_id AS sa, b.scene_id AS sb
FROM scene_fingerprints a
JOIN scene_fingerprints b
ON a.value = b.value AND a.scene_id < b.scene_id
WHERE a.kind = 'phash' AND b.kind = 'phash'
"""
    result = session.execute(text(exact_match_sql))
    yield from ((low_id, high_id) for low_id, high_id in result)
def _pairs_sharing_performer(
    session: Session,
    *,
    cross_source_only: bool = False,
) -> Iterable[tuple[uuid.UUID, uuid.UUID]]:
    """Yield (low, high) scene-id pairs that share at least one performer.

    Every unordered pair is yielded at most once.

    With ``cross_source_only=True``, only tpdb-only ↔ stashdb-only pairs that
    also pass `_cross_source_prefilter` are produced. This removes the vast
    majority of pairs. A performer with N scenes contributes ~N²/2 pairs
    without it. The original note estimates 4M+ pairs for ~1959 performers
    averaging 50 scenes, and reports bulk_dedup running out of memory on 41k+
    scenes without this flag.
    """
    scenes_by_performer: dict[uuid.UUID, list[uuid.UUID]] = {}
    link_rows = session.execute(
        select(ScenePerformer.performer_id, ScenePerformer.scene_id)
    )
    for performer_id, scene_id in link_rows:
        scenes_by_performer.setdefault(performer_id, []).append(scene_id)

    if cross_source_only:
        yield from _cross_source_performer_pairs(session, scenes_by_performer)
    else:
        yield from _all_performer_pairs(scenes_by_performer)


def _cross_source_prefilter(
    scene_meta: dict[uuid.UUID, tuple],
    a_id: uuid.UUID,
    b_id: uuid.UUID,
) -> bool:
    """Cheap check run on a cross-source pair BEFORE the expensive scoring stage.

    ``scene_meta`` maps scene id → (studio_id, release_date, duration_sec).

    A CONJUNCTION of signals is required: same known studio AND (release dates
    within 2 days OR durations within 30 s, each known on both sides). The
    earlier OR version (studio OR date±7d OR dur±30s) let ~939k pairs through.
    A prolific performer in one studio yields a tpdb×stashdb cartesian product
    that a shared studio alone does not cut. Stage 2 (~110 pairs/s) then
    never finished within 1800 s, so the job hung on every run (GOON-V) and
    leaked its thread. The conjunction brings this down to ~16k pairs.

    A genuine cross-source duplicate of one scene shares the studio and roughly
    the same date/duration (one master). Pairs with mismatched studio_id (rare
    after studio dedup) are deliberately skipped. Partial coverage that
    finishes beats full coverage that times out and merges nothing.
    """
    left = scene_meta.get(a_id)
    right = scene_meta.get(b_id)
    if not left or not right:
        return False
    left_studio, left_date, left_dur = left
    right_studio, right_date, right_dur = right
    # Anchor: both sides know the studio and it is the same one.
    if left_studio is None or left_studio != right_studio:
        return False
    # Plus a close date OR a close duration (known on both sides, not merely absent).
    date_ok = bool(left_date and right_date and abs((left_date - right_date).days) <= 2)
    dur_ok = bool(left_dur and right_dur and abs(left_dur - right_dur) <= 30)
    return date_ok or dur_ok


def _cross_source_performer_pairs(
    session: Session,
    scenes_by_performer: dict[uuid.UUID, list[uuid.UUID]],
) -> Iterable[tuple[uuid.UUID, uuid.UUID]]:
    """Yield tpdb-only × stashdb-only pairs per performer that pass the prefilter."""
    from app.models.scene import SceneExternalRef
    from app.models.source import Source, SourceKind

    # scene id → set of canonical source kinds ('tpdb' / 'stashdb') it has refs from.
    canonical_kinds: dict[uuid.UUID, set[str]] = {}
    kind_rows = session.execute(
        select(SceneExternalRef.scene_id, Source.kind)
        .join(Source, Source.id == SceneExternalRef.source_id)
        .where(Source.kind.in_([SourceKind.tpdb, SourceKind.stashdb]))
    )
    for scene_id, kind in kind_rows:
        canonical_kinds.setdefault(scene_id, set()).add(kind.value)

    # scene id → (studio_id, release_date, duration_sec), so pairs can be
    # prefiltered before the costly scoring stage. The original note: without
    # this, 9M pairs × ~110/s = 22h at a 0% hit rate.
    scene_meta: dict[uuid.UUID, tuple] = {
        scene_id: (studio_id, release_date, duration_sec)
        for scene_id, studio_id, release_date, duration_sec in session.execute(
            select(Scene.id, Scene.studio_id, Scene.release_date, Scene.duration_sec)
        )
    }

    emitted: set[tuple[uuid.UUID, uuid.UUID]] = set()
    for scene_ids in scenes_by_performer.values():
        tpdb_side = [sid for sid in scene_ids if canonical_kinds.get(sid) == {"tpdb"}]
        stash_side = [sid for sid in scene_ids if canonical_kinds.get(sid) == {"stashdb"}]
        for left, right in itertools.product(tpdb_side, stash_side):
            key = (left, right) if left < right else (right, left)
            if key in emitted or not _cross_source_prefilter(scene_meta, left, right):
                continue
            emitted.add(key)
            yield key


def _all_performer_pairs(
    scenes_by_performer: dict[uuid.UUID, list[uuid.UUID]],
) -> Iterable[tuple[uuid.UUID, uuid.UUID]]:
    """Yield every (low, high) pair of scenes that share a performer, once each."""
    emitted: set[tuple[uuid.UUID, uuid.UUID]] = set()
    for scene_ids in scenes_by_performer.values():
        if len(scene_ids) < 2:
            continue
        for left, right in itertools.combinations(scene_ids, 2):
            key = (left, right) if left < right else (right, left)
            if key in emitted:
                continue
            emitted.add(key)
            yield key
# ---- main entry --------------------------------------------------------
def run_bulk_dedup(
    *,
    strategy: str = "all",  # 'phash' | 'phash_exact' | 'performers' | 'all'
    dry_run: bool = False,
    max_hamming: int | None = None,
    auto_merge_threshold: float | None = None,
    review_threshold: float | None = None,
    cross_source_only: bool = False,
) -> BulkCounters:
    """Re-score candidate scene pairs and auto-merge them or queue them for review.

    Stage 1 gathers candidate pairs in one session without committing anything.
    Stage 2 hands each unique pair to `_process_pair`, which uses its own
    transaction. A failure on one pair is therefore logged and skipped, and
    does not undo earlier decisions.

    Args:
        strategy: Candidate source:
            ``"phash"`` (Hamming-near phashes),
            ``"phash_exact"`` (identical phash values via SQL self-join),
            ``"performers"`` (scenes sharing a performer), or
            ``"all"`` (``"phash"`` + ``"performers"``; excludes ``"phash_exact"``).
        dry_run: Only log decisions; write nothing.
        max_hamming: Max Hamming distance for ``"phash"``. ``None`` uses
            ``settings.fingerprint_hamming_max``; ``0`` is honoured.
        auto_merge_threshold: Composite score at/above which a pair is merged.
            ``None`` uses ``settings.auto_merge_threshold``.
        review_threshold: Composite score at/above which a pair becomes a
            pending review candidate. ``None`` uses ``settings.review_threshold``.
        cross_source_only: Restrict to tpdb-only ↔ stashdb-only pairs.

    Returns:
        The `BulkCounters` tallied over the run.

    Raises:
        ValueError: If ``strategy`` is not one of the supported values.
    """
    # Previously an unknown strategy silently ran a job that did nothing.
    if strategy not in ("phash", "phash_exact", "performers", "all"):
        raise ValueError(f"unknown bulk_dedup strategy: {strategy!r}")

    settings = get_settings()
    # `is None` instead of `or`: 0 / 0.0 are legitimate explicit overrides
    # (e.g. max_hamming=0 for exact-only matching) and must not be replaced
    # by the settings defaults.
    auto_t = (
        settings.auto_merge_threshold
        if auto_merge_threshold is None
        else auto_merge_threshold
    )
    review_t = settings.review_threshold if review_threshold is None else review_threshold
    max_h = settings.fingerprint_hamming_max if max_hamming is None else max_hamming
    counters = BulkCounters()

    # Stage 1: collect every candidate pair (no commits).
    log.info("bulk_dedup start strategy=%s dry_run=%s", strategy, dry_run)
    with session_scope() as session:
        pairs: list[tuple[uuid.UUID, uuid.UUID]] = []
        if strategy in ("phash", "all"):
            phash_pairs = list(_pairs_sharing_phash(session, max_hamming=max_h))
            log.info("bulk_dedup: %d phash-shared pairs", len(phash_pairs))
            pairs.extend(phash_pairs)
        if strategy == "phash_exact":
            # Exact phash collisions via SQL instead of the pairwise Hamming
            # scan. Identical phash = identical thumbnail = same scene, which
            # was being missed (bug report 2026-06-03: same duration, same
            # thumbnail, not merged). Scales to 441k+ phashes.
            exact_pairs = list(_pairs_exact_phash(session))
            log.info("bulk_dedup: %d exact-phash pairs", len(exact_pairs))
            pairs.extend(exact_pairs)
        if strategy in ("performers", "all"):
            perf_pairs = list(
                _pairs_sharing_performer(session, cross_source_only=cross_source_only)
            )
            log.info(
                "bulk_dedup: %d performer-shared pairs (cross_source_only=%s)",
                len(perf_pairs), cross_source_only,
            )
            pairs.extend(perf_pairs)

    # Normalise to (low, high) and drop duplicates produced by several strategies.
    unique_pairs = list({(a, b) if a < b else (b, a) for a, b in pairs})
    log.info("bulk_dedup: %d unique pairs total", len(unique_pairs))

    # Stage 2: score + decide per pair, each decision in its own transaction,
    # so partial progress is persisted if something fails midway.
    for sid_a, sid_b in unique_pairs:
        try:
            _process_pair(
                sid_a, sid_b,
                auto_t=auto_t, review_t=review_t,
                dry_run=dry_run, counters=counters,
                cross_source_only=cross_source_only,
            )
        except Exception as e:
            # Best-effort boundary: one bad pair must not abort the whole run.
            log.exception("bulk_dedup pair %s ↔ %s failed: %s", sid_a, sid_b, e)
    log.info("bulk_dedup done: %s", counters)
    return counters
def _scene_source_kinds(session: Session, scene_id: uuid.UUID) -> set[str]:
    """Return the distinct source kinds (e.g. 'tpdb', 'stashdb', 'scraper') of a
    scene's external refs, as their string values."""
    from app.models.scene import SceneExternalRef
    from app.models.source import Source

    stmt = (
        select(Source.kind)
        .join(SceneExternalRef, SceneExternalRef.source_id == Source.id)
        .where(SceneExternalRef.scene_id == scene_id)
        .distinct()
    )
    kinds: set[str] = set()
    for (source_kind,) in session.execute(stmt).all():
        kinds.add(source_kind.value)
    return kinds
def _is_cross_source_pair(session: Session, sid_a: uuid.UUID, sid_b: uuid.UUID) -> bool:
    """True when the pair is tpdb-only ↔ stashdb-only.

    Such pairs are the merge candidates that would reveal cross-source overlap.
    Pairs where either scene already has BOTH canonical refs (merged earlier)
    are excluded. So are pairs from the same single source, which are rarely
    duplicates.
    """
    canonical = {"tpdb", "stashdb"}
    left = _scene_source_kinds(session, sid_a) & canonical
    right = _scene_source_kinds(session, sid_b) & canonical
    # Each side must carry exactly one canonical source, and the two must differ.
    return len(left) == 1 == len(right) and left != right
def _process_pair(
    sid_a: uuid.UUID,
    sid_b: uuid.UUID,
    *,
    auto_t: float,
    review_t: float,
    dry_run: bool,
    counters: BulkCounters,
    cross_source_only: bool = False,
) -> None:
    """Score one scene pair and act on the outcome inside its own session.

    Outcomes, each bumping one counter in ``counters``:
      * either scene missing, or both ids equal → ``skipped_self``
      * ``cross_source_only`` and not a tpdb-only ↔ stashdb-only pair
        → ``skipped_not_cross_source``
      * a pending / merged / rejected scene MergeCandidate already exists for
        the pair (either orientation) → ``skipped_already_pending``
      * composite ≥ ``auto_t`` → ``auto_merged``: adds an auto_merged audit
        candidate and calls `merge_scenes` (dry run: log only)
      * composite ≥ ``review_t`` → ``pending_added``: adds a pending
        candidate (dry run: log only)
      * otherwise → ``rejected_low`` (nothing written)

    Every pair that reaches scoring also bumps ``pairs_scored``.
    """
    with session_scope() as session:
        a = session.get(Scene, sid_a)
        b = session.get(Scene, sid_b)
        if a is None or b is None:
            # NOTE: this also counts scenes that no longer exist (presumably
            # merged away by an earlier pair in the same run), not just self-pairs.
            counters.skipped_self += 1
            return
        if a.id == b.id:
            counters.skipped_self += 1
            return
        if cross_source_only and not _is_cross_source_pair(session, sid_a, sid_b):
            counters.skipped_not_cross_source += 1
            return

        # Skip if a pending / merged / rejected candidate already exists for
        # this pair, in either left/right orientation.
        existing = session.execute(
            select(MergeCandidate).where(
                MergeCandidate.kind == MergeKind.scene,
                (
                    ((MergeCandidate.left_id == a.id) & (MergeCandidate.right_id == b.id))
                    | ((MergeCandidate.left_id == b.id) & (MergeCandidate.right_id == a.id))
                ),
                MergeCandidate.status.in_(
                    [MergeStatus.pending, MergeStatus.merged, MergeStatus.rejected]
                ),
            ).limit(1)
        ).scalar_one_or_none()
        if existing is not None:
            counters.skipped_already_pending += 1
            return

        breakdown = score_scene_pair(session, a, b)
        counters.pairs_scored += 1
        if counters.pairs_scored % 500 == 0:
            log.info(
                "bulk_dedup progress: %d pairs scored (auto=%d pending=%d rej=%d)",
                counters.pairs_scored,
                counters.auto_merged,
                counters.pending_added,
                counters.rejected_low,
            )

        decision = "auto" if breakdown.composite >= auto_t else (
            "review" if breakdown.composite >= review_t else "reject"
        )
        if decision == "auto":
            counters.auto_merged += 1
            if dry_run:
                log.info(
                    "[dry] AUTO %s '%s' (dur=%s) ↔ '%s' (dur=%s) score=%.3f",
                    a.id, a.title[:40], a.duration_sec, b.title[:40], b.duration_sec,
                    breakdown.composite,
                )
                return
            # Keep = scene with more external_refs; tie-break: older (lower created_at).
            keep, drop = _pick_keep_drop(session, a, b)
            log.info(
                "AUTO merge keep=%s drop=%s score=%.3f",
                keep.id, drop.id, breakdown.composite,
            )
            session.add(
                MergeCandidate(
                    kind=MergeKind.scene,
                    left_id=keep.id,
                    # NOTE(review): both ids point at the surviving scene, so the
                    # dropped scene id is not stored on this audit row — confirm
                    # that is intended.
                    right_id=keep.id,  # audit: post-merge scene_id
                    score=breakdown.composite,
                    reasons={"path": "bulk_dedup", **breakdown.to_dict()},
                    status=MergeStatus.auto_merged,
                )
            )
            merge_scenes(
                session, keep_id=keep.id, drop_id=drop.id, resolved_by="bulk_dedup"
            )
        elif decision == "review":
            counters.pending_added += 1
            if dry_run:
                # Separator added: the two titles used to be logged run together.
                log.info(
                    "[dry] PENDING '%s' ↔ '%s' score=%.3f",
                    a.title[:40], b.title[:40], breakdown.composite,
                )
                return
            session.add(
                MergeCandidate(
                    kind=MergeKind.scene,
                    left_id=a.id,
                    right_id=b.id,
                    score=breakdown.composite,
                    reasons={"path": "bulk_dedup", **breakdown.to_dict()},
                    status=MergeStatus.pending,
                )
            )
        else:
            counters.rejected_low += 1
def _pick_keep_drop(session: Session, a: Scene, b: Scene) -> tuple[Scene, Scene]:
    """Return (keep, drop) for merging ``a`` and ``b``.

    The scene with more external_refs is kept. On a tie, the older scene
    (lower created_at) is kept, and equal timestamps favour ``a``.
    """
    from app.models.scene import SceneExternalRef

    def ref_count(scene: Scene) -> int:
        return session.execute(
            select(func.count())
            .select_from(SceneExternalRef)
            .where(SceneExternalRef.scene_id == scene.id)
        ).scalar_one()

    a_refs = ref_count(a)
    b_refs = ref_count(b)
    if a_refs != b_refs:
        return (a, b) if a_refs > b_refs else (b, a)
    return (a, b) if a.created_at <= b.created_at else (b, a)