feat(scheduler): per-origin ingest freshness watchdog -> Sentry
The global source monitor can't catch a single stalled tube because every tube scraper shares one Source row (tube-scraper), so an aggregate run still reports success while one origin freezes (freshporno browsing the rotating KVS homepage root, report 14f3a655). New watchdog checks max(created_at) per active browse-scraper origin (tube:<sitetag>); if a tube with history hasn't produced a new scene in > max_age_hours it fires a Sentry message with a stable per-origin fingerprint (age in extras, not the title, so it stays one grouped issue). Runs every 6h, 48h threshold, both env-tunable (GOON_SCHED_INGEST_WATCHDOG_HOURS / GOON_INGEST_WATCHDOG_MAX_AGE_HOURS). Verified: 0 stale at 48h post-fix, detects neporn at a strict 12h threshold. Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
parent
4b71689a95
commit
0424cb9138
4 changed files with 143 additions and 0 deletions
|
|
@ -109,6 +109,17 @@ class Settings(BaseSettings):
|
|||
sched_thumb_dedup_hours: int = Field(
|
||||
default=12, validation_alias="GOON_SCHED_THUMB_DEDUP_HOURS"
|
||||
)
|
||||
# Ingest freshness watchdog — Sentry alert when an active browse-tube (origin
# tube:<sitetag>) has produced no new scenes for > max_age_hours. Catches a single
# frozen origin that the global monitor (one Source "tube-scraper") cannot see
# (e.g. freshporno browsing the rotating root, report 14f3a655). 6h cadence
# (after browse-latest), 48h threshold. GOON_SCHED_INGEST_WATCHDOG_HOURS=0
# disables the job; a 0/None max age does NOT disable it — the scheduler falls
# back to the 48h default.
|
||||
sched_ingest_watchdog_hours: int = Field(
|
||||
default=6, validation_alias="GOON_SCHED_INGEST_WATCHDOG_HOURS"
|
||||
)
|
||||
ingest_watchdog_max_age_hours: int = Field(
|
||||
default=48, validation_alias="GOON_INGEST_WATCHDOG_MAX_AGE_HOURS"
|
||||
)
|
||||
# Taxonomy scene_count refresh — przelicza denormalizowane liczniki scen na
|
||||
# tags/performers/studios (hot-path /tags|/performers|/studios|/favorites czyta
|
||||
# gotową kolumnę zamiast agregować 6.3M scene_tags per-request). 3h cadence —
|
||||
|
|
|
|||
95
app/scheduler/ingest_watchdog.py
Normal file
95
app/scheduler/ingest_watchdog.py
Normal file
|
|
@ -0,0 +1,95 @@
|
|||
"""Per-sitetag freshness watchdog — alert gdy aktywny browse-tube przestał dawać nowe sceny.
|
||||
|
||||
Globalny monitor źródeł (ingest_runs per `Source`) tego NIE łapie, bo wszystkie tube
|
||||
scrapery dzielą jeden `Source` = "tube-scraper" — pojedynczy origin może zamarznąć
|
||||
(np. freshporno: scraper browsował z roota `/`, który KVS rotuje → cold-session
|
||||
dostawała stary zestaw → new=0/skipped=N przez 2 dni), a zagregowany run nadal
|
||||
raportuje success. Sygnał per-origin: `max(created_at)` na playback_sources danego
|
||||
`tube:<sitetag>`. Jak zamrożony > próg → alert (report 14f3a655 2026-06-15).
|
||||
|
||||
Patrz [[reference_kvs_root_rotates_use_latest_updates]] dla klasy błędu, którą to łapie.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from datetime import UTC, datetime
|
||||
from typing import Any
|
||||
|
||||
from sqlalchemy import text
|
||||
|
||||
from app.db import session_scope
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def run_ingest_freshness_watchdog(
    *, max_age_hours: int = 48, min_history: int = 100
) -> dict[str, Any]:
    """Check that every actively crawled browse origin received a recent scene.

    Only sitetags listed in ``ALL_BROWSE_SCRAPERS`` are scanned, so legacy or
    one-off origins never alert. For each ``tube:<sitetag>`` origin the newest
    ``created_at`` and the row count are read from ``playback_sources``.

    Args:
        max_age_hours: An origin whose newest row is at least this many hours
            old is reported as stale.
        min_history: Origins with fewer rows than this are skipped (too little
            history to judge whether silence is an anomaly).

    Returns:
        ``{"checked": <number of sitetags scanned>, "stale": [{"origin",
        "age_hours", "total"}, ...]}``. ``checked`` counts every sitetag,
        including those skipped because of ``min_history``.

    Each stale origin is logged and sent to Sentry via ``capture_message`` with
    an explicit per-origin fingerprint, so events group into one issue per
    origin even though the message text carries the (changing) age.
    Sentry failures are logged and never propagate to the caller.
    """
    from app.connectors.direct_scrapers import ALL_BROWSE_SCRAPERS

    sitetags = sorted({cls.sitetag for cls in ALL_BROWSE_SCRAPERS})
    now = datetime.now(UTC)
    stale: list[dict[str, Any]] = []

    # Loop-invariant: build the statement once and only rebind :o per origin.
    freshness_query = text(
        "SELECT max(created_at) AS newest, count(*) AS total "
        "FROM playback_sources WHERE origin = :o"
    )

    with session_scope() as s:
        for tag in sitetags:
            origin = f"tube:{tag}"
            row = s.execute(freshness_query, {"o": origin}).one()
            newest, total = row.newest, row.total
            if total < min_history or newest is None:
                continue
            if newest.tzinfo is None:
                # Aware - naive subtraction raises TypeError, which would kill
                # the whole run. NOTE(review): assumes naive timestamps are
                # stored in UTC — confirm against the column definition.
                newest = newest.replace(tzinfo=UTC)
            age_h = (now - newest).total_seconds() / 3600.0
            if age_h >= max_age_hours:
                stale.append(
                    {"origin": origin, "age_hours": round(age_h, 1), "total": total}
                )

    if not stale:
        log.info(
            "ingest-watchdog: wszystkie %d browse origin świeże (<%dh)",
            len(sitetags), max_age_hours,
        )
        return {"checked": len(sitetags), "stale": stale}

    log.warning(
        "ingest-watchdog: %d/%d browse origin(s) bez nowych scen >%dh: %s",
        len(stale), len(sitetags), max_age_hours,
        ", ".join(f"{x['origin']}({x['age_hours']}h)" for x in stale),
    )

    try:
        import sentry_sdk
    except Exception:  # pragma: no cover - Sentry not installed / disabled
        log.exception("ingest-watchdog: Sentry capture failed")
        return {"checked": len(sitetags), "stale": stale}

    for x in stale:
        # Isolate each capture: one failing event must not suppress the alerts
        # for the remaining stale origins.
        try:
            # NOTE(review): push_scope is the sentry-sdk 1.x API; verify against
            # the installed SDK version whether new_scope should be used instead.
            with sentry_sdk.push_scope() as scope:
                scope.level = "warning"
                scope.set_tag("ingest_origin", x["origin"])
                scope.set_extra("age_hours", x["age_hours"])
                scope.set_extra("total_scenes", x["total"])
                scope.set_extra("max_age_hours", max_age_hours)
                # Per-origin fingerprint: one durable issue per frozen tube,
                # not fragmented by the changing age in the message.
                scope.fingerprint = ["ingest-stale-origin", x["origin"]]
                sentry_sdk.capture_message(
                    f"ingest-watchdog: {x['origin']} bez nowych scen "
                    f"({x['age_hours']:.0f}h, próg {max_age_hours}h)"
                )
        except Exception:  # pragma: no cover - Sentry off / no DSN
            log.exception("ingest-watchdog: Sentry capture failed")

    return {"checked": len(sitetags), "stale": stale}
|
||||
|
|
@ -276,6 +276,20 @@ def _job_thumb_asset_dedup() -> None:
|
|||
log.exception("[scheduler] thumb-asset dedup failed")
|
||||
|
||||
|
||||
def _job_ingest_watchdog(max_age_hours: int) -> None:
    """Scheduler job wrapper around the per-origin ingest freshness watchdog.

    Runs ``run_ingest_freshness_watchdog`` with the given ``max_age_hours`` and
    logs a warning with the number of stale origins when any are returned.
    Any exception (including a failed import of the watchdog module) is logged
    and swallowed so the scheduler keeps running.
    """
    try:
        from app.scheduler.ingest_watchdog import run_ingest_freshness_watchdog

        outcome = run_ingest_freshness_watchdog(max_age_hours=max_age_hours)
        if not outcome["stale"]:
            return
        log.warning(
            "[scheduler] ingest-watchdog: %d stale origin(s)", len(outcome["stale"])
        )
    except Exception:
        # Job boundary: never let a watchdog failure escape into the scheduler.
        log.exception("[scheduler] ingest-watchdog failed")
|
||||
|
||||
|
||||
def _job_performer_continuous(refresh_after_days: int) -> None:
|
||||
"""Continuous worker — 1 performer per tick, ORDER BY last_searched_at NULLS FIRST.
|
||||
|
||||
|
|
@ -380,6 +394,21 @@ def build_scheduler(cfg: dict[str, Any]) -> BlockingScheduler:
|
|||
)
|
||||
log.info("scheduler: thumb-asset dedup every %dh", cfg["thumb_dedup_hours"])
|
||||
|
||||
if cfg.get("ingest_watchdog_hours"):
|
||||
wd_max_age = cfg.get("ingest_watchdog_max_age_hours") or 48
|
||||
sched.add_job(
|
||||
lambda: _job_ingest_watchdog(wd_max_age),
|
||||
IntervalTrigger(hours=cfg["ingest_watchdog_hours"], start_date=INTERVAL_ANCHOR),
|
||||
id="ingest_watchdog",
|
||||
replace_existing=True,
|
||||
max_instances=1,
|
||||
coalesce=True,
|
||||
)
|
||||
log.info(
|
||||
"scheduler: ingest-watchdog every %dh (max_age=%dh)",
|
||||
cfg["ingest_watchdog_hours"], wd_max_age,
|
||||
)
|
||||
|
||||
if cfg.get("movie_ingest_hours"):
|
||||
sched.add_job(
|
||||
_job_movie_ingest,
|
||||
|
|
@ -498,4 +527,7 @@ DEFAULT_CONFIG: dict[str, Any] = {
|
|||
# round-robin po tube'ach. Backfill katalogów (porndoe ~62k itd.) przez dni.
|
||||
"deep_crawl_hours": 1,
|
||||
"deep_crawl_pages_per_run": 60,
|
||||
# Ingest freshness watchdog — per-origin alert do Sentry, co 6h, próg 48h.
|
||||
"ingest_watchdog_hours": 6,
|
||||
"ingest_watchdog_max_age_hours": 48,
|
||||
}
|
||||
|
|
|
|||
|
|
@ -212,6 +212,11 @@ def run_forever() -> int:
|
|||
"thumb_dedup_hours": getattr(settings, "sched_thumb_dedup_hours", 12) or None,
|
||||
# Taxonomy scene_count refresh — denormalizacja liczników (perf fix 0019).
|
||||
"taxonomy_counts_hours": getattr(settings, "sched_taxonomy_counts_hours", 3) or None,
|
||||
# Ingest freshness watchdog — per-origin alert do Sentry (report 14f3a655).
|
||||
"ingest_watchdog_hours": getattr(settings, "sched_ingest_watchdog_hours", 6) or None,
|
||||
"ingest_watchdog_max_age_hours": getattr(
|
||||
settings, "ingest_watchdog_max_age_hours", 48
|
||||
),
|
||||
}
|
||||
sched = build_scheduler(cfg)
|
||||
log.info("worker scheduled mode starting (jobs=%d)", len(sched.get_jobs()))
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue