"""M2: simple performer resolver.

Resolution paths, tried in order:
  1. Exact external_ref match for the source → update the performer.
  2. name_normalized match in `performers` → reuse + attach the external_ref.
  3. alias_normalized match in `performer_aliases` → reuse the parent
     performer + attach the external_ref.
  4. Insert a new performer.

(Fuzzy matching / triage via merge_candidates: M8.)
"""

from __future__ import annotations

import logging
import uuid

from sqlalchemy import select
from sqlalchemy.orm import Session

from app.models.performer import Performer, PerformerAlias, PerformerExternalRef
from app.normalize.scenes import NormalizedPerformer
from app.normalize.text import slugify

log = logging.getLogger(__name__)


def resolve_performer(
    session: Session,
    *,
    norm: NormalizedPerformer,
    source_id: uuid.UUID,
) -> Performer:
    """Return the performer row that ``norm`` refers to, creating it if needed.

    Lookup order: external ref for ``source_id``, then normalized name, then
    normalized alias; when nothing matches a new ``Performer`` is added and
    flushed. A matched performer only has its empty fields filled in. In every
    case the aliases carried by ``norm`` are ensured, and an external ref for
    ``(source_id, norm.external_id)`` is added when ``norm.external_id`` is set
    and no such ref exists yet.
    """
    # Path 1: this source has already been linked to a performer.
    if norm.external_id:
        known_ref = _lookup_external_ref(session, source_id, norm.external_id)
        if known_ref is not None:
            linked = session.get(Performer, known_ref.performer_id)
            # Internal invariant: a ref is expected to point at an existing row.
            assert linked is not None
            _update_performer_fields(linked, norm)
            _ensure_aliases(session, linked.id, norm, source_id)
            return linked

    # Paths 2 and 3: reuse an existing row matched by name or alias.
    resolved = _lookup_by_name_or_alias(session, norm.name_normalized)
    if resolved is not None:
        _update_performer_fields(resolved, norm)
    else:
        # Path 4: nothing matched, insert a fresh row.
        resolved = _create_performer(session, norm)

    if (
        norm.external_id
        and _lookup_external_ref(session, source_id, norm.external_id) is None
    ):
        link = PerformerExternalRef(
            source_id=source_id,
            external_id=norm.external_id,
            performer_id=resolved.id,
            confidence=1.0,
        )
        session.add(link)

    _ensure_aliases(session, resolved.id, norm, source_id)
    return resolved


def _lookup_external_ref(
    session: Session,
    source_id: uuid.UUID,
    external_id: str,
) -> PerformerExternalRef | None:
    """Fetch the external ref for ``(source_id, external_id)``, or ``None``."""
    stmt = select(PerformerExternalRef).where(
        PerformerExternalRef.source_id == source_id,
        PerformerExternalRef.external_id == external_id,
    )
    return session.execute(stmt).scalar_one_or_none()


def _lookup_by_name_or_alias(
    session: Session,
    name_normalized: str,
) -> Performer | None:
    """Find a performer by normalized name, falling back to a normalized alias.

    name_normalized and alias_normalized have no unique constraint in the
    schema, and duplicates have historically occurred (e.g. the same
    name_normalized on several Performer rows after unfortunate ingests).
    The first stable match (ordered by id) is taken instead of blowing up.
    """
    by_name = (
        select(Performer)
        .where(Performer.name_normalized == name_normalized)
        .order_by(Performer.id)
        .limit(1)
    )
    hit = session.execute(by_name).scalars().first()
    if hit is not None:
        return hit

    by_alias = (
        select(PerformerAlias)
        .where(PerformerAlias.alias_normalized == name_normalized)
        .order_by(PerformerAlias.performer_id)
        .limit(1)
    )
    alias_row = session.execute(by_alias).scalars().first()
    if alias_row is None:
        return None
    return session.get(Performer, alias_row.performer_id)


def _create_performer(session: Session, norm: NormalizedPerformer) -> Performer:
    """Add a new performer built from ``norm`` and flush so its id can be used."""
    slug_base = norm.slug or slugify(norm.canonical_name) or "performer"
    created = Performer(
        canonical_name=norm.canonical_name,
        name_normalized=norm.name_normalized,
        slug=_unique_slug(session, slug_base),
        gender=norm.gender,
        birth_date=norm.birth_date,
        country=norm.country,
    )
    session.add(created)
    session.flush()
    log.debug("performer create id=%s name=%s", created.id, created.canonical_name)
    return created


def _update_performer_fields(performer: Performer, norm: NormalizedPerformer) -> None:
    """Fill in gender, birth_date and country on ``performer`` where still empty.

    Values already present on the performer are never overwritten.
    """
    for field in ("gender", "birth_date", "country"):
        incoming = getattr(norm, field)
        if incoming and not getattr(performer, field):
            setattr(performer, field, incoming)


def _ensure_aliases(
    session: Session,
    performer_id: uuid.UUID,
    norm: NormalizedPerformer,
    source_id: uuid.UUID,
) -> None:
    """Add every alias from ``norm`` that the performer does not have yet.

    Skipped: empty normalized aliases, repeats within ``norm`` itself, and
    aliases whose normalized form equals ``norm.name_normalized``.
    ``norm.aliases`` and ``norm.aliases_normalized`` are paired strictly, so a
    length mismatch raises ``ValueError``.
    """
    handled: set[str] = set()
    pairs = zip(norm.aliases, norm.aliases_normalized, strict=True)
    for raw_alias, normalized in pairs:
        if not normalized or normalized in handled or normalized == norm.name_normalized:
            continue
        handled.add(normalized)

        probe = select(PerformerAlias.id).where(
            PerformerAlias.performer_id == performer_id,
            PerformerAlias.alias_normalized == normalized,
        )
        if session.execute(probe).first() is not None:
            continue

        session.add(
            PerformerAlias(
                performer_id=performer_id,
                alias=raw_alias,
                alias_normalized=normalized,
                source_id=source_id,
            )
        )


def _unique_slug(session: Session, base: str) -> str:
    """Return ``base``, or ``base-2``, ``base-3``, … — the first slug not in use."""
    suffix = 1
    slug = base
    while True:
        taken = session.execute(
            select(Performer.id).where(Performer.slug == slug)
        ).first()
        if not taken:
            return slug
        suffix += 1
        slug = f"{base}-{suffix}"