goon/app/scheduler/title_duration_dedup.py
jtrzupek f014a901de feat(scheduler): periodic title+duration dedup (missing-merge tube dupes)
Missing-merge duplicates (same performer + identical normalized title + identical duration-to-the-second) that bulk_dedup misses — tube re-scrapes and cross-tube re-ingests like porn00 pulling a video already present from xnxx (reports 28fe8181/32df33b1). Extracted the proven merge_exact_title_duration logic into app/scheduler/title_duration_dedup.py (script now a thin wrapper), wired a 12h scheduler job (playback-only = what users actually see, GOON_SCHED_TITLE_DEDUP_HOURS). Signal is near-certain (two different videos don't share byte-identical title AND exact duration); no shared performer = not merged (over-match guard). Verified: job registers (jobs=14), backlog currently 0 after the one-shot global merge.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-19 11:20:48 +02:00

89 lines
3.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Dedup missing-merge: ten sam performer + identyczny znormalizowany tytuł + identyczna
długość (co do sekundy).
bulk_dedup łapie cross-source (tpdb↔stashdb) i exact-phash, ale NIE tube-dupy bez
fingerprintów (ta sama scena zescrapowana 2× pod różnym URL/slug, albo cross-tube np.
porn00 wciąga wideo już obecne z xnxx). User widzi „te same miniatury, duplikat"
(reports 28fe8181 / 32df33b1 — porn00). Sygnał `same performer + exact norm-title +
exact duration_sec` jest praktycznie pewny (dwa różne wideo nie mają byte-identycznego
tytułu I długości co do sekundy). Bez wspólnej aktorki NIE łączymy (over-match trap).
Re-ingesty pod nowymi slug/tytułami → dupy odrastają, stąd cyklicznie (scheduler).
Też jako one-shot (scripts/merge_exact_title_duration.py importuje stąd).
"""
from __future__ import annotations
import logging
import uuid as _u
from sqlalchemy import text
from app.db import session_scope
from app.resolve.scene_merge import merge_scenes
log = logging.getLogger(__name__)
def _groups(pid: str | None, playback_only: bool) -> list[list[str]]:
    """Return groups of scene ids that look like the same video ingested twice.

    Scenes are grouped when they share a performer, the same normalized title
    (``lower(btrim(title))``) and the same ``duration_sec``. Scenes with a NULL
    duration or a blank/NULL title never take part.

    Args:
        pid: When truthy, only scenes linked to this performer id are considered.
        playback_only: When true, only scenes that have at least one playback
            source with ``dead_at IS NULL`` are considered (i.e. what the app
            actually shows), which leaves out canonical stubs without live links.

    Returns:
        One list of scene-id strings per group, at least two ids each. The first
        id is the preferred keeper: most external refs, then most playback
        sources, then oldest ``created_at``, then lowest id. Groups with an
        identical member set (the same scenes matched through several shared
        performers) are reported once. Groups may still *overlap* partially;
        callers that merge must cope with ids consumed by an earlier group.
    """
    # Both fragments are fixed literals; the only caller-supplied value (pid) is
    # passed as a bound parameter, never interpolated into the SQL text.
    where_perf = "AND sp.performer_id = :pid" if pid else ""
    where_pb = (
        "AND EXISTS (SELECT 1 FROM playback_sources p WHERE p.scene_id=s.id AND p.dead_at IS NULL)"
        if playback_only else ""
    )
    # `id ASC` is the final tiebreaker so that the keeper choice is deterministic
    # when refs, srcs and created_at are all equal (rows inserted in a single
    # transaction typically share one created_at).
    sql = f"""
        WITH cand AS (
            SELECT s.id, sp.performer_id, lower(btrim(s.title)) nt, s.duration_sec dur, s.created_at,
                   (SELECT count(*) FROM scene_external_refs r WHERE r.scene_id=s.id) refs,
                   (SELECT count(*) FROM playback_sources p WHERE p.scene_id=s.id) srcs
            FROM scenes s
            JOIN scene_performers sp ON sp.scene_id=s.id {where_perf}
            WHERE s.duration_sec IS NOT NULL AND btrim(s.title) <> '' {where_pb}
        )
        SELECT array_agg(id::text ORDER BY refs DESC, srcs DESC, created_at ASC, id ASC) members
        FROM cand
        GROUP BY performer_id, nt, dur
        HAVING count(*) > 1
    """
    params = {"pid": pid} if pid else {}
    with session_scope() as s:
        rows = s.execute(text(sql), params).all()

    seen: set[frozenset[str]] = set()
    out: list[list[str]] = []
    for (members,) in rows:
        # count(*) counts join rows, not distinct scenes. Defensive: if a
        # (scene, performer) link is ever stored twice, the same id would show
        # up twice here and a caller would merge a scene into itself.
        unique = list(dict.fromkeys(members))
        if len(unique) < 2:
            continue
        # The same set of scenes is produced once per shared performer.
        key = frozenset(unique)
        if key in seen:
            continue
        seen.add(key)
        out.append(unique)
    return out
def run_title_duration_dedup(
    *, pid: str | None = None, playback_only: bool = True, commit: bool = True
) -> dict[str, int]:
    """Merge duplicate scenes with identical performer + title + duration.

    Candidate groups come from ``_groups``; within each group the first id is
    kept and every other member is merged into it via ``merge_scenes``, one
    session per merge so that a single failure does not undo the others.

    Args:
        pid: Restrict the run to scenes of this performer id (``None`` = all).
        playback_only: Forwarded to ``_groups``; limit to scenes with live
            playback sources.
        commit: When false this is a dry run: nothing is merged and nothing is
            logged, only the counts are returned.

    Returns:
        ``{"groups", "merges", "merged", "errors"}`` where ``groups`` is the
        number of candidate groups, ``merges`` the number of planned keep/drop
        pairs, ``merged`` the merges that completed, and ``errors`` the merges
        that raised.
    """
    groups = _groups(pid, playback_only)
    pairs = sum(len(g) - 1 for g in groups)
    merged = errors = 0

    # Dropped scene id -> id it was merged into (successful merges only).
    # Groups can overlap (a scene with two performers may sit in {A, B} and in
    # {B, C}); without this map a later group would hand merge_scenes an id
    # that an earlier group has already consumed.
    merged_into: dict[str, str] = {}

    def _survivor(scene_id: str) -> str:
        # Follow the merge chain to the scene that currently holds the data.
        # Keepers are always roots when recorded, so the chain cannot cycle.
        while scene_id in merged_into:
            scene_id = merged_into[scene_id]
        return scene_id

    if commit:
        for g in groups:
            keep = _survivor(g[0])
            for member in g[1:]:
                drop = _survivor(member)
                if drop == keep:
                    # Already unified by an earlier, overlapping group.
                    continue
                try:
                    with session_scope() as s:
                        merge_scenes(
                            s, keep_id=_u.UUID(keep), drop_id=_u.UUID(drop),
                            resolved_by="title_duration_dedup",
                        )
                except Exception as e:  # pragma: no cover - defensive
                    # Best effort: one bad pair must not abort the whole job.
                    errors += 1
                    if errors <= 20:  # cap log noise on systematic failures
                        log.warning("title_dedup keep %s drop %s: %s", keep[:8], drop[:8], str(e)[:120])
                else:
                    # Counted only after session_scope() exited cleanly, so a
                    # failure raised on exit is an error and not also a merge.
                    merged += 1
                    merged_into[drop] = keep

    result = {"groups": len(groups), "merges": pairs, "merged": merged, "errors": errors}
    if commit:
        log.info("title_duration_dedup: %s", result)
    return result