"""APScheduler job definitions dla worker'a (M5). Domyślny harmonogram: - tpdb — co 6h, delta od ostatniego successful run - stashdb — co 6h, delta - performer-driven — co 12h, top-N performerów z bazy (auto-discovers nowe sceny przez ALL_DIRECT_SCRAPERS — 25 tube'ów per-tube HTTP scraping) - performer-continuous — tick co N sekund, 1 performer per tick (ORDER BY last_searched_at) Konfigurację (interwały, włącz/wyłącz) można nadpisać przez env (`GOON_SCHED_*`), patrz `app/scheduler/config.py`. Uwaga: APScheduler in-process (BlockingScheduler) — wystarczy dla self-hosted single worker. Dla multi-worker trzebaby Redis/SQLAlchemy job store + distributed lock. """ from __future__ import annotations import logging from datetime import datetime, timezone from typing import Any from apscheduler.schedulers.blocking import BlockingScheduler from apscheduler.triggers.interval import IntervalTrigger from app.connectors import get_movie_connectors from app.connectors.stashdb import StashDBConnector from app.connectors.tpdb import TPDBConnector from app.ingest import ingest_from_connector, ingest_movies_from_connector from app.scheduler.browse_latest import run_browse_latest from app.scheduler.performer_driven import run_continuous_one_at_a_time, run_performer_driven log = logging.getLogger(__name__) # Stała "epoka" dla IntervalTrigger.start_date — kotwica siatki fire-times. # Bez start_date APScheduler liczy next_run_time = add_job_time + interval, więc każdy # restart workera (a tych jest dużo — manual deploys, OOM, obraz przebudowany) odsuwa # kolejny fire o pełen interval. Bug-reporty 2026-05-19 (`93d3c485` "brak freshporno") # i 2026-05-23 (`2fbf1c73` "Czemu nie ma nowych filmów?") to dokładnie ten case: # worker restartowany 15× w ciągu 3 dni → movie_ingest (24h) nigdy nie odpalił po # 2026-05-20 05:29. # # Ze stałym start_date w przeszłości next_run_time leży na siatce co N godzin od tej # kotwicy → restart workera nie zmienia kiedy następny fire. 05:00 UTC = 07:00 PL, # niski ruch, bez kolizji z ręcznymi deployami w godzinach pracy. INTERVAL_ANCHOR = datetime(2026, 1, 1, 5, 0, tzinfo=timezone.utc) # Hard-timeout dla jobów robiących zewnętrzne HTTP (tpdb/stashdb/performer-driven). # Bez tego zawis connectora bez własnego timeoutu blokował job na wiele godzin, a # `max_instances=1` blokował KOLEJNE fire'y do restartu workera (incident 2026-06-02: # 6 runów wisiało 8.7h od wspólnego anchora 05:00). Po timeoucie job WRACA (slot się # zwalnia → następny fire leci); osierocony wątek dożywa do restartu (jak movie-ingest), # a jego 'running' DB-row sprząta periodic reaper (_job_reap_stuck). _JOB_TIMEOUT_SEC = 1800 # 30 min — healthy tpdb/stashdb delta to minuty (po SQL-phash), performer-driven top-N ~10-20 min. def _run_with_timeout(fn, *, label: str, timeout_sec: int = _JOB_TIMEOUT_SEC) -> None: from concurrent.futures import ThreadPoolExecutor from concurrent.futures import TimeoutError as FutureTimeout ex = ThreadPoolExecutor(max_workers=1) try: fut = ex.submit(fn) try: fut.result(timeout=timeout_sec) except FutureTimeout: log.error( "[scheduler] %s HUNG > %ds — zwalniam slot, orphan thread dożyje do restartu", label, timeout_sec, ) except Exception: log.exception("[scheduler] %s job failed", label) finally: ex.shutdown(wait=False) def _job_tpdb() -> None: log.info("[scheduler] tpdb delta starting") _run_with_timeout(lambda: ingest_from_connector(TPDBConnector(), use_delta=True), label="tpdb") def _job_stashdb() -> None: log.info("[scheduler] stashdb delta starting") _run_with_timeout(lambda: ingest_from_connector(StashDBConnector(), use_delta=True), label="stashdb") def _job_performer_driven(top_n: int) -> None: log.info("[scheduler] performer-driven top-%d starting", top_n) _run_with_timeout( lambda: run_performer_driven(top_n=top_n, per_performer_limit=50), label="performer-driven", ) def _job_reap_stuck() -> None: """Periodic reaper — czyści ingest_runs wiszące w 'running' >2h (zombie po zawisach connectorów / kill mid-run). Startup-only reaper nie łapał ich gdy worker długo żył (incident 2026-06-02: zombie wisiały 8.7h). Delayed import — unika cyklu z worker.py.""" try: from app.scheduler.worker import reap_stuck_ingest_runs reaped = reap_stuck_ingest_runs() if reaped: log.warning("[scheduler] periodic reaper: %d stuck ingest_runs", reaped) except Exception: log.exception("[scheduler] periodic reaper failed") def _job_deep_crawl(pages_per_run: int) -> None: """Deep-crawl pełnych katalogów browse-tube'ów (Faza 2a — ingest-all). Round-robin po sitetagu, wznawialny kursor (app/scheduler/deep_crawl.py). Hard-timeout 1h.""" log.info("[scheduler] deep-crawl starting (pages_per_run=%d)", pages_per_run) from app.scheduler.deep_crawl import run_deep_crawl _run_with_timeout( lambda: run_deep_crawl(pages_per_run=pages_per_run), label="deep-crawl", timeout_sec=3600, ) def _job_browse_latest(max_pages: int) -> None: """Browse-latest — scrap newest scenes z rich-metadata tubes (shyfap + ...). Komplementarny do performer-driven: forward-fill (new scenes) vs backward (known performers). """ log.info("[scheduler] browse-latest starting (max_pages=%d)", max_pages) try: run_browse_latest(max_pages=max_pages) except Exception: log.exception("[scheduler] browse-latest job failed") def _job_movie_ingest() -> None: """Movies ingest — paradisehill (primary) + dooplay mirrory. Paradisehill jako primary daje canonical movie record (title + year + studio). Mirrory dooplay (mangoporn/streamporn/pandamovies) doklejają playback sources z native-friendly origins (mangoporn:luluvid, :voe, etc.) — `extract_stream_from_hoster` rozwiązuje je do bezpośredniego stream URL → mobile gra natywnie zamiast WebView. Matching mirror→primary movie idzie przez `resolve_movie` (title+year+studio similarity). Każdy connector osobny IngestRun + delta od ostatniego success. Kolejność: paradisehill FIRST (żeby mirrory miały do czego się przykleić), potem mirrory. Pojedynczy failed connector NIE zatrzymuje pozostałych — każdy w osobnym try/except. HARD TIMEOUT per-connector (bug-report 2026-05-30 "ingest znów się zawiesił"): sam try/except chroni przed *wyjątkiem*, ale NIE przed *hangiem* (CPU-bound ReDoS na patologicznej stronie / thread-stall) — wtedy jeden mirror blokuje resztę i mangoporn (jedyny z realnym new-content) nigdy nie startuje. Każdy connector leci w osobnym wątku z `future.result(timeout)`; po przekroczeniu logujemy i idziemy dalej (osierocony wątek dożywa do restartu workera — OK, bo loop się odblokowuje). Healthy run ~50s, cap 6 min = zapas. """ from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeout PER_CONNECTOR_TIMEOUT = 360 # sekundy for name, cls in get_movie_connectors(): log.info("[scheduler] movie ingest %s starting", name) try: ex = ThreadPoolExecutor(max_workers=1) fut = ex.submit(ingest_movies_from_connector, cls(), use_delta=True) try: fut.result(timeout=PER_CONNECTOR_TIMEOUT) except FutureTimeout: log.error( "[scheduler] movie ingest %s HUNG > %ds — skip, kolejka leci dalej", name, PER_CONNECTOR_TIMEOUT, ) finally: # shutdown(wait=False): nie blokuj na join osieroconego wątku. ex.shutdown(wait=False) except Exception: log.exception("[scheduler] movie ingest %s failed", name) def _job_refresh_taxonomy_counts() -> None: """Przelicza denormalizowane scene_count na tags/performers/studios. Hot-path /tags|/performers|/studios|/favorites czyta gotową kolumnę zamiast agregować 6.3M scene_tags per-request (~4.3s → <20ms). Patrz migracja 0019 + app/scheduler/taxonomy_counts.py. """ log.info("[scheduler] taxonomy counts refresh starting") try: from app.scheduler.taxonomy_counts import refresh_taxonomy_counts changed = refresh_taxonomy_counts() log.info("[scheduler] taxonomy counts refresh done: %s", changed) except Exception: log.exception("[scheduler] taxonomy counts refresh failed") def _job_bulk_dedup_performers() -> None: """Pair-wise dedup po performer overlap — safety net dla duplikatów które resolver-time scoring nie złapał. Use case (bug-report 2026-05-20, "brak Brazzers Exxtra po 15-05"): freshporno scrape przed fixem release_date tworzył duplicate scenes zamiast PS-merge do canonical TPDB scen. Resolver scoring miał score >0.92 (auto) z release_date, ale BEZ release_date wagi się przesuwały i wpadało w review/new. Bulk_dedup performers strategy iteruje per performer, robi pair-wise scoring dla wszystkich scen tego performera — łapie duplicate-y które ingest-time resolver pominął (np. gdy 2 sceny tej samej title+performer ale różny release_date). Auto-merge gdy score≥0.92, pending merge_candidate gdy 0.75-0.92. """ log.info("[scheduler] bulk_dedup performers starting") try: from app.scheduler.bulk_dedup import run_bulk_dedup # cross_source_only=True: bez tego flag pairwise generuje N²/2 par na płodnego # performera, materializowane w listę → worker OOM-killed co 12h (6GB RSS na # 7.6GB boxie, 2026-06-06), ubijając przy okazji równoległe tpdb/stashdb/ingesty. # Flag zawęża do cross-source kandydatów (TPDB↔StashDB) z pre-filtrem candidate. # Timeout-wrap jak tpdb/stashdb — job nie ma własnego hard-timeoutu. _run_with_timeout( lambda: run_bulk_dedup(strategy="performers", dry_run=False, cross_source_only=True), label="bulk-dedup-performers", ) log.info("[scheduler] bulk_dedup performers done") except Exception: log.exception("[scheduler] bulk_dedup performers failed") def _job_performer_continuous(refresh_after_days: int) -> None: """Continuous worker — 1 performer per tick, ORDER BY last_searched_at NULLS FIRST. Per tick: full search across ~25 tubeów (per_performer_limit=None). Tick zajmuje ~50-80s. Interval ustawiony na 15s + max_instances=1 + coalesce=True znaczy że real rate to max(15s, tick_duration) — efektywnie ~1 perf/50-80s. """ try: run_continuous_one_at_a_time( refresh_after_days=refresh_after_days, per_performer_limit=None, # full coverage all tubes ) except Exception: log.exception("[scheduler] performer-continuous failed") def build_scheduler(cfg: dict[str, Any]) -> BlockingScheduler: """Buduje scheduler na podstawie cfg dictu. cfg keys: tpdb_hours: int | None (None = wyłączony) stashdb_hours: int | None performer_driven_hours: int | None performer_driven_top_n: int performer_continuous_seconds: int | None performer_continuous_refresh_days: int """ sched = BlockingScheduler(timezone="UTC") if cfg.get("tpdb_hours"): sched.add_job( _job_tpdb, IntervalTrigger(hours=cfg["tpdb_hours"], start_date=INTERVAL_ANCHOR), id="tpdb", replace_existing=True, max_instances=1, coalesce=True, ) log.info("scheduler: tpdb every %dh", cfg["tpdb_hours"]) if cfg.get("stashdb_hours"): sched.add_job( _job_stashdb, IntervalTrigger(hours=cfg["stashdb_hours"], start_date=INTERVAL_ANCHOR), id="stashdb", replace_existing=True, max_instances=1, coalesce=True, ) log.info("scheduler: stashdb every %dh", cfg["stashdb_hours"]) if cfg.get("performer_driven_hours"): top_n = cfg.get("performer_driven_top_n") or 20 sched.add_job( lambda: _job_performer_driven(top_n), IntervalTrigger(hours=cfg["performer_driven_hours"], start_date=INTERVAL_ANCHOR), id="performer_driven", replace_existing=True, max_instances=1, coalesce=True, ) log.info( "scheduler: performer-driven every %dh (top_n=%d)", cfg["performer_driven_hours"], top_n, ) if cfg.get("browse_latest_hours"): max_pages = cfg.get("browse_latest_max_pages") or 5 sched.add_job( lambda: _job_browse_latest(max_pages), IntervalTrigger(hours=cfg["browse_latest_hours"], start_date=INTERVAL_ANCHOR), id="browse_latest", replace_existing=True, max_instances=1, coalesce=True, ) log.info( "scheduler: browse-latest every %dh (max_pages=%d)", cfg["browse_latest_hours"], max_pages, ) if cfg.get("bulk_dedup_hours"): sched.add_job( _job_bulk_dedup_performers, IntervalTrigger(hours=cfg["bulk_dedup_hours"], start_date=INTERVAL_ANCHOR), id="bulk_dedup_performers", replace_existing=True, max_instances=1, coalesce=True, ) log.info("scheduler: bulk-dedup performers every %dh", cfg["bulk_dedup_hours"]) if cfg.get("movie_ingest_hours"): sched.add_job( _job_movie_ingest, IntervalTrigger(hours=cfg["movie_ingest_hours"], start_date=INTERVAL_ANCHOR), id="movie_ingest", replace_existing=True, max_instances=1, coalesce=True, ) log.info("scheduler: movie-ingest every %dh", cfg["movie_ingest_hours"]) if cfg.get("taxonomy_counts_hours"): sched.add_job( _job_refresh_taxonomy_counts, IntervalTrigger(hours=cfg["taxonomy_counts_hours"], start_date=INTERVAL_ANCHOR), id="taxonomy_counts", replace_existing=True, max_instances=1, coalesce=True, ) log.info("scheduler: taxonomy-counts refresh every %dh", cfg["taxonomy_counts_hours"]) if cfg.get("performer_continuous_seconds"): refresh_days = cfg.get("performer_continuous_refresh_days") or 30 seconds = cfg["performer_continuous_seconds"] sched.add_job( lambda: _job_performer_continuous(refresh_days), IntervalTrigger(seconds=seconds), id="performer_continuous", replace_existing=True, max_instances=1, coalesce=True, ) log.info( "scheduler: performer-continuous every %ds (refresh_after=%dd)", seconds, refresh_days, ) if cfg.get("deep_crawl_hours"): pages = cfg.get("deep_crawl_pages_per_run") or 60 sched.add_job( lambda: _job_deep_crawl(pages), IntervalTrigger(hours=cfg["deep_crawl_hours"], start_date=INTERVAL_ANCHOR), id="deep_crawl", replace_existing=True, max_instances=1, coalesce=True, ) log.info("scheduler: deep-crawl every %dh (%d pages/run)", cfg["deep_crawl_hours"], pages) # Periodic reaper — czyści zombie 'running' runy co godzinę. Domyślnie ZAWSZE on # (cfg.get(...,1)), bo startup-only reaper nie łapie zawisów gdy worker długo żyje. reap_hours = cfg.get("reap_stuck_hours", 1) if reap_hours: sched.add_job( _job_reap_stuck, IntervalTrigger(hours=reap_hours, start_date=INTERVAL_ANCHOR), id="reap_stuck", replace_existing=True, max_instances=1, coalesce=True, ) log.info("scheduler: reap-stuck every %dh", reap_hours) return sched DEFAULT_CONFIG: dict[str, Any] = { "tpdb_hours": 6, "stashdb_hours": 6, "performer_driven_hours": 12, "performer_driven_top_n": 20, # Browse-latest — newest scenes z rich-metadata tubes. Co 6h (4×/dobę) × ~100 # scen/tube/run łapie świeże sceny których performera jeszcze nie znamy (newcomerki # → canonical ingest dorobi potem). NB: ten DEFAULT_CONFIG jest poglądowy — realnie # run_forever() bierze interwały z Settings (config.py: sched_browse_latest_hours=6). "browse_latest_hours": 6, "browse_latest_max_pages": 5, # Movies — paradisehill + dooplay mirrory. Raz dziennie wystarczy (sites rosną # wolniej niż tube'y). Najwazniejsze: mirrory dorzucają native-friendly playback # sources do paradisehill movies → mobile gra natywnie zamiast WebView. "movie_ingest_hours": 24, # Continuous worker: tick co 15s, ale max_instances=1 + coalesce sprawia że # efektywny rate = max(15s, tick_duration). Tick z full coverage (25 tubes) ~50-80s, # więc realnie ~1 perf/60s. Przy 14.7k performerów = ~10 dni full sweep + refresh # każdego co 30 dni. "performer_continuous_seconds": 15, "performer_continuous_refresh_days": 30, # Taxonomy scene_count refresh — denormalizacja liczników dla /tags|/performers| # /studios|/favorites. Co 3h: counts do tego stale, dla sortu "popular" bez znaczenia. "taxonomy_counts_hours": 3, # Periodic reaper zombie 'running' runów — co 1h (próg 'running'>2h w funkcji). "reap_stuck_hours": 1, # Deep-crawl pełnych katalogów browse-tube'ów (Faza 2a). Co 1h, 60 stron/run, # round-robin po tube'ach. Backfill katalogów (porndoe ~62k itd.) przez dni. "deep_crawl_hours": 1, "deep_crawl_pages_per_run": 60, }