"""Per-source ingest orchestration.

Cycle:
  1. Open an IngestRun (status=running).
  2. Iterate `connector.fetch_scenes(since=last_successful_run - buffer, limit=limit)`.
  3. For every RawScene:
     - hash = sha256(canonical_json(raw))
     - upsert into `external_records`: if (source, kind, external_id) exists and the
       hash is unchanged → only bump `last_seen_at`. Otherwise → store and pass on.
     - normalize_scene → resolve_scene (commit incrementally? for now: commit per record).
  4. Close the IngestRun (records_seen / new / updated / errors).

Idempotency:
  - external_records has UNIQUE(source_id, entity_kind, external_id) —
    INSERT … ON CONFLICT DO UPDATE.
  - for an already existing SceneExternalRef the resolver only updates fields.
"""

from __future__ import annotations

import hashlib
import json
import logging
import uuid
from collections.abc import Callable, Iterable
from dataclasses import dataclass
from datetime import UTC, datetime, timedelta
from typing import Any

from sqlalchemy import select
from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlalchemy.orm import Session

from app.connectors.base import BaseConnector, BaseMovieConnector, RawMovie, RawScene
from app.db import session_scope
from app.models.external_record import EntityKind, ExternalRecord
from app.models.ingest_run import IngestRun, IngestStatus
from app.models.source import Source, SourceKind
from app.normalize.movies import normalize_movie
from app.normalize.scenes import normalize_scene
from app.resolve.movie_resolver import resolve_movie
from app.resolve.scene_resolver import resolve_scene

log = logging.getLogger(__name__)

# A run is aborted (and marked failed) once MORE than this many records failed.
_MAX_ERRORS_PER_RUN = 50


@dataclass(frozen=True)
class _RunMessages:
    """Log templates of one ingest flavour (kept verbatim so log grouping is stable)."""

    start: str
    item_failed: str
    run_failed: str
    done: str


_SCENE_MESSAGES = _RunMessages(
    start="ingest start source=%s since=%s limit=%s run_id=%s",
    item_failed="ingest scene failed external_id=%s: %s",
    run_failed="ingest run failed: %s",
    done="ingest done source=%s counters=%s",
)

_MOVIE_MESSAGES = _RunMessages(
    start="ingest-movies start source=%s since=%s limit=%s run_id=%s",
    item_failed="ingest movie failed external_id=%s: %s",
    run_failed="ingest-movies run failed: %s",
    done="ingest-movies done source=%s counters=%s",
)


def _new_counters() -> dict[str, int]:
    """Return a fresh, zeroed counters dict shared by all ingest entry points."""
    return {"seen": 0, "new": 0, "updated": 0, "skipped": 0, "errors": 0}


def _canonical_json(payload: dict) -> bytes:
    """Serialize *payload* to compact JSON bytes with sorted keys.

    Values that `json` cannot encode natively are stringified via ``default=str``.
    """
    return json.dumps(payload, sort_keys=True, separators=(",", ":"), default=str).encode()


def _hash_raw(payload: dict) -> bytes:
    """Return the raw SHA-256 digest of the canonical JSON form of *payload*."""
    return hashlib.sha256(_canonical_json(payload)).digest()


def get_or_create_source(
    session: Session, *, kind: SourceKind, name: str, base_url: str | None = None
) -> Source:
    """Return the `Source` named *name*, creating (and flushing) it when missing.

    The lookup is by name only; *kind* and *base_url* are used solely when a new
    row has to be created.
    """
    src = session.execute(select(Source).where(Source.name == name)).scalar_one_or_none()
    if src is not None:
        return src
    src = Source(kind=kind, name=name, base_url=base_url)
    session.add(src)
    # Flush so the new row is sent to the database before callers read `src.id`.
    session.flush()
    return src


def _last_successful_finished_at(session: Session, source_id: uuid.UUID) -> datetime | None:
    """Return `finished_at` of the newest successful run of the source, or None."""
    return session.execute(
        select(IngestRun.finished_at)
        .where(
            IngestRun.source_id == source_id,
            IngestRun.status == IngestStatus.success,
            IngestRun.finished_at.is_not(None),
        )
        .order_by(IngestRun.finished_at.desc())
        .limit(1)
    ).scalar_one_or_none()


def _upsert_external_record(
    session: Session,
    *,
    source_id: uuid.UUID,
    entity_kind: EntityKind,
    external_id: str,
    raw: dict,
    raw_hash: bytes,
    now: datetime,
) -> tuple[bool, bool]:
    """Insert or refresh one `external_records` row.

    Returns:
        ``(is_new, hash_changed)``. ``hash_changed`` is always True for a new row.
        When the stored hash equals *raw_hash* only ``last_seen_at`` is touched.
    """
    stored_hash = session.execute(
        select(ExternalRecord.raw_hash).where(
            ExternalRecord.source_id == source_id,
            ExternalRecord.entity_kind == entity_kind,
            ExternalRecord.external_id == external_id,
        )
    ).scalar_one_or_none()

    is_new = stored_hash is None
    # bytes(...) normalizes whatever binary type the driver hands back before comparing.
    hash_changed = is_new or bytes(stored_hash) != raw_hash

    if hash_changed:
        on_conflict = {
            "raw": raw,
            "raw_hash": raw_hash,
            "fetched_at": now,
            "last_seen_at": now,
        }
    else:
        on_conflict = {"last_seen_at": now}

    stmt = (
        pg_insert(ExternalRecord)
        .values(
            source_id=source_id,
            entity_kind=entity_kind,
            external_id=external_id,
            raw=raw,
            raw_hash=raw_hash,
            fetched_at=now,
            last_seen_at=now,
        )
        .on_conflict_do_update(
            index_elements=["source_id", "entity_kind", "external_id"],
            set_=on_conflict,
        )
    )
    session.execute(stmt)
    return is_new, hash_changed


def _open_run(
    *, kind: SourceKind, name: str, use_delta: bool, delta_buffer: timedelta
) -> tuple[uuid.UUID, uuid.UUID, datetime | None]:
    """Create the IngestRun row (status=running) and compute the delta cut-off.

    Returns:
        ``(run_id, source_id, since)``; ``since`` is None for a full fetch, i.e. when
        *use_delta* is False or the source has no successful run yet.
    """
    with session_scope() as session:
        source = get_or_create_source(session, kind=kind, name=name)
        since: datetime | None = None
        if use_delta:
            last = _last_successful_finished_at(session, source.id)
            if last is not None:
                since = last - delta_buffer
        run = IngestRun(source_id=source.id, status=IngestStatus.running)
        session.add(run)
        session.flush()
        # Copy the ids out while the session is still open.
        run_id = run.id
        source_id = source.id
    return run_id, source_id, since


def _close_run(
    run_id: uuid.UUID,
    *,
    counters: dict[str, int],
    failed: bool,
    error_payload: dict | None,
) -> None:
    """Stamp the IngestRun with its final status, counters and error payload.

    Raises:
        RuntimeError: the run row can no longer be found.
    """
    with session_scope() as session:
        run = session.get(IngestRun, run_id)
        if run is None:
            # Explicit raise instead of `assert`: asserts vanish under `python -O`.
            raise RuntimeError(f"IngestRun {run_id} not found while closing the run")

        if failed:
            status = IngestStatus.failed
        elif counters["errors"] > 0:
            status = IngestStatus.partial
        else:
            status = IngestStatus.success

        run.finished_at = datetime.now(UTC)
        run.status = status
        run.records_seen = counters["seen"]
        run.records_new = counters["new"]
        run.records_updated = counters["updated"]
        if error_payload is not None:
            run.errors = error_payload


def _run_ingest(
    connector: BaseConnector | BaseMovieConnector,
    *,
    fetch: Callable[[datetime | None], Iterable[Any]],
    process: Callable[[uuid.UUID, Any, dict[str, int]], None],
    messages: _RunMessages,
    limit: int | None,
    use_delta: bool,
    delta_buffer: timedelta,
    max_errors: int,
) -> dict[str, int]:
    """Shared driver behind the scene and movie ingest entry points.

    *fetch* receives the delta cut-off and yields raw records; *process* handles one
    record and updates the counters. A failure of a single record is counted and
    logged; the run is aborted once more than *max_errors* records have failed or
    when fetching itself raises.
    """
    counters = _new_counters()
    run_id, source_id, since = _open_run(
        kind=connector.kind,
        name=connector.name,
        use_delta=use_delta,
        delta_buffer=delta_buffer,
    )
    log.info(
        messages.start,
        connector.name,
        since.isoformat() if since else "FULL",
        limit,
        run_id,
    )

    failed = False
    error_payload: dict | None = None
    try:
        # `fetch` is called inside the try so a failing connector still closes the run.
        for raw in fetch(since):
            counters["seen"] += 1
            try:
                process(source_id, raw, counters)
            except Exception as exc:  # pragma: no cover - defensive
                counters["errors"] += 1
                log.exception(messages.item_failed, raw.external_id, exc)
                if counters["errors"] > max_errors:
                    error_payload = {
                        "message": "too many errors, aborting",
                        "count": counters["errors"],
                    }
                    # Re-raise into the outer handler, which marks the run as failed.
                    raise
    except Exception as exc:
        failed = True
        if error_payload is None:
            error_payload = {"message": str(exc), "type": type(exc).__name__}
        log.exception(messages.run_failed, exc)

    _close_run(run_id, counters=counters, failed=failed, error_payload=error_payload)
    log.info(messages.done, connector.name, counters)
    return counters


def ingest_from_connector(
    connector: BaseConnector,
    *,
    limit: int | None = None,
    use_delta: bool = True,
    delta_buffer: timedelta = timedelta(hours=1),
    max_errors: int = _MAX_ERRORS_PER_RUN,
) -> dict[str, int]:
    """Run one scene ingest cycle for *connector* and return the counters.

    Args:
        connector: source of `RawScene` records (`fetch_scenes`).
        limit: forwarded to the connector; None means no limit.
        use_delta: when True, fetch only since the last successful run minus
            *delta_buffer*; otherwise (or without a prior success) fetch everything.
        delta_buffer: safety overlap subtracted from the last successful finish time.
        max_errors: the run is aborted once more than this many records failed.

    Returns:
        Dict with the keys ``seen``, ``new``, ``updated``, ``skipped`` and ``errors``.
    """
    return _run_ingest(
        connector,
        fetch=lambda since: connector.fetch_scenes(since=since, limit=limit),
        process=lambda source_id, raw, counters: _process_scene(
            source_id=source_id, raw_scene=raw, counters=counters
        ),
        messages=_SCENE_MESSAGES,
        limit=limit,
        use_delta=use_delta,
        delta_buffer=delta_buffer,
        max_errors=max_errors,
    )


def _process_scene(*, source_id: uuid.UUID, raw_scene: RawScene, counters: dict[str, int]) -> None:
    """Store, normalize and resolve one raw scene inside its own session.

    Increments ``skipped`` when the stored hash is unchanged, otherwise ``new`` or
    ``updated`` depending on the resolver result.
    NOTE(review): assumes `session_scope` commits on clean exit and rolls back on an
    exception — confirm in `app.db`.
    """
    payload = raw_scene.raw or raw_scene.model_dump(mode="json")
    raw_hash = _hash_raw(payload)
    now = datetime.now(UTC)

    with session_scope() as session:
        _is_new, hash_changed = _upsert_external_record(
            session,
            source_id=source_id,
            entity_kind=EntityKind.scene,
            external_id=raw_scene.external_id,
            raw=payload,
            raw_hash=raw_hash,
            now=now,
        )
        if not hash_changed:
            counters["skipped"] += 1
            return

        norm = normalize_scene(raw_scene)
        result = resolve_scene(session, norm=norm, source_id=source_id)
        if result.was_created:
            counters["new"] += 1
        else:
            counters["updated"] += 1
        log.debug(
            "scene resolved external=%s path=%s scene_id=%s score=%s",
            raw_scene.external_id,
            result.path,
            result.scene.id,
            result.score,
        )


def ingest_movies_from_connector(
    connector: BaseMovieConnector,
    *,
    limit: int | None = None,
    use_delta: bool = True,
    delta_buffer: timedelta = timedelta(hours=1),
    max_errors: int = _MAX_ERRORS_PER_RUN,
) -> dict[str, int]:
    """Ingest movies (paradisehill / psyplay / wp_movies).

    Symmetric to `ingest_from_connector`, but for `RawMovie` → `Movie` through
    `resolve_movie`. The Source row encodes kind+name from the connector; the
    IngestRun is kept in the same `ingest_runs` table (the entity type is told apart
    by `source.name` / `external_records.entity_kind=movie`).

    Parameters and return value match `ingest_from_connector`.
    """
    return _run_ingest(
        connector,
        fetch=lambda since: connector.fetch_movies(since=since, limit=limit),
        process=lambda source_id, raw, counters: _process_movie(
            source_id=source_id, raw_movie=raw, counters=counters
        ),
        messages=_MOVIE_MESSAGES,
        limit=limit,
        use_delta=use_delta,
        delta_buffer=delta_buffer,
        max_errors=max_errors,
    )


def _process_movie(*, source_id: uuid.UUID, raw_movie: RawMovie, counters: dict[str, int]) -> None:
    """Store, normalize and resolve one raw movie inside its own session.

    Mirrors `_process_scene`: ``skipped`` for an unchanged hash, otherwise ``new`` or
    ``updated`` depending on the resolver result.
    """
    payload = raw_movie.raw or raw_movie.model_dump(mode="json")
    raw_hash = _hash_raw(payload)
    now = datetime.now(UTC)

    with session_scope() as session:
        _is_new, hash_changed = _upsert_external_record(
            session,
            source_id=source_id,
            entity_kind=EntityKind.movie,
            external_id=raw_movie.external_id,
            raw=payload,
            raw_hash=raw_hash,
            now=now,
        )
        if not hash_changed:
            counters["skipped"] += 1
            return

        norm = normalize_movie(raw_movie)
        result = resolve_movie(session, norm=norm, source_id=source_id)
        if result.was_created:
            counters["new"] += 1
        else:
            counters["updated"] += 1
        log.debug(
            "movie resolved external=%s path=%s movie_id=%s",
            raw_movie.external_id,
            result.path,
            result.movie.id,
        )


def ingest_iter(
    raws: Iterable[RawScene],
    *,
    source_kind: SourceKind,
    source_name: str,
) -> dict[str, int]:
    """Helper for tests / ad-hoc scrapes without a connector.

    Processes every scene in *raws* against the named source (created when missing).
    No IngestRun is recorded and there is no error threshold: each failure is logged
    and counted, then iteration continues.
    """
    counters = _new_counters()
    with session_scope() as session:
        source = get_or_create_source(session, kind=source_kind, name=source_name)
        source_id = source.id

    for raw in raws:
        counters["seen"] += 1
        try:
            _process_scene(source_id=source_id, raw_scene=raw, counters=counters)
        except Exception:
            counters["errors"] += 1
            log.exception("ingest_iter failed external_id=%s", raw.external_id)
    return counters