goon/app/scheduler/worker.py
jtrzupek 7e46e5ac48 feat(scheduler): deep-crawl full tube catalogs (Phase 2a — ingest-all)
We ingested only ~3% of each browse tube's catalog (porndoe >62k scenes; we had 1959)
because tubes were hit only by performer-search + top-N browse. Pilot (porndoe pages
64-110): 1119 new scenes, 100% playable + 100% tagged, 0% canonical overlap (purely
additive — content not in TPDB/StashDB).

- app/scheduler/deep_crawl.py: round-robin over ALL_BROWSE_SCRAPERS, per-tube page cursor
  in app/_state/deepcrawl_state.json (no DB migration), deep-paginate from the cursor,
  idempotent (resolver skips known by raw_hash), mark 'exhausted' at catalog end then
  reset cursors for an incremental re-sweep.
- _job_deep_crawl: hourly, 60 pages/run (~1860 scenes, ~22 min), wrapped in the 1h
  hard-timeout; registered in build_scheduler (jobs=10).
- config: sched_deep_crawl_hours=1, deep_crawl_pages_per_run=60, deepcrawl_state_path.
- scripts/pilot_porndoe_deepcrawl.py: one-off pilot used to validate the approach.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-03 09:26:44 +02:00

341 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Worker process.
Tryby:
--once --source=tpdb|stashdb [--limit=N] [--no-delta]
Klasyczny ingest z jednego connectora.
--once --strategy=performer-driven [--top-n=N] [--performers="Lola,Mia"]
Strategia performer-driven: dla top-N performerów (lub explicit listy)
pobierz wszystkie sceny z TPDB/StashDB + szukaj w direct tube scrapach.
Patrz `app/scheduler/performer_driven.py`.
Pełen scheduler (APScheduler + cron) — patrz `jobs.py`.
"""
from __future__ import annotations
import argparse
import logging
import signal
import sys
import time
import uuid
from datetime import UTC, datetime, timedelta
from app.config import get_settings
from app.connectors.base import BaseConnector
from app.connectors.stashdb import StashDBConnector
from app.connectors.tpdb import TPDBConnector
from app.ingest import ingest_from_connector
from app.scheduler.performer_driven import run_performer_driven
# Module-level logger used by every function in this file.
log = logging.getLogger(__name__)
# Connector classes selectable through the CLI `--source` option; the keys are
# the accepted source names, the values are instantiated by `_build_connector`.
CONNECTORS: dict[str, type[BaseConnector]] = {
    "tpdb": TPDBConnector,
    "stashdb": StashDBConnector,
}
# Values accepted by the CLI `--strategy` option (see `run_once_strategy`).
STRATEGIES = ("performer-driven", "bulk-dedup", "movies")
# Movie connectors are registered in app/connectors/__init__.py — a single shared
# place for both the scheduler job and the manual --strategy=movies run.
def _movie_connectors() -> dict[str, type]:
    """Return a fresh dict built from the registered movie connectors.

    The import is done at call time rather than at module import time.
    """
    from app.connectors import get_movie_connectors

    registered = get_movie_connectors()
    return dict(registered)
def _build_connector(name: str) -> BaseConnector:
    """Instantiate the connector registered under *name* in ``CONNECTORS``.

    Args:
        name: Source name, expected to be a key of ``CONNECTORS``.

    Returns:
        A new instance of the matching connector class (built with no arguments).

    Raises:
        SystemExit: If *name* is not a known source; the message lists the
            available names.
    """
    if name not in CONNECTORS:
        raise SystemExit(f"unknown source: {name}. available={list(CONNECTORS)}")
    connector_cls = CONNECTORS[name]
    return connector_cls()
def run_once_source(*, source: str, limit: int | None, use_delta: bool) -> int:
    """Run a single ingest cycle from one connector and return an exit code.

    Args:
        source: Connector name, resolved through ``_build_connector``.
        limit: Forwarded to ``ingest_from_connector`` as ``limit``.
        use_delta: Forwarded to ``ingest_from_connector`` as ``use_delta``.

    Returns:
        ``0`` when the returned counters report zero errors, ``1`` otherwise.
    """
    counters = ingest_from_connector(
        _build_connector(source),
        limit=limit,
        use_delta=use_delta,
    )
    log.info("run_once finished counters=%s", counters)
    if counters["errors"] == 0:
        return 0
    return 1
def run_once_strategy(
    *,
    strategy: str,
    top_n: int,
    performers: list[str] | None,
    performer_ids: list[uuid.UUID] | None,
    sitetags: list[str] | None,
    per_performer_limit: int | None,
    skip_tubes: bool = False,
    dedup_strategy: str = "all",
    cross_source_only: bool = False,
    dry_run: bool = False,
) -> int:
    """Run one cycle of the named ingest strategy and return an exit code.

    Args:
        strategy: One of ``"performer-driven"``, ``"movies"`` or ``"bulk-dedup"``.
        top_n: Forwarded to ``run_performer_driven`` (performer-driven only).
        performers: Performer names for performer-driven; for ``"movies"`` the
            same list is reused as the movie source names (or ``"all"``).
        performer_ids: Forwarded to ``run_performer_driven``.
        sitetags: Forwarded to ``run_performer_driven``.
        per_performer_limit: Forwarded to ``run_performer_driven``; for
            ``"movies"`` it is passed as ``limit`` to the movie ingest.
        skip_tubes: Forwarded to ``run_performer_driven``.
        dedup_strategy: Forwarded to ``run_bulk_dedup`` as ``strategy``.
        cross_source_only: Forwarded to ``run_bulk_dedup``.
        dry_run: Forwarded to ``run_bulk_dedup``.

    Returns:
        ``0`` on success, ``1`` when any error was counted or raised.
        ``"bulk-dedup"`` always returns ``0``.

    Raises:
        SystemExit: If *strategy* is not one of the handled values.
    """
    if strategy == "performer-driven":
        outcome = run_performer_driven(
            performer_names=performers,
            performer_ids=performer_ids,
            top_n=top_n,
            sitetags=sitetags,
            per_performer_limit=per_performer_limit,
            skip_tubes=skip_tubes,
        )
        log.info(
            "performer-driven done: processed=%d skipped_no_refs=%d per_source=%s",
            outcome.performers_processed,
            outcome.performers_skipped_no_refs,
            outcome.per_source,
        )
        # Exit code is driven by the summed per-source "errors" counters.
        error_total = sum(
            source_stats.get("errors", 0)
            for source_stats in outcome.per_source.values()
        )
        if error_total == 0:
            return 0
        return 1

    if strategy == "movies":
        from app.ingest import ingest_movies_from_connector

        registry = _movie_connectors()
        # The `performers` field is reused here: a list such as
        # `paradisehill,streamporn,...` or "all". Default: every source.
        # CLI: --performers=paradisehill or --performers=streamporn,mangoporn
        requested = performers or ["all"]
        if "all" in requested:
            targets: list[str] = list(registry.keys())
        else:
            targets = requested

        failed = False
        for target in targets:
            connector_cls = registry.get(target)
            if connector_cls is None:
                log.error(
                    "unknown movie source: %s (available: %s)", target, list(registry)
                )
                failed = True
                continue
            log.info(
                "movie ingest %s starting (delta=%s, limit=%s)",
                target,
                True,
                per_performer_limit,
            )
            # Instantiation happens inside the try so that a failing constructor
            # marks this source as failed without aborting the remaining ones.
            try:
                movie_counters = ingest_movies_from_connector(
                    connector_cls(),
                    use_delta=True,
                    limit=per_performer_limit,
                )
                log.info("movie ingest %s done: %s", target, movie_counters)
                if movie_counters.get("errors", 0) > 0:
                    failed = True
            except Exception:
                log.exception("movie ingest %s failed", target)
                failed = True
        if failed:
            return 1
        return 0

    if strategy == "bulk-dedup":
        from app.scheduler.bulk_dedup import run_bulk_dedup

        dedup_counters = run_bulk_dedup(
            strategy=dedup_strategy,
            dry_run=dry_run,
            cross_source_only=cross_source_only,
        )
        log.info("bulk-dedup done: %s", dedup_counters)
        return 0

    raise SystemExit(f"unknown strategy: {strategy}. available={STRATEGIES}")
def reap_stuck_ingest_runs(max_age_hours: int = 2) -> int:
    """Mark ingest_runs stuck in status=running as failed (killed_by_restart).

    The worker is single-process: when it is killed mid-run (deploy, OOM, image
    restart), the IngestRun row stays in `running` forever — `finished_at` is
    never set. Without reaping, zombies accumulate (13 found on 2026-06-01) and
    distort metrics: `_last_successful_finished_at` ignores them (fine), but the
    watchdog counting stuck runs and the fail rate sees them as noise.

    Only runs older than `max_age_hours` are reaped — a healthy run finishes in
    seconds/minutes (after the phash SQL fix) and movie-ingest has its own 6-min
    cap, so anything `running` for more than 2h is certainly left over from a
    killed worker. The margin protects a manually started `--once` run in
    another process from being reaped by mistake.

    Args:
        max_age_hours: Minimum age, in hours since `started_at`, for a running
            row to be reaped.

    Returns:
        The row count reported for the UPDATE (``0`` when the driver reports
        nothing).
    """
    from sqlalchemy import update

    from app.db import session_scope
    from app.models.ingest_run import IngestRun, IngestStatus

    now = datetime.now(UTC)
    cutoff = now - timedelta(hours=max_age_hours)

    # Build the statement up front; it only touches the database on execute().
    mark_failed = (
        update(IngestRun)
        .where(
            IngestRun.status == IngestStatus.running,
            IngestRun.started_at < cutoff,
        )
        .values(
            status=IngestStatus.failed,
            finished_at=now,
            errors={"message": "killed_by_restart"},
        )
    )
    with session_scope() as session:
        outcome = session.execute(mark_failed)
        reaped = outcome.rowcount or 0

    if reaped:
        log.warning(
            "reaped %d stuck ingest_runs (running > %dh) on startup",
            reaped,
            max_age_hours,
        )
    return reaped
def run_forever() -> int:
    """Scheduled mode — build the scheduler from settings and start it.

    Jobs (as configured through the settings read below):
        tpdb / stashdb        — every N hours, delta since the last successful run
        performer-driven      — every N hours, top-N performers from the database
        performer-continuous  — tick every N seconds, 1 performer per tick
                                (tube backfill)

    Returns:
        ``0`` once the scheduler's ``start()`` has returned or a
        KeyboardInterrupt/SystemExit has been caught.
    """
    from app.scheduler.jobs import build_scheduler  # deferred import (apscheduler)

    # Clean up after the previous (killed) worker before any job starts.
    reap_stuck_ingest_runs()

    settings = get_settings()

    def _optional(attr: str, fallback):
        # Read a setting with a fallback; a falsy value (e.g. 0) becomes None.
        return getattr(settings, attr, fallback) or None

    cfg = {
        "tpdb_hours": settings.sched_tpdb_hours or None,
        "stashdb_hours": settings.sched_stashdb_hours or None,
        "performer_driven_hours": settings.sched_performer_driven_hours or None,
        "performer_driven_top_n": settings.sched_performer_driven_top_n,
        "performer_continuous_seconds": _optional(
            "sched_performer_continuous_seconds", 60
        ),
        "performer_continuous_refresh_days": getattr(
            settings, "sched_performer_continuous_refresh_days", 30
        ),
        "movie_ingest_hours": _optional("sched_movie_ingest_hours", 24),
        # Browse-latest scheduler — browse the newest scenes every 6h / 4x a day
        # (~100 scenes/tube/run).
        # Bug: this key was missing from the worker config for about a week, so
        # browse mode never fired (the 15k freshporno rows from 2026-05-13 were a
        # one-off bulk import). Bug report 93d3c485 (2026-05-19) "brak freshporno".
        "browse_latest_hours": _optional("sched_browse_latest_hours", 6),
        "browse_latest_max_pages": getattr(
            settings, "sched_browse_latest_max_pages", 5
        ),
        # Deep-crawl of full browse-tube catalogs (Phase 2a) — round-robin, resumable.
        "deep_crawl_hours": _optional("sched_deep_crawl_hours", 1),
        "deep_crawl_pages_per_run": getattr(settings, "deep_crawl_pages_per_run", 60),
        # Bulk-dedup performers — safety net for duplicates the resolver missed
        # (e.g. freshporno scenes from before the release_date fix). Runs every 12h.
        "bulk_dedup_hours": _optional("sched_bulk_dedup_hours", 12),
        # Taxonomy scene_count refresh — denormalized counters (perf fix 0019).
        "taxonomy_counts_hours": _optional("sched_taxonomy_counts_hours", 3),
    }

    scheduler = build_scheduler(cfg)
    job_count = len(scheduler.get_jobs())
    log.info("worker scheduled mode starting (jobs=%d)", job_count)
    try:
        scheduler.start()
    except (KeyboardInterrupt, SystemExit):
        log.info("worker received shutdown")
    return 0
def _split_csv(s: str | None) -> list[str]:
if not s:
return []
return [x.strip() for x in s.split(",") if x.strip()]
def main(argv: list[str] | None = None) -> int:
    """CLI entry point: configure logging/Sentry, parse arguments, dispatch.

    Args:
        argv: Argument list handed to ``ArgumentParser.parse_args``; ``None``
            lets argparse read the process arguments.

    Returns:
        The exit code of the selected mode: ``run_once_strategy`` or
        ``run_once_source`` when ``--once`` is given, ``run_forever`` otherwise.
    """
    settings = get_settings()
    logging.basicConfig(
        level=settings.log_level,
        format="%(asctime)s %(levelname)s %(name)s %(message)s",
    )

    # Sentry: an empty DSN means no-op. Worker errors (failed ingest, dead
    # connector, lost connection to TPDB/StashDB) go to Sentry with the release
    # tag `goon-worker@...`, which tells them apart from API errors in the UI.
    if settings.sentry_dsn:
        import sentry_sdk
        from sentry_sdk.integrations.sqlalchemy import SqlalchemyIntegration

        sentry_sdk.init(
            dsn=settings.sentry_dsn,
            environment=settings.sentry_environment,
            traces_sample_rate=settings.sentry_traces_sample_rate,
            integrations=[SqlalchemyIntegration()],
            release="goon-worker@0.1.0",
        )

    cli = argparse.ArgumentParser(prog="goon-worker")
    cli.add_argument(
        "--once",
        action="store_true",
        help="Uruchom jeden cykl ingest i zakończ",
    )
    cli.add_argument(
        "--source",
        choices=sorted(CONNECTORS),
        help="Nazwa źródła (np. tpdb)",
    )
    cli.add_argument(
        "--strategy",
        choices=STRATEGIES,
        help="Strategia ingest (alternatywa do --source)",
    )
    cli.add_argument(
        "--limit",
        type=int,
        default=None,
        help="Limit scen do pobrania",
    )
    cli.add_argument(
        "--no-delta",
        action="store_true",
        help="Pomija filtr od ostatniego successful run (full re-pull)",
    )

    # Strategy-specific options.
    cli.add_argument(
        "--top-n",
        type=int,
        default=20,
        help="Liczba top performerów (performer-driven)",
    )
    cli.add_argument(
        "--performers",
        type=str,
        default=None,
        help="Lista nazwisk performerów (CSV) — overrides top-N",
    )
    cli.add_argument(
        "--performer-ids",
        type=str,
        default=None,
        help="Lista naszych kanonicznych UUID-ów (CSV) — overrides top-N i --performers",
    )
    cli.add_argument(
        "--sitetags",
        type=str,
        default=None,
        help="CSV sitetagów do tube search (default: wszystkie z ALL_DIRECT_SCRAPERS)",
    )
    cli.add_argument(
        "--skip-tubes",
        action="store_true",
        help="Pomija tube discovery (canonical-only catch-up: tylko TPDB+StashDB)",
    )
    cli.add_argument(
        "--per-performer-limit",
        type=int,
        default=None,
        help="Limit scen z jednego connectora per performer (debug)",
    )
    cli.add_argument(
        "--dedup-strategy",
        choices=("phash", "performers", "all"),
        default="all",
        help="bulk-dedup zakres: phash | performers | all",
    )
    cli.add_argument(
        "--cross-source-only",
        action="store_true",
        help="bulk-dedup: tylko pary (tpdb-only ↔ stashdb-only) — najwyższa wartość,"
        " pomija pary już zmergowane lub same-source",
    )
    cli.add_argument(
        "--dry-run",
        action="store_true",
        help="Tylko loguj, nic nie zapisuj (dla bulk-dedup preview)",
    )
    opts = cli.parse_args(argv)

    # Without --once the worker runs in scheduled mode.
    if not opts.once:
        return run_forever()

    # One-off run from a single connector (requires --source).
    if not opts.strategy:
        if not opts.source:
            cli.error("--once wymaga --source albo --strategy")
        return run_once_source(
            source=opts.source,
            limit=opts.limit,
            use_delta=not opts.no_delta,
        )

    # One-off strategy run; --performer-ids must parse as UUIDs.
    performer_ids = None
    if opts.performer_ids:
        try:
            performer_ids = [
                uuid.UUID(token) for token in _split_csv(opts.performer_ids)
            ]
        except ValueError as exc:
            # ArgumentParser.error() prints usage and exits the process.
            cli.error(f"invalid --performer-ids: {exc}")
    return run_once_strategy(
        strategy=opts.strategy,
        top_n=opts.top_n,
        performers=_split_csv(opts.performers) or None,
        performer_ids=performer_ids,
        sitetags=_split_csv(opts.sitetags) or None,
        per_performer_limit=opts.per_performer_limit,
        skip_tubes=opts.skip_tubes,
        dedup_strategy=opts.dedup_strategy,
        cross_source_only=opts.cross_source_only,
        dry_run=opts.dry_run,
    )
# Script entry point: run the CLI and use its return value as the process exit code.
if __name__ == "__main__":
    sys.exit(main())