Goon — self-hosted aggregator for adult-content scene metadata. Indexes scenes from TPDB, StashDB, and 30+ public adult tube sites. Cross-source deduplication via perceptual hash + Levenshtein distance. FastAPI backend + APScheduler worker + React Native (Expo) mobile client. FOSS, ad-free, donation-funded. See README for details.
152 lines
5.6 KiB
Python
152 lines
5.6 KiB
Python
"""Bulk auto-resolver dla zaległej kolejki merge_candidates(status=pending).
|
|
|
|
Policy:
  score >= 0.85 → merge (keep = the side with more external_refs; tie-break: the older scene)
  score <  0.85 → reject (kept in merge_candidates with status rejected)
|
|
|
|
Background: the manual review queue grew to 23k+ entries (against 953 ever resolved
by a user). The auto-merge threshold in the pipeline (0.92) was set conservatively
early on; after months of iterating on the scoring we trust the resolvers. This bulk
cleanup retroactively lowers the threshold to 0.85 for legacy candidates — everything
scoring >= 0.85 is resolved automatically, the rest is dropped (zero chance that a
user will ever touch it).
|
|
|
|
Runs in batches of N (one commit per batch) so that a long run does not hold one
huge transaction.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import logging
|
|
import sys
|
|
import time
|
|
from collections import Counter
|
|
|
|
from sqlalchemy import func, select
|
|
from sqlalchemy.orm import Session
|
|
|
|
from app.db import session_scope
|
|
from app.models.merge_candidate import MergeCandidate, MergeKind, MergeStatus
|
|
from app.models.scene import Scene, SceneExternalRef
|
|
from app.resolve.scene_merge import MergeError, merge_scenes
|
|
|
|
# Module logger; records are emitted under the fixed name "bulk_resolve".
log = logging.getLogger("bulk_resolve")
|
|
|
|
|
|
# Score cutoff used by resolve_batch: candidates scoring at or above this
# value are merged, everything below it is rejected.
MERGE_THRESHOLD = 0.85
|
|
|
|
|
|
def pick_keep_id(session: Session, left_id, right_id):
    """Choose which scene of a candidate pair survives the merge.

    Rules, applied in order:
      1. The scene with more external refs wins (more sources = more canonical).
      2. On a tie, the older scene wins (first ingest = more stable reference).
      3. If that is still a tie, the left scene wins.

    Returns the id of the scene to keep (always ``left_id`` or ``right_id``).
    """
    pair = [left_id, right_id]

    # Rule 1: number of external refs per scene. A scene without any refs has
    # no row in the grouped result, hence the default of 0 below.
    ref_query = (
        select(SceneExternalRef.scene_id, func.count())
        .where(SceneExternalRef.scene_id.in_(pair))
        .group_by(SceneExternalRef.scene_id)
    )
    refs_per_scene = dict(session.execute(ref_query).all())
    left_refs = refs_per_scene.get(left_id, 0)
    right_refs = refs_per_scene.get(right_id, 0)
    if left_refs > right_refs:
        return left_id
    if right_refs > left_refs:
        return right_id

    # Rule 2: same ref count, so compare creation timestamps.
    created_query = select(Scene.id, Scene.created_at).where(Scene.id.in_(pair))
    created_at = {
        row.id: row.created_at for row in session.execute(created_query).all()
    }
    left_created = created_at.get(left_id)
    right_created = created_at.get(right_id)

    # Rule 3: a missing or identical timestamp leaves nothing to compare.
    if not (left_created and right_created) or left_created == right_created:
        return left_id
    return left_id if left_created < right_created else right_id
|
|
|
|
|
|
def resolve_batch(*, batch_size: int, dry_run: bool) -> dict[str, int]:
    """Resolve one batch of pending scene merge candidates.

    Loads up to ``batch_size`` pending scene candidates, highest score first,
    inside a single ``session_scope()`` block. A candidate scoring at least
    ``MERGE_THRESHOLD`` is merged with ``merge_scenes`` (the surviving scene
    is chosen by ``pick_keep_id``); every other candidate is marked rejected.

    Each merge runs inside its own SAVEPOINT, so a failing merge rolls back
    only its own partial writes and the session stays usable for the rest of
    the batch.

    Args:
        batch_size: Maximum number of candidates to load.
        dry_run: When true, decisions are counted but nothing is written.

    Returns:
        Counts keyed by ``"seen"``, ``"merged"``, ``"rejected"``,
        ``"skipped"`` and ``"errors"``. Keys whose count is zero are omitted,
        so an empty dict means no pending scene candidate was found.
    """
    # Imported once per call instead of once per rejected candidate.
    from datetime import UTC, datetime

    counts: Counter[str] = Counter()
    # NOTE(review): the batch is presumably committed when session_scope()
    # exits normally — confirm against app.db.
    with session_scope() as session:
        cands = (
            session.execute(
                select(MergeCandidate)
                .where(
                    MergeCandidate.status == MergeStatus.pending,
                    MergeCandidate.kind == MergeKind.scene,
                )
                .order_by(MergeCandidate.score.desc())
                .limit(batch_size)
            )
            .scalars()
            .all()
        )

        for cand in cands:
            counts["seen"] += 1
            try:
                # merge_scenes closes the other pending candidates that
                # reference the dropped scene, so a candidate loaded at the
                # top of this batch may already be resolved by an earlier
                # merge. Do not merge or reject it a second time.
                if cand.status != MergeStatus.pending:
                    counts["skipped"] += 1
                    continue

                if cand.score >= MERGE_THRESHOLD:
                    keep_id = pick_keep_id(session, cand.left_id, cand.right_id)
                    drop_id = cand.right_id if keep_id == cand.left_id else cand.left_id
                    if not dry_run:
                        # SAVEPOINT: a failure inside merge_scenes undoes only
                        # this candidate instead of leaving half a merge in
                        # the batch transaction.
                        with session.begin_nested():
                            # merge_scenes deletes the dropped scene and closes
                            # the pending candidates that reference it; this
                            # candidate always references drop_id, so it is
                            # closed there as well.
                            merge_scenes(
                                session,
                                keep_id=keep_id,
                                drop_id=drop_id,
                                resolved_by="bulk_auto_resolver",
                            )
                    counts["merged"] += 1
                else:
                    if not dry_run:
                        cand.status = MergeStatus.rejected
                        cand.resolved_at = datetime.now(UTC)
                        cand.resolved_by = "bulk_auto_resolver"
                    counts["rejected"] += 1
            except MergeError as e:
                counts["errors"] += 1
                log.warning("merge failed for cand=%s: %s", cand.id, e)
            except Exception as e:
                # Best effort: one broken candidate must not abort the batch.
                counts["errors"] += 1
                log.exception("unexpected error for cand=%s: %s", cand.id, e)

    return dict(counts)
|
|
|
|
|
|
def main() -> int:
    """Run the bulk resolver from the command line.

    Options:
        --batch        candidates per ``resolve_batch`` call (default 200).
        --max-batches  upper bound on the number of batches (default 10000).
        --dry-run      count the decisions without writing anything.

    A normal run calls ``resolve_batch`` until it reports no candidates, the
    batch limit is reached, or a batch resolves nothing. A dry run makes one
    call sized to ``batch * max_batches`` instead: it never changes the
    queue, so repeated calls would return the same top-scored candidates and
    inflate the totals.

    Returns:
        Exit status: 0 on completion, 1 when the run stopped because a batch
        resolved nothing.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--batch", type=int, default=200, help="batch size per commit")
    parser.add_argument("--max-batches", type=int, default=10000)
    parser.add_argument("--dry-run", action="store_true")
    args = parser.parse_args()
    if args.batch < 1:
        # A non-positive LIMIT would otherwise be reported as "queue empty".
        parser.error("--batch must be >= 1")

    logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s %(message)s")

    total = Counter()
    # Monotonic clock: elapsed time must not jump when the wall clock is adjusted.
    started = time.monotonic()
    exit_code = 0

    if args.dry_run:
        if args.max_batches > 0:
            preview_size = args.batch * args.max_batches
            log.info("dry run: previewing up to %d candidates in a single pass", preview_size)
            total.update(resolve_batch(batch_size=preview_size, dry_run=True))
    else:
        for i in range(1, args.max_batches + 1):
            result = resolve_batch(batch_size=args.batch, dry_run=False)
            if not result.get("seen"):
                log.info("queue empty after %d batches", i - 1)
                break
            total.update(result)
            log.info(
                "batch %d: %s | total: %s | elapsed=%ds",
                i, result, dict(total), int(time.monotonic() - started),
            )
            if not (result.get("merged") or result.get("rejected")):
                # Nothing was resolved, so the candidates presumably are all
                # still pending and the next batch would read them again.
                log.error("batch %d resolved no candidates (%s); stopping", i, result)
                exit_code = 1
                break

    log.info(
        "DONE total=%s elapsed=%ds dry_run=%s",
        dict(total), int(time.monotonic() - started), args.dry_run,
    )
    return exit_code
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: the value returned by main() becomes the exit status.
    raise SystemExit(main())
|