"""Dedup scen ulubionych performerów: title-Levenshtein + duration gate + filter promo/teaser tubes. Auto-merge wysoka confidence, reszta → pending merge_candidates (audit trail). Algorytm: 1. Per favorite performer (lub --performer-id): a. Pobierz wszystkie scenes performera b. Strip scenes ze WSZYSTKIMI playback w teaser tubes (xvideos/xnxx/redtube/ youporn — głównie cut-down promo) — tych nie warto matchować, bo to fragmenty z innej canonical scene. Te scenes będą dropowane całkowicie przy dedup, ale ZOSTAJĄ w bazie (mobile może je widzieć). c. Normalize title (drop performer name, brand prefixes, generic vocab, dates) d. Pairwise fuzz.token_set_ratio + partial_ratio (min z obu) e. Duration gate: jeśli oba znane, max 15% diff f. Min 3 tokens po normalize 2. Cluster (union-find) → per cluster pick canonical (max external_refs + max duration_sec). Reszta klastra: - score ≥ 0.95 + duration within 10% → call merge_scenes() (auto_merged) - else → insert MergeCandidate(status=pending) Run: python /srv/scripts/dedup_favorite_performers.py --dry-run python /srv/scripts/dedup_favorite_performers.py --performer-id python /srv/scripts/dedup_favorite_performers.py # wszystkie favorites """ from __future__ import annotations import argparse import logging import re import sys import uuid as _uuid from collections import defaultdict from rapidfuzz import fuzz from sqlalchemy import func, select from sqlalchemy.orm import Session sys.path.insert(0, "/srv") from app.db import SessionLocal from app.models.favorite_performer import FavoritePerformer from app.models.merge_candidate import MergeCandidate, MergeKind, MergeStatus from app.models.performer import Performer from app.models.playback_source import PlaybackSource from app.models.scene import Scene, SceneExternalRef, ScenePerformer from app.resolve.scene_merge import MergeError, merge_scenes log = logging.getLogger("dedup_favorites") logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s") # Tube'y które serwują GŁÓWNIE cut-down/promo clips (>4% scen <5min, mainstream). # Scenes z TYLKO tymi playback są pomijane przy dup-detection — to są re-uploads # fragmentów innej canonical scene, nie warto je merge'ować z full version. # Decyzja na podstawie duration distribution analysis 2026-05-17. _TEASER_TUBES = frozenset({ "xvideoscom", # median 600s, 11.7% pod 5min "xnxxcom", # median 600s, 7.6% pod 5min (sister site xvideos) "redtubecom", # 2.9% (ale głównie krótkie pornhub-style clips) "youporncom", # 4.4% pod 5min "xhamstercom", # 4.2% pod 5min (mixed, 743s median) "pornhubcom", # 1.6% ale głównie 918s median (krótkie) }) # Title normalization — sync z title_levenshtein_benchmark.py _SITE_BRANDS = { "legalporno", "brazzers", "brazzersexxtra", "publicagent", "mofos", "twistys", "vivid", "ddf", "ddfbusty", "kink", "naughty", "naughtyamerica", "joymii", "julesjordan", "mompov", "perfectgirls", "puremature", "realitykings", "scoreland", "brazilbang", "foxes", "lubed", "castingcouchx", "throated", "collegerules", "fakehub", "faketaxi", "fakedrivingschool", "porn", "publicpickups", "blacked", "blackedraw", "tushy", "tushyraw", "vixen", "deeper", "evilangel", "newsensations", "private", "privatecom", "humiliatedmilfs", "adultmobile", "alettaoceanlive", "alettaoceanempire", "roccosiffredi", "pixandvideo", "madsexparty", "familysinners", "bangbros", "nurumassage", "sweetheart", } _NOISE_WORDS = { "new", "brand", "full", "leaked", "masked", "ppv", "video", "scene", "aka", "ft", "feat", "with", "and", "the", "a", "an", "bimbo", "milf", "teen", "teens", "blonde", "brunette", "redhead", "busty", "curvy", "skinny", "petite", "tall", "short", "thick", "thin", "amateur", "homemade", "professional", "kinky", "sexy", "hot", "horny", "beautiful", "gorgeous", "stunning", "perfect", "sweet", "naughty", "wild", "tight", "young", "older", "anal", "blowjob", "bj", "dp", "dvp", "dap", "dpp", "bbc", "bg", "gangbang", "fuck", "fucking", "fucked", "fucks", "boobs", "tits", "ass", "pussy", "cock", "dick", "facial", "cumshot", "creampie", "rough", "deep", "hard", "wet", "pov", "pounding", "small", "big", "huge", "massive", "intense", "loves", "gets", "takes", "gives", "shares", "wants", "needs", "her", "his", "she", "he", "for", "from", "to", "in", "on", "by", "of", "with", "first", "best", "girl", "boy", "girls", "guy", "guys", "boys", "fwb", "wife", "wifey", "step", "stepmom", "stepdad", "stepsis", } _TRAILING_ID_RE = re.compile(r'\b(?:[a-z]*\d{4,}[a-z\d]*|\d{4,})\b', re.IGNORECASE) _DOMAIN_RE = re.compile(r'\b[a-z0-9]+\.(?:com|net|so|to|sx|tv|video|porn|xxx|cc|biz|info)\b', re.IGNORECASE) _HASHTAG_RE = re.compile(r'#\w+') _PUNCT_RE = re.compile(r'[^a-z0-9\s]+') _WS_RE = re.compile(r'\s+') _DATE_RE = re.compile(r'\b\d{1,2}[/\-]\d{1,2}[/\-]\d{2,4}\b') _TIME_RE = re.compile(r'\b\d{1,2}:\d{2}\s*[ap]m?\b', re.IGNORECASE) _LIVE_RE = re.compile(r'\bstream\s+started\s+at\b', re.IGNORECASE) _MIN_TOKENS = 3 # Score thresholds _AUTO_MERGE_SCORE = 0.95 _AUTO_MERGE_DUR_TOLERANCE = 0.10 # 10% diff dla auto-merge _PENDING_SCORE = 0.85 _DUR_GATE = 0.15 # 15% diff jako sanity gate dla pending def normalize_title(title: str, performer_name: str | None = None) -> str: if not title or _LIVE_RE.search(title): return "" t = title.lower() t = _DATE_RE.sub(" ", t) t = _TIME_RE.sub(" ", t) t = _HASHTAG_RE.sub(" ", t) t = _DOMAIN_RE.sub(" ", t) t = _TRAILING_ID_RE.sub(" ", t) if performer_name: for part in performer_name.lower().split(): t = re.sub(rf'\b{re.escape(part)}\b', " ", t) t = _PUNCT_RE.sub(" ", t) t = _WS_RE.sub(" ", t).strip() tokens = [tok for tok in t.split() if tok not in _NOISE_WORDS and tok not in _SITE_BRANDS and len(tok) > 1] return " ".join(tokens) def _scene_strength(session: Session, scene_id: _uuid.UUID, duration_sec: int | None) -> tuple: """Higher = better canonical candidate. (n_external_refs, duration_sec_or_0).""" n_refs = session.execute( select(func.count()).select_from(SceneExternalRef).where(SceneExternalRef.scene_id == scene_id) ).scalar_one() return (n_refs, duration_sec or 0) def _has_only_teaser_playback(session: Session, scene_id: _uuid.UUID) -> bool: """True jeśli scena ma alive playback TYLKO w _TEASER_TUBES (lub no alive).""" origins = session.execute( select(PlaybackSource.origin) .where(PlaybackSource.scene_id == scene_id) .where(PlaybackSource.dead_at.is_(None)) .where(PlaybackSource.origin.like("tube:%")) ).scalars().all() if not origins: return False # no alive tube playback — to canonical-only scena, dopuszczamy tubes = {o.split(":", 1)[1] for o in origins} return tubes.issubset(_TEASER_TUBES) def _dur_diff(a: int | None, b: int | None) -> float | None: """Returns relative duration diff (0.0-1.0) or None if either unknown.""" if not a or not b: return None longer, shorter = max(a, b), min(a, b) return (longer - shorter) / longer if longer > 0 else 0.0 def process_performer(session: Session, performer_id: _uuid.UUID, *, dry_run: bool) -> dict: perf = session.get(Performer, performer_id) if perf is None: return {"error": "performer not found"} rows = session.execute( select(Scene.id, Scene.title, Scene.duration_sec) .join(ScenePerformer, ScenePerformer.scene_id == Scene.id) .where(ScenePerformer.performer_id == performer_id) ).all() if len(rows) < 2: return {"performer": perf.canonical_name, "scenes": len(rows), "auto_merged": 0, "pending": 0, "skipped_teaser": 0} # Filter: drop scenes z TYLKO teaser playback (re-uploads fragmentów). candidates = [] skipped_teaser = 0 for r in rows: if _has_only_teaser_playback(session, r.id): skipped_teaser += 1 continue norm = normalize_title(r.title, perf.canonical_name) if not norm or len(norm.split()) < _MIN_TOKENS: continue candidates.append((r.id, r.title, r.duration_sec, norm)) # Pairwise compare pairs: list = [] # (score, dur_diff, sid_a, sid_b) for i in range(len(candidates)): sid_a, _, dur_a, norm_a = candidates[i] for j in range(i + 1, len(candidates)): sid_b, _, dur_b, norm_b = candidates[j] dd = _dur_diff(dur_a, dur_b) if dd is not None and dd > _DUR_GATE: continue ts = fuzz.token_set_ratio(norm_a, norm_b) pr = fuzz.partial_ratio(norm_a, norm_b) score = min(ts, pr) / 100.0 if score < _PENDING_SCORE: continue pairs.append((score, dd, sid_a, sid_b)) # Build clusters (union-find) parent: dict = {s[0]: s[0] for s in candidates} def find(x): while parent[x] != x: parent[x] = parent[parent[x]] x = parent[x] return x def union(a, b): ra, rb = find(a), find(b) if ra != rb: parent[ra] = rb for _, _, a, b in pairs: union(a, b) clusters: dict = defaultdict(list) for s in candidates: clusters[find(s[0])].append(s) dup_clusters = [c for c in clusters.values() if len(c) > 1] auto_merged = 0 pending = 0 for cluster in dup_clusters: # Pick canonical: max (n_external_refs, duration_sec_or_0) strengths = {sid: _scene_strength(session, sid, dur) for sid, _, dur, _ in cluster} canonical_sid = max(cluster, key=lambda s: strengths[s[0]])[0] canonical_dur = next(dur for sid, _, dur, _ in cluster if sid == canonical_sid) for sid, title, dur, norm in cluster: if sid == canonical_sid: continue # Re-score vs canonical (cluster może mieć transitive members ze # słabszym score'em pair-wise). canonical_norm = next(n for s, _, _, n in cluster if s == canonical_sid) ts = fuzz.token_set_ratio(norm, canonical_norm) pr = fuzz.partial_ratio(norm, canonical_norm) score = min(ts, pr) / 100.0 dd = _dur_diff(dur, canonical_dur) auto_eligible = ( score >= _AUTO_MERGE_SCORE and (dd is None or dd <= _AUTO_MERGE_DUR_TOLERANCE) ) reasons = { "algorithm": "title_levenshtein_v1", "score": round(score, 3), "token_set_ratio": ts, "partial_ratio": pr, "duration_diff": round(dd, 3) if dd is not None else None, "performer_id": str(performer_id), } if auto_eligible: if dry_run: log.info("DRY auto-merge %s ← %s (score=%.2f dd=%s)", canonical_sid, sid, score, dd) else: try: # Sprawdź czy obie scenes wciąż istnieją (mogły zostać # zmerged w tej samej sesji przez wcześniejszy cluster). if not session.get(Scene, canonical_sid) or not session.get(Scene, sid): continue merge_scenes( session, keep_id=canonical_sid, drop_id=sid, resolved_by="dedup_favorites_script", ) # Audit row left, right = sorted([canonical_sid, sid]) session.add(MergeCandidate( kind=MergeKind.scene, left_id=left, right_id=right, score=score, reasons=reasons, status=MergeStatus.auto_merged, resolved_by="dedup_favorites_script", )) session.commit() except MergeError as e: log.warning("merge fail %s ← %s: %s", canonical_sid, sid, e) session.rollback() continue auto_merged += 1 else: if dry_run: log.info("DRY pending %s ↔ %s (score=%.2f dd=%s)", canonical_sid, sid, score, dd) else: left, right = sorted([canonical_sid, sid]) # Idempotent — skip jeśli już istnieje exists_row = session.execute( select(MergeCandidate.id) .where(MergeCandidate.left_id == left) .where(MergeCandidate.right_id == right) .where(MergeCandidate.kind == MergeKind.scene) ).first() if exists_row: continue session.add(MergeCandidate( kind=MergeKind.scene, left_id=left, right_id=right, score=score, reasons=reasons, status=MergeStatus.pending, )) session.commit() pending += 1 return { "performer": perf.canonical_name, "scenes": len(rows), "after_teaser_filter": len(candidates), "skipped_teaser": skipped_teaser, "clusters": len(dup_clusters), "auto_merged": auto_merged, "pending": pending, } def main() -> None: ap = argparse.ArgumentParser() ap.add_argument("--dry-run", action="store_true") ap.add_argument("--performer-id", type=str, default=None, help="single performer UUID (skip favorites loop)") args = ap.parse_args() with SessionLocal() as session: if args.performer_id: perf_ids = [_uuid.UUID(args.performer_id)] else: perf_ids = session.execute( select(FavoritePerformer.performer_id) ).scalars().all() log.info("Found %d favorite performers", len(perf_ids)) grand_total = { "scenes": 0, "auto_merged": 0, "pending": 0, "skipped_teaser": 0, "clusters": 0, } for pid in perf_ids: with SessionLocal() as session: result = process_performer(session, pid, dry_run=args.dry_run) if "error" in result: log.warning("skip %s: %s", pid, result["error"]) continue log.info( "%s: scenes=%d teaser_skip=%d clusters=%d auto=%d pending=%d", result["performer"], result["scenes"], result.get("skipped_teaser", 0), result.get("clusters", 0), result["auto_merged"], result["pending"], ) for k in grand_total: grand_total[k] += result.get(k, 0) log.info( "=== TOTAL: scenes=%d teaser_skip=%d clusters=%d auto_merged=%d pending=%d ===", grand_total["scenes"], grand_total["skipped_teaser"], grand_total["clusters"], grand_total["auto_merged"], grand_total["pending"], ) if __name__ == "__main__": main()