"""Per-sitetag freshness watchdog — alert gdy aktywny browse-tube przestał dawać nowe sceny. Globalny monitor źródeł (ingest_runs per `Source`) tego NIE łapie, bo wszystkie tube scrapery dzielą jeden `Source` = "tube-scraper" — pojedynczy origin może zamarznąć (np. freshporno: scraper browsował z roota `/`, który KVS rotuje → cold-session dostawała stary zestaw → new=0/skipped=N przez 2 dni), a zagregowany run nadal raportuje success. Sygnał per-origin: `max(created_at)` na playback_sources danego `tube:`. Jak zamrożony > próg → alert (report 14f3a655 2026-06-15). Patrz [[reference_kvs_root_rotates_use_latest_updates]] dla klasy błędu, którą to łapie. """ from __future__ import annotations import logging from datetime import UTC, datetime from typing import Any from sqlalchemy import text from app.db import session_scope log = logging.getLogger(__name__) def run_ingest_freshness_watchdog( *, max_age_hours: int = 48, min_history: int = 100 ) -> dict[str, Any]: """Sprawdź każdy aktywny browse-scraper: czy origin dostał nową scenę < max_age_hours. Skanujemy TYLKO sitetagi z `ALL_BROWSE_SCRAPERS` (te, które aktywnie crawlujemy i od których spodziewamy się świeżości) — nie wszystkie origin-y, żeby nie alarmować o legacy/jednorazowych źródłach. `min_history` odsiewa świeżo dodane tuby bez ustalonej kadencji (za mało scen, by wiedzieć czy 48h ciszy to anomalia). Stale origin → Sentry `capture_message` ze stabilnym fingerprintem per origin (wiek w extra, nie w tytule — inaczej każdy run = nowe issue). Zwraca {checked, stale:[{origin, age_hours, total}]}. """ from app.connectors.direct_scrapers import ALL_BROWSE_SCRAPERS sitetags = sorted({cls.sitetag for cls in ALL_BROWSE_SCRAPERS}) now = datetime.now(UTC) stale: list[dict[str, Any]] = [] with session_scope() as s: for tag in sitetags: origin = f"tube:{tag}" row = s.execute( text( "SELECT max(created_at) AS newest, count(*) AS total " "FROM playback_sources WHERE origin = :o" ), {"o": origin}, ).one() newest, total = row.newest, row.total if total < min_history or newest is None: continue age_h = (now - newest).total_seconds() / 3600.0 if age_h >= max_age_hours: stale.append( {"origin": origin, "age_hours": round(age_h, 1), "total": total} ) if stale: log.warning( "ingest-watchdog: %d/%d browse origin(s) bez nowych scen >%dh: %s", len(stale), len(sitetags), max_age_hours, ", ".join(f"{x['origin']}({x['age_hours']}h)" for x in stale), ) try: import sentry_sdk for x in stale: with sentry_sdk.push_scope() as scope: scope.level = "warning" scope.set_tag("ingest_origin", x["origin"]) scope.set_extra("age_hours", x["age_hours"]) scope.set_extra("total_scenes", x["total"]) scope.set_extra("max_age_hours", max_age_hours) # Fingerprint per origin → jedno trwałe issue na zamrożony tube, # nie fragmentowane przez zmienny wiek. scope.fingerprint = ["ingest-stale-origin", x["origin"]] sentry_sdk.capture_message( f"ingest-watchdog: {x['origin']} bez nowych scen " f"({x['age_hours']:.0f}h, próg {max_age_hours}h)" ) except Exception: # pragma: no cover - Sentry off / brak DSN log.exception("ingest-watchdog: Sentry capture failed") else: log.info( "ingest-watchdog: wszystkie %d browse origin świeże (<%dh)", len(sitetags), max_age_hours, ) return {"checked": len(sitetags), "stale": stale}