diff --git a/app/connectors/__init__.py b/app/connectors/__init__.py index db3d9df..9818b87 100644 --- a/app/connectors/__init__.py +++ b/app/connectors/__init__.py @@ -40,9 +40,15 @@ def get_movie_connectors() -> list[tuple[str, type]]: ) from app.connectors.paradisehill import ParadisehillConnector + # Kolejność ingestu: paradisehill FIRST (canonical primary, mirrory się do + # niego przyklejają), potem mangoporn (jedyny mirror z realnym new-content — + # 72 nowych 2026-05-28; streamporn/pandamovies zwracają stale 0 new), na końcu + # streamporn + pandamovies. Powód reorderu (2026-05-30): gdy streamporn wiesza + # się intermittentnie, mangoporn musi zdążyć przed nim — patrz per-connector + # timeout w _job_movie_ingest. return [ ("paradisehill", ParadisehillConnector), + ("mangoporn", MangopornConnector), ("streamporn", StreampornConnector), ("pandamovies", PandamoviesConnector), - ("mangoporn", MangopornConnector), ] diff --git a/app/scheduler/jobs.py b/app/scheduler/jobs.py index 873ee13..f5b241e 100644 --- a/app/scheduler/jobs.py +++ b/app/scheduler/jobs.py @@ -98,11 +98,34 @@ def _job_movie_ingest() -> None: Kolejność: paradisehill FIRST (żeby mirrory miały do czego się przykleić), potem mirrory. Pojedynczy failed connector NIE zatrzymuje pozostałych — każdy w osobnym try/except. + + HARD TIMEOUT per-connector (bug-report 2026-05-30 "ingest znów się zawiesił"): + sam try/except chroni przed *wyjątkiem*, ale NIE przed *hangiem* (CPU-bound + ReDoS na patologicznej stronie / thread-stall) — wtedy jeden mirror blokuje + resztę i mangoporn (jedyny z realnym new-content) nigdy nie startuje. + Każdy connector leci w osobnym wątku z `future.result(timeout)`; po + przekroczeniu logujemy i idziemy dalej (osierocony wątek dożywa do restartu + workera — OK, bo loop się odblokowuje). Healthy run ~50s, cap 6 min = zapas. """ + from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeout + + PER_CONNECTOR_TIMEOUT = 360 # sekundy + for name, cls in get_movie_connectors(): log.info("[scheduler] movie ingest %s starting", name) try: - ingest_movies_from_connector(cls(), use_delta=True) + ex = ThreadPoolExecutor(max_workers=1) + fut = ex.submit(ingest_movies_from_connector, cls(), use_delta=True) + try: + fut.result(timeout=PER_CONNECTOR_TIMEOUT) + except FutureTimeout: + log.error( + "[scheduler] movie ingest %s HUNG > %ds — skip, kolejka leci dalej", + name, PER_CONNECTOR_TIMEOUT, + ) + finally: + # shutdown(wait=False): nie blokuj na join osieroconego wątku. + ex.shutdown(wait=False) except Exception: log.exception("[scheduler] movie ingest %s failed", name)