diff --git a/app/scheduler/jobs.py b/app/scheduler/jobs.py index 8c3fff9..811e9db 100644 --- a/app/scheduler/jobs.py +++ b/app/scheduler/jobs.py @@ -46,31 +46,65 @@ log = logging.getLogger(__name__) INTERVAL_ANCHOR = datetime(2026, 1, 1, 5, 0, tzinfo=timezone.utc) +# Hard-timeout dla jobów robiących zewnętrzne HTTP (tpdb/stashdb/performer-driven). +# Bez tego zawis connectora bez własnego timeoutu blokował job na wiele godzin, a +# `max_instances=1` blokował KOLEJNE fire'y do restartu workera (incident 2026-06-02: +# 6 runów wisiało 8.7h od wspólnego anchora 05:00). Po timeoucie job WRACA (slot się +# zwalnia → następny fire leci); osierocony wątek dożywa do restartu (jak movie-ingest), +# a jego 'running' DB-row sprząta periodic reaper (_job_reap_stuck). +_JOB_TIMEOUT_SEC = 1800 # 30 min — healthy tpdb/stashdb delta to minuty (po SQL-phash), performer-driven top-N ~10-20 min. + + +def _run_with_timeout(fn, *, label: str, timeout_sec: int = _JOB_TIMEOUT_SEC) -> None: + from concurrent.futures import ThreadPoolExecutor + from concurrent.futures import TimeoutError as FutureTimeout + + ex = ThreadPoolExecutor(max_workers=1) + try: + fut = ex.submit(fn) + try: + fut.result(timeout=timeout_sec) + except FutureTimeout: + log.error( + "[scheduler] %s HUNG > %ds — zwalniam slot, orphan thread dożyje do restartu", + label, timeout_sec, + ) + except Exception: + log.exception("[scheduler] %s job failed", label) + finally: + ex.shutdown(wait=False) + + def _job_tpdb() -> None: log.info("[scheduler] tpdb delta starting") - try: - ingest_from_connector(TPDBConnector(), use_delta=True) - except Exception: - log.exception("[scheduler] tpdb job failed") + _run_with_timeout(lambda: ingest_from_connector(TPDBConnector(), use_delta=True), label="tpdb") def _job_stashdb() -> None: log.info("[scheduler] stashdb delta starting") - try: - ingest_from_connector(StashDBConnector(), use_delta=True) - except Exception: - log.exception("[scheduler] stashdb job failed") + _run_with_timeout(lambda: ingest_from_connector(StashDBConnector(), use_delta=True), label="stashdb") def _job_performer_driven(top_n: int) -> None: log.info("[scheduler] performer-driven top-%d starting", top_n) + _run_with_timeout( + lambda: run_performer_driven(top_n=top_n, per_performer_limit=50), + label="performer-driven", + ) + + +def _job_reap_stuck() -> None: + """Periodic reaper — czyści ingest_runs wiszące w 'running' >2h (zombie po zawisach + connectorów / kill mid-run). Startup-only reaper nie łapał ich gdy worker długo żył + (incident 2026-06-02: zombie wisiały 8.7h). Delayed import — unika cyklu z worker.py.""" try: - run_performer_driven( - top_n=top_n, - per_performer_limit=50, - ) + from app.scheduler.worker import reap_stuck_ingest_runs + + reaped = reap_stuck_ingest_runs() + if reaped: + log.warning("[scheduler] periodic reaper: %d stuck ingest_runs", reaped) except Exception: - log.exception("[scheduler] performer-driven job failed") + log.exception("[scheduler] periodic reaper failed") def _job_browse_latest(max_pages: int) -> None: @@ -300,6 +334,20 @@ def build_scheduler(cfg: dict[str, Any]) -> BlockingScheduler: seconds, refresh_days, ) + # Periodic reaper — czyści zombie 'running' runy co godzinę. Domyślnie ZAWSZE on + # (cfg.get(...,1)), bo startup-only reaper nie łapie zawisów gdy worker długo żyje. + reap_hours = cfg.get("reap_stuck_hours", 1) + if reap_hours: + sched.add_job( + _job_reap_stuck, + IntervalTrigger(hours=reap_hours, start_date=INTERVAL_ANCHOR), + id="reap_stuck", + replace_existing=True, + max_instances=1, + coalesce=True, + ) + log.info("scheduler: reap-stuck every %dh", reap_hours) + return sched @@ -327,4 +375,6 @@ DEFAULT_CONFIG: dict[str, Any] = { # Taxonomy scene_count refresh — denormalizacja liczników dla /tags|/performers| # /studios|/favorites. Co 3h: counts do tego stale, dla sortu "popular" bez znaczenia. "taxonomy_counts_hours": 3, + # Periodic reaper zombie 'running' runów — co 1h (próg 'running'>2h w funkcji). + "reap_stuck_hours": 1, }