"""Daily smoke test — uruchamiany przez systemd timer (04:00 UTC). Sprawdza w jednym przebiegu: 1. **DB metrics** (cheap, ~1s) - Nowe sceny w ostatnich 24h per source (TPDB/StashDB/direct scrapery) - Drop vs 7-day rolling average (alarm gdy któryś source spadł >50%) - Failed/partial ingest_runs w ostatnich 24h - Liczba playback_sources oznaczonych dead w ostatnich 24h - Liczba nowych bug_reports w ostatnich 24h - Coverage: % scen z thumbnail / z duration / z stream 2. **Canary extractor tests** (heavier, ~30-60s) - Per tube origin: 1 alive PlaybackSource → try_extract() → mierz timing - HEAD-check pierwszy stream_url z headers (Referer + UA) - Alarm gdy tube failuje (TubePageError 5xx, exception, brak StreamSource) 3. **Thumbnail health** (~10s) - 3 random `PlaybackSource.thumbnail_url` per origin — HEAD check - Alarm gdy >50% miniaturek z origin'a zwraca non-2xx Reporting: - stdout: czytelna tabela (-> systemd journal -> `journalctl -u goon-smoke`) - Sentry: `capture_message(level=warning)` na agregowany raport gdy są failure (tag `smoke_test=fail`); `level=info` gdy wszystko zielone (tag=ok). DSN ten sam co api/worker (env `SENTRY_DSN`). - Exit code: 0 zawsze (smoke test nie powinien killować timera) — ale gdy `--strict` to non-zero przy any failure. """ from __future__ import annotations import argparse import logging import os import random import sys import time import traceback from dataclasses import dataclass, field from datetime import UTC, datetime, timedelta import httpx from sqlalchemy import func, select from app.config import get_settings from app.db import session_scope from app.extractors import HosterDead, TubePageError, try_extract from app.models.bug_report import BugReport from app.models.ingest_run import IngestRun, IngestStatus from app.models.playback_source import PlaybackSource from app.models.scene import Scene, SceneExternalRef from app.models.source import Source logging.basicConfig(level=logging.WARNING, format="%(levelname)s %(name)s: %(message)s") log = logging.getLogger("smoke") DEFAULT_UA = ( "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 " "(KHTML, like Gecko) Chrome/140.0.0.0 Safari/537.36" ) # ---------- result aggregation ---------- @dataclass class CheckResult: name: str status: str # OK / WARN / FAIL / SKIP detail: str = "" elapsed: float = 0.0 @property def emoji(self) -> str: return {"OK": "+", "WARN": "!", "FAIL": "X", "SKIP": "-"}.get(self.status, "?") def line(self) -> str: return f" [{self.emoji}] {self.name:38s} {self.status:5s} {self.elapsed:5.1f}s {self.detail}" @dataclass class Report: db_metrics: list[CheckResult] = field(default_factory=list) extractors: list[CheckResult] = field(default_factory=list) thumbs: list[CheckResult] = field(default_factory=list) started_at: datetime = field(default_factory=lambda: datetime.now(UTC)) @property def all_checks(self) -> list[CheckResult]: return self.db_metrics + self.extractors + self.thumbs @property def failed(self) -> list[CheckResult]: return [c for c in self.all_checks if c.status == "FAIL"] @property def warned(self) -> list[CheckResult]: return [c for c in self.all_checks if c.status == "WARN"] def summary_line(self) -> str: total = len(self.all_checks) ok = sum(1 for c in self.all_checks if c.status == "OK") warn = len(self.warned) fail = len(self.failed) skip = sum(1 for c in self.all_checks if c.status == "SKIP") return f"checks={total} ok={ok} warn={warn} fail={fail} skip={skip}" # ---------- 1. DB METRICS ---------- def _check_new_scenes_per_source(report: Report) -> None: """Liczy nowe scene_external_refs per source w ostatnich 24h. Porównuje do 7-day average i alarmuje gdy któryś tube ma drop >50% (lub 0 nowych przy avg >5/dzień).""" t0 = time.time() try: with session_scope() as s: now = datetime.now(UTC) past_24h = now - timedelta(hours=24) past_7d = now - timedelta(days=7) # 24h count per source rows_24h = s.execute( select(Source.name, func.count(SceneExternalRef.scene_id)) .join(SceneExternalRef, SceneExternalRef.source_id == Source.id) .where(SceneExternalRef.first_seen >= past_24h) .group_by(Source.name) ).all() map_24h = {name: cnt for name, cnt in rows_24h} # 7d avg per source (count / 7) rows_7d = s.execute( select(Source.name, func.count(SceneExternalRef.scene_id)) .join(SceneExternalRef, SceneExternalRef.source_id == Source.id) .where(SceneExternalRef.first_seen >= past_7d) .group_by(Source.name) ).all() avg_per_day = {name: cnt / 7.0 for name, cnt in rows_7d} elapsed = time.time() - t0 total_24h = sum(map_24h.values()) per_src = [] warns = [] for name in sorted(set(map_24h) | set(avg_per_day)): n24 = map_24h.get(name, 0) avg = avg_per_day.get(name, 0.0) per_src.append(f"{name}={n24}") # Alarm: avg >= 5/d ale 24h <= 50% avg if avg >= 5.0 and n24 < avg * 0.5: warns.append(f"{name} 24h={n24} 7d-avg={avg:.1f}") if warns: report.db_metrics.append(CheckResult( "new scenes / source (24h)", "WARN", f"total={total_24h}, drops: {'; '.join(warns)}", elapsed, )) else: report.db_metrics.append(CheckResult( "new scenes / source (24h)", "OK", f"total={total_24h} ({', '.join(per_src) or 'no sources'})", elapsed, )) except Exception as e: report.db_metrics.append(CheckResult( "new scenes / source (24h)", "FAIL", f"{type(e).__name__}: {e}", time.time() - t0, )) def _check_ingest_runs(report: Report) -> None: """Failed/partial ingest_runs w ostatnich 24h. >0 = WARN.""" t0 = time.time() try: with session_scope() as s: past_24h = datetime.now(UTC) - timedelta(hours=24) rows = s.execute( select(IngestRun.status, func.count(IngestRun.id)) .where(IngestRun.started_at >= past_24h) .group_by(IngestRun.status) ).all() counts = {str(status.value if hasattr(status, "value") else status): cnt for status, cnt in rows} failed = counts.get("failed", 0) partial = counts.get("partial", 0) ok = counts.get("success", 0) running = counts.get("running", 0) detail = f"success={ok} partial={partial} failed={failed} running={running}" if failed > 0 or partial > 3: report.db_metrics.append(CheckResult( "ingest runs (24h)", "WARN", detail, time.time() - t0 )) else: report.db_metrics.append(CheckResult( "ingest runs (24h)", "OK", detail, time.time() - t0 )) except Exception as e: report.db_metrics.append(CheckResult( "ingest runs (24h)", "FAIL", f"{type(e).__name__}: {e}", time.time() - t0 )) def _check_dead_playbacks(report: Report) -> None: """Liczba PlaybackSource oznaczonych dead w ostatnich 24h. >50 dead = WARN (hoster się popsuł na masową skalę).""" t0 = time.time() try: with session_scope() as s: past_24h = datetime.now(UTC) - timedelta(hours=24) n_dead_24h = s.execute( select(func.count(PlaybackSource.id)) .where(PlaybackSource.dead_at >= past_24h) ).scalar() or 0 n_alive = s.execute( select(func.count(PlaybackSource.id)) .where(PlaybackSource.dead_at.is_(None)) ).scalar() or 0 detail = f"dead_24h={n_dead_24h} alive_total={n_alive}" if n_dead_24h > 50: report.db_metrics.append(CheckResult( "playback dead-marks (24h)", "WARN", detail, time.time() - t0 )) else: report.db_metrics.append(CheckResult( "playback dead-marks (24h)", "OK", detail, time.time() - t0 )) except Exception as e: report.db_metrics.append(CheckResult( "playback dead-marks (24h)", "FAIL", f"{type(e).__name__}: {e}", time.time() - t0 )) def _check_bug_reports(report: Report) -> None: """Nowe bug_reports w ostatnich 24h. Tylko informacyjnie (nie warn).""" t0 = time.time() try: with session_scope() as s: past_24h = datetime.now(UTC) - timedelta(hours=24) n = s.execute( select(func.count(BugReport.id)) .where(BugReport.created_at >= past_24h) ).scalar() or 0 report.db_metrics.append(CheckResult( "new bug reports (24h)", "OK", f"count={n}", time.time() - t0 )) except Exception as e: report.db_metrics.append(CheckResult( "new bug reports (24h)", "FAIL", f"{type(e).__name__}: {e}", time.time() - t0 )) def _check_coverage(report: Report) -> None: """% scen mających thumbnail / duration / stream URL przynajmniej w jednym PlaybackSource. <70% = WARN.""" t0 = time.time() try: with session_scope() as s: total_scenes = s.execute(select(func.count(Scene.id))).scalar() or 1 with_thumb = s.execute( select(func.count(func.distinct(PlaybackSource.scene_id))) .where(PlaybackSource.thumbnail_url.is_not(None)) ).scalar() or 0 with_duration = s.execute( select(func.count(func.distinct(PlaybackSource.scene_id))) .where(PlaybackSource.duration_sec.is_not(None)) ).scalar() or 0 with_stream = s.execute( select(func.count(func.distinct(PlaybackSource.scene_id))) .where(PlaybackSource.stream_url.is_not(None)) .where(PlaybackSource.dead_at.is_(None)) ).scalar() or 0 pct_thumb = with_thumb * 100 / total_scenes pct_dur = with_duration * 100 / total_scenes pct_stream = with_stream * 100 / total_scenes detail = f"scenes={total_scenes} thumb={pct_thumb:.0f}% dur={pct_dur:.0f}% stream={pct_stream:.0f}%" # Coverage progi konserwatywne — większość direct-scrap scen ma URL ale # nie ma stream cache'a (resolve on-demand). if pct_thumb < 50: report.db_metrics.append(CheckResult( "scene coverage", "WARN", detail, time.time() - t0 )) else: report.db_metrics.append(CheckResult( "scene coverage", "OK", detail, time.time() - t0 )) except Exception as e: report.db_metrics.append(CheckResult( "scene coverage", "FAIL", f"{type(e).__name__}: {e}", time.time() - t0 )) # ---------- 2. CANARY EXTRACTORS ---------- def _pick_canary_samples(per_origin: int = 3) -> dict[tuple[str, str], list[str]]: """Zwraca dict (origin, sitetag) → [page_url, ...] — N losowych alive PlaybackSource per origin. Multi-sample żeby pojedynczy dead URL nie falszował alarmu (canary considered failed gdy WSZYSTKIE N samples fail). Filtr last_seen >= 30d: worker scrape loop aktualizuje last_seen_at gdy re-scrapuje page'a. Stare URL-e (np. hqporner sceny sprzed kilku miesięcy) mogą być 404 nawet bez dead_at (bo nikt nie kliknął Watch). Sampling tylko z "świeżych" eliminuje false positives — chcemy testować że *aktualnie aktywne* sceny działają.""" out: dict[tuple[str, str], list[str]] = {} cutoff = datetime.now(UTC) - timedelta(days=30) with session_scope() as s: origins = s.execute( select(PlaybackSource.origin) .where(PlaybackSource.dead_at.is_(None)) .where( (PlaybackSource.origin.like("pornapp:%")) | (PlaybackSource.origin.like("tube:%")) ) .group_by(PlaybackSource.origin) ).scalars().all() for origin in origins: rows = s.execute( select(PlaybackSource.page_url) .where(PlaybackSource.origin == origin) .where(PlaybackSource.dead_at.is_(None)) .where(PlaybackSource.last_seen_at >= cutoff) .order_by(func.random()) .limit(per_origin) ).scalars().all() sitetag = origin.split(":", 1)[1] out[(origin, sitetag)] = [u for u in rows if u] return out def _try_one_canary(sitetag: str, page_url: str) -> tuple[str, str]: """Pojedyncza próba extract + HEAD-check. Zwraca (status, detail). Status: OK / WARN / FAIL. Caller agreguje N prób per origin.""" try: sources = try_extract(sitetag, page_url) except HosterDead as e: return "WARN", f"HosterDead: {str(e)[:60]}" except TubePageError as e: status = "WARN" if e.status_code in (404, 410) else "FAIL" return status, f"TubePageError HTTP {e.status_code}" except Exception as e: return "FAIL", f"{type(e).__name__}: {str(e)[:60]}" if not sources: return "WARN", "no sources (extractor None)" directs = [s for s in sources if s.link and s.type != "hoster"] hosters = [s for s in sources if s.type == "hoster"] if not directs: return "OK", f"{len(hosters)} hoster-only (WebView fallback)" first = directs[0] try: with httpx.Client(timeout=15, follow_redirects=True) as client: r = client.head( first.link, headers={ "User-Agent": DEFAULT_UA, "Referer": first.referer or page_url, }, ) head_status = r.status_code if head_status >= 400: r = client.get( first.link, headers={ "User-Agent": DEFAULT_UA, "Referer": first.referer or page_url, "Range": "bytes=0-1024", }, ) head_status = r.status_code except Exception as e: return "WARN", f"{len(directs)}d, HEAD err {type(e).__name__}" if head_status in (200, 206): return "OK", f"{len(directs)}d/{len(hosters)}h, HEAD {head_status} {first.quality or '?'}" return "WARN", f"{len(directs)}d, HEAD {head_status}" def _check_canary_extractor_multi( origin: str, sitetag: str, page_urls: list[str], report: Report ) -> None: """Multi-sample: próbuje N URL-i, raportuje BEST result. Dampens single-URL flakiness (np. hqporner: 1 strona dead 404, reszta alive). Failure tylko gdy WSZYSTKIE N samples nie dają OK.""" if not page_urls: report.extractors.append(CheckResult( f"extract: {origin}", "SKIP", "no alive playback_sources", 0.0, )) return t0 = time.time() results = [] for url in page_urls: results.append(_try_one_canary(sitetag, url)) elapsed = time.time() - t0 # Best wins — OK > WARN > FAIL. Detail z pierwszej OK (lub agreg gdy wszystkie fail). statuses = [r[0] for r in results] if "OK" in statuses: idx = statuses.index("OK") detail = f"[{statuses.count('OK')}/{len(results)} OK] {results[idx][1]}" report.extractors.append(CheckResult(f"extract: {origin}", "OK", detail, elapsed)) elif "WARN" in statuses: # Skomprymuj detale wszystkich WARN-ów warn_details = [r[1] for r in results if r[0] == "WARN"] detail = f"[0/{len(results)} OK] {warn_details[0]}" report.extractors.append(CheckResult(f"extract: {origin}", "WARN", detail, elapsed)) else: fail_details = [r[1] for r in results if r[0] == "FAIL"] detail = f"[0/{len(results)} OK] {fail_details[0]}" report.extractors.append(CheckResult(f"extract: {origin}", "FAIL", detail, elapsed)) def _run_canary_extractors(report: Report, *, samples_per_origin: int) -> None: samples = _pick_canary_samples(per_origin=samples_per_origin) log.info("canary: %d origins, %d samples/origin", len(samples), samples_per_origin) for (origin, sitetag), urls in samples.items(): _check_canary_extractor_multi(origin, sitetag, urls, report) # ---------- 3. THUMBNAIL HEALTH ---------- def _check_thumbnails(report: Report, *, per_origin: int = 3) -> None: """Random sample N thumbnail URLs per origin + HEAD-check. Wysyła Referer=`/` z page_url'a — hotlink-protected CDN-y (58img.top dla mypornerleak, trafficdeposit dla sxyprn, fastporndelivery dla hqporner) zwracają 403 bez właściwego Refererera, ale 200 z nim. Symulujemy realny request mobile WebView.""" t0 = time.time() try: with session_scope() as s: origins = s.execute( select(PlaybackSource.origin) .where(PlaybackSource.thumbnail_url.is_not(None)) .where(PlaybackSource.dead_at.is_(None)) .group_by(PlaybackSource.origin) ).scalars().all() cutoff = datetime.now(UTC) - timedelta(days=30) per_origin_rows: dict[str, list[tuple[str, str]]] = {} for origin in origins: rows = s.execute( select(PlaybackSource.thumbnail_url, PlaybackSource.page_url) .where(PlaybackSource.origin == origin) .where(PlaybackSource.thumbnail_url.is_not(None)) .where(PlaybackSource.dead_at.is_(None)) .where(PlaybackSource.last_seen_at >= cutoff) .order_by(func.random()) .limit(per_origin) ).all() per_origin_rows[origin] = [(t, p) for t, p in rows if t] except Exception as e: report.thumbs.append(CheckResult( "thumbnails query", "FAIL", f"{type(e).__name__}: {e}", time.time() - t0 )) return from urllib.parse import urlparse with httpx.Client(timeout=10, follow_redirects=True) as client: for origin, rows in per_origin_rows.items(): t1 = time.time() fail_n = 0 for thumb_url, page_url in rows: # Referer = origin tube page'a (host root) — hotlink protection # zwykle whitelistuje cały host, nie konkretny path. page_host = urlparse(page_url).hostname if page_url else None referer = f"https://{page_host}/" if page_host else "" headers = {"User-Agent": DEFAULT_UA} if referer: headers["Referer"] = referer try: r = client.head(thumb_url, headers=headers) if r.status_code >= 400: # CDN nie wspiera HEAD — GET z Range r = client.get( thumb_url, headers={**headers, "Range": "bytes=0-128"}, ) if r.status_code >= 400: fail_n += 1 except Exception: fail_n += 1 elapsed = time.time() - t1 total = len(rows) if total == 0: continue fail_pct = fail_n * 100 / total if fail_pct > 50: report.thumbs.append(CheckResult( f"thumb: {origin}", "WARN", f"{fail_n}/{total} broken", elapsed, )) else: report.thumbs.append(CheckResult( f"thumb: {origin}", "OK", f"{total-fail_n}/{total} OK", elapsed, )) # ---------- main + sentry ---------- def _init_sentry() -> bool: """Init Sentry SDK, return True jeśli DSN jest ustawiony.""" settings = get_settings() if not settings.sentry_dsn: return False try: import sentry_sdk from sentry_sdk.integrations.sqlalchemy import SqlalchemyIntegration sentry_sdk.init( dsn=settings.sentry_dsn, environment=settings.sentry_environment, traces_sample_rate=0.0, # smoke ma własne timingi, nie potrzebujemy spanów integrations=[SqlalchemyIntegration()], release="goon-smoke@0.1.0", ) return True except Exception as e: log.warning("sentry init failed: %s", e) return False def _send_sentry(report: Report, sentry_enabled: bool) -> None: if not sentry_enabled: return try: import sentry_sdk failed = report.failed warned = report.warned if failed: level = "error" tag = "fail" elif warned: level = "warning" tag = "warn" else: level = "info" tag = "ok" lines = [ f"Smoke test {tag.upper()} — {report.summary_line()}", f"started_at={report.started_at.isoformat()}", ] if failed: lines.append("\nFAIL:") for c in failed: lines.append(f" - {c.name}: {c.detail}") if warned: lines.append("\nWARN:") for c in warned: lines.append(f" - {c.name}: {c.detail}") with sentry_sdk.new_scope() as scope: scope.set_tag("smoke_test", tag) scope.set_tag("smoke_fail_count", len(failed)) scope.set_tag("smoke_warn_count", len(warned)) sentry_sdk.capture_message("\n".join(lines), level=level) log.info("sentry message sent (level=%s)", level) except Exception as e: log.warning("sentry send failed: %s\n%s", e, traceback.format_exc()) def _print_report(report: Report) -> None: elapsed_total = (datetime.now(UTC) - report.started_at).total_seconds() print() print("=" * 80) print(f"Goon smoke test — {report.started_at.isoformat()}") print(f" {report.summary_line()} elapsed={elapsed_total:.1f}s") print("=" * 80) print("\n[DB metrics]") for c in report.db_metrics: print(c.line()) print("\n[Canary extractors]") for c in report.extractors: print(c.line()) print("\n[Thumbnails]") for c in report.thumbs: print(c.line()) print() def main() -> int: ap = argparse.ArgumentParser() ap.add_argument( "--samples-per-origin", type=int, default=3, help="Ile losowych PlaybackSource testować per origin (default 3). " "Multi-sample żeby pojedynczy dead URL nie falszował alarmu — canary " "considered failed gdy WSZYSTKIE N samples fail.", ) ap.add_argument( "--thumbs-per-origin", type=int, default=3, help="Ile thumbnail URLs sprawdzać per origin (default 3)", ) ap.add_argument( "--skip-extractors", action="store_true", help="Pomiń canary extractor testy (np. dla quick DB-only health)", ) ap.add_argument( "--skip-thumbs", action="store_true", help="Pomiń HEAD-check thumbnails", ) ap.add_argument( "--strict", action="store_true", help="Exit 1 gdy any FAIL/WARN (default: zawsze 0 żeby timer się nie psuł)", ) args = ap.parse_args() sentry_enabled = _init_sentry() log.info("sentry_enabled=%s", sentry_enabled) report = Report() # 1. DB metrics _check_new_scenes_per_source(report) _check_ingest_runs(report) _check_dead_playbacks(report) _check_bug_reports(report) _check_coverage(report) # 2. Canary extractors if not args.skip_extractors: _run_canary_extractors(report, samples_per_origin=args.samples_per_origin) # 3. Thumbnails if not args.skip_thumbs: _check_thumbnails(report, per_origin=args.thumbs_per_origin) _print_report(report) _send_sentry(report, sentry_enabled) if args.strict and (report.failed or report.warned): return 1 return 0 if __name__ == "__main__": sys.exit(main())