goon/app/scheduler/performer_driven.py
https://github.com/goon-foss/goon 642f1ab8b8 Mobile 0.1.9: OTA enable, WebView cookie-dismiss fix, porndoe connector
Mobile / OTA:
- Enable Expo Updates (app.json + AndroidManifest) → api.goon-foss.org
- Bump 0.1.6 → 0.1.9 (build.gradle, app.json, appVersion.ts, main.py /version)
- backend.ts: default public backend auto-connect (no manual login)

WebView fallback fix (PlayerScreen INJECTED_JS):
- Auto-dismiss cookie/consent gates (hqporner et al. blocked kt_player init)
- Context-scoped: only clicks consent buttons inside cookie/gdpr containers
- Retry window for <source>.src polling raised 5→15 ticks (post-dismiss init)

Resolver:
- Series-position + modifier mismatch detector (Episode 2≠4, BTS/unedited)
  → composite_score hard-reject / cap; wired into scene_score + bulk_dedup
- aggregator-mode candidate query: LIMIT 500 + title-match ordering

Connectors:
- porndoe.com browse scraper (JSON-LD VideoObject) — theporndude audit pilot

landing: APK links → goon-v0.1.9.apk

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-22 11:20:57 +02:00

640 lines
24 KiB
Python

"""Performer-driven ingest strategy.
Rationale (memory: kompletność > świeżość):
Random newest-first ingest z TPDB/StashDB rzadko pokrywa się ze świeżym pull'em
z hqporner — bo każde źródło widzi inny wycinek katalogu w danym momencie. Dla
popularnych performerów jednak prawdopodobieństwo overlapu jest dużo wyższe,
a zasięg po jednym performerze jest skończony i da się przeszukać.
Cykl per performer:
1. Lookup performer_external_refs(tpdb, stashdb) — kanoniczne UUID-y.
2. Dla każdego źródła z UUID: pobierz wszystkie sceny tego performera (paginated).
3. Dla każdego direct tube scrapera: search tube po `canonical_name` (np. "Lola Noir").
4. Sceny lecą przez normalny `_process_scene` → resolver merguje cross-source.
Wybór listy performerów (kolejność prób):
a) `--performers "name1,name2"` — explicit.
b) `--performer-ids <uuid1>,<uuid2>` — explicit po naszym kanonicznym UUID.
c) Top-N z bazy po `scene_count desc` (`--top-n=N`, default 20).
"""
from __future__ import annotations
import logging
import uuid
from dataclasses import dataclass, field
from datetime import UTC, datetime, timedelta
from sqlalchemy import func, select
from sqlalchemy.orm import Session
from app.connectors.stashdb import StashDBConnector
from app.connectors.tpdb import TPDBConnector
from app.db import session_scope
from app.ingest import (
_hash_raw, # type: ignore[attr-defined]
_process_scene, # type: ignore[attr-defined]
get_or_create_source,
)
from app.models.ingest_run import IngestRun, IngestStatus
from app.models.performer import Performer, PerformerExternalRef
from app.models.playback_source import PlaybackSource
from app.models.scene import Scene, ScenePerformer
from app.models.source import Source, SourceKind
log = logging.getLogger(__name__)
@dataclass
class PerformerTarget:
performer_id: uuid.UUID
canonical_name: str
tpdb_id: str | None = None
stashdb_id: str | None = None
@dataclass
class StrategyCounters:
    """Aggregate statistics for one whole performer-driven run."""

    performers_processed: int = 0
    performers_skipped_no_refs: int = 0
    # source_name -> {seen, new, updated, skipped, errors}, summed over every
    # performer processed in the run.
    per_source: dict[str, dict[str, int]] = field(default_factory=dict)

    def merge(self, source_name: str, c: dict[str, int]) -> None:
        """Add the per-call counters *c* into the bucket for *source_name*.

        The bucket is created with the five standard keys set to zero on first
        use. Only those five keys are accumulated: keys absent from *c* count
        as zero and any extra key in *c* is ignored.
        """
        if source_name not in self.per_source:
            self.per_source[source_name] = dict.fromkeys(
                ("seen", "new", "updated", "skipped", "errors"), 0
            )
        totals = self.per_source[source_name]
        totals.update({key: total + c.get(key, 0) for key, total in totals.items()})
# ---- Public API ------------------------------------------------------
def run_performer_driven(
    *,
    performer_names: list[str] | None = None,
    performer_ids: list[uuid.UUID] | None = None,
    top_n: int = 20,
    sitetags: list[str] | None = None,
    per_performer_limit: int | None = None,
    canonical_per_performer_limit: int | None = None,
    skip_tubes: bool = False,
    skip_canonical: bool = False,
) -> StrategyCounters:
    """Main entry point for the worker CLI: ingest scenes performer by performer.

    For each resolved target: pull its scenes from TPDB and StashDB (when a
    ref and a working connector exist), then search every selected direct
    tube scraper by the performer's canonical name. Every pull runs as its
    own IngestRun via ``_ingest_iter_into_run``.

    Args:
        performer_names: explicit performer names. When given, missing
            TPDB/StashDB refs of DB performers are also auto-filled by live
            lookup (see ``_resolve_targets``).
        performer_ids: explicit local performer UUIDs; looked up before names.
        top_n: number of top performers by scene count, used when neither
            names nor ids are given.
        sitetags: restrict direct tube scrapers to these sitetags
            (``None`` or empty = all scrapers).
        per_performer_limit: fallback limit for canonical (TPDB/StashDB)
            fetches when ``canonical_per_performer_limit`` is ``None``. Tube
            searches are always called with ``page=1, limit=50``.
        canonical_per_performer_limit: max scenes requested per performer from
            TPDB/StashDB; ``None`` falls back to ``per_performer_limit``.
        skip_tubes: do not run direct tube scrapers.
        skip_canonical: do not build or query the TPDB/StashDB connectors.
            Performers without any canonical ref are then still searched on
            tubes instead of being skipped.

    Returns:
        Counters aggregated over all processed performers.
    """
    counters = StrategyCounters()
    # Auto-fill missing canonical refs (TPDB/StashDB UUIDs) only when the user
    # explicitly asked for performers by name: historically many performers
    # only had a StashDB ref, so TPDB ingest never ran for them. The top-N path
    # does not auto-fill because that costs extra API calls without explicit
    # user intent.
    targets = _resolve_targets(
        performer_names=performer_names,
        performer_ids=performer_ids,
        top_n=top_n,
        fill_missing_refs=bool(performer_names),
    )
    if not targets:
        log.warning("performer-driven: no targets resolved (empty top-N or unknown names)")
        return counters
    log.info(
        "performer-driven: %d performers -> tpdb=%s stashdb=%s tubes=%s",
        len(targets),
        sum(1 for t in targets if t.tpdb_id),
        sum(1 for t in targets if t.stashdb_id),
        not skip_tubes,
    )

    # skip_canonical=True is used by the continuous worker: it skips the
    # TPDB/StashDB ingest (slow, ~3 min per performer according to the original
    # note) and focuses on search-based backfill. Canonical refresh of top
    # performers happens via the scheduled `_job_performer_driven` (every 12h)
    # and the on-demand `/refresh` API.
    tpdb = None if skip_canonical else _try_build(TPDBConnector)
    stashdb = None if skip_canonical else _try_build(StashDBConnector)

    # Cap for canonical fetches: popular performers have thousands of scenes,
    # and fetching all of them on every refresh makes ticks slow. There is no
    # built-in default; None means "whatever the connector does without a
    # limit". Independent of the performer, so computed once.
    canonical_limit = (
        canonical_per_performer_limit
        if canonical_per_performer_limit is not None
        else per_performer_limit
    )

    # Direct scraper classes to run, filtered by sitetag. Also loop-invariant.
    scraper_classes: list = []
    if not skip_tubes:
        from app.connectors.direct_scrapers import ALL_DIRECT_SCRAPERS

        sitetag_filter = set(sitetags or []) or None
        scraper_classes = [
            scraper_cls
            for scraper_cls in ALL_DIRECT_SCRAPERS
            if sitetag_filter is None or scraper_cls.sitetag in sitetag_filter
        ]

    for target in targets:
        log.info("=== performer: %s (id=%s) ===", target.canonical_name, target.performer_id)
        if not skip_canonical and not (target.tpdb_id or target.stashdb_id):
            log.warning(
                "skip %s: brak tpdb/stashdb external_ref, search-only by name nie pomoże w mergu",
                target.canonical_name,
            )
            counters.performers_skipped_no_refs += 1
            continue

        # 1. TPDB
        if tpdb and target.tpdb_id:
            c = _ingest_iter_into_run(
                source_kind=SourceKind.tpdb,
                source_name="tpdb",
                run_label=f"performer-driven:tpdb:{target.canonical_name}",
                iterator_factory=lambda t=target: tpdb.fetch_scenes_for_performer(
                    t.tpdb_id, limit=canonical_limit
                ),
            )
            counters.merge("tpdb", c)

        # 2. StashDB
        if stashdb and target.stashdb_id:
            c = _ingest_iter_into_run(
                source_kind=SourceKind.stashdb,
                source_name="stashdb",
                run_label=f"performer-driven:stashdb:{target.canonical_name}",
                iterator_factory=lambda t=target: stashdb.fetch_scenes_for_performer(
                    t.stashdb_id, limit=canonical_limit
                ),
            )
            counters.merge("stashdb", c)

        # 3. Direct scrapers: per-tube HTTP search. Each scraper hits its own
        # tube server. All of them feed into `Source(name='pornapp')` (legacy
        # name kept for DB compatibility); the original design notes that the
        # resolver merges results cross-source.
        for scraper_cls in scraper_classes:
            # Same guard as for the canonical connectors: a scraper whose
            # constructor fails is logged and skipped instead of aborting the
            # whole run for every remaining performer.
            scraper = _try_build(scraper_cls)
            if scraper is None:
                continue
            c = _ingest_iter_into_run(
                source_kind=SourceKind.scraper,
                source_name="pornapp",
                run_label=f"performer-driven:direct:{scraper.sitetag}:{target.canonical_name}",
                iterator_factory=lambda s=scraper, t=target: s.search(
                    t.canonical_name, page=1, limit=50
                ),
            )
            counters.merge("pornapp", c)

        counters.performers_processed += 1

    log.info("performer-driven done: %s", counters)
    return counters
# ---- Internal --------------------------------------------------------
def _try_build(cls, **kwargs): # type: ignore[no-untyped-def]
try:
kw = {k: v for k, v in kwargs.items() if v is not None}
return cls(**kw)
except Exception as e:
log.warning("connector %s skipped: %s", cls.__name__, e)
return None
def _resolve_targets(
    *,
    performer_names: list[str] | None,
    performer_ids: list[uuid.UUID] | None,
    top_n: int,
    fill_missing_refs: bool = False,
) -> list[PerformerTarget]:
    """Map the input (UUIDs / names / top-N) to targets with TPDB/StashDB ids.

    Performer selection from our DB, in order of precedence:
    ``performer_ids`` if given; else ``performer_names`` (stripped, lowercased
    and matched against ``Performer.name_normalized``); else the top
    ``top_n`` performers by scene count.

    Explicit names not found in the DB are resolved by a live TPDB/StashDB
    lookup (``_lookup_via_api``), so the user does not first have to run a
    newest-first ingest for each performer. Each distinct name (compared
    after strip + lowercase) is looked up at most once, so duplicate or
    case-variant names do not produce duplicate API calls or targets.

    With ``fill_missing_refs=True``, performers found in the DB that lack a
    TPDB/StashDB ref also get a live lookup for the missing ref
    (``_ensure_canonical_ref``), for a full catch-up across both sources.

    Returns:
        One target per resolved performer. Empty when no ids were given and
        every name is blank.
    """
    targets: list[PerformerTarget] = []
    found_in_db_names: set[str] = set()
    with session_scope() as session:
        if performer_ids:
            performers = session.execute(
                select(Performer).where(Performer.id.in_(performer_ids))
            ).scalars().all()
        elif performer_names:
            normalized = [n.strip().lower() for n in performer_names if n.strip()]
            if not normalized:
                return []
            performers = session.execute(
                select(Performer).where(Performer.name_normalized.in_(normalized))
            ).scalars().all()
        else:
            performers = _top_n_by_scene_count(session, top_n)

        tpdb_src = session.execute(
            select(Source).where(Source.kind == SourceKind.tpdb)
        ).scalar_one_or_none()
        stashdb_src = session.execute(
            select(Source).where(Source.kind == SourceKind.stashdb)
        ).scalar_one_or_none()

        for p in performers:
            tpdb_id = _first_ref_external_id(session, p.id, tpdb_src)
            stashdb_id = _first_ref_external_id(session, p.id, stashdb_src)
            if fill_missing_refs:
                if tpdb_src and tpdb_id is None:
                    tpdb_id = _ensure_canonical_ref(
                        session,
                        performer_id=p.id,
                        name=p.canonical_name,
                        kind=SourceKind.tpdb,
                        source_id=tpdb_src.id,
                    )
                if stashdb_src and stashdb_id is None:
                    stashdb_id = _ensure_canonical_ref(
                        session,
                        performer_id=p.id,
                        name=p.canonical_name,
                        kind=SourceKind.stashdb,
                        source_id=stashdb_src.id,
                    )
            targets.append(
                PerformerTarget(
                    performer_id=p.id,
                    canonical_name=p.canonical_name,
                    tpdb_id=tpdb_id,
                    stashdb_id=stashdb_id,
                )
            )
            found_in_db_names.add(p.name_normalized.strip().lower())

    # Live lookup for names not found in the DB. `seen` also absorbs repeated
    # names: without it a duplicate would be looked up again, hit the
    # placeholder performer created by the first lookup, and be ingested twice.
    if performer_names:
        seen: set[str] = set(found_in_db_names)
        for raw_name in performer_names:
            name = raw_name.strip()
            key = name.lower()
            if not name or key in seen:
                continue
            seen.add(key)
            target = _lookup_via_api(name)
            if target is not None:
                targets.append(target)
    return targets


def _first_ref_external_id(
    session: Session, performer_id: uuid.UUID, source: Source | None
) -> str | None:
    """Return one external id linking *performer_id* to *source*, else None.

    Returns None without querying when *source* is None. If several refs
    exist for the pair, an arbitrary one is returned (``LIMIT 1``).
    """
    if source is None:
        return None
    return session.execute(
        select(PerformerExternalRef.external_id).where(
            PerformerExternalRef.performer_id == performer_id,
            PerformerExternalRef.source_id == source.id,
        ).limit(1)
    ).scalar_one_or_none()
def _ensure_canonical_ref(
    session: Session,
    *,
    performer_id: uuid.UUID,
    name: str,
    kind: SourceKind,
    source_id: uuid.UUID,
) -> str | None:
    """Live-lookup a performer's TPDB/StashDB id by *name* and link it locally.

    Only ``SourceKind.tpdb`` and ``SourceKind.stashdb`` are supported; any
    other kind returns None without a lookup. A fresh connector is built for
    the lookup, and construction or lookup failures are logged and yield None.

    Returns:
        The external id when it is either newly linked to *performer_id* (a
        ``PerformerExternalRef`` with confidence 0.85 is added to *session*;
        this function does not flush) or already linked to it. None when the
        lookup found nothing, failed, or the id is already linked to a
        different local performer; that conflict is logged and left for a
        manual merge decision.
    """
    connector_by_kind = {
        SourceKind.tpdb: TPDBConnector,
        SourceKind.stashdb: StashDBConnector,
    }
    connector_cls = connector_by_kind.get(kind)
    if connector_cls is None:
        return None
    try:
        ext_id = connector_cls().find_performer_id_by_name(name)
    except Exception as e:
        log.warning("%s lookup failed for %s: %s", kind.value, name, e)
        return None
    if not ext_id:
        return None

    linked = session.execute(
        select(PerformerExternalRef).where(
            PerformerExternalRef.source_id == source_id,
            PerformerExternalRef.external_id == ext_id,
        )
    ).scalar_one_or_none()

    if linked is None:
        session.add(
            PerformerExternalRef(
                source_id=source_id,
                external_id=ext_id,
                performer_id=performer_id,
                confidence=0.85,
            )
        )
        log.info("auto-filled %s ref for %s: %s", kind.value, name, ext_id)
        return ext_id

    if linked.performer_id == performer_id:
        return ext_id

    log.warning(
        "%s ref conflict: ext_id=%s already maps to performer %s (asked for %s)",
        kind.value, ext_id, linked.performer_id, performer_id,
    )
    return None
def _lookup_via_api(name: str) -> PerformerTarget | None:
    """Resolve *name* live via TPDB and StashDB and persist a local placeholder.

    Both connectors are queried; failures are logged and treated as "not
    found". If neither source returns an id, returns None because there is
    nothing to ingest.

    Otherwise reuses the local ``Performer`` whose ``name_normalized`` equals
    ``normalize_person(name)``, or creates one (``canonical_name=name``,
    ``slug=slugify(name)``). Each found external id is then linked to that
    performer with a ``PerformerExternalRef`` (confidence 0.9) unless the id
    is already linked, so later runs find the performer in the DB without a
    re-lookup. An id already linked to a *different* local performer is left
    untouched and logged as a conflict. It is still returned in the target,
    which keeps this function's existing behavior.

    Returns:
        Target carrying *name* as given plus the found ids, or None.
    """
    tpdb_id: str | None = None
    stashdb_id: str | None = None
    try:
        tpdb_id = TPDBConnector().find_performer_id_by_name(name)
    except Exception as e:
        log.warning("tpdb lookup failed for %s: %s", name, e)
    try:
        stashdb_id = StashDBConnector().find_performer_id_by_name(name)
    except Exception as e:
        log.warning("stashdb lookup failed for %s: %s", name, e)
    if not (tpdb_id or stashdb_id):
        log.warning("performer '%s' not found in any external source — skipping", name)
        return None
    log.info("live lookup: %s → tpdb=%s stashdb=%s", name, tpdb_id, stashdb_id)

    # Create the placeholder Performer + external refs so the resolver can map
    # scene -> performer and later runs find the performer in the DB.
    from app.normalize.text import normalize_person, slugify

    with session_scope() as session:
        normalized = normalize_person(name)
        existing = session.execute(
            select(Performer).where(Performer.name_normalized == normalized)
        ).scalar_one_or_none()
        if existing is not None:
            performer_id = existing.id
        else:
            performer = Performer(
                canonical_name=name,
                name_normalized=normalized,
                slug=slugify(name),
            )
            session.add(performer)
            session.flush()
            performer_id = performer.id
        if tpdb_id:
            _link_placeholder_ref(
                session, kind=SourceKind.tpdb, ext_id=tpdb_id, performer_id=performer_id
            )
        if stashdb_id:
            _link_placeholder_ref(
                session, kind=SourceKind.stashdb, ext_id=stashdb_id, performer_id=performer_id
            )
    return PerformerTarget(
        performer_id=performer_id,
        canonical_name=name,
        tpdb_id=tpdb_id,
        stashdb_id=stashdb_id,
    )


def _link_placeholder_ref(
    session: Session, *, kind: SourceKind, ext_id: str, performer_id: uuid.UUID
) -> None:
    """Link *ext_id* of the *kind* source to *performer_id* if not linked yet.

    Does nothing when no ``Source`` row of *kind* exists. If the id is already
    linked to another local performer, the existing link is kept and a
    conflict warning is logged.
    """
    source = session.execute(
        select(Source).where(Source.kind == kind)
    ).scalar_one_or_none()
    if source is None:
        return
    existing = session.execute(
        select(PerformerExternalRef).where(
            PerformerExternalRef.source_id == source.id,
            PerformerExternalRef.external_id == ext_id,
        )
    ).scalar_one_or_none()
    if existing is None:
        session.add(
            PerformerExternalRef(
                source_id=source.id,
                external_id=ext_id,
                performer_id=performer_id,
                confidence=0.9,
            )
        )
    elif existing.performer_id != performer_id:
        log.warning(
            "%s ref conflict: ext_id=%s already maps to performer %s (asked for %s)",
            kind.value, ext_id, existing.performer_id, performer_id,
        )
def _top_n_by_scene_count(session: Session, n: int) -> list[Performer]:
    """Return up to *n* performers ordered by linked-scene count, descending.

    Uses an outer join, so performers without any scene are eligible with a
    count of zero.
    """
    scene_count = func.count(ScenePerformer.scene_id)
    stmt = (
        select(Performer, scene_count.label("c"))
        .outerjoin(ScenePerformer, ScenePerformer.performer_id == Performer.id)
        .group_by(Performer.id)
        .order_by(scene_count.desc())
        .limit(n)
    )
    ranked = session.execute(stmt).all()
    return [performer for performer, _count in ranked]
def _claim_next_for_search(
    session: Session, *, refresh_after: timedelta, min_scene_count: int = 1
) -> Performer | None:
    """Pick one performer from the search queue and mark it as searched.

    The row is selected with ``FOR UPDATE OF performers SKIP LOCKED`` (on
    dialects that support it; SQLAlchemy omits the clause elsewhere). Its
    ``last_searched_at`` and ``search_run_count`` are updated in the same
    transaction, so concurrent workers skip a row another worker is claiming
    instead of claiming the same performer. The lock is held until the
    caller commits.

    Eligibility: at least ``min_scene_count`` linked scenes, and either never
    searched (``last_searched_at IS NULL``) or last searched before
    ``now - refresh_after``.

    Priority (orphan-rescue bias, 2026-05-20 update):
      1. Most "orphan" scenes first: scenes with ``release_date`` in the last
         7 days and no live ``PlaybackSource`` (``dead_at IS NULL``). Bug
         report 2026-05-20: new TPDB scenes had canonical metadata but zero
         playback because the continuous queue never reached them
         (78k performers, 67k never searched -> ~232-day sweep).
      2. Never-searched performers (NULLS FIRST), then the oldest search.
      3. Higher scene count first.

    Returns:
        The claimed performer (flushed, not committed), or None when no
        performer is eligible.
    """
    now = datetime.now(UTC)
    cutoff = now - refresh_after
    orphan_cutoff = now - timedelta(days=7)

    sc_sub = (
        select(
            ScenePerformer.performer_id.label("pid"),
            func.count(ScenePerformer.scene_id).label("scene_count"),
        )
        .group_by(ScenePerformer.performer_id)
        .subquery()
    )
    # Orphan-scene count per performer: scenes released in the last 7 days
    # with no live playback source.
    orphan_sub = (
        select(
            ScenePerformer.performer_id.label("pid"),
            func.count(ScenePerformer.scene_id).label("orphan_count"),
        )
        .join(Scene, Scene.id == ScenePerformer.scene_id)
        .where(Scene.release_date > orphan_cutoff)
        .where(
            ~select(PlaybackSource.id)
            .where(PlaybackSource.scene_id == Scene.id)
            .where(PlaybackSource.dead_at.is_(None))
            .exists()
        )
        .group_by(ScenePerformer.performer_id)
        .subquery()
    )
    row = session.execute(
        select(Performer)
        .join(sc_sub, sc_sub.c.pid == Performer.id, isouter=False)
        .join(orphan_sub, orphan_sub.c.pid == Performer.id, isouter=True)
        .where(sc_sub.c.scene_count >= min_scene_count)
        .where(
            (Performer.last_searched_at.is_(None))
            | (Performer.last_searched_at < cutoff)
        )
        .order_by(
            # 1. Orphan scenes (last 7d, no playback) first, by count desc.
            #    COALESCE to 0 puts performers without orphans after those with.
            func.coalesce(orphan_sub.c.orphan_count, 0).desc(),
            # 2. Never searched next, then oldest search.
            Performer.last_searched_at.asc().nullsfirst(),
            # 3. Popular performers (more scenes) earlier.
            sc_sub.c.scene_count.desc(),
        )
        .limit(1)
        # Lock only the performers row (not the aggregate subqueries) and skip
        # rows already locked by another worker's in-flight claim.
        .with_for_update(skip_locked=True, of=Performer)
    ).scalar_one_or_none()
    if row is None:
        return None
    row.last_searched_at = datetime.now(UTC)
    row.search_run_count = (row.search_run_count or 0) + 1
    session.flush()
    return row
def run_continuous_one_at_a_time(
    *,
    refresh_after_days: int = 30,
    min_scene_count: int = 1,
    per_performer_limit: int | None = None,
    canonical_per_performer_limit: int | None = 100,
) -> StrategyCounters | None:
    """Claim one performer from the priority queue and run a tube pass for it.

    Intended to be invoked by an APScheduler interval job. The claim (which
    bumps ``last_searched_at``) is committed before the search starts. The
    search calls ``run_performer_driven`` with ``skip_canonical=True``, so only
    the direct tube scrapers run; the two limit arguments are forwarded but
    only affect canonical (TPDB/StashDB) fetches, which are skipped here.

    Returns:
        Counters for that pass, or None when no performer is due.
    """
    window = timedelta(days=refresh_after_days)
    with session_scope() as session:
        claimed = _claim_next_for_search(
            session,
            refresh_after=window,
            min_scene_count=min_scene_count,
        )
        if claimed is None:
            log.info("performer-continuous: queue empty (all searched within %dd)", refresh_after_days)
            return None
        # Copy plain values out while the session is open; the search below
        # runs outside this session.
        performer_id, canonical_name, run_count = (
            claimed.id,
            claimed.canonical_name,
            claimed.search_run_count,
        )
        session.commit()

    log.info(
        "performer-continuous: claimed %s (id=%s, run #%d)",
        canonical_name, performer_id, run_count,
    )
    return run_performer_driven(
        performer_ids=[performer_id],
        top_n=0,  # ignored because performer_ids is given
        per_performer_limit=per_performer_limit,
        canonical_per_performer_limit=canonical_per_performer_limit,
        skip_canonical=True,  # direct scrapers only (backward fill)
    )
def _ingest_iter_into_run(
    *,
    source_kind: SourceKind,
    source_name: str,
    run_label: str,
    iterator_factory,  # type: ignore[no-untyped-def]
    max_errors: int = 50,
) -> dict[str, int]:
    """Ingest an ad-hoc scene iterator under its own IngestRun record.

    Variant of ``ingest_from_connector`` for per-performer pulls. It gets or
    creates the Source (``get_or_create_source``), opens an IngestRun with
    status ``running``, and feeds every item of ``iterator_factory()``
    through ``_process_scene``. It then finalizes the run with its end time,
    status and record counts.

    Per-scene exceptions are counted under ``errors`` and logged; once more
    than ``max_errors`` have accumulated, the run is aborted. Any
    ``Exception`` from the iterator, or the abort, marks the run ``failed``
    and is swallowed; the partial counters are still returned. The run is
    finalized in ``finally``, so ``KeyboardInterrupt`` / ``SystemExit`` also
    mark it ``failed`` instead of leaving it stuck in ``running``. Those
    still propagate.

    Args:
        source_kind: kind of the Source to attribute scenes to.
        source_name: name of that Source.
        run_label: free-form label used only in log lines.
        iterator_factory: zero-argument callable returning an iterable of raw
            scenes; called once, inside the error handling.
        max_errors: error count above which the run is aborted (default 50).

    Returns:
        Counters dict with keys ``seen``, ``new``, ``updated``, ``skipped``,
        ``errors`` (``_process_scene`` may update them too).
    """
    counters = {"seen": 0, "new": 0, "updated": 0, "skipped": 0, "errors": 0}
    with session_scope() as session:
        source = get_or_create_source(session, kind=source_kind, name=source_name)
        run = IngestRun(source_id=source.id, status=IngestStatus.running)
        session.add(run)
        session.flush()
        run_id = run.id
        source_id = source.id
    log.info("ingest start label=%s run_id=%s", run_label, run_id)

    completed = False
    try:
        for raw in iterator_factory():
            counters["seen"] += 1
            try:
                _process_scene(source_id=source_id, raw_scene=raw, counters=counters)
            except Exception as exc:  # pragma: no cover - defensive
                counters["errors"] += 1
                # getattr: a malformed item may lack `external_id`, and the
                # handler itself must not raise a second exception.
                log.exception(
                    "performer-driven scene failed external_id=%s: %s",
                    getattr(raw, "external_id", None),
                    exc,
                )
                if counters["errors"] > max_errors:
                    raise
        completed = True
    except Exception as exc:
        log.exception("performer-driven run failed (%s): %s", run_label, exc)
    finally:
        # Always finalize the run record, even on BaseException, so it never
        # stays in `running` status.
        with session_scope() as session:
            run = session.get(IngestRun, run_id)
            assert run is not None  # invariant: created above in this function
            run.finished_at = datetime.now(UTC)
            if not completed:
                run.status = IngestStatus.failed
            elif counters["errors"] > 0:
                run.status = IngestStatus.partial
            else:
                run.status = IngestStatus.success
            run.records_seen = counters["seen"]
            run.records_new = counters["new"]
            run.records_updated = counters["updated"]
    log.info("ingest done label=%s counters=%s", run_label, counters)
    return counters
# Keep `_hash_raw` referenced so linters do not flag its import as unused; it
# is not otherwise used in this module. (`timedelta` needs no such suppression
# because it is used by the scheduling code above.)
_ = _hash_raw