"""Bulk auto-resolver for the backlog of pending scene merge candidates.

Policy, applied to ``merge_candidates`` rows with ``status=pending`` and
``kind=scene``:

* ``score >= 0.85`` -> merge. The kept scene is the side with more
  external refs; ties go to the older scene (earlier ``created_at``), and a
  full tie keeps the left side.
* ``score < 0.85`` -> reject (the row stays in ``merge_candidates`` with
  status ``rejected``).

Background: the manual review queue grew to 23k+ entries (against 953
user-resolved entries ever). The pipeline's auto-merge threshold (0.92) was
set conservatively early on; after months of scoring iterations the resolvers
are trusted enough. This bulk clean-up retroactively lowers the threshold to
0.85 for legacy candidates: everything scoring at or above 0.85 is merged
automatically and everything else is rejected (realistically no user will
ever review it).

Runs in batches of N candidates with one commit per batch, so a long run never
holds one huge transaction. Each candidate is processed inside its own
SAVEPOINT, so a failure rolls back only that candidate's changes instead of
poisoning (or partially committing) the whole batch.
"""

from __future__ import annotations

import argparse
import logging
import sys
import time
from collections import Counter
from collections.abc import Collection
from datetime import UTC, datetime

from sqlalchemy import func, select
from sqlalchemy.orm import Session

from app.db import session_scope
from app.models.merge_candidate import MergeCandidate, MergeKind, MergeStatus
from app.models.scene import Scene, SceneExternalRef
from app.resolve.scene_merge import MergeError, merge_scenes

log = logging.getLogger("bulk_resolve")

MERGE_THRESHOLD = 0.85
# Value recorded as ``resolved_by`` on everything this script resolves.
RESOLVED_BY = "bulk_auto_resolver"


def pick_keep_id(session: Session, left_id, right_id):
    """Decide which of the two scenes survives the merge.

    Heuristic, in order:

    1. The side with more ``SceneExternalRef`` rows (more sources = more
       canonical).
    2. On a tie, the scene with the earlier ``created_at`` (first ingest =
       more stable reference). Skipped if either timestamp is missing.
    3. If still tied, ``left_id``.

    Returns ``left_id`` or ``right_id``.
    """
    ref_counts = dict(
        session.execute(
            select(SceneExternalRef.scene_id, func.count())
            .where(SceneExternalRef.scene_id.in_([left_id, right_id]))
            .group_by(SceneExternalRef.scene_id)
        ).all()
    )
    left_n = ref_counts.get(left_id, 0)
    right_n = ref_counts.get(right_id, 0)
    if left_n != right_n:
        return left_id if left_n > right_n else right_id

    # Tie-break on created_at.
    created = dict(
        session.execute(
            select(Scene.id, Scene.created_at).where(Scene.id.in_([left_id, right_id]))
        ).all()
    )
    left_created = created.get(left_id)
    right_created = created.get(right_id)
    if left_created is not None and right_created is not None and left_created != right_created:
        return left_id if left_created < right_created else right_id
    return left_id


def _fetch_pending(
    session: Session,
    *,
    batch_size: int,
    offset: int = 0,
    exclude_ids: Collection = (),
) -> list[MergeCandidate]:
    """Load the next batch of pending scene candidates, highest score first.

    ``id`` is a secondary sort key so the order is deterministic among equal
    scores, which offset pagination (used by dry runs) relies on.
    ``exclude_ids`` lets a real run skip candidates that already failed in
    this run instead of retrying them in every batch.
    """
    conditions = [
        MergeCandidate.status == MergeStatus.pending,
        MergeCandidate.kind == MergeKind.scene,
    ]
    if exclude_ids:
        conditions.append(MergeCandidate.id.not_in(list(exclude_ids)))
    stmt = (
        select(MergeCandidate)
        .where(*conditions)
        .order_by(MergeCandidate.score.desc(), MergeCandidate.id)
        .limit(batch_size)
    )
    if offset:
        stmt = stmt.offset(offset)
    return list(session.execute(stmt).scalars().all())


def _resolve_candidate(session: Session, cand: MergeCandidate, *, dry_run: bool) -> str:
    """Apply the threshold policy to one candidate and return the outcome key.

    Returns ``"merged"`` or ``"rejected"``. In dry-run mode only the decision
    is computed (``pick_keep_id`` still reads from the database); nothing is
    written.
    """
    if cand.score >= MERGE_THRESHOLD:
        keep_id = pick_keep_id(session, cand.left_id, cand.right_id)
        drop_id = cand.right_id if keep_id == cand.left_id else cand.left_id
        if not dry_run:
            # merge_scenes is expected to delete the drop scene and close every
            # pending candidate referencing drop_id. That includes this one,
            # since drop_id is always one of its two sides, so its status is
            # not set here. NOTE(review): confirm in app.resolve.scene_merge.
            merge_scenes(
                session,
                keep_id=keep_id,
                drop_id=drop_id,
                resolved_by=RESOLVED_BY,
            )
        return "merged"

    if not dry_run:
        cand.status = MergeStatus.rejected
        cand.resolved_at = datetime.now(UTC)
        cand.resolved_by = RESOLVED_BY
    return "rejected"


def _resolve_batch(
    *,
    batch_size: int,
    dry_run: bool,
    offset: int = 0,
    exclude_ids: Collection = (),
) -> tuple[dict[str, int], set]:
    """Process one batch in one transaction.

    Returns ``(counts, failed_ids)``:

    * ``counts`` has the keys ``seen``, ``merged``, ``rejected``, ``skipped``
      and ``errors`` (only those that occurred).
    * ``failed_ids`` holds the ids of candidates whose processing raised.
      They remain pending.
    """
    counts: Counter[str] = Counter()
    failed_ids: set = set()
    with session_scope() as session:
        cands = _fetch_pending(
            session, batch_size=batch_size, offset=offset, exclude_ids=exclude_ids
        )
        for cand in cands:
            counts["seen"] += 1
            # Captured up front: a rolled-back savepoint may expire `cand`.
            cand_id = cand.id
            if cand.status != MergeStatus.pending:
                # Already closed by an earlier merge in this batch (it referenced
                # that merge's drop scene). Re-processing it would act on a
                # deleted scene or overwrite the status that merge set.
                counts["skipped"] += 1
                continue
            try:
                # SAVEPOINT per candidate: on failure only this candidate's
                # changes are rolled back and the batch transaction stays usable.
                # NOTE(review): assumes merge_scenes does not commit on its own.
                with session.begin_nested():
                    outcome = _resolve_candidate(session, cand, dry_run=dry_run)
            except MergeError as e:
                counts["errors"] += 1
                failed_ids.add(cand_id)
                log.warning("merge failed for cand=%s: %s", cand_id, e)
            except Exception as e:
                counts["errors"] += 1
                failed_ids.add(cand_id)
                log.exception("unexpected error for cand=%s: %s", cand_id, e)
            else:
                counts[outcome] += 1
    return dict(counts), failed_ids


def resolve_batch(*, batch_size: int, dry_run: bool) -> dict[str, int]:
    """Process the top ``batch_size`` pending candidates in one transaction.

    Returns counters: ``seen`` (candidates loaded), ``merged``, ``rejected``,
    ``skipped`` (already closed earlier in the batch) and ``errors``. An
    empty dict means the queue had nothing to process.
    """
    counts, _failed_ids = _resolve_batch(batch_size=batch_size, dry_run=dry_run)
    return counts


def main() -> int:
    """CLI entry point: drain the pending queue batch by batch and log totals."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--batch", type=int, default=200, help="batch size per commit")
    parser.add_argument("--max-batches", type=int, default=10000)
    parser.add_argument("--dry-run", action="store_true")
    args = parser.parse_args()
    if args.batch < 1:
        parser.error("--batch must be a positive integer")

    logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s %(message)s")

    total: Counter[str] = Counter()
    # Real runs: failed candidates stay pending at the top of the queue, so
    # they are excluded from later batches instead of being retried (a batch
    # made up only of failures would otherwise never make progress).
    failed_ids: set = set()
    # Dry runs write nothing, so every query would return the same rows;
    # page through the queue with an offset instead.
    offset = 0
    t0 = time.monotonic()
    for i in range(1, args.max_batches + 1):
        result, batch_failed = _resolve_batch(
            batch_size=args.batch,
            dry_run=args.dry_run,
            offset=offset,
            exclude_ids=failed_ids,
        )
        if not result.get("seen"):
            log.info("queue empty after %d batches", i - 1)
            break
        total.update(result)
        if args.dry_run:
            offset += result["seen"]
        else:
            failed_ids |= batch_failed
        log.info(
            "batch %d: %s | total: %s | elapsed=%ds",
            i,
            result,
            dict(total),
            int(time.monotonic() - t0),
        )
    else:
        log.warning("stopped after --max-batches=%d; queue may not be empty", args.max_batches)

    log.info(
        "DONE total=%s elapsed=%ds dry_run=%s",
        dict(total),
        int(time.monotonic() - t0),
        args.dry_run,
    )
    return 0


if __name__ == "__main__":
    sys.exit(main())