goon/app/resolve/scene_match.py
jtrzupek da7fcda132 feat(ingest): SQL phash match, tag inference + backfill, clip-store skip, browse tubes, watchdog
Resolver/perf:
- find_by_phash_within: nearest match via Postgres bit_count over bit(64) XOR
  instead of a Python scan of all phash fingerprints (~20x faster per scene;
  unblocks long delta runs that were previously killed mid-run before `since` advanced).

Scheduler/reliability:
- reap ingest_runs stuck in 'running' on worker startup (killed_by_restart).
- smoke_test: per-source ingest health, stuck-run and browse-freshness checks
  -> Sentry; exclude killed_by_restart from the failed-run alarm.

Tags (ingest with tags + fill blanks):
- wire infer_tag_slugs into normalize_scene so tube scenes get title-inferred
  tags (was dead code); union with connector tags.
- scripts/backfill_inferred_tags.py: keyset/batched/idempotent backfill for
  existing tagless scenes (playable tag coverage 16% -> ~52%).

Clip-store:
- skip ManyVids/IWantClips/Clips4Sale/... from canonical sources at ingest
  (GOON_SKIP_CLIP_STORE, default on) — permanent orphans, ~56% of canonical
  ingest, never have a free-tube playback source.

Browse tubes:
- enable fullmovies + hdporn.gg: studio parsed from title prefix instead of
  the /networks/ sidebar (which always yielded the first listed network);
  drop phash compute (pilot: 0% canonical hit within Hamming 5 — auto-screenshots),
  matching relies on title/performer/duration.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-01 15:07:35 +02:00

191 lines
6.4 KiB
Python

"""Helpery do znajdowania kandydatów scen w bazie (paths 1-4 resolvera)."""
from __future__ import annotations
import uuid
from datetime import date, timedelta
from sqlalchemy import and_, or_, select, text
from sqlalchemy.orm import Session
from app.config import get_settings
from app.models.scene import Scene, SceneExternalRef, SceneFingerprint
from app.models.source import Source
from app.resolve.scoring import hamming_distance_hex
def find_by_external_ref(
    session: Session, *, source_id: uuid.UUID, external_id: str
) -> Scene | None:
    """Path 1: resolve a scene from a (source, external_id) pair seen before.

    Looks up the SceneExternalRef row for the given source and external id
    and loads the scene it points at.

    Returns:
        The referenced Scene, or None when there is no such reference row
        (or the referenced scene cannot be loaded).
    """
    lookup = select(SceneExternalRef).where(
        SceneExternalRef.source_id == source_id,
        SceneExternalRef.external_id == external_id,
    )
    known_ref = session.execute(lookup).scalar_one_or_none()
    return None if known_ref is None else session.get(Scene, known_ref.scene_id)
def find_by_cross_source_refs(
    session: Session, *, refs: dict[str, str]
) -> tuple[Scene, str] | None:
    """Path 2: resolve a scene through another source's external id.

    Args:
        refs: Mapping of source name -> external id. Entries are tried in
            the mapping's iteration order; the first one that resolves to
            an existing scene wins.

    Returns:
        (Scene, name of the source the match came through), or None when
        `refs` is empty or none of the entries resolves to a scene.
    """
    if not refs:
        return None

    # One query for all named sources; names with no Source row are skipped.
    source_rows = (
        session.execute(select(Source).where(Source.name.in_(list(refs))))
        .scalars()
        .all()
    )
    source_ids = {row.name: row.id for row in source_rows}

    for source_name, external_id in refs.items():
        if source_name not in source_ids:
            continue
        lookup = select(SceneExternalRef).where(
            SceneExternalRef.source_id == source_ids[source_name],
            SceneExternalRef.external_id == external_id,
        )
        hit = session.execute(lookup).scalar_one_or_none()
        if hit is None:
            continue
        matched = session.get(Scene, hit.scene_id)
        if matched is not None:
            return matched, source_name
    return None
def find_by_fingerprint_exact(
    session: Session, *, kind: str, value: str
) -> Scene | None:
    """Path 3a: exact fingerprint match (e.g. oshash / md5).

    Returns:
        The Scene owning a fingerprint row with this exact kind and value,
        or None when no such row exists.
    """
    stmt = (
        select(SceneFingerprint.scene_id)
        .where(SceneFingerprint.kind == kind, SceneFingerprint.value == value)
        .limit(1)
    )
    matched_scene_id = session.execute(stmt).scalar_one_or_none()
    if matched_scene_id is not None:
        return session.get(Scene, matched_scene_id)
    return None
def find_by_phash_within(
    session: Session,
    *,
    phash: str,
    max_hamming: int | None = None,
) -> tuple[Scene, int] | None:
    """Path 3b: nearest pHash within `max_hamming` bits of Hamming distance.

    For a 64-bit pHash (exactly 16 hex digits) the distance is computed
    server-side: `bit_count(a # b)` over `('x'||hex)::bit(64)` bit strings,
    `ORDER BY dist LIMIT 1`, so Postgres returns the single nearest stored
    fingerprint instead of the application scanning every phash row in Python.
    `bit_count` on bit strings requires PostgreSQL 14+.

    Any other input (unusual length, or 16 characters that are not all hex
    digits) takes the Python fallback, which compares only against stored
    values of the same length and skips pairs `hamming_distance_hex` rejects
    with ValueError. Routing non-hex input there matters: the `::bit(64)`
    cast raises a database error on a non-hex character, which would fail the
    whole statement rather than simply yield "no match".

    Args:
        phash: Hex-encoded perceptual hash of the scene being resolved.
        max_hamming: Largest accepted distance; defaults to the
            `fingerprint_hamming_max` setting when None.

    Returns:
        (Scene, distance) for the nearest match with distance <= max_hamming,
        or None when nothing is close enough or the scene cannot be loaded.
    """
    if max_hamming is None:
        max_hamming = get_settings().fingerprint_hamming_max

    # Guard the cast: only exactly-16 ASCII hex digits are safe to send
    # through `('x'||:phash)::bit(64)`.
    is_hex64 = len(phash) == 16 and all(
        ch in "0123456789abcdefABCDEF" for ch in phash
    )
    if is_hex64:
        row = session.execute(
            text(
                "SELECT scene_id, "
                "bit_count(('x'||value)::bit(64) # ('x'||:phash)::bit(64)) AS dist "
                "FROM scene_fingerprints "
                "WHERE kind = 'phash' AND length(value) = 16 "
                "ORDER BY dist ASC LIMIT 1"
            ),
            {"phash": phash},
        ).first()
        # Only the nearest row is fetched; if it is too far, nothing matches.
        if row is None or row.dist > max_hamming:
            return None
        scene = session.get(Scene, row.scene_id)
        if scene is None:
            return None
        return scene, int(row.dist)

    # Fallback for atypical hashes: Python loop over stored values of the
    # same length as the input.
    rows = session.execute(
        select(SceneFingerprint.scene_id, SceneFingerprint.value).where(
            SceneFingerprint.kind == "phash"
        )
    ).all()
    best: tuple[uuid.UUID, int] | None = None
    target_len = len(phash)
    for scene_id, value in rows:
        if len(value) != target_len:
            continue
        try:
            d = hamming_distance_hex(phash, value)
        except ValueError:
            # Uncomparable pair (e.g. malformed hex) — skip, do not abort.
            continue
        if d <= max_hamming and (best is None or d < best[1]):
            best = (scene_id, d)
            if d == 0:
                # Cannot get closer than an exact match.
                break
    if best is None:
        return None
    scene = session.get(Scene, best[0])
    if scene is None:
        return None
    return scene, best[1]
def find_blocking_candidates(
    session: Session,
    *,
    studio_id: uuid.UUID | None,
    release_date: date | None,
    window_days: int | None = None,
    title_normalized: str | None = None,
    limit: int = 50,
) -> list[Scene]:
    """Path 4 blocking: narrow the scene space down to plausible candidates.

    Rules:
    - studio and date known -> same studio AND release date within
      +/- `window_days` of `release_date`
    - only date known       -> release date within +/- `window_days`
    - only studio known     -> same studio
    - if `title_normalized` is given, an exact normalized-title match is
      OR-ed in as well (useful when date/studio are missing)

    Args:
        window_days: Half-width of the date window; defaults to the
            `date_window_days` setting when None.
        limit: Maximum number of scenes returned.

    Returns:
        Up to `limit` matching scenes; an empty list when no criterion
        was supplied at all.
    """
    if window_days is None:
        window_days = get_settings().date_window_days

    predicates = []
    if release_date is not None:
        span = timedelta(days=window_days)
        in_window = (
            Scene.release_date >= release_date - span,
            Scene.release_date <= release_date + span,
        )
        if studio_id is not None:
            predicates.append(
                and_(
                    Scene.studio_id == studio_id,
                    Scene.release_date.is_not(None),
                    *in_window,
                )
            )
        else:
            predicates.append(and_(*in_window))
    elif studio_id is not None:
        predicates.append(Scene.studio_id == studio_id)

    if title_normalized:
        predicates.append(Scene.title_normalized == title_normalized)

    if not predicates:
        return []

    query = select(Scene).where(or_(*predicates)).limit(limit)
    found = session.execute(query).scalars().all()
    return list(found)