"""Dedup missing-merge: ten sam performer + identyczny znormalizowany tytuł + identyczna długość (co do sekundy). bulk_dedup łapie cross-source (tpdb↔stashdb) i exact-phash, ale NIE tube-dupy bez fingerprintów (ta sama scena zescrapowana 2× pod różnym URL/slug, albo cross-tube np. porn00 wciąga wideo już obecne z xnxx). User widzi „te same miniatury, duplikat" (reports 28fe8181 / 32df33b1 — porn00). Sygnał `same performer + exact norm-title + exact duration_sec` jest praktycznie pewny (dwa różne wideo nie mają byte-identycznego tytułu I długości co do sekundy). Bez wspólnej aktorki NIE łączymy (over-match trap). Re-ingesty pod nowymi slug/tytułami → dupy odrastają, stąd cyklicznie (scheduler). Też jako one-shot (scripts/merge_exact_title_duration.py importuje stąd). """ from __future__ import annotations import logging import uuid as _u from sqlalchemy import text from app.db import session_scope from app.resolve.scene_merge import merge_scenes log = logging.getLogger(__name__) def _groups(pid: str | None, playback_only: bool) -> list[list[str]]: where_perf = "AND sp.performer_id = :pid" if pid else "" # app-visible: tylko sceny z żywym playbackiem (to co user faktycznie widzi) — # pomija canonical stuby bez tube-linków. where_pb = ( "AND EXISTS (SELECT 1 FROM playback_sources p WHERE p.scene_id=s.id AND p.dead_at IS NULL)" if playback_only else "" ) sql = f""" WITH cand AS ( SELECT s.id, sp.performer_id, lower(btrim(s.title)) nt, s.duration_sec dur, s.created_at, (SELECT count(*) FROM scene_external_refs r WHERE r.scene_id=s.id) refs, (SELECT count(*) FROM playback_sources p WHERE p.scene_id=s.id) srcs FROM scenes s JOIN scene_performers sp ON sp.scene_id=s.id {where_perf} WHERE s.duration_sec IS NOT NULL AND btrim(s.title) <> '' {where_pb} ) SELECT array_agg(id::text ORDER BY refs DESC, srcs DESC, created_at ASC) members FROM cand GROUP BY performer_id, nt, dur HAVING count(*) > 1 """ params = {"pid": pid} if pid else {} with session_scope() as s: rows = s.execute(text(sql), params).all() seen: set[frozenset] = set() out: list[list[str]] = [] for (members,) in rows: key = frozenset(members) if key in seen: continue seen.add(key) out.append(list(members)) return out def run_title_duration_dedup( *, pid: str | None = None, playback_only: bool = True, commit: bool = True ) -> dict[str, int]: """Scal dupy o identycznym performer+tytuł+długość. Zwraca {groups, merges, merged, errors}.""" groups = _groups(pid, playback_only) pairs = sum(len(g) - 1 for g in groups) merged = errors = 0 for g in groups: keep = g[0] for drop in g[1:]: if not commit: continue try: with session_scope() as s: merge_scenes( s, keep_id=_u.UUID(keep), drop_id=_u.UUID(drop), resolved_by="title_duration_dedup", ) merged += 1 except Exception as e: # pragma: no cover - obronnie errors += 1 if errors <= 20: log.warning("title_dedup keep %s drop %s: %s", keep[:8], drop[:8], str(e)[:120]) result = {"groups": len(groups), "merges": pairs, "merged": merged, "errors": errors} if commit: log.info("title_duration_dedup: %s", result) return result