"""Orchestracja per-source ingest. Cykl: 1. Otwórz IngestRun (status=running). 2. Iteruj `connector.fetch_scenes(since=last_successful_run-buffer, limit=limit)`. 3. Dla każdej RawScene: - hash = sha256(canonical_json(raw)) - upsert do `external_records`: jeśli (source, kind, external_id) istnieje i hash niezmieniony → tylko `last_seen_at`. Inaczej → zapis i przekaż dalej. - normalize_scene → resolve_scene (commit incrementally? na razie commit per-record). 4. Zamknij IngestRun (records_seen / new / updated / errors). Idempotencja: - external_records ma UNIQUE(source_id, entity_kind, external_id) — INSERT … ON CONFLICT DO UPDATE. - resolver dla istniejącego SceneExternalRef tylko aktualizuje pola. """ from __future__ import annotations import hashlib import json import logging import re import uuid from collections.abc import Iterable from datetime import UTC, datetime, timedelta from sqlalchemy import select from sqlalchemy.dialects.postgresql import insert as pg_insert from sqlalchemy.orm import Session from app.config import get_settings from app.connectors.base import BaseConnector, BaseMovieConnector, RawMovie, RawScene from app.db import session_scope from app.models.external_record import EntityKind, ExternalRecord from app.models.ingest_run import IngestRun, IngestStatus from app.models.source import Source, SourceKind from app.normalize.movies import normalize_movie from app.normalize.scenes import NormalizedScene, normalize_scene from app.resolve.movie_resolver import resolve_movie from app.resolve.scene_resolver import resolve_scene log = logging.getLogger(__name__) # Clip-store studia (ManyVids/IWantClips/Clips4Sale/...) — content twórców z paywalla. # Darmowe tube'y go nie hostują, więc z canonical (TPDB/StashDB) wjeżdża jako permanentny # orphan (56% ingestu canonical, ~860/dzień, ~550k w DB). Skipujemy resolve dla tych scen # gdy źródło jest canonical — oszczędza ~połowę resolve-time i nie zaśmieca katalogu. # NIE skipujemy dla tube'ów: tube scena z clip-store studiem MA playback, więc jest grywalna. _CLIP_STORE_RE = re.compile( r"^\s*(manyvids|i ?want ?clips|clips4sale|fancentro|loyalfans|onlyfans|fansly|modelhub)\b", re.IGNORECASE, ) def is_clip_store_studio(name: str | None) -> bool: return bool(name) and _CLIP_STORE_RE.match(name) is not None def _skip_clip_store_canonical(session: Session, *, source_id: uuid.UUID, studio_name: str | None) -> bool: """True gdy scena to clip-store content z canonical source → pomijamy resolve.""" if not getattr(get_settings(), "skip_clip_store", True): return False if not is_clip_store_studio(studio_name): return False src = session.get(Source, source_id) return src is not None and src.kind in (SourceKind.tpdb, SourceKind.stashdb) def _skip_short_tube_scene(session: Session, *, source_id: uuid.UUID, norm: NormalizedScene) -> bool: """True gdy scena ze scrapera/tube ma ZNANY duration < `min_ingest_duration_sec` (trailer/teaser/preview — śmieć). Nieznany duration → NIE wycinamy (mogłaby być pełna scena bez metadanych). Tylko scraper-source — canonical (TPDB/StashDB) zostawiamy. porndoe/deep-crawl ciągną z głębi katalogu sporo trailerów <3min (2026-06-03).""" floor = getattr(get_settings(), "min_ingest_duration_sec", 0) if not floor: return False dur = norm.duration_sec if dur is None: ps_durs = [ps.duration_sec for ps in norm.playback_sources if ps.duration_sec] dur = max(ps_durs) if ps_durs else None if dur is None or dur >= floor: return False src = session.get(Source, source_id) return src is not None and src.kind == SourceKind.scraper def _canonical_json(payload: dict) -> bytes: return json.dumps(payload, sort_keys=True, separators=(",", ":"), default=str).encode() def _hash_raw(payload: dict) -> bytes: return hashlib.sha256(_canonical_json(payload)).digest() def get_or_create_source( session: Session, *, kind: SourceKind, name: str, base_url: str | None = None ) -> Source: src = session.execute(select(Source).where(Source.name == name)).scalar_one_or_none() if src is not None: return src src = Source(kind=kind, name=name, base_url=base_url) session.add(src) session.flush() return src def _last_successful_finished_at(session: Session, source_id: uuid.UUID) -> datetime | None: return session.execute( select(IngestRun.finished_at) .where( IngestRun.source_id == source_id, IngestRun.status == IngestStatus.success, IngestRun.finished_at.is_not(None), ) .order_by(IngestRun.finished_at.desc()) .limit(1) ).scalar_one_or_none() def _upsert_external_record( session: Session, *, source_id: uuid.UUID, entity_kind: EntityKind, external_id: str, raw: dict, raw_hash: bytes, now: datetime, ) -> tuple[bool, bool]: """Returns (is_new, hash_changed).""" existing = session.execute( select(ExternalRecord.raw_hash).where( ExternalRecord.source_id == source_id, ExternalRecord.entity_kind == entity_kind, ExternalRecord.external_id == external_id, ) ).scalar_one_or_none() is_new = existing is None hash_changed = is_new or bytes(existing) != raw_hash stmt = ( pg_insert(ExternalRecord) .values( source_id=source_id, entity_kind=entity_kind, external_id=external_id, raw=raw, raw_hash=raw_hash, fetched_at=now, last_seen_at=now, ) .on_conflict_do_update( index_elements=["source_id", "entity_kind", "external_id"], set_={ "raw": raw, "raw_hash": raw_hash, "fetched_at": now, "last_seen_at": now, } if hash_changed else {"last_seen_at": now}, ) ) session.execute(stmt) return is_new, hash_changed def ingest_from_connector( connector: BaseConnector, *, limit: int | None = None, use_delta: bool = True, delta_buffer: timedelta = timedelta(hours=1), ) -> dict[str, int]: """Uruchamia jeden cykl ingest dla danego connectora. Zwraca counters.""" counters = {"seen": 0, "new": 0, "updated": 0, "skipped": 0, "errors": 0} with session_scope() as session: source = get_or_create_source(session, kind=connector.kind, name=connector.name) since: datetime | None = None if use_delta: last = _last_successful_finished_at(session, source.id) if last is not None: since = last - delta_buffer run = IngestRun(source_id=source.id, status=IngestStatus.running) session.add(run) session.flush() run_id = run.id source_id = source.id log.info( "ingest start source=%s since=%s limit=%s run_id=%s", connector.name, since.isoformat() if since else "FULL", limit, run_id, ) failed = False error_payload: dict | None = None try: for raw in connector.fetch_scenes(since=since, limit=limit): counters["seen"] += 1 try: _process_scene(source_id=source_id, raw_scene=raw, counters=counters) except Exception as exc: # pragma: no cover - obronnie counters["errors"] += 1 log.exception("ingest scene failed external_id=%s: %s", raw.external_id, exc) if counters["errors"] > 50: error_payload = {"message": "too many errors, aborting", "count": counters["errors"]} raise except Exception as exc: failed = True if error_payload is None: error_payload = {"message": str(exc), "type": type(exc).__name__} log.exception("ingest run failed: %s", exc) with session_scope() as session: run = session.get(IngestRun, run_id) assert run is not None run.finished_at = datetime.now(UTC) if failed: run.status = IngestStatus.failed elif counters["errors"] > 0: run.status = IngestStatus.partial else: run.status = IngestStatus.success run.records_seen = counters["seen"] run.records_new = counters["new"] run.records_updated = counters["updated"] if error_payload is not None: run.errors = error_payload log.info("ingest done source=%s counters=%s", connector.name, counters) return counters def _process_scene(*, source_id: uuid.UUID, raw_scene: RawScene, counters: dict[str, int]) -> None: payload = raw_scene.raw or raw_scene.model_dump(mode="json") raw_hash = _hash_raw(payload) now = datetime.now(UTC) with session_scope() as session: is_new, hash_changed = _upsert_external_record( session, source_id=source_id, entity_kind=EntityKind.scene, external_id=raw_scene.external_id, raw=payload, raw_hash=raw_hash, now=now, ) if not hash_changed: counters["skipped"] += 1 return norm = normalize_scene(raw_scene) if _skip_clip_store_canonical( session, source_id=source_id, studio_name=norm.studio.name if norm.studio else None ): counters["skipped"] += 1 return if _skip_short_tube_scene(session, source_id=source_id, norm=norm): counters["skipped"] += 1 return result = resolve_scene(session, norm=norm, source_id=source_id) if result.was_created: counters["new"] += 1 else: counters["updated"] += 1 log.debug( "scene resolved external=%s path=%s scene_id=%s score=%s", raw_scene.external_id, result.path, result.scene.id, result.score, ) def ingest_movies_from_connector( connector: BaseMovieConnector, *, limit: int | None = None, use_delta: bool = True, delta_buffer: timedelta = timedelta(hours=1), ) -> dict[str, int]: """Ingest movies (paradisehill / psyplay / wp_movies). Symetryczne do ingest_from_connector ale dla `RawMovie` → `Movie` przez `resolve_movie`. Source row zaszyfrowuje kind+name z connectora; IngestRun trzymamy w tej samej tabeli `ingest_runs` (typ entity rozróżniamy po `source.name` / `external_records.entity_kind=movie`).""" counters = {"seen": 0, "new": 0, "updated": 0, "skipped": 0, "errors": 0} with session_scope() as session: source = get_or_create_source(session, kind=connector.kind, name=connector.name) since: datetime | None = None if use_delta: last = _last_successful_finished_at(session, source.id) if last is not None: since = last - delta_buffer run = IngestRun(source_id=source.id, status=IngestStatus.running) session.add(run) session.flush() run_id = run.id source_id = source.id log.info( "ingest-movies start source=%s since=%s limit=%s run_id=%s", connector.name, since.isoformat() if since else "FULL", limit, run_id, ) failed = False error_payload: dict | None = None try: for raw in connector.fetch_movies(since=since, limit=limit): counters["seen"] += 1 try: _process_movie(source_id=source_id, raw_movie=raw, counters=counters) except Exception as exc: counters["errors"] += 1 log.exception("ingest movie failed external_id=%s: %s", raw.external_id, exc) if counters["errors"] > 50: error_payload = {"message": "too many errors, aborting", "count": counters["errors"]} raise except Exception as exc: failed = True if error_payload is None: error_payload = {"message": str(exc), "type": type(exc).__name__} log.exception("ingest-movies run failed: %s", exc) with session_scope() as session: run = session.get(IngestRun, run_id) assert run is not None run.finished_at = datetime.now(UTC) if failed: run.status = IngestStatus.failed elif counters["errors"] > 0: run.status = IngestStatus.partial else: run.status = IngestStatus.success run.records_seen = counters["seen"] run.records_new = counters["new"] run.records_updated = counters["updated"] if error_payload is not None: run.errors = error_payload log.info("ingest-movies done source=%s counters=%s", connector.name, counters) return counters def _process_movie(*, source_id: uuid.UUID, raw_movie: RawMovie, counters: dict[str, int]) -> None: payload = raw_movie.raw or raw_movie.model_dump(mode="json") raw_hash = _hash_raw(payload) now = datetime.now(UTC) with session_scope() as session: is_new, hash_changed = _upsert_external_record( session, source_id=source_id, entity_kind=EntityKind.movie, external_id=raw_movie.external_id, raw=payload, raw_hash=raw_hash, now=now, ) if not hash_changed: counters["skipped"] += 1 return norm = normalize_movie(raw_movie) result = resolve_movie(session, norm=norm, source_id=source_id) if result.was_created: counters["new"] += 1 else: counters["updated"] += 1 log.debug( "movie resolved external=%s path=%s movie_id=%s", raw_movie.external_id, result.path, result.movie.id, ) def ingest_iter( raws: Iterable[RawScene], *, source_kind: SourceKind, source_name: str, ) -> dict[str, int]: """Pomocnik do testów / scrapów ad-hoc bez connectora.""" counters = {"seen": 0, "new": 0, "updated": 0, "skipped": 0, "errors": 0} with session_scope() as session: source = get_or_create_source(session, kind=source_kind, name=source_name) source_id = source.id for raw in raws: counters["seen"] += 1 try: _process_scene(source_id=source_id, raw_scene=raw, counters=counters) except Exception: counters["errors"] += 1 log.exception("ingest_iter failed external_id=%s", raw.external_id) return counters