goon/app/resolve/scene_resolver.py
jtrzupek bb9e1afc31 fix(resolver): refresh thumbnails on re-scrape instead of fill-only-if-null
_upsert_playback_sources only set thumbnail_url when the existing value was NULL,
so signed CDN thumbnails that ROT (sxyprn/trafficdeposit tokens expire ~weekly →
404) were never replaced even when a fresh re-scrape captured a valid URL — making
the rot permanent (bug 2026-06-10). Always overwrite thumbnail_url/animated_thumbnail_url
with the freshly-scraped value when present; other fields keep fill-if-null. Lets
the regular performer-driven ingest self-heal thumbnails for re-crawled scenes.

(Note: old sxyprn backlog can't be bulk-refreshed — search/listings don't re-surface
those posts, verified 0 overlap — so it's forward-looking; old sxyprn-only scenes
fall back to the clean placeholder.)

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-10 10:28:18 +02:00

705 lines
30 KiB
Python

"""Resolver sceny — pełny pipeline dedup (M3).
Ścieżki, w kolejności first-match-wins:
1. **same-source external_ref** — to samo (source, external_id) widziane już wcześniej
→ update kanonicznej.
2. **cross-source UUID** — scena z source A deklaruje że jest tym samym UUID
w source B (np. StashDB → tpdb_id w scene.urls). Auto-merge: dołącz nowy
external_ref do istniejącej kanonicznej + log MergeCandidate(auto_merged).
3. **fingerprint match** — oshash exact lub phash Hamming ≤ N.
Auto-merge.
4. **composite fuzzy** — blocking (studio + date) → score → triage:
composite ≥ auto_merge_threshold → auto-merge
≥ review_threshold → utwórz nową kanoniczną + zapisz
MergeCandidate(pending)
< review_threshold → utwórz nową, bez kandydata
Funkcja zwraca SceneResolveResult opisujący jak doszło do dopasowania.
"""
from __future__ import annotations
import logging
import uuid
from dataclasses import dataclass
from sqlalchemy import select
from sqlalchemy.orm import Session
from app.models.merge_candidate import MergeCandidate, MergeKind, MergeStatus
from app.models.performer import Performer
from app.models.playback_source import PlaybackSource
from app.models.scene import (
Scene,
SceneExternalRef,
SceneFingerprint,
ScenePerformer,
SceneTag,
)
from app.models.source import Source, SourceKind
from app.normalize.scenes import NormalizedScene
from app.normalize.text import slugify
from app.resolve.performer_resolver import resolve_performer
from app.resolve.scene_match import (
find_blocking_candidates,
find_by_cross_source_refs,
find_by_external_ref,
find_by_fingerprint_exact,
find_by_phash_within,
)
from app.resolve.scene_score import score_candidate
from app.resolve.scoring import ScoreBreakdown, triage
from app.resolve.studio_resolver import resolve_studio
from app.resolve.tag_resolver import resolve_tag
log = logging.getLogger(__name__)
@dataclass
class SceneResolveResult:
scene: Scene
was_created: bool
path: str # 'same_source' | 'cross_source' | 'fp_oshash' | 'fp_phash' | 'composite_auto' | 'composite_review' | 'new'
score: float | None = None
candidate_id: uuid.UUID | None = None # competitor in pending review (path=composite_review)
def resolve_scene(
session: Session,
*,
norm: NormalizedScene,
source_id: uuid.UUID,
) -> SceneResolveResult:
studio = resolve_studio(session, norm=norm.studio, source_id=source_id) if norm.studio else None
studio_id = studio.id if studio else None
# Tube/agregator (np. pornapp) → studio nie jest informatywny, performers ważniejsze.
src = session.get(Source, source_id)
source_kind = src.kind if src else SourceKind.manual
aggregator_mode = src is not None and src.kind == SourceKind.scraper
# Path 1: same-source external_ref
existing = find_by_external_ref(session, source_id=source_id, external_id=norm.external_id)
if existing is not None:
_update_scene_fields(existing, norm, studio_id=studio_id, source_kind=source_kind, session=session)
_sync_attached_entities(session, scene=existing, norm=norm, source_id=source_id)
return SceneResolveResult(scene=existing, was_created=False, path="same_source")
# Path 2: cross-source UUID
cross = find_by_cross_source_refs(session, refs=norm.cross_source_refs)
if cross is not None:
scene_match, via_source_name = cross
_update_scene_fields(scene_match, norm, studio_id=studio_id, source_kind=source_kind, session=session)
_attach_external_ref(session, scene_id=scene_match.id, source_id=source_id, norm=norm)
_sync_attached_entities(session, scene=scene_match, norm=norm, source_id=source_id)
_log_auto_merge(
session,
scene_id=scene_match.id,
score=1.0,
reasons={"path": "cross_source", "via_source": via_source_name},
)
return SceneResolveResult(scene=scene_match, was_created=False, path="cross_source", score=1.0)
# Path 3: fingerprints
for kind, value in norm.fingerprints:
if kind == "phash":
continue # phash leci osobno żeby użyć Hamming
match = find_by_fingerprint_exact(session, kind=kind, value=value)
if match is not None:
_update_scene_fields(match, norm, studio_id=studio_id, source_kind=source_kind, session=session)
_attach_external_ref(session, scene_id=match.id, source_id=source_id, norm=norm)
_sync_attached_entities(session, scene=match, norm=norm, source_id=source_id)
_log_auto_merge(
session,
scene_id=match.id,
score=1.0,
reasons={"path": "fp_exact", "kind": kind, "value": value},
)
return SceneResolveResult(scene=match, was_created=False, path=f"fp_{kind}", score=1.0)
for kind, value in norm.fingerprints:
if kind != "phash":
continue
result = find_by_phash_within(session, phash=value)
if result is not None:
scene_match, distance = result
raw_phash_score = 1.0 - distance / 64.0
# Duration sanity check: phash może collide gdy compilation zawiera chapter sceny
# (oba mają ten sam frame sample), ale duration będzie wyraźnie inny.
# Wymagamy proximity ≥0.5 (±30s) dla auto-merge; inaczej → review queue.
from app.resolve.scoring import duration_proximity, series_mismatch_strength
dur_prox = duration_proximity(scene_match.duration_sec, norm.duration_sec)
# Series-position guard (Episode 2 vs Episode 4): phash zwykle pixel-identical
# bo studio reusuje cover art między episodami, ale to OSOBNE sceny. Hard split,
# bez merge_candidate (nie ma czego mergować — żaden human reviewer też nie
# powie "Episode 2 to to samo co Episode 4").
sp_strength = series_mismatch_strength(
scene_match.title_normalized, norm.title_normalized
)
if sp_strength >= 1.0:
new_scene = _create_canonical(session, norm=norm, studio_id=studio_id)
_attach_external_ref(session, scene_id=new_scene.id, source_id=source_id, norm=norm)
_sync_attached_entities(session, scene=new_scene, norm=norm, source_id=source_id)
return SceneResolveResult(
scene=new_scene,
was_created=True,
path="fp_phash_series_split",
score=0.0,
)
if dur_prox is not None and dur_prox < 0.5:
# phash match ale duration rozjeżdża się → tworzymy nową scenę + review.
# Score reflectuje że to NIE jest auto-merge: dur_prox * phash_score,
# plus dalej cap przez series modifier mismatch (BTS/bonus/unedited).
penalised_score = raw_phash_score * max(dur_prox, 0.1)
if 0.0 < sp_strength < 1.0:
penalised_score = min(penalised_score, 1.0 - sp_strength)
new_scene = _create_canonical(session, norm=norm, studio_id=studio_id)
_attach_external_ref(session, scene_id=new_scene.id, source_id=source_id, norm=norm)
_sync_attached_entities(session, scene=new_scene, norm=norm, source_id=source_id)
session.add(
MergeCandidate(
kind=MergeKind.scene,
left_id=scene_match.id,
right_id=new_scene.id,
score=penalised_score,
reasons={
"path": "fp_phash",
"hamming": distance,
"phash_score": raw_phash_score,
"duration_mismatch": True,
"dur_prox": dur_prox,
"series_mismatch_strength": sp_strength,
"left_dur": scene_match.duration_sec,
"right_dur": norm.duration_sec,
},
status=MergeStatus.pending,
)
)
return SceneResolveResult(
scene=new_scene,
was_created=True,
path="fp_phash_review",
score=penalised_score,
candidate_id=scene_match.id,
)
# Modifier tag mismatch (BTS/bonus/unedited po jednej stronie) — nie hard-split,
# ale auto-merge zablokowane: tworzymy nową scenę + pending review.
if 0.0 < sp_strength < 1.0:
penalised_score = min(raw_phash_score, 1.0 - sp_strength)
new_scene = _create_canonical(session, norm=norm, studio_id=studio_id)
_attach_external_ref(session, scene_id=new_scene.id, source_id=source_id, norm=norm)
_sync_attached_entities(session, scene=new_scene, norm=norm, source_id=source_id)
session.add(
MergeCandidate(
kind=MergeKind.scene,
left_id=scene_match.id,
right_id=new_scene.id,
score=penalised_score,
reasons={
"path": "fp_phash",
"hamming": distance,
"phash_score": raw_phash_score,
"series_modifier_mismatch": True,
"series_mismatch_strength": sp_strength,
},
status=MergeStatus.pending,
)
)
return SceneResolveResult(
scene=new_scene,
was_created=True,
path="fp_phash_modifier_review",
score=penalised_score,
candidate_id=scene_match.id,
)
score = raw_phash_score
_update_scene_fields(scene_match, norm, studio_id=studio_id, source_kind=source_kind, session=session)
_attach_external_ref(session, scene_id=scene_match.id, source_id=source_id, norm=norm)
_sync_attached_entities(session, scene=scene_match, norm=norm, source_id=source_id)
_log_auto_merge(
session,
scene_id=scene_match.id,
score=score,
reasons={"path": "fp_phash", "hamming": distance, "dur_prox": dur_prox},
)
return SceneResolveResult(
scene=scene_match, was_created=False, path="fp_phash", score=score
)
# Dedup norm.performers po external_id (lub slug jako fallback) — pornhub i
# niektóre tube connectorzy zwracają tę samą performerkę 2x (raz pod aliasem
# scenowym typu "Lilkarina2", raz canonical "Lil Karina") → bulk INSERT
# performer_external_refs hitował PK violation (Sentry GOON-C). Preferujemy
# wariant z aliasem (informacyjnie bogatszy).
_seen_perf: set[str | tuple[str, str]] = set()
_deduped_perf = []
for p_norm in norm.performers:
key: str | tuple[str, str] = p_norm.external_id or ("slug", p_norm.slug or "")
if key in _seen_perf:
# Już mamy — jeśli poprzedni był bez aliasu a ten ma, podmień.
if p_norm.as_alias_in_scene:
for idx, existing in enumerate(_deduped_perf):
existing_key = existing.external_id or ("slug", existing.slug or "")
if existing_key == key and not existing.as_alias_in_scene:
_deduped_perf[idx] = p_norm
break
continue
_seen_perf.add(key)
_deduped_perf.append(p_norm)
norm.performers = _deduped_perf
# Pre-resolve performerów do canonical UUID, żeby Path 4 mógł liczyć Jaccard.
resolved_performers: list[tuple[uuid.UUID, str | None]] = []
for p_norm in norm.performers:
performer = resolve_performer(session, norm=p_norm, source_id=source_id)
resolved_performers.append((performer.id, p_norm.as_alias_in_scene))
performer_ids = [pid for pid, _ in resolved_performers]
# Path 4: composite fuzzy.
# W aggregator_mode studio z tube'a (np. "HQPorner") nie jest fizycznym studiem produkcyjnym,
# więc filtrowanie kandydatów po nim wyklucza wszystkie canonical sceny z TPDB/StashDB
# (które mają prawdziwe studia jak "Brazzers"). Wyłączamy studio blocking.
blocking_studio = None if aggregator_mode else studio_id
candidates = find_blocking_candidates(
session,
studio_id=blocking_studio,
release_date=norm.release_date,
title_normalized=norm.title_normalized,
)
# Plus: dla aggregator_mode dorzucamy jako kandydatów wszystkie canonical sceny
# które mają wspólny choć jeden performer z naszą sceną (mocny sygnał — performerzy
# to też nasz "blocking key" gdy studio i date są nieinformatywne).
if aggregator_mode and performer_ids:
# **2026-05-20 fix**: poprzednio LIMIT 50 BEZ ORDER BY → dla popular performera
# (Eveline Dellai z 200+ scen w bazie) prawdziwy match mógł być out of top-50,
# postgres zwracał arbitrary order → resolver nie widział kandydata → duplicate.
# Bug-report: brak Brazzers Exxtra po 15-05. Now: 500 limit + title-match priority
# ORDER, plus exact title match jako gwarantowany kandydat (CASE expression).
from sqlalchemy import case
title_match_expr = case(
(Scene.title_normalized == norm.title_normalized, 1),
else_=0,
)
more = (
session.execute(
select(Scene)
.join(ScenePerformer, ScenePerformer.scene_id == Scene.id)
.where(ScenePerformer.performer_id.in_(performer_ids))
.group_by(Scene.id)
.order_by(title_match_expr.desc(), Scene.release_date.desc().nullslast())
.limit(500)
)
.scalars()
.all()
)
seen = {c.id for c in candidates}
for c in more:
if c.id not in seen:
candidates.append(c)
best_scene: Scene | None = None
best_breakdown: ScoreBreakdown | None = None
for cand in candidates:
breakdown = score_candidate(
session,
candidate=cand,
norm=norm,
resolved_performer_ids=performer_ids,
studio_id=studio_id,
aggregator_mode=aggregator_mode,
)
if best_breakdown is None or breakdown.composite > best_breakdown.composite:
best_scene = cand
best_breakdown = breakdown
if best_scene is not None and best_breakdown is not None:
decision = triage(best_breakdown.composite)
if decision == "auto":
_update_scene_fields(best_scene, norm, studio_id=studio_id, source_kind=source_kind, session=session)
_attach_external_ref(session, scene_id=best_scene.id, source_id=source_id, norm=norm)
_sync_performers(session, scene_id=best_scene.id, resolved=resolved_performers)
_sync_tags(session, scene_id=best_scene.id, norm=norm, source_id=source_id)
_sync_fingerprints(
session, scene_id=best_scene.id, norm=norm, source_id=source_id
)
_sync_playback_sources(session, scene_id=best_scene.id, norm=norm)
_log_auto_merge(
session,
scene_id=best_scene.id,
score=best_breakdown.composite,
reasons={"path": "composite", **best_breakdown.to_dict()},
)
return SceneResolveResult(
scene=best_scene,
was_created=False,
path="composite_auto",
score=best_breakdown.composite,
)
if decision == "review":
new_scene = _create_canonical(session, norm=norm, studio_id=studio_id)
_attach_external_ref(session, scene_id=new_scene.id, source_id=source_id, norm=norm)
_sync_performers(session, scene_id=new_scene.id, resolved=resolved_performers)
_sync_tags(session, scene_id=new_scene.id, norm=norm, source_id=source_id)
_sync_fingerprints(
session, scene_id=new_scene.id, norm=norm, source_id=source_id
)
_sync_playback_sources(session, scene_id=new_scene.id, norm=norm)
session.add(
MergeCandidate(
kind=MergeKind.scene,
left_id=best_scene.id,
right_id=new_scene.id,
score=best_breakdown.composite,
reasons={"path": "composite", **best_breakdown.to_dict()},
status=MergeStatus.pending,
)
)
return SceneResolveResult(
scene=new_scene,
was_created=True,
path="composite_review",
score=best_breakdown.composite,
candidate_id=best_scene.id,
)
# Brak żadnego sensownego dopasowania → nowa kanoniczna
new_scene = _create_canonical(session, norm=norm, studio_id=studio_id)
_attach_external_ref(session, scene_id=new_scene.id, source_id=source_id, norm=norm)
_sync_performers(session, scene_id=new_scene.id, resolved=resolved_performers)
_sync_tags(session, scene_id=new_scene.id, norm=norm, source_id=source_id)
_sync_fingerprints(session, scene_id=new_scene.id, norm=norm, source_id=source_id)
_sync_playback_sources(session, scene_id=new_scene.id, norm=norm)
return SceneResolveResult(scene=new_scene, was_created=True, path="new")
# ---- helpery --------------------------------------------------------------
def _effective_duration(norm: NormalizedScene) -> int | None:
"""Duration sceny: scene-level z connectora, a gdy brak — max z playback_sources.
Tube'y często podają duration TYLKO na playbacku (norm.duration_sec=None, ale
norm.playback_sources[].duration_sec ustawione). Bez tego fallbacku Scene.duration_sec
zostaje NULL → mobilny filtr `min_duration_sec=60` (Scene.duration_sec >= 60) wycina
scenę mimo że jest grywalna i znamy jej długość (74% katalogu było ukryte, fix 2026-06-01).
"""
if norm.duration_sec:
return norm.duration_sec
ps_durs = [ps.duration_sec for ps in norm.playback_sources if ps.duration_sec]
return max(ps_durs) if ps_durs else None
def _create_canonical(
session: Session, *, norm: NormalizedScene, studio_id: uuid.UUID | None
) -> Scene:
scene = Scene(
title=norm.title,
title_normalized=norm.title_normalized,
slug=norm.slug or slugify(norm.title),
release_date=norm.release_date,
studio_id=studio_id,
duration_sec=_effective_duration(norm),
description=norm.description,
code=norm.code,
director=norm.director,
)
session.add(scene)
session.flush()
return scene
def _update_scene_fields(
scene: Scene,
norm: NormalizedScene,
*,
studio_id: uuid.UUID | None,
source_kind: SourceKind,
session: Session | None = None,
) -> None:
"""Aktualizuje pola kanoniczne sceny z incoming normalized scene.
**Source rank**:
- TPDB/StashDB (canonical): mogą nadpisywać. Gdy istniejąca scena ma TYLKO
scraper external_refs (tube-rooted), canonical zawsze nadpisuje (czyści
SEO tytuły). Gdy scena już ma canonical ref, longer-wins dla title.
- Scraper/tube (pornapp): TYLKO fill-in braków, nigdy nie nadpisuje
już ustawionych pól. Tube'y mają długie SEO tytuły, które bez tego
ranga by zaśmiecały tytuły z TPDB/StashDB.
"""
is_canonical = source_kind in (SourceKind.tpdb, SourceKind.stashdb)
# Title: canonical → overwrite (czyści tube-pollution); scraper tylko gdy pusto.
if not scene.title:
scene.title = norm.title
scene.title_normalized = norm.title_normalized
elif is_canonical:
scene_has_canonical = (
session is not None
and _has_canonical_external_ref(session, scene_id=scene.id)
)
if not scene_has_canonical or len(norm.title) > len(scene.title):
# Pierwszy canonical zastępuje tube SEO crap; canonical-vs-canonical longer-wins.
scene.title = norm.title
scene.title_normalized = norm.title_normalized
if norm.slug and not scene.slug:
scene.slug = norm.slug
if norm.release_date and not scene.release_date:
scene.release_date = norm.release_date
if studio_id and not scene.studio_id:
scene.studio_id = studio_id
# Duration: canonical może doprecyzować (TPDB/StashDB lepiej to mierzą niż tube
# który czasem reportuje compilation length); scraper tylko gdy null. Fallback do
# duration z playback_source gdy connector nie dał scene-level (patrz _effective_duration).
eff_duration = _effective_duration(norm)
if eff_duration and (not scene.duration_sec or is_canonical):
scene.duration_sec = eff_duration
if norm.description and not scene.description:
scene.description = norm.description
if norm.code and not scene.code:
scene.code = norm.code
if norm.director and not scene.director:
scene.director = norm.director
def _has_canonical_external_ref(session: Session, *, scene_id: uuid.UUID) -> bool:
"""Czy scena ma już choć jeden external_ref ze źródła canonical (tpdb/stashdb)?"""
row = session.execute(
select(SceneExternalRef.source_id)
.join(Source, Source.id == SceneExternalRef.source_id)
.where(
SceneExternalRef.scene_id == scene_id,
Source.kind.in_([SourceKind.tpdb, SourceKind.stashdb]),
)
.limit(1)
).first()
return row is not None
def _attach_external_ref(
session: Session,
*,
scene_id: uuid.UUID,
source_id: uuid.UUID,
norm: NormalizedScene,
) -> None:
existing = session.execute(
select(SceneExternalRef).where(
SceneExternalRef.source_id == source_id,
SceneExternalRef.external_id == norm.external_id,
)
).scalar_one_or_none()
if existing is None:
session.add(
SceneExternalRef(
source_id=source_id,
external_id=norm.external_id,
scene_id=scene_id,
confidence=1.0,
url=norm.url,
)
)
else:
if norm.url and not existing.url:
existing.url = norm.url
def _sync_attached_entities(
session: Session,
*,
scene: Scene,
norm: NormalizedScene,
source_id: uuid.UUID,
) -> None:
"""Zsynchronizuj performerów/tagi/fingerprinty dla istniejącej już kanonicznej sceny.
Używane w pathach 1-3 (gdzie performerzy nie byli pre-resolved przez resolver).
"""
resolved: list[tuple[uuid.UUID, str | None]] = []
for p_norm in norm.performers:
performer = resolve_performer(session, norm=p_norm, source_id=source_id)
resolved.append((performer.id, p_norm.as_alias_in_scene))
_sync_performers(session, scene_id=scene.id, resolved=resolved)
_sync_tags(session, scene_id=scene.id, norm=norm, source_id=source_id)
_sync_fingerprints(session, scene_id=scene.id, norm=norm, source_id=source_id)
_sync_playback_sources(session, scene_id=scene.id, norm=norm)
def _sync_performers(
session: Session,
*,
scene_id: uuid.UUID,
resolved: list[tuple[uuid.UUID, str | None]],
) -> None:
# Deduplikuj — dwa różne aliasy tej samej osoby (np. "Aj Applegate" + "AJ Applegate")
# przejdą przez resolve_performer zwracając ten sam Performer.id. Bez tej dedup
# flush rzuci UNIQUE violation na pk_scene_performers (scene_id, performer_id).
seen_ids: set[uuid.UUID] = set()
deduped: list[tuple[uuid.UUID, str | None]] = []
for pid, alias in resolved:
if pid in seen_ids:
continue
seen_ids.add(pid)
deduped.append((pid, alias))
for position, (performer_id, as_alias) in enumerate(deduped):
existing = session.execute(
select(ScenePerformer).where(
ScenePerformer.scene_id == scene_id,
ScenePerformer.performer_id == performer_id,
)
).scalar_one_or_none()
if existing is None:
if session.get(Performer, performer_id) is None:
continue
session.add(
ScenePerformer(
scene_id=scene_id,
performer_id=performer_id,
position=position,
as_alias=as_alias,
)
)
elif as_alias and not existing.as_alias:
existing.as_alias = as_alias
def _sync_tags(
session: Session,
*,
scene_id: uuid.UUID,
norm: NormalizedScene,
source_id: uuid.UUID,
) -> None:
for t_norm in norm.tags:
tag = resolve_tag(session, norm=t_norm)
if tag is None:
continue
existing = session.execute(
select(SceneTag).where(
SceneTag.scene_id == scene_id,
SceneTag.tag_id == tag.id,
)
).scalar_one_or_none()
if existing is None:
session.add(SceneTag(scene_id=scene_id, tag_id=tag.id, source_id=source_id))
def _sync_fingerprints(
session: Session,
*,
scene_id: uuid.UUID,
norm: NormalizedScene,
source_id: uuid.UUID,
) -> None:
for kind, value in norm.fingerprints:
existing = session.execute(
select(SceneFingerprint.id).where(
SceneFingerprint.scene_id == scene_id,
SceneFingerprint.kind == kind,
SceneFingerprint.value == value,
)
).first()
if existing is None:
session.add(
SceneFingerprint(
scene_id=scene_id, kind=kind, value=value, source_id=source_id
)
)
def _sync_playback_sources(
session: Session, *, scene_id: uuid.UUID, norm: NormalizedScene
) -> None:
"""Upsert per (origin, page_url). Bez modyfikacji existing — chyba że dorzucamy
brakujące pola (np. stream_url po późniejszym resolve)."""
from datetime import UTC, datetime
for ps in norm.playback_sources:
if not ps.page_url:
continue
existing = session.execute(
select(PlaybackSource).where(
PlaybackSource.origin == ps.origin,
PlaybackSource.page_url == ps.page_url,
)
).scalar_one_or_none()
if existing is None:
session.add(
PlaybackSource(
scene_id=scene_id,
origin=ps.origin,
page_url=ps.page_url,
embed_url=ps.embed_url,
stream_url=ps.stream_url,
quality=ps.quality,
duration_sec=ps.duration_sec,
thumbnail_url=ps.thumbnail_url,
animated_thumbnail_url=ps.animated_thumbnail_url,
)
)
else:
# Refresh + uzupełnij braki (nie nadpisujemy istniejących wartości).
existing.last_seen_at = datetime.now(UTC)
if existing.scene_id != scene_id:
# Ten sam (origin, page_url) trafił do innej canonical sceny — to znaczy
# że dedup zmergował. Re-link do bieżącej.
existing.scene_id = scene_id
for attr in ("embed_url", "stream_url", "quality", "duration_sec"):
if getattr(existing, attr) is None and getattr(ps, attr) is not None:
setattr(existing, attr, getattr(ps, attr))
# Thumbnaile ZAWSZE odśwież do świeżej wartości ze scrape (gdy podana).
# Signed CDN thumbnaile (sxyprn/trafficdeposit) rotują — token wygasa po
# ~tygodniach i stary URL daje 404. Trzymanie "tylko gdy NULL" robiło rot
# permanentnym (bug 2026-06-10). Re-scrape teraz odświeża martwe miniaturki.
for attr in ("thumbnail_url", "animated_thumbnail_url"):
new_val = getattr(ps, attr)
if new_val is not None:
setattr(existing, attr, new_val)
def _log_auto_merge(
session: Session, *, scene_id: uuid.UUID, score: float, reasons: dict
) -> None:
"""Audit log auto-merga. left=right=scene_id (jednostronne — to nie diff dwóch
kanonicznych, tylko trace że ścieżka X przyniosła kolejny external_ref do sceny X).
"""
session.add(
MergeCandidate(
kind=MergeKind.scene,
left_id=scene_id,
right_id=scene_id,
score=score,
reasons=reasons,
status=MergeStatus.auto_merged,
)
)
def add_fingerprints(
session: Session,
*,
scene_id: uuid.UUID,
fingerprints: list[tuple[str, str]],
source_id: uuid.UUID,
) -> None:
"""Compat-alias z M2 (używany przez ingest.py). Zachowuje kontrakt."""
for kind, value in fingerprints:
existing = session.execute(
select(SceneFingerprint.id).where(
SceneFingerprint.scene_id == scene_id,
SceneFingerprint.kind == kind,
SceneFingerprint.value == value,
)
).first()
if existing is None:
session.add(
SceneFingerprint(
scene_id=scene_id, kind=kind, value=value, source_id=source_id
)
)