goon/scripts/phash_dedup_scenes.py
goon-foss ad0284585b Initial commit
Goon — self-hosted aggregator for adult-content scene metadata.

Indexes scenes from TPDB, StashDB, and 30+ public adult tube sites.
Cross-source deduplication via perceptual hash + Levenshtein distance.
FastAPI backend + APScheduler worker + React Native (Expo) mobile client.

FOSS, ad-free, donation-funded. See README for details.
2026-05-20 10:10:22 +02:00

157 lines
5.8 KiB
Python

"""Auto-merge scene duplicates wykryte przez identical phash.

Bug report 2026-05-13 (e5e83b5e): on PerformerScenes a user sees a large number
of scenes with identical thumbnails (the same content re-uploaded under
different titles). The phash fingerprint makes it possible to detect exact
duplicates.

Conservative heuristic — auto-merge only when:
1. Identical phash (hamming = 0)
2. Same primary performer (≥1 shared performer); a pair in which only one
   scene has any performers attached is also let through
3. Similar duration (within 30s, or one of them has a None duration)

Other duplicate candidates (different performers, hamming 1-5) require manual
review and are not auto-merged (risk of false positives).

Keep policy: the scene with more `external_refs` (canonical TPDB/StashDB)
takes over the content of the one with fewer (orphan tube → canonical merge).

Run: `python /srv/scripts/phash_dedup_scenes.py [--dry-run] [--max-pairs N]`
"""
from __future__ import annotations
import argparse
import logging
import sys
from sqlalchemy import select, func
from sqlalchemy.orm import Session
sys.path.insert(0, "/srv")
from app.db import SessionLocal
from app.models.scene import (
Scene,
SceneExternalRef,
SceneFingerprint,
ScenePerformer,
)
from app.resolve.scene_merge import merge_scenes
# Configure root logging first (INFO level, timestamped lines), then grab the
# script's own named logger used by everything below.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(message)s",
)
log = logging.getLogger("phash_dedup")
def _scene_signals(session: Session, scene_id) -> tuple[int, int, set]:
    """Collect the signals used to rank and compare one scene.

    Runs three queries on ``session`` for ``scene_id`` and returns a
    ``(n_external_refs, duration_sec, performer_ids)`` tuple:

    * the number of ``SceneExternalRef`` rows pointing at the scene,
    * the scene's ``duration_sec``, with a missing row or a falsy value
      (``None`` / ``0``) normalised to ``0``,
    * the set of ``performer_id`` values linked through ``ScenePerformer``.
    """
    ref_count_stmt = (
        select(func.count())
        .select_from(SceneExternalRef)
        .where(SceneExternalRef.scene_id == scene_id)
    )
    ref_count = session.execute(ref_count_stmt).scalar_one()

    duration_stmt = select(Scene.duration_sec).where(Scene.id == scene_id)
    stored_duration = session.execute(duration_stmt).scalar_one_or_none()
    # Callers treat 0 as "duration unknown", so fold None into it.
    duration = stored_duration if stored_duration else 0

    performer_stmt = select(ScenePerformer.performer_id).where(
        ScenePerformer.scene_id == scene_id
    )
    performer_ids = set(session.execute(performer_stmt).scalars().all())

    return (ref_count, duration, performer_ids)
def _skip_reason(keep_sig, drop_sig, *, max_duration_diff=30):
    """Return why a (keep, drop) pair must not be auto-merged, or ``None``.

    Both arguments are ``(n_external_refs, duration_sec, performer_ids)``
    tuples as produced by ``_scene_signals``. Possible reasons:

    * ``"performer"``    – both scenes have performers but share none,
    * ``"no_performer"`` – neither scene has any performer,
    * ``"duration"``     – both durations are known (non-zero) and differ by
      more than ``max_duration_diff`` seconds.

    A pair where only one side has performers passes the performer check.
    """
    keep_performers, drop_performers = keep_sig[2], drop_sig[2]
    if keep_performers and drop_performers:
        if not (keep_performers & drop_performers):
            return "performer"
    elif not keep_performers and not drop_performers:
        return "no_performer"

    keep_duration, drop_duration = keep_sig[1], drop_sig[1]
    # A duration of 0 means "unknown" and never blocks a merge.
    if keep_duration and drop_duration:
        if abs(keep_duration - drop_duration) > max_duration_diff:
            return "duration"
    return None


def main() -> None:
    """Find scenes sharing an identical phash and merge the safe pairs.

    Command-line flags:
      --dry-run     only log the planned merges, do not execute them
      --max-pairs   upper bound on merges (or planned merges) per run

    For each phash value shared by more than one scene, the scene with the
    most external refs (ties: longest duration, then id) is kept and every
    other scene in the bucket is merged into it if ``_skip_reason`` allows.

    Exits with status 2 when duplicate buckets were found but nothing was
    merged, so cron/systemd can notice the "soft failure".
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--dry-run", action="store_true", help="Print planned merges, don't execute")
    ap.add_argument("--max-pairs", type=int, default=1000, help="Limit pairs per run")
    args = ap.parse_args()

    # Read-only session for the bucket list. Every merge gets a fresh session
    # so that an exception cannot leave a shared one in PendingRollbackError
    # state.
    with SessionLocal() as read_session:
        buckets = read_session.execute(
            select(SceneFingerprint.value, func.array_agg(SceneFingerprint.scene_id))
            .where(SceneFingerprint.kind == "phash")
            # NOTE(review): "0" is presumably a placeholder hash — confirm.
            .where(SceneFingerprint.value != "0")
            .group_by(SceneFingerprint.value)
            .having(func.count(SceneFingerprint.scene_id.distinct()) > 1)
        ).all()
    log.info("found %d phash buckets with duplicates", len(buckets))

    merged = 0
    skipped = {"performer": 0, "duration": 0, "no_performer": 0}
    # The bucket list is a snapshot taken before any merge. Scenes merged away
    # earlier in this run must not be picked again (as keep or as drop) when
    # they also appear in a later bucket.
    dropped_ids = set()

    for value, scene_ids in buckets:
        scene_ids = [sid for sid in set(scene_ids) if sid not in dropped_ids]
        if len(scene_ids) < 2:
            continue

        # Fresh session per bucket — signals + ranking.
        with SessionLocal() as bucket_session:
            sigs = {sid: _scene_signals(bucket_session, sid) for sid in scene_ids}

        # Most external refs first, then longest duration. The id is a final
        # tie-break so a dry run and the real run choose the same keep scene
        # regardless of set / array_agg ordering.
        sorted_ids = sorted(
            scene_ids,
            key=lambda sid: (-sigs[sid][0], -sigs[sid][1], str(sid)),
        )
        keep_id = sorted_ids[0]
        keep_sig = sigs[keep_id]

        for drop_id in sorted_ids[1:]:
            if merged >= args.max_pairs:
                log.info("reached max-pairs limit, stopping")
                break

            reason = _skip_reason(keep_sig, sigs[drop_id])
            if reason is not None:
                skipped[reason] += 1
                continue

            if args.dry_run:
                log.info("DRY: merge %s <- %s (phash=%s)", keep_id, drop_id, value)
                merged += 1
                dropped_ids.add(drop_id)
                continue

            # Fresh session per merge — an exception does not break the
            # remaining iterations.
            with SessionLocal() as merge_session:
                try:
                    merge_scenes(
                        merge_session, keep_id=keep_id, drop_id=drop_id,
                        resolved_by="phash-dedup-script",
                    )
                    merge_session.commit()
                except Exception as exc:
                    merge_session.rollback()
                    log.warning("merge fail %s <- %s: %s", keep_id, drop_id, exc)
                    continue

            log.info("merged %s <- %s (phash=%s)", keep_id, drop_id, value)
            merged += 1
            dropped_ids.add(drop_id)

            # Re-fetch keep_sig — the kept scene may have gained performers
            # from the merge. Without this the next drop_id in the bucket
            # would be tested against the OLD performer set. Done outside the
            # merge try-block so a failure here is not reported as a failed
            # merge (the merge is already committed).
            try:
                with SessionLocal() as refresh:
                    keep_sig = _scene_signals(refresh, keep_id)
            except Exception as exc:
                log.warning(
                    "could not refresh signals for %s after merge: %s", keep_id, exc
                )

        if merged >= args.max_pairs:
            break

    log.info(
        "done merged=%d skipped_perf=%d skipped_dur=%d skipped_noperf=%d",
        merged, skipped["performer"], skipped["duration"], skipped["no_performer"],
    )
    # Non-zero exit when NOTHING was merged although buckets existed — a
    # signal for cron/systemd that the run ended in a "soft failure" (e.g.
    # every pair was blocked by the heuristic). Operationally worth knowing.
    if buckets and merged == 0:
        sys.exit(2)


if __name__ == "__main__":
    main()