"""Hetzner Cloud traffic monitor — sprawdza % zużytego bandwidth + Sentry alert. Hetzner Cloud servers mają included_traffic (np. 20TB dla CX22). Po przekroczeniu overage €1/TB. Bez monitoringu można obudzić się z 100TB rachunkiem. Endpoint: `GET https://api.hetzner.cloud/v1/servers/{id}` zwraca: { "server": { "outgoing_traffic": 1234567890, # bytes (current billing period) "incoming_traffic": 234567890, # bytes "included_traffic": 21990232555520, # bytes (20TB for CX22) ... } } Threshold alerty (Sentry capture_message + log): - >50% (info): early warning, jeszcze daleko - >80% (warning): uważaj, możesz overshoot - >95% (error): natychmiast disable bandwidth-heavy tubes lub deploy throttle Uruchomienie: docker exec goon-worker-1 python -m scripts.check_hetzner_traffic Cron na VPS (każda godzina): 0 * * * * docker exec goon-worker-1 python -m scripts.check_hetzner_traffic """ from __future__ import annotations import logging import sys import httpx from app.config import get_settings log = logging.getLogger(__name__) logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s: %(message)s") def _fmt_bytes(b: int) -> str: """1234567890 → '1.15 GB' (binary units).""" if b < 1024: return f"{b} B" units = ["KB", "MB", "GB", "TB"] val = float(b) for u in units: val /= 1024 if val < 1024: return f"{val:.2f} {u}" return f"{val:.2f} PB" def _sentry_alert(message: str, level: str, **extra: object) -> None: """Wysyła Sentry capture_message z level + tags. No-op gdy SENTRY_DSN puste.""" try: import sentry_sdk with sentry_sdk.push_scope() as scope: scope.set_tag("monitor", "hetzner_traffic") for k, v in extra.items(): scope.set_extra(k, v) sentry_sdk.capture_message(message, level=level) log.info("sentry alert sent: level=%s msg=%s", level, message) except Exception as e: log.warning("sentry alert failed: %s", e) def check() -> int: """Returns exit code: 0=OK, 1=warning, 2=critical, 3=config missing.""" settings = get_settings() if not settings.hetzner_api_token or not settings.hetzner_server_id: log.warning( "HETZNER_API_TOKEN or HETZNER_SERVER_ID not set — monitor disabled. " "Generate token: Hetzner Cloud panel → Security → API Tokens (read-only)." ) return 3 url = f"https://api.hetzner.cloud/v1/servers/{settings.hetzner_server_id}" headers = {"Authorization": f"Bearer {settings.hetzner_api_token}"} try: with httpx.Client(timeout=15.0) as c: r = c.get(url, headers=headers) r.raise_for_status() except httpx.HTTPStatusError as e: log.error("Hetzner API HTTP %s: %s", e.response.status_code, e.response.text[:200]) _sentry_alert( f"Hetzner API HTTP {e.response.status_code}", level="error", response_body=e.response.text[:500], ) return 4 except Exception as e: log.error("Hetzner API fetch failed: %s", e) _sentry_alert(f"Hetzner API fetch failed: {e}", level="error") return 4 server = r.json().get("server") or {} outgoing = int(server.get("outgoing_traffic") or 0) incoming = int(server.get("incoming_traffic") or 0) included = int(server.get("included_traffic") or 0) name = server.get("name") or f"id={settings.hetzner_server_id}" server_type = (server.get("server_type") or {}).get("name") or "unknown" if included == 0: log.warning("Hetzner server %s has included_traffic=0 (dedicated/pricing edge case)", name) return 5 pct = (outgoing / included) * 100 log.info( "Hetzner %s (%s): outgoing=%s incoming=%s included=%s usage=%.2f%%", name, server_type, _fmt_bytes(outgoing), _fmt_bytes(incoming), _fmt_bytes(included), pct, ) # Cache result do `/tmp/hetzner_traffic.json` żeby admin endpoint mógł pokazać # bez ponownego API hit (rate-limit Hetzner Cloud API to 3600 req/h dla token). try: import json from datetime import UTC, datetime from pathlib import Path Path("/tmp/hetzner_traffic.json").write_text(json.dumps({ "server_name": name, "server_type": server_type, "outgoing_bytes": outgoing, "outgoing_pretty": _fmt_bytes(outgoing), "incoming_bytes": incoming, "incoming_pretty": _fmt_bytes(incoming), "included_bytes": included, "included_pretty": _fmt_bytes(included), "usage_pct": round(pct, 2), "checked_at": datetime.now(UTC).isoformat(), }, indent=2)) except Exception as e: log.warning("hetzner cache write failed: %s", e) # Threshold dispatch (highest matching wins) if pct >= settings.hetzner_alert_error_pct: _sentry_alert( f"Hetzner traffic CRITICAL: {pct:.1f}% ({_fmt_bytes(outgoing)} / {_fmt_bytes(included)})", level="error", outgoing=_fmt_bytes(outgoing), included=_fmt_bytes(included), pct=round(pct, 2), ) return 2 elif pct >= settings.hetzner_alert_warning_pct: _sentry_alert( f"Hetzner traffic WARNING: {pct:.1f}% ({_fmt_bytes(outgoing)} / {_fmt_bytes(included)})", level="warning", outgoing=_fmt_bytes(outgoing), included=_fmt_bytes(included), pct=round(pct, 2), ) return 1 elif pct >= settings.hetzner_alert_info_pct: _sentry_alert( f"Hetzner traffic INFO: {pct:.1f}% ({_fmt_bytes(outgoing)} / {_fmt_bytes(included)})", level="info", outgoing=_fmt_bytes(outgoing), included=_fmt_bytes(included), pct=round(pct, 2), ) return 0 if __name__ == "__main__": sys.exit(check())