"""Performer-driven ingest strategy.

Rationale (memory: completeness > freshness): a random newest-first ingest from
TPDB/StashDB rarely overlaps with a fresh pull from hqporner, because every source
sees a different slice of the catalogue at any given moment. For popular performers,
however, the probability of overlap is much higher, and the coverage of a single
performer is finite and can be searched exhaustively.

Per-performer cycle:
  1. Look up performer_external_refs (tpdb, stashdb) — the canonical UUIDs.
  2. For every source with a UUID: fetch the scenes of that performer (paginated).
  3. For every direct tube scraper: search the tube by `canonical_name`
     (e.g. "Lola Noir").
  4. Scenes go through the regular `_process_scene` -> the resolver merges them
     cross-source.

Choosing the performer list (`_resolve_targets` checks ids first, then names,
then falls back to top-N):
  a) `--performers "name1,name2"` — explicit names.
  b) `--performer-ids "<uuid1>,<uuid2>"` — explicit, by our canonical UUID.
  c) Top-N from the database by `scene_count desc` (`--top-n=N`, default 20).
"""

from __future__ import annotations

import logging
import uuid
from dataclasses import dataclass, field
from datetime import UTC, datetime, timedelta

from sqlalchemy import func, select
from sqlalchemy.orm import Session

from app.connectors.stashdb import StashDBConnector
from app.connectors.tpdb import TPDBConnector
from app.db import session_scope
from app.ingest import (
    _hash_raw,  # type: ignore[attr-defined]
    _process_scene,  # type: ignore[attr-defined]
    get_or_create_source,
)
from app.models.ingest_run import IngestRun, IngestStatus
from app.models.performer import Performer, PerformerExternalRef
from app.models.playback_source import PlaybackSource
from app.models.scene import Scene, ScenePerformer
from app.models.source import Source, SourceKind

log = logging.getLogger(__name__)


@dataclass
class PerformerTarget:
    """A performer selected for ingest, plus its canonical external IDs (if known)."""

    performer_id: uuid.UUID
    canonical_name: str
    tpdb_id: str | None = None
    stashdb_id: str | None = None


@dataclass
class StrategyCounters:
    """Aggregated counters for one performer-driven run."""

    performers_processed: int = 0
    performers_skipped_no_refs: int = 0
    # source_name -> {seen, new, updated, skipped, errors}, aggregated across
    # all performers.
    per_source: dict[str, dict[str, int]] = field(default_factory=dict)

    def merge(self, source_name: str, c: dict[str, int]) -> None:
        """Add the counters `c` of one ingest call to the `source_name` bucket.

        Only the five known keys are summed; keys missing from `c` count as 0
        and any extra keys in `c` are ignored.
        """
        bucket = self.per_source.setdefault(
            source_name, {"seen": 0, "new": 0, "updated": 0, "skipped": 0, "errors": 0}
        )
        for k in bucket:
            bucket[k] += c.get(k, 0)


# ---- Public API ------------------------------------------------------


def run_performer_driven(
    *,
    performer_names: list[str] | None = None,
    performer_ids: list[uuid.UUID] | None = None,
    top_n: int = 20,
    sitetags: list[str] | None = None,
    per_performer_limit: int | None = None,
    canonical_per_performer_limit: int | None = None,
    skip_tubes: bool = False,
    skip_canonical: bool = False,
) -> StrategyCounters:
    """Main entry point for the worker CLI.

    Args:
        performer_names: Explicit performer names. Passing names also enables a
            live auto-fill of missing TPDB/StashDB refs.
        performer_ids: Explicit local performer UUIDs (checked before names).
        top_n: How many performers to pick by scene count when neither ids nor
            names are given.
        sitetags: Restrict the direct tube scrapers to these sitetags
            (None or empty = all scrapers).
        per_performer_limit: Fallback scene limit for TPDB/StashDB, used only
            when `canonical_per_performer_limit` is None.
        canonical_per_performer_limit: Scene limit per performer passed to the
            TPDB/StashDB connectors (None is passed through as-is).
        skip_tubes: Do not run the direct tube scrapers.
        skip_canonical: Do not run TPDB/StashDB ingest. This also disables the
            "skip performers without external refs" rule.

    Returns:
        Counters aggregated across all processed performers.
    """
    counters = StrategyCounters()

    # When the user explicitly asks for performers by name, automatically fill in
    # missing canonical refs (TPDB/StashDB UUIDs). Historically many performers
    # only had a stashdb ref, so TPDB ingest never ran for them. The top-N path
    # does not auto-fill because that costs extra API calls without explicit user
    # intent.
    fill_missing_refs = bool(performer_names)
    targets = _resolve_targets(
        performer_names=performer_names,
        performer_ids=performer_ids,
        top_n=top_n,
        fill_missing_refs=fill_missing_refs,
    )
    if not targets:
        log.warning("performer-driven: no targets resolved (empty top-N or unknown names)")
        return counters

    log.info(
        "performer-driven: %d performers -> tpdb=%s stashdb=%s tubes=%s",
        len(targets),
        sum(1 for t in targets if t.tpdb_id),
        sum(1 for t in targets if t.stashdb_id),
        not skip_tubes,
    )

    # `skip_canonical=True` is used by the continuous worker. It skips TPDB/StashDB
    # ingest, which is slow (~3 min/performer). The continuous worker focuses on
    # search-based backfill; canonical refresh of top performers happens via the
    # scheduled `_job_performer_driven` every 12h and via the on-demand `/refresh` API.
    tpdb = None if skip_canonical else _try_build(TPDBConnector)
    stashdb = None if skip_canonical else _try_build(StashDBConnector)

    # Popular performers have thousands of scenes. Fetching all of them on every
    # refresh makes ticks slow, so the canonical pull can be capped. The explicit
    # canonical limit wins; otherwise fall back to per_performer_limit (which may
    # itself be None).
    canonical_limit = (
        canonical_per_performer_limit
        if canonical_per_performer_limit is not None
        else per_performer_limit
    )

    # Direct tube scrapers, filtered once by sitetag because the filter does not
    # depend on the target. They are imported lazily so that runs with
    # skip_tubes=True never load them.
    scraper_classes: list[type] = []
    scraper_source_name = ""
    if not skip_tubes:
        from app.connectors.direct_scrapers import ALL_DIRECT_SCRAPERS, SCRAPER_SOURCE_NAME

        sitetag_filter = set(sitetags or []) or None
        scraper_classes = [
            s
            for s in ALL_DIRECT_SCRAPERS
            if sitetag_filter is None or s.sitetag in sitetag_filter
        ]
        scraper_source_name = SCRAPER_SOURCE_NAME

    for target in targets:
        log.info("=== performer: %s (id=%s) ===", target.canonical_name, target.performer_id)

        # Without a canonical ref, a name-only search cannot be merged reliably,
        # so the performer is skipped entirely (unless canonical ingest is off).
        if not skip_canonical and not (target.tpdb_id or target.stashdb_id):
            log.warning(
                "skip %s: brak tpdb/stashdb external_ref, search-only by name nie pomoże w mergu",
                target.canonical_name,
            )
            counters.performers_skipped_no_refs += 1
            continue

        # 1. TPDB. The lambdas are invoked immediately by _ingest_iter_into_run;
        # default args pin the current target anyway.
        if tpdb and target.tpdb_id:
            c = _ingest_iter_into_run(
                source_kind=SourceKind.tpdb,
                source_name="tpdb",
                run_label=f"performer-driven:tpdb:{target.canonical_name}",
                iterator_factory=lambda t=target: tpdb.fetch_scenes_for_performer(
                    t.tpdb_id, limit=canonical_limit
                ),
            )
            counters.merge("tpdb", c)

        # 2. StashDB
        if stashdb and target.stashdb_id:
            c = _ingest_iter_into_run(
                source_kind=SourceKind.stashdb,
                source_name="stashdb",
                run_label=f"performer-driven:stashdb:{target.canonical_name}",
                iterator_factory=lambda t=target: stashdb.fetch_scenes_for_performer(
                    t.stashdb_id, limit=canonical_limit
                ),
            )
            counters.merge("stashdb", c)

        # 3. Direct scrapers: per-tube HTTP search. Each scraper has an independent
        # rate-limit budget (a separate tube server). All scrapers feed into
        # `Source(name=SCRAPER_SOURCE_NAME)` (legacy name kept for DB compat) with
        # external_id `f"{sitetag}:{url}"`, and the resolver merges idempotently
        # cross-source. A scraper that fails to construct is skipped (logged),
        # just like the canonical connectors, instead of aborting the whole run.
        for scraper_cls in scraper_classes:
            scraper = _try_build(scraper_cls)
            if scraper is None:
                continue
            c = _ingest_iter_into_run(
                source_kind=SourceKind.scraper,
                source_name=scraper_source_name,
                run_label=f"performer-driven:direct:{scraper.sitetag}:{target.canonical_name}",
                iterator_factory=lambda s=scraper, t=target: s.search(
                    t.canonical_name, page=1, limit=50
                ),
            )
            counters.merge(scraper_source_name, c)

        counters.performers_processed += 1

    log.info("performer-driven done: %s", counters)
    return counters


# ---- Internal --------------------------------------------------------


def _try_build(cls, **kwargs):  # type: ignore[no-untyped-def]
    """Instantiate `cls`, or log a warning and return None if construction raises.

    Keyword arguments whose value is None are dropped so the class defaults apply.
    """
    try:
        kw = {k: v for k, v in kwargs.items() if v is not None}
        return cls(**kw)
    except Exception as e:
        log.warning("connector %s skipped: %s", cls.__name__, e)
        return None


def _resolve_targets(
    *,
    performer_names: list[str] | None,
    performer_ids: list[uuid.UUID] | None,
    top_n: int,
    fill_missing_refs: bool = False,
) -> list[PerformerTarget]:
    """Map the input (UUIDs / names / top-N) to PerformerTargets with TPDB/StashDB IDs.

    Precedence: `performer_ids`, then `performer_names`, then top-N by scene count.

    For explicit names not found in our database, do a live lookup against TPDB
    and StashDB (queryPerformers) to get the canonical UUIDs. Without it, the
    user would first have to run a newest-first ingest for every performer.
    Each distinct name (case-insensitive) is looked up live at most once, so a
    repeated name does not yield duplicate targets.

    When `fill_missing_refs=True`, also try a live lookup of missing TPDB/StashDB
    refs for performers that already exist in the database. Use case: the user
    asks for explicit names and wants a full catch-up, not only from the sources
    that are currently mapped.
    """
    targets: list[PerformerTarget] = []
    found_in_db_names: set[str] = set()

    with session_scope() as session:
        if performer_ids:
            performers = session.execute(
                select(Performer).where(Performer.id.in_(performer_ids))
            ).scalars().all()
        elif performer_names:
            normalized = [n.strip().lower() for n in performer_names if n.strip()]
            if not normalized:
                return []
            performers = session.execute(
                select(Performer).where(Performer.name_normalized.in_(normalized))
            ).scalars().all()
        else:
            performers = _top_n_by_scene_count(session, top_n)

        # Source rows of the canonical catalogues (None when not registered yet).
        tpdb_src = session.execute(
            select(Source).where(Source.kind == SourceKind.tpdb)
        ).scalar_one_or_none()
        stashdb_src = session.execute(
            select(Source).where(Source.kind == SourceKind.stashdb)
        ).scalar_one_or_none()

        for p in performers:
            tpdb_id: str | None = None
            stashdb_id: str | None = None
            if tpdb_src:
                tpdb_id = session.execute(
                    select(PerformerExternalRef.external_id).where(
                        PerformerExternalRef.performer_id == p.id,
                        PerformerExternalRef.source_id == tpdb_src.id,
                    ).limit(1)
                ).scalar_one_or_none()
            if stashdb_src:
                stashdb_id = session.execute(
                    select(PerformerExternalRef.external_id).where(
                        PerformerExternalRef.performer_id == p.id,
                        PerformerExternalRef.source_id == stashdb_src.id,
                    ).limit(1)
                ).scalar_one_or_none()

            if fill_missing_refs:
                if tpdb_src and tpdb_id is None:
                    tpdb_id = _ensure_canonical_ref(
                        session,
                        performer_id=p.id,
                        name=p.canonical_name,
                        kind=SourceKind.tpdb,
                        source_id=tpdb_src.id,
                    )
                if stashdb_src and stashdb_id is None:
                    stashdb_id = _ensure_canonical_ref(
                        session,
                        performer_id=p.id,
                        name=p.canonical_name,
                        kind=SourceKind.stashdb,
                        source_id=stashdb_src.id,
                    )

            targets.append(
                PerformerTarget(
                    performer_id=p.id,
                    canonical_name=p.canonical_name,
                    tpdb_id=tpdb_id,
                    stashdb_id=stashdb_id,
                )
            )
            found_in_db_names.add(p.name_normalized.strip().lower())

    # Live lookup for names not found in the database, each distinct name once.
    if performer_names:
        looked_up: set[str] = set()
        for raw_name in performer_names:
            name = raw_name.strip()
            key = name.lower()
            if not name or key in found_in_db_names or key in looked_up:
                continue
            looked_up.add(key)
            target = _lookup_via_api(name)
            if target is not None:
                targets.append(target)

    return targets


def _ensure_canonical_ref(
    session: Session,
    *,
    performer_id: uuid.UUID,
    name: str,
    kind: SourceKind,
    source_id: uuid.UUID,
) -> str | None:
    """Live-lookup a TPDB/StashDB UUID by name and add a PerformerExternalRef.

    Returns the external_id (freshly added, or already linked to this performer),
    or None when:
      - `kind` is neither tpdb nor stashdb,
      - the lookup raised or returned nothing (no such performer in that source),
      - the external_id is already mapped to a different local performer
        (a conflict that needs a manual merge decision).

    The new ref is only `session.add`-ed here; no flush or commit happens in
    this function.
    """
    try:
        if kind == SourceKind.tpdb:
            ext_id = TPDBConnector().find_performer_id_by_name(name)
        elif kind == SourceKind.stashdb:
            ext_id = StashDBConnector().find_performer_id_by_name(name)
        else:
            return None
    except Exception as e:
        log.warning("%s lookup failed for %s: %s", kind.value, name, e)
        return None
    if not ext_id:
        return None

    existing = session.execute(
        select(PerformerExternalRef).where(
            PerformerExternalRef.source_id == source_id,
            PerformerExternalRef.external_id == ext_id,
        )
    ).scalar_one_or_none()
    if existing:
        if existing.performer_id != performer_id:
            log.warning(
                "%s ref conflict: ext_id=%s already maps to performer %s (asked for %s)",
                kind.value,
                ext_id,
                existing.performer_id,
                performer_id,
            )
            return None
        return ext_id

    session.add(
        PerformerExternalRef(
            source_id=source_id,
            external_id=ext_id,
            performer_id=performer_id,
            confidence=0.85,
        )
    )
    log.info("auto-filled %s ref for %s: %s", kind.value, name, ext_id)
    return ext_id


def _lookup_via_api(name: str) -> PerformerTarget | None:
    """Live lookup of a name -> TPDB UUID + StashDB UUID.

    Creates a local placeholder Performer (unless one with the same normalized
    name already exists) plus external_refs, so that later runs do not need to
    repeat the lookup. If neither source returned an ID, returns None because
    there is nothing worth pulling.
    """
    tpdb_id: str | None = None
    stashdb_id: str | None = None
    try:
        tpdb = TPDBConnector()
        tpdb_id = tpdb.find_performer_id_by_name(name)
    except Exception as e:
        log.warning("tpdb lookup failed for %s: %s", name, e)
    try:
        stashdb = StashDBConnector()
        stashdb_id = stashdb.find_performer_id_by_name(name)
    except Exception as e:
        log.warning("stashdb lookup failed for %s: %s", name, e)

    if not (tpdb_id or stashdb_id):
        log.warning("performer '%s' not found in any external source — skipping", name)
        return None

    log.info("live lookup: %s → tpdb=%s stashdb=%s", name, tpdb_id, stashdb_id)

    # Create a placeholder Performer in our DB plus external_refs, so the resolver
    # can map scene -> performer correctly and later runs find it in the DB.
    from app.normalize.text import normalize_person, slugify

    with session_scope() as session:
        normalized = normalize_person(name)
        existing = session.execute(
            select(Performer).where(Performer.name_normalized == normalized)
        ).scalar_one_or_none()
        if existing is not None:
            performer_id = existing.id
        else:
            performer = Performer(
                canonical_name=name,
                name_normalized=normalized,
                slug=slugify(name),
            )
            session.add(performer)
            session.flush()  # assign performer.id before creating refs
            performer_id = performer.id

        # Add a ref per source (tpdb first, then stashdb) unless that external_id
        # is already mapped in that source.
        for kind, ext_id in ((SourceKind.tpdb, tpdb_id), (SourceKind.stashdb, stashdb_id)):
            if not ext_id:
                continue
            src = session.execute(
                select(Source).where(Source.kind == kind)
            ).scalar_one_or_none()
            if src is None:
                continue
            exists = session.execute(
                select(PerformerExternalRef).where(
                    PerformerExternalRef.source_id == src.id,
                    PerformerExternalRef.external_id == ext_id,
                )
            ).scalar_one_or_none()
            if exists is None:
                session.add(
                    PerformerExternalRef(
                        source_id=src.id,
                        external_id=ext_id,
                        performer_id=performer_id,
                        confidence=0.9,
                    )
                )

    return PerformerTarget(
        performer_id=performer_id,
        canonical_name=name,
        tpdb_id=tpdb_id,
        stashdb_id=stashdb_id,
    )


def _top_n_by_scene_count(session: Session, n: int) -> list[Performer]:
    """Return up to `n` performers ordered by number of linked scenes, descending.

    Uses an outer join, so performers without scenes (count 0) can appear when
    fewer than `n` performers have scenes.
    """
    rows = session.execute(
        select(Performer, func.count(ScenePerformer.scene_id).label("c"))
        .outerjoin(ScenePerformer, ScenePerformer.performer_id == Performer.id)
        .group_by(Performer.id)
        .order_by(func.count(ScenePerformer.scene_id).desc())
        .limit(n)
    ).all()
    return [p for p, _ in rows]


def _claim_next_for_search(
    session: Session, *, refresh_after: timedelta, min_scene_count: int = 1
) -> Performer | None:
    """Claim one performer from the search queue and bump its last_searched_at.

    The candidate row is selected with `FOR UPDATE OF performers SKIP LOCKED`
    and updated in the same transaction. Concurrent workers therefore skip rows
    that another in-flight claim has locked instead of claiming the same
    performer. (Dialects without row locking, e.g. SQLite, omit the clause.)
    The lock is held until the caller commits.

    Eligibility: scene_count >= `min_scene_count` AND (never searched OR last
    searched more than `refresh_after` ago).

    Queue priority (2026-05-20 update, orphan-rescue bias):
      1. Performers with RECENT scenes without playback (release_date within the
         last 7 days, no live PlaybackSource), by orphan count descending. These
         are the most urgent because the user sees empty studio listings for the
         newest scenes. Bug report 2026-05-20: "no Brazzers Exxtra after 15-05":
         all new TPDB scenes had canonical metadata but 0 playback, because the
         continuous queue never reached them (78k performers, 67k NULL ->
         ~232-day sweep).
      2. Never-searched performers (`last_searched_at IS NULL`) first, then the
         oldest `last_searched_at`.
      3. Highest scene_count (popular performers earlier).

    Returns:
        The claimed Performer (already flushed), or None when the queue is empty.
    """
    cutoff = datetime.now(UTC) - refresh_after
    orphan_cutoff = datetime.now(UTC) - timedelta(days=7)

    sc_sub = (
        select(
            ScenePerformer.performer_id.label("pid"),
            func.count(ScenePerformer.scene_id).label("scene_count"),
        )
        .group_by(ScenePerformer.performer_id)
        .subquery()
    )
    # Orphan-scene count per performer: scenes released in the last 7 days that
    # have no live playback source. A high count means the performer has many
    # records with nothing to play.
    orphan_sub = (
        select(
            ScenePerformer.performer_id.label("pid"),
            func.count(ScenePerformer.scene_id).label("orphan_count"),
        )
        .join(Scene, Scene.id == ScenePerformer.scene_id)
        .where(Scene.release_date > orphan_cutoff)
        .where(
            ~select(PlaybackSource.id)
            .where(PlaybackSource.scene_id == Scene.id)
            .where(PlaybackSource.dead_at.is_(None))
            .exists()
        )
        .group_by(ScenePerformer.performer_id)
        .subquery()
    )

    row = session.execute(
        select(Performer)
        .join(sc_sub, sc_sub.c.pid == Performer.id, isouter=False)
        .join(orphan_sub, orphan_sub.c.pid == Performer.id, isouter=True)
        .where(sc_sub.c.scene_count >= min_scene_count)
        .where(
            (Performer.last_searched_at.is_(None))
            | (Performer.last_searched_at < cutoff)
        )
        .order_by(
            # 1. Orphan scenes (last 7d, no playback) FIRST, count descending.
            #    COALESCE(…, 0) puts performers without orphans after those with.
            func.coalesce(orphan_sub.c.orphan_count, 0).desc(),
            # 2. NULL last_searched_at next, then the oldest searches.
            Performer.last_searched_at.asc().nullsfirst(),
            # 3. Highest scene_count (popular performers earlier).
            sc_sub.c.scene_count.desc(),
        )
        .limit(1)
        # Lock only the performers row: FOR UPDATE must not propagate into the
        # aggregated subqueries or the nullable side of the outer join.
        # SKIP LOCKED makes a concurrent worker move on to the next candidate
        # instead of blocking on (and then re-claiming) the same performer.
        .with_for_update(skip_locked=True, of=Performer)
    ).scalar_one_or_none()
    if row is None:
        return None

    row.last_searched_at = datetime.now(UTC)
    row.search_run_count = (row.search_run_count or 0) + 1
    session.flush()
    return row


def run_continuous_one_at_a_time(
    *,
    refresh_after_days: int = 30,
    min_scene_count: int = 1,
    per_performer_limit: int | None = None,
    canonical_per_performer_limit: int | None = 100,
) -> StrategyCounters | None:
    """Claim one performer from the priority queue and run a performer-driven search.

    The claim (including the last_searched_at update) is committed before the
    search starts. Only the direct scrapers run (`skip_canonical=True`).
    Intended to be called by an APScheduler interval job (e.g. every 60s).

    Returns:
        The run's counters, or None when the queue is empty.
    """
    refresh_after = timedelta(days=refresh_after_days)
    with session_scope() as session:
        target_perf = _claim_next_for_search(
            session,
            refresh_after=refresh_after,
            min_scene_count=min_scene_count,
        )
        if target_perf is None:
            log.info("performer-continuous: queue empty (all searched within %dd)", refresh_after_days)
            return None
        # Copy the fields out before the session closes, because the ingest
        # below runs outside this session.
        performer_id = target_perf.id
        canonical_name = target_perf.canonical_name
        run_count = target_perf.search_run_count
        session.commit()

    log.info(
        "performer-continuous: claimed %s (id=%s, run #%d)",
        canonical_name,
        performer_id,
        run_count,
    )
    counters = run_performer_driven(
        performer_ids=[performer_id],
        top_n=0,  # ignored when performer_ids is given
        per_performer_limit=per_performer_limit,
        canonical_per_performer_limit=canonical_per_performer_limit,
        skip_canonical=True,  # direct scrapers only (backward fill)
    )
    return counters


def _ingest_iter_into_run(
    *,
    source_kind: SourceKind,
    source_name: str,
    run_label: str,
    iterator_factory,  # type: ignore[no-untyped-def]
) -> dict[str, int]:
    """Variant of ingest_from_connector for ad-hoc iterators (per-performer pulls).

    Opens an IngestRun, iterates `iterator_factory()`, calls `_process_scene` for
    each scene and finalizes the run in a separate session.

    Per-scene errors are counted and logged. Once more than 50 have occurred,
    the run is aborted. Any exception from the iterator itself also marks the
    run as failed; it is logged, not re-raised.

    Final status: failed (aborted), partial (some scene errors) or success.

    Returns:
        Per-call counters {seen, new, updated, skipped, errors}. These are
        similar to `ingest_from_connector` but cover only this call.
    """
    counters = {"seen": 0, "new": 0, "updated": 0, "skipped": 0, "errors": 0}
    with session_scope() as session:
        source = get_or_create_source(session, kind=source_kind, name=source_name)
        run = IngestRun(source_id=source.id, status=IngestStatus.running)
        session.add(run)
        session.flush()
        run_id = run.id
        source_id = source.id

    log.info("ingest start label=%s run_id=%s", run_label, run_id)
    failed = False
    try:
        for raw in iterator_factory():
            counters["seen"] += 1
            try:
                _process_scene(source_id=source_id, raw_scene=raw, counters=counters)
            except Exception as exc:  # pragma: no cover - defensive
                counters["errors"] += 1
                log.exception("performer-driven scene failed external_id=%s: %s", raw.external_id, exc)
                if counters["errors"] > 50:
                    raise
    except Exception as exc:
        failed = True
        log.exception("performer-driven run failed (%s): %s", run_label, exc)

    with session_scope() as session:
        run = session.get(IngestRun, run_id)
        assert run is not None
        run.finished_at = datetime.now(UTC)
        if failed:
            run.status = IngestStatus.failed
        elif counters["errors"] > 0:
            run.status = IngestStatus.partial
        else:
            run.status = IngestStatus.success
        run.records_seen = counters["seen"]
        run.records_new = counters["new"]
        run.records_updated = counters["updated"]

    log.info("ingest done label=%s counters=%s", run_label, counters)
    return counters


# Suppress unused-import lint
_ = _hash_raw
_ = timedelta