feat(scheduler): periodic title+duration dedup (missing-merge tube dupes)
Missing-merge duplicates (same performer + identical normalized title + identical duration-to-the-second) that bulk_dedup misses — tube re-scrapes and cross-tube re-ingests like porn00 pulling a video already present from xnxx (reports 28fe8181/32df33b1). Extracted the proven merge_exact_title_duration logic into app/scheduler/title_duration_dedup.py (script now a thin wrapper), wired a 12h scheduler job (playback-only = what users actually see, GOON_SCHED_TITLE_DEDUP_HOURS). Signal is near-certain (two different videos don't share byte-identical title AND exact duration); no shared performer = not merged (over-match guard). Verified: job registers (jobs=14), backlog currently 0 after the one-shot global merge. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
476cbb8d16
commit
f014a901de
5 changed files with 150 additions and 87 deletions
|
|
@ -109,6 +109,13 @@ class Settings(BaseSettings):
|
||||||
sched_thumb_dedup_hours: int = Field(
|
sched_thumb_dedup_hours: int = Field(
|
||||||
default=12, validation_alias="GOON_SCHED_THUMB_DEDUP_HOURS"
|
default=12, validation_alias="GOON_SCHED_THUMB_DEDUP_HOURS"
|
||||||
)
|
)
|
||||||
|
# Title+duration dedup — scala missing-merge dupy (ten sam performer + identyczny
|
||||||
|
# znormalizowany tytuł + długość co do sekundy), których bulk_dedup nie łapie (tube
|
||||||
|
# re-scrape / cross-tube np. porn00 vs xnxx, reports 28fe8181/32df33b1). Odrastają
|
||||||
|
# przy re-ingeście, stąd cyklicznie. 12h, playback-only (to co user widzi). 0 = off.
|
||||||
|
sched_title_dedup_hours: int = Field(
|
||||||
|
default=12, validation_alias="GOON_SCHED_TITLE_DEDUP_HOURS"
|
||||||
|
)
|
||||||
# Ingest freshness watchdog — alert do Sentry gdy aktywny tube (origin
|
# Ingest freshness watchdog — alert do Sentry gdy aktywny tube (origin
|
||||||
# tube:<sitetag>) przestał dawać nowe sceny > próg. Łapie zamrożenie
|
# tube:<sitetag>) przestał dawać nowe sceny > próg. Łapie zamrożenie
|
||||||
# pojedynczego origin, którego globalny monitor (jeden Source "tube-scraper") nie
|
# pojedynczego origin, którego globalny monitor (jeden Source "tube-scraper") nie
|
||||||
|
|
|
||||||
|
|
@ -276,6 +276,23 @@ def _job_thumb_asset_dedup() -> None:
|
||||||
log.exception("[scheduler] thumb-asset dedup failed")
|
log.exception("[scheduler] thumb-asset dedup failed")
|
||||||
|
|
||||||
|
|
||||||
|
def _job_title_duration_dedup() -> None:
|
||||||
|
"""Scal missing-merge dupy (ten sam performer + identyczny tytuł + długość) których
|
||||||
|
bulk_dedup nie łapie (tube re-scrape / cross-tube np. porn00 vs xnxx, reports
|
||||||
|
28fe8181/32df33b1). playback-only = to co user widzi. Re-ingesty → dupy odrastają."""
|
||||||
|
log.info("[scheduler] title-duration dedup starting")
|
||||||
|
try:
|
||||||
|
from app.scheduler.title_duration_dedup import run_title_duration_dedup
|
||||||
|
|
||||||
|
_run_with_timeout(
|
||||||
|
lambda: run_title_duration_dedup(playback_only=True, commit=True),
|
||||||
|
label="title-duration-dedup",
|
||||||
|
)
|
||||||
|
log.info("[scheduler] title-duration dedup done")
|
||||||
|
except Exception:
|
||||||
|
log.exception("[scheduler] title-duration dedup failed")
|
||||||
|
|
||||||
|
|
||||||
def _job_ingest_watchdog(max_age_hours: int, search_max_age_hours: int) -> None:
|
def _job_ingest_watchdog(max_age_hours: int, search_max_age_hours: int) -> None:
|
||||||
"""Per-origin freshness watchdog — alert do Sentry gdy aktywny tube przestał dawać
|
"""Per-origin freshness watchdog — alert do Sentry gdy aktywny tube przestał dawać
|
||||||
nowe sceny > próg (browse: max_age_hours, search: search_max_age_hours). Globalny
|
nowe sceny > próg (browse: max_age_hours, search: search_max_age_hours). Globalny
|
||||||
|
|
@ -409,6 +426,17 @@ def build_scheduler(cfg: dict[str, Any]) -> BlockingScheduler:
|
||||||
)
|
)
|
||||||
log.info("scheduler: thumb-asset dedup every %dh", cfg["thumb_dedup_hours"])
|
log.info("scheduler: thumb-asset dedup every %dh", cfg["thumb_dedup_hours"])
|
||||||
|
|
||||||
|
if cfg.get("title_dedup_hours"):
|
||||||
|
sched.add_job(
|
||||||
|
_job_title_duration_dedup,
|
||||||
|
IntervalTrigger(hours=cfg["title_dedup_hours"], start_date=INTERVAL_ANCHOR),
|
||||||
|
id="title_duration_dedup",
|
||||||
|
replace_existing=True,
|
||||||
|
max_instances=1,
|
||||||
|
coalesce=True,
|
||||||
|
)
|
||||||
|
log.info("scheduler: title-duration dedup every %dh", cfg["title_dedup_hours"])
|
||||||
|
|
||||||
if cfg.get("ingest_watchdog_hours"):
|
if cfg.get("ingest_watchdog_hours"):
|
||||||
wd_max_age = cfg.get("ingest_watchdog_max_age_hours") or 48
|
wd_max_age = cfg.get("ingest_watchdog_max_age_hours") or 48
|
||||||
wd_search_max_age = cfg.get("ingest_watchdog_search_max_age_hours") or 168
|
wd_search_max_age = cfg.get("ingest_watchdog_search_max_age_hours") or 168
|
||||||
|
|
@ -561,4 +589,6 @@ DEFAULT_CONFIG: dict[str, Any] = {
|
||||||
"ingest_watchdog_search_max_age_hours": 168,
|
"ingest_watchdog_search_max_age_hours": 168,
|
||||||
# Hetzner Cloud bandwidth monitor — co 6h, alert Sentry przy progach % included.
|
# Hetzner Cloud bandwidth monitor — co 6h, alert Sentry przy progach % included.
|
||||||
"hetzner_monitor_hours": 6,
|
"hetzner_monitor_hours": 6,
|
||||||
|
# Title+duration dedup — missing-merge tube-dupy, co 12h, playback-only.
|
||||||
|
"title_dedup_hours": 12,
|
||||||
}
|
}
|
||||||
|
|
|
||||||
89
app/scheduler/title_duration_dedup.py
Normal file
89
app/scheduler/title_duration_dedup.py
Normal file
|
|
@ -0,0 +1,89 @@
|
||||||
|
"""Dedup missing-merge: ten sam performer + identyczny znormalizowany tytuł + identyczna
|
||||||
|
długość (co do sekundy).
|
||||||
|
|
||||||
|
bulk_dedup łapie cross-source (tpdb↔stashdb) i exact-phash, ale NIE tube-dupy bez
|
||||||
|
fingerprintów (ta sama scena zescrapowana 2× pod różnym URL/slug, albo cross-tube np.
|
||||||
|
porn00 wciąga wideo już obecne z xnxx). User widzi „te same miniatury, duplikat"
|
||||||
|
(reports 28fe8181 / 32df33b1 — porn00). Sygnał `same performer + exact norm-title +
|
||||||
|
exact duration_sec` jest praktycznie pewny (dwa różne wideo nie mają byte-identycznego
|
||||||
|
tytułu I długości co do sekundy). Bez wspólnej aktorki NIE łączymy (over-match trap).
|
||||||
|
|
||||||
|
Re-ingesty pod nowymi slug/tytułami → dupy odrastają, stąd cyklicznie (scheduler).
|
||||||
|
Też jako one-shot (scripts/merge_exact_title_duration.py importuje stąd).
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import logging
|
||||||
|
import uuid as _u
|
||||||
|
|
||||||
|
from sqlalchemy import text
|
||||||
|
|
||||||
|
from app.db import session_scope
|
||||||
|
from app.resolve.scene_merge import merge_scenes
|
||||||
|
|
||||||
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
def _groups(pid: str | None, playback_only: bool) -> list[list[str]]:
|
||||||
|
where_perf = "AND sp.performer_id = :pid" if pid else ""
|
||||||
|
# app-visible: tylko sceny z żywym playbackiem (to co user faktycznie widzi) —
|
||||||
|
# pomija canonical stuby bez tube-linków.
|
||||||
|
where_pb = (
|
||||||
|
"AND EXISTS (SELECT 1 FROM playback_sources p WHERE p.scene_id=s.id AND p.dead_at IS NULL)"
|
||||||
|
if playback_only else ""
|
||||||
|
)
|
||||||
|
sql = f"""
|
||||||
|
WITH cand AS (
|
||||||
|
SELECT s.id, sp.performer_id, lower(btrim(s.title)) nt, s.duration_sec dur, s.created_at,
|
||||||
|
(SELECT count(*) FROM scene_external_refs r WHERE r.scene_id=s.id) refs,
|
||||||
|
(SELECT count(*) FROM playback_sources p WHERE p.scene_id=s.id) srcs
|
||||||
|
FROM scenes s
|
||||||
|
JOIN scene_performers sp ON sp.scene_id=s.id {where_perf}
|
||||||
|
WHERE s.duration_sec IS NOT NULL AND btrim(s.title) <> '' {where_pb}
|
||||||
|
)
|
||||||
|
SELECT array_agg(id::text ORDER BY refs DESC, srcs DESC, created_at ASC) members
|
||||||
|
FROM cand
|
||||||
|
GROUP BY performer_id, nt, dur
|
||||||
|
HAVING count(*) > 1
|
||||||
|
"""
|
||||||
|
params = {"pid": pid} if pid else {}
|
||||||
|
with session_scope() as s:
|
||||||
|
rows = s.execute(text(sql), params).all()
|
||||||
|
seen: set[frozenset] = set()
|
||||||
|
out: list[list[str]] = []
|
||||||
|
for (members,) in rows:
|
||||||
|
key = frozenset(members)
|
||||||
|
if key in seen:
|
||||||
|
continue
|
||||||
|
seen.add(key)
|
||||||
|
out.append(list(members))
|
||||||
|
return out
|
||||||
|
|
||||||
|
|
||||||
|
def run_title_duration_dedup(
|
||||||
|
*, pid: str | None = None, playback_only: bool = True, commit: bool = True
|
||||||
|
) -> dict[str, int]:
|
||||||
|
"""Scal dupy o identycznym performer+tytuł+długość. Zwraca {groups, merges, merged, errors}."""
|
||||||
|
groups = _groups(pid, playback_only)
|
||||||
|
pairs = sum(len(g) - 1 for g in groups)
|
||||||
|
merged = errors = 0
|
||||||
|
for g in groups:
|
||||||
|
keep = g[0]
|
||||||
|
for drop in g[1:]:
|
||||||
|
if not commit:
|
||||||
|
continue
|
||||||
|
try:
|
||||||
|
with session_scope() as s:
|
||||||
|
merge_scenes(
|
||||||
|
s, keep_id=_u.UUID(keep), drop_id=_u.UUID(drop),
|
||||||
|
resolved_by="title_duration_dedup",
|
||||||
|
)
|
||||||
|
merged += 1
|
||||||
|
except Exception as e: # pragma: no cover - obronnie
|
||||||
|
errors += 1
|
||||||
|
if errors <= 20:
|
||||||
|
log.warning("title_dedup keep %s drop %s: %s", keep[:8], drop[:8], str(e)[:120])
|
||||||
|
result = {"groups": len(groups), "merges": pairs, "merged": merged, "errors": errors}
|
||||||
|
if commit:
|
||||||
|
log.info("title_duration_dedup: %s", result)
|
||||||
|
return result
|
||||||
|
|
@ -210,6 +210,8 @@ def run_forever() -> int:
|
||||||
# Thumb-asset dedup — hdporn.gg/fullmovies.xxx same-video-różne-tytuły (reports
|
# Thumb-asset dedup — hdporn.gg/fullmovies.xxx same-video-różne-tytuły (reports
|
||||||
# 205b17d9/5a2944cb). bulk_dedup tego nie łapie; dupy odrastają przy re-ingeście.
|
# 205b17d9/5a2944cb). bulk_dedup tego nie łapie; dupy odrastają przy re-ingeście.
|
||||||
"thumb_dedup_hours": getattr(settings, "sched_thumb_dedup_hours", 12) or None,
|
"thumb_dedup_hours": getattr(settings, "sched_thumb_dedup_hours", 12) or None,
|
||||||
|
# Title+duration dedup — missing-merge tube-dupy (reports 28fe8181/32df33b1).
|
||||||
|
"title_dedup_hours": getattr(settings, "sched_title_dedup_hours", 12) or None,
|
||||||
# Taxonomy scene_count refresh — denormalizacja liczników (perf fix 0019).
|
# Taxonomy scene_count refresh — denormalizacja liczników (perf fix 0019).
|
||||||
"taxonomy_counts_hours": getattr(settings, "sched_taxonomy_counts_hours", 3) or None,
|
"taxonomy_counts_hours": getattr(settings, "sched_taxonomy_counts_hours", 3) or None,
|
||||||
# Ingest freshness watchdog — per-origin alert do Sentry (report 14f3a655).
|
# Ingest freshness watchdog — per-origin alert do Sentry (report 14f3a655).
|
||||||
|
|
|
||||||
|
|
@ -1,113 +1,48 @@
|
||||||
"""Merge missing-merge duplikatów: ten sam performer + identyczny znormalizowany tytuł
|
"""Merge missing-merge duplikatów: ten sam performer + identyczny znormalizowany tytuł
|
||||||
+ identyczna długość (co do sekundy).
|
+ identyczna długość (co do sekundy).
|
||||||
|
|
||||||
Kontekst: bulk_dedup łapie cross-source (tpdb↔stashdb) i exact-phash, ale NIE łapie
|
Logika w app/scheduler/title_duration_dedup.py (współdzielona ze schedulerem
|
||||||
tube-dup bez fingerprintów (np. ta sama scena zescrapowana 2× pod różnym URL/slug).
|
`_job_title_duration_dedup`). Ten plik to cienki CLI wrapper.
|
||||||
Na stronie performera user widzi wtedy "te same miniatury, duplikat" (bug-report
|
|
||||||
ef92809d — Bad Bella miała 25 takich par). Sygnał `same performer + exact norm-title
|
|
||||||
+ exact duration_sec` jest praktycznie pewny (dwa różne wideo nie mają byte-identycznego
|
|
||||||
tytułu I długości co do sekundy).
|
|
||||||
|
|
||||||
Keep = scena z największą liczbą external_refs → potem playback_sources → potem najstarsza.
|
|
||||||
Merge przez resolve.scene_merge.merge_scenes (przenosi refs/performers/tags/fingerprints/
|
|
||||||
playback_sources — playback move dodany 2026-06-08 razem z tym skryptem).
|
|
||||||
|
|
||||||
Użycie (kontener worker):
|
Użycie (kontener worker):
|
||||||
python scripts/merge_exact_title_duration.py [PERFORMER_ID] [--commit]
|
python scripts/merge_exact_title_duration.py [PERFORMER_ID] [--commit] [--playback-only]
|
||||||
Bez PERFORMER_ID = wszyscy performerzy (global). Bez --commit = dry-run.
|
Bez PERFORMER_ID = wszyscy (global). Bez --commit = dry-run (wypisuje pary).
|
||||||
"""
|
"""
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
import sys
|
import sys
|
||||||
|
|
||||||
from sqlalchemy import text
|
from app.scheduler.title_duration_dedup import _groups, run_title_duration_dedup
|
||||||
|
|
||||||
from app.db import session_scope
|
|
||||||
from app.resolve.scene_merge import merge_scenes
|
|
||||||
|
|
||||||
|
|
||||||
def _args() -> tuple[str | None, bool, bool]:
|
def main() -> None:
|
||||||
commit = "--commit" in sys.argv
|
commit = "--commit" in sys.argv
|
||||||
playback_only = "--playback-only" in sys.argv
|
playback_only = "--playback-only" in sys.argv
|
||||||
pid = None
|
pid = None
|
||||||
for a in sys.argv[1:]:
|
for a in sys.argv[1:]:
|
||||||
if not a.startswith("--") and len(a) >= 32:
|
if not a.startswith("--") and len(a) >= 32:
|
||||||
pid = a
|
pid = a
|
||||||
return pid, commit, playback_only
|
|
||||||
|
|
||||||
|
if not commit:
|
||||||
|
groups = _groups(pid, playback_only)
|
||||||
|
pairs = sum(len(g) - 1 for g in groups)
|
||||||
|
print(
|
||||||
|
f"performer={pid or 'ALL'} playback_only={playback_only} "
|
||||||
|
f"groups={len(groups)} merges={pairs} commit=False",
|
||||||
|
flush=True,
|
||||||
|
)
|
||||||
|
for g in groups:
|
||||||
|
for drop in g[1:]:
|
||||||
|
print(f" [dry] keep {g[0][:8]} <- drop {drop[:8]}")
|
||||||
|
print(f"DONE merged=0/{pairs} errors=0", flush=True)
|
||||||
|
return
|
||||||
|
|
||||||
def _groups(pid: str | None, playback_only: bool = False) -> list[list[str]]:
|
res = run_title_duration_dedup(pid=pid, playback_only=playback_only, commit=True)
|
||||||
# Grupy scen (per performer) o identycznym lower(trim(title)) + duration_sec.
|
|
||||||
# member order: refs DESC, srcs DESC, created_at ASC → pierwszy = keeper.
|
|
||||||
where_perf = "AND sp.performer_id = :pid" if pid else ""
|
|
||||||
# app-visible: tylko sceny z żywym playbackiem (to co user faktycznie widzi na
|
|
||||||
# stronach) — pomija canonical stuby bez tube-linków.
|
|
||||||
where_pb = (
|
|
||||||
"AND EXISTS (SELECT 1 FROM playback_sources p WHERE p.scene_id=s.id AND p.dead_at IS NULL)"
|
|
||||||
if playback_only else ""
|
|
||||||
)
|
|
||||||
sql = f"""
|
|
||||||
WITH cand AS (
|
|
||||||
SELECT s.id,
|
|
||||||
sp.performer_id,
|
|
||||||
lower(btrim(s.title)) nt,
|
|
||||||
s.duration_sec dur,
|
|
||||||
s.created_at,
|
|
||||||
(SELECT count(*) FROM scene_external_refs r WHERE r.scene_id=s.id) refs,
|
|
||||||
(SELECT count(*) FROM playback_sources p WHERE p.scene_id=s.id) srcs
|
|
||||||
FROM scenes s
|
|
||||||
JOIN scene_performers sp ON sp.scene_id=s.id {where_perf}
|
|
||||||
WHERE s.duration_sec IS NOT NULL AND btrim(s.title) <> '' {where_pb}
|
|
||||||
)
|
|
||||||
SELECT array_agg(id::text ORDER BY refs DESC, srcs DESC, created_at ASC) members
|
|
||||||
FROM cand
|
|
||||||
GROUP BY performer_id, nt, dur
|
|
||||||
HAVING count(*) > 1
|
|
||||||
"""
|
|
||||||
params = {"pid": pid} if pid else {}
|
|
||||||
with session_scope() as s:
|
|
||||||
rows = s.execute(text(sql), params).all()
|
|
||||||
# dedup grup (ten sam zestaw może wyjść dla 2 performerów dzielących sceny)
|
|
||||||
seen: set[frozenset] = set()
|
|
||||||
out: list[list[str]] = []
|
|
||||||
for (members,) in rows:
|
|
||||||
key = frozenset(members)
|
|
||||||
if key in seen:
|
|
||||||
continue
|
|
||||||
seen.add(key)
|
|
||||||
out.append(list(members))
|
|
||||||
return out
|
|
||||||
|
|
||||||
|
|
||||||
def main() -> None:
|
|
||||||
pid, commit, playback_only = _args()
|
|
||||||
groups = _groups(pid, playback_only)
|
|
||||||
pairs = sum(len(g) - 1 for g in groups)
|
|
||||||
print(
|
print(
|
||||||
f"performer={pid or 'ALL'} playback_only={playback_only} "
|
f"DONE merged={res['merged']}/{res['merges']} errors={res['errors']} "
|
||||||
f"groups={len(groups)} merges={pairs} commit={commit}",
|
f"groups={res['groups']}",
|
||||||
flush=True,
|
flush=True,
|
||||||
)
|
)
|
||||||
import uuid as _u
|
|
||||||
merged = 0
|
|
||||||
errors = 0
|
|
||||||
for g in groups:
|
|
||||||
keep = g[0]
|
|
||||||
for drop in g[1:]:
|
|
||||||
if not commit:
|
|
||||||
print(f" [dry] keep {keep[:8]} <- drop {drop[:8]}")
|
|
||||||
continue
|
|
||||||
try:
|
|
||||||
with session_scope() as s:
|
|
||||||
merge_scenes(s, keep_id=_u.UUID(keep), drop_id=_u.UUID(drop), resolved_by="merge_exact_title_duration")
|
|
||||||
merged += 1
|
|
||||||
if merged % 500 == 0:
|
|
||||||
print(f" progress merged={merged}/{pairs} errors={errors}", flush=True)
|
|
||||||
except Exception as e:
|
|
||||||
errors += 1
|
|
||||||
if errors <= 20:
|
|
||||||
print(f" ERR keep {keep[:8]} drop {drop[:8]}: {str(e)[:120]}")
|
|
||||||
print(f"DONE merged={merged}/{pairs} errors={errors}", flush=True)
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue