Resolver/perf: - find_by_phash_within: nearest match via Postgres bit_count over bit(64) XOR instead of Python scan of all phash fingerprints (~20x faster per scene; unblocks long delta runs that were killed mid-run before since advanced). Scheduler/reliability: - reap ingest_runs stuck in 'running' on worker startup (killed_by_restart). - smoke_test: per-source ingest health, stuck-run and browse-freshness checks -> Sentry; exclude killed_by_restart from the failed-run alarm. Tags (ingest with tags + fill blanks): - wire infer_tag_slugs into normalize_scene so tube scenes get title-inferred tags (was dead code); union with connector tags. - scripts/backfill_inferred_tags.py: keyset/batched/idempotent backfill for existing tagless scenes (playable tag coverage 16% -> ~52%). Clip-store: - skip ManyVids/IWantClips/Clips4Sale/... from canonical sources at ingest (GOON_SKIP_CLIP_STORE, default on) — permanent orphans, ~56% of canonical ingest, never have a free-tube playback source. Browse tubes: - enable fullmovies + hdporn.gg: studio parsed from title prefix instead of the /networks/ sidebar (which always yielded the first listed network); drop phash compute (pilot: 0% canonical hit within Hamming 5 — auto-screenshots), matching relies on title/performer/duration. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
788 lines
30 KiB
Python
788 lines
30 KiB
Python
"""Daily smoke test — uruchamiany przez systemd timer (04:00 UTC).
|
|
|
|
Sprawdza w jednym przebiegu:
|
|
|
|
1. **DB metrics** (cheap, ~1s)
|
|
- Nowe sceny w ostatnich 24h per source (TPDB/StashDB/direct scrapery)
|
|
- Drop vs 7-day rolling average (alarm gdy któryś source spadł >50%)
|
|
- Failed/partial ingest_runs w ostatnich 24h
|
|
- Liczba playback_sources oznaczonych dead w ostatnich 24h
|
|
- Liczba nowych bug_reports w ostatnich 24h
|
|
- Coverage: % scen z thumbnail / z duration / z stream
|
|
|
|
2. **Canary extractor tests** (heavier, ~30-60s)
|
|
- Per tube origin: 1 alive PlaybackSource → try_extract() → mierz timing
|
|
- HEAD-check pierwszy stream_url z headers (Referer + UA)
|
|
- Alarm gdy tube failuje (TubePageError 5xx, exception, brak StreamSource)
|
|
|
|
3. **Thumbnail health** (~10s)
|
|
- 3 random `PlaybackSource.thumbnail_url` per origin — HEAD check
|
|
- Alarm gdy >50% miniaturek z origin'a zwraca non-2xx
|
|
|
|
Reporting:
- stdout: a readable table (-> systemd journal -> `journalctl -u goon-smoke`)
- Sentry: one aggregated `capture_message` per run. `level=error` (tag
  `smoke_test=fail`) when any check FAILed, `level=warning` (tag
  `smoke_test=warn`) when there are only WARNs, `level=info` (tag
  `smoke_test=ok`) when everything is green. Same DSN as api/worker
  (env `SENTRY_DSN`).
- Exit code: always 0 (the smoke test should not kill the timer) — except
  with `--strict`, which exits non-zero on any FAIL or WARN.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import logging
|
|
import os
|
|
import random
|
|
import sys
|
|
import time
|
|
import traceback
|
|
from dataclasses import dataclass, field
|
|
from datetime import UTC, datetime, timedelta
|
|
|
|
import httpx
|
|
from sqlalchemy import func, select, text
|
|
|
|
from app.config import get_settings
|
|
from app.db import session_scope
|
|
from app.extractors import HosterDead, TubePageError, try_extract
|
|
from app.models.bug_report import BugReport
|
|
from app.models.ingest_run import IngestRun, IngestStatus
|
|
from app.models.playback_source import PlaybackSource
|
|
from app.models.scene import Scene, SceneExternalRef
|
|
from app.models.source import Source
|
|
|
|
# Only WARNING and above are emitted through logging.
logging.basicConfig(
    level=logging.WARNING,
    format="%(levelname)s %(name)s: %(message)s",
)
log = logging.getLogger("smoke")

# Desktop-Chrome User-Agent string sent with the HTTP probes in this module.
DEFAULT_UA = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
    "(KHTML, like Gecko) Chrome/140.0.0.0 Safari/537.36"
)
|
|
|
|
|
|
# ---------- result aggregation ----------
|
|
|
|
|
|
@dataclass
class CheckResult:
    """Outcome of a single smoke-test check, rendered as one report row."""

    name: str  # label shown in the report table
    status: str  # OK / WARN / FAIL / SKIP
    detail: str = ""  # free-form explanation appended to the row
    elapsed: float = 0.0  # duration in seconds, as measured by the caller

    @property
    def emoji(self) -> str:
        """Return the one-character marker for ``status`` (``?`` if unknown)."""
        markers = {"OK": "+", "WARN": "!", "FAIL": "X", "SKIP": "-"}
        return markers.get(self.status, "?")

    def line(self) -> str:
        """Format this result as a single fixed-width row of the stdout table."""
        return " [{}] {:38s} {:5s} {:5.1f}s {}".format(
            self.emoji, self.name, self.status, self.elapsed, self.detail
        )
|
|
|
|
|
|
@dataclass
|
|
class Report:
|
|
db_metrics: list[CheckResult] = field(default_factory=list)
|
|
extractors: list[CheckResult] = field(default_factory=list)
|
|
thumbs: list[CheckResult] = field(default_factory=list)
|
|
started_at: datetime = field(default_factory=lambda: datetime.now(UTC))
|
|
|
|
@property
|
|
def all_checks(self) -> list[CheckResult]:
|
|
return self.db_metrics + self.extractors + self.thumbs
|
|
|
|
@property
|
|
def failed(self) -> list[CheckResult]:
|
|
return [c for c in self.all_checks if c.status == "FAIL"]
|
|
|
|
@property
|
|
def warned(self) -> list[CheckResult]:
|
|
return [c for c in self.all_checks if c.status == "WARN"]
|
|
|
|
def summary_line(self) -> str:
|
|
total = len(self.all_checks)
|
|
ok = sum(1 for c in self.all_checks if c.status == "OK")
|
|
warn = len(self.warned)
|
|
fail = len(self.failed)
|
|
skip = sum(1 for c in self.all_checks if c.status == "SKIP")
|
|
return f"checks={total} ok={ok} warn={warn} fail={fail} skip={skip}"
|
|
|
|
|
|
# ---------- 1. DB METRICS ----------
|
|
|
|
|
|
def _check_new_scenes_per_source(report: Report) -> None:
    """Record how many scene external refs each source gained in the last 24h.

    The 24h count is compared with a 7-day daily average (7-day total / 7).
    A source is flagged when its average is at least 5/day and its 24h count
    is below half of that average; any flagged source makes the check WARN.
    Any exception is recorded as a FAIL row instead of being raised.
    """
    label = "new scenes / source (24h)"
    t0 = time.time()
    try:
        with session_scope() as s:
            now = datetime.now(UTC)

            def counts_since(cutoff: datetime) -> dict:
                """Map source name -> number of refs first seen at/after cutoff."""
                stmt = (
                    select(Source.name, func.count(SceneExternalRef.scene_id))
                    .join(SceneExternalRef, SceneExternalRef.source_id == Source.id)
                    .where(SceneExternalRef.first_seen >= cutoff)
                    .group_by(Source.name)
                )
                return {src: n for src, n in s.execute(stmt).all()}

            map_24h = counts_since(now - timedelta(hours=24))
            weekly = counts_since(now - timedelta(days=7))
            avg_per_day = {src: n / 7.0 for src, n in weekly.items()}

        elapsed = time.time() - t0
        total_24h = sum(map_24h.values())
        per_src = []
        drops = []
        for src in sorted(map_24h.keys() | avg_per_day.keys()):
            recent = map_24h.get(src, 0)
            baseline = avg_per_day.get(src, 0.0)
            per_src.append(f"{src}={recent}")
            # Alarm only for sources with a meaningful baseline (>= 5/day)
            # whose last 24h fell below half of it.
            if baseline >= 5.0 and recent < baseline * 0.5:
                drops.append(f"{src} 24h={recent} 7d-avg={baseline:.1f}")

        if drops:
            status = "WARN"
            detail = f"total={total_24h}, drops: {'; '.join(drops)}"
        else:
            status = "OK"
            detail = f"total={total_24h} ({', '.join(per_src) or 'no sources'})"
        report.db_metrics.append(CheckResult(label, status, detail, elapsed))
    except Exception as e:
        report.db_metrics.append(
            CheckResult(label, "FAIL", f"{type(e).__name__}: {e}", time.time() - t0)
        )
|
|
|
|
|
|
def _check_ingest_runs(report: Report) -> None:
    """Summarise ingest_runs started in the last 24h, grouped by status.

    WARN when at least one run genuinely failed or more than three runs were
    partial; OK otherwise. Any exception is recorded as a FAIL row.
    """
    label = "ingest runs (24h)"
    t0 = time.time()
    try:
        with session_scope() as s:
            window_start = datetime.now(UTC) - timedelta(hours=24)
            status_rows = s.execute(
                select(IngestRun.status, func.count(IngestRun.id))
                .where(IngestRun.started_at >= window_start)
                .group_by(IngestRun.status)
            ).all()
            counts = {}
            for status, cnt in status_rows:
                # The status may come back as an enum member or a plain value.
                key = status.value if hasattr(status, "value") else status
                counts[str(key)] = cnt

            # killed_by_restart is NOT a connector failure — it is a worker
            # killed mid-run (deploy/OOM). It is counted separately (the
            # stuck-run check covers it) so a deploy does not trigger a WARN.
            killed = (
                s.execute(
                    select(func.count(IngestRun.id))
                    .where(IngestRun.started_at >= window_start)
                    .where(IngestRun.status == IngestStatus.failed)
                    .where(IngestRun.errors["message"].astext == "killed_by_restart")
                ).scalar()
                or 0
            )

        failed = counts.get("failed", 0)
        real_failed = failed - killed
        partial = counts.get("partial", 0)
        ok = counts.get("success", 0)
        running = counts.get("running", 0)
        detail = (
            f"success={ok} partial={partial} failed={real_failed} "
            f"killed={killed} running={running}"
        )

        verdict = "WARN" if (real_failed > 0 or partial > 3) else "OK"
        report.db_metrics.append(CheckResult(label, verdict, detail, time.time() - t0))
    except Exception as e:
        report.db_metrics.append(
            CheckResult(label, "FAIL", f"{type(e).__name__}: {e}", time.time() - t0)
        )
|
|
|
|
|
|
def _check_dead_playbacks(report: Report) -> None:
    """Count PlaybackSource rows whose ``dead_at`` falls in the last 24 hours.

    More than 50 such rows yields WARN (the original note reads this as a
    hoster breaking at scale). The number of rows with no ``dead_at`` is
    reported alongside for context. Any exception is recorded as FAIL.
    """
    label = "playback dead-marks (24h)"
    t0 = time.time()
    try:
        with session_scope() as s:
            window_start = datetime.now(UTC) - timedelta(hours=24)
            count_rows = select(func.count(PlaybackSource.id))
            n_dead_24h = (
                s.execute(count_rows.where(PlaybackSource.dead_at >= window_start)).scalar()
                or 0
            )
            n_alive = (
                s.execute(count_rows.where(PlaybackSource.dead_at.is_(None))).scalar()
                or 0
            )

        detail = f"dead_24h={n_dead_24h} alive_total={n_alive}"
        verdict = "WARN" if n_dead_24h > 50 else "OK"
        report.db_metrics.append(CheckResult(label, verdict, detail, time.time() - t0))
    except Exception as e:
        report.db_metrics.append(
            CheckResult(label, "FAIL", f"{type(e).__name__}: {e}", time.time() - t0)
        )
|
|
|
|
|
|
def _check_bug_reports(report: Report) -> None:
    """Report the number of bug_reports created in the last 24 hours.

    Informational only: the row is always OK unless the query itself raises,
    in which case a FAIL row is recorded.
    """
    label = "new bug reports (24h)"
    t0 = time.time()
    try:
        with session_scope() as s:
            window_start = datetime.now(UTC) - timedelta(hours=24)
            stmt = select(func.count(BugReport.id)).where(
                BugReport.created_at >= window_start
            )
            n = s.execute(stmt).scalar() or 0
            report.db_metrics.append(
                CheckResult(label, "OK", f"count={n}", time.time() - t0)
            )
    except Exception as e:
        report.db_metrics.append(
            CheckResult(label, "FAIL", f"{type(e).__name__}: {e}", time.time() - t0)
        )
|
|
|
|
|
|
def _check_coverage(report: Report) -> None:
    """Report the share of scenes with a thumbnail, a duration, or a live stream.

    Each percentage is the number of distinct ``PlaybackSource.scene_id``
    values matching the condition, divided by the total number of scenes.
    Stream coverage additionally requires ``dead_at`` to be NULL.

    Only thumbnail coverage drives the status: below 50% yields WARN. The
    duration and stream figures are informational. Any exception is recorded
    as a FAIL row instead of being raised.
    """
    label = "scene coverage"
    t0 = time.time()
    try:
        with session_scope() as s:

            def scenes_with(*conditions) -> int:
                """Count distinct scene ids among PlaybackSource rows matching all conditions."""
                stmt = select(func.count(func.distinct(PlaybackSource.scene_id)))
                for cond in conditions:
                    stmt = stmt.where(cond)
                return s.execute(stmt).scalar() or 0

            total_scenes = s.execute(select(func.count(Scene.id))).scalar() or 0
            with_thumb = scenes_with(PlaybackSource.thumbnail_url.is_not(None))
            with_duration = scenes_with(PlaybackSource.duration_sec.is_not(None))
            with_stream = scenes_with(
                PlaybackSource.stream_url.is_not(None),
                PlaybackSource.dead_at.is_(None),
            )

        # Guard only the division: an empty database must still be reported as
        # scenes=0 rather than a made-up scenes=1.
        denom = max(total_scenes, 1)
        pct_thumb = with_thumb * 100 / denom
        pct_dur = with_duration * 100 / denom
        pct_stream = with_stream * 100 / denom
        detail = (
            f"scenes={total_scenes} thumb={pct_thumb:.0f}% "
            f"dur={pct_dur:.0f}% stream={pct_stream:.0f}%"
        )

        # Thresholds are deliberately conservative: per the original note, most
        # direct-scrape scenes have a URL but no cached stream (resolved on
        # demand), so only thumbnail coverage raises an alarm.
        verdict = "WARN" if pct_thumb < 50 else "OK"
        report.db_metrics.append(CheckResult(label, verdict, detail, time.time() - t0))
    except Exception as e:
        report.db_metrics.append(
            CheckResult(label, "FAIL", f"{type(e).__name__}: {e}", time.time() - t0)
        )
|
|
|
|
|
|
def _check_ingest_per_source(report: Report) -> None:
    """Per-source ingest health over the last 7 days.

    Original rationale: the aggregate ingest-run check drowned the signal. On
    2026-06-01 stashdb had 24 failed / 57 runs and 0 successes for ~7 days
    (the delta never advanced because no run finished before a worker
    restart), yet the overall picture still looked partly green. This check
    therefore alarms PER source:

    - at least 4 runs and 0 successes -> the source runs but never finishes
    - last success more than 24h ago with at least 2 runs -> the delta stalled

    Any exception is recorded as a FAIL row instead of being raised.
    """
    label = "ingest per-source (7d)"
    t0 = time.time()
    try:
        with session_scope() as s:
            window_start = datetime.now(UTC) - timedelta(days=7)
            succeeded = IngestRun.status == IngestStatus.success
            rows = s.execute(
                select(
                    Source.name,
                    func.count(IngestRun.id),
                    func.count(IngestRun.id).filter(succeeded),
                    func.max(IngestRun.finished_at).filter(succeeded),
                )
                .join(IngestRun, IngestRun.source_id == Source.id)
                .where(IngestRun.started_at >= window_start)
                .group_by(Source.name)
            ).all()

        now = datetime.now(UTC)
        day_seconds = 24 * 3600
        warns: list[str] = []
        details: list[str] = []
        for name, runs, ok, last_ok in rows:
            details.append(f"{name}:{ok}/{runs}")
            if runs >= 4 and ok == 0:
                warns.append(f"{name} 0 success/{runs} runs")
                continue
            if last_ok is None:
                continue
            idle = (now - last_ok).total_seconds()
            if idle > day_seconds and runs >= 2:
                warns.append(f"{name} last-ok {idle / 3600:.0f}h ago")

        status = "WARN" if warns else "OK"
        detail = "; ".join(warns) if warns else (" ".join(details) or "no runs")
        report.db_metrics.append(CheckResult(label, status, detail, time.time() - t0))
    except Exception as e:
        report.db_metrics.append(
            CheckResult(label, "FAIL", f"{type(e).__name__}: {e}", time.time() - t0)
        )
|
|
|
|
|
|
def _check_stuck_runs(report: Report) -> None:
    """Count ingest runs still in ``running`` that started more than 2h ago.

    Per the original note, a reaper clears such runs when the worker restarts;
    this check catches hangs that happen BETWEEN restarts (e.g. a connector
    without a timeout). Any non-zero count is a WARN; exceptions become FAIL.
    """
    label = "stuck ingest runs (>2h)"
    t0 = time.time()
    try:
        with session_scope() as s:
            cutoff = datetime.now(UTC) - timedelta(hours=2)
            stmt = select(func.count(IngestRun.id)).where(
                IngestRun.status == IngestStatus.running,
                IngestRun.started_at < cutoff,
            )
            n = s.execute(stmt).scalar() or 0
            verdict = "WARN" if n else "OK"
            report.db_metrics.append(
                CheckResult(label, verdict, f"count={n}", time.time() - t0)
            )
    except Exception as e:
        report.db_metrics.append(
            CheckResult(label, "FAIL", f"{type(e).__name__}: {e}", time.time() - t0)
        )
|
|
|
|
|
|
def _check_browse_freshness(report: Report) -> None:
    """Detect the silent death of a browse scraper.

    Original rationale: when a tube's HTML changes, `_extract_scene_urls`
    returns [] and the run ends as SUCCESS with 0 scenes — no exception, no
    alarm (bug reports 93d3c485 / 2fbf1c73 were found by users, not by
    monitoring). Browse runs daily and re-touches the latest scenes, so
    max(last_seen_at) per sitetag has to be fresh.

    For every sitetag exposed by ``ALL_BROWSE_SCRAPERS``:
    - no scraper-sourced scene record at all -> WARN ("never")
    - newest ``last_seen_at`` older than 36h  -> WARN ("stale")

    A scraper class that cannot be instantiated is skipped (best effort) but
    logged, so it does not vanish from monitoring without a trace. Any other
    exception is recorded as a FAIL row instead of being raised.
    """
    label = "browse scraper freshness"
    t0 = time.time()
    try:
        from app.connectors.direct_scrapers import ALL_BROWSE_SCRAPERS

        browse_sitetags: set[str] = set()
        for cls in ALL_BROWSE_SCRAPERS:
            try:
                browse_sitetags.add(cls().sitetag)
            except Exception as e:
                # Keep checking the remaining scrapers, but leave a trace: a
                # scraper that cannot be constructed is otherwise unmonitored.
                log.warning(
                    "browse freshness: cannot instantiate %s: %s: %s",
                    getattr(cls, "__name__", cls), type(e).__name__, e,
                )
        if not browse_sitetags:
            report.db_metrics.append(
                CheckResult(label, "SKIP", "no browse scrapers", time.time() - t0)
            )
            return

        with session_scope() as s:
            # The sitetag is the part of external_id before the first ':'.
            rows = s.execute(text(
                "SELECT split_part(er.external_id, ':', 1) AS sitetag, max(er.last_seen_at) AS last_seen "
                "FROM external_records er JOIN sources s ON s.id = er.source_id "
                "WHERE s.kind = 'scraper' AND er.entity_kind = 'scene' "
                "GROUP BY 1"
            )).all()
            seen = {st: ls for st, ls in rows}

        now = datetime.now(UTC)
        warns: list[str] = []
        details: list[str] = []
        for st in sorted(browse_sitetags):
            ls = seen.get(st)
            if ls is None:
                warns.append(f"{st} never")
                continue
            age_h = (now - ls).total_seconds() / 3600
            details.append(f"{st}:{age_h:.0f}h")
            if age_h > 36:
                warns.append(f"{st} stale {age_h:.0f}h")

        status = "WARN" if warns else "OK"
        detail = "; ".join(warns) if warns else (" ".join(details) or "no data")
        report.db_metrics.append(CheckResult(label, status, detail, time.time() - t0))
    except Exception as e:
        report.db_metrics.append(
            CheckResult(label, "FAIL", f"{type(e).__name__}: {e}", time.time() - t0)
        )
|
|
|
|
|
|
# ---------- 2. CANARY EXTRACTORS ----------
|
|
|
|
|
|
def _pick_canary_samples(per_origin: int = 3) -> dict[tuple[str, str], list[str]]:
    """Pick up to ``per_origin`` random page URLs for every canary origin.

    Returns a dict keyed by ``(origin, sitetag)`` whose value is the list of
    sampled ``page_url`` values. Origins are those starting with ``pornapp:``
    or ``tube:`` that have at least one row with no ``dead_at``; the sitetag
    is the part of the origin after the first ``:``.

    Several samples are taken so that a single dead URL does not raise a
    false alarm. Only rows seen in the last 30 days are sampled: per the
    original note, the worker scrape loop refreshes ``last_seen_at`` on
    re-scrape, while old URLs may 404 without ever being marked dead, so
    sampling fresh rows tests the scenes that are currently active.
    """
    cutoff = datetime.now(UTC) - timedelta(days=30)
    alive = PlaybackSource.dead_at.is_(None)
    canary_origin = (
        (PlaybackSource.origin.like("pornapp:%"))
        | (PlaybackSource.origin.like("tube:%"))
    )
    samples: dict[tuple[str, str], list[str]] = {}
    with session_scope() as s:
        origins = s.execute(
            select(PlaybackSource.origin)
            .where(alive)
            .where(canary_origin)
            .group_by(PlaybackSource.origin)
        ).scalars().all()

        for origin in origins:
            urls = s.execute(
                select(PlaybackSource.page_url)
                .where(PlaybackSource.origin == origin)
                .where(alive)
                .where(PlaybackSource.last_seen_at >= cutoff)
                .order_by(func.random())
                .limit(per_origin)
            ).scalars().all()
            sitetag = origin.split(":", 1)[1]
            # Drop NULL/empty page URLs; the list may end up empty.
            samples[(origin, sitetag)] = [url for url in urls if url]
    return samples
|
|
|
|
|
|
def _try_one_canary(sitetag: str, page_url: str) -> tuple[str, str]:
    """Run one extraction attempt plus a reachability probe of the first stream.

    Returns ``(status, detail)`` where status is OK / WARN / FAIL; the caller
    aggregates several attempts per origin. Never raises.

    Outcomes:
    - ``HosterDead``                                -> WARN
    - ``TubePageError`` with HTTP 404/410           -> WARN, any other code FAIL
    - any other exception from ``try_extract``      -> FAIL
    - no sources returned                           -> WARN
    - only hoster-type sources                      -> OK
    - first direct link answers 200/206             -> OK, otherwise WARN
    """
    try:
        sources = try_extract(sitetag, page_url)
    except HosterDead as e:
        return "WARN", f"HosterDead: {str(e)[:60]}"
    except TubePageError as e:
        status = "WARN" if e.status_code in (404, 410) else "FAIL"
        return status, f"TubePageError HTTP {e.status_code}"
    except Exception as e:
        return "FAIL", f"{type(e).__name__}: {str(e)[:60]}"

    if not sources:
        return "WARN", "no sources (extractor None)"

    directs = [s for s in sources if s.link and s.type != "hoster"]
    hosters = [s for s in sources if s.type == "hoster"]

    if not directs:
        return "OK", f"{len(hosters)} hoster-only (WebView fallback)"

    first = directs[0]
    probe_headers = {
        "User-Agent": DEFAULT_UA,
        "Referer": first.referer or page_url,
    }
    try:
        with httpx.Client(timeout=15, follow_redirects=True) as client:
            head_status = client.head(first.link, headers=probe_headers).status_code
            if head_status >= 400:
                # HEAD was rejected: retry as a ranged GET. The request is
                # streamed and the body is never read, so a server that
                # ignores Range cannot make the probe download a whole media
                # file — only the status code is needed.
                with client.stream(
                    "GET",
                    first.link,
                    headers={**probe_headers, "Range": "bytes=0-1024"},
                ) as r:
                    head_status = r.status_code
    except Exception as e:
        return "WARN", f"{len(directs)}d, HEAD err {type(e).__name__}"

    if head_status in (200, 206):
        return "OK", f"{len(directs)}d/{len(hosters)}h, HEAD {head_status} {first.quality or '?'}"
    return "WARN", f"{len(directs)}d, HEAD {head_status}"
|
|
|
|
|
|
def _check_canary_extractor_multi(
    origin: str, sitetag: str, page_urls: list[str], report: Report
) -> None:
    """Probe every sampled URL of one origin and record the BEST outcome.

    This dampens single-URL flakiness (e.g. one page 404, the rest alive):
    the origin is OK as soon as one sample is OK, WARN if no sample is OK but
    at least one is WARN, and FAIL only when every sample FAILed. The detail
    is prefixed with the OK ratio and carries the first detail message of the
    winning status. An empty sample list is recorded as SKIP.
    """
    label = f"extract: {origin}"
    if not page_urls:
        report.extractors.append(
            CheckResult(label, "SKIP", "no alive playback_sources", 0.0)
        )
        return

    t0 = time.time()
    outcomes = [_try_one_canary(sitetag, url) for url in page_urls]
    elapsed = time.time() - t0

    # Group detail messages by status, keeping sample order within a status.
    details_by_status: dict[str, list[str]] = {"OK": [], "WARN": [], "FAIL": []}
    for status, detail in outcomes:
        details_by_status.setdefault(status, []).append(detail)

    # Best wins: OK > WARN > FAIL.
    ok_count = len(details_by_status["OK"])
    if ok_count:
        verdict = "OK"
    elif details_by_status["WARN"]:
        verdict = "WARN"
    else:
        verdict = "FAIL"

    summary = f"[{ok_count}/{len(outcomes)} OK] {details_by_status[verdict][0]}"
    report.extractors.append(CheckResult(label, verdict, summary, elapsed))
|
|
|
|
|
|
def _run_canary_extractors(report: Report, *, samples_per_origin: int) -> None:
    """Sample page URLs per origin and run the canary extractor check on each.

    Args:
        report: Report that receives one extractor row per origin.
        samples_per_origin: How many random page URLs to probe per origin.

    The sampling query is guarded like the other DB-backed checks: if it
    raises, a single FAIL row is recorded and the function returns, so the
    rest of the smoke test still runs and the report is still emitted.
    """
    t0 = time.time()
    try:
        samples = _pick_canary_samples(per_origin=samples_per_origin)
    except Exception as e:
        report.extractors.append(CheckResult(
            "canary sample query", "FAIL", f"{type(e).__name__}: {e}", time.time() - t0
        ))
        return

    log.info("canary: %d origins, %d samples/origin", len(samples), samples_per_origin)
    for (origin, sitetag), urls in samples.items():
        _check_canary_extractor_multi(origin, sitetag, urls, report)
|
|
|
|
|
|
# ---------- 3. THUMBNAIL HEALTH ----------
|
|
|
|
|
|
def _check_thumbnails(report: Report, *, per_origin: int = 3) -> None:
    """HEAD-check a random sample of thumbnail URLs for every origin.

    Up to ``per_origin`` thumbnails are sampled per origin from rows with no
    ``dead_at`` that were seen in the last 30 days. Each request carries
    ``Referer: https://<page host>/`` derived from the row's page URL: per
    the original note, hotlink-protected CDNs (58img.top for mypornerleak,
    trafficdeposit for sxyprn, fastporndelivery for hqporner) answer 403
    without the right Referer and 200 with it, so this mimics the request a
    mobile WebView would make.

    An origin is WARN when more than 50% of its sampled thumbnails are
    broken (status >= 400 or a request error) and OK otherwise; origins with
    no sampled rows produce no row. A failure of the sampling query is
    recorded as a single FAIL row.
    """
    t0 = time.time()
    try:
        with session_scope() as s:
            has_thumb = PlaybackSource.thumbnail_url.is_not(None)
            alive = PlaybackSource.dead_at.is_(None)
            origins = s.execute(
                select(PlaybackSource.origin)
                .where(has_thumb)
                .where(alive)
                .group_by(PlaybackSource.origin)
            ).scalars().all()

            cutoff = datetime.now(UTC) - timedelta(days=30)
            per_origin_rows: dict[str, list[tuple[str, str]]] = {}
            for origin in origins:
                sampled = s.execute(
                    select(PlaybackSource.thumbnail_url, PlaybackSource.page_url)
                    .where(PlaybackSource.origin == origin)
                    .where(has_thumb)
                    .where(alive)
                    .where(PlaybackSource.last_seen_at >= cutoff)
                    .order_by(func.random())
                    .limit(per_origin)
                ).all()
                per_origin_rows[origin] = [(t, p) for t, p in sampled if t]
    except Exception as e:
        report.thumbs.append(CheckResult(
            "thumbnails query", "FAIL", f"{type(e).__name__}: {e}", time.time() - t0
        ))
        return

    from urllib.parse import urlparse

    def is_broken(client, thumb_url, page_url) -> bool:
        """Return True when the thumbnail answers >= 400 or the request errors."""
        # Referer is the tube page's host root: hotlink protection usually
        # whitelists the whole host rather than a specific path.
        page_host = urlparse(page_url).hostname if page_url else None
        headers = {"User-Agent": DEFAULT_UA}
        if page_host:
            headers["Referer"] = f"https://{page_host}/"
        try:
            resp = client.head(thumb_url, headers=headers)
            if resp.status_code >= 400:
                # The CDN may not support HEAD: retry as a ranged GET.
                resp = client.get(
                    thumb_url,
                    headers={**headers, "Range": "bytes=0-128"},
                )
            return resp.status_code >= 400
        except Exception:
            return True

    with httpx.Client(timeout=10, follow_redirects=True) as client:
        for origin, rows in per_origin_rows.items():
            t1 = time.time()
            fail_n = sum(
                1 for thumb_url, page_url in rows
                if is_broken(client, thumb_url, page_url)
            )
            elapsed = time.time() - t1
            total = len(rows)
            if total == 0:
                continue
            fail_pct = fail_n * 100 / total
            if fail_pct > 50:
                verdict, detail = "WARN", f"{fail_n}/{total} broken"
            else:
                verdict, detail = "OK", f"{total-fail_n}/{total} OK"
            report.thumbs.append(
                CheckResult(f"thumb: {origin}", verdict, detail, elapsed)
            )
|
|
|
|
|
|
# ---------- main + sentry ----------
|
|
|
|
|
|
def _init_sentry() -> bool:
    """Initialise the Sentry SDK from application settings.

    Returns:
        True when a DSN is configured and ``sentry_sdk.init`` succeeded;
        False when no DSN is set or initialisation raised (logged as a
        warning, never propagated).
    """
    settings = get_settings()
    dsn = settings.sentry_dsn
    if not dsn:
        return False
    try:
        import sentry_sdk
        from sentry_sdk.integrations.sqlalchemy import SqlalchemyIntegration

        sentry_sdk.init(
            dsn=dsn,
            environment=settings.sentry_environment,
            # The smoke test records its own timings; no spans are needed.
            traces_sample_rate=0.0,
            integrations=[SqlalchemyIntegration()],
            release="goon-smoke@0.1.0",
        )
    except Exception as e:
        log.warning("sentry init failed: %s", e)
        return False
    return True
|
|
|
|
|
|
def _send_sentry(report: Report, sentry_enabled: bool) -> None:
|
|
if not sentry_enabled:
|
|
return
|
|
try:
|
|
import sentry_sdk
|
|
failed = report.failed
|
|
warned = report.warned
|
|
if failed:
|
|
level = "error"
|
|
tag = "fail"
|
|
elif warned:
|
|
level = "warning"
|
|
tag = "warn"
|
|
else:
|
|
level = "info"
|
|
tag = "ok"
|
|
|
|
lines = [
|
|
f"Smoke test {tag.upper()} — {report.summary_line()}",
|
|
f"started_at={report.started_at.isoformat()}",
|
|
]
|
|
if failed:
|
|
lines.append("\nFAIL:")
|
|
for c in failed:
|
|
lines.append(f" - {c.name}: {c.detail}")
|
|
if warned:
|
|
lines.append("\nWARN:")
|
|
for c in warned:
|
|
lines.append(f" - {c.name}: {c.detail}")
|
|
|
|
with sentry_sdk.new_scope() as scope:
|
|
scope.set_tag("smoke_test", tag)
|
|
scope.set_tag("smoke_fail_count", len(failed))
|
|
scope.set_tag("smoke_warn_count", len(warned))
|
|
sentry_sdk.capture_message("\n".join(lines), level=level)
|
|
log.info("sentry message sent (level=%s)", level)
|
|
except Exception as e:
|
|
log.warning("sentry send failed: %s\n%s", e, traceback.format_exc())
|
|
|
|
|
|
def _print_report(report: Report) -> None:
|
|
elapsed_total = (datetime.now(UTC) - report.started_at).total_seconds()
|
|
print()
|
|
print("=" * 80)
|
|
print(f"Goon smoke test — {report.started_at.isoformat()}")
|
|
print(f" {report.summary_line()} elapsed={elapsed_total:.1f}s")
|
|
print("=" * 80)
|
|
|
|
print("\n[DB metrics]")
|
|
for c in report.db_metrics:
|
|
print(c.line())
|
|
|
|
print("\n[Canary extractors]")
|
|
for c in report.extractors:
|
|
print(c.line())
|
|
|
|
print("\n[Thumbnails]")
|
|
for c in report.thumbs:
|
|
print(c.line())
|
|
print()
|
|
|
|
|
|
def main() -> int:
    """Parse CLI flags, run every check, print and report the results.

    Returns:
        The process exit code: 1 when ``--strict`` is given and at least one
        check ended in FAIL or WARN, otherwise 0.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--samples-per-origin",
        type=int,
        default=3,
        help="Ile losowych PlaybackSource testować per origin (default 3). "
        "Multi-sample żeby pojedynczy dead URL nie falszował alarmu — canary "
        "considered failed gdy WSZYSTKIE N samples fail.",
    )
    parser.add_argument(
        "--thumbs-per-origin",
        type=int,
        default=3,
        help="Ile thumbnail URLs sprawdzać per origin (default 3)",
    )
    parser.add_argument(
        "--skip-extractors",
        action="store_true",
        help="Pomiń canary extractor testy (np. dla quick DB-only health)",
    )
    parser.add_argument(
        "--skip-thumbs",
        action="store_true",
        help="Pomiń HEAD-check thumbnails",
    )
    parser.add_argument(
        "--strict",
        action="store_true",
        help="Exit 1 gdy any FAIL/WARN (default: zawsze 0 żeby timer się nie psuł)",
    )
    args = parser.parse_args()

    sentry_enabled = _init_sentry()
    log.info("sentry_enabled=%s", sentry_enabled)

    report = Report()

    # 1. DB metrics, in the order their rows appear in the report.
    db_checks = (
        _check_new_scenes_per_source,
        _check_ingest_runs,
        _check_ingest_per_source,
        _check_stuck_runs,
        _check_browse_freshness,
        _check_dead_playbacks,
        _check_bug_reports,
        _check_coverage,
    )
    for check in db_checks:
        check(report)

    # 2. Canary extractors
    if not args.skip_extractors:
        _run_canary_extractors(report, samples_per_origin=args.samples_per_origin)

    # 3. Thumbnails
    if not args.skip_thumbs:
        _check_thumbnails(report, per_origin=args.thumbs_per_origin)

    _print_report(report)
    _send_sentry(report, sentry_enabled)

    return 1 if (args.strict and (report.failed or report.warned)) else 0
|
|
|
|
|
|
# Script entry point: exit with the code returned by main().
if __name__ == "__main__":
    raise SystemExit(main())
|