"""Resolver sceny — pełny pipeline dedup (M3). Ścieżki, w kolejności first-match-wins: 1. **same-source external_ref** — to samo (source, external_id) widziane już wcześniej → update kanonicznej. 2. **cross-source UUID** — scena z source A deklaruje że jest tym samym UUID w source B (np. StashDB → tpdb_id w scene.urls). Auto-merge: dołącz nowy external_ref do istniejącej kanonicznej + log MergeCandidate(auto_merged). 3. **fingerprint match** — oshash exact lub phash Hamming ≤ N. Auto-merge. 4. **composite fuzzy** — blocking (studio + date) → score → triage: composite ≥ auto_merge_threshold → auto-merge ≥ review_threshold → utwórz nową kanoniczną + zapisz MergeCandidate(pending) < review_threshold → utwórz nową, bez kandydata Funkcja zwraca SceneResolveResult opisujący jak doszło do dopasowania. """ from __future__ import annotations import logging import uuid from dataclasses import dataclass from sqlalchemy import select from sqlalchemy.orm import Session from app.models.merge_candidate import MergeCandidate, MergeKind, MergeStatus from app.models.performer import Performer from app.models.playback_source import PlaybackSource from app.models.scene import ( Scene, SceneExternalRef, SceneFingerprint, ScenePerformer, SceneTag, ) from app.models.source import Source, SourceKind from app.normalize.scenes import NormalizedScene from app.normalize.text import slugify from app.resolve.performer_resolver import resolve_performer from app.resolve.scene_match import ( find_blocking_candidates, find_by_cross_source_refs, find_by_external_ref, find_by_fingerprint_exact, find_by_phash_within, ) from app.resolve.scene_score import score_candidate from app.resolve.scoring import ScoreBreakdown, triage from app.resolve.studio_resolver import resolve_studio from app.resolve.tag_resolver import resolve_tag log = logging.getLogger(__name__) @dataclass class SceneResolveResult: scene: Scene was_created: bool path: str # 'same_source' | 'cross_source' | 'fp_oshash' | 'fp_phash' | 'composite_auto' | 'composite_review' | 'new' score: float | None = None candidate_id: uuid.UUID | None = None # competitor in pending review (path=composite_review) def resolve_scene( session: Session, *, norm: NormalizedScene, source_id: uuid.UUID, ) -> SceneResolveResult: studio = resolve_studio(session, norm=norm.studio, source_id=source_id) if norm.studio else None studio_id = studio.id if studio else None # Tube/agregator (np. pornapp) → studio nie jest informatywny, performers ważniejsze. src = session.get(Source, source_id) source_kind = src.kind if src else SourceKind.manual aggregator_mode = src is not None and src.kind == SourceKind.scraper # Path 1: same-source external_ref existing = find_by_external_ref(session, source_id=source_id, external_id=norm.external_id) if existing is not None: _update_scene_fields(existing, norm, studio_id=studio_id, source_kind=source_kind, session=session) _sync_attached_entities(session, scene=existing, norm=norm, source_id=source_id) return SceneResolveResult(scene=existing, was_created=False, path="same_source") # Path 2: cross-source UUID cross = find_by_cross_source_refs(session, refs=norm.cross_source_refs) if cross is not None: scene_match, via_source_name = cross _update_scene_fields(scene_match, norm, studio_id=studio_id, source_kind=source_kind, session=session) _attach_external_ref(session, scene_id=scene_match.id, source_id=source_id, norm=norm) _sync_attached_entities(session, scene=scene_match, norm=norm, source_id=source_id) _log_auto_merge( session, scene_id=scene_match.id, score=1.0, reasons={"path": "cross_source", "via_source": via_source_name}, ) return SceneResolveResult(scene=scene_match, was_created=False, path="cross_source", score=1.0) # Path 3: fingerprints for kind, value in norm.fingerprints: if kind == "phash": continue # phash leci osobno żeby użyć Hamming match = find_by_fingerprint_exact(session, kind=kind, value=value) if match is not None: _update_scene_fields(match, norm, studio_id=studio_id, source_kind=source_kind, session=session) _attach_external_ref(session, scene_id=match.id, source_id=source_id, norm=norm) _sync_attached_entities(session, scene=match, norm=norm, source_id=source_id) _log_auto_merge( session, scene_id=match.id, score=1.0, reasons={"path": "fp_exact", "kind": kind, "value": value}, ) return SceneResolveResult(scene=match, was_created=False, path=f"fp_{kind}", score=1.0) for kind, value in norm.fingerprints: if kind != "phash": continue result = find_by_phash_within(session, phash=value) if result is not None: scene_match, distance = result raw_phash_score = 1.0 - distance / 64.0 # Duration sanity check: phash może collide gdy compilation zawiera chapter sceny # (oba mają ten sam frame sample), ale duration będzie wyraźnie inny. # Wymagamy proximity ≥0.5 (±30s) dla auto-merge; inaczej → review queue. from app.resolve.scoring import duration_proximity, series_mismatch_strength dur_prox = duration_proximity(scene_match.duration_sec, norm.duration_sec) # Series-position guard (Episode 2 vs Episode 4): phash zwykle pixel-identical # bo studio reusuje cover art między episodami, ale to OSOBNE sceny. Hard split, # bez merge_candidate (nie ma czego mergować — żaden human reviewer też nie # powie "Episode 2 to to samo co Episode 4"). sp_strength = series_mismatch_strength( scene_match.title_normalized, norm.title_normalized ) if sp_strength >= 1.0: new_scene = _create_canonical(session, norm=norm, studio_id=studio_id) _attach_external_ref(session, scene_id=new_scene.id, source_id=source_id, norm=norm) _sync_attached_entities(session, scene=new_scene, norm=norm, source_id=source_id) return SceneResolveResult( scene=new_scene, was_created=True, path="fp_phash_series_split", score=0.0, ) if dur_prox is not None and dur_prox < 0.5: # phash match ale duration rozjeżdża się → tworzymy nową scenę + review. # Score reflectuje że to NIE jest auto-merge: dur_prox * phash_score, # plus dalej cap przez series modifier mismatch (BTS/bonus/unedited). penalised_score = raw_phash_score * max(dur_prox, 0.1) if 0.0 < sp_strength < 1.0: penalised_score = min(penalised_score, 1.0 - sp_strength) new_scene = _create_canonical(session, norm=norm, studio_id=studio_id) _attach_external_ref(session, scene_id=new_scene.id, source_id=source_id, norm=norm) _sync_attached_entities(session, scene=new_scene, norm=norm, source_id=source_id) session.add( MergeCandidate( kind=MergeKind.scene, left_id=scene_match.id, right_id=new_scene.id, score=penalised_score, reasons={ "path": "fp_phash", "hamming": distance, "phash_score": raw_phash_score, "duration_mismatch": True, "dur_prox": dur_prox, "series_mismatch_strength": sp_strength, "left_dur": scene_match.duration_sec, "right_dur": norm.duration_sec, }, status=MergeStatus.pending, ) ) return SceneResolveResult( scene=new_scene, was_created=True, path="fp_phash_review", score=penalised_score, candidate_id=scene_match.id, ) # Modifier tag mismatch (BTS/bonus/unedited po jednej stronie) — nie hard-split, # ale auto-merge zablokowane: tworzymy nową scenę + pending review. if 0.0 < sp_strength < 1.0: penalised_score = min(raw_phash_score, 1.0 - sp_strength) new_scene = _create_canonical(session, norm=norm, studio_id=studio_id) _attach_external_ref(session, scene_id=new_scene.id, source_id=source_id, norm=norm) _sync_attached_entities(session, scene=new_scene, norm=norm, source_id=source_id) session.add( MergeCandidate( kind=MergeKind.scene, left_id=scene_match.id, right_id=new_scene.id, score=penalised_score, reasons={ "path": "fp_phash", "hamming": distance, "phash_score": raw_phash_score, "series_modifier_mismatch": True, "series_mismatch_strength": sp_strength, }, status=MergeStatus.pending, ) ) return SceneResolveResult( scene=new_scene, was_created=True, path="fp_phash_modifier_review", score=penalised_score, candidate_id=scene_match.id, ) score = raw_phash_score _update_scene_fields(scene_match, norm, studio_id=studio_id, source_kind=source_kind, session=session) _attach_external_ref(session, scene_id=scene_match.id, source_id=source_id, norm=norm) _sync_attached_entities(session, scene=scene_match, norm=norm, source_id=source_id) _log_auto_merge( session, scene_id=scene_match.id, score=score, reasons={"path": "fp_phash", "hamming": distance, "dur_prox": dur_prox}, ) return SceneResolveResult( scene=scene_match, was_created=False, path="fp_phash", score=score ) # Dedup norm.performers po external_id (lub slug jako fallback) — pornhub i # niektóre tube connectorzy zwracają tę samą performerkę 2x (raz pod aliasem # scenowym typu "Lilkarina2", raz canonical "Lil Karina") → bulk INSERT # performer_external_refs hitował PK violation (Sentry GOON-C). Preferujemy # wariant z aliasem (informacyjnie bogatszy). _seen_perf: set[str | tuple[str, str]] = set() _deduped_perf = [] for p_norm in norm.performers: key: str | tuple[str, str] = p_norm.external_id or ("slug", p_norm.slug or "") if key in _seen_perf: # Już mamy — jeśli poprzedni był bez aliasu a ten ma, podmień. if p_norm.as_alias_in_scene: for idx, existing in enumerate(_deduped_perf): existing_key = existing.external_id or ("slug", existing.slug or "") if existing_key == key and not existing.as_alias_in_scene: _deduped_perf[idx] = p_norm break continue _seen_perf.add(key) _deduped_perf.append(p_norm) norm.performers = _deduped_perf # Pre-resolve performerów do canonical UUID, żeby Path 4 mógł liczyć Jaccard. resolved_performers: list[tuple[uuid.UUID, str | None]] = [] for p_norm in norm.performers: performer = resolve_performer(session, norm=p_norm, source_id=source_id) resolved_performers.append((performer.id, p_norm.as_alias_in_scene)) performer_ids = [pid for pid, _ in resolved_performers] # Path 4: composite fuzzy. # W aggregator_mode studio z tube'a (np. "HQPorner") nie jest fizycznym studiem produkcyjnym, # więc filtrowanie kandydatów po nim wyklucza wszystkie canonical sceny z TPDB/StashDB # (które mają prawdziwe studia jak "Brazzers"). Wyłączamy studio blocking. blocking_studio = None if aggregator_mode else studio_id candidates = find_blocking_candidates( session, studio_id=blocking_studio, release_date=norm.release_date, title_normalized=norm.title_normalized, ) # Plus: dla aggregator_mode dorzucamy jako kandydatów wszystkie canonical sceny # które mają wspólny choć jeden performer z naszą sceną (mocny sygnał — performerzy # to też nasz "blocking key" gdy studio i date są nieinformatywne). if aggregator_mode and performer_ids: # **2026-05-20 fix**: poprzednio LIMIT 50 BEZ ORDER BY → dla popular performera # (Eveline Dellai z 200+ scen w bazie) prawdziwy match mógł być out of top-50, # postgres zwracał arbitrary order → resolver nie widział kandydata → duplicate. # Bug-report: brak Brazzers Exxtra po 15-05. Now: 500 limit + title-match priority # ORDER, plus exact title match jako gwarantowany kandydat (CASE expression). from sqlalchemy import case title_match_expr = case( (Scene.title_normalized == norm.title_normalized, 1), else_=0, ) more = ( session.execute( select(Scene) .join(ScenePerformer, ScenePerformer.scene_id == Scene.id) .where(ScenePerformer.performer_id.in_(performer_ids)) .group_by(Scene.id) .order_by(title_match_expr.desc(), Scene.release_date.desc().nullslast()) .limit(500) ) .scalars() .all() ) seen = {c.id for c in candidates} for c in more: if c.id not in seen: candidates.append(c) best_scene: Scene | None = None best_breakdown: ScoreBreakdown | None = None for cand in candidates: breakdown = score_candidate( session, candidate=cand, norm=norm, resolved_performer_ids=performer_ids, studio_id=studio_id, aggregator_mode=aggregator_mode, ) if best_breakdown is None or breakdown.composite > best_breakdown.composite: best_scene = cand best_breakdown = breakdown if best_scene is not None and best_breakdown is not None: decision = triage(best_breakdown.composite) if decision == "auto": _update_scene_fields(best_scene, norm, studio_id=studio_id, source_kind=source_kind, session=session) _attach_external_ref(session, scene_id=best_scene.id, source_id=source_id, norm=norm) _sync_performers(session, scene_id=best_scene.id, resolved=resolved_performers) _sync_tags(session, scene_id=best_scene.id, norm=norm, source_id=source_id) _sync_fingerprints( session, scene_id=best_scene.id, norm=norm, source_id=source_id ) _sync_playback_sources(session, scene_id=best_scene.id, norm=norm) _log_auto_merge( session, scene_id=best_scene.id, score=best_breakdown.composite, reasons={"path": "composite", **best_breakdown.to_dict()}, ) return SceneResolveResult( scene=best_scene, was_created=False, path="composite_auto", score=best_breakdown.composite, ) if decision == "review": new_scene = _create_canonical(session, norm=norm, studio_id=studio_id) _attach_external_ref(session, scene_id=new_scene.id, source_id=source_id, norm=norm) _sync_performers(session, scene_id=new_scene.id, resolved=resolved_performers) _sync_tags(session, scene_id=new_scene.id, norm=norm, source_id=source_id) _sync_fingerprints( session, scene_id=new_scene.id, norm=norm, source_id=source_id ) _sync_playback_sources(session, scene_id=new_scene.id, norm=norm) session.add( MergeCandidate( kind=MergeKind.scene, left_id=best_scene.id, right_id=new_scene.id, score=best_breakdown.composite, reasons={"path": "composite", **best_breakdown.to_dict()}, status=MergeStatus.pending, ) ) return SceneResolveResult( scene=new_scene, was_created=True, path="composite_review", score=best_breakdown.composite, candidate_id=best_scene.id, ) # Brak żadnego sensownego dopasowania → nowa kanoniczna new_scene = _create_canonical(session, norm=norm, studio_id=studio_id) _attach_external_ref(session, scene_id=new_scene.id, source_id=source_id, norm=norm) _sync_performers(session, scene_id=new_scene.id, resolved=resolved_performers) _sync_tags(session, scene_id=new_scene.id, norm=norm, source_id=source_id) _sync_fingerprints(session, scene_id=new_scene.id, norm=norm, source_id=source_id) _sync_playback_sources(session, scene_id=new_scene.id, norm=norm) return SceneResolveResult(scene=new_scene, was_created=True, path="new") # ---- helpery -------------------------------------------------------------- def _create_canonical( session: Session, *, norm: NormalizedScene, studio_id: uuid.UUID | None ) -> Scene: scene = Scene( title=norm.title, title_normalized=norm.title_normalized, slug=norm.slug or slugify(norm.title), release_date=norm.release_date, studio_id=studio_id, duration_sec=norm.duration_sec, description=norm.description, code=norm.code, director=norm.director, ) session.add(scene) session.flush() return scene def _update_scene_fields( scene: Scene, norm: NormalizedScene, *, studio_id: uuid.UUID | None, source_kind: SourceKind, session: Session | None = None, ) -> None: """Aktualizuje pola kanoniczne sceny z incoming normalized scene. **Source rank**: - TPDB/StashDB (canonical): mogą nadpisywać. Gdy istniejąca scena ma TYLKO scraper external_refs (tube-rooted), canonical zawsze nadpisuje (czyści SEO tytuły). Gdy scena już ma canonical ref, longer-wins dla title. - Scraper/tube (pornapp): TYLKO fill-in braków, nigdy nie nadpisuje już ustawionych pól. Tube'y mają długie SEO tytuły, które bez tego ranga by zaśmiecały tytuły z TPDB/StashDB. """ is_canonical = source_kind in (SourceKind.tpdb, SourceKind.stashdb) # Title: canonical → overwrite (czyści tube-pollution); scraper tylko gdy pusto. if not scene.title: scene.title = norm.title scene.title_normalized = norm.title_normalized elif is_canonical: scene_has_canonical = ( session is not None and _has_canonical_external_ref(session, scene_id=scene.id) ) if not scene_has_canonical or len(norm.title) > len(scene.title): # Pierwszy canonical zastępuje tube SEO crap; canonical-vs-canonical longer-wins. scene.title = norm.title scene.title_normalized = norm.title_normalized if norm.slug and not scene.slug: scene.slug = norm.slug if norm.release_date and not scene.release_date: scene.release_date = norm.release_date if studio_id and not scene.studio_id: scene.studio_id = studio_id # Duration: canonical może doprecyzować (TPDB/StashDB lepiej to mierzą niż tube # który czasem reportuje compilation length); scraper tylko gdy null. if norm.duration_sec and (not scene.duration_sec or is_canonical): scene.duration_sec = norm.duration_sec if norm.description and not scene.description: scene.description = norm.description if norm.code and not scene.code: scene.code = norm.code if norm.director and not scene.director: scene.director = norm.director def _has_canonical_external_ref(session: Session, *, scene_id: uuid.UUID) -> bool: """Czy scena ma już choć jeden external_ref ze źródła canonical (tpdb/stashdb)?""" row = session.execute( select(SceneExternalRef.source_id) .join(Source, Source.id == SceneExternalRef.source_id) .where( SceneExternalRef.scene_id == scene_id, Source.kind.in_([SourceKind.tpdb, SourceKind.stashdb]), ) .limit(1) ).first() return row is not None def _attach_external_ref( session: Session, *, scene_id: uuid.UUID, source_id: uuid.UUID, norm: NormalizedScene, ) -> None: existing = session.execute( select(SceneExternalRef).where( SceneExternalRef.source_id == source_id, SceneExternalRef.external_id == norm.external_id, ) ).scalar_one_or_none() if existing is None: session.add( SceneExternalRef( source_id=source_id, external_id=norm.external_id, scene_id=scene_id, confidence=1.0, url=norm.url, ) ) else: if norm.url and not existing.url: existing.url = norm.url def _sync_attached_entities( session: Session, *, scene: Scene, norm: NormalizedScene, source_id: uuid.UUID, ) -> None: """Zsynchronizuj performerów/tagi/fingerprinty dla istniejącej już kanonicznej sceny. Używane w pathach 1-3 (gdzie performerzy nie byli pre-resolved przez resolver). """ resolved: list[tuple[uuid.UUID, str | None]] = [] for p_norm in norm.performers: performer = resolve_performer(session, norm=p_norm, source_id=source_id) resolved.append((performer.id, p_norm.as_alias_in_scene)) _sync_performers(session, scene_id=scene.id, resolved=resolved) _sync_tags(session, scene_id=scene.id, norm=norm, source_id=source_id) _sync_fingerprints(session, scene_id=scene.id, norm=norm, source_id=source_id) _sync_playback_sources(session, scene_id=scene.id, norm=norm) def _sync_performers( session: Session, *, scene_id: uuid.UUID, resolved: list[tuple[uuid.UUID, str | None]], ) -> None: # Deduplikuj — dwa różne aliasy tej samej osoby (np. "Aj Applegate" + "AJ Applegate") # przejdą przez resolve_performer zwracając ten sam Performer.id. Bez tej dedup # flush rzuci UNIQUE violation na pk_scene_performers (scene_id, performer_id). seen_ids: set[uuid.UUID] = set() deduped: list[tuple[uuid.UUID, str | None]] = [] for pid, alias in resolved: if pid in seen_ids: continue seen_ids.add(pid) deduped.append((pid, alias)) for position, (performer_id, as_alias) in enumerate(deduped): existing = session.execute( select(ScenePerformer).where( ScenePerformer.scene_id == scene_id, ScenePerformer.performer_id == performer_id, ) ).scalar_one_or_none() if existing is None: if session.get(Performer, performer_id) is None: continue session.add( ScenePerformer( scene_id=scene_id, performer_id=performer_id, position=position, as_alias=as_alias, ) ) elif as_alias and not existing.as_alias: existing.as_alias = as_alias def _sync_tags( session: Session, *, scene_id: uuid.UUID, norm: NormalizedScene, source_id: uuid.UUID, ) -> None: for t_norm in norm.tags: tag = resolve_tag(session, norm=t_norm) if tag is None: continue existing = session.execute( select(SceneTag).where( SceneTag.scene_id == scene_id, SceneTag.tag_id == tag.id, ) ).scalar_one_or_none() if existing is None: session.add(SceneTag(scene_id=scene_id, tag_id=tag.id, source_id=source_id)) def _sync_fingerprints( session: Session, *, scene_id: uuid.UUID, norm: NormalizedScene, source_id: uuid.UUID, ) -> None: for kind, value in norm.fingerprints: existing = session.execute( select(SceneFingerprint.id).where( SceneFingerprint.scene_id == scene_id, SceneFingerprint.kind == kind, SceneFingerprint.value == value, ) ).first() if existing is None: session.add( SceneFingerprint( scene_id=scene_id, kind=kind, value=value, source_id=source_id ) ) def _sync_playback_sources( session: Session, *, scene_id: uuid.UUID, norm: NormalizedScene ) -> None: """Upsert per (origin, page_url). Bez modyfikacji existing — chyba że dorzucamy brakujące pola (np. stream_url po późniejszym resolve).""" from datetime import UTC, datetime for ps in norm.playback_sources: if not ps.page_url: continue existing = session.execute( select(PlaybackSource).where( PlaybackSource.origin == ps.origin, PlaybackSource.page_url == ps.page_url, ) ).scalar_one_or_none() if existing is None: session.add( PlaybackSource( scene_id=scene_id, origin=ps.origin, page_url=ps.page_url, embed_url=ps.embed_url, stream_url=ps.stream_url, quality=ps.quality, duration_sec=ps.duration_sec, thumbnail_url=ps.thumbnail_url, animated_thumbnail_url=ps.animated_thumbnail_url, ) ) else: # Refresh + uzupełnij braki (nigdy nie nadpisujemy istniejących wartości). existing.last_seen_at = datetime.now(UTC) if existing.scene_id != scene_id: # Ten sam (origin, page_url) trafił do innej canonical sceny — to znaczy # że dedup zmergował. Re-link do bieżącej. existing.scene_id = scene_id for attr in ("embed_url", "stream_url", "quality", "duration_sec", "thumbnail_url", "animated_thumbnail_url"): if getattr(existing, attr) is None and getattr(ps, attr) is not None: setattr(existing, attr, getattr(ps, attr)) def _log_auto_merge( session: Session, *, scene_id: uuid.UUID, score: float, reasons: dict ) -> None: """Audit log auto-merga. left=right=scene_id (jednostronne — to nie diff dwóch kanonicznych, tylko trace że ścieżka X przyniosła kolejny external_ref do sceny X). """ session.add( MergeCandidate( kind=MergeKind.scene, left_id=scene_id, right_id=scene_id, score=score, reasons=reasons, status=MergeStatus.auto_merged, ) ) def add_fingerprints( session: Session, *, scene_id: uuid.UUID, fingerprints: list[tuple[str, str]], source_id: uuid.UUID, ) -> None: """Compat-alias z M2 (używany przez ingest.py). Zachowuje kontrakt.""" for kind, value in fingerprints: existing = session.execute( select(SceneFingerprint.id).where( SceneFingerprint.scene_id == scene_id, SceneFingerprint.kind == kind, SceneFingerprint.value == value, ) ).first() if existing is None: session.add( SceneFingerprint( scene_id=scene_id, kind=kind, value=value, source_id=source_id ) )