fix(scheduler): hard-timeout heavy jobs + periodic stuck-run reaper

At the shared 05:00 anchor all heavy jobs fire together; tpdb/stashdb/performer-driven
had no timeout, so a hung connector blocked the whole job and — with max_instances=1 —
blocked every future fire of that job until a worker restart (incident 2026-06-02: 6 runs
hung 8.7h, movie mirrors 47h stale, tube ingest stalled).

- _run_with_timeout wraps tpdb/stashdb/performer-driven in a 30-min hard cap (same
  ThreadPoolExecutor pattern movie-ingest already uses): on timeout the job returns and
  frees the scheduler slot; the orphaned thread lives until restart.
- _job_reap_stuck: hourly reaper of 'running' >2h rows, registered in the scheduler —
  the startup-only reaper missed hangs while the worker stayed up for hours.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
jtrzupek 2026-06-02 16:17:50 +02:00
parent 24fc790691
commit 08f901712c

View file

@ -46,31 +46,65 @@ log = logging.getLogger(__name__)
INTERVAL_ANCHOR = datetime(2026, 1, 1, 5, 0, tzinfo=timezone.utc)
# Hard-timeout dla jobów robiących zewnętrzne HTTP (tpdb/stashdb/performer-driven).
# Bez tego zawis connectora bez własnego timeoutu blokował job na wiele godzin, a
# `max_instances=1` blokował KOLEJNE fire'y do restartu workera (incident 2026-06-02:
# 6 runów wisiało 8.7h od wspólnego anchora 05:00). Po timeoucie job WRACA (slot się
# zwalnia → następny fire leci); osierocony wątek dożywa do restartu (jak movie-ingest),
# a jego 'running' DB-row sprząta periodic reaper (_job_reap_stuck).
_JOB_TIMEOUT_SEC = 1800 # 30 min — healthy tpdb/stashdb delta to minuty (po SQL-phash), performer-driven top-N ~10-20 min.
def _run_with_timeout(fn, *, label: str, timeout_sec: int = _JOB_TIMEOUT_SEC) -> None:
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeout
ex = ThreadPoolExecutor(max_workers=1)
try:
fut = ex.submit(fn)
try:
fut.result(timeout=timeout_sec)
except FutureTimeout:
log.error(
"[scheduler] %s HUNG > %ds — zwalniam slot, orphan thread dożyje do restartu",
label, timeout_sec,
)
except Exception:
log.exception("[scheduler] %s job failed", label)
finally:
ex.shutdown(wait=False)
def _job_tpdb() -> None:
log.info("[scheduler] tpdb delta starting")
try:
ingest_from_connector(TPDBConnector(), use_delta=True)
except Exception:
log.exception("[scheduler] tpdb job failed")
_run_with_timeout(lambda: ingest_from_connector(TPDBConnector(), use_delta=True), label="tpdb")
def _job_stashdb() -> None:
log.info("[scheduler] stashdb delta starting")
try:
ingest_from_connector(StashDBConnector(), use_delta=True)
except Exception:
log.exception("[scheduler] stashdb job failed")
_run_with_timeout(lambda: ingest_from_connector(StashDBConnector(), use_delta=True), label="stashdb")
def _job_performer_driven(top_n: int) -> None:
log.info("[scheduler] performer-driven top-%d starting", top_n)
_run_with_timeout(
lambda: run_performer_driven(top_n=top_n, per_performer_limit=50),
label="performer-driven",
)
def _job_reap_stuck() -> None:
"""Periodic reaper — czyści ingest_runs wiszące w 'running' >2h (zombie po zawisach
connectorów / kill mid-run). Startup-only reaper nie łapał ich gdy worker długo żył
(incident 2026-06-02: zombie wisiały 8.7h). Delayed import unika cyklu z worker.py."""
try:
run_performer_driven(
top_n=top_n,
per_performer_limit=50,
)
from app.scheduler.worker import reap_stuck_ingest_runs
reaped = reap_stuck_ingest_runs()
if reaped:
log.warning("[scheduler] periodic reaper: %d stuck ingest_runs", reaped)
except Exception:
log.exception("[scheduler] performer-driven job failed")
log.exception("[scheduler] periodic reaper failed")
def _job_browse_latest(max_pages: int) -> None:
@ -300,6 +334,20 @@ def build_scheduler(cfg: dict[str, Any]) -> BlockingScheduler:
seconds, refresh_days,
)
# Periodic reaper — czyści zombie 'running' runy co godzinę. Domyślnie ZAWSZE on
# (cfg.get(...,1)), bo startup-only reaper nie łapie zawisów gdy worker długo żyje.
reap_hours = cfg.get("reap_stuck_hours", 1)
if reap_hours:
sched.add_job(
_job_reap_stuck,
IntervalTrigger(hours=reap_hours, start_date=INTERVAL_ANCHOR),
id="reap_stuck",
replace_existing=True,
max_instances=1,
coalesce=True,
)
log.info("scheduler: reap-stuck every %dh", reap_hours)
return sched
@ -327,4 +375,6 @@ DEFAULT_CONFIG: dict[str, Any] = {
# Taxonomy scene_count refresh — denormalizacja liczników dla /tags|/performers|
# /studios|/favorites. Co 3h: counts do tego stale, dla sortu "popular" bez znaczenia.
"taxonomy_counts_hours": 3,
# Periodic reaper zombie 'running' runów — co 1h (próg 'running'>2h w funkcji).
"reap_stuck_hours": 1,
}