"""Worker process. Tryby: --once --source=tpdb|stashdb [--limit=N] [--no-delta] Klasyczny ingest z jednego connectora. --once --strategy=performer-driven [--top-n=N] [--performers="Lola,Mia"] Strategia performer-driven: dla top-N performerów (lub explicit listy) pobierz wszystkie sceny z TPDB/StashDB + szukaj w direct tube scrapach. Patrz `app/scheduler/performer_driven.py`. Pełen scheduler (APScheduler + cron) — patrz `jobs.py`. """ from __future__ import annotations import argparse import logging import signal import sys import time import uuid from app.config import get_settings from app.connectors.base import BaseConnector from app.connectors.stashdb import StashDBConnector from app.connectors.tpdb import TPDBConnector from app.ingest import ingest_from_connector from app.scheduler.performer_driven import run_performer_driven log = logging.getLogger(__name__) CONNECTORS: dict[str, type[BaseConnector]] = { "tpdb": TPDBConnector, "stashdb": StashDBConnector, } STRATEGIES = ("performer-driven", "bulk-dedup", "movies") # Movie connectors zarejestrowane w app/connectors/__init__.py — wspólne miejsce # dla scheduler-job i ręcznego --strategy=movies. def _movie_connectors() -> dict[str, type]: from app.connectors import get_movie_connectors return dict(get_movie_connectors()) def _build_connector(name: str) -> BaseConnector: cls = CONNECTORS.get(name) if cls is None: raise SystemExit(f"unknown source: {name}. available={list(CONNECTORS)}") return cls() def run_once_source(*, source: str, limit: int | None, use_delta: bool) -> int: connector = _build_connector(source) counters = ingest_from_connector(connector, limit=limit, use_delta=use_delta) log.info("run_once finished counters=%s", counters) return 0 if counters["errors"] == 0 else 1 def run_once_strategy( *, strategy: str, top_n: int, performers: list[str] | None, performer_ids: list[uuid.UUID] | None, sitetags: list[str] | None, per_performer_limit: int | None, skip_tubes: bool = False, dedup_strategy: str = "all", cross_source_only: bool = False, dry_run: bool = False, ) -> int: if strategy == "performer-driven": counters = run_performer_driven( performer_names=performers, performer_ids=performer_ids, top_n=top_n, sitetags=sitetags, per_performer_limit=per_performer_limit, skip_tubes=skip_tubes, ) log.info( "performer-driven done: processed=%d skipped_no_refs=%d per_source=%s", counters.performers_processed, counters.performers_skipped_no_refs, counters.per_source, ) total_errors = sum(s.get("errors", 0) for s in counters.per_source.values()) return 0 if total_errors == 0 else 1 if strategy == "movies": from app.ingest import ingest_movies_from_connector registry = _movie_connectors() # `performers` field reusowane: lista `paradisehill,streamporn,...` lub "all". # Default: wszystkie. CLI: --performers=paradisehill albo --performers=streamporn,mangoporn which = performers or ["all"] targets: list[str] = list(registry.keys()) if "all" in which else which any_error = False for tgt in targets: cls = registry.get(tgt) if cls is None: log.error("unknown movie source: %s (available: %s)", tgt, list(registry)) any_error = True continue log.info("movie ingest %s starting (delta=%s, limit=%s)", tgt, True, per_performer_limit) try: counters = ingest_movies_from_connector( cls(), use_delta=True, limit=per_performer_limit, ) log.info("movie ingest %s done: %s", tgt, counters) if counters.get("errors", 0) > 0: any_error = True except Exception: log.exception("movie ingest %s failed", tgt) any_error = True return 1 if any_error else 0 if strategy == "bulk-dedup": from app.scheduler.bulk_dedup import run_bulk_dedup bc = run_bulk_dedup( strategy=dedup_strategy, dry_run=dry_run, cross_source_only=cross_source_only, ) log.info("bulk-dedup done: %s", bc) return 0 raise SystemExit(f"unknown strategy: {strategy}. available={STRATEGIES}") def run_forever() -> int: """APScheduler scheduled mode — odpala joby cron-like wg config (env-driven). Joby: tpdb / stashdb — co N godzin, delta od ostatniego successful run performer-driven — co N godzin, top-N performerów z bazy performer-continuous — tick co N sekund, 1 performer per tick (tube backfill) """ from app.scheduler.jobs import build_scheduler # opóźniony import (apscheduler) settings = get_settings() cfg = { "tpdb_hours": settings.sched_tpdb_hours or None, "stashdb_hours": settings.sched_stashdb_hours or None, "performer_driven_hours": settings.sched_performer_driven_hours or None, "performer_driven_top_n": settings.sched_performer_driven_top_n, "performer_continuous_seconds": getattr( settings, "sched_performer_continuous_seconds", 60 ) or None, "performer_continuous_refresh_days": getattr( settings, "sched_performer_continuous_refresh_days", 30 ), "movie_ingest_hours": getattr(settings, "sched_movie_ingest_hours", 24) or None, # Browse-latest scheduler — freshporno/porn00/pornxp browse newest scenes raz # dziennie (~100 scen/tube/run). Bug: brak tego klucza w worker config przez # ~tydzień powodował że browse-mode nigdy nie odpalał (15k freshporno z 2026-05-13 # to bulk import jednorazowy). Bug-report 93d3c485 (2026-05-19) "brak freshporno". "browse_latest_hours": getattr(settings, "sched_browse_latest_hours", 24) or None, "browse_latest_max_pages": getattr(settings, "sched_browse_latest_max_pages", 5), } sched = build_scheduler(cfg) log.info("worker scheduled mode starting (jobs=%d)", len(sched.get_jobs())) try: sched.start() except (KeyboardInterrupt, SystemExit): log.info("worker received shutdown") return 0 def _split_csv(s: str | None) -> list[str]: if not s: return [] return [x.strip() for x in s.split(",") if x.strip()] def main(argv: list[str] | None = None) -> int: settings = get_settings() logging.basicConfig( level=settings.log_level, format="%(asctime)s %(levelname)s %(name)s %(message)s", ) # Sentry: pusty DSN → no-op. Worker errory (failed ingest, dead connector, # connection loss do TPDB/StashDB) trafiają do Sentry z release tagiem # `goon-worker@...` — odróżnia ich od API errors w UI. if settings.sentry_dsn: import sentry_sdk from sentry_sdk.integrations.sqlalchemy import SqlalchemyIntegration sentry_sdk.init( dsn=settings.sentry_dsn, environment=settings.sentry_environment, traces_sample_rate=settings.sentry_traces_sample_rate, integrations=[SqlalchemyIntegration()], release="goon-worker@0.1.0", ) parser = argparse.ArgumentParser(prog="goon-worker") parser.add_argument("--once", action="store_true", help="Uruchom jeden cykl ingest i zakończ") parser.add_argument("--source", choices=sorted(CONNECTORS), help="Nazwa źródła (np. tpdb)") parser.add_argument( "--strategy", choices=STRATEGIES, help="Strategia ingest (alternatywa do --source)", ) parser.add_argument("--limit", type=int, default=None, help="Limit scen do pobrania") parser.add_argument( "--no-delta", action="store_true", help="Pomija filtr od ostatniego successful run (full re-pull)", ) # Strategy-specific options parser.add_argument("--top-n", type=int, default=20, help="Liczba top performerów (performer-driven)") parser.add_argument( "--performers", type=str, default=None, help="Lista nazwisk performerów (CSV) — overrides top-N", ) parser.add_argument( "--performer-ids", type=str, default=None, help="Lista naszych kanonicznych UUID-ów (CSV) — overrides top-N i --performers", ) parser.add_argument( "--sitetags", type=str, default=None, help="CSV sitetagów do tube search (default: wszystkie z ALL_DIRECT_SCRAPERS)", ) parser.add_argument( "--skip-tubes", action="store_true", help="Pomija tube discovery (canonical-only catch-up: tylko TPDB+StashDB)", ) parser.add_argument( "--per-performer-limit", type=int, default=None, help="Limit scen z jednego connectora per performer (debug)", ) parser.add_argument( "--dedup-strategy", choices=("phash", "performers", "all"), default="all", help="bulk-dedup zakres: phash | performers | all", ) parser.add_argument( "--cross-source-only", action="store_true", help="bulk-dedup: tylko pary (tpdb-only ↔ stashdb-only) — najwyższa wartość," " pomija pary już zmergowane lub same-source", ) parser.add_argument( "--dry-run", action="store_true", help="Tylko loguj, nic nie zapisuj (dla bulk-dedup preview)", ) args = parser.parse_args(argv) if args.once: if args.strategy: performer_ids = None if args.performer_ids: try: performer_ids = [uuid.UUID(s) for s in _split_csv(args.performer_ids)] except ValueError as e: parser.error(f"invalid --performer-ids: {e}") return run_once_strategy( strategy=args.strategy, top_n=args.top_n, performers=_split_csv(args.performers) or None, performer_ids=performer_ids, sitetags=_split_csv(args.sitetags) or None, per_performer_limit=args.per_performer_limit, skip_tubes=args.skip_tubes, dedup_strategy=args.dedup_strategy, cross_source_only=args.cross_source_only, dry_run=args.dry_run, ) if not args.source: parser.error("--once wymaga --source albo --strategy") return run_once_source(source=args.source, limit=args.limit, use_delta=not args.no_delta) return run_forever() if __name__ == "__main__": sys.exit(main())