"""Bulk dedup pass dla istniejących scen kanonicznych. Sceny ingestowane przed dodaniem `duration_proximity` + `duration_perf_strong_match_bump` (2026-05-03) nigdy nie zobaczyły nowego scoringu — zostały obok siebie jako odrębne canonical entries. Ten moduł re-scoruje pary scen ze SHARED phashem / performerem i tworzy auto_merge / pending kandydaty zgodnie z aktualnym scoringiem. Strategie: phash — pary scen mające ten sam phash (Hamming ≤5). Mocny sygnał. performers — dla każdego performera, pair-wise scenes; trafia bumpa duration±6s + perf≥0.5. `dry_run=True` tylko loguje co by zrobił, nic nie zapisuje. """ from __future__ import annotations import itertools import logging import uuid from collections.abc import Iterable from dataclasses import dataclass, field from sqlalchemy import func, select from sqlalchemy.orm import Session from app.config import get_settings from app.db import session_scope from app.models.merge_candidate import MergeCandidate, MergeKind, MergeStatus from app.models.scene import Scene, SceneFingerprint, ScenePerformer from app.resolve.scene_merge import merge_scenes from app.resolve.scoring import ( ScoreBreakdown, composite_score, date_proximity, duration_proximity, hamming_distance_hex, performer_set_similarity, phash_similarity, series_mismatch_strength, title_similarity, triage, ) log = logging.getLogger(__name__) @dataclass class BulkCounters: pairs_scored: int = 0 auto_merged: int = 0 pending_added: int = 0 rejected_low: int = 0 skipped_already_pending: int = 0 skipped_self: int = 0 skipped_not_cross_source: int = 0 # ---- pair scoring ----------------------------------------------------- def _scene_phashes(session: Session, scene_id: uuid.UUID) -> list[str]: return list( session.execute( select(SceneFingerprint.value).where( SceneFingerprint.scene_id == scene_id, SceneFingerprint.kind == "phash", ) ) .scalars() .all() ) def _scene_performers(session: Session, scene_id: uuid.UUID) -> list[uuid.UUID]: return list( session.execute( select(ScenePerformer.performer_id).where(ScenePerformer.scene_id == scene_id) ) .scalars() .all() ) def _best_phash(left_phashes: list[str], right_phashes: list[str]) -> float | None: """Najlepsza similarity między dwoma listami phashy (równa długość).""" if not left_phashes or not right_phashes: return None best = 0.0 for a in left_phashes: for b in right_phashes: if len(a) != len(b): continue try: sim = phash_similarity(a, b) except ValueError: continue if sim > best: best = sim return best if best > 0 else None def score_scene_pair(session: Session, a: Scene, b: Scene) -> ScoreBreakdown: """Odpowiednik `score_candidate` ale bez wrappowania w NormalizedScene.""" a_phashes = _scene_phashes(session, a.id) b_phashes = _scene_phashes(session, b.id) fp = _best_phash(a_phashes, b_phashes) title = title_similarity(a.title_normalized, b.title_normalized) a_perfs = _scene_performers(session, a.id) b_perfs = _scene_performers(session, b.id) perf = ( performer_set_similarity(a_perfs, b_perfs) if (a_perfs or b_perfs) else None ) date_score = date_proximity(a.release_date, b.release_date) if (a.release_date and b.release_date) else None duration_score = duration_proximity(a.duration_sec, b.duration_sec) studio_match: bool | None if a.studio_id is None or b.studio_id is None: studio_match = None else: studio_match = a.studio_id == b.studio_id series_mismatch = series_mismatch_strength(a.title_normalized, b.title_normalized) # Bulk dedup nie jest aggregator — porównujemy dwie kanoniczne sceny, studio # to prawdziwe studio. Aggregator mode tylko w resolverze przy ingest z tube'a. composite, reasons = composite_score( fp=fp, title=title, performers=perf, date_score=date_score, duration_score=duration_score, studio_match=studio_match, aggregator_mode=False, series_mismatch=series_mismatch, ) return ScoreBreakdown( fp=fp, title=title, performers=perf, date=date_score, duration=duration_score, studio_match=studio_match, composite=composite, reasons=reasons, ) # ---- candidate selection --------------------------------------------- def _pairs_sharing_phash(session: Session, max_hamming: int) -> Iterable[tuple[uuid.UUID, uuid.UUID]]: """Yield pary scene_id mające phashe w odległości Hamminga ≤ max_hamming. Nie próbujemy pgvector / BK-tree — dla 215 phashy O(N²) = 46k iteracji jest OK. """ rows = session.execute( select(SceneFingerprint.scene_id, SceneFingerprint.value).where( SceneFingerprint.kind == "phash" ) ).all() seen_pairs: set[tuple[uuid.UUID, uuid.UUID]] = set() for (sid_a, hash_a), (sid_b, hash_b) in itertools.combinations(rows, 2): if sid_a == sid_b: continue if len(hash_a) != len(hash_b): continue try: d = hamming_distance_hex(hash_a, hash_b) except ValueError: continue if d > max_hamming: continue pair = (sid_a, sid_b) if sid_a < sid_b else (sid_b, sid_a) if pair in seen_pairs: continue seen_pairs.add(pair) yield pair def _pairs_exact_phash(session: Session) -> Iterable[tuple[uuid.UUID, uuid.UUID]]: """Yield pary scen mających IDENTYCZNY phash (exact value match) — przez SQL self-join, O(N log N) zamiast O(N²) z `_pairs_sharing_phash`. Identyczny phash = identyczna miniaturka = niemal pewnie ten sam scene (potem `_process_pair` scoruje i auto-merguje tylko ≥auto_t, więc przypadkowe kolizje z innym performerem/duration wpadną w review). Skalowalne dla setek tys. phashy gdzie Hamming-fuzzy O(N²) jest nierealne.""" from sqlalchemy import text rows = session.execute(text(""" SELECT DISTINCT a.scene_id AS sa, b.scene_id AS sb FROM scene_fingerprints a JOIN scene_fingerprints b ON a.value = b.value AND a.scene_id < b.scene_id WHERE a.kind = 'phash' AND b.kind = 'phash' """)) for sa, sb in rows: yield (sa, sb) def _pairs_sharing_performer( session: Session, *, cross_source_only: bool = False, ) -> Iterable[tuple[uuid.UUID, uuid.UUID]]: """Yield pary scen z wspólnym performerem. Dedup pair (a bool: # Prefiltr cross-source par PRZED scoringiem (drogi). Wymagamy KONIUNKCJI # sygnałów: identyczne studio ORAZ zbieżna data/długość. Wcześniej alternatywa # (studio LUB data±7d LUB dur±30s) przepuszczała 939k par — płodny performer w # jednym studio generuje tpdb×stashdb kartezjan, a sam wspólny studio go nie tnie. # Etap 2 (~110 par/s) nie kończył w 1800s → job HUNG co run (GOON-V), wątek leak. # studio match AND (data±2d OR dur±30s) ⇒ 939k→~16k par, ~240s całość. Prawdziwy # cross-source dup tej samej sceny ma to samo studio + ~tę samą datę/długość (jeden # master). Pary o rozjechanym studio_id (rzadkie po studio-dedup) świadomie pomijamy # — częściowe pokrycie które KOŃCZY > pełne które timeoutuje i nie merge'uje nic. a = scene_meta.get(a_id) b = scene_meta.get(b_id) if not a or not b: return False a_studio, a_date, a_dur = a b_studio, b_date, b_dur = b # Kotwica: oba znają studio i to samo. if a_studio is None or a_studio != b_studio: return False # Plus zgodność czasowa LUB długości (znana-i-bliska, nie tylko brak). date_ok = bool(a_date and b_date and abs((a_date - b_date).days) <= 2) dur_ok = bool(a_dur and b_dur and abs(a_dur - b_dur) <= 30) return date_ok or dur_ok seen_pairs: set[tuple[uuid.UUID, uuid.UUID]] = set() for scene_ids in perf_to_scenes.values(): tpdb_only = [sid for sid in scene_ids if scene_kinds.get(sid) == {"tpdb"}] stash_only = [sid for sid in scene_ids if scene_kinds.get(sid) == {"stashdb"}] for a in tpdb_only: for b in stash_only: pair = (a, b) if a < b else (b, a) if pair in seen_pairs: continue if not _candidate(a, b): continue seen_pairs.add(pair) yield pair return seen_pairs = set() for scene_ids in perf_to_scenes.values(): if len(scene_ids) < 2: continue for a, b in itertools.combinations(scene_ids, 2): pair = (a, b) if a < b else (b, a) if pair in seen_pairs: continue seen_pairs.add(pair) yield pair # ---- main entry -------------------------------------------------------- def run_bulk_dedup( *, strategy: str = "all", # 'phash' | 'performers' | 'all' dry_run: bool = False, max_hamming: int | None = None, auto_merge_threshold: float | None = None, review_threshold: float | None = None, cross_source_only: bool = False, ) -> BulkCounters: settings = get_settings() auto_t = auto_merge_threshold or settings.auto_merge_threshold review_t = review_threshold or settings.review_threshold max_h = max_hamming or settings.fingerprint_hamming_max counters = BulkCounters() # Etap 1: zbierz wszystkie kandydujące pary (BEZ commita do DB). log.info("bulk_dedup start strategy=%s dry_run=%s", strategy, dry_run) with session_scope() as session: pairs: list[tuple[uuid.UUID, uuid.UUID]] = [] if strategy in ("phash", "all"): phash_pairs = list(_pairs_sharing_phash(session, max_hamming=max_h)) log.info("bulk_dedup: %d phash-shared pairs", len(phash_pairs)) pairs.extend(phash_pairs) if strategy == "phash_exact": # EXACT phash collisions via SQL (O(N log N), nie O(N²) jak _pairs_sharing_phash). # Identyczny phash = identyczna miniaturka = ten sam scene → missing-merge # (bug-report 2026-06-03 "ten sam czas, ta sama miniaturka, czemu się nie # mergują"). Skalowalne dla 441k+ phashy gdzie Hamming-O(N²) jest nierealne. exact_pairs = list(_pairs_exact_phash(session)) log.info("bulk_dedup: %d exact-phash pairs", len(exact_pairs)) pairs.extend(exact_pairs) if strategy in ("performers", "all"): perf_pairs = list( _pairs_sharing_performer(session, cross_source_only=cross_source_only) ) log.info( "bulk_dedup: %d performer-shared pairs (cross_source_only=%s)", len(perf_pairs), cross_source_only, ) pairs.extend(perf_pairs) # Dedup unique_pairs = list({(a, b) if a < b else (b, a) for a, b in pairs}) log.info("bulk_dedup: %d unique pairs total", len(unique_pairs)) # Etap 2: per pair — score + decision. Każda decyzja w osobnej transakcji, # żeby gdyby coś padło, częściowy postęp był persistowany. for sid_a, sid_b in unique_pairs: try: _process_pair( sid_a, sid_b, auto_t=auto_t, review_t=review_t, dry_run=dry_run, counters=counters, cross_source_only=cross_source_only, ) except Exception as e: log.exception("bulk_dedup pair %s↔%s failed: %s", sid_a, sid_b, e) log.info("bulk_dedup done: %s", counters) return counters def _scene_source_kinds(session: Session, scene_id: uuid.UUID) -> set[str]: """Zwraca set('tpdb', 'stashdb', 'scraper'...) — kinds external_refów sceny.""" from app.models.scene import SceneExternalRef from app.models.source import Source rows = session.execute( select(Source.kind) .join(SceneExternalRef, SceneExternalRef.source_id == Source.id) .where(SceneExternalRef.scene_id == scene_id) .distinct() ).all() return {kind.value for (kind,) in rows} def _is_cross_source_pair(session: Session, sid_a: uuid.UUID, sid_b: uuid.UUID) -> bool: """True gdy para to (tpdb-only ↔ stashdb-only) — czyli kandydaci do mergu który odsłoniłby cross-source overlap. Pomija pary gdzie któryś już ma BOTH refs (już zmergowane wcześniej) lub te same single-source (rzadko duplikaty).""" a_kinds = _scene_source_kinds(session, sid_a) b_kinds = _scene_source_kinds(session, sid_b) canonical_a = a_kinds & {"tpdb", "stashdb"} canonical_b = b_kinds & {"tpdb", "stashdb"} # Obie strony muszą mieć dokładnie 1 canonical źródło, i muszą się różnić. if len(canonical_a) != 1 or len(canonical_b) != 1: return False return canonical_a != canonical_b def _process_pair( sid_a: uuid.UUID, sid_b: uuid.UUID, *, auto_t: float, review_t: float, dry_run: bool, counters: BulkCounters, cross_source_only: bool = False, ) -> None: with session_scope() as session: a = session.get(Scene, sid_a) b = session.get(Scene, sid_b) if a is None or b is None: counters.skipped_self += 1 return if a.id == b.id: counters.skipped_self += 1 return if cross_source_only and not _is_cross_source_pair(session, sid_a, sid_b): counters.skipped_not_cross_source += 1 return # Skip jeśli już istnieje pending/auto_merged/rejected dla tej pary. existing = session.execute( select(MergeCandidate).where( MergeCandidate.kind == MergeKind.scene, ( ((MergeCandidate.left_id == a.id) & (MergeCandidate.right_id == b.id)) | ((MergeCandidate.left_id == b.id) & (MergeCandidate.right_id == a.id)) ), MergeCandidate.status.in_( [MergeStatus.pending, MergeStatus.merged, MergeStatus.rejected] ), ).limit(1) ).scalar_one_or_none() if existing is not None: counters.skipped_already_pending += 1 return breakdown = score_scene_pair(session, a, b) counters.pairs_scored += 1 if counters.pairs_scored % 500 == 0: log.info( "bulk_dedup progress: %d pairs scored (auto=%d pending=%d rej=%d)", counters.pairs_scored, counters.auto_merged, counters.pending_added, counters.rejected_low, ) decision = "auto" if breakdown.composite >= auto_t else ( "review" if breakdown.composite >= review_t else "reject" ) if decision == "auto": counters.auto_merged += 1 if dry_run: log.info( "[dry] AUTO %s '%s' (dur=%s) ↔ '%s' (dur=%s) score=%.3f", a.id, a.title[:40], a.duration_sec, b.title[:40], b.duration_sec, breakdown.composite, ) return # Pick keep = scene z większą liczbą external_refs, tie-break: starszy (lower created_at). keep, drop = _pick_keep_drop(session, a, b) log.info( "AUTO merge keep=%s drop=%s score=%.3f", keep.id, drop.id, breakdown.composite, ) session.add( MergeCandidate( kind=MergeKind.scene, left_id=keep.id, right_id=keep.id, # audit: post-merge scene_id score=breakdown.composite, reasons={"path": "bulk_dedup", **breakdown.to_dict()}, status=MergeStatus.auto_merged, ) ) merge_scenes( session, keep_id=keep.id, drop_id=drop.id, resolved_by="bulk_dedup" ) elif decision == "review": counters.pending_added += 1 if dry_run: log.info( "[dry] PENDING '%s' ↔ '%s' score=%.3f", a.title[:40], b.title[:40], breakdown.composite, ) return session.add( MergeCandidate( kind=MergeKind.scene, left_id=a.id, right_id=b.id, score=breakdown.composite, reasons={"path": "bulk_dedup", **breakdown.to_dict()}, status=MergeStatus.pending, ) ) else: counters.rejected_low += 1 def _pick_keep_drop(session: Session, a: Scene, b: Scene) -> tuple[Scene, Scene]: """Keep = scena z większą liczbą external_refs. Tie → starsza.""" from app.models.scene import SceneExternalRef a_count = session.execute( select(func.count()).select_from(SceneExternalRef).where(SceneExternalRef.scene_id == a.id) ).scalar_one() b_count = session.execute( select(func.count()).select_from(SceneExternalRef).where(SceneExternalRef.scene_id == b.id) ).scalar_one() if a_count > b_count: return a, b if b_count > a_count: return b, a # tie → starsza scena (lower created_at) return (a, b) if a.created_at <= b.created_at else (b, a)