goon/app/scheduler/worker.py
jtrzupek e4cb94bc59 feat(scheduler): hetzner bandwidth monitor + search-tube watchdog coverage
Two observability additions to the worker scheduler (intertwined in the same files): (1) ingest-watchdog now also covers performer-driven search scrapers (ALL_DIRECT_SCRAPERS) with a separate 7d threshold, not just browse tubes at 48h — several search tubes (perverzija, fpoxxx, porndish, ...) had frozen silently for weeks. (2) New Hetzner Cloud bandwidth monitor (app/scheduler/hetzner_monitor.py): polls outgoing_traffic vs included_traffic and fires a Sentry message at info/warning/error % thresholds with a per-level fingerprint. The config fields existed for ages but the monitor was never implemented. No-op until HETZNER_API_TOKEN + HETZNER_SERVER_ID are set in .env (verified: returns {enabled: False}, job registers as 'hetzner-monitor every 6h', jobs=13).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-18 09:18:59 +02:00

356 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Worker process.

Modes:
    --once --source=tpdb|stashdb [--limit=N] [--no-delta]
        Classic ingest from a single connector.
    --once --strategy=performer-driven [--top-n=N] [--performers="Lola,Mia"]
        Performer-driven strategy: for the top-N performers (or an explicit list)
        fetch all scenes from TPDB/StashDB and search the direct tube scrapers.
        See `app/scheduler/performer_driven.py`.

Full scheduler (APScheduler + cron) — see `jobs.py`.
"""
from __future__ import annotations
import argparse
import logging
import signal
import sys
import time
import uuid
from datetime import UTC, datetime, timedelta
from app.config import get_settings
from app.connectors.base import BaseConnector
from app.connectors.stashdb import StashDBConnector
from app.connectors.tpdb import TPDBConnector
from app.ingest import ingest_from_connector
from app.scheduler.performer_driven import run_performer_driven
# Module-level logger used by every function in this file.
log = logging.getLogger(__name__)

# Source name -> connector class. The keys are also offered as the valid
# values of the `--source` CLI option.
CONNECTORS: dict[str, type[BaseConnector]] = dict(
    tpdb=TPDBConnector,
    stashdb=StashDBConnector,
)

# Strategy names accepted by the `--strategy` CLI option.
STRATEGIES = ("performer-driven", "bulk-dedup", "movies")
# Movie connectors are registered in app/connectors/__init__.py — one shared place
# for both the scheduler job and the manual `--strategy=movies` run.
def _movie_connectors() -> dict[str, type]:
    """Return a new dict built from the movie-connector registry.

    The registry is imported inside the function rather than at module import
    time, and the result is copied into a plain ``dict``.
    """
    from app.connectors import get_movie_connectors

    registry = get_movie_connectors()
    return dict(registry)
def _build_connector(name: str) -> BaseConnector:
    """Instantiate the connector registered under ``name``.

    Args:
        name: Key into ``CONNECTORS`` (e.g. ``"tpdb"``).

    Returns:
        A new instance of the registered connector class, built with no arguments.

    Raises:
        SystemExit: If ``name`` is not registered; the message lists the
            available source names.
    """
    if name not in CONNECTORS:
        raise SystemExit(f"unknown source: {name}. available={list(CONNECTORS)}")
    connector_cls = CONNECTORS[name]
    return connector_cls()
def run_once_source(*, source: str, limit: int | None, use_delta: bool) -> int:
    """Run a single ingest cycle from one connector and return an exit code.

    Args:
        source: Name of the connector to build (a key of ``CONNECTORS``).
        limit: Forwarded to ``ingest_from_connector`` as ``limit``.
        use_delta: Forwarded to ``ingest_from_connector`` as ``use_delta``.

    Returns:
        ``0`` when the returned counters report zero errors, ``1`` otherwise.
    """
    counters = ingest_from_connector(
        _build_connector(source),
        limit=limit,
        use_delta=use_delta,
    )
    log.info("run_once finished counters=%s", counters)
    if counters["errors"] == 0:
        return 0
    return 1
def run_once_strategy(
*,
strategy: str,
top_n: int,
performers: list[str] | None,
performer_ids: list[uuid.UUID] | None,
sitetags: list[str] | None,
per_performer_limit: int | None,
skip_tubes: bool = False,
dedup_strategy: str = "all",
cross_source_only: bool = False,
dry_run: bool = False,
) -> int:
if strategy == "performer-driven":
counters = run_performer_driven(
performer_names=performers,
performer_ids=performer_ids,
top_n=top_n,
sitetags=sitetags,
per_performer_limit=per_performer_limit,
skip_tubes=skip_tubes,
)
log.info(
"performer-driven done: processed=%d skipped_no_refs=%d per_source=%s",
counters.performers_processed,
counters.performers_skipped_no_refs,
counters.per_source,
)
total_errors = sum(s.get("errors", 0) for s in counters.per_source.values())
return 0 if total_errors == 0 else 1
if strategy == "movies":
from app.ingest import ingest_movies_from_connector
registry = _movie_connectors()
# `performers` field reusowane: lista `paradisehill,streamporn,...` lub "all".
# Default: wszystkie. CLI: --performers=paradisehill albo --performers=streamporn,mangoporn
which = performers or ["all"]
targets: list[str] = list(registry.keys()) if "all" in which else which
any_error = False
for tgt in targets:
cls = registry.get(tgt)
if cls is None:
log.error("unknown movie source: %s (available: %s)", tgt, list(registry))
any_error = True
continue
log.info("movie ingest %s starting (delta=%s, limit=%s)", tgt, True, per_performer_limit)
try:
counters = ingest_movies_from_connector(
cls(),
use_delta=True,
limit=per_performer_limit,
)
log.info("movie ingest %s done: %s", tgt, counters)
if counters.get("errors", 0) > 0:
any_error = True
except Exception:
log.exception("movie ingest %s failed", tgt)
any_error = True
return 1 if any_error else 0
if strategy == "bulk-dedup":
from app.scheduler.bulk_dedup import run_bulk_dedup
bc = run_bulk_dedup(
strategy=dedup_strategy,
dry_run=dry_run,
cross_source_only=cross_source_only,
)
log.info("bulk-dedup done: %s", bc)
return 0
raise SystemExit(f"unknown strategy: {strategy}. available={STRATEGIES}")
def reap_stuck_ingest_runs(max_age_hours: int = 2) -> int:
    """Mark ingest_runs stuck in status=running as failed (killed_by_restart).

    The worker is single-process: when it is killed mid-run (deploy, OOM,
    image restart) the IngestRun row stays in `running` forever — `finished_at`
    is never set. Without reaping, zombies accumulate (13 found on 2026-06-01)
    and distort metrics: `_last_successful_finished_at` ignores them (fine),
    but the watchdog counting stuck runs and the fail rate sees them as noise.

    Only runs older than `max_age_hours` are reaped — a healthy run finishes in
    seconds/minutes (after the phash SQL fix) and movie ingest has its own
    6-minute cap, so anything `running` for more than 2h is certainly a corpse
    left by a killed worker. The margin protects a manually started `--once`
    running in parallel (another process) from being reaped by mistake.

    Args:
        max_age_hours: Minimum age, measured from ``started_at``, before a
            running row is considered stuck.

    Returns:
        Number of rows updated (``0`` when the driver reports no rowcount).
    """
    from sqlalchemy import update

    from app.db import session_scope
    from app.models.ingest_run import IngestRun, IngestStatus

    reaped_at = datetime.now(UTC)
    stale_before = reaped_at - timedelta(hours=max_age_hours)

    mark_failed = (
        update(IngestRun)
        .where(
            IngestRun.status == IngestStatus.running,
            IngestRun.started_at < stale_before,
        )
        .values(
            status=IngestStatus.failed,
            finished_at=reaped_at,
            errors={"message": "killed_by_restart"},
        )
    )

    with session_scope() as session:
        reaped = session.execute(mark_failed).rowcount or 0

    if reaped:
        log.warning(
            "reaped %d stuck ingest_runs (running > %dh) on startup",
            reaped,
            max_age_hours,
        )
    return reaped
def run_forever() -> int:
    """APScheduler scheduled mode — runs cron-like jobs per config (env-driven).

    Jobs:
        tpdb / stashdb       — every N hours, delta since the last successful run
        performer-driven     — every N hours, top-N performers from the database
        performer-continuous — tick every N seconds, 1 performer per tick (tube backfill)

    Returns:
        ``0`` once ``start()`` returns or is interrupted by
        ``KeyboardInterrupt``/``SystemExit``.
    """
    from app.scheduler.jobs import build_scheduler  # deferred import (apscheduler)

    # Clean up after a previous (killed) worker before any job starts.
    reap_stuck_ingest_runs()

    settings = get_settings()

    def _optional(name: str, default: int):
        # Setting that may be absent from the settings object: use `default` then.
        return getattr(settings, name, default)

    def _interval(name: str, default: int):
        # Same lookup, but a falsy value (e.g. 0) becomes None.
        # NOTE(review): presumably build_scheduler treats None as "job off" — confirm in jobs.py.
        return _optional(name, default) or None

    cfg = {
        "tpdb_hours": settings.sched_tpdb_hours or None,
        "stashdb_hours": settings.sched_stashdb_hours or None,
        "performer_driven_hours": settings.sched_performer_driven_hours or None,
        "performer_driven_top_n": settings.sched_performer_driven_top_n,
        "performer_continuous_seconds": _interval("sched_performer_continuous_seconds", 60),
        "performer_continuous_refresh_days": _optional(
            "sched_performer_continuous_refresh_days", 30
        ),
        "movie_ingest_hours": _interval("sched_movie_ingest_hours", 24),
        # Browse-latest scheduler — browse the newest scenes every 6h / 4x a day
        # (~100 scenes/tube/run). Bug history: this key was missing from the worker
        # config for ~a week, so browse mode never fired (the 15k freshporno scenes
        # from 2026-05-13 were a one-off bulk import). Bug report 93d3c485
        # (2026-05-19) "brak freshporno".
        "browse_latest_hours": _interval("sched_browse_latest_hours", 6),
        "browse_latest_max_pages": _optional("sched_browse_latest_max_pages", 5),
        # Deep crawl of full browse-tube catalogues (Phase 2a) — round-robin, resumable.
        "deep_crawl_hours": _interval("sched_deep_crawl_hours", 1),
        "deep_crawl_pages_per_run": _optional("deep_crawl_pages_per_run", 60),
        # Bulk-dedup performers — safety net for duplicates the resolver missed
        # (e.g. freshporno scenes from before the release_date fix). Runs every 12h.
        "bulk_dedup_hours": _interval("sched_bulk_dedup_hours", 12),
        # Thumb-asset dedup — hdporn.gg/fullmovies.xxx same video, different titles
        # (reports 205b17d9/5a2944cb). bulk_dedup does not catch these; the dupes
        # grow back on re-ingest.
        "thumb_dedup_hours": _interval("sched_thumb_dedup_hours", 12),
        # Taxonomy scene_count refresh — denormalised counters (perf fix 0019).
        "taxonomy_counts_hours": _interval("sched_taxonomy_counts_hours", 3),
        # Ingest freshness watchdog — per-origin alert to Sentry (report 14f3a655).
        # Browse: 48h threshold; search (performer-driven, uneven cadence): 7d threshold.
        "ingest_watchdog_hours": _interval("sched_ingest_watchdog_hours", 6),
        "ingest_watchdog_max_age_hours": _optional("ingest_watchdog_max_age_hours", 48),
        "ingest_watchdog_search_max_age_hours": _optional(
            "ingest_watchdog_search_max_age_hours", 168
        ),
        # Hetzner Cloud bandwidth monitor — alert to Sentry at % thresholds of the
        # included traffic. No-op without HETZNER_API_TOKEN/SERVER_ID (the job
        # itself may stay enabled).
        "hetzner_monitor_hours": _interval("sched_hetzner_monitor_hours", 6),
    }

    scheduler = build_scheduler(cfg)
    job_count = len(scheduler.get_jobs())
    log.info("worker scheduled mode starting (jobs=%d)", job_count)
    try:
        scheduler.start()
    except (KeyboardInterrupt, SystemExit):
        log.info("worker received shutdown")
    return 0
def _split_csv(s: str | None) -> list[str]:
if not s:
return []
return [x.strip() for x in s.split(",") if x.strip()]
def _init_sentry(settings) -> None:
    """Initialise the Sentry SDK from ``settings``; an empty DSN is a no-op.

    Worker errors (failed ingest, dead connector, connection loss to
    TPDB/StashDB) go to Sentry with a `goon-worker@...` release tag, which
    tells them apart from API errors in the UI.
    """
    if not settings.sentry_dsn:
        return

    import sentry_sdk
    from sentry_sdk.integrations.sqlalchemy import SqlalchemyIntegration

    sentry_sdk.init(
        dsn=settings.sentry_dsn,
        environment=settings.sentry_environment,
        traces_sample_rate=settings.sentry_traces_sample_rate,
        integrations=[SqlalchemyIntegration()],
        release="goon-worker@0.1.0",
    )


def _build_arg_parser() -> argparse.ArgumentParser:
    """Build the `goon-worker` command-line parser with all its options."""
    parser = argparse.ArgumentParser(prog="goon-worker")
    add = parser.add_argument

    add("--once", action="store_true", help="Uruchom jeden cykl ingest i zakończ")
    add("--source", choices=sorted(CONNECTORS), help="Nazwa źródła (np. tpdb)")
    add(
        "--strategy",
        choices=STRATEGIES,
        help="Strategia ingest (alternatywa do --source)",
    )
    add("--limit", type=int, default=None, help="Limit scen do pobrania")
    add(
        "--no-delta",
        action="store_true",
        help="Pomija filtr od ostatniego successful run (full re-pull)",
    )

    # Strategy-specific options
    add("--top-n", type=int, default=20, help="Liczba top performerów (performer-driven)")
    add(
        "--performers",
        type=str,
        default=None,
        help="Lista nazwisk performerów (CSV) — overrides top-N",
    )
    add(
        "--performer-ids",
        type=str,
        default=None,
        help="Lista naszych kanonicznych UUID-ów (CSV) — overrides top-N i --performers",
    )
    add(
        "--sitetags",
        type=str,
        default=None,
        help="CSV sitetagów do tube search (default: wszystkie z ALL_DIRECT_SCRAPERS)",
    )
    add(
        "--skip-tubes",
        action="store_true",
        help="Pomija tube discovery (canonical-only catch-up: tylko TPDB+StashDB)",
    )
    add(
        "--per-performer-limit",
        type=int,
        default=None,
        help="Limit scen z jednego connectora per performer (debug)",
    )
    add(
        "--dedup-strategy",
        choices=("phash", "performers", "all"),
        default="all",
        help="bulk-dedup zakres: phash | performers | all",
    )
    add(
        "--cross-source-only",
        action="store_true",
        help="bulk-dedup: tylko pary (tpdb-only ↔ stashdb-only) — najwyższa wartość,"
        " pomija pary już zmergowane lub same-source",
    )
    add(
        "--dry-run",
        action="store_true",
        help="Tylko loguj, nic nie zapisuj (dla bulk-dedup preview)",
    )
    return parser


def main(argv: list[str] | None = None) -> int:
    """CLI entry point: configure logging and Sentry, parse args, dispatch.

    Args:
        argv: Argument list handed to ``parse_args``; ``None`` lets argparse
            read ``sys.argv``.

    Returns:
        The exit code of whichever mode ran: ``run_forever`` without
        ``--once``, ``run_once_strategy`` with ``--once --strategy``, or
        ``run_once_source`` with ``--once --source``.

    Raises:
        SystemExit: Raised by argparse on invalid arguments, including
            ``--once`` given without ``--source``/``--strategy`` and
            unparsable ``--performer-ids``.
    """
    settings = get_settings()
    logging.basicConfig(
        level=settings.log_level,
        format="%(asctime)s %(levelname)s %(name)s %(message)s",
    )
    _init_sentry(settings)

    parser = _build_arg_parser()
    args = parser.parse_args(argv)

    # Without --once the process runs the scheduler.
    if not args.once:
        return run_forever()

    # --once without --strategy: classic single-source ingest.
    if not args.strategy:
        if not args.source:
            parser.error("--once wymaga --source albo --strategy")
        return run_once_source(
            source=args.source,
            limit=args.limit,
            use_delta=not args.no_delta,
        )

    # --once --strategy=...: parse the optional UUID list first so that a
    # malformed value is reported as a usage error.
    performer_ids = None
    if args.performer_ids:
        try:
            performer_ids = [uuid.UUID(raw) for raw in _split_csv(args.performer_ids)]
        except ValueError as e:
            parser.error(f"invalid --performer-ids: {e}")

    return run_once_strategy(
        strategy=args.strategy,
        top_n=args.top_n,
        performers=_split_csv(args.performers) or None,
        performer_ids=performer_ids,
        sitetags=_split_csv(args.sitetags) or None,
        per_performer_limit=args.per_performer_limit,
        skip_tubes=args.skip_tubes,
        dedup_strategy=args.dedup_strategy,
        cross_source_only=args.cross_source_only,
        dry_run=args.dry_run,
    )
# Script entry point: run the CLI and use its return value as the process exit status.
if __name__ == "__main__":
    exit_code = main()
    sys.exit(exit_code)