goon/app/resolve/scene_match.py
goon-foss ad0284585b Initial commit
Goon — self-hosted aggregator for adult-content scene metadata.

Indexes scenes from TPDB, StashDB, and 30+ public adult tube sites.
Cross-source deduplication via perceptual hash + Levenshtein distance.
FastAPI backend + APScheduler worker + React Native (Expo) mobile client.

FOSS, ad-free, donation-funded. See README for details.
2026-05-20 10:10:22 +02:00

167 lines
5.3 KiB
Python

"""Helpery do znajdowania kandydatów scen w bazie (paths 1-4 resolvera)."""
from __future__ import annotations
import uuid
from datetime import date, timedelta
from sqlalchemy import and_, or_, select
from sqlalchemy.orm import Session
from app.config import get_settings
from app.models.scene import Scene, SceneExternalRef, SceneFingerprint
from app.models.source import Source
from app.resolve.scoring import hamming_distance_hex
def find_by_external_ref(
    session: Session, *, source_id: uuid.UUID, external_id: str
) -> Scene | None:
    """Path 1: the same (source, external_id) pair has been seen before.

    Looks up the ``SceneExternalRef`` row for the given source and external
    id and returns the scene it points at, or ``None`` when no such
    reference exists.
    """
    lookup = select(SceneExternalRef).where(
        SceneExternalRef.source_id == source_id,
        SceneExternalRef.external_id == external_id,
    )
    known_ref = session.execute(lookup).scalar_one_or_none()
    return None if known_ref is None else session.get(Scene, known_ref.scene_id)
def find_by_cross_source_refs(
    session: Session, *, refs: dict[str, str]
) -> tuple[Scene, str] | None:
    """Path 2: cross-source identifiers. ``refs`` = {source_name: external_id}.

    Returns ``(scene, source_name)`` where ``source_name`` is the source
    through which the match was found. Entries are tried in the iteration
    order of ``refs`` and the first match wins. Returns ``None`` when
    ``refs`` is empty or nothing matches.
    """
    if not refs:
        return None

    # Resolve all referenced source names to Source rows in a single query.
    source_rows = (
        session.execute(select(Source).where(Source.name.in_(list(refs))))
        .scalars()
        .all()
    )
    sources_by_name = {row.name: row for row in source_rows}

    for source_name, external_id in refs.items():
        source = sources_by_name.get(source_name)
        if source is None:
            # Source name is not present in the database; nothing to look up.
            continue

        lookup = select(SceneExternalRef).where(
            SceneExternalRef.source_id == source.id,
            SceneExternalRef.external_id == external_id,
        )
        matched_ref = session.execute(lookup).scalar_one_or_none()
        if matched_ref is None:
            continue

        scene = session.get(Scene, matched_ref.scene_id)
        if scene is not None:
            return scene, source_name

    return None
def find_by_fingerprint_exact(
    session: Session, *, kind: str, value: str
) -> Scene | None:
    """Path 3a: oshash / md5 - exact fingerprint match.

    Returns the scene owning a fingerprint row with the given ``kind`` and
    ``value``, or ``None`` when there is no such row.
    """
    stmt = (
        select(SceneFingerprint.scene_id)
        .where(SceneFingerprint.kind == kind, SceneFingerprint.value == value)
        .limit(1)
    )
    matched_scene_id = session.execute(stmt).scalar_one_or_none()
    if matched_scene_id is not None:
        return session.get(Scene, matched_scene_id)
    return None
def find_by_phash_within(
    session: Session,
    *,
    phash: str,
    max_hamming: int | None = None,
) -> tuple[Scene, int] | None:
    """Path 3b: nearest pHash within ``max_hamming``.

    Distance is whatever ``hamming_distance_hex`` reports for two hex strings.

    Implementation: a sequential scan over every stored pHash. Acceptable for
    a self-hosted instance on the order of 10^5 scenes; at 10^6+ a
    locality-sensitive index (BK-tree, MinHash) could be added.

    Args:
        session: Database session used for the scan and the scene lookup.
        phash: Hex-encoded perceptual hash to match against.
        max_hamming: Largest accepted distance. When ``None`` the value of
            ``get_settings().fingerprint_hamming_max`` is used.

    Returns:
        ``(scene, distance)`` for the closest match with
        ``distance <= max_hamming``, or ``None`` when ``phash`` is empty, no
        stored hash is close enough, or the matched scene no longer exists.
    """
    # An empty probe carries no bits: it could only "match" stored rows that
    # are also empty, which is meaningless. Bail out before the full scan.
    if not phash:
        return None

    if max_hamming is None:
        max_hamming = get_settings().fingerprint_hamming_max

    rows = session.execute(
        select(SceneFingerprint.scene_id, SceneFingerprint.value).where(
            SceneFingerprint.kind == "phash"
        )
    ).all()

    best: tuple[uuid.UUID, int] | None = None
    target_len = len(phash)
    for scene_id, value in rows:
        # Only hashes of identical length are compared.
        if len(value) != target_len:
            continue
        try:
            d = hamming_distance_hex(phash, value)
        except ValueError:
            # Skip rows the distance helper rejects instead of failing the scan.
            continue
        if d <= max_hamming and (best is None or d < best[1]):
            best = (scene_id, d)
            if d == 0:
                # Cannot do better than an exact match.
                break

    if best is None:
        return None
    scene = session.get(Scene, best[0])
    if scene is None:
        return None
    return scene, best[1]
def find_blocking_candidates(
    session: Session,
    *,
    studio_id: uuid.UUID | None,
    release_date: date | None,
    window_days: int | None = None,
    title_normalized: str | None = None,
    limit: int = 50,
) -> list[Scene]:
    """Path 4 blocking: narrow the scene space down to potential candidates.

    Rules:
    - studio + date  -> studio_id == X AND release date within +/- window_days
    - date only      -> release date within +/- window_days
    - studio only    -> studio_id == X
    - additionally, when ``title_normalized`` is given, an exact title match
      is OR-ed in (useful when date/studio are missing)

    When ``window_days`` is ``None`` it is taken from
    ``get_settings().date_window_days``. Returns at most ``limit`` scenes, or
    an empty list when no criterion is available.
    """
    if window_days is None:
        window_days = get_settings().date_window_days

    has_studio = studio_id is not None
    clauses = []

    if release_date is not None:
        span = timedelta(days=window_days)
        earliest = release_date - span
        latest = release_date + span
        if has_studio:
            clauses.append(
                and_(
                    Scene.studio_id == studio_id,
                    Scene.release_date.is_not(None),
                    Scene.release_date >= earliest,
                    Scene.release_date <= latest,
                )
            )
        else:
            clauses.append(
                and_(
                    Scene.release_date >= earliest,
                    Scene.release_date <= latest,
                )
            )
    elif has_studio:
        clauses.append(Scene.studio_id == studio_id)

    if title_normalized:
        clauses.append(Scene.title_normalized == title_normalized)

    if not clauses:
        return []

    query = select(Scene).where(or_(*clauses)).limit(limit)
    return list(session.execute(query).scalars().all())