"""Helpery do znajdowania kandydatów scen w bazie (paths 1-4 resolvera).""" from __future__ import annotations import uuid from datetime import date, timedelta from sqlalchemy import and_, or_, select from sqlalchemy.orm import Session from app.config import get_settings from app.models.scene import Scene, SceneExternalRef, SceneFingerprint from app.models.source import Source from app.resolve.scoring import hamming_distance_hex def find_by_external_ref( session: Session, *, source_id: uuid.UUID, external_id: str ) -> Scene | None: """Path 1: ten sam (source, external_id) widziany już wcześniej.""" ref = session.execute( select(SceneExternalRef).where( SceneExternalRef.source_id == source_id, SceneExternalRef.external_id == external_id, ) ).scalar_one_or_none() if ref is None: return None return session.get(Scene, ref.scene_id) def find_by_cross_source_refs( session: Session, *, refs: dict[str, str] ) -> tuple[Scene, str] | None: """Path 2: cross-source UUID. `refs` = {source_name: external_id}. Zwraca (Scene, source_name_via_which_matched). Pierwszy match wygrywa. """ if not refs: return None sources = ( session.execute(select(Source).where(Source.name.in_(list(refs)))) .scalars() .all() ) by_name = {s.name: s for s in sources} for source_name, external_id in refs.items(): src = by_name.get(source_name) if src is None: continue ref = session.execute( select(SceneExternalRef).where( SceneExternalRef.source_id == src.id, SceneExternalRef.external_id == external_id, ) ).scalar_one_or_none() if ref is not None: scene = session.get(Scene, ref.scene_id) if scene is not None: return scene, source_name return None def find_by_fingerprint_exact( session: Session, *, kind: str, value: str ) -> Scene | None: """Path 3a: oshash / md5 — exact match.""" row = session.execute( select(SceneFingerprint.scene_id) .where(SceneFingerprint.kind == kind, SceneFingerprint.value == value) .limit(1) ).scalar_one_or_none() if row is None: return None return session.get(Scene, row) def find_by_phash_within( session: Session, *, phash: str, max_hamming: int | None = None, ) -> tuple[Scene, int] | None: """Path 3b: pHash w obrębie max_hamming (Hamming distance bitów hex). Implementacja: seq scan po wszystkich phashach. Akceptowalne dla self-hosted rzędu 10⁵ scen; przy 10⁶+ można dodać locality-sensitive index (BK-tree, MinHash). Zwraca (Scene, distance) dla najbliższego matcha ≤ max_hamming, albo None. """ if max_hamming is None: max_hamming = get_settings().fingerprint_hamming_max rows = session.execute( select(SceneFingerprint.scene_id, SceneFingerprint.value).where( SceneFingerprint.kind == "phash" ) ).all() best: tuple[uuid.UUID, int] | None = None target_len = len(phash) for scene_id, value in rows: if len(value) != target_len: continue try: d = hamming_distance_hex(phash, value) except ValueError: continue if d <= max_hamming and (best is None or d < best[1]): best = (scene_id, d) if d == 0: break if best is None: return None scene = session.get(Scene, best[0]) if scene is None: return None return scene, best[1] def find_blocking_candidates( session: Session, *, studio_id: uuid.UUID | None, release_date: date | None, window_days: int | None = None, title_normalized: str | None = None, limit: int = 50, ) -> list[Scene]: """Path 4 blocking: zawęża space scen do potencjalnych kandydatów. Reguły: - jeśli mamy studio + date → studio_id == X AND date BETWEEN ±window_days - jeśli mamy tylko date → date BETWEEN ±window_days - jeśli mamy tylko studio → studio_id == X - dodatkowo, jeśli `title_normalized` podany, OR-uj exact title match (przydaje się gdy date/studio brakuje) """ if window_days is None: window_days = get_settings().date_window_days conds = [] if studio_id is not None and release_date is not None: conds.append( and_( Scene.studio_id == studio_id, Scene.release_date.is_not(None), Scene.release_date >= release_date - timedelta(days=window_days), Scene.release_date <= release_date + timedelta(days=window_days), ) ) elif release_date is not None: conds.append( and_( Scene.release_date >= release_date - timedelta(days=window_days), Scene.release_date <= release_date + timedelta(days=window_days), ) ) elif studio_id is not None: conds.append(Scene.studio_id == studio_id) if title_normalized: conds.append(Scene.title_normalized == title_normalized) if not conds: return [] stmt = select(Scene).where(or_(*conds)).limit(limit) return list(session.execute(stmt).scalars().all())