feat(scheduler): hetzner bandwidth monitor + search-tube watchdog coverage
Two observability additions to the worker scheduler (intertwined in the same files): (1) ingest-watchdog now also covers performer-driven search scrapers (ALL_DIRECT_SCRAPERS) with a separate 7d threshold, not just browse tubes at 48h — several search tubes (perverzija, fpoxxx, porndish, ...) had frozen silently for weeks. (2) New Hetzner Cloud bandwidth monitor (app/scheduler/hetzner_monitor.py): polls outgoing_traffic vs included_traffic and fires a Sentry message at info/warning/error % thresholds with a per-level fingerprint. The config fields existed for ages but the monitor was never implemented. No-op until HETZNER_API_TOKEN + HETZNER_SERVER_ID are set in .env (verified: returns {enabled: False}, job registers as 'hetzner-monitor every 6h', jobs=13).
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
b1a530611f
commit
e4cb94bc59
5 changed files with 222 additions and 35 deletions
|
|
@ -109,17 +109,26 @@ class Settings(BaseSettings):
|
||||||
sched_thumb_dedup_hours: int = Field(
|
sched_thumb_dedup_hours: int = Field(
|
||||||
default=12, validation_alias="GOON_SCHED_THUMB_DEDUP_HOURS"
|
default=12, validation_alias="GOON_SCHED_THUMB_DEDUP_HOURS"
|
||||||
)
|
)
|
||||||
# Ingest freshness watchdog — alert do Sentry gdy aktywny browse-tube (origin
|
# Ingest freshness watchdog — alert do Sentry gdy aktywny tube (origin
|
||||||
# tube:<sitetag>) przestał dawać nowe sceny > max_age_hours. Łapie zamrożenie
|
# tube:<sitetag>) przestał dawać nowe sceny > próg. Łapie zamrożenie
|
||||||
# pojedynczego origin, którego globalny monitor (jeden Source "tube-scraper") nie
|
# pojedynczego origin, którego globalny monitor (jeden Source "tube-scraper") nie
|
||||||
# widzi (np. freshporno browse z rotującego roota, report 14f3a655). 6h cadence
|
# widzi (np. freshporno browse z rotującego roota, report 14f3a655). 6h cadence
|
||||||
# (po browse-latest), próg 48h. Każdy 0/None = wyłączony.
|
# (po browse-latest). Każdy 0/None = wyłączony.
|
||||||
sched_ingest_watchdog_hours: int = Field(
|
sched_ingest_watchdog_hours: int = Field(
|
||||||
default=6, validation_alias="GOON_SCHED_INGEST_WATCHDOG_HOURS"
|
default=6, validation_alias="GOON_SCHED_INGEST_WATCHDOG_HOURS"
|
||||||
)
|
)
|
||||||
|
# Próg dla browse-scraperów (ALL_BROWSE_SCRAPERS) — crawlowane raz dziennie z
|
||||||
|
# listingu, więc 48h ciszy = anomalia.
|
||||||
ingest_watchdog_max_age_hours: int = Field(
|
ingest_watchdog_max_age_hours: int = Field(
|
||||||
default=48, validation_alias="GOON_INGEST_WATCHDOG_MAX_AGE_HOURS"
|
default=48, validation_alias="GOON_INGEST_WATCHDOG_MAX_AGE_HOURS"
|
||||||
)
|
)
|
||||||
|
# Próg dla performer-driven search-scraperów (ALL_DIRECT_SCRAPERS) — kadencja jest
|
||||||
|
# nierówna (continuous queue ~30d refresh per performer, ingest orphan-heavy), więc
|
||||||
|
# 48h dawałoby false-positivy. 7d (168h): healthy search-tuby obserwowane <6h świeżości
|
||||||
|
# (continuous tick hituje wszystkie tuby per performer), zamrożone ≥73h → ~28× margines.
|
||||||
|
ingest_watchdog_search_max_age_hours: int = Field(
|
||||||
|
default=168, validation_alias="GOON_INGEST_WATCHDOG_SEARCH_MAX_AGE_HOURS"
|
||||||
|
)
|
||||||
# Taxonomy scene_count refresh — przelicza denormalizowane liczniki scen na
|
# Taxonomy scene_count refresh — przelicza denormalizowane liczniki scen na
|
||||||
# tags/performers/studios (hot-path /tags|/performers|/studios|/favorites czyta
|
# tags/performers/studios (hot-path /tags|/performers|/studios|/favorites czyta
|
||||||
# gotową kolumnę zamiast agregować 6.3M scene_tags per-request). 3h cadence —
|
# gotową kolumnę zamiast agregować 6.3M scene_tags per-request). 3h cadence —
|
||||||
|
|
@ -137,6 +146,11 @@ class Settings(BaseSettings):
|
||||||
hetzner_alert_info_pct: int = Field(default=50, validation_alias="HETZNER_ALERT_INFO_PCT")
|
hetzner_alert_info_pct: int = Field(default=50, validation_alias="HETZNER_ALERT_INFO_PCT")
|
||||||
hetzner_alert_warning_pct: int = Field(default=80, validation_alias="HETZNER_ALERT_WARNING_PCT")
|
hetzner_alert_warning_pct: int = Field(default=80, validation_alias="HETZNER_ALERT_WARNING_PCT")
|
||||||
hetzner_alert_error_pct: int = Field(default=95, validation_alias="HETZNER_ALERT_ERROR_PCT")
|
hetzner_alert_error_pct: int = Field(default=95, validation_alias="HETZNER_ALERT_ERROR_PCT")
|
||||||
|
# Cadence sprawdzania transferu (godziny). 0/None = monitor wyłączony. Domyślnie 6h
|
||||||
|
# (transfer rośnie wolno; częściej bez sensu). Działa tylko gdy ustawiony token+id.
|
||||||
|
sched_hetzner_monitor_hours: int = Field(
|
||||||
|
default=6, validation_alias="GOON_SCHED_HETZNER_MONITOR_HOURS"
|
||||||
|
)
|
||||||
|
|
||||||
# Bright Data ISP proxy (stałe IP od ISP, rozliczane ryczałtem NIE per-GB) —
|
# Bright Data ISP proxy (stałe IP od ISP, rozliczane ryczałtem NIE per-GB) —
|
||||||
# używany do ingestu HTML (scrape) tubów które blokują VPS IP twardym Cloudflare
|
# używany do ingestu HTML (scrape) tubów które blokują VPS IP twardym Cloudflare
|
||||||
|
|
|
||||||
103
app/scheduler/hetzner_monitor.py
Normal file
103
app/scheduler/hetzner_monitor.py
Normal file
|
|
@ -0,0 +1,103 @@
|
||||||
|
"""Hetzner Cloud bandwidth monitor — alert do Sentry zanim transfer przekroczy included.
|
||||||
|
|
||||||
|
Hetzner Cloud: ruch WYCHODZĄCY liczy się do `included_traffic` (przychodzący darmowy),
|
||||||
|
overage = €1/TB. Przy dystrybucji apki (fala instalacji + część playbacku przez
|
||||||
|
/proxy/<token>) transfer może rosnąć. Spec był w config.py od dawna, ale monitor nigdy
|
||||||
|
nie powstał — to jego implementacja.
|
||||||
|
|
||||||
|
Pyta Hetzner Cloud API o bieżący `outgoing_traffic` vs `included_traffic` i alarmuje do
|
||||||
|
Sentry przy progach (info/warning/error %, z config). Stabilny fingerprint per poziom →
|
||||||
|
jedno eskalujące issue, nie nowe co run. Bez `HETZNER_API_TOKEN`/`HETZNER_SERVER_ID` =
|
||||||
|
no-op (wpis na poziomie info w logu przy każdym wywołaniu). Wołane periodycznie przez scheduler (`_job_hetzner_monitor`).
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import logging
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
import httpx
|
||||||
|
|
||||||
|
from app.config import get_settings
|
||||||
|
|
||||||
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
_API = "https://api.hetzner.cloud/v1/servers/{id}"
|
||||||
|
|
||||||
|
|
||||||
|
def run_hetzner_bandwidth_check() -> dict[str, Any]:
    """Compare outgoing traffic with the included quota and alert Sentry on thresholds.

    Reads the API token, server id and the three percentage thresholds
    (info / warning / error) from ``get_settings()``, fetches the server
    object from the Hetzner Cloud API and computes
    ``outgoing_traffic / included_traffic`` as a percentage.

    Returns:
        One of the following dicts:

        * ``{"enabled": False}`` when the token or the server id is not set
          (nothing is fetched, an info line is logged).
        * ``{"enabled": True, "error": <text>}`` when the API call fails, or
          when ``included_traffic`` is missing, null or not positive.
        * ``{"enabled": True, "used_pct", "outgoing_gb", "included_gb",
          "level"}`` otherwise; ``level`` is ``"error"``, ``"warning"``,
          ``"info"`` or ``None`` when usage is below the lowest threshold.

    Failures of the HTTP request and of the Sentry capture are logged and
    swallowed; they do not raise out of this function.
    """
    cfg = get_settings()
    api_token = cfg.hetzner_api_token
    server_id = cfg.hetzner_server_id

    # Monitor stays a no-op until both credentials are configured.
    if not (token_ok := bool(api_token)) or not server_id:
        log.info("hetzner-monitor: wyłączony (brak HETZNER_API_TOKEN/HETZNER_SERVER_ID)")
        return {"enabled": False}

    url = _API.format(id=server_id)
    auth_headers = {"Authorization": f"Bearer {api_token}"}
    try:
        response = httpx.get(url, headers=auth_headers, timeout=20.0)
        response.raise_for_status()
        server = response.json()["server"]
    except Exception as exc:
        # Transport errors, non-2xx statuses and unexpected payloads all end
        # up here; the message is truncated to keep log lines short.
        reason = str(exc)[:160]
        log.warning("hetzner-monitor: API fetch failed: %s", reason)
        return {"enabled": True, "error": reason}

    # Null / missing values are treated as 0.
    # NOTE(review): Hetzner announced a deprecation of the per-server traffic
    # fields (to be returned as null) — confirm these still carry values,
    # otherwise this check always takes the "included_traffic=0" path below.
    quota = server.get("included_traffic") or 0
    sent = server.get("outgoing_traffic") or 0
    if quota <= 0:
        log.warning("hetzner-monitor: included_traffic=0 — pomijam")
        return {"enabled": True, "error": "included_traffic=0"}

    pct = round(100.0 * sent / quota, 1)
    # Values are divided by 1e9 and reported as GB, i.e. they are treated as
    # byte counts.
    sent_gb = round(sent / 1e9, 1)
    quota_gb = round(quota / 1e9, 1)

    # Highest threshold reached decides the Sentry level; checked from the
    # most severe down, each threshold read only if the previous one missed.
    severity: str | None = (
        "error"
        if pct >= cfg.hetzner_alert_error_pct
        else "warning"
        if pct >= cfg.hetzner_alert_warning_pct
        else "info"
        if pct >= cfg.hetzner_alert_info_pct
        else None
    )

    summary = {
        "enabled": True,
        "used_pct": pct,
        "outgoing_gb": sent_gb,
        "included_gb": quota_gb,
        "level": severity,
    }
    log.info(
        "hetzner-monitor: %s%% included (%sGB / %sGB out), level=%s",
        pct, sent_gb, quota_gb, severity or "ok",
    )

    if not severity:
        return summary

    try:
        import sentry_sdk

        with sentry_sdk.push_scope() as scope:
            scope.level = severity
            scope.set_tag("hetzner_server_id", str(server_id))
            for extra_key, extra_value in (
                ("used_pct", pct),
                ("outgoing_gb", sent_gb),
                ("included_gb", quota_gb),
            ):
                scope.set_extra(extra_key, extra_value)
            # Fingerprint per LEVEL: info -> warning -> error escalate as
            # separate, long-lived issues instead of being fragmented by the
            # changing percentage in the message.
            scope.fingerprint = ["hetzner-traffic", severity]
            sentry_sdk.capture_message(
                f"hetzner-monitor: transfer {pct}% included "
                f"({sent_gb}GB / {quota_gb}GB) — poziom {severity}"
            )
    except Exception:  # pragma: no cover - Sentry off / no DSN
        log.exception("hetzner-monitor: Sentry capture failed")

    return summary
|
||||||
|
|
@ -1,4 +1,4 @@
|
||||||
"""Per-sitetag freshness watchdog — alert gdy aktywny browse-tube przestał dawać nowe sceny.
|
"""Per-sitetag freshness watchdog — alert gdy aktywny tube przestał dawać nowe sceny.
|
||||||
|
|
||||||
Globalny monitor źródeł (ingest_runs per `Source`) tego NIE łapie, bo wszystkie tube
|
Globalny monitor źródeł (ingest_runs per `Source`) tego NIE łapie, bo wszystkie tube
|
||||||
scrapery dzielą jeden `Source` = "tube-scraper" — pojedynczy origin może zamarznąć
|
scrapery dzielą jeden `Source` = "tube-scraper" — pojedynczy origin może zamarznąć
|
||||||
|
|
@ -7,6 +7,15 @@ dostawała stary zestaw → new=0/skipped=N przez 2 dni), a zagregowany run nada
|
||||||
raportuje success. Sygnał per-origin: `max(created_at)` na playback_sources danego
|
raportuje success. Sygnał per-origin: `max(created_at)` na playback_sources danego
|
||||||
`tube:<sitetag>`. Jak zamrożony > próg → alert (report 14f3a655 2026-06-15).
|
`tube:<sitetag>`. Jak zamrożony > próg → alert (report 14f3a655 2026-06-15).
|
||||||
|
|
||||||
|
Pokrywamy DWIE klasy scraperów, każda z własnym progiem:
|
||||||
|
- **browse** (`ALL_BROWSE_SCRAPERS`) — crawlowane codziennie z listingu, próg 48h.
|
||||||
|
- **search** (`ALL_DIRECT_SCRAPERS`) — performer-driven, nierówna kadencja (~30d
|
||||||
|
refresh per performer), próg wyższy (domyślnie 7d). Bez tego pokrycia kilka
|
||||||
|
search-tubów (sxyland, latestpornvideo, perverzija, fpoxxx, mypornerleak, porndish)
|
||||||
|
zamarzło cicho 2026-05-07/06-07/06-13 i nic nie krzyknęło do Sentry.
|
||||||
|
Tag obecny w obu listach (xvideoscom, epornercom — i browse i search) liczymy jako
|
||||||
|
browse (ostrzejszy próg).
|
||||||
|
|
||||||
Patrz [[reference_kvs_root_rotates_use_latest_updates]] dla klasy błędu, którą to łapie.
|
Patrz [[reference_kvs_root_rotates_use_latest_updates]] dla klasy błędu, którą to łapie.
|
||||||
"""
|
"""
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
@ -23,27 +32,43 @@ log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
def run_ingest_freshness_watchdog(
|
def run_ingest_freshness_watchdog(
|
||||||
*, max_age_hours: int = 48, min_history: int = 100
|
*,
|
||||||
|
max_age_hours: int = 48,
|
||||||
|
search_max_age_hours: int = 168,
|
||||||
|
min_history: int = 100,
|
||||||
) -> dict[str, Any]:
|
) -> dict[str, Any]:
|
||||||
"""Sprawdź każdy aktywny browse-scraper: czy origin dostał nową scenę < max_age_hours.
|
"""Sprawdź każdy aktywny scraper: czy origin dostał nową scenę < próg dla swojej klasy.
|
||||||
|
|
||||||
Skanujemy TYLKO sitetagi z `ALL_BROWSE_SCRAPERS` (te, które aktywnie crawlujemy i
|
Skanujemy sitetagi z `ALL_BROWSE_SCRAPERS` (próg `max_age_hours`, domyślnie 48h) oraz
|
||||||
od których spodziewamy się świeżości) — nie wszystkie origin-y, żeby nie alarmować
|
z `ALL_DIRECT_SCRAPERS` (performer-driven search, próg `search_max_age_hours`, domyślnie
|
||||||
o legacy/jednorazowych źródłach. `min_history` odsiewa świeżo dodane tuby bez
|
7d — nierówna kadencja, 48h dawałoby false-positivy). Tag w obu listach liczymy jako
|
||||||
ustalonej kadencji (za mało scen, by wiedzieć czy 48h ciszy to anomalia).
|
browse (ostrzejszy próg). Nie skanujemy wszystkich origin-ów, żeby nie alarmować o
|
||||||
|
legacy/jednorazowych źródłach. `min_history` odsiewa świeżo dodane tuby bez ustalonej
|
||||||
|
kadencji (za mało scen, by wiedzieć czy cisza to anomalia).
|
||||||
|
|
||||||
Stale origin → Sentry `capture_message` ze stabilnym fingerprintem per origin
|
Stale origin → Sentry `capture_message` ze stabilnym fingerprintem per origin
|
||||||
(wiek w extra, nie w tytule — inaczej każdy run = nowe issue). Zwraca
|
(wiek + próg w extra, nie w tytule — inaczej każdy run = nowe issue). Zwraca
|
||||||
{checked, stale:[{origin, age_hours, total}]}.
|
{checked, stale:[{origin, age_hours, total, kind, max_age_hours}]}.
|
||||||
"""
|
"""
|
||||||
from app.connectors.direct_scrapers import ALL_BROWSE_SCRAPERS
|
from app.connectors.direct_scrapers import (
|
||||||
|
ALL_BROWSE_SCRAPERS,
|
||||||
|
ALL_DIRECT_SCRAPERS,
|
||||||
|
)
|
||||||
|
|
||||||
|
browse_tags = {cls.sitetag for cls in ALL_BROWSE_SCRAPERS}
|
||||||
|
# Search-tuby dzielące tag z browse (xvideoscom, epornercom) idą pod browse-próg.
|
||||||
|
search_tags = {cls.sitetag for cls in ALL_DIRECT_SCRAPERS} - browse_tags
|
||||||
|
|
||||||
|
# (sitetag, próg_h, klasa) — klasa tylko do logów/extra w Sentry.
|
||||||
|
checks: list[tuple[str, int, str]] = [
|
||||||
|
(tag, max_age_hours, "browse") for tag in sorted(browse_tags)
|
||||||
|
] + [(tag, search_max_age_hours, "search") for tag in sorted(search_tags)]
|
||||||
|
|
||||||
sitetags = sorted({cls.sitetag for cls in ALL_BROWSE_SCRAPERS})
|
|
||||||
now = datetime.now(UTC)
|
now = datetime.now(UTC)
|
||||||
stale: list[dict[str, Any]] = []
|
stale: list[dict[str, Any]] = []
|
||||||
|
|
||||||
with session_scope() as s:
|
with session_scope() as s:
|
||||||
for tag in sitetags:
|
for tag, threshold, kind in checks:
|
||||||
origin = f"tube:{tag}"
|
origin = f"tube:{tag}"
|
||||||
row = s.execute(
|
row = s.execute(
|
||||||
text(
|
text(
|
||||||
|
|
@ -56,16 +81,22 @@ def run_ingest_freshness_watchdog(
|
||||||
if total < min_history or newest is None:
|
if total < min_history or newest is None:
|
||||||
continue
|
continue
|
||||||
age_h = (now - newest).total_seconds() / 3600.0
|
age_h = (now - newest).total_seconds() / 3600.0
|
||||||
if age_h >= max_age_hours:
|
if age_h >= threshold:
|
||||||
stale.append(
|
stale.append(
|
||||||
{"origin": origin, "age_hours": round(age_h, 1), "total": total}
|
{
|
||||||
|
"origin": origin,
|
||||||
|
"age_hours": round(age_h, 1),
|
||||||
|
"total": total,
|
||||||
|
"kind": kind,
|
||||||
|
"max_age_hours": threshold,
|
||||||
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
if stale:
|
if stale:
|
||||||
log.warning(
|
log.warning(
|
||||||
"ingest-watchdog: %d/%d browse origin(s) bez nowych scen >%dh: %s",
|
"ingest-watchdog: %d/%d origin(s) bez nowych scen (browse>%dh / search>%dh): %s",
|
||||||
len(stale), len(sitetags), max_age_hours,
|
len(stale), len(checks), max_age_hours, search_max_age_hours,
|
||||||
", ".join(f"{x['origin']}({x['age_hours']}h)" for x in stale),
|
", ".join(f"{x['origin']}({x['age_hours']}h,{x['kind']})" for x in stale),
|
||||||
)
|
)
|
||||||
try:
|
try:
|
||||||
import sentry_sdk
|
import sentry_sdk
|
||||||
|
|
@ -74,22 +105,23 @@ def run_ingest_freshness_watchdog(
|
||||||
with sentry_sdk.push_scope() as scope:
|
with sentry_sdk.push_scope() as scope:
|
||||||
scope.level = "warning"
|
scope.level = "warning"
|
||||||
scope.set_tag("ingest_origin", x["origin"])
|
scope.set_tag("ingest_origin", x["origin"])
|
||||||
|
scope.set_tag("ingest_scraper_kind", x["kind"])
|
||||||
scope.set_extra("age_hours", x["age_hours"])
|
scope.set_extra("age_hours", x["age_hours"])
|
||||||
scope.set_extra("total_scenes", x["total"])
|
scope.set_extra("total_scenes", x["total"])
|
||||||
scope.set_extra("max_age_hours", max_age_hours)
|
scope.set_extra("max_age_hours", x["max_age_hours"])
|
||||||
# Fingerprint per origin → jedno trwałe issue na zamrożony tube,
|
# Fingerprint per origin → jedno trwałe issue na zamrożony tube,
|
||||||
# nie fragmentowane przez zmienny wiek.
|
# nie fragmentowane przez zmienny wiek.
|
||||||
scope.fingerprint = ["ingest-stale-origin", x["origin"]]
|
scope.fingerprint = ["ingest-stale-origin", x["origin"]]
|
||||||
sentry_sdk.capture_message(
|
sentry_sdk.capture_message(
|
||||||
f"ingest-watchdog: {x['origin']} bez nowych scen "
|
f"ingest-watchdog: {x['origin']} ({x['kind']}) bez nowych scen "
|
||||||
f"({x['age_hours']:.0f}h, próg {max_age_hours}h)"
|
f"({x['age_hours']:.0f}h, próg {x['max_age_hours']}h)"
|
||||||
)
|
)
|
||||||
except Exception: # pragma: no cover - Sentry off / brak DSN
|
except Exception: # pragma: no cover - Sentry off / brak DSN
|
||||||
log.exception("ingest-watchdog: Sentry capture failed")
|
log.exception("ingest-watchdog: Sentry capture failed")
|
||||||
else:
|
else:
|
||||||
log.info(
|
log.info(
|
||||||
"ingest-watchdog: wszystkie %d browse origin świeże (<%dh)",
|
"ingest-watchdog: wszystkie %d origin świeże (browse<%dh / search<%dh)",
|
||||||
len(sitetags), max_age_hours,
|
len(checks), max_age_hours, search_max_age_hours,
|
||||||
)
|
)
|
||||||
|
|
||||||
return {"checked": len(sitetags), "stale": stale}
|
return {"checked": len(checks), "stale": stale}
|
||||||
|
|
|
||||||
|
|
@ -276,20 +276,35 @@ def _job_thumb_asset_dedup() -> None:
|
||||||
log.exception("[scheduler] thumb-asset dedup failed")
|
log.exception("[scheduler] thumb-asset dedup failed")
|
||||||
|
|
||||||
|
|
||||||
def _job_ingest_watchdog(max_age_hours: int) -> None:
|
def _job_ingest_watchdog(max_age_hours: int, search_max_age_hours: int) -> None:
|
||||||
"""Per-origin freshness watchdog — alert do Sentry gdy aktywny browse-tube przestał
|
"""Per-origin freshness watchdog — alert do Sentry gdy aktywny tube przestał dawać
|
||||||
dawać nowe sceny > max_age_hours. Globalny monitor (jeden Source 'tube-scraper') tego
|
nowe sceny > próg (browse: max_age_hours, search: search_max_age_hours). Globalny
|
||||||
nie łapie; pojedynczy origin może zamarznąć przy success-runie (report 14f3a655)."""
|
monitor (jeden Source 'tube-scraper') tego nie łapie; pojedynczy origin może zamarznąć
|
||||||
|
przy success-runie (report 14f3a655)."""
|
||||||
try:
|
try:
|
||||||
from app.scheduler.ingest_watchdog import run_ingest_freshness_watchdog
|
from app.scheduler.ingest_watchdog import run_ingest_freshness_watchdog
|
||||||
|
|
||||||
res = run_ingest_freshness_watchdog(max_age_hours=max_age_hours)
|
res = run_ingest_freshness_watchdog(
|
||||||
|
max_age_hours=max_age_hours,
|
||||||
|
search_max_age_hours=search_max_age_hours,
|
||||||
|
)
|
||||||
if res["stale"]:
|
if res["stale"]:
|
||||||
log.warning("[scheduler] ingest-watchdog: %d stale origin(s)", len(res["stale"]))
|
log.warning("[scheduler] ingest-watchdog: %d stale origin(s)", len(res["stale"]))
|
||||||
except Exception:
|
except Exception:
|
||||||
log.exception("[scheduler] ingest-watchdog failed")
|
log.exception("[scheduler] ingest-watchdog failed")
|
||||||
|
|
||||||
|
|
||||||
|
def _job_hetzner_monitor() -> None:
    """Scheduler job wrapper around the Hetzner Cloud bandwidth check.

    Imports the monitor lazily and runs ``run_hetzner_bandwidth_check()``,
    discarding its result. Any exception, including a failed import, is
    logged with its traceback and not re-raised.
    """
    try:
        from app.scheduler import hetzner_monitor as _monitor

        _monitor.run_hetzner_bandwidth_check()
    except Exception:
        log.exception("[scheduler] hetzner-monitor failed")
|
||||||
|
|
||||||
|
|
||||||
def _job_performer_continuous(refresh_after_days: int) -> None:
|
def _job_performer_continuous(refresh_after_days: int) -> None:
|
||||||
"""Continuous worker — 1 performer per tick, ORDER BY last_searched_at NULLS FIRST.
|
"""Continuous worker — 1 performer per tick, ORDER BY last_searched_at NULLS FIRST.
|
||||||
|
|
||||||
|
|
@ -396,8 +411,9 @@ def build_scheduler(cfg: dict[str, Any]) -> BlockingScheduler:
|
||||||
|
|
||||||
if cfg.get("ingest_watchdog_hours"):
|
if cfg.get("ingest_watchdog_hours"):
|
||||||
wd_max_age = cfg.get("ingest_watchdog_max_age_hours") or 48
|
wd_max_age = cfg.get("ingest_watchdog_max_age_hours") or 48
|
||||||
|
wd_search_max_age = cfg.get("ingest_watchdog_search_max_age_hours") or 168
|
||||||
sched.add_job(
|
sched.add_job(
|
||||||
lambda: _job_ingest_watchdog(wd_max_age),
|
lambda: _job_ingest_watchdog(wd_max_age, wd_search_max_age),
|
||||||
IntervalTrigger(hours=cfg["ingest_watchdog_hours"], start_date=INTERVAL_ANCHOR),
|
IntervalTrigger(hours=cfg["ingest_watchdog_hours"], start_date=INTERVAL_ANCHOR),
|
||||||
id="ingest_watchdog",
|
id="ingest_watchdog",
|
||||||
replace_existing=True,
|
replace_existing=True,
|
||||||
|
|
@ -405,10 +421,21 @@ def build_scheduler(cfg: dict[str, Any]) -> BlockingScheduler:
|
||||||
coalesce=True,
|
coalesce=True,
|
||||||
)
|
)
|
||||||
log.info(
|
log.info(
|
||||||
"scheduler: ingest-watchdog every %dh (max_age=%dh)",
|
"scheduler: ingest-watchdog every %dh (browse max_age=%dh, search max_age=%dh)",
|
||||||
cfg["ingest_watchdog_hours"], wd_max_age,
|
cfg["ingest_watchdog_hours"], wd_max_age, wd_search_max_age,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
if cfg.get("hetzner_monitor_hours"):
|
||||||
|
sched.add_job(
|
||||||
|
_job_hetzner_monitor,
|
||||||
|
IntervalTrigger(hours=cfg["hetzner_monitor_hours"], start_date=INTERVAL_ANCHOR),
|
||||||
|
id="hetzner_monitor",
|
||||||
|
replace_existing=True,
|
||||||
|
max_instances=1,
|
||||||
|
coalesce=True,
|
||||||
|
)
|
||||||
|
log.info("scheduler: hetzner-monitor every %dh", cfg["hetzner_monitor_hours"])
|
||||||
|
|
||||||
if cfg.get("movie_ingest_hours"):
|
if cfg.get("movie_ingest_hours"):
|
||||||
sched.add_job(
|
sched.add_job(
|
||||||
_job_movie_ingest,
|
_job_movie_ingest,
|
||||||
|
|
@ -527,7 +554,11 @@ DEFAULT_CONFIG: dict[str, Any] = {
|
||||||
# round-robin po tube'ach. Backfill katalogów (porndoe ~62k itd.) przez dni.
|
# round-robin po tube'ach. Backfill katalogów (porndoe ~62k itd.) przez dni.
|
||||||
"deep_crawl_hours": 1,
|
"deep_crawl_hours": 1,
|
||||||
"deep_crawl_pages_per_run": 60,
|
"deep_crawl_pages_per_run": 60,
|
||||||
# Ingest freshness watchdog — per-origin alert do Sentry, co 6h, próg 48h.
|
# Ingest freshness watchdog — per-origin alert do Sentry, co 6h. Browse próg 48h,
|
||||||
|
# search (performer-driven) próg 7d (nierówna kadencja).
|
||||||
"ingest_watchdog_hours": 6,
|
"ingest_watchdog_hours": 6,
|
||||||
"ingest_watchdog_max_age_hours": 48,
|
"ingest_watchdog_max_age_hours": 48,
|
||||||
|
"ingest_watchdog_search_max_age_hours": 168,
|
||||||
|
# Hetzner Cloud bandwidth monitor — co 6h, alert Sentry przy progach % included.
|
||||||
|
"hetzner_monitor_hours": 6,
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -213,10 +213,17 @@ def run_forever() -> int:
|
||||||
# Taxonomy scene_count refresh — denormalizacja liczników (perf fix 0019).
|
# Taxonomy scene_count refresh — denormalizacja liczników (perf fix 0019).
|
||||||
"taxonomy_counts_hours": getattr(settings, "sched_taxonomy_counts_hours", 3) or None,
|
"taxonomy_counts_hours": getattr(settings, "sched_taxonomy_counts_hours", 3) or None,
|
||||||
# Ingest freshness watchdog — per-origin alert do Sentry (report 14f3a655).
|
# Ingest freshness watchdog — per-origin alert do Sentry (report 14f3a655).
|
||||||
|
# Browse: 48h próg; search (performer-driven, nierówna kadencja): 7d próg.
|
||||||
"ingest_watchdog_hours": getattr(settings, "sched_ingest_watchdog_hours", 6) or None,
|
"ingest_watchdog_hours": getattr(settings, "sched_ingest_watchdog_hours", 6) or None,
|
||||||
"ingest_watchdog_max_age_hours": getattr(
|
"ingest_watchdog_max_age_hours": getattr(
|
||||||
settings, "ingest_watchdog_max_age_hours", 48
|
settings, "ingest_watchdog_max_age_hours", 48
|
||||||
),
|
),
|
||||||
|
"ingest_watchdog_search_max_age_hours": getattr(
|
||||||
|
settings, "ingest_watchdog_search_max_age_hours", 168
|
||||||
|
),
|
||||||
|
# Hetzner Cloud bandwidth monitor — alert do Sentry przy progach % included.
|
||||||
|
# No-op gdy brak HETZNER_API_TOKEN/SERVER_ID (sam job może być on).
|
||||||
|
"hetzner_monitor_hours": getattr(settings, "sched_hetzner_monitor_hours", 6) or None,
|
||||||
}
|
}
|
||||||
sched = build_scheduler(cfg)
|
sched = build_scheduler(cfg)
|
||||||
log.info("worker scheduled mode starting (jobs=%d)", len(sched.get_jobs()))
|
log.info("worker scheduled mode starting (jobs=%d)", len(sched.get_jobs()))
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue