The global source monitor can't catch a single stalled tube because every tube scraper shares one Source row (tube-scraper), so an aggregate run still reports success while one origin freezes (freshporno browsing the rotating KVS homepage root, report 14f3a655). New watchdog checks max(created_at) per active browse-scraper origin (tube:<sitetag>); if a tube with history hasn't produced a new scene in > max_age_hours it fires a Sentry message with a stable per-origin fingerprint (age in extras, not the title, so it stays one grouped issue). Runs every 6h, 48h threshold, both env-tunable (GOON_SCHED_INGEST_WATCHDOG_HOURS / GOON_INGEST_WATCHDOG_MAX_AGE_HOURS). Verified: 0 stale at 48h post-fix, detects neporn at a strict 12h threshold. Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
349 lines
14 KiB
Python
349 lines
14 KiB
Python
"""Worker process.
|
||
|
||
Tryby:
|
||
--once --source=tpdb|stashdb [--limit=N] [--no-delta]
|
||
Klasyczny ingest z jednego connectora.
|
||
|
||
--once --strategy=performer-driven [--top-n=N] [--performers="Lola,Mia"]
|
||
Strategia performer-driven: dla top-N performerów (lub explicit listy)
|
||
pobierz wszystkie sceny z TPDB/StashDB + szukaj w direct tube scrapach.
|
||
Patrz `app/scheduler/performer_driven.py`.
|
||
|
||
Pełen scheduler (APScheduler + cron) — patrz `jobs.py`.
|
||
"""
|
||
from __future__ import annotations
|
||
|
||
import argparse
|
||
import logging
|
||
import signal
|
||
import sys
|
||
import time
|
||
import uuid
|
||
from datetime import UTC, datetime, timedelta
|
||
|
||
from app.config import get_settings
|
||
from app.connectors.base import BaseConnector
|
||
from app.connectors.stashdb import StashDBConnector
|
||
from app.connectors.tpdb import TPDBConnector
|
||
from app.ingest import ingest_from_connector
|
||
from app.scheduler.performer_driven import run_performer_driven
|
||
|
||
log = logging.getLogger(__name__)
|
||
|
||
|
||
CONNECTORS: dict[str, type[BaseConnector]] = {
|
||
"tpdb": TPDBConnector,
|
||
"stashdb": StashDBConnector,
|
||
}
|
||
|
||
STRATEGIES = ("performer-driven", "bulk-dedup", "movies")
|
||
|
||
|
||
# Movie connectors zarejestrowane w app/connectors/__init__.py — wspólne miejsce
|
||
# dla scheduler-job i ręcznego --strategy=movies.
|
||
def _movie_connectors() -> dict[str, type]:
|
||
from app.connectors import get_movie_connectors
|
||
return dict(get_movie_connectors())
|
||
|
||
|
||
def _build_connector(name: str) -> BaseConnector:
|
||
cls = CONNECTORS.get(name)
|
||
if cls is None:
|
||
raise SystemExit(f"unknown source: {name}. available={list(CONNECTORS)}")
|
||
return cls()
|
||
|
||
|
||
def run_once_source(*, source: str, limit: int | None, use_delta: bool) -> int:
|
||
connector = _build_connector(source)
|
||
counters = ingest_from_connector(connector, limit=limit, use_delta=use_delta)
|
||
log.info("run_once finished counters=%s", counters)
|
||
return 0 if counters["errors"] == 0 else 1
|
||
|
||
|
||
def run_once_strategy(
|
||
*,
|
||
strategy: str,
|
||
top_n: int,
|
||
performers: list[str] | None,
|
||
performer_ids: list[uuid.UUID] | None,
|
||
sitetags: list[str] | None,
|
||
per_performer_limit: int | None,
|
||
skip_tubes: bool = False,
|
||
dedup_strategy: str = "all",
|
||
cross_source_only: bool = False,
|
||
dry_run: bool = False,
|
||
) -> int:
|
||
if strategy == "performer-driven":
|
||
counters = run_performer_driven(
|
||
performer_names=performers,
|
||
performer_ids=performer_ids,
|
||
top_n=top_n,
|
||
sitetags=sitetags,
|
||
per_performer_limit=per_performer_limit,
|
||
skip_tubes=skip_tubes,
|
||
)
|
||
log.info(
|
||
"performer-driven done: processed=%d skipped_no_refs=%d per_source=%s",
|
||
counters.performers_processed,
|
||
counters.performers_skipped_no_refs,
|
||
counters.per_source,
|
||
)
|
||
total_errors = sum(s.get("errors", 0) for s in counters.per_source.values())
|
||
return 0 if total_errors == 0 else 1
|
||
|
||
if strategy == "movies":
|
||
from app.ingest import ingest_movies_from_connector
|
||
registry = _movie_connectors()
|
||
# `performers` field reusowane: lista `paradisehill,streamporn,...` lub "all".
|
||
# Default: wszystkie. CLI: --performers=paradisehill albo --performers=streamporn,mangoporn
|
||
which = performers or ["all"]
|
||
targets: list[str] = list(registry.keys()) if "all" in which else which
|
||
any_error = False
|
||
for tgt in targets:
|
||
cls = registry.get(tgt)
|
||
if cls is None:
|
||
log.error("unknown movie source: %s (available: %s)", tgt, list(registry))
|
||
any_error = True
|
||
continue
|
||
log.info("movie ingest %s starting (delta=%s, limit=%s)", tgt, True, per_performer_limit)
|
||
try:
|
||
counters = ingest_movies_from_connector(
|
||
cls(),
|
||
use_delta=True,
|
||
limit=per_performer_limit,
|
||
)
|
||
log.info("movie ingest %s done: %s", tgt, counters)
|
||
if counters.get("errors", 0) > 0:
|
||
any_error = True
|
||
except Exception:
|
||
log.exception("movie ingest %s failed", tgt)
|
||
any_error = True
|
||
return 1 if any_error else 0
|
||
|
||
if strategy == "bulk-dedup":
|
||
from app.scheduler.bulk_dedup import run_bulk_dedup
|
||
bc = run_bulk_dedup(
|
||
strategy=dedup_strategy,
|
||
dry_run=dry_run,
|
||
cross_source_only=cross_source_only,
|
||
)
|
||
log.info("bulk-dedup done: %s", bc)
|
||
return 0
|
||
|
||
raise SystemExit(f"unknown strategy: {strategy}. available={STRATEGIES}")
|
||
|
||
|
||
def reap_stuck_ingest_runs(max_age_hours: int = 2) -> int:
|
||
"""Oznacza ingest_runs wiszące w status=running jako failed (killed_by_restart).
|
||
|
||
Worker jest single-process: gdy zostanie ubity w trakcie runu (deploy, OOM,
|
||
restart obrazu), wiersz IngestRun zostaje na zawsze w `running` — `finished_at`
|
||
nigdy się nie ustawia. Bez reapowania zombie akumulują się (13 znalezionych
|
||
2026-06-01) i fałszują metryki: `_last_successful_finished_at` ignoruje je
|
||
(OK), ale watchdog liczący stuck-runy i fail-rate widzi je jako szum.
|
||
|
||
Reapujemy TYLKO runy starsze niż `max_age_hours` — healthy run kończy się w
|
||
sekundy/minuty (po SQL-fix phash), a movie-ingest ma własny 6-min cap, więc
|
||
cokolwiek `running` > 2h to na pewno trup po ubitym workerze. Margines chroni
|
||
równolegle odpalony ręczny `--once` (inny proces) przed omyłkowym reapem.
|
||
"""
|
||
from sqlalchemy import update
|
||
|
||
from app.db import session_scope
|
||
from app.models.ingest_run import IngestRun, IngestStatus
|
||
|
||
now = datetime.now(UTC)
|
||
cutoff = now - timedelta(hours=max_age_hours)
|
||
with session_scope() as session:
|
||
result = session.execute(
|
||
update(IngestRun)
|
||
.where(
|
||
IngestRun.status == IngestStatus.running,
|
||
IngestRun.started_at < cutoff,
|
||
)
|
||
.values(status=IngestStatus.failed, finished_at=now, errors={"message": "killed_by_restart"})
|
||
)
|
||
reaped = result.rowcount or 0
|
||
if reaped:
|
||
log.warning("reaped %d stuck ingest_runs (running > %dh) on startup", reaped, max_age_hours)
|
||
return reaped
|
||
|
||
|
||
def run_forever() -> int:
|
||
"""APScheduler scheduled mode — odpala joby cron-like wg config (env-driven).
|
||
|
||
Joby:
|
||
tpdb / stashdb — co N godzin, delta od ostatniego successful run
|
||
performer-driven — co N godzin, top-N performerów z bazy
|
||
performer-continuous — tick co N sekund, 1 performer per tick (tube backfill)
|
||
"""
|
||
from app.scheduler.jobs import build_scheduler # opóźniony import (apscheduler)
|
||
|
||
# Sprzątanie po poprzednim (ubitym) workerze zanim wystartujemy joby.
|
||
reap_stuck_ingest_runs()
|
||
|
||
settings = get_settings()
|
||
cfg = {
|
||
"tpdb_hours": settings.sched_tpdb_hours or None,
|
||
"stashdb_hours": settings.sched_stashdb_hours or None,
|
||
"performer_driven_hours": settings.sched_performer_driven_hours or None,
|
||
"performer_driven_top_n": settings.sched_performer_driven_top_n,
|
||
"performer_continuous_seconds": getattr(
|
||
settings, "sched_performer_continuous_seconds", 60
|
||
) or None,
|
||
"performer_continuous_refresh_days": getattr(
|
||
settings, "sched_performer_continuous_refresh_days", 30
|
||
),
|
||
"movie_ingest_hours": getattr(settings, "sched_movie_ingest_hours", 24) or None,
|
||
# Browse-latest scheduler — browse newest scenes co 6h / 4×dobę (~100 scen/tube/run).
|
||
# Bug: brak tego klucza w worker config przez ~tydzień powodował że browse-mode nigdy
|
||
# nie odpalał (15k freshporno z 2026-05-13 to bulk import jednorazowy). Bug-report
|
||
# 93d3c485 (2026-05-19) "brak freshporno".
|
||
"browse_latest_hours": getattr(settings, "sched_browse_latest_hours", 6) or None,
|
||
"browse_latest_max_pages": getattr(settings, "sched_browse_latest_max_pages", 5),
|
||
# Deep-crawl pełnych katalogów browse-tube'ów (Faza 2a) — round-robin, wznawialny.
|
||
"deep_crawl_hours": getattr(settings, "sched_deep_crawl_hours", 1) or None,
|
||
"deep_crawl_pages_per_run": getattr(settings, "deep_crawl_pages_per_run", 60),
|
||
# Bulk-dedup performers — safety net dla duplikatów które resolver
|
||
# pominął (np. freshporno scen przed fixem release_date). Run 12h.
|
||
"bulk_dedup_hours": getattr(settings, "sched_bulk_dedup_hours", 12) or None,
|
||
# Thumb-asset dedup — hdporn.gg/fullmovies.xxx same-video-różne-tytuły (reports
|
||
# 205b17d9/5a2944cb). bulk_dedup tego nie łapie; dupy odrastają przy re-ingeście.
|
||
"thumb_dedup_hours": getattr(settings, "sched_thumb_dedup_hours", 12) or None,
|
||
# Taxonomy scene_count refresh — denormalizacja liczników (perf fix 0019).
|
||
"taxonomy_counts_hours": getattr(settings, "sched_taxonomy_counts_hours", 3) or None,
|
||
# Ingest freshness watchdog — per-origin alert do Sentry (report 14f3a655).
|
||
"ingest_watchdog_hours": getattr(settings, "sched_ingest_watchdog_hours", 6) or None,
|
||
"ingest_watchdog_max_age_hours": getattr(
|
||
settings, "ingest_watchdog_max_age_hours", 48
|
||
),
|
||
}
|
||
sched = build_scheduler(cfg)
|
||
log.info("worker scheduled mode starting (jobs=%d)", len(sched.get_jobs()))
|
||
try:
|
||
sched.start()
|
||
except (KeyboardInterrupt, SystemExit):
|
||
log.info("worker received shutdown")
|
||
return 0
|
||
|
||
|
||
def _split_csv(s: str | None) -> list[str]:
|
||
if not s:
|
||
return []
|
||
return [x.strip() for x in s.split(",") if x.strip()]
|
||
|
||
|
||
def main(argv: list[str] | None = None) -> int:
|
||
settings = get_settings()
|
||
logging.basicConfig(
|
||
level=settings.log_level,
|
||
format="%(asctime)s %(levelname)s %(name)s %(message)s",
|
||
)
|
||
|
||
# Sentry: pusty DSN → no-op. Worker errory (failed ingest, dead connector,
|
||
# connection loss do TPDB/StashDB) trafiają do Sentry z release tagiem
|
||
# `goon-worker@...` — odróżnia ich od API errors w UI.
|
||
if settings.sentry_dsn:
|
||
import sentry_sdk
|
||
from sentry_sdk.integrations.sqlalchemy import SqlalchemyIntegration
|
||
|
||
sentry_sdk.init(
|
||
dsn=settings.sentry_dsn,
|
||
environment=settings.sentry_environment,
|
||
traces_sample_rate=settings.sentry_traces_sample_rate,
|
||
integrations=[SqlalchemyIntegration()],
|
||
release="goon-worker@0.1.0",
|
||
)
|
||
|
||
parser = argparse.ArgumentParser(prog="goon-worker")
|
||
parser.add_argument("--once", action="store_true", help="Uruchom jeden cykl ingest i zakończ")
|
||
parser.add_argument("--source", choices=sorted(CONNECTORS), help="Nazwa źródła (np. tpdb)")
|
||
parser.add_argument(
|
||
"--strategy",
|
||
choices=STRATEGIES,
|
||
help="Strategia ingest (alternatywa do --source)",
|
||
)
|
||
parser.add_argument("--limit", type=int, default=None, help="Limit scen do pobrania")
|
||
parser.add_argument(
|
||
"--no-delta",
|
||
action="store_true",
|
||
help="Pomija filtr od ostatniego successful run (full re-pull)",
|
||
)
|
||
# Strategy-specific options
|
||
parser.add_argument("--top-n", type=int, default=20, help="Liczba top performerów (performer-driven)")
|
||
parser.add_argument(
|
||
"--performers",
|
||
type=str,
|
||
default=None,
|
||
help="Lista nazwisk performerów (CSV) — overrides top-N",
|
||
)
|
||
parser.add_argument(
|
||
"--performer-ids",
|
||
type=str,
|
||
default=None,
|
||
help="Lista naszych kanonicznych UUID-ów (CSV) — overrides top-N i --performers",
|
||
)
|
||
parser.add_argument(
|
||
"--sitetags",
|
||
type=str,
|
||
default=None,
|
||
help="CSV sitetagów do tube search (default: wszystkie z ALL_DIRECT_SCRAPERS)",
|
||
)
|
||
parser.add_argument(
|
||
"--skip-tubes",
|
||
action="store_true",
|
||
help="Pomija tube discovery (canonical-only catch-up: tylko TPDB+StashDB)",
|
||
)
|
||
parser.add_argument(
|
||
"--per-performer-limit",
|
||
type=int,
|
||
default=None,
|
||
help="Limit scen z jednego connectora per performer (debug)",
|
||
)
|
||
parser.add_argument(
|
||
"--dedup-strategy",
|
||
choices=("phash", "performers", "all"),
|
||
default="all",
|
||
help="bulk-dedup zakres: phash | performers | all",
|
||
)
|
||
parser.add_argument(
|
||
"--cross-source-only",
|
||
action="store_true",
|
||
help="bulk-dedup: tylko pary (tpdb-only ↔ stashdb-only) — najwyższa wartość,"
|
||
" pomija pary już zmergowane lub same-source",
|
||
)
|
||
parser.add_argument(
|
||
"--dry-run",
|
||
action="store_true",
|
||
help="Tylko loguj, nic nie zapisuj (dla bulk-dedup preview)",
|
||
)
|
||
args = parser.parse_args(argv)
|
||
|
||
if args.once:
|
||
if args.strategy:
|
||
performer_ids = None
|
||
if args.performer_ids:
|
||
try:
|
||
performer_ids = [uuid.UUID(s) for s in _split_csv(args.performer_ids)]
|
||
except ValueError as e:
|
||
parser.error(f"invalid --performer-ids: {e}")
|
||
return run_once_strategy(
|
||
strategy=args.strategy,
|
||
top_n=args.top_n,
|
||
performers=_split_csv(args.performers) or None,
|
||
performer_ids=performer_ids,
|
||
sitetags=_split_csv(args.sitetags) or None,
|
||
per_performer_limit=args.per_performer_limit,
|
||
skip_tubes=args.skip_tubes,
|
||
dedup_strategy=args.dedup_strategy,
|
||
cross_source_only=args.cross_source_only,
|
||
dry_run=args.dry_run,
|
||
)
|
||
if not args.source:
|
||
parser.error("--once wymaga --source albo --strategy")
|
||
return run_once_source(source=args.source, limit=args.limit, use_delta=not args.no_delta)
|
||
return run_forever()
|
||
|
||
|
||
if __name__ == "__main__":
|
||
sys.exit(main())
|