goon/app/scheduler/ingest_watchdog.py
jtrzupek e4cb94bc59 feat(scheduler): hetzner bandwidth monitor + search-tube watchdog coverage
Two observability additions to the worker scheduler (intertwined in the same files): (1) ingest-watchdog now also covers performer-driven search scrapers (ALL_DIRECT_SCRAPERS) with a separate 7d threshold, not just browse tubes at 48h — several search tubes (perverzija, fpoxxx, porndish, ...) had frozen silently for weeks. (2) New Hetzner Cloud bandwidth monitor (app/scheduler/hetzner_monitor.py): polls outgoing_traffic vs included_traffic and fires a Sentry message at info/warning/error % thresholds with a per-level fingerprint. The config fields existed for ages but the monitor was never implemented. No-op until HETZNER_API_TOKEN + HETZNER_SERVER_ID are set in .env (verified: returns {enabled: False}, job registers as 'hetzner-monitor every 6h', jobs=13).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-18 09:18:59 +02:00

127 lines
5.6 KiB
Python

"""Per-sitetag freshness watchdog: alert when an active tube stops yielding new scenes.

The global source monitor (ingest_runs per `Source`) does NOT catch this, because all tube
scrapers share a single `Source` = "tube-scraper". A single origin can freeze
(e.g. freshporno: the scraper browsed from the root `/`, which KVS rotates, so a cold
session received a stale set and reported new=0/skipped=N for 2 days) while the aggregated
run still reports success. The per-origin signal is `max(created_at)` over the
playback_sources rows of a given `tube:<sitetag>`. If that value is older than the
threshold, we alert (report 14f3a655 2026-06-15).

We cover TWO classes of scrapers, each with its own threshold:
- **browse** (`ALL_BROWSE_SCRAPERS`): crawled daily from the listing, threshold 48h.
- **search** (`ALL_DIRECT_SCRAPERS`): performer-driven, uneven cadence (~30d
  refresh per performer), higher threshold (default 7d). Without this coverage several
  search tubes (sxyland, latestpornvideo, perverzija, fpoxxx, mypornerleak, porndish)
  froze silently on 2026-05-07/06-07/06-13 and nothing was reported to Sentry.

A tag present in both lists (xvideoscom, epornercom: both browse and search) is counted as
browse (the stricter threshold).

See [[reference_kvs_root_rotates_use_latest_updates]] for the class of bug this catches.
"""
from __future__ import annotations

import logging
from datetime import UTC, datetime
from typing import Any

from sqlalchemy import text

from app.db import session_scope

log = logging.getLogger(__name__)


def run_ingest_freshness_watchdog(
    *,
    max_age_hours: int = 48,
    search_max_age_hours: int = 168,
    min_history: int = 100,
) -> dict[str, Any]:
    """Check every active scraper: did its origin get a new scene within its class threshold?

    We scan the sitetags from `ALL_BROWSE_SCRAPERS` (threshold `max_age_hours`, default 48h)
    and from `ALL_DIRECT_SCRAPERS` (performer-driven search, threshold
    `search_max_age_hours`, default 7d: the cadence is uneven, so 48h would produce false
    positives). A tag in both lists is counted as browse (the stricter threshold). We do
    not scan all origins, to avoid alerting on legacy/one-off sources. `min_history`
    filters out freshly added tubes without an established cadence (too few scenes to tell
    whether silence is an anomaly).

    A stale origin triggers a Sentry `capture_message` with a stable per-origin fingerprint.
    Age and threshold are attached as extras; the explicit fingerprint, not the message
    text, drives grouping, so the changing age does not open a new issue on every run.
    Returns {checked, stale:[{origin, age_hours, total, kind, max_age_hours}]}.
    """
    from app.connectors.direct_scrapers import (
        ALL_BROWSE_SCRAPERS,
        ALL_DIRECT_SCRAPERS,
    )

    browse_tags = {cls.sitetag for cls in ALL_BROWSE_SCRAPERS}
    # Search tubes that share a tag with browse (xvideoscom, epornercom) use the browse threshold.
    search_tags = {cls.sitetag for cls in ALL_DIRECT_SCRAPERS} - browse_tags
    # (sitetag, threshold_h, kind): kind is used only for logs and Sentry extras.
    checks: list[tuple[str, int, str]] = [
        (tag, max_age_hours, "browse") for tag in sorted(browse_tags)
    ] + [(tag, search_max_age_hours, "search") for tag in sorted(search_tags)]

    now = datetime.now(UTC)
    stale: list[dict[str, Any]] = []
    with session_scope() as s:
        for tag, threshold, kind in checks:
            origin = f"tube:{tag}"
            row = s.execute(
                text(
                    "SELECT max(created_at) AS newest, count(*) AS total "
                    "FROM playback_sources WHERE origin = :o"
                ),
                {"o": origin},
            ).one()
            newest, total = row.newest, row.total
            # Skip origins with too little history or no rows at all.
            if total < min_history or newest is None:
                continue
            age_h = (now - newest).total_seconds() / 3600.0
            if age_h >= threshold:
                stale.append(
                    {
                        "origin": origin,
                        "age_hours": round(age_h, 1),
                        "total": total,
                        "kind": kind,
                        "max_age_hours": threshold,
                    }
                )

    if stale:
        log.warning(
            "ingest-watchdog: %d/%d origin(s) without new scenes (browse>=%dh / search>=%dh): %s",
            len(stale), len(checks), max_age_hours, search_max_age_hours,
            ", ".join(f"{x['origin']}({x['age_hours']}h,{x['kind']})" for x in stale),
        )
        try:
            import sentry_sdk

            for x in stale:
                with sentry_sdk.push_scope() as scope:
                    scope.level = "warning"
                    scope.set_tag("ingest_origin", x["origin"])
                    scope.set_tag("ingest_scraper_kind", x["kind"])
                    scope.set_extra("age_hours", x["age_hours"])
                    scope.set_extra("total_scenes", x["total"])
                    scope.set_extra("max_age_hours", x["max_age_hours"])
                    # Fingerprint per origin: one persistent issue per frozen tube,
                    # not fragmented by the changing age.
                    scope.fingerprint = ["ingest-stale-origin", x["origin"]]
                    sentry_sdk.capture_message(
                        f"ingest-watchdog: {x['origin']} ({x['kind']}) without new scenes "
                        f"({x['age_hours']:.0f}h, threshold {x['max_age_hours']}h)"
                    )
        except Exception:  # pragma: no cover - Sentry off / no DSN
            log.exception("ingest-watchdog: Sentry capture failed")
    else:
        log.info(
            "ingest-watchdog: all %d origins fresh (browse<%dh / search<%dh)",
            len(checks), max_age_hours, search_max_age_hours,
        )
    return {"checked": len(checks), "stale": stale}
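
The threshold selection and staleness test above can be exercised without a database. The following is a minimal sketch, not part of the module: `build_checks` and `find_stale` are hypothetical helper names, and the in-memory `newest_by_tag` / `total_by_tag` dicts stand in for the `playback_sources` query. It only illustrates how a tag present in both lists falls under the stricter browse threshold and how `min_history` filters out young origins.

```python
from __future__ import annotations

from datetime import datetime, timedelta, timezone
from typing import Any


def build_checks(
    browse_tags: set[str],
    search_tags: set[str],
    max_age_hours: int = 48,
    search_max_age_hours: int = 168,
) -> list[tuple[str, int, str]]:
    """Return (sitetag, threshold_h, kind); tags in both sets count as browse."""
    search_only = set(search_tags) - set(browse_tags)
    return [(t, max_age_hours, "browse") for t in sorted(browse_tags)] + [
        (t, search_max_age_hours, "search") for t in sorted(search_only)
    ]


def find_stale(
    checks: list[tuple[str, int, str]],
    newest_by_tag: dict[str, datetime],  # hypothetical stand-in for max(created_at)
    total_by_tag: dict[str, int],  # hypothetical stand-in for count(*)
    now: datetime,
    min_history: int = 100,
) -> list[dict[str, Any]]:
    """Apply the same skip and staleness rules as the watchdog loop."""
    stale: list[dict[str, Any]] = []
    for tag, threshold, kind in checks:
        newest = newest_by_tag.get(tag)
        total = total_by_tag.get(tag, 0)
        if total < min_history or newest is None:
            continue  # too little history to judge
        age_h = (now - newest).total_seconds() / 3600.0
        if age_h >= threshold:
            stale.append(
                {
                    "origin": f"tube:{tag}",
                    "age_hours": round(age_h, 1),
                    "total": total,
                    "kind": kind,
                    "max_age_hours": threshold,
                }
            )
    return stale


if __name__ == "__main__":
    now = datetime(2026, 6, 18, tzinfo=timezone.utc)
    checks = build_checks({"alpha", "beta"}, {"beta", "gamma"})
    newest = {tag: now - timedelta(hours=60) for tag in ("alpha", "beta", "gamma")}
    totals = {"alpha": 500, "beta": 500, "gamma": 500}
    for item in find_stale(checks, newest, totals, now):
        print(item)
```

With all three made-up origins silent for 60 hours, only the two under the 48h browse threshold are reported; the search-only tag stays below its 168h threshold.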