Goon — self-hosted aggregator for adult-content scene metadata. Indexes scenes from TPDB, StashDB, and 30+ public adult tube sites. Cross-source deduplication via perceptual hash + Levenshtein distance. FastAPI backend + APScheduler worker + React Native (Expo) mobile client. FOSS, ad-free, donation-funded. See README for details.
157 lines · 5.8 KiB · Python
"""Auto-merge scene duplicates wykryte przez identical phash.
|
|
|
|
Bug report 2026-05-13 (e5e83b5e): on PerformerScenes the user sees many scenes
with identical thumbnails (the same content re-uploaded under different
titles). The phash fingerprint lets us detect these exact duplicates.

Conservative heuristic — auto-merge only when:

1. Identical phash (Hamming distance = 0).
2. Same primary performer (at least one shared performer). If only one of the
   two scenes has any performers, this check is not applied; pairs where
   neither scene has performers are skipped.
3. Similar duration (within 30 s, or either scene's duration is unknown/0).

Other duplicate candidates (different performers, Hamming distance 1–5) need
manual review and are not auto-merged (false-positive risk).

Keep policy: the scene with more `external_refs` (canonical TPDB/StashDB)
absorbs the one with fewer (orphan tube upload → canonical merge); on a tie,
the longer scene is kept.

Exits with status 2 when duplicate buckets exist but nothing was merged.

Run: `python /srv/scripts/phash_dedup_scenes.py [--dry-run] [--max-pairs N]`
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import logging
|
|
import sys
|
|
|
|
from sqlalchemy import select, func
|
|
from sqlalchemy.orm import Session
|
|
|
|
sys.path.insert(0, "/srv")
|
|
from app.db import SessionLocal
|
|
from app.models.scene import (
|
|
Scene,
|
|
SceneExternalRef,
|
|
SceneFingerprint,
|
|
ScenePerformer,
|
|
)
|
|
from app.resolve.scene_merge import merge_scenes
|
|
|
|
# Script-wide logging: INFO and above to the root handler, timestamped.
# This runs at import time, which is acceptable for a standalone script.
logging.basicConfig(format="%(asctime)s %(levelname)s %(message)s", level=logging.INFO)
log = logging.getLogger("phash_dedup")
|
|
|
|
|
|
def _scene_signals(session: Session, scene_id) -> tuple[int, int, set]:
    """Load the signals used to rank and match one scene.

    Returns ``(n_external_refs, duration_sec, performer_ids)``:

    - ``n_external_refs``: number of ``SceneExternalRef`` rows for the scene;
    - ``duration_sec``: ``Scene.duration_sec``, reported as 0 when the value
      is NULL/falsy or the scene row does not exist;
    - ``performer_ids``: set of ``ScenePerformer.performer_id`` values.
    """
    ref_count_query = (
        select(func.count())
        .select_from(SceneExternalRef)
        .where(SceneExternalRef.scene_id == scene_id)
    )
    duration_query = select(Scene.duration_sec).where(Scene.id == scene_id)
    performer_query = select(ScenePerformer.performer_id).where(
        ScenePerformer.scene_id == scene_id
    )

    # Executed in the same order as the statements are declared above.
    ref_count = session.execute(ref_count_query).scalar_one()
    duration_sec = session.execute(duration_query).scalar_one_or_none()
    performer_ids = set(session.execute(performer_query).scalars())

    return ref_count, duration_sec or 0, performer_ids
|
|
|
|
|
|
# Default for --max-duration-delta: the largest duration difference (seconds)
# at which two identical-phash scenes are still auto-merged.
MAX_DURATION_DELTA_SEC = 30


def _skip_reason(keep_sig: tuple, drop_sig: tuple, max_duration_delta: float) -> str | None:
    """Return why a (keep, drop) pair must NOT be auto-merged, or None if it may.

    ``keep_sig`` / ``drop_sig`` are ``_scene_signals`` tuples
    ``(n_external_refs, duration_sec, performer_ids)``. The performer check
    runs before the duration check, so the first failing rule is reported:

    - ``"diff_performer"``: both scenes have performers but share none;
    - ``"no_performer"``: neither scene has any performer;
    - ``"diff_duration"``: both durations are non-zero and differ by more than
      ``max_duration_delta`` seconds.

    If only ONE side has performers, the performer rule passes. A zero/unknown
    duration on either side skips the duration rule.
    """
    keep_perfs, drop_perfs = keep_sig[2], drop_sig[2]
    if keep_perfs and drop_perfs:
        if not keep_perfs & drop_perfs:
            return "diff_performer"
    elif not keep_perfs and not drop_perfs:
        return "no_performer"

    keep_dur, drop_dur = keep_sig[1], drop_sig[1]
    if keep_dur and drop_dur and abs(keep_dur - drop_dur) > max_duration_delta:
        return "diff_duration"
    return None


def main() -> None:
    """Find identical-phash scene buckets and auto-merge the safe duplicate pairs.

    CLI flags:
      --dry-run               only log planned merges; nothing is written.
      --max-pairs N           stop after N merges (planned merges in dry-run).
      --max-duration-delta S  max duration difference in seconds (default 30).

    In each bucket the scene with the most external refs (then the longest
    duration, then the smallest id as a string) is kept. Every other scene is
    merged into it via ``merge_scenes`` unless ``_skip_reason`` rejects the pair.

    Exits with status 2 when duplicate buckets were found but no pair was
    merged, so cron/systemd can notice the "soft failure".
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--dry-run", action="store_true", help="Print planned merges, don't execute")
    ap.add_argument("--max-pairs", type=int, default=1000, help="Limit pairs per run")
    ap.add_argument(
        "--max-duration-delta",
        type=int,
        default=MAX_DURATION_DELTA_SEC,
        help="Max duration difference (seconds) between auto-merged scenes",
    )
    args = ap.parse_args()

    # Read-only session used only for the bucket list. Every merge gets a fresh
    # session so an exception cannot leave a shared one in PendingRollbackError.
    with SessionLocal() as read_session:
        buckets = read_session.execute(
            select(SceneFingerprint.value, func.array_agg(SceneFingerprint.scene_id))
            .where(SceneFingerprint.kind == "phash")
            .where(SceneFingerprint.value != "0")
            .group_by(SceneFingerprint.value)
            .having(func.count(SceneFingerprint.scene_id.distinct()) > 1)
        ).all()

    log.info("found %d phash buckets with duplicates", len(buckets))

    merged = 0
    skipped = {"diff_performer": 0, "diff_duration": 0, "no_performer": 0}
    # Scenes already merged into another scene during this run (planned merges
    # too, in --dry-run). `buckets` is a snapshot taken before any merge, and a
    # scene with several phash values appears in several buckets. Without this
    # set, a later bucket could merge from, or into, a scene that is gone.
    merged_away = set()

    for value, scene_ids in buckets:
        live_ids = {sid for sid in scene_ids if sid not in merged_away}
        if len(live_ids) < 2:
            continue

        # Short-lived read session: closed before any merge starts, so no
        # transaction stays open while the merge sessions write.
        with SessionLocal() as bucket_session:
            sigs = {sid: _scene_signals(bucket_session, sid) for sid in live_ids}

        # Keep = most external refs, then longest duration; str(id) makes the
        # choice deterministic when both are tied.
        sorted_ids = sorted(
            live_ids,
            key=lambda sid: (-sigs[sid][0], -sigs[sid][1], str(sid)),
        )
        keep_id = sorted_ids[0]
        keep_sig = sigs[keep_id]

        for drop_id in sorted_ids[1:]:
            if merged >= args.max_pairs:
                log.info("reached max-pairs limit, stopping")
                break

            reason = _skip_reason(keep_sig, sigs[drop_id], args.max_duration_delta)
            if reason is not None:
                skipped[reason] += 1
                continue

            if args.dry_run:
                log.info("DRY: merge %s ← %s (phash=%s)", keep_id, drop_id, value)
                merged += 1
                merged_away.add(drop_id)
                continue

            # Fresh session per merge, so one failure does not break later iterations.
            with SessionLocal() as merge_session:
                try:
                    merge_scenes(
                        merge_session, keep_id=keep_id, drop_id=drop_id,
                        resolved_by="phash-dedup-script",
                    )
                    merge_session.commit()
                except Exception as e:
                    merge_session.rollback()
                    log.warning("merge fail %s ← %s: %s", keep_id, drop_id, e, exc_info=True)
                    continue

            log.info("merged %s ← %s (phash=%s)", keep_id, drop_id, value)
            merged += 1
            merged_away.add(drop_id)

            # Re-read keep's signals: the dropped scene's performers can move to
            # keep on merge, and the next drop_id must be checked against the
            # updated set, not the stale one.
            try:
                with SessionLocal() as refresh:
                    keep_sig = _scene_signals(refresh, keep_id)
            except Exception:
                # The merge itself committed. Don't keep comparing against stale
                # signals; the rest of this bucket is left for the next run.
                log.warning(
                    "refresh fail for %s; skipping rest of phash=%s",
                    keep_id, value, exc_info=True,
                )
                break

        if merged >= args.max_pairs:
            break

    log.info(
        "done merged=%d skipped_perf=%d skipped_dur=%d skipped_noperf=%d",
        merged, skipped["diff_performer"], skipped["diff_duration"], skipped["no_performer"],
    )
    # Nonzero exit when nothing was merged even though buckets existed. This
    # signals a "soft failure" to cron/systemd (e.g. every pair was blocked
    # by the heuristic), which is operationally worth knowing.
    if buckets and merged == 0:
        sys.exit(2)
|
|
|
|
|
|
# Script entry point: the dedup runs only when this file is executed directly,
# not when it is imported.
if __name__ == "__main__":
    main()
|