goon/scripts/dedup_favorite_performers.py
goon-foss ad0284585b Initial commit
Goon — self-hosted aggregator for adult-content scene metadata.

Indexes scenes from TPDB, StashDB, and 30+ public adult tube sites.
Cross-source deduplication via perceptual hash + Levenshtein distance.
FastAPI backend + APScheduler worker + React Native (Expo) mobile client.

FOSS, ad-free, donation-funded. See README for details.
2026-05-20 10:10:22 +02:00

368 lines
15 KiB
Python

"""Deduplicate scenes of favorite performers: title Levenshtein + duration gate +
filtering of promo/teaser tubes. High-confidence matches are auto-merged, the
rest → pending merge_candidates (audit trail).

Algorithm:
  1. Per favorite performer (or --performer-id):
     a. Fetch all scenes of the performer.
     b. Strip scenes whose ENTIRE playback is on teaser tubes (xvideos/xnxx/
        redtube/youporn etc., see _TEASER_TUBES — mostly cut-down promos);
        these are not worth matching because they are fragments of another
        canonical scene. These scenes are dropped from dedup entirely, but
        they STAY in the database (mobile can still see them).
     c. Normalize the title (drop performer name, brand prefixes, generic
        vocab, dates).
     d. Pairwise fuzz.token_set_ratio + partial_ratio (the minimum of the two).
     e. Duration gate: if both durations are known, max 15% diff.
     f. Min 3 tokens after normalization.
  2. Cluster (union-find) → per cluster pick the canonical scene (max
     external_refs + max duration_sec). For the rest of the cluster:
     - score ≥ 0.95 + duration within 10% → call merge_scenes() (auto_merged)
     - else → insert MergeCandidate(status=pending)

Run:
  python /srv/scripts/dedup_favorite_performers.py --dry-run
  python /srv/scripts/dedup_favorite_performers.py --performer-id <UUID>
  python /srv/scripts/dedup_favorite_performers.py  # all favorites
"""
from __future__ import annotations
import argparse
import logging
import re
import sys
import uuid as _uuid
from collections import defaultdict
from rapidfuzz import fuzz
from sqlalchemy import func, select
from sqlalchemy.orm import Session
sys.path.insert(0, "/srv")
from app.db import SessionLocal
from app.models.favorite_performer import FavoritePerformer
from app.models.merge_candidate import MergeCandidate, MergeKind, MergeStatus
from app.models.performer import Performer
from app.models.playback_source import PlaybackSource
from app.models.scene import Scene, SceneExternalRef, ScenePerformer
from app.resolve.scene_merge import MergeError, merge_scenes
# Script-wide logger; basicConfig below configures INFO-level logging as soon
# as this module is imported or run.
log = logging.getLogger("dedup_favorites")
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
# Tubes that serve MOSTLY cut-down/promo clips (>4% of scenes under 5 min,
# mainstream). Scenes whose ONLY playback is on these tubes are skipped during
# duplicate detection — they are re-uploads of fragments of another canonical
# scene, so merging them with the full version is not worthwhile.
# Decision based on the duration distribution analysis of 2026-05-17.
# NOTE(review): redtubecom (2.9%) and pornhubcom (1.6%) are below the ">4%"
# criterion stated above; they appear to be included for their short clips —
# confirm this is intentional.
_TEASER_TUBES = frozenset({
    "xvideoscom",  # median 600s, 11.7% under 5 min
    "xnxxcom",  # median 600s, 7.6% under 5 min (sister site of xvideos)
    "redtubecom",  # 2.9% (but mostly short pornhub-style clips)
    "youporncom",  # 4.4% under 5 min
    "xhamstercom",  # 4.2% under 5 min (mixed, 743s median)
    "pornhubcom",  # 1.6%, but mostly short (918s median)
})
# Title normalization — keep in sync with title_levenshtein_benchmark.py
# Site/brand names; normalize_title() drops any title token found here so that
# the same scene titled with and without a studio prefix compares equal.
_SITE_BRANDS = {
    "legalporno", "brazzers", "brazzersexxtra", "publicagent", "mofos",
    "twistys", "vivid", "ddf", "ddfbusty", "kink", "naughty", "naughtyamerica",
    "joymii", "julesjordan", "mompov", "perfectgirls", "puremature",
    "realitykings", "scoreland", "brazilbang", "foxes", "lubed",
    "castingcouchx", "throated", "collegerules", "fakehub", "faketaxi",
    "fakedrivingschool", "porn", "publicpickups",
    "blacked", "blackedraw", "tushy", "tushyraw", "vixen", "deeper",
    "evilangel", "newsensations", "private", "privatecom",
    "humiliatedmilfs", "adultmobile", "alettaoceanlive", "alettaoceanempire",
    "roccosiffredi", "pixandvideo", "madsexparty", "familysinners",
    "bangbros", "nurumassage", "sweetheart",
}
# Generic vocabulary (stop words, descriptors, act names); normalize_title()
# drops any title token found here. Matching is done on lowercased tokens.
# "with" is listed twice below; the repeat is harmless in a set literal.
_NOISE_WORDS = {
    "new", "brand", "full", "leaked", "masked", "ppv", "video", "scene",
    "aka", "ft", "feat", "with", "and", "the", "a", "an",
    "bimbo", "milf", "teen", "teens", "blonde", "brunette", "redhead",
    "busty", "curvy", "skinny", "petite", "tall", "short", "thick", "thin",
    "amateur", "homemade", "professional", "kinky", "sexy", "hot", "horny",
    "beautiful", "gorgeous", "stunning", "perfect", "sweet", "naughty",
    "wild", "tight", "young", "older",
    "anal", "blowjob", "bj", "dp", "dvp", "dap", "dpp", "bbc", "bg",
    "gangbang", "fuck", "fucking", "fucked", "fucks",
    "boobs", "tits", "ass", "pussy", "cock", "dick", "facial", "cumshot",
    "creampie", "rough", "deep", "hard", "wet", "pov", "pounding",
    "small", "big", "huge", "massive", "intense",
    "loves", "gets", "takes", "gives", "shares", "wants", "needs",
    "her", "his", "she", "he", "for", "from", "to", "in", "on", "by",
    "of", "with",
    "first", "best", "girl", "boy", "girls", "guy", "guys", "boys",
    "fwb", "wife", "wifey", "step", "stepmom", "stepdad", "stepsis",
}
# Tokens containing a run of 4+ digits (optional letter prefix, alphanumeric
# suffix), e.g. numeric scene ids. Despite the name, normalize_title()
# substitutes it everywhere in the title, not only at the end.
_TRAILING_ID_RE = re.compile(r'\b(?:[a-z]*\d{4,}[a-z\d]*|\d{4,})\b', re.IGNORECASE)
# Bare domain names ("name.tld") for the listed TLDs.
_DOMAIN_RE = re.compile(r'\b[a-z0-9]+\.(?:com|net|so|to|sx|tv|video|porn|xxx|cc|biz|info)\b', re.IGNORECASE)
# "#hashtag" style tags.
_HASHTAG_RE = re.compile(r'#\w+')
# Anything that is not a lowercase ASCII letter, digit or whitespace; applied
# after the title has been lowercased.
_PUNCT_RE = re.compile(r'[^a-z0-9\s]+')
# Runs of whitespace, collapsed to a single space.
_WS_RE = re.compile(r'\s+')
# Numeric dates with "/" or "-" separators and a 1-2 digit first field
# (e.g. 17/05/2026, 5-17-26).
_DATE_RE = re.compile(r'\b\d{1,2}[/\-]\d{1,2}[/\-]\d{2,4}\b')
# Clock times followed by an a/p marker (e.g. "10:30 pm", "9:05p").
_TIME_RE = re.compile(r'\b\d{1,2}:\d{2}\s*[ap]m?\b', re.IGNORECASE)
# Live-stream boilerplate; a title matching this normalizes to "" and is
# therefore never used as a dedup candidate.
_LIVE_RE = re.compile(r'\bstream\s+started\s+at\b', re.IGNORECASE)
# Minimum number of tokens a normalized title must keep to be compared at all.
_MIN_TOKENS = 3
# Score thresholds (scores are on a 0.0-1.0 scale)
_AUTO_MERGE_SCORE = 0.95  # min score vs the canonical scene for auto-merge
_AUTO_MERGE_DUR_TOLERANCE = 0.10  # max 10% duration diff for auto-merge
_PENDING_SCORE = 0.85  # min pairwise score for two scenes to be clustered
_DUR_GATE = 0.15  # max 15% duration diff, sanity gate for pending candidates
def normalize_title(title: str, performer_name: str | None = None) -> str:
    """Reduce a scene title to its distinctive lowercase tokens.

    The title is lowercased, then dates, clock times, hashtags, domain names
    and long numeric ids are blanked out (in that order). Each whitespace-
    separated part of ``performer_name`` is removed as a whole word, the
    remaining punctuation is stripped, and finally every token that is a
    noise word, a site brand or a single character is dropped.

    Returns an empty string for an empty title or for live-stream boilerplate
    ("stream started at ...").
    """
    if not title:
        return ""
    if _LIVE_RE.search(title):
        return ""

    text = title.lower()
    # Order matters: dates/times must go before the generic id/punctuation
    # passes would tear them apart.
    for pattern in (_DATE_RE, _TIME_RE, _HASHTAG_RE, _DOMAIN_RE, _TRAILING_ID_RE):
        text = pattern.sub(" ", text)

    name_parts = performer_name.lower().split() if performer_name else []
    for part in name_parts:
        text = re.sub(rf'\b{re.escape(part)}\b', " ", text)

    text = _PUNCT_RE.sub(" ", text)
    text = _WS_RE.sub(" ", text).strip()

    kept = []
    for word in text.split():
        if len(word) <= 1:
            continue
        if word in _NOISE_WORDS or word in _SITE_BRANDS:
            continue
        kept.append(word)
    return " ".join(kept)
def _scene_strength(session: Session, scene_id: _uuid.UUID, duration_sec: int | None) -> tuple:
    """Rank a scene as a canonical candidate; a larger tuple is a better one.

    Returns ``(number of SceneExternalRef rows for the scene, duration_sec)``
    with an unknown or zero duration counted as 0.
    """
    ref_count_stmt = (
        select(func.count())
        .select_from(SceneExternalRef)
        .where(SceneExternalRef.scene_id == scene_id)
    )
    ref_count = session.execute(ref_count_stmt).scalar_one()
    known_duration = duration_sec if duration_sec else 0
    return (ref_count, known_duration)
def _has_only_teaser_playback(session: Session, scene_id: _uuid.UUID) -> bool:
    """Tell whether every alive tube playback of the scene is on a teaser tube.

    Only playback sources with ``dead_at IS NULL`` and an origin of the form
    ``tube:<name>`` are looked at. Returns True when at least one such source
    exists and all of their tube names are in ``_TEASER_TUBES``; returns False
    when the scene has no alive tube playback at all.
    """
    alive_tube_origins_stmt = (
        select(PlaybackSource.origin)
        .where(PlaybackSource.scene_id == scene_id)
        .where(PlaybackSource.dead_at.is_(None))
        .where(PlaybackSource.origin.like("tube:%"))
    )
    alive_origins = session.execute(alive_tube_origins_stmt).scalars().all()
    if not alive_origins:
        # No alive tube playback — a canonical-only scene, let it through.
        return False
    for origin in alive_origins:
        tube_name = origin.split(":", 1)[1]
        if tube_name not in _TEASER_TUBES:
            return False
    return True
def _dur_diff(a: int | None, b: int | None) -> float | None:
"""Returns relative duration diff (0.0-1.0) or None if either unknown."""
if not a or not b:
return None
longer, shorter = max(a, b), min(a, b)
return (longer - shorter) / longer if longer > 0 else 0.0
def _title_similarity(norm_a: str, norm_b: str) -> tuple:
    """Compare two normalized titles.

    Returns ``(token_set_ratio, partial_ratio, score)`` where ``score`` is the
    lower of the two rapidfuzz ratios divided by 100, so both metrics have to
    agree before a pair counts as similar.
    """
    ts = fuzz.token_set_ratio(norm_a, norm_b)
    pr = fuzz.partial_ratio(norm_a, norm_b)
    return ts, pr, min(ts, pr) / 100.0


def process_performer(session: Session, performer_id: _uuid.UUID, *, dry_run: bool) -> dict:
    """Find duplicate scenes of one performer and merge or queue them.

    Steps:
      1. Load all scenes linked to the performer.
      2. Drop scenes whose alive tube playback is teaser-only and scenes whose
         normalized title has fewer than ``_MIN_TOKENS`` tokens.
      3. Compare the remaining scenes pairwise (duration gate ``_DUR_GATE``,
         score threshold ``_PENDING_SCORE``) and cluster them with union-find.
      4. Per cluster pick the canonical scene (most external refs, then longest
         duration) and re-score every other member against it: members that
         reach ``_AUTO_MERGE_SCORE`` within ``_AUTO_MERGE_DUR_TOLERANCE`` are
         merged via ``merge_scenes``; the others get a pending
         ``MergeCandidate`` row.

    Args:
        session: open SQLAlchemy session; unless ``dry_run`` it is committed
            after every merge / inserted candidate and rolled back when
            ``merge_scenes`` raises ``MergeError``.
        performer_id: performer whose scenes are deduplicated.
        dry_run: when True nothing is written; the planned actions are only
            logged and counted.

    Returns:
        ``{"error": ...}`` if the performer does not exist. Otherwise a stats
        dict with ``performer``, ``scenes``, ``skipped_teaser``,
        ``auto_merged`` and ``pending``; when the performer has at least two
        scenes it also carries ``after_teaser_filter`` and ``clusters``.
    """
    perf = session.get(Performer, performer_id)
    if perf is None:
        return {"error": "performer not found"}
    rows = session.execute(
        select(Scene.id, Scene.title, Scene.duration_sec)
        .join(ScenePerformer, ScenePerformer.scene_id == Scene.id)
        .where(ScenePerformer.performer_id == performer_id)
    ).all()
    if len(rows) < 2:
        return {"performer": perf.canonical_name, "scenes": len(rows), "auto_merged": 0, "pending": 0, "skipped_teaser": 0}

    # Filter: drop scenes with ONLY teaser playback (re-uploads of fragments)
    # and scenes whose normalized title is too short to compare reliably.
    candidates = []  # (scene_id, title, duration_sec, normalized_title)
    skipped_teaser = 0
    for r in rows:
        if _has_only_teaser_playback(session, r.id):
            skipped_teaser += 1
            continue
        norm = normalize_title(r.title, perf.canonical_name)
        if not norm or len(norm.split()) < _MIN_TOKENS:
            continue
        candidates.append((r.id, r.title, r.duration_sec, norm))

    # Pairwise compare; only the ids of sufficiently similar pairs are needed
    # for clustering.
    pairs: list = []  # (sid_a, sid_b)
    for i, (sid_a, _, dur_a, norm_a) in enumerate(candidates):
        for j in range(i + 1, len(candidates)):
            sid_b, _, dur_b, norm_b = candidates[j]
            dd = _dur_diff(dur_a, dur_b)
            if dd is not None and dd > _DUR_GATE:
                continue
            _, _, score = _title_similarity(norm_a, norm_b)
            if score < _PENDING_SCORE:
                continue
            pairs.append((sid_a, sid_b))

    # Build clusters (union-find with path halving).
    parent: dict = {s[0]: s[0] for s in candidates}

    def find(x):
        while parent[x] != x:
            parent[x] = parent[parent[x]]
            x = parent[x]
        return x

    def union(a, b):
        ra, rb = find(a), find(b)
        if ra != rb:
            parent[ra] = rb

    for a, b in pairs:
        union(a, b)

    clusters: dict = defaultdict(list)
    for s in candidates:
        clusters[find(s[0])].append(s)
    dup_clusters = [c for c in clusters.values() if len(c) > 1]

    auto_merged = 0
    pending = 0
    for cluster in dup_clusters:
        # Pick canonical: max (n_external_refs, duration_sec_or_0).
        strengths = {sid: _scene_strength(session, sid, dur) for sid, _, dur, _ in cluster}
        canonical_sid, _, canonical_dur, canonical_norm = max(
            cluster, key=lambda s: strengths[s[0]]
        )
        for sid, title, dur, norm in cluster:
            if sid == canonical_sid:
                continue
            # Re-score against the canonical scene: a cluster may contain
            # transitive members whose direct similarity to it is weaker.
            ts, pr, score = _title_similarity(norm, canonical_norm)
            dd = _dur_diff(dur, canonical_dur)
            auto_eligible = (
                score >= _AUTO_MERGE_SCORE
                and (dd is None or dd <= _AUTO_MERGE_DUR_TOLERANCE)
            )
            reasons = {
                "algorithm": "title_levenshtein_v1",
                "score": round(score, 3),
                "token_set_ratio": ts,
                "partial_ratio": pr,
                "duration_diff": round(dd, 3) if dd is not None else None,
                "performer_id": str(performer_id),
            }
            if auto_eligible:
                if dry_run:
                    log.info("DRY auto-merge %s <- %s (score=%.2f dd=%s)",
                             canonical_sid, sid, score, dd)
                else:
                    try:
                        # Check that both scenes still exist (they may have
                        # been merged earlier in this session by a previous
                        # cluster).
                        if not session.get(Scene, canonical_sid) or not session.get(Scene, sid):
                            continue
                        merge_scenes(
                            session, keep_id=canonical_sid, drop_id=sid,
                            resolved_by="dedup_favorites_script",
                        )
                        # Audit row.
                        # NOTE(review): unlike the pending branch there is no
                        # check for an existing MergeCandidate row for this
                        # pair — confirm that cannot violate a constraint.
                        left, right = sorted([canonical_sid, sid])
                        session.add(MergeCandidate(
                            kind=MergeKind.scene,
                            left_id=left, right_id=right,
                            score=score, reasons=reasons,
                            status=MergeStatus.auto_merged,
                            resolved_by="dedup_favorites_script",
                        ))
                        session.commit()
                    except MergeError as e:
                        log.warning("merge fail %s <- %s: %s", canonical_sid, sid, e)
                        session.rollback()
                        continue
                auto_merged += 1
            else:
                if dry_run:
                    log.info("DRY pending %s <- %s (score=%.2f dd=%s)",
                             canonical_sid, sid, score, dd)
                else:
                    left, right = sorted([canonical_sid, sid])
                    # Idempotent — skip if a candidate for this pair exists.
                    exists_row = session.execute(
                        select(MergeCandidate.id)
                        .where(MergeCandidate.left_id == left)
                        .where(MergeCandidate.right_id == right)
                        .where(MergeCandidate.kind == MergeKind.scene)
                    ).first()
                    if exists_row:
                        continue
                    session.add(MergeCandidate(
                        kind=MergeKind.scene,
                        left_id=left, right_id=right,
                        score=score, reasons=reasons,
                        status=MergeStatus.pending,
                    ))
                    session.commit()
                pending += 1

    return {
        "performer": perf.canonical_name,
        "scenes": len(rows),
        "after_teaser_filter": len(candidates),
        "skipped_teaser": skipped_teaser,
        "clusters": len(dup_clusters),
        "auto_merged": auto_merged,
        "pending": pending,
    }
def main() -> None:
    """CLI entry point: dedup one performer or every favorite performer.

    Options:
        --dry-run: only log and count what would be merged / queued.
        --performer-id UUID: process just this performer instead of the
            favorites list. A malformed UUID ends the run with a usage error.

    Each performer is processed in its own session, and a summary line with
    the accumulated counters is logged at the end.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--dry-run", action="store_true")
    ap.add_argument("--performer-id", type=str, default=None,
                    help="single performer UUID (skip favorites loop)")
    args = ap.parse_args()

    if args.performer_id:
        try:
            perf_ids = [_uuid.UUID(args.performer_id)]
        except ValueError:
            # Clean usage error (exit status 2) instead of a traceback.
            ap.error(f"--performer-id: {args.performer_id!r} is not a valid UUID")
    else:
        # A session is only needed to read the favorites list.
        with SessionLocal() as session:
            favorite_ids = session.execute(
                select(FavoritePerformer.performer_id)
            ).scalars().all()
        # Keep the first occurrence of each id so a performer is not processed
        # (and counted) twice, e.g. if the favorites table ever holds several
        # rows for the same performer.
        perf_ids = list(dict.fromkeys(favorite_ids))
        log.info("Found %d favorite performers", len(perf_ids))

    grand_total = {
        "scenes": 0, "auto_merged": 0, "pending": 0, "skipped_teaser": 0,
        "clusters": 0,
    }
    for pid in perf_ids:
        # Fresh session per performer keeps one performer's work independent
        # of the previous one's session state.
        with SessionLocal() as session:
            result = process_performer(session, pid, dry_run=args.dry_run)
        if "error" in result:
            log.warning("skip %s: %s", pid, result["error"])
            continue
        log.info(
            "%s: scenes=%d teaser_skip=%d clusters=%d auto=%d pending=%d",
            result["performer"],
            result["scenes"],
            result.get("skipped_teaser", 0),
            result.get("clusters", 0),
            result["auto_merged"],
            result["pending"],
        )
        for k in grand_total:
            grand_total[k] += result.get(k, 0)
    log.info(
        "=== TOTAL: scenes=%d teaser_skip=%d clusters=%d auto_merged=%d pending=%d ===",
        grand_total["scenes"],
        grand_total["skipped_teaser"],
        grand_total["clusters"],
        grand_total["auto_merged"],
        grand_total["pending"],
    )


if __name__ == "__main__":
    main()