goon/scripts/bulk_resolve_merges.py
goon-foss ad0284585b Initial commit
Goon — self-hosted aggregator for adult-content scene metadata.

Indexes scenes from TPDB, StashDB, and 30+ public adult tube sites.
Cross-source deduplication via perceptual hash + Levenshtein distance.
FastAPI backend + APScheduler worker + React Native (Expo) mobile client.

FOSS, ad-free, donation-funded. See README for details.
2026-05-20 10:10:22 +02:00

152 lines
5.6 KiB
Python

"""Bulk auto-resolver dla zaległej kolejki merge_candidates(status=pending).
Polityka:
score >= 0.85 → merge (keep = strona z większą liczbą external_refs; tie-break: starsza scena)
score < 0.85 → reject (zachowaj jako rejected w merge_candidates)
Tło: manual review queue urosła do 23k+ wpisów (przy 953 user-resolved kiedykolwiek).
Auto-merge threshold w pipeline (0.92) był ustawiony konserwatywnie wcześnie; po
miesiącach iteracji scoringu mamy zaufanie do resolverów. Bulk-czyszczenie obniża
threshold do 0.85 retroaktywnie dla legacy candidates — wszystko ze score >= 0.85 idzie
auto, reszta odpada (zerowa szansa, że user kiedyś ją ruszy).
Uruchamia się w batchach po N (commit per-batch), żeby długi run nie trzymał
ogromnej transakcji.
"""
from __future__ import annotations

import argparse
import logging
import sys
import time
from collections import Counter
from datetime import UTC, datetime

from sqlalchemy import func, select
from sqlalchemy.orm import Session

from app.db import session_scope
from app.models.merge_candidate import MergeCandidate, MergeKind, MergeStatus
from app.models.scene import Scene, SceneExternalRef
from app.resolve.scene_merge import MergeError, merge_scenes
# Candidates scoring at or above this value are auto-merged; everything below
# it is marked rejected.
MERGE_THRESHOLD: float = 0.85
# Script-wide logger; its name appears via %(name)s in the format set up by main().
log = logging.getLogger("bulk_resolve")
def pick_keep_id(session: Session, left_id, right_id):
    """Choose which of two duplicate scenes survives a merge.

    Rules, applied in order:
      1. The scene with more SceneExternalRef rows wins (more linked sources
         makes it the more canonical record).
      2. On a tie, the scene with the earlier ``created_at`` wins (first
         ingest is the more stable reference).
      3. If still undecided (equal or missing timestamps), ``left_id`` wins.

    Returns either ``left_id`` or ``right_id``. The ``created_at`` query is
    only issued when the reference counts tie.
    """
    pair = [left_id, right_id]

    ref_totals = {
        scene_id: total
        for scene_id, total in session.execute(
            select(SceneExternalRef.scene_id, func.count())
            .where(SceneExternalRef.scene_id.in_(pair))
            .group_by(SceneExternalRef.scene_id)
        )
    }
    left_refs = ref_totals.get(left_id, 0)
    right_refs = ref_totals.get(right_id, 0)
    if left_refs > right_refs:
        return left_id
    if right_refs > left_refs:
        return right_id

    # Reference counts tie: fall back to ingest age.
    created = {
        row.id: row.created_at
        for row in session.execute(
            select(Scene.id, Scene.created_at).where(Scene.id.in_(pair))
        )
    }
    left_ts = created.get(left_id)
    right_ts = created.get(right_id)
    if not (left_ts and right_ts) or left_ts == right_ts:
        return left_id
    return right_id if right_ts < left_ts else left_id
def _pending_page_stmt(batch_size: int, after: tuple[float, object] | None):
    """Build the SELECT for the next page of pending scene merge candidates.

    Rows are ordered by ``score DESC, id ASC`` and capped at ``batch_size``,
    so every candidate at or above MERGE_THRESHOLD is handled before any
    rejection. ``after`` is the ``(score, id)`` of the last row already handled
    in this run (a keyset cursor); only rows strictly after it are returned.
    A row is therefore never fetched twice in one run, even when processing
    leaves it pending (dry run, or a merge that fails).
    """
    conditions = [
        MergeCandidate.status == MergeStatus.pending,
        MergeCandidate.kind == MergeKind.scene,
        # A NULL score cannot be compared with the threshold in Python
        # (None >= float raises TypeError) nor used as a cursor, so such rows
        # are left pending. If the column is NOT NULL this is a no-op.
        MergeCandidate.score.is_not(None),
    ]
    if after is not None:
        last_score, last_id = after
        conditions.append(
            (MergeCandidate.score < last_score)
            | ((MergeCandidate.score == last_score) & (MergeCandidate.id > last_id))
        )
    return (
        select(MergeCandidate)
        .where(*conditions)
        .order_by(MergeCandidate.score.desc(), MergeCandidate.id.asc())
        .limit(batch_size)
    )


def _resolve_candidate(session, cand, *, dry_run: bool, dropped: set) -> tuple[str, object]:
    """Merge or reject one candidate and return ``(counter_key, dropped_scene_id)``.

    ``counter_key`` is "merged", "rejected" or "skipped". ``dropped_scene_id``
    is the scene a merge removes (or would remove, in a dry run); otherwise it
    is None. The caller runs this inside a savepoint so that any exception rolls
    back only this candidate's writes.
    """
    if cand.left_id in dropped or cand.right_id in dropped:
        # One side was already merged away earlier in this run. merge_scenes
        # deletes the drop scene and closes the pending candidates that
        # reference it, so this row should already be closed. Acting on it
        # again would target a deleted scene or overwrite that resolution.
        return "skipped", None
    if cand.score >= MERGE_THRESHOLD:
        keep_id = pick_keep_id(session, cand.left_id, cand.right_id)
        drop_id = cand.right_id if keep_id == cand.left_id else cand.left_id
        if not dry_run:
            # Per merge_scenes' contract, it also closes this candidate (it
            # references drop_id), so the status is not set here.
            merge_scenes(
                session,
                keep_id=keep_id,
                drop_id=drop_id,
                resolved_by="bulk_auto_resolver",
            )
        return "merged", drop_id
    if not dry_run:
        cand.status = MergeStatus.rejected
        cand.resolved_at = datetime.now(UTC)
        cand.resolved_by = "bulk_auto_resolver"
    return "rejected", None


def _resolve_page(
    *,
    batch_size: int,
    dry_run: bool,
    after: tuple[float, object] | None,
    dropped: set,
) -> tuple[Counter[str], tuple[float, object] | None]:
    """Process one page of candidates inside a single ``session_scope``.

    Returns the counts ("seen"/"merged"/"rejected"/"skipped"/"errors") and the
    ``(score, id)`` cursor of the page's last row. The cursor is None when no
    rows remain. ``dropped`` is updated in place with the scene ids removed by
    merges.
    """
    counts: Counter[str] = Counter()
    with session_scope() as session:
        cands = session.execute(_pending_page_stmt(batch_size, after)).scalars().all()
        if not cands:
            return counts, None
        # Capture the cursor and ids up front: a rolled-back savepoint may
        # expire ORM objects, and the error log must not trigger a reload.
        cursor = (cands[-1].score, cands[-1].id)
        for cand, cand_id in [(c, c.id) for c in cands]:
            counts["seen"] += 1
            try:
                # SAVEPOINT per candidate: a failing merge neither aborts the
                # whole batch transaction nor leaves half-applied changes behind.
                with session.begin_nested():
                    outcome, drop_id = _resolve_candidate(
                        session, cand, dry_run=dry_run, dropped=dropped
                    )
            except MergeError as e:
                counts["errors"] += 1
                log.warning("merge failed for cand=%s: %s", cand_id, e)
            except Exception as e:
                counts["errors"] += 1
                log.exception("unexpected error for cand=%s: %s", cand_id, e)
            else:
                counts[outcome] += 1
                if drop_id is not None:
                    dropped.add(drop_id)
    return counts, cursor


def resolve_batch(*, batch_size: int, dry_run: bool) -> dict[str, int]:
    """Process the first page of pending scene candidates in one transaction.

    Kept for callers that drive batches themselves. Returns counts under
    "seen" / "merged" / "rejected" / "skipped" / "errors", or an empty dict when
    nothing is pending. Every call starts from the top of the queue, so
    repeated dry-run calls see the same rows. main() pages with a cursor instead.
    """
    counts, _ = _resolve_page(
        batch_size=batch_size, dry_run=dry_run, after=None, dropped=set()
    )
    return dict(counts)


def main() -> int:
    """CLI entry point: resolve the pending scene-candidate backlog page by page.

    Each page runs in its own ``session_scope`` (one commit per batch). Paging
    uses a ``(score, id)`` keyset cursor, so every candidate is visited at most
    once per run. This is required for ``--dry-run``, which writes nothing, and
    for candidates whose merge keeps failing: rerun the script to retry those.
    Candidates touching a scene already merged away in this run are counted as
    "skipped". Returns 0; per-candidate failures are logged and counted.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--batch", type=int, default=200, help="batch size per commit")
    parser.add_argument("--max-batches", type=int, default=10000)
    parser.add_argument("--dry-run", action="store_true")
    args = parser.parse_args()
    if args.batch < 1:
        parser.error("--batch must be >= 1")
    logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s %(message)s")

    total: Counter[str] = Counter()
    t0 = time.monotonic()
    after = None
    # Scene ids merged away (or that would be, in a dry run) so far in this run.
    dropped: set = set()
    for i in range(1, args.max_batches + 1):
        result, next_after = _resolve_page(
            batch_size=args.batch, dry_run=args.dry_run, after=after, dropped=dropped
        )
        if next_after is None:
            log.info("queue empty after %d batches", i - 1)
            break
        after = next_after
        total.update(result)
        log.info(
            "batch %d: %s | total: %s | elapsed=%ds",
            i, dict(result), dict(total), int(time.monotonic() - t0),
        )
    log.info("DONE total=%s elapsed=%ds dry_run=%s", dict(total), int(time.monotonic() - t0), args.dry_run)
    return 0
if __name__ == "__main__":
    # Propagate main()'s return value as the process exit status.
    exit_code = main()
    sys.exit(exit_code)