"""Helpery do znajdowania kandydatów scen w bazie (paths 1-4 resolvera).""" from __future__ import annotations import uuid from datetime import date, timedelta from sqlalchemy import and_, or_, select, text from sqlalchemy.orm import Session from app.config import get_settings from app.models.scene import Scene, SceneExternalRef, SceneFingerprint from app.models.source import Source from app.resolve.scoring import hamming_distance_hex def find_by_external_ref( session: Session, *, source_id: uuid.UUID, external_id: str ) -> Scene | None: """Path 1: ten sam (source, external_id) widziany już wcześniej.""" ref = session.execute( select(SceneExternalRef).where( SceneExternalRef.source_id == source_id, SceneExternalRef.external_id == external_id, ) ).scalar_one_or_none() if ref is None: return None return session.get(Scene, ref.scene_id) def find_by_cross_source_refs( session: Session, *, refs: dict[str, str] ) -> tuple[Scene, str] | None: """Path 2: cross-source UUID. `refs` = {source_name: external_id}. Zwraca (Scene, source_name_via_which_matched). Pierwszy match wygrywa. """ if not refs: return None sources = ( session.execute(select(Source).where(Source.name.in_(list(refs)))) .scalars() .all() ) by_name = {s.name: s for s in sources} for source_name, external_id in refs.items(): src = by_name.get(source_name) if src is None: continue ref = session.execute( select(SceneExternalRef).where( SceneExternalRef.source_id == src.id, SceneExternalRef.external_id == external_id, ) ).scalar_one_or_none() if ref is not None: scene = session.get(Scene, ref.scene_id) if scene is not None: return scene, source_name return None def find_by_fingerprint_exact( session: Session, *, kind: str, value: str ) -> Scene | None: """Path 3a: oshash / md5 — exact match.""" row = session.execute( select(SceneFingerprint.scene_id) .where(SceneFingerprint.kind == kind, SceneFingerprint.value == value) .limit(1) ).scalar_one_or_none() if row is None: return None return session.get(Scene, row) def find_by_phash_within( session: Session, *, phash: str, max_hamming: int | None = None, ) -> tuple[Scene, int] | None: """Path 3b: pHash w obrębie max_hamming (Hamming distance bitów hex). Hamming liczony server-side: `bit_count(a # b)` na 64-bitowych bit-stringach (`('x'||hex)::bit(64)`), ORDER BY dist LIMIT 1 → najbliższy match. Postgres robi popcount w C nad całym zbiorem phashy (~10⁵-10⁶) w kilkadziesiąt ms zamiast Python-loop ~6s/scenę (był bottleneck zabijający długie ingest-runy: każda scena z phashem skanowała wszystkie 277k fingerprintów po stronie aplikacji). Wymaga 64-bit (16 hex) phasha — `imagehash.phash(hash_size=8)` zawsze taki jest. Dla nietypowej długości fallback do Python-loop (rzadkie, np. legacy/uszkodzone). Zwraca (Scene, distance) dla najbliższego matcha ≤ max_hamming, albo None. """ if max_hamming is None: max_hamming = get_settings().fingerprint_hamming_max if len(phash) == 16: row = session.execute( text( "SELECT scene_id, " "bit_count(('x'||value)::bit(64) # ('x'||:phash)::bit(64)) AS dist " "FROM scene_fingerprints " "WHERE kind = 'phash' AND length(value) = 16 " "ORDER BY dist ASC LIMIT 1" ), {"phash": phash}, ).first() if row is None or row.dist > max_hamming: return None scene = session.get(Scene, row.scene_id) if scene is None: return None return scene, int(row.dist) # Fallback dla phashy o nietypowej długości — Python-loop nad zgodnymi długościami. rows = session.execute( select(SceneFingerprint.scene_id, SceneFingerprint.value).where( SceneFingerprint.kind == "phash" ) ).all() best: tuple[uuid.UUID, int] | None = None target_len = len(phash) for scene_id, value in rows: if len(value) != target_len: continue try: d = hamming_distance_hex(phash, value) except ValueError: continue if d <= max_hamming and (best is None or d < best[1]): best = (scene_id, d) if d == 0: break if best is None: return None scene = session.get(Scene, best[0]) if scene is None: return None return scene, best[1] def find_blocking_candidates( session: Session, *, studio_id: uuid.UUID | None, release_date: date | None, window_days: int | None = None, title_normalized: str | None = None, limit: int = 50, ) -> list[Scene]: """Path 4 blocking: zawęża space scen do potencjalnych kandydatów. Reguły: - jeśli mamy studio + date → studio_id == X AND date BETWEEN ±window_days - jeśli mamy tylko date → date BETWEEN ±window_days - jeśli mamy tylko studio → studio_id == X - dodatkowo, jeśli `title_normalized` podany, OR-uj exact title match (przydaje się gdy date/studio brakuje) """ if window_days is None: window_days = get_settings().date_window_days conds = [] if studio_id is not None and release_date is not None: conds.append( and_( Scene.studio_id == studio_id, Scene.release_date.is_not(None), Scene.release_date >= release_date - timedelta(days=window_days), Scene.release_date <= release_date + timedelta(days=window_days), ) ) elif release_date is not None: conds.append( and_( Scene.release_date >= release_date - timedelta(days=window_days), Scene.release_date <= release_date + timedelta(days=window_days), ) ) elif studio_id is not None: conds.append(Scene.studio_id == studio_id) if title_normalized: conds.append(Scene.title_normalized == title_normalized) if not conds: return [] stmt = select(Scene).where(or_(*conds)).limit(limit) return list(session.execute(stmt).scalars().all())