"""Merge missing-merge duplikatów: ten sam performer + identyczny znormalizowany tytuł + identyczna długość (co do sekundy). Kontekst: bulk_dedup łapie cross-source (tpdb↔stashdb) i exact-phash, ale NIE łapie tube-dup bez fingerprintów (np. ta sama scena zescrapowana 2× pod różnym URL/slug). Na stronie performera user widzi wtedy "te same miniatury, duplikat" (bug-report ef92809d — Bad Bella miała 25 takich par). Sygnał `same performer + exact norm-title + exact duration_sec` jest praktycznie pewny (dwa różne wideo nie mają byte-identycznego tytułu I długości co do sekundy). Keep = scena z największą liczbą external_refs → potem playback_sources → potem najstarsza. Merge przez resolve.scene_merge.merge_scenes (przenosi refs/performers/tags/fingerprints/ playback_sources — playback move dodany 2026-06-08 razem z tym skryptem). Użycie (kontener worker): python scripts/merge_exact_title_duration.py [PERFORMER_ID] [--commit] Bez PERFORMER_ID = wszyscy performerzy (global). Bez --commit = dry-run. """ from __future__ import annotations import sys from sqlalchemy import text from app.db import session_scope from app.resolve.scene_merge import merge_scenes def _args() -> tuple[str | None, bool, bool]: commit = "--commit" in sys.argv playback_only = "--playback-only" in sys.argv pid = None for a in sys.argv[1:]: if not a.startswith("--") and len(a) >= 32: pid = a return pid, commit, playback_only def _groups(pid: str | None, playback_only: bool = False) -> list[list[str]]: # Grupy scen (per performer) o identycznym lower(trim(title)) + duration_sec. # member order: refs DESC, srcs DESC, created_at ASC → pierwszy = keeper. where_perf = "AND sp.performer_id = :pid" if pid else "" # app-visible: tylko sceny z żywym playbackiem (to co user faktycznie widzi na # stronach) — pomija canonical stuby bez tube-linków. where_pb = ( "AND EXISTS (SELECT 1 FROM playback_sources p WHERE p.scene_id=s.id AND p.dead_at IS NULL)" if playback_only else "" ) sql = f""" WITH cand AS ( SELECT s.id, sp.performer_id, lower(btrim(s.title)) nt, s.duration_sec dur, s.created_at, (SELECT count(*) FROM scene_external_refs r WHERE r.scene_id=s.id) refs, (SELECT count(*) FROM playback_sources p WHERE p.scene_id=s.id) srcs FROM scenes s JOIN scene_performers sp ON sp.scene_id=s.id {where_perf} WHERE s.duration_sec IS NOT NULL AND btrim(s.title) <> '' {where_pb} ) SELECT array_agg(id::text ORDER BY refs DESC, srcs DESC, created_at ASC) members FROM cand GROUP BY performer_id, nt, dur HAVING count(*) > 1 """ params = {"pid": pid} if pid else {} with session_scope() as s: rows = s.execute(text(sql), params).all() # dedup grup (ten sam zestaw może wyjść dla 2 performerów dzielących sceny) seen: set[frozenset] = set() out: list[list[str]] = [] for (members,) in rows: key = frozenset(members) if key in seen: continue seen.add(key) out.append(list(members)) return out def main() -> None: pid, commit, playback_only = _args() groups = _groups(pid, playback_only) pairs = sum(len(g) - 1 for g in groups) print( f"performer={pid or 'ALL'} playback_only={playback_only} " f"groups={len(groups)} merges={pairs} commit={commit}", flush=True, ) import uuid as _u merged = 0 errors = 0 for g in groups: keep = g[0] for drop in g[1:]: if not commit: print(f" [dry] keep {keep[:8]} <- drop {drop[:8]}") continue try: with session_scope() as s: merge_scenes(s, keep_id=_u.UUID(keep), drop_id=_u.UUID(drop), resolved_by="merge_exact_title_duration") merged += 1 if merged % 500 == 0: print(f" progress merged={merged}/{pairs} errors={errors}", flush=True) except Exception as e: errors += 1 if errors <= 20: print(f" ERR keep {keep[:8]} drop {drop[:8]}: {str(e)[:120]}") print(f"DONE merged={merged}/{pairs} errors={errors}", flush=True) if __name__ == "__main__": main()