From f014a901de4e44d1c515d0e71d59f60758126f4c Mon Sep 17 00:00:00 2001 From: jtrzupek Date: Fri, 19 Jun 2026 11:20:48 +0200 Subject: [PATCH] feat(scheduler): periodic title+duration dedup (missing-merge tube dupes) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Missing-merge duplicates (same performer + identical normalized title + identical duration-to-the-second) that bulk_dedup misses — tube re-scrapes and cross-tube re-ingests like porn00 pulling a video already present from xnxx (reports 28fe8181/32df33b1). Extracted the proven merge_exact_title_duration logic into app/scheduler/title_duration_dedup.py (script now a thin wrapper), wired a 12h scheduler job (playback-only = what users actually see, GOON_SCHED_TITLE_DEDUP_HOURS). Signal is near-certain (two different videos don't share byte-identical title AND exact duration); no shared performer = not merged (over-match guard). Verified: job registers (jobs=14), backlog currently 0 after the one-shot global merge. Co-Authored-By: Claude Opus 4.8 (1M context) --- app/config.py | 7 ++ app/scheduler/jobs.py | 30 +++++++ app/scheduler/title_duration_dedup.py | 89 +++++++++++++++++++++ app/scheduler/worker.py | 2 + scripts/merge_exact_title_duration.py | 109 ++++++-------------------- 5 files changed, 150 insertions(+), 87 deletions(-) create mode 100644 app/scheduler/title_duration_dedup.py diff --git a/app/config.py b/app/config.py index 7e4d053..524e453 100644 --- a/app/config.py +++ b/app/config.py @@ -109,6 +109,13 @@ class Settings(BaseSettings): sched_thumb_dedup_hours: int = Field( default=12, validation_alias="GOON_SCHED_THUMB_DEDUP_HOURS" ) + # Title+duration dedup — scala missing-merge dupy (ten sam performer + identyczny + # znormalizowany tytuł + długość co do sekundy), których bulk_dedup nie łapie (tube + # re-scrape / cross-tube np. porn00 vs xnxx, reports 28fe8181/32df33b1). Odrastają + # przy re-ingeście, stąd cyklicznie. 12h, playback-only (to co user widzi). 0 = off. + sched_title_dedup_hours: int = Field( + default=12, validation_alias="GOON_SCHED_TITLE_DEDUP_HOURS" + ) # Ingest freshness watchdog — alert do Sentry gdy aktywny tube (origin # tube:) przestał dawać nowe sceny > próg. Łapie zamrożenie # pojedynczego origin, którego globalny monitor (jeden Source "tube-scraper") nie diff --git a/app/scheduler/jobs.py b/app/scheduler/jobs.py index 9a973ef..3033aa5 100644 --- a/app/scheduler/jobs.py +++ b/app/scheduler/jobs.py @@ -276,6 +276,23 @@ def _job_thumb_asset_dedup() -> None: log.exception("[scheduler] thumb-asset dedup failed") +def _job_title_duration_dedup() -> None: + """Scal missing-merge dupy (ten sam performer + identyczny tytuł + długość) których + bulk_dedup nie łapie (tube re-scrape / cross-tube np. porn00 vs xnxx, reports + 28fe8181/32df33b1). playback-only = to co user widzi. Re-ingesty → dupy odrastają.""" + log.info("[scheduler] title-duration dedup starting") + try: + from app.scheduler.title_duration_dedup import run_title_duration_dedup + + _run_with_timeout( + lambda: run_title_duration_dedup(playback_only=True, commit=True), + label="title-duration-dedup", + ) + log.info("[scheduler] title-duration dedup done") + except Exception: + log.exception("[scheduler] title-duration dedup failed") + + def _job_ingest_watchdog(max_age_hours: int, search_max_age_hours: int) -> None: """Per-origin freshness watchdog — alert do Sentry gdy aktywny tube przestał dawać nowe sceny > próg (browse: max_age_hours, search: search_max_age_hours). Globalny @@ -409,6 +426,17 @@ def build_scheduler(cfg: dict[str, Any]) -> BlockingScheduler: ) log.info("scheduler: thumb-asset dedup every %dh", cfg["thumb_dedup_hours"]) + if cfg.get("title_dedup_hours"): + sched.add_job( + _job_title_duration_dedup, + IntervalTrigger(hours=cfg["title_dedup_hours"], start_date=INTERVAL_ANCHOR), + id="title_duration_dedup", + replace_existing=True, + max_instances=1, + coalesce=True, + ) + log.info("scheduler: title-duration dedup every %dh", cfg["title_dedup_hours"]) + if cfg.get("ingest_watchdog_hours"): wd_max_age = cfg.get("ingest_watchdog_max_age_hours") or 48 wd_search_max_age = cfg.get("ingest_watchdog_search_max_age_hours") or 168 @@ -561,4 +589,6 @@ DEFAULT_CONFIG: dict[str, Any] = { "ingest_watchdog_search_max_age_hours": 168, # Hetzner Cloud bandwidth monitor — co 6h, alert Sentry przy progach % included. "hetzner_monitor_hours": 6, + # Title+duration dedup — missing-merge tube-dupy, co 12h, playback-only. + "title_dedup_hours": 12, } diff --git a/app/scheduler/title_duration_dedup.py b/app/scheduler/title_duration_dedup.py new file mode 100644 index 0000000..d668ea7 --- /dev/null +++ b/app/scheduler/title_duration_dedup.py @@ -0,0 +1,89 @@ +"""Dedup missing-merge: ten sam performer + identyczny znormalizowany tytuł + identyczna +długość (co do sekundy). + +bulk_dedup łapie cross-source (tpdb↔stashdb) i exact-phash, ale NIE tube-dupy bez +fingerprintów (ta sama scena zescrapowana 2× pod różnym URL/slug, albo cross-tube np. +porn00 wciąga wideo już obecne z xnxx). User widzi „te same miniatury, duplikat" +(reports 28fe8181 / 32df33b1 — porn00). Sygnał `same performer + exact norm-title + +exact duration_sec` jest praktycznie pewny (dwa różne wideo nie mają byte-identycznego +tytułu I długości co do sekundy). Bez wspólnej aktorki NIE łączymy (over-match trap). + +Re-ingesty pod nowymi slug/tytułami → dupy odrastają, stąd cyklicznie (scheduler). +Też jako one-shot (scripts/merge_exact_title_duration.py importuje stąd). +""" +from __future__ import annotations + +import logging +import uuid as _u + +from sqlalchemy import text + +from app.db import session_scope +from app.resolve.scene_merge import merge_scenes + +log = logging.getLogger(__name__) + + +def _groups(pid: str | None, playback_only: bool) -> list[list[str]]: + where_perf = "AND sp.performer_id = :pid" if pid else "" + # app-visible: tylko sceny z żywym playbackiem (to co user faktycznie widzi) — + # pomija canonical stuby bez tube-linków. + where_pb = ( + "AND EXISTS (SELECT 1 FROM playback_sources p WHERE p.scene_id=s.id AND p.dead_at IS NULL)" + if playback_only else "" + ) + sql = f""" + WITH cand AS ( + SELECT s.id, sp.performer_id, lower(btrim(s.title)) nt, s.duration_sec dur, s.created_at, + (SELECT count(*) FROM scene_external_refs r WHERE r.scene_id=s.id) refs, + (SELECT count(*) FROM playback_sources p WHERE p.scene_id=s.id) srcs + FROM scenes s + JOIN scene_performers sp ON sp.scene_id=s.id {where_perf} + WHERE s.duration_sec IS NOT NULL AND btrim(s.title) <> '' {where_pb} + ) + SELECT array_agg(id::text ORDER BY refs DESC, srcs DESC, created_at ASC) members + FROM cand + GROUP BY performer_id, nt, dur + HAVING count(*) > 1 + """ + params = {"pid": pid} if pid else {} + with session_scope() as s: + rows = s.execute(text(sql), params).all() + seen: set[frozenset] = set() + out: list[list[str]] = [] + for (members,) in rows: + key = frozenset(members) + if key in seen: + continue + seen.add(key) + out.append(list(members)) + return out + + +def run_title_duration_dedup( + *, pid: str | None = None, playback_only: bool = True, commit: bool = True +) -> dict[str, int]: + """Scal dupy o identycznym performer+tytuł+długość. Zwraca {groups, merges, merged, errors}.""" + groups = _groups(pid, playback_only) + pairs = sum(len(g) - 1 for g in groups) + merged = errors = 0 + for g in groups: + keep = g[0] + for drop in g[1:]: + if not commit: + continue + try: + with session_scope() as s: + merge_scenes( + s, keep_id=_u.UUID(keep), drop_id=_u.UUID(drop), + resolved_by="title_duration_dedup", + ) + merged += 1 + except Exception as e: # pragma: no cover - obronnie + errors += 1 + if errors <= 20: + log.warning("title_dedup keep %s drop %s: %s", keep[:8], drop[:8], str(e)[:120]) + result = {"groups": len(groups), "merges": pairs, "merged": merged, "errors": errors} + if commit: + log.info("title_duration_dedup: %s", result) + return result diff --git a/app/scheduler/worker.py b/app/scheduler/worker.py index bd7bf0a..a88b485 100644 --- a/app/scheduler/worker.py +++ b/app/scheduler/worker.py @@ -210,6 +210,8 @@ def run_forever() -> int: # Thumb-asset dedup — hdporn.gg/fullmovies.xxx same-video-różne-tytuły (reports # 205b17d9/5a2944cb). bulk_dedup tego nie łapie; dupy odrastają przy re-ingeście. "thumb_dedup_hours": getattr(settings, "sched_thumb_dedup_hours", 12) or None, + # Title+duration dedup — missing-merge tube-dupy (reports 28fe8181/32df33b1). + "title_dedup_hours": getattr(settings, "sched_title_dedup_hours", 12) or None, # Taxonomy scene_count refresh — denormalizacja liczników (perf fix 0019). "taxonomy_counts_hours": getattr(settings, "sched_taxonomy_counts_hours", 3) or None, # Ingest freshness watchdog — per-origin alert do Sentry (report 14f3a655). diff --git a/scripts/merge_exact_title_duration.py b/scripts/merge_exact_title_duration.py index df3406f..1bd7a5e 100644 --- a/scripts/merge_exact_title_duration.py +++ b/scripts/merge_exact_title_duration.py @@ -1,113 +1,48 @@ """Merge missing-merge duplikatów: ten sam performer + identyczny znormalizowany tytuł + identyczna długość (co do sekundy). -Kontekst: bulk_dedup łapie cross-source (tpdb↔stashdb) i exact-phash, ale NIE łapie -tube-dup bez fingerprintów (np. ta sama scena zescrapowana 2× pod różnym URL/slug). -Na stronie performera user widzi wtedy "te same miniatury, duplikat" (bug-report -ef92809d — Bad Bella miała 25 takich par). Sygnał `same performer + exact norm-title -+ exact duration_sec` jest praktycznie pewny (dwa różne wideo nie mają byte-identycznego -tytułu I długości co do sekundy). - -Keep = scena z największą liczbą external_refs → potem playback_sources → potem najstarsza. -Merge przez resolve.scene_merge.merge_scenes (przenosi refs/performers/tags/fingerprints/ -playback_sources — playback move dodany 2026-06-08 razem z tym skryptem). +Logika w app/scheduler/title_duration_dedup.py (współdzielona ze schedulerem +`_job_title_duration_dedup`). Ten plik to cienki CLI wrapper. Użycie (kontener worker): - python scripts/merge_exact_title_duration.py [PERFORMER_ID] [--commit] -Bez PERFORMER_ID = wszyscy performerzy (global). Bez --commit = dry-run. + python scripts/merge_exact_title_duration.py [PERFORMER_ID] [--commit] [--playback-only] +Bez PERFORMER_ID = wszyscy (global). Bez --commit = dry-run (wypisuje pary). """ from __future__ import annotations import sys -from sqlalchemy import text - -from app.db import session_scope -from app.resolve.scene_merge import merge_scenes +from app.scheduler.title_duration_dedup import _groups, run_title_duration_dedup -def _args() -> tuple[str | None, bool, bool]: +def main() -> None: commit = "--commit" in sys.argv playback_only = "--playback-only" in sys.argv pid = None for a in sys.argv[1:]: if not a.startswith("--") and len(a) >= 32: pid = a - return pid, commit, playback_only + if not commit: + groups = _groups(pid, playback_only) + pairs = sum(len(g) - 1 for g in groups) + print( + f"performer={pid or 'ALL'} playback_only={playback_only} " + f"groups={len(groups)} merges={pairs} commit=False", + flush=True, + ) + for g in groups: + for drop in g[1:]: + print(f" [dry] keep {g[0][:8]} <- drop {drop[:8]}") + print(f"DONE merged=0/{pairs} errors=0", flush=True) + return -def _groups(pid: str | None, playback_only: bool = False) -> list[list[str]]: - # Grupy scen (per performer) o identycznym lower(trim(title)) + duration_sec. - # member order: refs DESC, srcs DESC, created_at ASC → pierwszy = keeper. - where_perf = "AND sp.performer_id = :pid" if pid else "" - # app-visible: tylko sceny z żywym playbackiem (to co user faktycznie widzi na - # stronach) — pomija canonical stuby bez tube-linków. - where_pb = ( - "AND EXISTS (SELECT 1 FROM playback_sources p WHERE p.scene_id=s.id AND p.dead_at IS NULL)" - if playback_only else "" - ) - sql = f""" - WITH cand AS ( - SELECT s.id, - sp.performer_id, - lower(btrim(s.title)) nt, - s.duration_sec dur, - s.created_at, - (SELECT count(*) FROM scene_external_refs r WHERE r.scene_id=s.id) refs, - (SELECT count(*) FROM playback_sources p WHERE p.scene_id=s.id) srcs - FROM scenes s - JOIN scene_performers sp ON sp.scene_id=s.id {where_perf} - WHERE s.duration_sec IS NOT NULL AND btrim(s.title) <> '' {where_pb} - ) - SELECT array_agg(id::text ORDER BY refs DESC, srcs DESC, created_at ASC) members - FROM cand - GROUP BY performer_id, nt, dur - HAVING count(*) > 1 - """ - params = {"pid": pid} if pid else {} - with session_scope() as s: - rows = s.execute(text(sql), params).all() - # dedup grup (ten sam zestaw może wyjść dla 2 performerów dzielących sceny) - seen: set[frozenset] = set() - out: list[list[str]] = [] - for (members,) in rows: - key = frozenset(members) - if key in seen: - continue - seen.add(key) - out.append(list(members)) - return out - - -def main() -> None: - pid, commit, playback_only = _args() - groups = _groups(pid, playback_only) - pairs = sum(len(g) - 1 for g in groups) + res = run_title_duration_dedup(pid=pid, playback_only=playback_only, commit=True) print( - f"performer={pid or 'ALL'} playback_only={playback_only} " - f"groups={len(groups)} merges={pairs} commit={commit}", + f"DONE merged={res['merged']}/{res['merges']} errors={res['errors']} " + f"groups={res['groups']}", flush=True, ) - import uuid as _u - merged = 0 - errors = 0 - for g in groups: - keep = g[0] - for drop in g[1:]: - if not commit: - print(f" [dry] keep {keep[:8]} <- drop {drop[:8]}") - continue - try: - with session_scope() as s: - merge_scenes(s, keep_id=_u.UUID(keep), drop_id=_u.UUID(drop), resolved_by="merge_exact_title_duration") - merged += 1 - if merged % 500 == 0: - print(f" progress merged={merged}/{pairs} errors={errors}", flush=True) - except Exception as e: - errors += 1 - if errors <= 20: - print(f" ERR keep {keep[:8]} drop {drop[:8]}: {str(e)[:120]}") - print(f"DONE merged={merged}/{pairs} errors={errors}", flush=True) if __name__ == "__main__":