From 0424cb913811045f4e5289447fd7e2e51a46dc77 Mon Sep 17 00:00:00 2001
From: jtrzupek
Date: Mon, 15 Jun 2026 10:26:25 +0200
Subject: [PATCH] feat(scheduler): per-origin ingest freshness watchdog -> Sentry

The global source monitor can't catch a single stalled tube because every
tube scraper shares one Source row (tube-scraper), so an aggregate run still
reports success while one origin freezes (freshporno browsing the rotating
KVS homepage root, report 14f3a655).

New watchdog checks max(created_at) per active browse-scraper origin
(tube:{sitetag}); if a tube with history hasn't produced a new scene in
> max_age_hours it fires a Sentry message with a stable per-origin
fingerprint (age in extras, not the title, so it stays one grouped issue).

Runs every 6h, 48h threshold, both env-tunable
(GOON_SCHED_INGEST_WATCHDOG_HOURS / GOON_INGEST_WATCHDOG_MAX_AGE_HOURS).

Verified: 0 stale at 48h post-fix, detects neporn at a strict 12h threshold.

Co-Authored-By: Claude Fable 5
---
 app/config.py                    | 11 ++++
 app/scheduler/ingest_watchdog.py | 95 ++++++++++++++++++++++++++++++++
 app/scheduler/jobs.py            | 32 +++++++++++
 app/scheduler/worker.py          |  5 ++
 4 files changed, 143 insertions(+)
 create mode 100644 app/scheduler/ingest_watchdog.py

diff --git a/app/config.py b/app/config.py
index 603c985..6b6975c 100644
--- a/app/config.py
+++ b/app/config.py
@@ -109,6 +109,17 @@ class Settings(BaseSettings):
     sched_thumb_dedup_hours: int = Field(
         default=12, validation_alias="GOON_SCHED_THUMB_DEDUP_HOURS"
     )
+    # Ingest freshness watchdog — Sentry alert when an active browse tube (origin
+    # tube:{sitetag}) has produced no new scenes for > max_age_hours. Catches a single
+    # frozen origin that the global monitor (one Source "tube-scraper") cannot see
+    # (e.g. freshporno browsing the rotating root, report 14f3a655). 6h cadence (after
+    # browse-latest), 48h threshold. Cadence 0/None = job off; max-age 0/None -> 48h.
+    sched_ingest_watchdog_hours: int = Field(
+        default=6, validation_alias="GOON_SCHED_INGEST_WATCHDOG_HOURS"
+    )
+    ingest_watchdog_max_age_hours: int = Field(
+        default=48, validation_alias="GOON_INGEST_WATCHDOG_MAX_AGE_HOURS"
+    )
     # Taxonomy scene_count refresh — przelicza denormalizowane liczniki scen na
     # tags/performers/studios (hot-path /tags|/performers|/studios|/favorites czyta
     # gotową kolumnę zamiast agregować 6.3M scene_tags per-request). 3h cadence —
diff --git a/app/scheduler/ingest_watchdog.py b/app/scheduler/ingest_watchdog.py
new file mode 100644
index 0000000..23eacdb
--- /dev/null
+++ b/app/scheduler/ingest_watchdog.py
@@ -0,0 +1,95 @@
+"""Per-sitetag freshness watchdog — alert when an active browse tube stops yielding new scenes.
+
+The global source monitor (ingest_runs per `Source`) does NOT catch this, because all
+tube scrapers share one `Source` = "tube-scraper" — a single origin can freeze (e.g.
+freshporno: the scraper browsed from root `/`, which KVS rotates → a cold session got
+a stale set → new=0/skipped=N for 2 days) while the aggregated run still reports
+success. Per-origin signal: `max(created_at)` on playback_sources for the given
+`tube:{sitetag}`. Frozen beyond the threshold → alert (report 14f3a655 2026-06-15).
+
+See [[reference_kvs_root_rotates_use_latest_updates]] for the class of bug this catches.
+"""
+from __future__ import annotations
+
+import logging
+from datetime import UTC, datetime
+from typing import Any
+
+from sqlalchemy import text
+
+from app.db import session_scope
+
+log = logging.getLogger(__name__)
+
+
+def run_ingest_freshness_watchdog(
+    *, max_age_hours: int = 48, min_history: int = 100
+) -> dict[str, Any]:
+    """Check that every active browse-scraper origin got a new scene within max_age_hours.
+
+    Only sitetags from `ALL_BROWSE_SCRAPERS` are scanned (the ones we actively crawl and
+    expect freshness from) — not every origin, so legacy/one-off sources do not raise
+    alerts. `min_history` filters out freshly added tubes without an established
+    cadence (too few scenes to tell whether 48h of silence is an anomaly).
+
+    Stale origin → Sentry `capture_message` with a stable per-origin fingerprint (age
+    goes in extras, not in the title, so the issue title does not change per run).
+    Returns {checked, stale:[{origin, age_hours, total}]}.
+    """
+    from app.connectors.direct_scrapers import ALL_BROWSE_SCRAPERS
+
+    sitetags = sorted({cls.sitetag for cls in ALL_BROWSE_SCRAPERS})
+    now = datetime.now(UTC)
+    stale: list[dict[str, Any]] = []
+
+    with session_scope() as s:
+        for tag in sitetags:
+            origin = f"tube:{tag}"
+            row = s.execute(
+                text(
+                    "SELECT max(created_at) AS newest, count(*) AS total "
+                    "FROM playback_sources WHERE origin = :o"
+                ),
+                {"o": origin},
+            ).one()
+            newest, total = row.newest, row.total
+            if total < min_history or newest is None:
+                continue
+            age_h = (now - newest).total_seconds() / 3600.0
+            if age_h >= max_age_hours:
+                stale.append(
+                    {"origin": origin, "age_hours": round(age_h, 1), "total": total}
+                )
+
+    if stale:
+        log.warning(
+            "ingest-watchdog: %d/%d browse origin(s) bez nowych scen >%dh: %s",
+            len(stale), len(sitetags), max_age_hours,
+            ", ".join(f"{x['origin']}({x['age_hours']}h)" for x in stale),
+        )
+        try:
+            import sentry_sdk
+
+            for x in stale:
+                with sentry_sdk.push_scope() as scope:
+                    scope.level = "warning"
+                    scope.set_tag("ingest_origin", x["origin"])
+                    scope.set_extra("age_hours", x["age_hours"])
+                    scope.set_extra("total_scenes", x["total"])
+                    scope.set_extra("max_age_hours", max_age_hours)
+                    # Per-origin fingerprint → one persistent issue per frozen tube; the age
+                    # stays in extras (not in the message) so the title does not vary per run.
+                    scope.fingerprint = ["ingest-stale-origin", x["origin"]]
+                    sentry_sdk.capture_message(
+                        f"ingest-watchdog: {x['origin']} bez nowych scen "
+                        f"(próg {max_age_hours}h)"
+                    )
+        except Exception:  # pragma: no cover - Sentry off / no DSN
+            log.exception("ingest-watchdog: Sentry capture failed")
+    else:
+        log.info(
+            "ingest-watchdog: wszystkie %d browse origin świeże (<%dh)",
+            len(sitetags), max_age_hours,
+        )
+
+    return {"checked": len(sitetags), "stale": stale}
diff --git a/app/scheduler/jobs.py b/app/scheduler/jobs.py
index baf4e79..1187da4 100644
--- a/app/scheduler/jobs.py
+++ b/app/scheduler/jobs.py
@@ -276,6 +276,20 @@ def _job_thumb_asset_dedup() -> None:
         log.exception("[scheduler] thumb-asset dedup failed")
 
 
+def _job_ingest_watchdog(max_age_hours: int) -> None:
+    """Per-origin freshness watchdog — Sentry alert when an active browse tube has produced
+    no new scenes for > max_age_hours. The global monitor (one Source 'tube-scraper') misses
+    this; a single origin can freeze during a success run (report 14f3a655)."""
+    try:
+        from app.scheduler.ingest_watchdog import run_ingest_freshness_watchdog
+
+        res = run_ingest_freshness_watchdog(max_age_hours=max_age_hours)
+        if res["stale"]:
+            log.warning("[scheduler] ingest-watchdog: %d stale origin(s)", len(res["stale"]))
+    except Exception:
+        log.exception("[scheduler] ingest-watchdog failed")
+
+
 def _job_performer_continuous(refresh_after_days: int) -> None:
     """Continuous worker — 1 performer per tick, ORDER BY last_searched_at NULLS FIRST.
 
@@ -380,6 +394,21 @@ def build_scheduler(cfg: dict[str, Any]) -> BlockingScheduler:
         )
         log.info("scheduler: thumb-asset dedup every %dh", cfg["thumb_dedup_hours"])
 
+    if cfg.get("ingest_watchdog_hours"):
+        wd_max_age = cfg.get("ingest_watchdog_max_age_hours") or 48
+        sched.add_job(
+            lambda: _job_ingest_watchdog(wd_max_age),
+            IntervalTrigger(hours=cfg["ingest_watchdog_hours"], start_date=INTERVAL_ANCHOR),
+            id="ingest_watchdog",
+            replace_existing=True,
+            max_instances=1,
+            coalesce=True,
+        )
+        log.info(
+            "scheduler: ingest-watchdog every %dh (max_age=%dh)",
+            cfg["ingest_watchdog_hours"], wd_max_age,
+        )
+
     if cfg.get("movie_ingest_hours"):
         sched.add_job(
             _job_movie_ingest,
@@ -498,4 +527,7 @@ DEFAULT_CONFIG: dict[str, Any] = {
     # round-robin po tube'ach. Backfill katalogów (porndoe ~62k itd.) przez dni.
     "deep_crawl_hours": 1,
     "deep_crawl_pages_per_run": 60,
+    # Ingest freshness watchdog — per-origin Sentry alert, every 6h, 48h threshold.
+    "ingest_watchdog_hours": 6,
+    "ingest_watchdog_max_age_hours": 48,
 }
diff --git a/app/scheduler/worker.py b/app/scheduler/worker.py
index ca66087..cc1df53 100644
--- a/app/scheduler/worker.py
+++ b/app/scheduler/worker.py
@@ -212,6 +212,11 @@ def run_forever() -> int:
         "thumb_dedup_hours": getattr(settings, "sched_thumb_dedup_hours", 12) or None,
         # Taxonomy scene_count refresh — denormalizacja liczników (perf fix 0019).
         "taxonomy_counts_hours": getattr(settings, "sched_taxonomy_counts_hours", 3) or None,
+        # Ingest freshness watchdog — per-origin Sentry alert (report 14f3a655).
+        "ingest_watchdog_hours": getattr(settings, "sched_ingest_watchdog_hours", 6) or None,
+        "ingest_watchdog_max_age_hours": getattr(
+            settings, "ingest_watchdog_max_age_hours", 48
+        ),
     }
     sched = build_scheduler(cfg)
     log.info("worker scheduled mode starting (jobs=%d)", len(sched.get_jobs()))