"""Dedup tube-dupów po asset-id miniatury (rodzina hdporn.gg / fullmovies.xxx). Te siostrzane platformy dzielą jedną przestrzeń id wideo i ingestują ten sam film pod RÓŻNYMI tytułami → bulk_dedup tego nie łapie (różne tytuły, brak phash). Sygnał: identyczny asset-id w ścieżce miniatury `/000//` na img.hdporn.gg LUB img.fullmovies.xxx + IDENTYCZNA długość = ten sam film (verified 2026-06-14, próbka = realne dupy; reports 205b17d9 / 5a2944cb). KRYTYCZNE: wspólny id-space TYLKO dla tej pary hostów. Inne CDN-y z tym samym wzorcem ścieżki (ptx.cdntrex, porn00, freshporno...) reużywają numery dla NIEpowiązanych filmów → twardy host-filter + guard długości (GROUP BY asset_id, dur) wyklucza fałszywe pary. Wołane periodycznie przez scheduler (`_job_thumb_asset_dedup`) — bo hdporn/fullmovies re-ingestują pod nowymi tytułami i dupy odrastają. Też jako one-shot skrypt (scripts/merge_dupe_thumb_asset.py). """ from __future__ import annotations import logging import uuid as _u from sqlalchemy import text from app.db import session_scope from app.resolve.scene_merge import merge_scenes log = logging.getLogger(__name__) _HOST_RE = r"://img\.(hdporn\.gg|fullmovies\.xxx)/[0-9]+000/[0-9]+/" def _groups(studio_id: str | None) -> list[list[str]]: where_studio = "AND s.studio_id = :sid" if studio_id else "" sql = f""" WITH cand AS ( SELECT DISTINCT s.id, substring(p.thumbnail_url from '/[0-9]+000/([0-9]+)/') AS asset_id, s.duration_sec dur, s.created_at, (SELECT count(*) FROM scene_external_refs r WHERE r.scene_id=s.id) refs, (SELECT count(*) FROM playback_sources pp WHERE pp.scene_id=s.id) srcs FROM scenes s JOIN playback_sources p ON p.scene_id=s.id WHERE p.thumbnail_url ~ '{_HOST_RE}' AND p.dead_at IS NULL AND s.duration_sec IS NOT NULL {where_studio} ) SELECT array_agg(id::text ORDER BY refs DESC, srcs DESC, created_at ASC) members FROM cand WHERE asset_id IS NOT NULL GROUP BY asset_id, dur HAVING count(DISTINCT id) > 1 """ params = {"sid": studio_id} if studio_id else {} with session_scope() as s: rows = s.execute(text(sql), params).all() seen: set[frozenset] = set() out: list[list[str]] = [] for (members,) in rows: key = frozenset(members) if key in seen: continue seen.add(key) out.append(list(members)) return out def run_thumb_asset_dedup( *, studio_id: str | None = None, commit: bool = True ) -> dict[str, int]: """Scal dupy po asset-id miniatury. Zwraca {groups, merges, merged, errors}.""" groups = _groups(studio_id) pairs = sum(len(g) - 1 for g in groups) merged = errors = 0 for g in groups: keep = g[0] for drop in g[1:]: if not commit: continue try: with session_scope() as s: merge_scenes( s, keep_id=_u.UUID(keep), drop_id=_u.UUID(drop), resolved_by="thumb_asset_dedup", ) merged += 1 except Exception as e: # pragma: no cover - obronnie errors += 1 if errors <= 20: log.warning("thumb_dedup keep %s drop %s: %s", keep[:8], drop[:8], str(e)[:120]) result = {"groups": len(groups), "merges": pairs, "merged": merged, "errors": errors} if commit: log.info("thumb_asset_dedup: %s", result) return result