feat(scheduler): periodic thumb-asset dedup (hdporn.gg/fullmovies.xxx)
The one-off cleanup merged ~13.5k same-video-different-title dupes, but they regrow as these sibling tubes re-ingest under new titles. Wire the asset-id+duration merge into the scheduler (every 12h, GOON_SCHED_THUMB_DEDUP_HOURS, 0=off) so it stays clean. Shared logic lives in app/scheduler/thumb_dedup.py (run_thumb_asset_dedup); the one-shot script now imports it. Same tight signature as the cleanup: family hosts only + identical duration (the bare asset-id number is reused across unrelated CDNs, so cross-host/diff- duration grouping is excluded). Reports 205b17d9 / 5a2944cb. Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
parent
b5d9473898
commit
8b4783771f
5 changed files with 153 additions and 99 deletions
|
|
@ -103,6 +103,12 @@ class Settings(BaseSettings):
|
|||
sched_bulk_dedup_hours: int = Field(
|
||||
default=12, validation_alias="GOON_SCHED_BULK_DEDUP_HOURS"
|
||||
)
|
||||
# Thumb-asset dedup — scala dupy hdporn.gg/fullmovies.xxx (ten sam film, różne tytuły,
|
||||
# ten sam asset-id miniatury + długość). bulk_dedup tego nie łapie (brak phash/tytuł).
|
||||
# Re-ingesty pod nowymi tytułami → dupy odrastają, stąd cykliczny job. 12h. 0 = off.
|
||||
sched_thumb_dedup_hours: int = Field(
|
||||
default=12, validation_alias="GOON_SCHED_THUMB_DEDUP_HOURS"
|
||||
)
|
||||
# Taxonomy scene_count refresh — przelicza denormalizowane liczniki scen na
|
||||
# tags/performers/studios (hot-path /tags|/performers|/studios|/favorites czyta
|
||||
# gotową kolumnę zamiast agregować 6.3M scene_tags per-request). 3h cadence —
|
||||
|
|
|
|||
|
|
@ -258,6 +258,24 @@ def _job_bulk_dedup_performers() -> None:
|
|||
log.exception("[scheduler] bulk_dedup performers failed")
|
||||
|
||||
|
||||
def _job_thumb_asset_dedup() -> None:
|
||||
"""Scala same-video-różne-tytuły dupy hdporn.gg/fullmovies.xxx po asset-id miniatury
|
||||
(reports 205b17d9/5a2944cb). bulk_dedup tego nie łapie (różne tytuły, brak phash);
|
||||
re-ingesty pod nowymi tytułami → dupy odrastają, stąd cyklicznie."""
|
||||
log.info("[scheduler] thumb-asset dedup starting")
|
||||
try:
|
||||
from app.scheduler.thumb_dedup import run_thumb_asset_dedup
|
||||
# run_thumb_asset_dedup loguje liczby/errory wewnętrznie; _run_with_timeout
|
||||
# połyka return, więc nie kapturujemy.
|
||||
_run_with_timeout(
|
||||
lambda: run_thumb_asset_dedup(commit=True),
|
||||
label="thumb-asset-dedup",
|
||||
)
|
||||
log.info("[scheduler] thumb-asset dedup done")
|
||||
except Exception:
|
||||
log.exception("[scheduler] thumb-asset dedup failed")
|
||||
|
||||
|
||||
def _job_performer_continuous(refresh_after_days: int) -> None:
|
||||
"""Continuous worker — 1 performer per tick, ORDER BY last_searched_at NULLS FIRST.
|
||||
|
||||
|
|
@ -351,6 +369,17 @@ def build_scheduler(cfg: dict[str, Any]) -> BlockingScheduler:
|
|||
)
|
||||
log.info("scheduler: bulk-dedup performers every %dh", cfg["bulk_dedup_hours"])
|
||||
|
||||
if cfg.get("thumb_dedup_hours"):
|
||||
sched.add_job(
|
||||
_job_thumb_asset_dedup,
|
||||
IntervalTrigger(hours=cfg["thumb_dedup_hours"], start_date=INTERVAL_ANCHOR),
|
||||
id="thumb_asset_dedup",
|
||||
replace_existing=True,
|
||||
max_instances=1,
|
||||
coalesce=True,
|
||||
)
|
||||
log.info("scheduler: thumb-asset dedup every %dh", cfg["thumb_dedup_hours"])
|
||||
|
||||
if cfg.get("movie_ingest_hours"):
|
||||
sched.add_job(
|
||||
_job_movie_ingest,
|
||||
|
|
|
|||
95
app/scheduler/thumb_dedup.py
Normal file
95
app/scheduler/thumb_dedup.py
Normal file
|
|
@ -0,0 +1,95 @@
|
|||
"""Dedup tube-dupów po asset-id miniatury (rodzina hdporn.gg / fullmovies.xxx).
|
||||
|
||||
Te siostrzane platformy dzielą jedną przestrzeń id wideo i ingestują ten sam film pod
|
||||
RÓŻNYMI tytułami → bulk_dedup tego nie łapie (różne tytuły, brak phash). Sygnał:
|
||||
identyczny asset-id w ścieżce miniatury `/<bucket>000/<id>/` na img.hdporn.gg LUB
|
||||
img.fullmovies.xxx + IDENTYCZNA długość = ten sam film (verified 2026-06-14, próbka =
|
||||
realne dupy; reports 205b17d9 / 5a2944cb).
|
||||
|
||||
KRYTYCZNE: wspólny id-space TYLKO dla tej pary hostów. Inne CDN-y z tym samym wzorcem
|
||||
ścieżki (ptx.cdntrex, porn00, freshporno...) reużywają numery dla NIEpowiązanych filmów
|
||||
→ twardy host-filter + guard długości (GROUP BY asset_id, dur) wyklucza fałszywe pary.
|
||||
|
||||
Wołane periodycznie przez scheduler (`_job_thumb_asset_dedup`) — bo hdporn/fullmovies
|
||||
re-ingestują pod nowymi tytułami i dupy odrastają. Też jako one-shot skrypt
|
||||
(scripts/merge_dupe_thumb_asset.py).
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import uuid as _u
|
||||
|
||||
from sqlalchemy import text
|
||||
|
||||
from app.db import session_scope
|
||||
from app.resolve.scene_merge import merge_scenes
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
_HOST_RE = r"://img\.(hdporn\.gg|fullmovies\.xxx)/[0-9]+000/[0-9]+/"
|
||||
|
||||
|
||||
def _groups(studio_id: str | None) -> list[list[str]]:
|
||||
where_studio = "AND s.studio_id = :sid" if studio_id else ""
|
||||
sql = f"""
|
||||
WITH cand AS (
|
||||
SELECT DISTINCT s.id,
|
||||
substring(p.thumbnail_url from '/[0-9]+000/([0-9]+)/') AS asset_id,
|
||||
s.duration_sec dur,
|
||||
s.created_at,
|
||||
(SELECT count(*) FROM scene_external_refs r WHERE r.scene_id=s.id) refs,
|
||||
(SELECT count(*) FROM playback_sources pp WHERE pp.scene_id=s.id) srcs
|
||||
FROM scenes s
|
||||
JOIN playback_sources p ON p.scene_id=s.id
|
||||
WHERE p.thumbnail_url ~ '{_HOST_RE}'
|
||||
AND p.dead_at IS NULL
|
||||
AND s.duration_sec IS NOT NULL
|
||||
{where_studio}
|
||||
)
|
||||
SELECT array_agg(id::text ORDER BY refs DESC, srcs DESC, created_at ASC) members
|
||||
FROM cand
|
||||
WHERE asset_id IS NOT NULL
|
||||
GROUP BY asset_id, dur
|
||||
HAVING count(DISTINCT id) > 1
|
||||
"""
|
||||
params = {"sid": studio_id} if studio_id else {}
|
||||
with session_scope() as s:
|
||||
rows = s.execute(text(sql), params).all()
|
||||
seen: set[frozenset] = set()
|
||||
out: list[list[str]] = []
|
||||
for (members,) in rows:
|
||||
key = frozenset(members)
|
||||
if key in seen:
|
||||
continue
|
||||
seen.add(key)
|
||||
out.append(list(members))
|
||||
return out
|
||||
|
||||
|
||||
def run_thumb_asset_dedup(
|
||||
*, studio_id: str | None = None, commit: bool = True
|
||||
) -> dict[str, int]:
|
||||
"""Scal dupy po asset-id miniatury. Zwraca {groups, merges, merged, errors}."""
|
||||
groups = _groups(studio_id)
|
||||
pairs = sum(len(g) - 1 for g in groups)
|
||||
merged = errors = 0
|
||||
for g in groups:
|
||||
keep = g[0]
|
||||
for drop in g[1:]:
|
||||
if not commit:
|
||||
continue
|
||||
try:
|
||||
with session_scope() as s:
|
||||
merge_scenes(
|
||||
s, keep_id=_u.UUID(keep), drop_id=_u.UUID(drop),
|
||||
resolved_by="thumb_asset_dedup",
|
||||
)
|
||||
merged += 1
|
||||
except Exception as e: # pragma: no cover - obronnie
|
||||
errors += 1
|
||||
if errors <= 20:
|
||||
log.warning("thumb_dedup keep %s drop %s: %s", keep[:8], drop[:8], str(e)[:120])
|
||||
result = {"groups": len(groups), "merges": pairs, "merged": merged, "errors": errors}
|
||||
if commit:
|
||||
log.info("thumb_asset_dedup: %s", result)
|
||||
return result
|
||||
|
|
@ -207,6 +207,9 @@ def run_forever() -> int:
|
|||
# Bulk-dedup performers — safety net dla duplikatów które resolver
|
||||
# pominął (np. freshporno scen przed fixem release_date). Run 12h.
|
||||
"bulk_dedup_hours": getattr(settings, "sched_bulk_dedup_hours", 12) or None,
|
||||
# Thumb-asset dedup — hdporn.gg/fullmovies.xxx same-video-różne-tytuły (reports
|
||||
# 205b17d9/5a2944cb). bulk_dedup tego nie łapie; dupy odrastają przy re-ingeście.
|
||||
"thumb_dedup_hours": getattr(settings, "sched_thumb_dedup_hours", 12) or None,
|
||||
# Taxonomy scene_count refresh — denormalizacja liczników (perf fix 0019).
|
||||
"taxonomy_counts_hours": getattr(settings, "sched_taxonomy_counts_hours", 3) or None,
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,114 +1,35 @@
|
|||
"""Merge tube-dup po asset-id miniatury (rodzina hdporn.gg / fullmovies.xxx).
|
||||
"""One-shot merge tube-dupów po asset-id miniatury (rodzina hdporn.gg / fullmovies.xxx).
|
||||
|
||||
Te siostrzane platformy dzielą jedną przestrzeń id wideo i ingestują ten sam film pod
|
||||
RÓŻNYMI tytułami → bulk_dedup tego nie łapie (różne tytuły, brak phash, exact-title-merge
|
||||
nie działa). Sygnał: identyczny asset-id w ścieżce miniatury `/<bucket>000/<id>/` na
|
||||
`img.hdporn.gg` LUB `img.fullmovies.xxx` + IDENTYCZNA długość (co do sekundy) = ten sam
|
||||
film (zweryfikowane 2026-06-14, próbka = realne dupy; report 205b17d9 / 5a2944cb).
|
||||
|
||||
KRYTYCZNE: wspólny id-space istnieje TYLKO dla tej pary hostów. Inne CDN-y z tym samym
|
||||
wzorcem ścieżki (ptx.cdntrex, porn00, freshporno, pornhat...) reużywają te numery dla
|
||||
NIEpowiązanych filmów — grupowanie cross-host po gołym numerze daje fałszywe pary
|
||||
(dry-run 2026-06-14: "UsePOV Gia Paige" vs "chelsie rae bikini squad" pod tym samym
|
||||
numerem). Stąd twarde `~ img.(hdporn.gg|fullmovies.xxx)` + guard długości (GROUP BY
|
||||
asset_id, dur → różna długość przy tym samym numerze NIE jest łączona).
|
||||
|
||||
Keep = scena z największą liczbą external_refs → playback_sources → najstarsza.
|
||||
Merge przez resolve.scene_merge.merge_scenes (przenosi refs/performers/tags/fingerprints/
|
||||
playback_sources, kasuje drop).
|
||||
Logika współdzielona ze schedulerem: app/scheduler/thumb_dedup.py (job
|
||||
`_job_thumb_asset_dedup` woła to samo periodycznie). Pełny opis sygnatury i guardów
|
||||
w tamtym module.
|
||||
|
||||
Użycie (kontener worker):
|
||||
python scripts/merge_dupe_thumb_asset.py [STUDIO_ID] [--commit]
|
||||
Bez STUDIO_ID = global. Bez --commit = dry-run.
|
||||
Bez STUDIO_ID = global. Bez --commit = dry-run (lista par, nic nie scala).
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import sys
|
||||
import uuid as _u
|
||||
|
||||
from sqlalchemy import text
|
||||
|
||||
from app.db import session_scope
|
||||
from app.resolve.scene_merge import merge_scenes
|
||||
|
||||
_HOST_RE = r"://img\.(hdporn\.gg|fullmovies\.xxx)/[0-9]+000/[0-9]+/"
|
||||
|
||||
|
||||
def _args() -> tuple[str | None, bool]:
|
||||
commit = "--commit" in sys.argv
|
||||
studio = None
|
||||
for a in sys.argv[1:]:
|
||||
if not a.startswith("--") and len(a) >= 32:
|
||||
studio = a
|
||||
return studio, commit
|
||||
|
||||
|
||||
def _groups(studio_id: str | None) -> list[list[str]]:
|
||||
where_studio = "AND s.studio_id = :sid" if studio_id else ""
|
||||
sql = f"""
|
||||
WITH cand AS (
|
||||
SELECT DISTINCT s.id,
|
||||
substring(p.thumbnail_url from '/[0-9]+000/([0-9]+)/') AS asset_id,
|
||||
s.duration_sec dur,
|
||||
s.created_at,
|
||||
(SELECT count(*) FROM scene_external_refs r WHERE r.scene_id=s.id) refs,
|
||||
(SELECT count(*) FROM playback_sources pp WHERE pp.scene_id=s.id) srcs
|
||||
FROM scenes s
|
||||
JOIN playback_sources p ON p.scene_id=s.id
|
||||
WHERE p.thumbnail_url ~ '{_HOST_RE}'
|
||||
AND p.dead_at IS NULL
|
||||
AND s.duration_sec IS NOT NULL
|
||||
{where_studio}
|
||||
)
|
||||
SELECT array_agg(id::text ORDER BY refs DESC, srcs DESC, created_at ASC) members
|
||||
FROM cand
|
||||
WHERE asset_id IS NOT NULL
|
||||
GROUP BY asset_id, dur
|
||||
HAVING count(DISTINCT id) > 1
|
||||
"""
|
||||
params = {"sid": studio_id} if studio_id else {}
|
||||
with session_scope() as s:
|
||||
rows = s.execute(text(sql), params).all()
|
||||
seen: set[frozenset] = set()
|
||||
out: list[list[str]] = []
|
||||
for (members,) in rows:
|
||||
key = frozenset(members)
|
||||
if key in seen:
|
||||
continue
|
||||
seen.add(key)
|
||||
out.append(list(members))
|
||||
return out
|
||||
from app.scheduler.thumb_dedup import _groups, run_thumb_asset_dedup
|
||||
|
||||
|
||||
def main() -> None:
|
||||
studio_id, commit = _args()
|
||||
groups = _groups(studio_id)
|
||||
pairs = sum(len(g) - 1 for g in groups)
|
||||
print(
|
||||
f"studio={studio_id or 'ALL'} groups={len(groups)} merges={pairs} commit={commit}",
|
||||
flush=True,
|
||||
)
|
||||
merged = errors = 0
|
||||
for g in groups:
|
||||
keep = g[0]
|
||||
for drop in g[1:]:
|
||||
commit = "--commit" in sys.argv
|
||||
studio = next((a for a in sys.argv[1:] if not a.startswith("--") and len(a) >= 32), None)
|
||||
|
||||
if not commit:
|
||||
print(f" [dry] keep {keep[:8]} <- drop {drop[:8]}")
|
||||
continue
|
||||
try:
|
||||
with session_scope() as s:
|
||||
merge_scenes(
|
||||
s, keep_id=_u.UUID(keep), drop_id=_u.UUID(drop),
|
||||
resolved_by="merge_dupe_thumb_asset",
|
||||
)
|
||||
merged += 1
|
||||
if merged % 500 == 0:
|
||||
print(f" progress merged={merged}/{pairs} errors={errors}", flush=True)
|
||||
except Exception as e:
|
||||
errors += 1
|
||||
if errors <= 20:
|
||||
print(f" ERR keep {keep[:8]} drop {drop[:8]}: {str(e)[:120]}")
|
||||
print(f"DONE merged={merged}/{pairs} errors={errors}", flush=True)
|
||||
groups = _groups(studio)
|
||||
pairs = sum(len(g) - 1 for g in groups)
|
||||
print(f"studio={studio or 'ALL'} groups={len(groups)} merges={pairs} commit=False", flush=True)
|
||||
for g in groups:
|
||||
for drop in g[1:]:
|
||||
print(f" [dry] keep {g[0][:8]} <- drop {drop[:8]}")
|
||||
return
|
||||
|
||||
res = run_thumb_asset_dedup(studio_id=studio, commit=True)
|
||||
print(f"studio={studio or 'ALL'} {res}", flush=True)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue