The umbrella Source.name for all direct tube scrapers (deep-crawl, browse-latest, performer-driven) was "pornapp" — a misleading leftover from the removed external porn-app API. It read like a dependency on a third-party "pornapp" service; it is not — these are our own scrapers hitting 25+ tubes directly (kind=scraper, origin tube:<sitetag>). Renamed to "tube-scraper" via a single SCRAPER_SOURCE_NAME constant; DB row renamed in place (UPDATE name, same id) so all ingest_runs + external_records history stays linked. No behavior change — external_id keying (sitetag:url) and dedup are unaffected. NOTE: playback_sources.origin "pornapp:<sitetag>" prefix is a separate legacy format (resolve_playback parses it) and is intentionally left untouched. Verified on prod: row renamed (0 stray "pornapp"), new runs land on "tube-scraper". Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
640 lines
24 KiB
Python
640 lines
24 KiB
Python
"""Performer-driven ingest strategy.
|
|
|
|
Rationale (memory: kompletność > świeżość):
|
|
Random newest-first ingest z TPDB/StashDB rzadko pokrywa się ze świeżym pull'em
|
|
z hqporner — bo każde źródło widzi inny wycinek katalogu w danym momencie. Dla
|
|
popularnych performerów jednak prawdopodobieństwo overlapu jest dużo wyższe,
|
|
a zasięg po jednym performerze jest skończony i da się przeszukać.
|
|
|
|
Cykl per performer:
|
|
1. Lookup performer_external_refs(tpdb, stashdb) — kanoniczne UUID-y.
|
|
2. Dla każdego źródła z UUID: pobierz wszystkie sceny tego performera (paginated).
|
|
3. Dla każdego direct tube scrapera: search tube po `canonical_name` (np. "Lola Noir").
|
|
4. Sceny lecą przez normalny `_process_scene` → resolver merguje cross-source.
|
|
|
|
Wybór listy performerów (kolejność prób):
|
|
a) `--performers "name1,name2"` — explicit.
|
|
b) `--performer-ids <uuid1>,<uuid2>` — explicit po naszym kanonicznym UUID.
|
|
c) Top-N z bazy po `scene_count desc` (`--top-n=N`, default 20).
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import uuid
|
|
from dataclasses import dataclass, field
|
|
from datetime import UTC, datetime, timedelta
|
|
|
|
from sqlalchemy import func, select
|
|
from sqlalchemy.orm import Session
|
|
|
|
from app.connectors.stashdb import StashDBConnector
|
|
from app.connectors.tpdb import TPDBConnector
|
|
from app.db import session_scope
|
|
from app.ingest import (
|
|
_hash_raw, # type: ignore[attr-defined]
|
|
_process_scene, # type: ignore[attr-defined]
|
|
get_or_create_source,
|
|
)
|
|
from app.models.ingest_run import IngestRun, IngestStatus
|
|
from app.models.performer import Performer, PerformerExternalRef
|
|
from app.models.playback_source import PlaybackSource
|
|
from app.models.scene import Scene, ScenePerformer
|
|
from app.models.source import Source, SourceKind
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
@dataclass
class PerformerTarget:
    """One performer selected for a performer-driven ingest pass.

    Bundles the local performer identity with whichever external catalogue
    IDs were known when the target was resolved.
    """

    # Primary key of the local Performer row.
    performer_id: uuid.UUID
    # Display name; also the query string passed to the tube scrapers' search.
    canonical_name: str
    # Performer ID in TPDB, or None when no reference is known.
    tpdb_id: str | None = None
    # Performer ID in StashDB, or None when no reference is known.
    stashdb_id: str | None = None
|
|
|
|
|
|
@dataclass
class StrategyCounters:
    """Totals accumulated over a single performer-driven run."""

    performers_processed: int = 0
    performers_skipped_no_refs: int = 0
    # source_name -> {seen, new, updated, skipped, errors}, summed over all performers.
    per_source: dict[str, dict[str, int]] = field(default_factory=dict)

    def merge(self, source_name: str, c: dict[str, int]) -> None:
        """Add one ingest call's counters to the running totals of ``source_name``.

        Only the keys tracked in the per-source bucket are summed; keys missing
        from ``c`` count as zero and unknown keys in ``c`` are ignored.
        """
        if source_name not in self.per_source:
            self.per_source[source_name] = dict.fromkeys(
                ("seen", "new", "updated", "skipped", "errors"), 0
            )
        totals = self.per_source[source_name]
        totals.update(
            {key: current + c.get(key, 0) for key, current in totals.items()}
        )
|
|
|
|
|
|
# ---- Public API ------------------------------------------------------
|
|
|
|
def run_performer_driven(
    *,
    performer_names: list[str] | None = None,
    performer_ids: list[uuid.UUID] | None = None,
    top_n: int = 20,
    sitetags: list[str] | None = None,
    per_performer_limit: int | None = None,
    canonical_per_performer_limit: int | None = None,
    skip_tubes: bool = False,
    skip_canonical: bool = False,
) -> StrategyCounters:
    """Run the performer-driven ingest; main entry point for the worker CLI.

    For every resolved performer this pulls scenes from TPDB, then StashDB
    (when an external ID is known), then runs a name search on every selected
    direct tube scraper. Each pull is recorded as its own ingest run.

    Args:
        performer_names: Explicit performer names. When given, missing
            TPDB/StashDB refs are also auto-filled via live lookup.
        performer_ids: Explicit local performer UUIDs (take precedence over
            names when selecting from the database).
        top_n: Number of performers to take by scene count when neither names
            nor IDs are supplied.
        sitetags: Restrict tube scraping to scrapers with these sitetags;
            ``None`` or empty means all scrapers.
        per_performer_limit: Fallback cap for canonical pulls when
            ``canonical_per_performer_limit`` is ``None``. The tube search
            itself always requests page 1 with a limit of 50.
        canonical_per_performer_limit: Cap on scenes fetched per performer
            from TPDB/StashDB.
        skip_tubes: Skip the direct tube scrapers.
        skip_canonical: Skip TPDB/StashDB entirely (connectors are not built).

    Returns:
        Aggregated counters for the whole run.
    """
    counters = StrategyCounters()

    # When the caller explicitly names performers we also fill in missing
    # canonical refs (TPDB/StashDB IDs): historically many performers only had
    # a StashDB ref, so TPDB ingest never ran for them. The top-N path does not
    # auto-fill, because that would cost extra API calls without explicit intent.
    fill_missing_refs = bool(performer_names)

    targets = _resolve_targets(
        performer_names=performer_names,
        performer_ids=performer_ids,
        top_n=top_n,
        fill_missing_refs=fill_missing_refs,
    )
    if not targets:
        log.warning("performer-driven: no targets resolved (empty top-N or unknown names)")
        return counters

    log.info(
        "performer-driven: %d performers -> tpdb=%s stashdb=%s tubes=%s",
        len(targets),
        sum(1 for t in targets if t.tpdb_id),
        sum(1 for t in targets if t.stashdb_id),
        not skip_tubes,
    )

    # `skip_canonical=True` is used by the continuous worker below, which
    # focuses on search-based backfill. Per the original author's notes the
    # canonical pulls are slow and are refreshed by a separate scheduled job
    # and an on-demand refresh API.
    tpdb = None if skip_canonical else _try_build(TPDBConnector)
    stashdb = None if skip_canonical else _try_build(StashDBConnector)

    # Cap for canonical (TPDB/StashDB) pulls: popular performers can have
    # thousands of scenes, and fetching all of them on every refresh is slow.
    # Loop-invariant, so computed once.
    canonical_limit = (
        canonical_per_performer_limit
        if canonical_per_performer_limit is not None
        else per_performer_limit
    )

    # The scraper selection is loop-invariant too; resolve it once up front.
    scraper_classes: list = []
    scraper_source_name = ""
    if not skip_tubes:
        from app.connectors.direct_scrapers import ALL_DIRECT_SCRAPERS, SCRAPER_SOURCE_NAME

        scraper_source_name = SCRAPER_SOURCE_NAME
        sitetag_filter = set(sitetags or []) or None
        scraper_classes = [
            s for s in ALL_DIRECT_SCRAPERS
            if sitetag_filter is None or s.sitetag in sitetag_filter
        ]
        if sitetag_filter is not None:
            # A mistyped sitetag would otherwise silently scrape nothing.
            unknown = sitetag_filter - {s.sitetag for s in ALL_DIRECT_SCRAPERS}
            if unknown:
                log.warning("performer-driven: unknown sitetags ignored: %s", sorted(unknown))

    for target in targets:
        log.info("=== performer: %s (id=%s) ===", target.canonical_name, target.performer_id)
        if not skip_canonical and not (target.tpdb_id or target.stashdb_id):
            log.warning(
                "skip %s: brak tpdb/stashdb external_ref, search-only by name nie pomoże w mergu",
                target.canonical_name,
            )
            counters.performers_skipped_no_refs += 1
            continue

        # 1. TPDB
        if tpdb and target.tpdb_id:
            c = _ingest_iter_into_run(
                source_kind=SourceKind.tpdb,
                source_name="tpdb",
                run_label=f"performer-driven:tpdb:{target.canonical_name}",
                # Bind the loop variable as a default to avoid late binding.
                iterator_factory=lambda t=target: tpdb.fetch_scenes_for_performer(
                    t.tpdb_id, limit=canonical_limit
                ),
            )
            counters.merge("tpdb", c)

        # 2. StashDB
        if stashdb and target.stashdb_id:
            c = _ingest_iter_into_run(
                source_kind=SourceKind.stashdb,
                source_name="stashdb",
                run_label=f"performer-driven:stashdb:{target.canonical_name}",
                iterator_factory=lambda t=target: stashdb.fetch_scenes_for_performer(
                    t.stashdb_id, limit=canonical_limit
                ),
            )
            counters.merge("stashdb", c)

        # 3. Direct scrapers: a per-tube search by performer name. Every
        #    scraper is recorded under the single umbrella source
        #    SCRAPER_SOURCE_NAME. Per the original notes, records are keyed by
        #    an external_id of the form "<sitetag>:<url>" and merged
        #    idempotently by the resolver.
        for scraper_cls in scraper_classes:
            # A fresh scraper instance per performer, as before.
            scraper = scraper_cls()
            c = _ingest_iter_into_run(
                source_kind=SourceKind.scraper,
                source_name=scraper_source_name,
                run_label=f"performer-driven:direct:{scraper.sitetag}:{target.canonical_name}",
                iterator_factory=lambda s=scraper, t=target: s.search(
                    t.canonical_name, page=1, limit=50
                ),
            )
            counters.merge(scraper_source_name, c)

        counters.performers_processed += 1

    log.info("performer-driven done: %s", counters)
    return counters
|
|
|
|
|
|
# ---- Internal --------------------------------------------------------
|
|
|
|
def _try_build(cls, **kwargs): # type: ignore[no-untyped-def]
|
|
try:
|
|
kw = {k: v for k, v in kwargs.items() if v is not None}
|
|
return cls(**kw)
|
|
except Exception as e:
|
|
log.warning("connector %s skipped: %s", cls.__name__, e)
|
|
return None
|
|
|
|
|
|
def _resolve_targets(
    *,
    performer_names: list[str] | None,
    performer_ids: list[uuid.UUID] | None,
    top_n: int,
    fill_missing_refs: bool = False,
) -> list[PerformerTarget]:
    """Map the caller's selection (IDs / names / top-N) to PerformerTarget objects.

    Selection from the local database uses, in order of precedence:
    ``performer_ids``, then ``performer_names`` (matched against
    ``Performer.name_normalized`` after strip + lowercase), then the top
    ``top_n`` performers by scene count.

    Explicit names that are not matched locally are looked up live in
    TPDB/StashDB via ``_lookup_via_api``, so the caller does not have to run
    another ingest first just to learn the canonical IDs.

    With ``fill_missing_refs=True``, performers that already exist locally but
    lack a TPDB or StashDB reference also get a live lookup for the missing
    one, so an explicit request covers every source and not only those already
    mapped.

    Each performer appears at most once in the result: repeated input names
    trigger a single live lookup, and a live lookup that resolves to an
    already-selected performer is dropped.

    Returns:
        Targets in selection order; an empty list when ``performer_names``
        contains only blank entries or nothing could be resolved.
    """
    targets: list[PerformerTarget] = []
    found_in_db_names: set[str] = set()

    with session_scope() as session:
        if performer_ids:
            performers = session.execute(
                select(Performer).where(Performer.id.in_(performer_ids))
            ).scalars().all()
        elif performer_names:
            normalized = [n.strip().lower() for n in performer_names if n.strip()]
            if not normalized:
                return []
            performers = session.execute(
                select(Performer).where(Performer.name_normalized.in_(normalized))
            ).scalars().all()
        else:
            performers = _top_n_by_scene_count(session, top_n)

        # Source rows for the two canonical catalogues; either may be absent.
        tpdb_src = session.execute(
            select(Source).where(Source.kind == SourceKind.tpdb)
        ).scalar_one_or_none()
        stashdb_src = session.execute(
            select(Source).where(Source.kind == SourceKind.stashdb)
        ).scalar_one_or_none()

        def stored_ref(performer_pk: uuid.UUID, src: Source | None) -> str | None:
            """Return one stored external_id of the performer for ``src``, if any."""
            if not src:
                return None
            return session.execute(
                select(PerformerExternalRef.external_id).where(
                    PerformerExternalRef.performer_id == performer_pk,
                    PerformerExternalRef.source_id == src.id,
                ).limit(1)
            ).scalar_one_or_none()

        for p in performers:
            tpdb_id = stored_ref(p.id, tpdb_src)
            stashdb_id = stored_ref(p.id, stashdb_src)

            if fill_missing_refs:
                if tpdb_src and tpdb_id is None:
                    tpdb_id = _ensure_canonical_ref(
                        session,
                        performer_id=p.id,
                        name=p.canonical_name,
                        kind=SourceKind.tpdb,
                        source_id=tpdb_src.id,
                    )
                if stashdb_src and stashdb_id is None:
                    stashdb_id = _ensure_canonical_ref(
                        session,
                        performer_id=p.id,
                        name=p.canonical_name,
                        kind=SourceKind.stashdb,
                        source_id=stashdb_src.id,
                    )

            targets.append(
                PerformerTarget(
                    performer_id=p.id,
                    canonical_name=p.canonical_name,
                    tpdb_id=tpdb_id,
                    stashdb_id=stashdb_id,
                )
            )
            found_in_db_names.add(p.name_normalized.strip().lower())

    # Live lookup for explicit names that were not matched in the database.
    if performer_names:
        # Keyed by the lowercased name so a repeated name is looked up (and
        # later ingested) only once; dict order keeps the caller's ordering.
        missing: dict[str, str] = {}
        for raw_name in performer_names:
            name = raw_name.strip()
            if name and name.lower() not in found_in_db_names:
                missing.setdefault(name.lower(), name)

        seen_ids = {t.performer_id for t in targets}
        for name in missing.values():
            target = _lookup_via_api(name)
            if target is None:
                continue
            # The live lookup may land on a performer that is already selected
            # (it matches on its own normalisation); never ingest one twice.
            if target.performer_id in seen_ids:
                continue
            seen_ids.add(target.performer_id)
            targets.append(target)

    return targets
|
|
|
|
|
|
def _ensure_canonical_ref(
    session: Session,
    *,
    performer_id: uuid.UUID,
    name: str,
    kind: SourceKind,
    source_id: uuid.UUID,
) -> str | None:
    """Look up a performer's TPDB/StashDB ID by name and store it as an external ref.

    Returns the external ID when it was newly linked or was already linked to
    this performer. Returns None when:
      - ``kind`` is neither TPDB nor StashDB,
      - the lookup raised (logged as a warning) or found nothing,
      - the external ID is already mapped to a different local performer
        (a conflict that needs a manual merge decision).
    """
    if kind == SourceKind.tpdb:
        connector_cls = TPDBConnector
    elif kind == SourceKind.stashdb:
        connector_cls = StashDBConnector
    else:
        return None

    try:
        ext_id = connector_cls().find_performer_id_by_name(name)
    except Exception as err:
        log.warning("%s lookup failed for %s: %s", kind.value, name, err)
        return None
    if not ext_id:
        return None

    lookup = select(PerformerExternalRef).where(
        PerformerExternalRef.source_id == source_id,
        PerformerExternalRef.external_id == ext_id,
    )
    linked = session.execute(lookup).scalar_one_or_none()

    if not linked:
        # Nobody owns this external ID yet: link it to the requested performer.
        new_ref = PerformerExternalRef(
            source_id=source_id,
            external_id=ext_id,
            performer_id=performer_id,
            confidence=0.85,
        )
        session.add(new_ref)
        log.info("auto-filled %s ref for %s: %s", kind.value, name, ext_id)
        return ext_id

    if linked.performer_id == performer_id:
        return ext_id

    log.warning(
        "%s ref conflict: ext_id=%s already maps to performer %s (asked for %s)",
        kind.value, ext_id, linked.performer_id, performer_id,
    )
    return None
|
|
|
|
|
|
def _lookup_via_api(name: str) -> PerformerTarget | None:
    """Resolve a performer name to TPDB and StashDB IDs via live lookups.

    When at least one catalogue knows the name, a local Performer row is
    reused (matched on the normalised name) or created, and external refs are
    stored for the IDs found, so later runs can resolve the performer from the
    database without another lookup.

    Returns None when neither catalogue returned an ID.
    """
    lookups = (
        (SourceKind.tpdb, TPDBConnector, "tpdb lookup failed for %s: %s"),
        (SourceKind.stashdb, StashDBConnector, "stashdb lookup failed for %s: %s"),
    )
    resolved: list[tuple[SourceKind, str | None]] = []
    for kind, connector_cls, failure_msg in lookups:
        ext_id: str | None = None
        try:
            ext_id = connector_cls().find_performer_id_by_name(name)
        except Exception as err:
            # A failing catalogue must not prevent trying the other one.
            log.warning(failure_msg, name, err)
        resolved.append((kind, ext_id))
    tpdb_id, stashdb_id = (found for _, found in resolved)

    if not tpdb_id and not stashdb_id:
        log.warning("performer '%s' not found in any external source — skipping", name)
        return None

    log.info(
        "live lookup: %s → tpdb=%s stashdb=%s", name, tpdb_id, stashdb_id
    )

    # Persist a placeholder performer plus its external refs so scenes can be
    # attributed to it and subsequent runs find it in the database.
    from app.normalize.text import normalize_person, slugify

    with session_scope() as session:
        normalized = normalize_person(name)
        known = session.execute(
            select(Performer).where(Performer.name_normalized == normalized)
        ).scalar_one_or_none()
        if known is None:
            known = Performer(
                canonical_name=name,
                name_normalized=normalized,
                slug=slugify(name),
            )
            session.add(known)
            # Flush so the new row gets its primary key assigned.
            session.flush()
        performer_id = known.id

        for kind, ext_id in resolved:
            if not ext_id:
                continue
            src = session.execute(
                select(Source).where(Source.kind == kind)
            ).scalar_one_or_none()
            if src is None:
                continue
            already_linked = session.execute(
                select(PerformerExternalRef).where(
                    PerformerExternalRef.source_id == src.id,
                    PerformerExternalRef.external_id == ext_id,
                )
            ).scalar_one_or_none()
            if already_linked is None:
                session.add(
                    PerformerExternalRef(
                        source_id=src.id,
                        external_id=ext_id,
                        performer_id=performer_id,
                        confidence=0.9,
                    )
                )

    return PerformerTarget(
        performer_id=performer_id,
        canonical_name=name,
        tpdb_id=tpdb_id,
        stashdb_id=stashdb_id,
    )
|
|
|
|
|
|
def _top_n_by_scene_count(session: Session, n: int) -> list[Performer]:
    """Return up to ``n`` performers ordered by number of linked scenes, highest first.

    The outer join keeps performers without any scene link in the candidate
    set (they simply count as zero).
    """
    scene_total = func.count(ScenePerformer.scene_id)
    stmt = (
        select(Performer, scene_total.label("c"))
        .outerjoin(ScenePerformer, ScenePerformer.performer_id == Performer.id)
        .group_by(Performer.id)
        .order_by(scene_total.desc())
        .limit(n)
    )
    return [record[0] for record in session.execute(stmt).all()]
|
|
|
|
|
|
def _claim_next_for_search(
    session: Session, *, refresh_after: timedelta, min_scene_count: int = 1
) -> Performer | None:
    """Claim the next performer due for a search and stamp it as searched.

    The candidate row is selected with ``FOR UPDATE ... SKIP LOCKED`` and its
    ``last_searched_at`` / ``search_run_count`` are updated in the same
    transaction, so a concurrent worker running the same query skips a row
    that is being claimed instead of picking it too. The lock lasts until the
    caller commits or rolls back.
    NOTE(review): relies on the database supporting SKIP LOCKED (e.g.
    PostgreSQL); confirm for the deployed backend.

    Eligibility:
      - the performer has at least ``min_scene_count`` linked scenes
        (performers with no scene links are never returned — inner join), and
      - it was never searched, or was last searched more than
        ``refresh_after`` ago.

    Priority among eligible performers (orphan-rescue bias, added 2026-05-20
    after a report that freshly released scenes had metadata but no playback):
      1. Most scenes released in the last 7 days that have no live playback
         source (``dead_at IS NULL`` counts as live).
      2. Never searched first, then least recently searched.
      3. Highest total scene count.

    Returns:
        The claimed performer (already flushed), or None when nothing is due.
    """
    now = datetime.now(UTC)
    cutoff = now - refresh_after
    orphan_cutoff = now - timedelta(days=7)

    # Total linked scenes per performer.
    sc_sub = (
        select(
            ScenePerformer.performer_id.label("pid"),
            func.count(ScenePerformer.scene_id).label("scene_count"),
        )
        .group_by(ScenePerformer.performer_id)
        .subquery()
    )

    # Per performer: scenes released within the last 7 days that have no
    # playback source with dead_at IS NULL.
    orphan_sub = (
        select(
            ScenePerformer.performer_id.label("pid"),
            func.count(ScenePerformer.scene_id).label("orphan_count"),
        )
        .join(Scene, Scene.id == ScenePerformer.scene_id)
        .where(Scene.release_date > orphan_cutoff)
        .where(
            ~select(PlaybackSource.id)
            .where(PlaybackSource.scene_id == Scene.id)
            .where(PlaybackSource.dead_at.is_(None))
            .exists()
        )
        .group_by(ScenePerformer.performer_id)
        .subquery()
    )

    row = session.execute(
        select(Performer)
        .join(sc_sub, sc_sub.c.pid == Performer.id, isouter=False)
        .join(orphan_sub, orphan_sub.c.pid == Performer.id, isouter=True)
        .where(sc_sub.c.scene_count >= min_scene_count)
        .where(
            (Performer.last_searched_at.is_(None))
            | (Performer.last_searched_at < cutoff)
        )
        .order_by(
            # 1. Orphan scenes first; COALESCE puts performers without any
            #    orphan scene behind those that have some.
            func.coalesce(orphan_sub.c.orphan_count, 0).desc(),
            # 2. Never searched, then oldest search.
            Performer.last_searched_at.asc().nullsfirst(),
            # 3. Popular performers earlier.
            sc_sub.c.scene_count.desc(),
        )
        .limit(1)
        # Lock only the performer row: the aggregated subqueries cannot be
        # locked, and one of them sits on the nullable side of an outer join.
        .with_for_update(skip_locked=True, of=Performer)
    ).scalar_one_or_none()

    if row is None:
        return None

    row.last_searched_at = datetime.now(UTC)
    row.search_run_count = (row.search_run_count or 0) + 1
    session.flush()
    return row
|
|
|
|
|
|
def run_continuous_one_at_a_time(
    *,
    refresh_after_days: int = 30,
    min_scene_count: int = 1,
    per_performer_limit: int | None = None,
    canonical_per_performer_limit: int | None = 100,
) -> StrategyCounters | None:
    """Claim one performer from the priority queue and run a tube-only search for it.

    The claim (which updates ``last_searched_at``) is committed before the
    search starts. Intended to be called repeatedly by a scheduler interval
    job (per the original note, e.g. every 60 seconds).

    Returns:
        The counters of the run, or None when no performer is due.
    """
    with session_scope() as session:
        claimed = _claim_next_for_search(
            session,
            refresh_after=timedelta(days=refresh_after_days),
            min_scene_count=min_scene_count,
        )
        if claimed is None:
            log.info("performer-continuous: queue empty (all searched within %dd)", refresh_after_days)
            return None
        # Copy plain values out while the session is open; the search below
        # runs outside of it.
        claim = (claimed.canonical_name, claimed.id, claimed.search_run_count)
        session.commit()

    log.info(
        "performer-continuous: claimed %s (id=%s, run #%d)",
        *claim,
    )

    return run_performer_driven(
        performer_ids=[claim[1]],
        top_n=0,  # ignored when performer_ids is given
        per_performer_limit=per_performer_limit,
        canonical_per_performer_limit=canonical_per_performer_limit,
        skip_canonical=True,  # direct scrapers only (backward fill)
    )
|
|
|
|
|
|
def _ingest_iter_into_run(
    *,
    source_kind: SourceKind,
    source_name: str,
    run_label: str,
    iterator_factory,  # type: ignore[no-untyped-def]
) -> dict[str, int]:
    """Ingest the scenes of an ad-hoc iterator inside a dedicated IngestRun.

    Opens an IngestRun for the source, feeds every item produced by
    ``iterator_factory()`` to ``_process_scene``, then finalises the run with
    its status and record counts. A failing scene is counted and logged; once
    more than 50 scenes have failed, the run is aborted and marked failed.
    An exception raised by the iterator itself also marks the run failed.
    Errors are never propagated to the caller.

    Returns:
        The per-call counters (seen / new / updated / skipped / errors).
    """
    counters = dict.fromkeys(("seen", "new", "updated", "skipped", "errors"), 0)

    with session_scope() as session:
        source = get_or_create_source(session, kind=source_kind, name=source_name)
        run = IngestRun(source_id=source.id, status=IngestStatus.running)
        session.add(run)
        # Flush so the run has its primary key before the session closes.
        session.flush()
        run_id, source_id = run.id, source.id

    log.info("ingest start label=%s run_id=%s", run_label, run_id)

    try:
        for raw in iterator_factory():
            counters["seen"] += 1
            try:
                _process_scene(source_id=source_id, raw_scene=raw, counters=counters)
            except Exception as exc:  # pragma: no cover - defensive
                counters["errors"] += 1
                log.exception("performer-driven scene failed external_id=%s: %s", raw.external_id, exc)
                if counters["errors"] > 50:
                    # Too many broken scenes: give up on this run.
                    raise
    except Exception as exc:
        log.exception("performer-driven run failed (%s): %s", run_label, exc)
        final_status = IngestStatus.failed
    else:
        final_status = (
            IngestStatus.partial if counters["errors"] > 0 else IngestStatus.success
        )

    with session_scope() as session:
        run = session.get(IngestRun, run_id)
        assert run is not None
        run.finished_at = datetime.now(UTC)
        run.status = final_status
        run.records_seen = counters["seen"]
        run.records_new = counters["new"]
        run.records_updated = counters["updated"]

    log.info("ingest done label=%s counters=%s", run_label, counters)
    return counters
|
|
|
|
|
|
# Reference these imported names so unused-import linters stay quiet.
_ = _hash_raw
_ = timedelta
|