"""Daily smoke test — uruchamiany przez systemd timer (04:00 UTC). Sprawdza w jednym przebiegu: 1. **DB metrics** (cheap, ~1s) - Nowe sceny w ostatnich 24h per source (TPDB/StashDB/direct scrapery) - Drop vs 7-day rolling average (alarm gdy któryś source spadł >50%) - Failed/partial ingest_runs w ostatnich 24h - Liczba playback_sources oznaczonych dead w ostatnich 24h - Liczba nowych bug_reports w ostatnich 24h - Coverage: % scen z thumbnail / z duration / z stream 2. **Canary extractor tests** (heavier, ~30-60s) - Per tube origin: 1 alive PlaybackSource → try_extract() → mierz timing - HEAD-check pierwszy stream_url z headers (Referer + UA) - Alarm gdy tube failuje (TubePageError 5xx, exception, brak StreamSource) 3. **Thumbnail health** (~10s) - 3 random `PlaybackSource.thumbnail_url` per origin — HEAD check - Alarm gdy >50% miniaturek z origin'a zwraca non-2xx Reporting: - stdout: czytelna tabela (-> systemd journal -> `journalctl -u goon-smoke`) - Sentry: `capture_message(level=warning)` na agregowany raport gdy są failure (tag `smoke_test=fail`); `level=info` gdy wszystko zielone (tag=ok). DSN ten sam co api/worker (env `SENTRY_DSN`). - Exit code: 0 zawsze (smoke test nie powinien killować timera) — ale gdy `--strict` to non-zero przy any failure. """ from __future__ import annotations import argparse import logging import os import random import sys import time import traceback from dataclasses import dataclass, field from datetime import UTC, datetime, timedelta import httpx from sqlalchemy import func, select, text from app.config import get_settings from app.db import session_scope from app.extractors import HosterDead, TubePageError, try_extract from app.models.bug_report import BugReport from app.models.ingest_run import IngestRun, IngestStatus from app.models.playback_source import PlaybackSource from app.models.scene import Scene, SceneExternalRef from app.models.source import Source logging.basicConfig(level=logging.WARNING, format="%(levelname)s %(name)s: %(message)s") log = logging.getLogger("smoke") DEFAULT_UA = ( "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 " "(KHTML, like Gecko) Chrome/140.0.0.0 Safari/537.36" ) # ---------- result aggregation ---------- @dataclass class CheckResult: name: str status: str # OK / WARN / FAIL / SKIP detail: str = "" elapsed: float = 0.0 @property def emoji(self) -> str: return {"OK": "+", "WARN": "!", "FAIL": "X", "SKIP": "-"}.get(self.status, "?") def line(self) -> str: return f" [{self.emoji}] {self.name:38s} {self.status:5s} {self.elapsed:5.1f}s {self.detail}" @dataclass class Report: db_metrics: list[CheckResult] = field(default_factory=list) extractors: list[CheckResult] = field(default_factory=list) thumbs: list[CheckResult] = field(default_factory=list) started_at: datetime = field(default_factory=lambda: datetime.now(UTC)) @property def all_checks(self) -> list[CheckResult]: return self.db_metrics + self.extractors + self.thumbs @property def failed(self) -> list[CheckResult]: return [c for c in self.all_checks if c.status == "FAIL"] @property def warned(self) -> list[CheckResult]: return [c for c in self.all_checks if c.status == "WARN"] def summary_line(self) -> str: total = len(self.all_checks) ok = sum(1 for c in self.all_checks if c.status == "OK") warn = len(self.warned) fail = len(self.failed) skip = sum(1 for c in self.all_checks if c.status == "SKIP") return f"checks={total} ok={ok} warn={warn} fail={fail} skip={skip}" # ---------- 1. DB METRICS ---------- def _check_new_scenes_per_source(report: Report) -> None: """Liczy nowe scene_external_refs per source w ostatnich 24h. Porównuje do 7-day average i alarmuje gdy któryś tube ma drop >50% (lub 0 nowych przy avg >5/dzień).""" t0 = time.time() try: with session_scope() as s: now = datetime.now(UTC) past_24h = now - timedelta(hours=24) past_7d = now - timedelta(days=7) # 24h count per source rows_24h = s.execute( select(Source.name, func.count(SceneExternalRef.scene_id)) .join(SceneExternalRef, SceneExternalRef.source_id == Source.id) .where(SceneExternalRef.first_seen >= past_24h) .group_by(Source.name) ).all() map_24h = {name: cnt for name, cnt in rows_24h} # 7d avg per source (count / 7) rows_7d = s.execute( select(Source.name, func.count(SceneExternalRef.scene_id)) .join(SceneExternalRef, SceneExternalRef.source_id == Source.id) .where(SceneExternalRef.first_seen >= past_7d) .group_by(Source.name) ).all() avg_per_day = {name: cnt / 7.0 for name, cnt in rows_7d} elapsed = time.time() - t0 total_24h = sum(map_24h.values()) per_src = [] warns = [] for name in sorted(set(map_24h) | set(avg_per_day)): n24 = map_24h.get(name, 0) avg = avg_per_day.get(name, 0.0) per_src.append(f"{name}={n24}") # Alarm: avg >= 5/d ale 24h <= 50% avg if avg >= 5.0 and n24 < avg * 0.5: warns.append(f"{name} 24h={n24} 7d-avg={avg:.1f}") if warns: report.db_metrics.append(CheckResult( "new scenes / source (24h)", "WARN", f"total={total_24h}, drops: {'; '.join(warns)}", elapsed, )) else: report.db_metrics.append(CheckResult( "new scenes / source (24h)", "OK", f"total={total_24h} ({', '.join(per_src) or 'no sources'})", elapsed, )) except Exception as e: report.db_metrics.append(CheckResult( "new scenes / source (24h)", "FAIL", f"{type(e).__name__}: {e}", time.time() - t0, )) def _check_ingest_runs(report: Report) -> None: """Failed/partial ingest_runs w ostatnich 24h. >0 = WARN.""" t0 = time.time() try: with session_scope() as s: past_24h = datetime.now(UTC) - timedelta(hours=24) rows = s.execute( select(IngestRun.status, func.count(IngestRun.id)) .where(IngestRun.started_at >= past_24h) .group_by(IngestRun.status) ).all() counts = {str(status.value if hasattr(status, "value") else status): cnt for status, cnt in rows} # killed_by_restart to NIE awaria connectora — to worker ubity mid-run # (deploy/OOM). Liczony osobno (stuck-run check go pokrywa), żeby nie # odpalać WARN przy każdym deployu. killed = s.execute( select(func.count(IngestRun.id)) .where(IngestRun.started_at >= past_24h) .where(IngestRun.status == IngestStatus.failed) .where(IngestRun.errors["message"].astext == "killed_by_restart") ).scalar() or 0 failed = counts.get("failed", 0) real_failed = failed - killed partial = counts.get("partial", 0) ok = counts.get("success", 0) running = counts.get("running", 0) detail = f"success={ok} partial={partial} failed={real_failed} killed={killed} running={running}" if real_failed > 0 or partial > 3: report.db_metrics.append(CheckResult( "ingest runs (24h)", "WARN", detail, time.time() - t0 )) else: report.db_metrics.append(CheckResult( "ingest runs (24h)", "OK", detail, time.time() - t0 )) except Exception as e: report.db_metrics.append(CheckResult( "ingest runs (24h)", "FAIL", f"{type(e).__name__}: {e}", time.time() - t0 )) def _check_dead_playbacks(report: Report) -> None: """Liczba PlaybackSource oznaczonych dead w ostatnich 24h. >50 dead = WARN (hoster się popsuł na masową skalę).""" t0 = time.time() try: with session_scope() as s: past_24h = datetime.now(UTC) - timedelta(hours=24) n_dead_24h = s.execute( select(func.count(PlaybackSource.id)) .where(PlaybackSource.dead_at >= past_24h) ).scalar() or 0 n_alive = s.execute( select(func.count(PlaybackSource.id)) .where(PlaybackSource.dead_at.is_(None)) ).scalar() or 0 detail = f"dead_24h={n_dead_24h} alive_total={n_alive}" if n_dead_24h > 50: report.db_metrics.append(CheckResult( "playback dead-marks (24h)", "WARN", detail, time.time() - t0 )) else: report.db_metrics.append(CheckResult( "playback dead-marks (24h)", "OK", detail, time.time() - t0 )) except Exception as e: report.db_metrics.append(CheckResult( "playback dead-marks (24h)", "FAIL", f"{type(e).__name__}: {e}", time.time() - t0 )) def _check_bug_reports(report: Report) -> None: """Nowe bug_reports w ostatnich 24h. Tylko informacyjnie (nie warn).""" t0 = time.time() try: with session_scope() as s: past_24h = datetime.now(UTC) - timedelta(hours=24) n = s.execute( select(func.count(BugReport.id)) .where(BugReport.created_at >= past_24h) ).scalar() or 0 report.db_metrics.append(CheckResult( "new bug reports (24h)", "OK", f"count={n}", time.time() - t0 )) except Exception as e: report.db_metrics.append(CheckResult( "new bug reports (24h)", "FAIL", f"{type(e).__name__}: {e}", time.time() - t0 )) def _check_coverage(report: Report) -> None: """% scen mających thumbnail / duration / stream URL przynajmniej w jednym PlaybackSource. <70% = WARN.""" t0 = time.time() try: with session_scope() as s: total_scenes = s.execute(select(func.count(Scene.id))).scalar() or 1 with_thumb = s.execute( select(func.count(func.distinct(PlaybackSource.scene_id))) .where(PlaybackSource.thumbnail_url.is_not(None)) ).scalar() or 0 with_duration = s.execute( select(func.count(func.distinct(PlaybackSource.scene_id))) .where(PlaybackSource.duration_sec.is_not(None)) ).scalar() or 0 with_stream = s.execute( select(func.count(func.distinct(PlaybackSource.scene_id))) .where(PlaybackSource.stream_url.is_not(None)) .where(PlaybackSource.dead_at.is_(None)) ).scalar() or 0 pct_thumb = with_thumb * 100 / total_scenes pct_dur = with_duration * 100 / total_scenes pct_stream = with_stream * 100 / total_scenes detail = f"scenes={total_scenes} thumb={pct_thumb:.0f}% dur={pct_dur:.0f}% stream={pct_stream:.0f}%" # Coverage progi konserwatywne — większość direct-scrap scen ma URL ale # nie ma stream cache'a (resolve on-demand). if pct_thumb < 50: report.db_metrics.append(CheckResult( "scene coverage", "WARN", detail, time.time() - t0 )) else: report.db_metrics.append(CheckResult( "scene coverage", "OK", detail, time.time() - t0 )) except Exception as e: report.db_metrics.append(CheckResult( "scene coverage", "FAIL", f"{type(e).__name__}: {e}", time.time() - t0 )) def _check_ingest_per_source(report: Report) -> None: """Per-source health (7d). Agregat `_check_ingest_runs` topił sygnał: stashdb 2026-06-01 miał 24 failed / 57 runów i 0 sukcesów przez ~7 dni (delta nigdy nie szła naprzód, bo run nie kończył się przed restartem workera), a całość i tak pokazywała trochę zielonego. Tu alarmujemy PER źródło: - runs≥4 i 0 sukcesów → źródło działa, ale nigdy nie kończy (twardy WARN) - ostatni sukces > 24h przy aktywnych runach → delta się zatrzymała """ t0 = time.time() try: with session_scope() as s: now = datetime.now(UTC) past_7d = now - timedelta(days=7) rows = s.execute( select( Source.name, func.count(IngestRun.id), func.count(IngestRun.id).filter(IngestRun.status == IngestStatus.success), func.max(IngestRun.finished_at).filter(IngestRun.status == IngestStatus.success), ) .join(IngestRun, IngestRun.source_id == Source.id) .where(IngestRun.started_at >= past_7d) .group_by(Source.name) ).all() now = datetime.now(UTC) warns: list[str] = [] details: list[str] = [] for name, runs, ok, last_ok in rows: details.append(f"{name}:{ok}/{runs}") if runs >= 4 and ok == 0: warns.append(f"{name} 0 success/{runs} runs") elif last_ok is not None and (now - last_ok).total_seconds() > 24 * 3600 and runs >= 2: warns.append(f"{name} last-ok {(now - last_ok).total_seconds() / 3600:.0f}h ago") status = "WARN" if warns else "OK" detail = "; ".join(warns) if warns else (" ".join(details) or "no runs") report.db_metrics.append(CheckResult("ingest per-source (7d)", status, detail, time.time() - t0)) except Exception as e: report.db_metrics.append(CheckResult( "ingest per-source (7d)", "FAIL", f"{type(e).__name__}: {e}", time.time() - t0 )) def _check_stuck_runs(report: Report) -> None: """Runy wiszące w 'running' > 2h. Reaper czyści je na restarcie workera, ale hang MIĘDZY restartami (connector bez timeoutu) zostanie tu złapany.""" t0 = time.time() try: with session_scope() as s: cutoff = datetime.now(UTC) - timedelta(hours=2) n = s.execute( select(func.count(IngestRun.id)).where( IngestRun.status == IngestStatus.running, IngestRun.started_at < cutoff, ) ).scalar() or 0 report.db_metrics.append(CheckResult( "stuck ingest runs (>2h)", "WARN" if n else "OK", f"count={n}", time.time() - t0 )) except Exception as e: report.db_metrics.append(CheckResult( "stuck ingest runs (>2h)", "FAIL", f"{type(e).__name__}: {e}", time.time() - t0 )) def _check_browse_freshness(report: Report) -> None: """Cicha śmierć browse-scrapera: gdy HTML tube'a się zmieni, `_extract_scene_urls` zwraca [] → run kończy się SUCCESS z 0 scen, bez wyjątku, bez alarmu (bug-reporty 93d3c485 / 2fbf1c73 wykryte przez userów, nie monitoring). Browse odpala codziennie i RE-touchuje latest sceny, więc max(last_seen_at) per sitetag musi być świeży. Staleness > 36h dla aktywnego browse sitetagu = scraper przestał działać.""" t0 = time.time() try: from app.connectors.direct_scrapers import ALL_BROWSE_SCRAPERS browse_sitetags: set[str] = set() for cls in ALL_BROWSE_SCRAPERS: try: browse_sitetags.add(cls().sitetag) except Exception: pass if not browse_sitetags: report.db_metrics.append(CheckResult("browse scraper freshness", "SKIP", "no browse scrapers", time.time() - t0)) return with session_scope() as s: rows = s.execute(text( "SELECT split_part(er.external_id, ':', 1) AS sitetag, max(er.last_seen_at) AS last_seen " "FROM external_records er JOIN sources s ON s.id = er.source_id " "WHERE s.kind = 'scraper' AND er.entity_kind = 'scene' " "GROUP BY 1" )).all() seen = {st: ls for st, ls in rows} now = datetime.now(UTC) warns: list[str] = [] details: list[str] = [] for st in sorted(browse_sitetags): ls = seen.get(st) if ls is None: warns.append(f"{st} never") continue age_h = (now - ls).total_seconds() / 3600 details.append(f"{st}:{age_h:.0f}h") if age_h > 36: warns.append(f"{st} stale {age_h:.0f}h") status = "WARN" if warns else "OK" detail = "; ".join(warns) if warns else (" ".join(details) or "no data") report.db_metrics.append(CheckResult("browse scraper freshness", status, detail, time.time() - t0)) except Exception as e: report.db_metrics.append(CheckResult( "browse scraper freshness", "FAIL", f"{type(e).__name__}: {e}", time.time() - t0 )) # ---------- 2. CANARY EXTRACTORS ---------- def _pick_canary_samples(per_origin: int = 3) -> dict[tuple[str, str], list[str]]: """Zwraca dict (origin, sitetag) → [page_url, ...] — N losowych alive PlaybackSource per origin. Multi-sample żeby pojedynczy dead URL nie falszował alarmu (canary considered failed gdy WSZYSTKIE N samples fail). Filtr last_seen >= 30d: worker scrape loop aktualizuje last_seen_at gdy re-scrapuje page'a. Stare URL-e (np. hqporner sceny sprzed kilku miesięcy) mogą być 404 nawet bez dead_at (bo nikt nie kliknął Watch). Sampling tylko z "świeżych" eliminuje false positives — chcemy testować że *aktualnie aktywne* sceny działają.""" out: dict[tuple[str, str], list[str]] = {} cutoff = datetime.now(UTC) - timedelta(days=30) with session_scope() as s: origins = s.execute( select(PlaybackSource.origin) .where(PlaybackSource.dead_at.is_(None)) .where( (PlaybackSource.origin.like("pornapp:%")) | (PlaybackSource.origin.like("tube:%")) ) .group_by(PlaybackSource.origin) ).scalars().all() for origin in origins: rows = s.execute( select(PlaybackSource.page_url) .where(PlaybackSource.origin == origin) .where(PlaybackSource.dead_at.is_(None)) .where(PlaybackSource.last_seen_at >= cutoff) .order_by(func.random()) .limit(per_origin) ).scalars().all() sitetag = origin.split(":", 1)[1] out[(origin, sitetag)] = [u for u in rows if u] return out def _try_one_canary(sitetag: str, page_url: str) -> tuple[str, str]: """Pojedyncza próba extract + HEAD-check. Zwraca (status, detail). Status: OK / WARN / FAIL. Caller agreguje N prób per origin.""" try: sources = try_extract(sitetag, page_url) except HosterDead as e: return "WARN", f"HosterDead: {str(e)[:60]}" except TubePageError as e: status = "WARN" if e.status_code in (404, 410) else "FAIL" return status, f"TubePageError HTTP {e.status_code}" except Exception as e: return "FAIL", f"{type(e).__name__}: {str(e)[:60]}" if not sources: return "WARN", "no sources (extractor None)" directs = [s for s in sources if s.link and s.type != "hoster"] hosters = [s for s in sources if s.type == "hoster"] if not directs: return "OK", f"{len(hosters)} hoster-only (WebView fallback)" first = directs[0] try: with httpx.Client(timeout=15, follow_redirects=True) as client: r = client.head( first.link, headers={ "User-Agent": DEFAULT_UA, "Referer": first.referer or page_url, }, ) head_status = r.status_code if head_status >= 400: r = client.get( first.link, headers={ "User-Agent": DEFAULT_UA, "Referer": first.referer or page_url, "Range": "bytes=0-1024", }, ) head_status = r.status_code except Exception as e: return "WARN", f"{len(directs)}d, HEAD err {type(e).__name__}" if head_status in (200, 206): return "OK", f"{len(directs)}d/{len(hosters)}h, HEAD {head_status} {first.quality or '?'}" return "WARN", f"{len(directs)}d, HEAD {head_status}" def _check_canary_extractor_multi( origin: str, sitetag: str, page_urls: list[str], report: Report ) -> None: """Multi-sample: próbuje N URL-i, raportuje BEST result. Dampens single-URL flakiness (np. hqporner: 1 strona dead 404, reszta alive). Failure tylko gdy WSZYSTKIE N samples nie dają OK.""" if not page_urls: report.extractors.append(CheckResult( f"extract: {origin}", "SKIP", "no alive playback_sources", 0.0, )) return t0 = time.time() results = [] for url in page_urls: results.append(_try_one_canary(sitetag, url)) elapsed = time.time() - t0 # Best wins — OK > WARN > FAIL. Detail z pierwszej OK (lub agreg gdy wszystkie fail). statuses = [r[0] for r in results] if "OK" in statuses: idx = statuses.index("OK") detail = f"[{statuses.count('OK')}/{len(results)} OK] {results[idx][1]}" report.extractors.append(CheckResult(f"extract: {origin}", "OK", detail, elapsed)) elif "WARN" in statuses: # Skomprymuj detale wszystkich WARN-ów warn_details = [r[1] for r in results if r[0] == "WARN"] detail = f"[0/{len(results)} OK] {warn_details[0]}" report.extractors.append(CheckResult(f"extract: {origin}", "WARN", detail, elapsed)) else: fail_details = [r[1] for r in results if r[0] == "FAIL"] detail = f"[0/{len(results)} OK] {fail_details[0]}" report.extractors.append(CheckResult(f"extract: {origin}", "FAIL", detail, elapsed)) def _run_canary_extractors(report: Report, *, samples_per_origin: int) -> None: samples = _pick_canary_samples(per_origin=samples_per_origin) log.info("canary: %d origins, %d samples/origin", len(samples), samples_per_origin) for (origin, sitetag), urls in samples.items(): _check_canary_extractor_multi(origin, sitetag, urls, report) # ---------- 3. THUMBNAIL HEALTH ---------- def _check_thumbnails(report: Report, *, per_origin: int = 3) -> None: """Random sample N thumbnail URLs per origin + HEAD-check. Wysyła Referer=`/` z page_url'a — hotlink-protected CDN-y (58img.top dla mypornerleak, trafficdeposit dla sxyprn, fastporndelivery dla hqporner) zwracają 403 bez właściwego Refererera, ale 200 z nim. Symulujemy realny request mobile WebView.""" t0 = time.time() try: with session_scope() as s: origins = s.execute( select(PlaybackSource.origin) .where(PlaybackSource.thumbnail_url.is_not(None)) .where(PlaybackSource.dead_at.is_(None)) .group_by(PlaybackSource.origin) ).scalars().all() cutoff = datetime.now(UTC) - timedelta(days=30) per_origin_rows: dict[str, list[tuple[str, str]]] = {} for origin in origins: rows = s.execute( select(PlaybackSource.thumbnail_url, PlaybackSource.page_url) .where(PlaybackSource.origin == origin) .where(PlaybackSource.thumbnail_url.is_not(None)) .where(PlaybackSource.dead_at.is_(None)) .where(PlaybackSource.last_seen_at >= cutoff) .order_by(func.random()) .limit(per_origin) ).all() per_origin_rows[origin] = [(t, p) for t, p in rows if t] except Exception as e: report.thumbs.append(CheckResult( "thumbnails query", "FAIL", f"{type(e).__name__}: {e}", time.time() - t0 )) return from urllib.parse import urlparse with httpx.Client(timeout=10, follow_redirects=True) as client: for origin, rows in per_origin_rows.items(): t1 = time.time() fail_n = 0 for thumb_url, page_url in rows: # Referer = origin tube page'a (host root) — hotlink protection # zwykle whitelistuje cały host, nie konkretny path. page_host = urlparse(page_url).hostname if page_url else None referer = f"https://{page_host}/" if page_host else "" headers = {"User-Agent": DEFAULT_UA} if referer: headers["Referer"] = referer try: r = client.head(thumb_url, headers=headers) if r.status_code >= 400: # CDN nie wspiera HEAD — GET z Range r = client.get( thumb_url, headers={**headers, "Range": "bytes=0-128"}, ) if r.status_code >= 400: fail_n += 1 except Exception: fail_n += 1 elapsed = time.time() - t1 total = len(rows) if total == 0: continue fail_pct = fail_n * 100 / total if fail_pct > 50: report.thumbs.append(CheckResult( f"thumb: {origin}", "WARN", f"{fail_n}/{total} broken", elapsed, )) else: report.thumbs.append(CheckResult( f"thumb: {origin}", "OK", f"{total-fail_n}/{total} OK", elapsed, )) # ---------- main + sentry ---------- def _init_sentry() -> bool: """Init Sentry SDK, return True jeśli DSN jest ustawiony.""" settings = get_settings() if not settings.sentry_dsn: return False try: import sentry_sdk from sentry_sdk.integrations.sqlalchemy import SqlalchemyIntegration sentry_sdk.init( dsn=settings.sentry_dsn, environment=settings.sentry_environment, traces_sample_rate=0.0, # smoke ma własne timingi, nie potrzebujemy spanów integrations=[SqlalchemyIntegration()], release="goon-smoke@0.1.0", ) return True except Exception as e: log.warning("sentry init failed: %s", e) return False def _send_sentry(report: Report, sentry_enabled: bool) -> None: if not sentry_enabled: return try: import sentry_sdk failed = report.failed warned = report.warned if failed: level = "error" tag = "fail" elif warned: level = "warning" tag = "warn" else: level = "info" tag = "ok" lines = [ f"Smoke test {tag.upper()} — {report.summary_line()}", f"started_at={report.started_at.isoformat()}", ] if failed: lines.append("\nFAIL:") for c in failed: lines.append(f" - {c.name}: {c.detail}") if warned: lines.append("\nWARN:") for c in warned: lines.append(f" - {c.name}: {c.detail}") with sentry_sdk.new_scope() as scope: scope.set_tag("smoke_test", tag) scope.set_tag("smoke_fail_count", len(failed)) scope.set_tag("smoke_warn_count", len(warned)) sentry_sdk.capture_message("\n".join(lines), level=level) log.info("sentry message sent (level=%s)", level) except Exception as e: log.warning("sentry send failed: %s\n%s", e, traceback.format_exc()) def _print_report(report: Report) -> None: elapsed_total = (datetime.now(UTC) - report.started_at).total_seconds() print() print("=" * 80) print(f"Goon smoke test — {report.started_at.isoformat()}") print(f" {report.summary_line()} elapsed={elapsed_total:.1f}s") print("=" * 80) print("\n[DB metrics]") for c in report.db_metrics: print(c.line()) print("\n[Canary extractors]") for c in report.extractors: print(c.line()) print("\n[Thumbnails]") for c in report.thumbs: print(c.line()) print() def main() -> int: ap = argparse.ArgumentParser() ap.add_argument( "--samples-per-origin", type=int, default=3, help="Ile losowych PlaybackSource testować per origin (default 3). " "Multi-sample żeby pojedynczy dead URL nie falszował alarmu — canary " "considered failed gdy WSZYSTKIE N samples fail.", ) ap.add_argument( "--thumbs-per-origin", type=int, default=3, help="Ile thumbnail URLs sprawdzać per origin (default 3)", ) ap.add_argument( "--skip-extractors", action="store_true", help="Pomiń canary extractor testy (np. dla quick DB-only health)", ) ap.add_argument( "--skip-thumbs", action="store_true", help="Pomiń HEAD-check thumbnails", ) ap.add_argument( "--strict", action="store_true", help="Exit 1 gdy any FAIL/WARN (default: zawsze 0 żeby timer się nie psuł)", ) args = ap.parse_args() sentry_enabled = _init_sentry() log.info("sentry_enabled=%s", sentry_enabled) report = Report() # 1. DB metrics _check_new_scenes_per_source(report) _check_ingest_runs(report) _check_ingest_per_source(report) _check_stuck_runs(report) _check_browse_freshness(report) _check_dead_playbacks(report) _check_bug_reports(report) _check_coverage(report) # 2. Canary extractors if not args.skip_extractors: _run_canary_extractors(report, samples_per_origin=args.samples_per_origin) # 3. Thumbnails if not args.skip_thumbs: _check_thumbnails(report, per_origin=args.thumbs_per_origin) _print_report(report) _send_sentry(report, sentry_enabled) if args.strict and (report.failed or report.warned): return 1 return 0 if __name__ == "__main__": sys.exit(main())