"""Movie resolver: map a normalized movie onto a canonical ``Movie`` row.

Resolution is attempted in this order:

1. Same-source match: an existing ``MovieExternalRef`` with the same
   ``(source_id, external_id)``.
2. Composite fuzzy match: candidates from ``find_movie_candidates`` are
   scored with ``score_movie_candidate`` and triaged with ``triage_movie``.
   An ``"auto"`` decision merges the incoming data into the best candidate;
   a ``"review"`` decision creates a new canonical movie and queues a
   ``MergeCandidate(kind=movie)`` for manual review.
3. No match: a new canonical movie is created.
"""

from __future__ import annotations

import logging
import uuid
from dataclasses import dataclass

from sqlalchemy import select
from sqlalchemy.orm import Session

from app.models.merge_candidate import MergeCandidate, MergeKind, MergeStatus
from app.models.movie import (
    Movie,
    MovieChapter,
    MovieExternalRef,
    MoviePerformer,
    MovieTag,
)
from app.models.movie_playback_source import MoviePlaybackSource
from app.models.performer import Performer
from app.normalize.movies import NormalizedMovie
from app.normalize.text import slugify
from app.resolve.movie_match import find_movie_candidates
from app.resolve.movie_score import (
    MovieScoreBreakdown,
    score_movie_candidate,
    triage_movie,
)
from app.resolve.performer_resolver import resolve_performer
from app.resolve.studio_resolver import resolve_studio
from app.resolve.tag_resolver import resolve_tag

log = logging.getLogger(__name__)

# (performer_id, alias used for the performer in this movie or None)
_ResolvedPerformers = list[tuple[uuid.UUID, str | None]]

# Movie attributes that share the plain "fill in when the movie's value is
# falsy" rule. title, studio_id and rating have their own rules and are
# handled separately in _update_movie_fields.
_FILL_IN_FIELDS = (
    "slug",
    "release_year",
    "release_date",
    "director",
    "country",
    "duration_sec",
    "description",
    "poster_url",
    "backdrop_url",
)

# Playback-source attributes filled in on an already existing row.
_PLAYBACK_FILL_IN_FIELDS = (
    "embed_url",
    "stream_url",
    "thumbnail_url",
    "animated_thumbnail_url",
)


@dataclass
class MovieResolveResult:
    """Outcome of :func:`resolve_movie`."""

    # Canonical movie the incoming record now belongs to.
    movie: Movie
    # True when a new canonical movie row was created by this call.
    was_created: bool
    path: str  # 'same_source' | 'composite_auto' | 'composite_review' | 'new'
    # Composite score of the best candidate (composite paths only).
    score: float | None = None
    # Id of the existing movie queued for review ('composite_review' only).
    candidate_id: uuid.UUID | None = None


def resolve_movie(
    session: Session,
    *,
    norm: NormalizedMovie,
    source_id: uuid.UUID,
) -> MovieResolveResult:
    """Resolve ``norm`` to a canonical ``Movie``, creating one when needed.

    Args:
        session: SQLAlchemy session used for all lookups and inserts.
        norm: Normalized movie payload coming from the source.
        source_id: Id of the source the payload was scraped from.

    Returns:
        A ``MovieResolveResult`` whose ``path`` tells which branch was taken:
        ``same_source``, ``composite_auto``, ``composite_review`` or ``new``.
    """
    studio = (
        resolve_studio(session, norm=norm.studio, source_id=source_id)
        if norm.studio
        else None
    )
    studio_id = studio.id if studio else None

    # Path 1: same-source external_ref
    existing_ref = session.execute(
        select(MovieExternalRef).where(
            MovieExternalRef.source_id == source_id,
            MovieExternalRef.external_id == norm.external_id,
        )
    ).scalar_one_or_none()
    if existing_ref is not None:
        movie = session.get(Movie, existing_ref.movie_id)
        # A ref whose movie is gone falls through to the paths below;
        # _attach_external_ref then re-points the stale ref.
        if movie is not None:
            _update_movie_fields(movie, norm, studio_id=studio_id)
            _sync_attached_entities(
                session, movie=movie, norm=norm, source_id=source_id
            )
            return MovieResolveResult(
                movie=movie, was_created=False, path="same_source"
            )

    # Pre-resolve performers (needed for the cast Jaccard score)
    resolved_performers = _resolve_performers(
        session, norm=norm, source_id=source_id
    )
    performer_ids = [pid for pid, _ in resolved_performers]

    # Path 2: composite fuzzy
    candidates = find_movie_candidates(
        session,
        title_normalized=norm.title_normalized,
        studio_id=studio_id,
        release_year=norm.release_year,
    )
    best_movie: Movie | None = None
    best_breakdown: MovieScoreBreakdown | None = None
    for cand in candidates:
        breakdown = score_movie_candidate(
            session,
            candidate=cand,
            norm_title_normalized=norm.title_normalized,
            norm_release_year=norm.release_year,
            norm_studio_id=studio_id,
            norm_performer_ids=performer_ids,
        )
        # Strict ">" keeps the first candidate on ties.
        if best_breakdown is None or breakdown.composite > best_breakdown.composite:
            best_movie = cand
            best_breakdown = breakdown

    if best_movie is not None and best_breakdown is not None:
        decision = triage_movie(best_breakdown.composite)
        if decision == "auto":
            _update_movie_fields(best_movie, norm, studio_id=studio_id)
            _attach_incoming(
                session,
                movie_id=best_movie.id,
                source_id=source_id,
                norm=norm,
                resolved_performers=resolved_performers,
            )
            log.info(
                "movie auto-merge: %s ← incoming '%s' (score=%.2f)",
                best_movie.id,
                norm.title,
                best_breakdown.composite,
            )
            return MovieResolveResult(
                movie=best_movie,
                was_created=False,
                path="composite_auto",
                score=best_breakdown.composite,
            )
        if decision == "review":
            new_movie = _create_canonical(session, norm=norm, studio_id=studio_id)
            _attach_incoming(
                session,
                movie_id=new_movie.id,
                source_id=source_id,
                norm=norm,
                resolved_performers=resolved_performers,
            )
            session.add(
                MergeCandidate(
                    kind=MergeKind.movie,
                    left_id=best_movie.id,
                    right_id=new_movie.id,
                    score=best_breakdown.composite,
                    reasons={"path": "composite", **best_breakdown.to_dict()},
                    status=MergeStatus.pending,
                )
            )
            return MovieResolveResult(
                movie=new_movie,
                was_created=True,
                path="composite_review",
                score=best_breakdown.composite,
                candidate_id=best_movie.id,
            )
        # Any other triage decision is treated as "no match" below.

    # No match → new canonical movie
    movie = _create_canonical(session, norm=norm, studio_id=studio_id)
    _attach_incoming(
        session,
        movie_id=movie.id,
        source_id=source_id,
        norm=norm,
        resolved_performers=resolved_performers,
    )
    return MovieResolveResult(movie=movie, was_created=True, path="new")


# ---------------------------------------------------------------------------
# helpers
# ---------------------------------------------------------------------------


def _resolve_performers(
    session: Session, *, norm: NormalizedMovie, source_id: uuid.UUID
) -> _ResolvedPerformers:
    """Resolve every performer of ``norm`` to ``(performer_id, alias)`` pairs."""
    resolved: _ResolvedPerformers = []
    for p_norm in norm.performers:
        performer = resolve_performer(session, norm=p_norm, source_id=source_id)
        resolved.append((performer.id, p_norm.as_alias_in_scene))
    return resolved


def _attach_incoming(
    session: Session,
    *,
    movie_id: uuid.UUID,
    source_id: uuid.UUID,
    norm: NormalizedMovie,
    resolved_performers: _ResolvedPerformers,
) -> None:
    """Attach the external ref and sync all child entities onto ``movie_id``."""
    _attach_external_ref(session, movie_id=movie_id, source_id=source_id, norm=norm)
    _sync_performers(session, movie_id=movie_id, resolved=resolved_performers)
    _sync_tags(session, movie_id=movie_id, norm=norm, source_id=source_id)
    _sync_chapters(session, movie_id=movie_id, norm=norm)
    _sync_playback_sources(session, movie_id=movie_id, norm=norm)


def _create_canonical(
    session: Session, *, norm: NormalizedMovie, studio_id: uuid.UUID | None
) -> Movie:
    """Insert a new canonical ``Movie`` built from ``norm`` and flush it.

    The flush happens before returning so callers can use ``movie.id`` for
    the dependent rows they add right afterwards.
    """
    movie = Movie(
        title=norm.title,
        title_normalized=norm.title_normalized,
        slug=norm.slug or slugify(norm.title),
        release_year=norm.release_year,
        release_date=norm.release_date,
        studio_id=studio_id,
        director=norm.director,
        country=norm.country,
        duration_sec=norm.duration_sec,
        description=norm.description,
        poster_url=norm.poster_url,
        backdrop_url=norm.backdrop_url,
        rating=norm.rating,
    )
    session.add(movie)
    session.flush()
    return movie


def _update_movie_fields(
    movie: Movie, norm: NormalizedMovie, *, studio_id: uuid.UUID | None
) -> None:
    """Fill in fields of ``movie`` that are still empty.

    Fill-in only — paradisehill is the primary source, so fields that are
    already set are never overwritten. Once competing sources arrive (M5),
    source-rank logic like the one in scene_resolver will be added.
    """
    if norm.title and not movie.title:
        movie.title = norm.title
        # The normalized title always travels together with the title.
        movie.title_normalized = norm.title_normalized
    for field in _FILL_IN_FIELDS:
        incoming = getattr(norm, field)
        if incoming and not getattr(movie, field):
            setattr(movie, field, incoming)
    if studio_id and not movie.studio_id:
        movie.studio_id = studio_id
    # rating compares against None explicitly so a rating of 0 is kept.
    if norm.rating is not None and movie.rating is None:
        movie.rating = norm.rating


def _attach_external_ref(
    session: Session, *, movie_id: uuid.UUID, source_id: uuid.UUID, norm: NormalizedMovie
) -> None:
    """Ensure a ``MovieExternalRef`` for ``(source_id, external_id)`` exists.

    A missing ref is created pointing at ``movie_id``. An existing ref keeps
    its target, unless the movie it points at no longer exists: in that case
    it is re-pointed at ``movie_id`` so the next ingest of the same external
    id resolves through the same-source path. A missing ``url`` is filled in.
    """
    existing = session.execute(
        select(MovieExternalRef).where(
            MovieExternalRef.source_id == source_id,
            MovieExternalRef.external_id == norm.external_id,
        )
    ).scalar_one_or_none()
    if existing is None:
        session.add(
            MovieExternalRef(
                source_id=source_id,
                external_id=norm.external_id,
                movie_id=movie_id,
                confidence=1.0,
                url=norm.url,
            )
        )
        return

    # Repair a dangling ref; refs that point at a live movie are left alone.
    if existing.movie_id != movie_id and session.get(Movie, existing.movie_id) is None:
        log.warning(
            "movie external ref %s/%s pointed at missing movie %s; re-pointing to %s",
            source_id,
            norm.external_id,
            existing.movie_id,
            movie_id,
        )
        existing.movie_id = movie_id
    if norm.url and not existing.url:
        existing.url = norm.url


def _sync_attached_entities(
    session: Session, *, movie: Movie, norm: NormalizedMovie, source_id: uuid.UUID
) -> None:
    """Resolve performers and sync performers, tags, chapters and playback sources.

    Used on the same-source path, where the external ref already exists.
    """
    resolved = _resolve_performers(session, norm=norm, source_id=source_id)
    _sync_performers(session, movie_id=movie.id, resolved=resolved)
    _sync_tags(session, movie_id=movie.id, norm=norm, source_id=source_id)
    _sync_chapters(session, movie_id=movie.id, norm=norm)
    _sync_playback_sources(session, movie_id=movie.id, norm=norm)


def _sync_performers(
    session: Session,
    *,
    movie_id: uuid.UUID,
    resolved: _ResolvedPerformers,
) -> None:
    """Link resolved performers to the movie; existing links are never removed.

    ``position`` is the performer's index in ``resolved``. Duplicate performer
    ids are skipped after their first occurrence.
    """
    seen_ids: set[uuid.UUID] = set()
    for position, (performer_id, as_alias) in enumerate(resolved):
        if performer_id in seen_ids:
            continue
        seen_ids.add(performer_id)

        existing = session.execute(
            select(MoviePerformer).where(
                MoviePerformer.movie_id == movie_id,
                MoviePerformer.performer_id == performer_id,
            )
        ).scalar_one_or_none()
        if existing is not None:
            # Already linked: only fill in a missing alias.
            if as_alias and not existing.as_alias:
                existing.as_alias = as_alias
            continue

        # Skip ids whose Performer row cannot be loaded.
        if session.get(Performer, performer_id) is None:
            continue
        session.add(
            MoviePerformer(
                movie_id=movie_id,
                performer_id=performer_id,
                position=position,
                as_alias=as_alias,
            )
        )


def _sync_tags(
    session: Session,
    *,
    movie_id: uuid.UUID,
    norm: NormalizedMovie,
    source_id: uuid.UUID,
) -> None:
    """Link the tags of ``norm`` to the movie, ignoring already existing links."""
    # PostgreSQL INSERT ... ON CONFLICT DO NOTHING for a race-safe insert.
    # Without it, concurrent movie ingests raised IntegrityError on
    # pk_movie_tags (GOON-M, analogous to scene_tags GOON-H).
    from sqlalchemy.dialects.postgresql import insert as pg_insert

    seen_tag_ids: set[uuid.UUID] = set()
    for t_norm in norm.tags:
        tag = resolve_tag(session, norm=t_norm)
        if tag is None or tag.id in seen_tag_ids:
            continue
        seen_tag_ids.add(tag.id)
        stmt = (
            pg_insert(MovieTag.__table__)
            .values(movie_id=movie_id, tag_id=tag.id, source_id=source_id)
            .on_conflict_do_nothing(index_elements=["movie_id", "tag_id"])
        )
        session.execute(stmt)


def _sync_chapters(
    session: Session, *, movie_id: uuid.UUID, norm: NormalizedMovie
) -> None:
    """Upsert chapters keyed by ``chapter_index``.

    New chapters are added; existing ones only get their missing fields
    filled in. Chapters absent from ``norm`` are not deleted — the caller
    upserts whatever it has.
    """
    for raw_ch in norm.chapters:
        existing = session.execute(
            select(MovieChapter).where(
                MovieChapter.movie_id == movie_id,
                MovieChapter.chapter_index == raw_ch.chapter_index,
            )
        ).scalar_one_or_none()
        if existing is None:
            session.add(
                MovieChapter(
                    movie_id=movie_id,
                    chapter_index=raw_ch.chapter_index,
                    title=raw_ch.title,
                    start_sec=raw_ch.start_sec,
                    end_sec=raw_ch.end_sec,
                )
            )
            continue

        if raw_ch.title and not existing.title:
            existing.title = raw_ch.title
        # Offsets compare against None so a legitimate 0 is preserved.
        if raw_ch.start_sec is not None and existing.start_sec is None:
            existing.start_sec = raw_ch.start_sec
        if raw_ch.end_sec is not None and existing.end_sec is None:
            existing.end_sec = raw_ch.end_sec


def _sync_playback_sources(
    session: Session, *, movie_id: uuid.UUID, norm: NormalizedMovie
) -> None:
    """Upsert by (origin, page_url) — unique constraint on the table."""
    for pb in norm.playback_sources:
        # The lookup deliberately ignores movie_id: it mirrors the
        # (origin, page_url) key named in the docstring.
        existing = session.execute(
            select(MoviePlaybackSource).where(
                MoviePlaybackSource.origin == pb.origin,
                MoviePlaybackSource.page_url == pb.page_url,
            )
        ).scalar_one_or_none()
        if existing is None:
            session.add(
                MoviePlaybackSource(
                    movie_id=movie_id,
                    origin=pb.origin,
                    page_url=pb.page_url,
                    embed_url=pb.embed_url,
                    stream_url=pb.stream_url,
                    quality=pb.quality,
                    duration_sec=pb.duration_sec,
                    thumbnail_url=pb.thumbnail_url,
                    animated_thumbnail_url=pb.animated_thumbnail_url,
                )
            )
            continue

        # NOTE(review): quality and duration_sec are set on insert only and
        # are never filled in on an existing row — confirm this is intended.
        for field in _PLAYBACK_FILL_IN_FIELDS:
            incoming = getattr(pb, field)
            if incoming and not getattr(existing, field):
                setattr(existing, field, incoming)