From e4cb94bc59dc0e997bfa68c5bfa3688c9de008ed Mon Sep 17 00:00:00 2001 From: jtrzupek Date: Thu, 18 Jun 2026 09:18:59 +0200 Subject: [PATCH] feat(scheduler): hetzner bandwidth monitor + search-tube watchdog coverage MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two observability additions to the worker scheduler (intertwined in the same files): (1) ingest-watchdog now also covers performer-driven search scrapers (ALL_DIRECT_SCRAPERS) with a separate 7d threshold, not just browse tubes at 48h — several search tubes (perverzija, fpoxxx, porndish, ...) had frozen silently for weeks. (2) New Hetzner Cloud bandwidth monitor (app/scheduler/hetzner_monitor.py): polls outgoing_traffic vs included_traffic and fires a Sentry message at info/warning/error % thresholds with a per-level fingerprint. The config fields existed for ages but the monitor was never implemented. No-op until HETZNER_API_TOKEN + HETZNER_SERVER_ID are set in .env (verified: returns {enabled: False}, job registers as 'hetzner-monitor every 6h', jobs=13). Co-Authored-By: Claude Opus 4.8 (1M context) --- app/config.py | 20 +++++- app/scheduler/hetzner_monitor.py | 103 +++++++++++++++++++++++++++++++ app/scheduler/ingest_watchdog.py | 78 ++++++++++++++++------- app/scheduler/jobs.py | 49 ++++++++++++--- app/scheduler/worker.py | 7 +++ 5 files changed, 222 insertions(+), 35 deletions(-) create mode 100644 app/scheduler/hetzner_monitor.py diff --git a/app/config.py b/app/config.py index 6b6975c..7e4d053 100644 --- a/app/config.py +++ b/app/config.py @@ -109,17 +109,26 @@ class Settings(BaseSettings): sched_thumb_dedup_hours: int = Field( default=12, validation_alias="GOON_SCHED_THUMB_DEDUP_HOURS" ) - # Ingest freshness watchdog — alert do Sentry gdy aktywny browse-tube (origin - # tube:) przestał dawać nowe sceny > max_age_hours. Łapie zamrożenie + # Ingest freshness watchdog — alert do Sentry gdy aktywny tube (origin + # tube:) przestał dawać nowe sceny > próg. Łapie zamrożenie # pojedynczego origin, którego globalny monitor (jeden Source "tube-scraper") nie # widzi (np. freshporno browse z rotującego roota, report 14f3a655). 6h cadence - # (po browse-latest), próg 48h. Każdy 0/None = wyłączony. + # (po browse-latest). Każdy 0/None = wyłączony. sched_ingest_watchdog_hours: int = Field( default=6, validation_alias="GOON_SCHED_INGEST_WATCHDOG_HOURS" ) + # Próg dla browse-scraperów (ALL_BROWSE_SCRAPERS) — crawlowane raz dziennie z + # listingu, więc 48h ciszy = anomalia. ingest_watchdog_max_age_hours: int = Field( default=48, validation_alias="GOON_INGEST_WATCHDOG_MAX_AGE_HOURS" ) + # Próg dla performer-driven search-scraperów (ALL_DIRECT_SCRAPERS) — kadencja jest + # nierówna (continuous queue ~30d refresh per performer, ingest orphan-heavy), więc + # 48h dawałoby false-positivy. 7d (168h): healthy search-tuby obserwowane <6h świeżości + # (continuous tick hituje wszystkie tuby per performer), zamrożone ≥73h → ~28× margines. + ingest_watchdog_search_max_age_hours: int = Field( + default=168, validation_alias="GOON_INGEST_WATCHDOG_SEARCH_MAX_AGE_HOURS" + ) # Taxonomy scene_count refresh — przelicza denormalizowane liczniki scen na # tags/performers/studios (hot-path /tags|/performers|/studios|/favorites czyta # gotową kolumnę zamiast agregować 6.3M scene_tags per-request). 3h cadence — @@ -137,6 +146,11 @@ class Settings(BaseSettings): hetzner_alert_info_pct: int = Field(default=50, validation_alias="HETZNER_ALERT_INFO_PCT") hetzner_alert_warning_pct: int = Field(default=80, validation_alias="HETZNER_ALERT_WARNING_PCT") hetzner_alert_error_pct: int = Field(default=95, validation_alias="HETZNER_ALERT_ERROR_PCT") + # Cadence sprawdzania transferu (godziny). 0/None = monitor wyłączony. Domyślnie 6h + # (transfer rośnie wolno; częściej bez sensu). Działa tylko gdy ustawiony token+id. + sched_hetzner_monitor_hours: int = Field( + default=6, validation_alias="GOON_SCHED_HETZNER_MONITOR_HOURS" + ) # Bright Data ISP proxy (stałe IP od ISP, rozliczane ryczałtem NIE per-GB) — # używany do ingestu HTML (scrape) tubów które blokują VPS IP twardym Cloudflare diff --git a/app/scheduler/hetzner_monitor.py b/app/scheduler/hetzner_monitor.py new file mode 100644 index 0000000..3cfce57 --- /dev/null +++ b/app/scheduler/hetzner_monitor.py @@ -0,0 +1,103 @@ +"""Hetzner Cloud bandwidth monitor — alert do Sentry zanim transfer przekroczy included. + +Hetzner Cloud: ruch WYCHODZĄCY liczy się do `included_traffic` (przychodzący darmowy), +overage = €1/TB. Przy dystrybucji apki (fala instalacji + część playbacku przez +/proxy/) transfer może rosnąć. Spec był w config.py od dawna, ale monitor nigdy +nie powstał — to jego implementacja. + +Pyta Hetzner Cloud API o bieżący `outgoing_traffic` vs `included_traffic` i alarmuje do +Sentry przy progach (info/warning/error %, z config). Stabilny fingerprint per poziom → +jedno eskalujące issue, nie nowe co run. Bez `HETZNER_API_TOKEN`/`HETZNER_SERVER_ID` = +no-op (warning w logu raz). Wołane periodycznie przez scheduler (`_job_hetzner_monitor`). +""" +from __future__ import annotations + +import logging +from typing import Any + +import httpx + +from app.config import get_settings + +log = logging.getLogger(__name__) + +_API = "https://api.hetzner.cloud/v1/servers/{id}" + + +def run_hetzner_bandwidth_check() -> dict[str, Any]: + """Sprawdź % zużycia included_traffic i zaalarmuj Sentry przy przekroczeniu progu. + + Zwraca {enabled, used_pct, outgoing_gb, included_gb, level} (level=None gdy poniżej + najniższego progu lub monitor wyłączony). + """ + s = get_settings() + token = s.hetzner_api_token + server_id = s.hetzner_server_id + if not token or not server_id: + log.info("hetzner-monitor: wyłączony (brak HETZNER_API_TOKEN/HETZNER_SERVER_ID)") + return {"enabled": False} + + try: + r = httpx.get( + _API.format(id=server_id), + headers={"Authorization": f"Bearer {token}"}, + timeout=20.0, + ) + r.raise_for_status() + srv = r.json()["server"] + except Exception as e: + log.warning("hetzner-monitor: API fetch failed: %s", str(e)[:160]) + return {"enabled": True, "error": str(e)[:160]} + + included = srv.get("included_traffic") or 0 + outgoing = srv.get("outgoing_traffic") or 0 + if included <= 0: + log.warning("hetzner-monitor: included_traffic=0 — pomijam") + return {"enabled": True, "error": "included_traffic=0"} + + used_pct = round(100.0 * outgoing / included, 1) + out_gb = round(outgoing / 1e9, 1) + inc_gb = round(included / 1e9, 1) + + # Najwyższy przekroczony próg → poziom Sentry. + level: str | None = None + if used_pct >= s.hetzner_alert_error_pct: + level = "error" + elif used_pct >= s.hetzner_alert_warning_pct: + level = "warning" + elif used_pct >= s.hetzner_alert_info_pct: + level = "info" + + result = { + "enabled": True, + "used_pct": used_pct, + "outgoing_gb": out_gb, + "included_gb": inc_gb, + "level": level, + } + log.info( + "hetzner-monitor: %s%% included (%sGB / %sGB out), level=%s", + used_pct, out_gb, inc_gb, level or "ok", + ) + + if level: + try: + import sentry_sdk + + with sentry_sdk.push_scope() as scope: + scope.level = level + scope.set_tag("hetzner_server_id", str(server_id)) + scope.set_extra("used_pct", used_pct) + scope.set_extra("outgoing_gb", out_gb) + scope.set_extra("included_gb", inc_gb) + # Fingerprint per POZIOM → eskalacja info→warning→error to osobne, trwałe + # issue (nie fragmentowane po zmiennym %). + scope.fingerprint = ["hetzner-traffic", level] + sentry_sdk.capture_message( + f"hetzner-monitor: transfer {used_pct}% included " + f"({out_gb}GB / {inc_gb}GB) — poziom {level}" + ) + except Exception: # pragma: no cover - Sentry off / brak DSN + log.exception("hetzner-monitor: Sentry capture failed") + + return result diff --git a/app/scheduler/ingest_watchdog.py b/app/scheduler/ingest_watchdog.py index 23eacdb..600ea42 100644 --- a/app/scheduler/ingest_watchdog.py +++ b/app/scheduler/ingest_watchdog.py @@ -1,4 +1,4 @@ -"""Per-sitetag freshness watchdog — alert gdy aktywny browse-tube przestał dawać nowe sceny. +"""Per-sitetag freshness watchdog — alert gdy aktywny tube przestał dawać nowe sceny. Globalny monitor źródeł (ingest_runs per `Source`) tego NIE łapie, bo wszystkie tube scrapery dzielą jeden `Source` = "tube-scraper" — pojedynczy origin może zamarznąć @@ -7,6 +7,15 @@ dostawała stary zestaw → new=0/skipped=N przez 2 dni), a zagregowany run nada raportuje success. Sygnał per-origin: `max(created_at)` na playback_sources danego `tube:`. Jak zamrożony > próg → alert (report 14f3a655 2026-06-15). +Pokrywamy DWIE klasy scraperów, każda z własnym progiem: + - **browse** (`ALL_BROWSE_SCRAPERS`) — crawlowane codziennie z listingu, próg 48h. + - **search** (`ALL_DIRECT_SCRAPERS`) — performer-driven, nierówna kadencja (~30d + refresh per performer), próg wyższy (domyślnie 7d). Bez tego pokrycia kilka + search-tubów (sxyland, latestpornvideo, perverzija, fpoxxx, mypornerleak, porndish) + zamarzło cicho 2026-05-07/06-07/06-13 i nic nie krzyknęło do Sentry. +Tag obecny w obu listach (xvideoscom, epornercom — i browse i search) liczymy jako +browse (ostrzejszy próg). + Patrz [[reference_kvs_root_rotates_use_latest_updates]] dla klasy błędu, którą to łapie. """ from __future__ import annotations @@ -23,27 +32,43 @@ log = logging.getLogger(__name__) def run_ingest_freshness_watchdog( - *, max_age_hours: int = 48, min_history: int = 100 + *, + max_age_hours: int = 48, + search_max_age_hours: int = 168, + min_history: int = 100, ) -> dict[str, Any]: - """Sprawdź każdy aktywny browse-scraper: czy origin dostał nową scenę < max_age_hours. + """Sprawdź każdy aktywny scraper: czy origin dostał nową scenę < próg dla swojej klasy. - Skanujemy TYLKO sitetagi z `ALL_BROWSE_SCRAPERS` (te, które aktywnie crawlujemy i - od których spodziewamy się świeżości) — nie wszystkie origin-y, żeby nie alarmować - o legacy/jednorazowych źródłach. `min_history` odsiewa świeżo dodane tuby bez - ustalonej kadencji (za mało scen, by wiedzieć czy 48h ciszy to anomalia). + Skanujemy sitetagi z `ALL_BROWSE_SCRAPERS` (próg `max_age_hours`, domyślnie 48h) oraz + z `ALL_DIRECT_SCRAPERS` (performer-driven search, próg `search_max_age_hours`, domyślnie + 7d — nierówna kadencja, 48h dawałoby false-positivy). Tag w obu listach liczymy jako + browse (ostrzejszy próg). Nie skanujemy wszystkich origin-ów, żeby nie alarmować o + legacy/jednorazowych źródłach. `min_history` odsiewa świeżo dodane tuby bez ustalonej + kadencji (za mało scen, by wiedzieć czy cisza to anomalia). Stale origin → Sentry `capture_message` ze stabilnym fingerprintem per origin - (wiek w extra, nie w tytule — inaczej każdy run = nowe issue). Zwraca - {checked, stale:[{origin, age_hours, total}]}. + (wiek + próg w extra, nie w tytule — inaczej każdy run = nowe issue). Zwraca + {checked, stale:[{origin, age_hours, total, kind, max_age_hours}]}. """ - from app.connectors.direct_scrapers import ALL_BROWSE_SCRAPERS + from app.connectors.direct_scrapers import ( + ALL_BROWSE_SCRAPERS, + ALL_DIRECT_SCRAPERS, + ) + + browse_tags = {cls.sitetag for cls in ALL_BROWSE_SCRAPERS} + # Search-tuby dzielące tag z browse (xvideoscom, epornercom) idą pod browse-próg. + search_tags = {cls.sitetag for cls in ALL_DIRECT_SCRAPERS} - browse_tags + + # (sitetag, próg_h, klasa) — klasa tylko do logów/extra w Sentry. + checks: list[tuple[str, int, str]] = [ + (tag, max_age_hours, "browse") for tag in sorted(browse_tags) + ] + [(tag, search_max_age_hours, "search") for tag in sorted(search_tags)] - sitetags = sorted({cls.sitetag for cls in ALL_BROWSE_SCRAPERS}) now = datetime.now(UTC) stale: list[dict[str, Any]] = [] with session_scope() as s: - for tag in sitetags: + for tag, threshold, kind in checks: origin = f"tube:{tag}" row = s.execute( text( @@ -56,16 +81,22 @@ def run_ingest_freshness_watchdog( if total < min_history or newest is None: continue age_h = (now - newest).total_seconds() / 3600.0 - if age_h >= max_age_hours: + if age_h >= threshold: stale.append( - {"origin": origin, "age_hours": round(age_h, 1), "total": total} + { + "origin": origin, + "age_hours": round(age_h, 1), + "total": total, + "kind": kind, + "max_age_hours": threshold, + } ) if stale: log.warning( - "ingest-watchdog: %d/%d browse origin(s) bez nowych scen >%dh: %s", - len(stale), len(sitetags), max_age_hours, - ", ".join(f"{x['origin']}({x['age_hours']}h)" for x in stale), + "ingest-watchdog: %d/%d origin(s) bez nowych scen (browse>%dh / search>%dh): %s", + len(stale), len(checks), max_age_hours, search_max_age_hours, + ", ".join(f"{x['origin']}({x['age_hours']}h,{x['kind']})" for x in stale), ) try: import sentry_sdk @@ -74,22 +105,23 @@ def run_ingest_freshness_watchdog( with sentry_sdk.push_scope() as scope: scope.level = "warning" scope.set_tag("ingest_origin", x["origin"]) + scope.set_tag("ingest_scraper_kind", x["kind"]) scope.set_extra("age_hours", x["age_hours"]) scope.set_extra("total_scenes", x["total"]) - scope.set_extra("max_age_hours", max_age_hours) + scope.set_extra("max_age_hours", x["max_age_hours"]) # Fingerprint per origin → jedno trwałe issue na zamrożony tube, # nie fragmentowane przez zmienny wiek. scope.fingerprint = ["ingest-stale-origin", x["origin"]] sentry_sdk.capture_message( - f"ingest-watchdog: {x['origin']} bez nowych scen " - f"({x['age_hours']:.0f}h, próg {max_age_hours}h)" + f"ingest-watchdog: {x['origin']} ({x['kind']}) bez nowych scen " + f"({x['age_hours']:.0f}h, próg {x['max_age_hours']}h)" ) except Exception: # pragma: no cover - Sentry off / brak DSN log.exception("ingest-watchdog: Sentry capture failed") else: log.info( - "ingest-watchdog: wszystkie %d browse origin świeże (<%dh)", - len(sitetags), max_age_hours, + "ingest-watchdog: wszystkie %d origin świeże (browse<%dh / search<%dh)", + len(checks), max_age_hours, search_max_age_hours, ) - return {"checked": len(sitetags), "stale": stale} + return {"checked": len(checks), "stale": stale} diff --git a/app/scheduler/jobs.py b/app/scheduler/jobs.py index 1187da4..9a973ef 100644 --- a/app/scheduler/jobs.py +++ b/app/scheduler/jobs.py @@ -276,20 +276,35 @@ def _job_thumb_asset_dedup() -> None: log.exception("[scheduler] thumb-asset dedup failed") -def _job_ingest_watchdog(max_age_hours: int) -> None: - """Per-origin freshness watchdog — alert do Sentry gdy aktywny browse-tube przestał - dawać nowe sceny > max_age_hours. Globalny monitor (jeden Source 'tube-scraper') tego - nie łapie; pojedynczy origin może zamarznąć przy success-runie (report 14f3a655).""" +def _job_ingest_watchdog(max_age_hours: int, search_max_age_hours: int) -> None: + """Per-origin freshness watchdog — alert do Sentry gdy aktywny tube przestał dawać + nowe sceny > próg (browse: max_age_hours, search: search_max_age_hours). Globalny + monitor (jeden Source 'tube-scraper') tego nie łapie; pojedynczy origin może zamarznąć + przy success-runie (report 14f3a655).""" try: from app.scheduler.ingest_watchdog import run_ingest_freshness_watchdog - res = run_ingest_freshness_watchdog(max_age_hours=max_age_hours) + res = run_ingest_freshness_watchdog( + max_age_hours=max_age_hours, + search_max_age_hours=search_max_age_hours, + ) if res["stale"]: log.warning("[scheduler] ingest-watchdog: %d stale origin(s)", len(res["stale"])) except Exception: log.exception("[scheduler] ingest-watchdog failed") +def _job_hetzner_monitor() -> None: + """Hetzner Cloud bandwidth monitor — alert do Sentry przy progach % included_traffic. + No-op gdy brak HETZNER_API_TOKEN/SERVER_ID w env (loguje że wyłączony).""" + try: + from app.scheduler.hetzner_monitor import run_hetzner_bandwidth_check + + run_hetzner_bandwidth_check() + except Exception: + log.exception("[scheduler] hetzner-monitor failed") + + def _job_performer_continuous(refresh_after_days: int) -> None: """Continuous worker — 1 performer per tick, ORDER BY last_searched_at NULLS FIRST. @@ -396,8 +411,9 @@ def build_scheduler(cfg: dict[str, Any]) -> BlockingScheduler: if cfg.get("ingest_watchdog_hours"): wd_max_age = cfg.get("ingest_watchdog_max_age_hours") or 48 + wd_search_max_age = cfg.get("ingest_watchdog_search_max_age_hours") or 168 sched.add_job( - lambda: _job_ingest_watchdog(wd_max_age), + lambda: _job_ingest_watchdog(wd_max_age, wd_search_max_age), IntervalTrigger(hours=cfg["ingest_watchdog_hours"], start_date=INTERVAL_ANCHOR), id="ingest_watchdog", replace_existing=True, @@ -405,10 +421,21 @@ def build_scheduler(cfg: dict[str, Any]) -> BlockingScheduler: coalesce=True, ) log.info( - "scheduler: ingest-watchdog every %dh (max_age=%dh)", - cfg["ingest_watchdog_hours"], wd_max_age, + "scheduler: ingest-watchdog every %dh (browse max_age=%dh, search max_age=%dh)", + cfg["ingest_watchdog_hours"], wd_max_age, wd_search_max_age, ) + if cfg.get("hetzner_monitor_hours"): + sched.add_job( + _job_hetzner_monitor, + IntervalTrigger(hours=cfg["hetzner_monitor_hours"], start_date=INTERVAL_ANCHOR), + id="hetzner_monitor", + replace_existing=True, + max_instances=1, + coalesce=True, + ) + log.info("scheduler: hetzner-monitor every %dh", cfg["hetzner_monitor_hours"]) + if cfg.get("movie_ingest_hours"): sched.add_job( _job_movie_ingest, @@ -527,7 +554,11 @@ DEFAULT_CONFIG: dict[str, Any] = { # round-robin po tube'ach. Backfill katalogów (porndoe ~62k itd.) przez dni. "deep_crawl_hours": 1, "deep_crawl_pages_per_run": 60, - # Ingest freshness watchdog — per-origin alert do Sentry, co 6h, próg 48h. + # Ingest freshness watchdog — per-origin alert do Sentry, co 6h. Browse próg 48h, + # search (performer-driven) próg 7d (nierówna kadencja). "ingest_watchdog_hours": 6, "ingest_watchdog_max_age_hours": 48, + "ingest_watchdog_search_max_age_hours": 168, + # Hetzner Cloud bandwidth monitor — co 6h, alert Sentry przy progach % included. + "hetzner_monitor_hours": 6, } diff --git a/app/scheduler/worker.py b/app/scheduler/worker.py index cc1df53..bd7bf0a 100644 --- a/app/scheduler/worker.py +++ b/app/scheduler/worker.py @@ -213,10 +213,17 @@ def run_forever() -> int: # Taxonomy scene_count refresh — denormalizacja liczników (perf fix 0019). "taxonomy_counts_hours": getattr(settings, "sched_taxonomy_counts_hours", 3) or None, # Ingest freshness watchdog — per-origin alert do Sentry (report 14f3a655). + # Browse: 48h próg; search (performer-driven, nierówna kadencja): 7d próg. "ingest_watchdog_hours": getattr(settings, "sched_ingest_watchdog_hours", 6) or None, "ingest_watchdog_max_age_hours": getattr( settings, "ingest_watchdog_max_age_hours", 48 ), + "ingest_watchdog_search_max_age_hours": getattr( + settings, "ingest_watchdog_search_max_age_hours", 168 + ), + # Hetzner Cloud bandwidth monitor — alert do Sentry przy progach % included. + # No-op gdy brak HETZNER_API_TOKEN/SERVER_ID (sam job może być on). + "hetzner_monitor_hours": getattr(settings, "sched_hetzner_monitor_hours", 6) or None, } sched = build_scheduler(cfg) log.info("worker scheduled mode starting (jobs=%d)", len(sched.get_jobs()))