"""Re-resolve freshporno orphans against the fresh canonical database.

For every freshporno scene that has no TPDB/StashDB external ref:
  - take its phash(es),
  - look each one up with ``find_by_phash_within`` inside the configured
    Hamming distance (``settings.fingerprint_hamming_max``; the original
    note says "hamming <= 5"),
  - on a match, score the scene pair and auto-merge when the threshold is met.

Why this script exists (figures are from the original author's note):
``run_bulk_dedup`` (the 'phash' strategy, an O(N^2) cross-product) OOMs at
197k phashes (7 GB RAM in the worker container; the OOM-killer kills it).
The per-orphan approach does 11k queries x 197k seq scan = 2.2 bn
comparisons, but stream-wise, with a ~50 MB RAM peak.
"""
from __future__ import annotations

import logging
import uuid

from sqlalchemy import select

from app.db import session_scope
from app.models.scene import Scene, SceneFingerprint
from app.resolve.scene_match import find_by_phash_within
from app.scheduler.bulk_dedup import _process_pair, BulkCounters
from app.config import get_settings

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(name)s %(message)s",
)
log = logging.getLogger(__name__)

# Scenes that have a live (dead_at IS NULL) playback source of the given
# origin and no external ref whose source name is one of the two given names.
# Placeholders are positional: origin, source name, source name.
ORPHANS_QUERY = """
SELECT DISTINCT ps.scene_id
FROM playback_sources ps
WHERE ps.origin = %s
  AND ps.dead_at IS NULL
  AND NOT EXISTS (
    SELECT 1
    FROM scene_external_refs er
    JOIN sources s ON s.id = er.source_id
    WHERE er.scene_id = ps.scene_id
      AND s.name IN (%s, %s)
  )
"""

# Bind values for ORPHANS_QUERY, in placeholder order.
_ORPHANS_PARAMS = ("tube:freshpornoorg", "tpdb", "stashdb")

# Emit a progress log line every this many orphans.
_PROGRESS_EVERY = 500


def _fetch_orphan_ids() -> list[uuid.UUID]:
    """Run ORPHANS_QUERY and return the first column of every row."""
    with session_scope() as sess:
        result = sess.connection().exec_driver_sql(ORPHANS_QUERY, _ORPHANS_PARAMS)
        return [row[0] for row in result]


def _first_candidate_pair(
    orphan_id: uuid.UUID,
    max_hamming: int,
) -> tuple[uuid.UUID, uuid.UUID] | None:
    """Return the ordered ID pair for the orphan's first usable phash match.

    The orphan's phash fingerprints are tried in the order the query returns
    them. A lookup that finds nothing, or that finds the orphan itself, is
    skipped. The first other scene found ends the search.

    Args:
        orphan_id: ID of the scene whose phashes are looked up.
        max_hamming: Passed through to ``find_by_phash_within``.

    Returns:
        ``(smaller_id, larger_id)`` for the orphan and the matched scene, or
        ``None`` when the orphan has no phash or no phash yields another scene.
    """
    with session_scope() as sess:
        phashes = sess.execute(
            select(SceneFingerprint.value).where(
                SceneFingerprint.scene_id == orphan_id,
                SceneFingerprint.kind == "phash",
            )
        ).scalars().all()

        for phash in phashes:
            match = find_by_phash_within(sess, phash=phash, max_hamming=max_hamming)
            if match is None:
                continue
            matched_scene, _distance = match
            other_id = matched_scene.id
            if other_id == orphan_id:
                continue
            # Order the pair so (a, b) and (b, a) dedupe to the same key.
            if orphan_id < other_id:
                return (orphan_id, other_id)
            return (other_id, orphan_id)
    return None


def main() -> None:
    """Re-resolve every orphan scene against the canonical data via phash.

    Stage 1 collects the orphan scene IDs. Stage 2 looks up, per orphan, the
    first phash match and hands each not-yet-seen pair to ``_process_pair``.
    A failure on one orphan is logged and does not stop the run.
    """
    settings = get_settings()
    auto_t = settings.auto_merge_threshold
    review_t = settings.review_threshold
    max_h = settings.fingerprint_hamming_max
    counters = BulkCounters()

    # Stage 1: collect the list of orphan scene IDs.
    orphan_ids = _fetch_orphan_ids()
    log.info("found %d freshporno orphans to re-resolve", len(orphan_ids))

    # Stage 2: per orphan, query for a phash match.
    candidates_seen: set[tuple[uuid.UUID, uuid.UUID]] = set()
    for i, orphan_id in enumerate(orphan_ids, 1):
        if i % _PROGRESS_EVERY == 0:
            log.info(
                "progress %d/%d, candidates=%d, counters=%s",
                i, len(orphan_ids), len(candidates_seen), counters,
            )
        try:
            # One match per orphan is enough; the read session is already
            # closed by the time the pair is processed.
            pair = _first_candidate_pair(orphan_id, max_h)
            if pair is None or pair in candidates_seen:
                # Nothing matched, or this candidate pair was already checked.
                continue
            candidates_seen.add(pair)

            # Per the original author's note, the pair is processed in a
            # separate transaction; only the two IDs are handed over.
            _process_pair(
                pair[0],
                pair[1],
                auto_t=auto_t,
                review_t=review_t,
                dry_run=False,
                counters=counters,
                cross_source_only=False,
            )
        except Exception as e:
            # Batch boundary: log with traceback and move on to the next orphan.
            log.exception("orphan %s re-resolve failed: %s", orphan_id, e)

    log.info(
        "DONE: orphans=%d candidates=%d counters=%s",
        len(orphan_ids), len(candidates_seen), counters,
    )


if __name__ == "__main__":
    main()