goon/app/ingest.py
goon-foss ad0284585b Initial commit
Goon — self-hosted aggregator for adult-content scene metadata.

Indexes scenes from TPDB, StashDB, and 30+ public adult tube sites.
Cross-source deduplication via perceptual hash + Levenshtein distance.
FastAPI backend + APScheduler worker + React Native (Expo) mobile client.

FOSS, ad-free, donation-funded. See README for details.
2026-05-20 10:10:22 +02:00

360 lines
12 KiB
Python

"""Per-source ingest orchestration.
Cycle:
1. Open an IngestRun (status=running).
2. Iterate `connector.fetch_scenes(since=last_successful_run - buffer, limit=limit)`.
3. For each RawScene:
- hash = sha256(canonical_json(raw))
- upsert into `external_records`: if (source, kind, external_id) already exists and the hash is unchanged → only bump `last_seen_at`. Otherwise → store the new payload and pass the record on.
- normalize_scene → resolve_scene (commit incrementally? For now: one commit per record).
4. Close the IngestRun (records_seen / records_new / records_updated, plus an `errors` payload when something failed).
Movies follow the same cycle via `fetch_movies` → normalize_movie → resolve_movie.
Idempotency:
- external_records has UNIQUE(source_id, entity_kind, external_id) — INSERT … ON CONFLICT DO UPDATE.
- for an existing SceneExternalRef the resolver only updates fields.
"""
from __future__ import annotations
import hashlib
import json
import logging
import uuid
from collections.abc import Iterable
from datetime import UTC, datetime, timedelta
from sqlalchemy import select
from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlalchemy.orm import Session
from app.connectors.base import BaseConnector, BaseMovieConnector, RawMovie, RawScene
from app.db import session_scope
from app.models.external_record import EntityKind, ExternalRecord
from app.models.ingest_run import IngestRun, IngestStatus
from app.models.source import Source, SourceKind
from app.normalize.movies import normalize_movie
from app.normalize.scenes import normalize_scene
from app.resolve.movie_resolver import resolve_movie
from app.resolve.scene_resolver import resolve_scene
log = logging.getLogger(__name__)
def _canonical_json(payload: dict) -> bytes:
return json.dumps(payload, sort_keys=True, separators=(",", ":"), default=str).encode()
def _hash_raw(payload: dict) -> bytes:
return hashlib.sha256(_canonical_json(payload)).digest()
def get_or_create_source(
    session: Session, *, kind: SourceKind, name: str, base_url: str | None = None
) -> Source:
    """Return the Source row whose name is ``name``, creating it if missing.

    The lookup is by ``name`` only. When a row already exists, its stored
    ``kind`` and ``base_url`` are returned unchanged, and the arguments here
    are ignored. A newly created row is flushed (not committed), so its ``id``
    is available to the caller within the same session.
    """
    found = session.execute(select(Source).where(Source.name == name)).scalar_one_or_none()
    if found is None:
        found = Source(kind=kind, name=name, base_url=base_url)
        session.add(found)
        session.flush()
    return found
def _last_successful_finished_at(session: Session, source_id: uuid.UUID) -> datetime | None:
    """Return ``finished_at`` of the newest successful IngestRun for ``source_id``.

    Only runs with status ``success`` and a non-NULL ``finished_at`` are
    considered. Returns ``None`` when the source has never had such a run.
    """
    newest_success = (
        select(IngestRun.finished_at)
        .where(IngestRun.source_id == source_id)
        .where(IngestRun.status == IngestStatus.success)
        .where(IngestRun.finished_at.is_not(None))
        .order_by(IngestRun.finished_at.desc())
        .limit(1)
    )
    return session.execute(newest_success).scalar_one_or_none()
def _upsert_external_record(
    session: Session,
    *,
    source_id: uuid.UUID,
    entity_kind: EntityKind,
    external_id: str,
    raw: dict,
    raw_hash: bytes,
    now: datetime,
) -> tuple[bool, bool]:
    """Insert or refresh one ``external_records`` row keyed by (source, kind, id).

    First reads the stored ``raw_hash`` to classify the record. It then issues
    a PostgreSQL ``INSERT ... ON CONFLICT (source_id, entity_kind, external_id)
    DO UPDATE``:

    * hash unchanged: only ``last_seen_at`` is updated;
    * hash changed (or new row): ``raw``, ``raw_hash``, ``fetched_at`` and
      ``last_seen_at`` are all written.

    The read and the upsert are separate statements, so the classification
    reflects the state at the time of the SELECT.

    Returns:
        ``(is_new, hash_changed)``. ``hash_changed`` is also True for new rows.
    """
    stored_hash = session.execute(
        select(ExternalRecord.raw_hash)
        .where(ExternalRecord.source_id == source_id)
        .where(ExternalRecord.entity_kind == entity_kind)
        .where(ExternalRecord.external_id == external_id)
    ).scalar_one_or_none()

    if stored_hash is None:
        is_new, hash_changed = True, True
    else:
        is_new = False
        # bytes() normalises whatever buffer type the driver hands back
        # (presumably memoryview for bytea) before comparing with raw_hash.
        hash_changed = bytes(stored_hash) != raw_hash

    if hash_changed:
        on_conflict_set = {
            "raw": raw,
            "raw_hash": raw_hash,
            "fetched_at": now,
            "last_seen_at": now,
        }
    else:
        on_conflict_set = {"last_seen_at": now}

    upsert = pg_insert(ExternalRecord).values(
        source_id=source_id,
        entity_kind=entity_kind,
        external_id=external_id,
        raw=raw,
        raw_hash=raw_hash,
        fetched_at=now,
        last_seen_at=now,
    )
    upsert = upsert.on_conflict_do_update(
        index_elements=["source_id", "entity_kind", "external_id"],
        set_=on_conflict_set,
    )
    session.execute(upsert)
    return is_new, hash_changed
def ingest_from_connector(
    connector: BaseConnector,
    *,
    limit: int | None = None,
    use_delta: bool = True,
    delta_buffer: timedelta = timedelta(hours=1),
    max_errors: int = 50,
) -> dict[str, int]:
    """Run one scene-ingest cycle for ``connector`` and return its counters.

    Steps:
    1. Resolve (or create) the connector's Source row. With ``use_delta``,
       ``since`` is the ``finished_at`` of the last successful IngestRun
       minus ``delta_buffer``. Without it, or when there is no successful
       run, this is a full fetch (``since=None``).
    2. Create an IngestRun with status ``running`` in its own session.
    3. Feed every RawScene from ``connector.fetch_scenes`` to
       ``_process_scene``. Per-record exceptions are logged and counted.
       Once more than ``max_errors`` have occurred, the run is aborted.
    4. Close the IngestRun: ``failed`` if aborted, interrupted or the
       connector raised; ``partial`` if some records failed; otherwise
       ``success``.

    Args:
        connector: provides ``name``, ``kind`` and ``fetch_scenes``.
        limit: passed through to ``fetch_scenes``.
        use_delta: when False, always do a full fetch.
        delta_buffer: overlap subtracted from the last successful run's
            finish time.
        max_errors: abort once the per-record error count exceeds this
            (default 50, the previously hard-coded value).

    Returns:
        Counters with keys ``seen``, ``new``, ``updated``, ``skipped``,
        ``errors``.

    Raises:
        Exceptions raised while fetching/processing are logged and recorded
        on the run, not propagated. Non-``Exception`` interrupts
        (``KeyboardInterrupt``, ``SystemExit``, ...) are re-raised after the
        run row has been closed as ``failed``.
    """
    counters = {"seen": 0, "new": 0, "updated": 0, "skipped": 0, "errors": 0}
    with session_scope() as session:
        source = get_or_create_source(session, kind=connector.kind, name=connector.name)
        since: datetime | None = None
        if use_delta:
            # NOTE(review): based on finished_at, so a previous run longer than
            # delta_buffer could leave a gap — confirm whether its start time
            # would be a safer anchor.
            last = _last_successful_finished_at(session, source.id)
            if last is not None:
                since = last - delta_buffer
        run = IngestRun(source_id=source.id, status=IngestStatus.running)
        session.add(run)
        session.flush()
        # Keep plain ids; the ORM objects are not used after this session ends.
        run_id = run.id
        source_id = source.id
    log.info(
        "ingest start source=%s since=%s limit=%s run_id=%s",
        connector.name,
        since.isoformat() if since else "FULL",
        limit,
        run_id,
    )
    failed = False
    error_payload: dict | None = None
    interrupt: BaseException | None = None
    try:
        for raw in connector.fetch_scenes(since=since, limit=limit):
            counters["seen"] += 1
            try:
                _process_scene(source_id=source_id, raw_scene=raw, counters=counters)
            except Exception as exc:  # pragma: no cover - defensive
                counters["errors"] += 1
                log.exception("ingest scene failed external_id=%s: %s", raw.external_id, exc)
                if counters["errors"] > max_errors:
                    error_payload = {"message": "too many errors, aborting", "count": counters["errors"]}
                    raise
    except Exception as exc:
        failed = True
        if error_payload is None:
            error_payload = {"message": str(exc), "type": type(exc).__name__}
        log.exception("ingest run failed: %s", exc)
    except BaseException as exc:
        # KeyboardInterrupt / SystemExit (e.g. worker shutdown): without this
        # the run row would stay `running` forever. Close it, re-raise below.
        failed = True
        interrupt = exc
        error_payload = {"message": "interrupted", "type": type(exc).__name__}
        log.warning("ingest run interrupted (%s); closing run as failed", type(exc).__name__)
    with session_scope() as session:
        run = session.get(IngestRun, run_id)
        if run is None:
            raise RuntimeError(f"IngestRun {run_id} vanished before it could be closed")
        run.finished_at = datetime.now(UTC)
        if failed:
            run.status = IngestStatus.failed
        elif counters["errors"] > 0:
            run.status = IngestStatus.partial
        else:
            run.status = IngestStatus.success
        run.records_seen = counters["seen"]
        run.records_new = counters["new"]
        run.records_updated = counters["updated"]
        if error_payload is not None:
            run.errors = error_payload
    log.info("ingest done source=%s counters=%s", connector.name, counters)
    if interrupt is not None:
        raise interrupt
    return counters
def _process_scene(*, source_id: uuid.UUID, raw_scene: RawScene, counters: dict[str, int]) -> None:
    """Store one RawScene's raw payload and, if it changed, normalize and resolve it.

    Runs in its own ``session_scope``, presumably committed on exit, so every
    record succeeds or fails independently.

    Args:
        source_id: id of the Source row the scene came from.
        raw_scene: connector record. ``raw_scene.raw`` is stored as-is, with a
            fallback to ``model_dump(mode="json")`` when it is empty/falsy.
        counters: run counters. Exactly one of ``new`` / ``updated`` /
            ``skipped`` is incremented, and only after the session scope has
            exited without error.

    Raises:
        Whatever the upsert, normalizer, resolver or session exit raises. The
        caller counts it as an error.
    """
    payload = raw_scene.raw or raw_scene.model_dump(mode="json")
    raw_hash = _hash_raw(payload)
    now = datetime.now(UTC)
    with session_scope() as session:
        _is_new, hash_changed = _upsert_external_record(
            session,
            source_id=source_id,
            entity_kind=EntityKind.scene,
            external_id=raw_scene.external_id,
            raw=payload,
            raw_hash=raw_hash,
            now=now,
        )
        if hash_changed:
            norm = normalize_scene(raw_scene)
            result = resolve_scene(session, norm=norm, source_id=source_id)
            outcome = "new" if result.was_created else "updated"
            # Logged inside the session so result.scene is still attached.
            log.debug(
                "scene resolved external=%s path=%s scene_id=%s score=%s",
                raw_scene.external_id,
                result.path,
                result.scene.id,
                result.score,
            )
        else:
            # Payload identical to the stored one: only last_seen_at was bumped.
            outcome = "skipped"
    # Count only once the scope exited cleanly. Otherwise a failing commit
    # would be counted both here and as an error by the caller.
    counters[outcome] += 1
def ingest_movies_from_connector(
    connector: BaseMovieConnector,
    *,
    limit: int | None = None,
    use_delta: bool = True,
    delta_buffer: timedelta = timedelta(hours=1),
    max_errors: int = 50,
) -> dict[str, int]:
    """Ingest movies (paradisehill / psyplay / wp_movies).

    This mirrors ``ingest_from_connector``, but maps ``RawMovie`` → ``Movie``
    via ``resolve_movie``. The Source row comes from the connector's kind and
    name (looked up by name). IngestRun rows live in the same ``ingest_runs``
    table. The entity type is distinguished by ``source.name`` /
    ``external_records.entity_kind=movie``.

    NOTE(review): the delta lookup filters only by source_id. If a connector
    name were shared with a scene connector, scene runs would also move the
    movie ``since`` — confirm names are distinct.

    Args:
        connector: provides ``name``, ``kind`` and ``fetch_movies``.
        limit: passed through to ``fetch_movies``.
        use_delta: when False, always do a full fetch.
        delta_buffer: overlap subtracted from the last successful run's
            finish time.
        max_errors: abort once the per-record error count exceeds this
            (default 50, the previously hard-coded value).

    Returns:
        Counters with keys ``seen``, ``new``, ``updated``, ``skipped``,
        ``errors``.

    Raises:
        Exceptions raised while fetching/processing are logged and recorded
        on the run, not propagated. Non-``Exception`` interrupts
        (``KeyboardInterrupt``, ``SystemExit``, ...) are re-raised after the
        run row has been closed as ``failed``.
    """
    counters = {"seen": 0, "new": 0, "updated": 0, "skipped": 0, "errors": 0}
    with session_scope() as session:
        source = get_or_create_source(session, kind=connector.kind, name=connector.name)
        since: datetime | None = None
        if use_delta:
            last = _last_successful_finished_at(session, source.id)
            if last is not None:
                since = last - delta_buffer
        run = IngestRun(source_id=source.id, status=IngestStatus.running)
        session.add(run)
        session.flush()
        # Keep plain ids; the ORM objects are not used after this session ends.
        run_id = run.id
        source_id = source.id
    log.info(
        "ingest-movies start source=%s since=%s limit=%s run_id=%s",
        connector.name,
        since.isoformat() if since else "FULL",
        limit,
        run_id,
    )
    failed = False
    error_payload: dict | None = None
    interrupt: BaseException | None = None
    try:
        for raw in connector.fetch_movies(since=since, limit=limit):
            counters["seen"] += 1
            try:
                _process_movie(source_id=source_id, raw_movie=raw, counters=counters)
            except Exception as exc:
                counters["errors"] += 1
                log.exception("ingest movie failed external_id=%s: %s", raw.external_id, exc)
                if counters["errors"] > max_errors:
                    error_payload = {"message": "too many errors, aborting", "count": counters["errors"]}
                    raise
    except Exception as exc:
        failed = True
        if error_payload is None:
            error_payload = {"message": str(exc), "type": type(exc).__name__}
        log.exception("ingest-movies run failed: %s", exc)
    except BaseException as exc:
        # KeyboardInterrupt / SystemExit (e.g. worker shutdown): without this
        # the run row would stay `running` forever. Close it, re-raise below.
        failed = True
        interrupt = exc
        error_payload = {"message": "interrupted", "type": type(exc).__name__}
        log.warning("ingest-movies run interrupted (%s); closing run as failed", type(exc).__name__)
    with session_scope() as session:
        run = session.get(IngestRun, run_id)
        if run is None:
            raise RuntimeError(f"IngestRun {run_id} vanished before it could be closed")
        run.finished_at = datetime.now(UTC)
        if failed:
            run.status = IngestStatus.failed
        elif counters["errors"] > 0:
            run.status = IngestStatus.partial
        else:
            run.status = IngestStatus.success
        run.records_seen = counters["seen"]
        run.records_new = counters["new"]
        run.records_updated = counters["updated"]
        if error_payload is not None:
            run.errors = error_payload
    log.info("ingest-movies done source=%s counters=%s", connector.name, counters)
    if interrupt is not None:
        raise interrupt
    return counters
def _process_movie(*, source_id: uuid.UUID, raw_movie: RawMovie, counters: dict[str, int]) -> None:
    """Store one RawMovie's raw payload and, if it changed, normalize and resolve it.

    Runs in its own ``session_scope``, presumably committed on exit, so every
    record succeeds or fails independently.

    Args:
        source_id: id of the Source row the movie came from.
        raw_movie: connector record. ``raw_movie.raw`` is stored as-is, with a
            fallback to ``model_dump(mode="json")`` when it is empty/falsy.
        counters: run counters. Exactly one of ``new`` / ``updated`` /
            ``skipped`` is incremented, and only after the session scope has
            exited without error.

    Raises:
        Whatever the upsert, normalizer, resolver or session exit raises. The
        caller counts it as an error.
    """
    payload = raw_movie.raw or raw_movie.model_dump(mode="json")
    raw_hash = _hash_raw(payload)
    now = datetime.now(UTC)
    with session_scope() as session:
        _is_new, hash_changed = _upsert_external_record(
            session,
            source_id=source_id,
            entity_kind=EntityKind.movie,
            external_id=raw_movie.external_id,
            raw=payload,
            raw_hash=raw_hash,
            now=now,
        )
        if hash_changed:
            norm = normalize_movie(raw_movie)
            result = resolve_movie(session, norm=norm, source_id=source_id)
            outcome = "new" if result.was_created else "updated"
            # Logged inside the session so result.movie is still attached.
            log.debug(
                "movie resolved external=%s path=%s movie_id=%s",
                raw_movie.external_id,
                result.path,
                result.movie.id,
            )
        else:
            # Payload identical to the stored one: only last_seen_at was bumped.
            outcome = "skipped"
    # Count only once the scope exited cleanly. Otherwise a failing commit
    # would be counted both here and as an error by the caller.
    counters[outcome] += 1
def ingest_iter(
    raws: Iterable[RawScene],
    *,
    source_kind: SourceKind,
    source_name: str,
) -> dict[str, int]:
    """Ingest an arbitrary iterable of RawScene objects without a connector.

    Helper for tests and ad-hoc scrapes. It resolves (or creates) the Source
    named ``source_name``, then runs every scene through ``_process_scene``.
    Unlike the connector entry points, it opens no IngestRun and has no
    error cap. Per-record exceptions are logged and counted.

    Returns:
        Counters with keys ``seen``, ``new``, ``updated``, ``skipped``,
        ``errors``.
    """
    counters = dict.fromkeys(("seen", "new", "updated", "skipped", "errors"), 0)
    with session_scope() as session:
        source_id = get_or_create_source(session, kind=source_kind, name=source_name).id
    for scene in raws:
        counters["seen"] += 1
        try:
            _process_scene(source_id=source_id, raw_scene=scene, counters=counters)
        except Exception:
            counters["errors"] += 1
            log.exception("ingest_iter failed external_id=%s", scene.external_id)
    return counters