"""Auto-merge freshporno orphan scenes do TPDB/StashDB canonical. Wcześniejszy bulk_dedup `all` / `performers` OOM-ował na O(N²) collection wszystkich par. Tutaj inny pattern: O(N) — dla każdej freshporno orphan-with-date, query candidate canonical scen przez indexes (performer overlap + release_date window), score, decyzja. Wykonanie po backfillu release_date dla 10390 freshporno scen — teraz mamy sygnał daty który wcześniej był null i blokował composite score ≥0.92. Decyzje: - score ≥ auto_t (0.92): przenieś playback_source z tube → canonical, skopiuj brakujące tagi, usuń tube scenę. - review_t ≤ score < auto_t: insert merge_candidate (pending). - score < 0.75: skip. Idempotent: orphan scene bez kandydatów lub już zmerged → no-op. """ from __future__ import annotations import logging from datetime import timedelta from sqlalchemy import and_, exists, select from sqlalchemy.dialects.postgresql import insert as pg_insert from app.config import get_settings from app.db import session_scope from app.models.merge_candidate import MergeCandidate, MergeKind, MergeStatus from app.models.playback_source import PlaybackSource from app.models.scene import Scene, ScenePerformer, SceneExternalRef, SceneTag from app.models.source import Source from app.scheduler.bulk_dedup import score_scene_pair log = logging.getLogger(__name__) DATE_WINDOW_DAYS = 7 def main() -> int: logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s") settings = get_settings() auto_t = settings.auto_merge_threshold review_t = settings.review_threshold log.info("auto_t=%.2f review_t=%.2f", auto_t, review_t) with session_scope() as session: canon_src_ids = list(session.execute( select(Source.id).where(Source.name.in_(["tpdb", "stashdb"])) ).scalars().all()) # Freshporno orphans z release_date (nasi kandydaci na merge w canonical). orphan_ids = list(session.execute( select(Scene.id) .join(PlaybackSource, PlaybackSource.scene_id == Scene.id) .where(PlaybackSource.origin == "tube:freshpornoorg") .where(Scene.release_date.is_not(None)) .where(~exists().where(and_( SceneExternalRef.scene_id == Scene.id, SceneExternalRef.source_id.in_(canon_src_ids), ))) .distinct() ).scalars().all()) log.info("freshporno orphan candidates (with date): %d", len(orphan_ids)) merged = 0 pending_added = 0 no_candidates = 0 no_match = 0 errors = 0 for scene_id in orphan_ids: try: with session_scope() as session: tube = session.get(Scene, scene_id) if tube is None: continue if tube.release_date is None: continue # Performery tube scene perfs = list(session.execute( select(ScenePerformer.performer_id).where( ScenePerformer.scene_id == tube.id ) ).scalars().all()) if not perfs: no_candidates += 1 continue # Query canonical candidates: scenes które mają ≥1 wspólnego performera # AND release_date w oknie ±N dni AND mają canonical external_ref (TPDB/StashDB). date_low = tube.release_date - timedelta(days=DATE_WINDOW_DAYS) date_high = tube.release_date + timedelta(days=DATE_WINDOW_DAYS) cand_ids = list(session.execute( select(Scene.id).distinct() .join(ScenePerformer, ScenePerformer.scene_id == Scene.id) .where(ScenePerformer.performer_id.in_(perfs)) .where(Scene.release_date.is_not(None)) .where(Scene.release_date.between(date_low, date_high)) .where(Scene.id != tube.id) .where(exists().where(and_( SceneExternalRef.scene_id == Scene.id, SceneExternalRef.source_id.in_(canon_src_ids), ))) ).scalars().all()) if not cand_ids: no_candidates += 1 continue # Score wszystkich kandydatów, weź best best_cand = None best_score = 0.0 best_breakdown = None for cand_id in cand_ids: cand = session.get(Scene, cand_id) if cand is None: continue b = score_scene_pair(session, tube, cand) if b.composite > best_score: best_score = b.composite best_cand = cand best_breakdown = b if best_cand is None or best_score < review_t: no_match += 1 continue if best_score >= auto_t: # Auto-merge: przenieś playback do canonical, skopiuj tagi, usuń tube scenę. session.execute( PlaybackSource.__table__.update() .where(PlaybackSource.scene_id == tube.id) .values(scene_id=best_cand.id) ) # Merge tagi (unique constraint na pair scene_id+tag_id — ignore conflict) tube_tag_ids = list(session.execute( select(SceneTag.tag_id).where(SceneTag.scene_id == tube.id) ).scalars().all()) for tag_id in tube_tag_ids: session.execute( pg_insert(SceneTag.__table__) .values(scene_id=best_cand.id, tag_id=tag_id) .on_conflict_do_nothing() ) # Move external_refs (freshporno) session.execute( SceneExternalRef.__table__.update() .where(SceneExternalRef.scene_id == tube.id) .values(scene_id=best_cand.id) ) # Drop remaining attached rows + scene session.execute( SceneTag.__table__.delete().where(SceneTag.scene_id == tube.id) ) session.execute( ScenePerformer.__table__.delete().where(ScenePerformer.scene_id == tube.id) ) session.delete(tube) merged += 1 # Log audit session.add(MergeCandidate( kind=MergeKind.scene, left_id=best_cand.id, right_id=best_cand.id, # self-ref dla audit (drop scene już nie istnieje) score=best_score, reasons={"path": "freshporno_backfill_auto", **(best_breakdown.reasons if best_breakdown else {})}, status=MergeStatus.auto_merged, )) else: # Pending review (0.75-0.92) a_id, b_id = (tube.id, best_cand.id) if tube.id < best_cand.id else (best_cand.id, tube.id) existing = session.execute( select(MergeCandidate).where( MergeCandidate.kind == MergeKind.scene, MergeCandidate.left_id == a_id, MergeCandidate.right_id == b_id, ).limit(1) ).scalar_one_or_none() if existing is None: session.add(MergeCandidate( kind=MergeKind.scene, left_id=a_id, right_id=b_id, score=best_score, reasons={"path": "freshporno_backfill_review", **(best_breakdown.reasons if best_breakdown else {})}, status=MergeStatus.pending, )) pending_added += 1 except Exception as e: errors += 1 if errors <= 5: log.warning("scene=%s failed: %s", scene_id, e) total_done = merged + pending_added + no_candidates + no_match + errors if total_done % 200 == 0: log.info( "progress %d/%d: merged=%d pending=%d no_cand=%d no_match=%d errors=%d", total_done, len(orphan_ids), merged, pending_added, no_candidates, no_match, errors, ) log.info( "DONE: orphans=%d merged=%d pending_added=%d no_candidates=%d no_match=%d errors=%d", len(orphan_ids), merged, pending_added, no_candidates, no_match, errors, ) return 0 if __name__ == "__main__": raise SystemExit(main())