goon/scripts/check_hetzner_traffic.py
goon-foss ad0284585b Initial commit
Goon — self-hosted aggregator for adult-content scene metadata.

Indexes scenes from TPDB, StashDB, and 30+ public adult tube sites.
Cross-source deduplication via perceptual hash + Levenshtein distance.
FastAPI backend + APScheduler worker + React Native (Expo) mobile client.

FOSS, ad-free, donation-funded. See README for details.
2026-05-20 10:10:22 +02:00

167 lines
5.9 KiB
Python

"""Hetzner Cloud traffic monitor — sprawdza % zużytego bandwidth + Sentry alert.
Hetzner Cloud servers mają included_traffic (np. 20TB dla CX22). Po przekroczeniu
overage €1/TB. Bez monitoringu można obudzić się z 100TB rachunkiem.
Endpoint: `GET https://api.hetzner.cloud/v1/servers/{id}` zwraca:
{
"server": {
"outgoing_traffic": 1234567890, # bytes (current billing period)
"incoming_traffic": 234567890, # bytes
"included_traffic": 21990232555520, # bytes (20TB for CX22)
...
}
}
Threshold alerty (Sentry capture_message + log):
- >50% (info): early warning, jeszcze daleko
- >80% (warning): uważaj, możesz overshoot
- >95% (error): natychmiast disable bandwidth-heavy tubes lub deploy throttle
Uruchomienie:
docker exec goon-worker-1 python -m scripts.check_hetzner_traffic
Cron na VPS (każda godzina):
0 * * * * docker exec goon-worker-1 python -m scripts.check_hetzner_traffic
"""
from __future__ import annotations
import logging
import sys
import httpx
from app.config import get_settings
log = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s: %(message)s")
def _fmt_bytes(b: int) -> str:
"""1234567890 → '1.15 GB' (binary units)."""
if b < 1024:
return f"{b} B"
units = ["KB", "MB", "GB", "TB"]
val = float(b)
for u in units:
val /= 1024
if val < 1024:
return f"{val:.2f} {u}"
return f"{val:.2f} PB"
def _sentry_alert(message: str, level: str, **extra: object) -> None:
"""Wysyła Sentry capture_message z level + tags. No-op gdy SENTRY_DSN puste."""
try:
import sentry_sdk
with sentry_sdk.push_scope() as scope:
scope.set_tag("monitor", "hetzner_traffic")
for k, v in extra.items():
scope.set_extra(k, v)
sentry_sdk.capture_message(message, level=level)
log.info("sentry alert sent: level=%s msg=%s", level, message)
except Exception as e:
log.warning("sentry alert failed: %s", e)
def check() -> int:
"""Returns exit code: 0=OK, 1=warning, 2=critical, 3=config missing."""
settings = get_settings()
if not settings.hetzner_api_token or not settings.hetzner_server_id:
log.warning(
"HETZNER_API_TOKEN or HETZNER_SERVER_ID not set — monitor disabled. "
"Generate token: Hetzner Cloud panel → Security → API Tokens (read-only)."
)
return 3
url = f"https://api.hetzner.cloud/v1/servers/{settings.hetzner_server_id}"
headers = {"Authorization": f"Bearer {settings.hetzner_api_token}"}
try:
with httpx.Client(timeout=15.0) as c:
r = c.get(url, headers=headers)
r.raise_for_status()
except httpx.HTTPStatusError as e:
log.error("Hetzner API HTTP %s: %s", e.response.status_code, e.response.text[:200])
_sentry_alert(
f"Hetzner API HTTP {e.response.status_code}",
level="error",
response_body=e.response.text[:500],
)
return 4
except Exception as e:
log.error("Hetzner API fetch failed: %s", e)
_sentry_alert(f"Hetzner API fetch failed: {e}", level="error")
return 4
server = r.json().get("server") or {}
outgoing = int(server.get("outgoing_traffic") or 0)
incoming = int(server.get("incoming_traffic") or 0)
included = int(server.get("included_traffic") or 0)
name = server.get("name") or f"id={settings.hetzner_server_id}"
server_type = (server.get("server_type") or {}).get("name") or "unknown"
if included == 0:
log.warning("Hetzner server %s has included_traffic=0 (dedicated/pricing edge case)", name)
return 5
pct = (outgoing / included) * 100
log.info(
"Hetzner %s (%s): outgoing=%s incoming=%s included=%s usage=%.2f%%",
name, server_type, _fmt_bytes(outgoing), _fmt_bytes(incoming),
_fmt_bytes(included), pct,
)
# Cache result do `/tmp/hetzner_traffic.json` żeby admin endpoint mógł pokazać
# bez ponownego API hit (rate-limit Hetzner Cloud API to 3600 req/h dla token).
try:
import json
from datetime import UTC, datetime
from pathlib import Path
Path("/tmp/hetzner_traffic.json").write_text(json.dumps({
"server_name": name,
"server_type": server_type,
"outgoing_bytes": outgoing,
"outgoing_pretty": _fmt_bytes(outgoing),
"incoming_bytes": incoming,
"incoming_pretty": _fmt_bytes(incoming),
"included_bytes": included,
"included_pretty": _fmt_bytes(included),
"usage_pct": round(pct, 2),
"checked_at": datetime.now(UTC).isoformat(),
}, indent=2))
except Exception as e:
log.warning("hetzner cache write failed: %s", e)
# Threshold dispatch (highest matching wins)
if pct >= settings.hetzner_alert_error_pct:
_sentry_alert(
f"Hetzner traffic CRITICAL: {pct:.1f}% ({_fmt_bytes(outgoing)} / {_fmt_bytes(included)})",
level="error",
outgoing=_fmt_bytes(outgoing),
included=_fmt_bytes(included),
pct=round(pct, 2),
)
return 2
elif pct >= settings.hetzner_alert_warning_pct:
_sentry_alert(
f"Hetzner traffic WARNING: {pct:.1f}% ({_fmt_bytes(outgoing)} / {_fmt_bytes(included)})",
level="warning",
outgoing=_fmt_bytes(outgoing),
included=_fmt_bytes(included),
pct=round(pct, 2),
)
return 1
elif pct >= settings.hetzner_alert_info_pct:
_sentry_alert(
f"Hetzner traffic INFO: {pct:.1f}% ({_fmt_bytes(outgoing)} / {_fmt_bytes(included)})",
level="info",
outgoing=_fmt_bytes(outgoing),
included=_fmt_bytes(included),
pct=round(pct, 2),
)
return 0
if __name__ == "__main__":
sys.exit(check())