Deep-crawling tube catalogs pulls in lots of <3min trailers/teasers (porndoe). Add min_ingest_duration_sec (default 180): _process_scene skips scraper-source scenes whose known duration is below the floor (unknown duration kept; canonical TPDB/StashDB untouched). Deleted 67 existing porndoe-only orphan trailers (<180s, no canonical, no non-porndoe live playback) via cascade. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
416 lines
14 KiB
Python
416 lines
14 KiB
Python
"""Orchestracja per-source ingest.
|
|
|
|
Cykl:
|
|
1. Otwórz IngestRun (status=running).
|
|
2. Iteruj `connector.fetch_scenes(since=last_successful_run-buffer, limit=limit)`.
|
|
3. Dla każdej RawScene:
|
|
- hash = sha256(canonical_json(raw))
|
|
- upsert do `external_records`: jeśli (source, kind, external_id) istnieje i hash niezmieniony → tylko `last_seen_at`. Inaczej → zapis i przekaż dalej.
|
|
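- normalize_scene → filtry skip (clip-store z canonical TPDB/StashDB; scena ze scrapera o znanym duration < min_ingest_duration_sec) → resolve_scene. Commit per rekord: każda scena jest przetwarzana w osobnej sesji (session_scope).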
|
|
4. Zamknij IngestRun (records_seen / new / updated / errors).
|
|
|
|
Idempotencja:
|
|
- external_records ma UNIQUE(source_id, entity_kind, external_id) — INSERT … ON CONFLICT DO UPDATE.
|
|
- resolver dla istniejącego SceneExternalRef tylko aktualizuje pola.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import hashlib
|
|
import json
|
|
import logging
|
|
import re
|
|
import uuid
|
|
from collections.abc import Iterable
|
|
from datetime import UTC, datetime, timedelta
|
|
|
|
from sqlalchemy import select
|
|
from sqlalchemy.dialects.postgresql import insert as pg_insert
|
|
from sqlalchemy.orm import Session
|
|
|
|
from app.config import get_settings
|
|
from app.connectors.base import BaseConnector, BaseMovieConnector, RawMovie, RawScene
|
|
from app.db import session_scope
|
|
from app.models.external_record import EntityKind, ExternalRecord
|
|
from app.models.ingest_run import IngestRun, IngestStatus
|
|
from app.models.source import Source, SourceKind
|
|
from app.normalize.movies import normalize_movie
|
|
from app.normalize.scenes import NormalizedScene, normalize_scene
|
|
from app.resolve.movie_resolver import resolve_movie
|
|
from app.resolve.scene_resolver import resolve_scene
|
|
|
|
log = logging.getLogger(__name__)


# Clip-store "studios" (ManyVids / IWantClips / Clips4Sale / ...) carry paywalled creator
# content. Free tubes do not host it, so when it arrives from a canonical source
# (TPDB/StashDB) it becomes a permanent orphan (per the original note: 56% of canonical
# ingest, ~860/day, ~550k rows in the DB). Resolve is skipped for such scenes when the
# source is canonical: that saves roughly half of resolve time and keeps the catalog clean.
# We do NOT skip them for tube sources: a tube scene from a clip-store studio HAS playback,
# so it is playable.
#
# The pattern is anchored at the start of the studio name (leading whitespace allowed) and
# requires a word boundary after the store name, so e.g. "OnlyFansXYZ" does not match.
_CLIP_STORE_RE = re.compile(
    r"^\s*(manyvids|i ?want ?clips|clips4sale|fancentro|loyalfans|onlyfans|fansly|modelhub)\b",
    flags=re.IGNORECASE,
)
|
|
|
|
|
|
def is_clip_store_studio(name: str | None) -> bool:
    """Return True when ``name`` looks like a clip-store studio (ManyVids, Clips4Sale, ...).

    ``None`` and the empty string are never clip stores. Matching is delegated to the
    module-level ``_CLIP_STORE_RE`` (case-insensitive, anchored at the start of the name).
    """
    if not name:
        return False
    return _CLIP_STORE_RE.match(name) is not None
|
|
|
|
|
|
def _skip_clip_store_canonical(session: Session, *, source_id: uuid.UUID, studio_name: str | None) -> bool:
    """Return True when the scene is clip-store content coming from a canonical source.

    Such scenes are not resolved (see the ``_CLIP_STORE_RE`` rationale). The filter can be
    disabled with the ``skip_clip_store`` setting (treated as enabled when the setting is
    absent). Only TPDB and StashDB sources are considered canonical here.
    """
    enabled = getattr(get_settings(), "skip_clip_store", True)
    # Short-circuit: the studio name is only inspected when the filter is enabled.
    if not (enabled and is_clip_store_studio(studio_name)):
        return False

    source = session.get(Source, source_id)
    if source is None:
        return False
    return source.kind in (SourceKind.tpdb, SourceKind.stashdb)
|
|
|
|
|
|
def _skip_short_tube_scene(session: Session, *, source_id: uuid.UUID, norm: NormalizedScene) -> bool:
    """Return True when a scraper/tube scene has a KNOWN duration below the ingest floor.

    Scenes shorter than the ``min_ingest_duration_sec`` setting are trailers / teasers /
    previews (junk) and are not resolved. An unknown duration is NOT filtered out: it could
    be a full scene that simply lacks metadata. Only scraper sources are affected; canonical
    sources (TPDB/StashDB) are left untouched. Background: porndoe / deep crawls pull in
    many <3 min trailers from deep in the catalog (2026-06-03).

    Duration resolution:
      * the scene-level ``norm.duration_sec`` is used when it is positive;
      * otherwise the longest positive ``duration_sec`` among ``norm.playback_sources``;
      * a missing or non-positive value (e.g. ``0`` from an unparsed field) counts as
        unknown in both places, so such scenes are kept.

    The filter is disabled when the setting is absent or falsy (0).
    """
    floor = getattr(get_settings(), "min_ingest_duration_sec", 0)
    if not floor:
        return False

    dur = norm.duration_sec
    if not dur or dur <= 0:
        # Scene-level duration missing/unparsed: fall back to the longest playback source,
        # applying the same "non-positive means unknown" rule.
        known = [
            ps.duration_sec
            for ps in norm.playback_sources
            if ps.duration_sec and ps.duration_sec > 0
        ]
        dur = max(known) if known else None

    if dur is None or dur >= floor:
        return False

    # Source lookup last: it is only needed when the scene actually looks short.
    src = session.get(Source, source_id)
    return src is not None and src.kind == SourceKind.scraper
|
|
|
|
|
|
def _canonical_json(payload: dict) -> bytes:
    """Serialize ``payload`` to deterministic, compact JSON bytes.

    Keys are sorted and separators carry no whitespace, so equal payloads always produce
    identical bytes (the basis for change detection via hashing). Values JSON cannot
    represent natively (dates, UUIDs, ...) are converted with ``str``. Output is ASCII-only
    (``json.dumps`` default ``ensure_ascii=True``), encoded as UTF-8.
    """
    text = json.dumps(payload, default=str, separators=(",", ":"), sort_keys=True)
    return text.encode()
|
|
|
|
|
|
def _hash_raw(payload: dict) -> bytes:
    """Return the raw 32-byte SHA-256 digest of the canonical JSON form of ``payload``."""
    canonical = _canonical_json(payload)
    return hashlib.sha256(canonical).digest()
|
|
|
|
|
|
def get_or_create_source(
    session: Session, *, kind: SourceKind, name: str, base_url: str | None = None
) -> Source:
    """Return the Source row named ``name``, creating it when it does not exist yet.

    The lookup is by ``name`` only: ``kind`` and ``base_url`` are used solely when a new
    row is created and are not compared against / written to an existing row.
    A new row is flushed so that database-generated values (e.g. the ``id`` that callers
    read right away) are available before the transaction ends.
    """
    existing = session.execute(select(Source).where(Source.name == name)).scalar_one_or_none()
    if existing is None:
        existing = Source(kind=kind, name=name, base_url=base_url)
        session.add(existing)
        session.flush()
    return existing
|
|
|
|
|
|
def _last_successful_finished_at(session: Session, source_id: uuid.UUID) -> datetime | None:
    """Return ``finished_at`` of the most recent successful IngestRun for ``source_id``.

    Runs without a ``finished_at`` are ignored. Returns None when the source has no
    successful finished run yet (callers then do a full fetch).
    """
    latest_success = (
        select(IngestRun.finished_at)
        .where(IngestRun.source_id == source_id)
        .where(IngestRun.status == IngestStatus.success)
        .where(IngestRun.finished_at.is_not(None))
        .order_by(IngestRun.finished_at.desc())
        .limit(1)
    )
    return session.execute(latest_success).scalar_one_or_none()
|
|
|
|
|
|
def _upsert_external_record(
    session: Session,
    *,
    source_id: uuid.UUID,
    entity_kind: EntityKind,
    external_id: str,
    raw: dict,
    raw_hash: bytes,
    now: datetime,
) -> tuple[bool, bool]:
    """Upsert one raw payload into ``external_records``; return ``(is_new, hash_changed)``.

    The row is keyed by ``(source_id, entity_kind, external_id)``. The stored hash is read
    first. Then an ``INSERT ... ON CONFLICT DO UPDATE`` either rewrites
    ``raw``/``raw_hash``/``fetched_at``/``last_seen_at`` (new or changed payload) or only
    bumps ``last_seen_at`` (unchanged payload).

    NOTE: the read and the upsert are two statements, so a concurrent writer could make
    ``is_new`` inaccurate. The upsert itself stays conflict-safe.
    """
    key_filter = (
        ExternalRecord.source_id == source_id,
        ExternalRecord.entity_kind == entity_kind,
        ExternalRecord.external_id == external_id,
    )
    stored_hash = session.execute(select(ExternalRecord.raw_hash).where(*key_filter)).scalar_one_or_none()

    is_new = stored_hash is None
    # bytes() normalizes whatever binary type the driver returns (e.g. memoryview).
    hash_changed = True if is_new else bytes(stored_hash) != raw_hash

    if hash_changed:
        on_conflict_set = {
            "raw": raw,
            "raw_hash": raw_hash,
            "fetched_at": now,
            "last_seen_at": now,
        }
    else:
        on_conflict_set = {"last_seen_at": now}

    upsert = pg_insert(ExternalRecord).values(
        source_id=source_id,
        entity_kind=entity_kind,
        external_id=external_id,
        raw=raw,
        raw_hash=raw_hash,
        fetched_at=now,
        last_seen_at=now,
    )
    upsert = upsert.on_conflict_do_update(
        index_elements=["source_id", "entity_kind", "external_id"],
        set_=on_conflict_set,
    )
    session.execute(upsert)
    return is_new, hash_changed
|
|
|
|
|
|
def ingest_from_connector(
    connector: BaseConnector,
    *,
    limit: int | None = None,
    use_delta: bool = True,
    delta_buffer: timedelta = timedelta(hours=1),
) -> dict[str, int]:
    """Run one scene-ingest cycle for ``connector`` and return its counters.

    Steps:
      1. Get-or-create the Source row and open an IngestRun (status=running) in its own
         ``session_scope``.
      2. With ``use_delta``, fetch only since the last successful run's ``finished_at``
         minus ``delta_buffer``; otherwise (or with no prior successful run) do a FULL fetch.
      3. Process every RawScene via ``_process_scene`` (one session per record). Per-scene
         exceptions are logged and counted; after more than 50 of them the run aborts.
      4. Close the IngestRun as failed / partial (some per-scene errors) / success and
         store the seen/new/updated counts plus an error payload, if any.

    If the loop is interrupted by a non-``Exception`` ``BaseException`` (e.g.
    ``KeyboardInterrupt`` or ``SystemExit``), the run is still closed as failed before the
    interruption is re-raised, so the row never stays in ``running`` status.

    NOTE(review): ``since`` is derived from the previous run's *finished_at*. A run that
    takes longer than ``delta_buffer`` could miss items published while it was running.
    Confirm whether IngestRun has a start timestamp that would be safer to use.

    Args:
        connector: scene connector to pull from.
        limit: optional cap passed through to ``fetch_scenes``.
        use_delta: whether to do an incremental fetch based on the last successful run.
        delta_buffer: overlap subtracted from the last successful ``finished_at``.

    Returns:
        Counters with keys ``seen``, ``new``, ``updated``, ``skipped``, ``errors``.
    """
    counters = {"seen": 0, "new": 0, "updated": 0, "skipped": 0, "errors": 0}

    with session_scope() as session:
        source = get_or_create_source(session, kind=connector.kind, name=connector.name)
        since: datetime | None = None
        if use_delta:
            last = _last_successful_finished_at(session, source.id)
            if last is not None:
                since = last - delta_buffer

        run = IngestRun(source_id=source.id, status=IngestStatus.running)
        session.add(run)
        session.flush()
        # Capture plain ids while the session is open; the ORM objects are not used later.
        run_id = run.id
        source_id = source.id

    log.info(
        "ingest start source=%s since=%s limit=%s run_id=%s",
        connector.name,
        since.isoformat() if since else "FULL",
        limit,
        run_id,
    )

    failed = False
    error_payload: dict | None = None
    interrupted: BaseException | None = None

    try:
        for raw in connector.fetch_scenes(since=since, limit=limit):
            counters["seen"] += 1
            try:
                _process_scene(source_id=source_id, raw_scene=raw, counters=counters)
            except Exception as exc:  # pragma: no cover - defensive
                counters["errors"] += 1
                log.exception("ingest scene failed external_id=%s: %s", raw.external_id, exc)
                if counters["errors"] > 50:
                    error_payload = {"message": "too many errors, aborting", "count": counters["errors"]}
                    # Re-raise to the outer handler, which marks the whole run as failed.
                    raise
    except Exception as exc:
        failed = True
        if error_payload is None:
            error_payload = {"message": str(exc), "type": type(exc).__name__}
        log.exception("ingest run failed: %s", exc)
    except BaseException as exc:
        # KeyboardInterrupt / SystemExit etc.: close the run row first, re-raise below.
        failed = True
        error_payload = {"message": "interrupted", "type": type(exc).__name__}
        log.warning("ingest run interrupted source=%s run_id=%s (%s)", connector.name, run_id, type(exc).__name__)
        interrupted = exc

    with session_scope() as session:
        run = session.get(IngestRun, run_id)
        assert run is not None
        run.finished_at = datetime.now(UTC)
        if failed:
            run.status = IngestStatus.failed
        elif counters["errors"] > 0:
            run.status = IngestStatus.partial
        else:
            run.status = IngestStatus.success
        run.records_seen = counters["seen"]
        run.records_new = counters["new"]
        run.records_updated = counters["updated"]
        if error_payload is not None:
            run.errors = error_payload

    if interrupted is not None:
        raise interrupted

    log.info("ingest done source=%s counters=%s", connector.name, counters)
    return counters
|
|
|
|
|
|
def _process_scene(*, source_id: uuid.UUID, raw_scene: RawScene, counters: dict[str, int]) -> None:
    """Store one RawScene in ``external_records`` and, when warranted, resolve it to a Scene.

    Uses its own ``session_scope``, so each record is handled in a separate session.

    Counter updates (``counters`` is mutated in place):
      * ``skipped``: the payload hash is unchanged, the scene is clip-store content from a
        canonical source, or it is a too-short scraper scene;
      * ``new`` / ``updated``: depending on whether ``resolve_scene`` created a new Scene.

    The raw payload is ``raw_scene.raw`` when present, otherwise the model dumped to JSON.
    """
    payload = raw_scene.raw or raw_scene.model_dump(mode="json")
    digest = _hash_raw(payload)
    seen_at = datetime.now(UTC)

    with session_scope() as session:
        _, changed = _upsert_external_record(
            session,
            source_id=source_id,
            entity_kind=EntityKind.scene,
            external_id=raw_scene.external_id,
            raw=payload,
            raw_hash=digest,
            now=seen_at,
        )
        if not changed:
            counters["skipped"] += 1
            return

        norm = normalize_scene(raw_scene)
        studio_name = norm.studio.name if norm.studio else None
        # Short-circuit keeps the original order: the clip-store filter is checked first.
        should_skip = _skip_clip_store_canonical(
            session, source_id=source_id, studio_name=studio_name
        ) or _skip_short_tube_scene(session, source_id=source_id, norm=norm)
        if should_skip:
            counters["skipped"] += 1
            return

        result = resolve_scene(session, norm=norm, source_id=source_id)
        counters["new" if result.was_created else "updated"] += 1
        # Logged inside the session so result.scene is still attached when .id is read.
        log.debug(
            "scene resolved external=%s path=%s scene_id=%s score=%s",
            raw_scene.external_id,
            result.path,
            result.scene.id,
            result.score,
        )
|
|
|
|
|
|
def ingest_movies_from_connector(
    connector: BaseMovieConnector,
    *,
    limit: int | None = None,
    use_delta: bool = True,
    delta_buffer: timedelta = timedelta(hours=1),
) -> dict[str, int]:
    """Ingest movies (paradisehill / psyplay / wp_movies) and return the counters.

    Mirrors ``ingest_from_connector`` but for ``RawMovie`` -> ``Movie`` via
    ``resolve_movie``. The Source row is looked up by the connector's name (its kind is
    taken from the connector on creation). IngestRun rows live in the same
    ``ingest_runs`` table; the entity type is told apart by ``source.name`` and by
    ``external_records.entity_kind=movie``.

    NOTE(review): the delta window comes from the last successful run of the same Source
    row, whatever entity it ingested. This relies on movie connectors having names
    distinct from scene connectors; otherwise a scene run would shift the movie delta.

    If the loop is interrupted by a non-``Exception`` ``BaseException`` (e.g.
    ``KeyboardInterrupt`` or ``SystemExit``), the run is still closed as failed before the
    interruption is re-raised, so the row never stays in ``running`` status.

    Args:
        connector: movie connector to pull from.
        limit: optional cap passed through to ``fetch_movies``.
        use_delta: whether to do an incremental fetch based on the last successful run.
        delta_buffer: overlap subtracted from the last successful ``finished_at``.

    Returns:
        Counters with keys ``seen``, ``new``, ``updated``, ``skipped``, ``errors``.
    """
    counters = {"seen": 0, "new": 0, "updated": 0, "skipped": 0, "errors": 0}

    with session_scope() as session:
        source = get_or_create_source(session, kind=connector.kind, name=connector.name)
        since: datetime | None = None
        if use_delta:
            last = _last_successful_finished_at(session, source.id)
            if last is not None:
                since = last - delta_buffer
        run = IngestRun(source_id=source.id, status=IngestStatus.running)
        session.add(run)
        session.flush()
        # Capture plain ids while the session is open; the ORM objects are not used later.
        run_id = run.id
        source_id = source.id

    log.info(
        "ingest-movies start source=%s since=%s limit=%s run_id=%s",
        connector.name,
        since.isoformat() if since else "FULL",
        limit,
        run_id,
    )

    failed = False
    error_payload: dict | None = None
    interrupted: BaseException | None = None

    try:
        for raw in connector.fetch_movies(since=since, limit=limit):
            counters["seen"] += 1
            try:
                _process_movie(source_id=source_id, raw_movie=raw, counters=counters)
            except Exception as exc:
                counters["errors"] += 1
                log.exception("ingest movie failed external_id=%s: %s", raw.external_id, exc)
                if counters["errors"] > 50:
                    error_payload = {"message": "too many errors, aborting", "count": counters["errors"]}
                    # Re-raise to the outer handler, which marks the whole run as failed.
                    raise
    except Exception as exc:
        failed = True
        if error_payload is None:
            error_payload = {"message": str(exc), "type": type(exc).__name__}
        log.exception("ingest-movies run failed: %s", exc)
    except BaseException as exc:
        # KeyboardInterrupt / SystemExit etc.: close the run row first, re-raise below.
        failed = True
        error_payload = {"message": "interrupted", "type": type(exc).__name__}
        log.warning(
            "ingest-movies run interrupted source=%s run_id=%s (%s)", connector.name, run_id, type(exc).__name__
        )
        interrupted = exc

    with session_scope() as session:
        run = session.get(IngestRun, run_id)
        assert run is not None
        run.finished_at = datetime.now(UTC)
        if failed:
            run.status = IngestStatus.failed
        elif counters["errors"] > 0:
            run.status = IngestStatus.partial
        else:
            run.status = IngestStatus.success
        run.records_seen = counters["seen"]
        run.records_new = counters["new"]
        run.records_updated = counters["updated"]
        if error_payload is not None:
            run.errors = error_payload

    if interrupted is not None:
        raise interrupted

    log.info("ingest-movies done source=%s counters=%s", connector.name, counters)
    return counters
|
|
|
|
|
|
def _process_movie(*, source_id: uuid.UUID, raw_movie: RawMovie, counters: dict[str, int]) -> None:
    """Store one RawMovie in ``external_records`` and resolve it when its payload changed.

    Uses its own ``session_scope`` per record. Unlike the scene path there are no skip
    filters: an unchanged hash bumps ``skipped``; otherwise the movie is normalized and
    resolved, bumping ``new`` or ``updated`` (``counters`` is mutated in place).
    """
    payload = raw_movie.raw or raw_movie.model_dump(mode="json")
    digest = _hash_raw(payload)
    seen_at = datetime.now(UTC)

    with session_scope() as session:
        _, changed = _upsert_external_record(
            session,
            source_id=source_id,
            entity_kind=EntityKind.movie,
            external_id=raw_movie.external_id,
            raw=payload,
            raw_hash=digest,
            now=seen_at,
        )
        if changed:
            result = resolve_movie(session, norm=normalize_movie(raw_movie), source_id=source_id)
            counters["new" if result.was_created else "updated"] += 1
            # Logged inside the session so result.movie is still attached when .id is read.
            log.debug(
                "movie resolved external=%s path=%s movie_id=%s",
                raw_movie.external_id,
                result.path,
                result.movie.id,
            )
        else:
            counters["skipped"] += 1
|
|
|
|
|
|
def ingest_iter(
    raws: Iterable[RawScene],
    *,
    source_kind: SourceKind,
    source_name: str,
) -> dict[str, int]:
    """Ingest scenes from an arbitrary iterable, without a connector (tests / ad-hoc scrapes).

    No IngestRun is recorded and there is no error cap: every per-scene exception is
    logged and counted under ``errors``, and processing continues.

    Returns:
        Counters with keys ``seen``, ``new``, ``updated``, ``skipped``, ``errors``.
    """
    counters = dict.fromkeys(("seen", "new", "updated", "skipped", "errors"), 0)

    # Resolve the source in its own session; only the plain id is kept afterwards.
    with session_scope() as session:
        source_id = get_or_create_source(session, kind=source_kind, name=source_name).id

    for raw in raws:
        counters["seen"] += 1
        try:
            _process_scene(source_id=source_id, raw_scene=raw, counters=counters)
        except Exception:
            counters["errors"] += 1
            log.exception("ingest_iter failed external_id=%s", raw.external_id)
    return counters
|