goon/app/scheduler/ingest_watchdog.py
jtrzupek 0424cb9138 feat(scheduler): per-origin ingest freshness watchdog -> Sentry
The global source monitor can't catch a single stalled tube because every tube scraper shares one Source row (tube-scraper), so an aggregate run still reports success while one origin freezes (freshporno browsing the rotating KVS homepage root, report 14f3a655). New watchdog checks max(created_at) per active browse-scraper origin (tube:<sitetag>); if a tube with history hasn't produced a new scene in > max_age_hours it fires a Sentry message with a stable per-origin fingerprint (age in extras, not the title, so it stays one grouped issue). Runs every 6h, 48h threshold, both env-tunable (GOON_SCHED_INGEST_WATCHDOG_HOURS / GOON_INGEST_WATCHDOG_MAX_AGE_HOURS). Verified: 0 stale at 48h post-fix, detects neporn at a strict 12h threshold.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-15 10:26:25 +02:00

95 lines
4 KiB
Python

"""Per-sitetag freshness watchdog — alert gdy aktywny browse-tube przestał dawać nowe sceny.
Globalny monitor źródeł (ingest_runs per `Source`) tego NIE łapie, bo wszystkie tube
scrapery dzielą jeden `Source` = "tube-scraper" — pojedynczy origin może zamarznąć
(np. freshporno: scraper browsował z roota `/`, który KVS rotuje → cold-session
dostawała stary zestaw → new=0/skipped=N przez 2 dni), a zagregowany run nadal
raportuje success. Sygnał per-origin: `max(created_at)` na playback_sources danego
`tube:<sitetag>`. Jak zamrożony > próg → alert (report 14f3a655 2026-06-15).
Patrz [[reference_kvs_root_rotates_use_latest_updates]] dla klasy błędu, którą to łapie.
"""
from __future__ import annotations
import logging
from datetime import UTC, datetime
from typing import Any
from sqlalchemy import text
from app.db import session_scope
log = logging.getLogger(__name__)
def _hours_since(moment: datetime, now: datetime) -> float:
    """Return the hours elapsed from *moment* to *now* (*now* must be tz-aware).

    Subtracting a naive datetime from an aware one raises ``TypeError``, which
    would abort the whole watchdog run. A naive *moment* is therefore tagged
    as UTC before the subtraction.
    """
    if moment.tzinfo is None:
        # NOTE(review): assumes a timezone-naive `created_at` is stored in UTC —
        # confirm against the playback_sources schema / driver settings.
        moment = moment.replace(tzinfo=UTC)
    return (now - moment).total_seconds() / 3600.0


def _capture_stale_origin(sdk: Any, entry: dict[str, Any], max_age_hours: int) -> None:
    """Send one warning-level Sentry message for a single stale origin.

    *sdk* is the imported ``sentry_sdk`` module; *entry* is one item of the
    ``stale`` list built by ``run_ingest_freshness_watchdog``.
    """
    # NOTE(review): `push_scope` is deprecated in sentry-sdk 2.x in favour of
    # `new_scope` — confirm the installed SDK version before migrating.
    with sdk.push_scope() as scope:
        scope.level = "warning"
        scope.set_tag("ingest_origin", entry["origin"])
        scope.set_extra("age_hours", entry["age_hours"])
        scope.set_extra("total_scenes", entry["total"])
        scope.set_extra("max_age_hours", max_age_hours)
        # The fingerprint is built from constant parts plus the origin only, so
        # it does not vary with the age; the age is attached as extras above.
        scope.fingerprint = ["ingest-stale-origin", entry["origin"]]
        sdk.capture_message(
            f"ingest-watchdog: {entry['origin']} bez nowych scen "
            f"({entry['age_hours']:.0f}h, próg {max_age_hours}h)"
        )


def run_ingest_freshness_watchdog(
    *, max_age_hours: int = 48, min_history: int = 100
) -> dict[str, Any]:
    """Check that every active browse-scraper origin received a scene recently.

    Only sitetags listed in ``ALL_BROWSE_SCRAPERS`` are scanned; each one is
    looked up in ``playback_sources`` under the origin ``tube:<sitetag>``.
    An origin is skipped when it has fewer than *min_history* rows or no
    ``created_at`` value at all. An origin whose newest row is at least
    *max_age_hours* old is reported as stale: it is included in a single
    warning log line and sent to Sentry as a warning-level message with a
    per-origin fingerprint.

    Args:
        max_age_hours: Age of the newest row, in hours, at or above which an
            origin counts as stale.
        min_history: Minimum number of rows an origin must have to be judged.

    Returns:
        ``{"checked": <number of sitetags scanned>, "stale": [...]}`` where each
        stale item is ``{"origin", "age_hours", "total"}``. ``checked`` counts
        every sitetag, including those skipped for insufficient history.

    Sentry failures never propagate: they are logged and the result is still
    returned.
    """
    # Imported inside the function, as in the original implementation.
    from app.connectors.direct_scrapers import ALL_BROWSE_SCRAPERS

    sitetags = sorted({cls.sitetag for cls in ALL_BROWSE_SCRAPERS})
    now = datetime.now(UTC)
    stale: list[dict[str, Any]] = []

    # Loop-invariant statement, built once and re-executed with a bound origin.
    freshness_query = text(
        "SELECT max(created_at) AS newest, count(*) AS total "
        "FROM playback_sources WHERE origin = :o"
    )

    with session_scope() as s:
        for tag in sitetags:
            origin = f"tube:{tag}"
            row = s.execute(freshness_query, {"o": origin}).one()
            newest, total = row.newest, row.total
            if total < min_history or newest is None:
                continue
            age_h = _hours_since(newest, now)
            if age_h >= max_age_hours:
                stale.append(
                    {"origin": origin, "age_hours": round(age_h, 1), "total": total}
                )

    result: dict[str, Any] = {"checked": len(sitetags), "stale": stale}

    if not stale:
        log.info(
            "ingest-watchdog: wszystkie %d browse origin świeże (<%dh)",
            len(sitetags), max_age_hours,
        )
        return result

    log.warning(
        "ingest-watchdog: %d/%d browse origin(s) bez nowych scen >%dh: %s",
        len(stale), len(sitetags), max_age_hours,
        ", ".join(f"{x['origin']}({x['age_hours']}h)" for x in stale),
    )

    try:
        import sentry_sdk
    except Exception:  # pragma: no cover - Sentry SDK unavailable
        log.exception("ingest-watchdog: Sentry capture failed")
        return result

    for entry in stale:
        # Guard each capture separately so one failing origin does not
        # suppress the alerts for the remaining stale origins.
        try:
            _capture_stale_origin(sentry_sdk, entry, max_age_hours)
        except Exception:  # pragma: no cover - Sentry misconfigured / transport error
            log.exception("ingest-watchdog: Sentry capture failed")

    return result