diff --git a/app/config.py b/app/config.py index b329114..3bfa3b3 100644 --- a/app/config.py +++ b/app/config.py @@ -48,6 +48,11 @@ class Settings(BaseSettings): title_token_set_min: int = 88 date_window_days: int = 7 + # Skip ingestu clip-store (ManyVids/IWantClips/Clips4Sale/...) z canonical source — + # to permanentne orphany (free tubes nie hostują), ~56% ingestu TPDB/StashDB. + # False = wciągaj jak dawniej. Tube'y z clip-store studiem NIE są skipowane (mają playback). + skip_clip_store: bool = Field(default=True, validation_alias="GOON_SKIP_CLIP_STORE") + # APScheduler (M5). Każdy 0/None = job wyłączony. sched_tpdb_hours: int = Field(default=6, validation_alias="GOON_SCHED_TPDB_HOURS") sched_stashdb_hours: int = Field(default=6, validation_alias="GOON_SCHED_STASHDB_HOURS") diff --git a/app/connectors/direct_scrapers/__init__.py b/app/connectors/direct_scrapers/__init__.py index f88b461..33fbd3b 100644 --- a/app/connectors/direct_scrapers/__init__.py +++ b/app/connectors/direct_scrapers/__init__.py @@ -141,6 +141,8 @@ from app.connectors.direct_scrapers.porn00 import Porn00Scraper # noqa: E402 from app.connectors.direct_scrapers.porndoe import PornDoeScraper # noqa: E402 from app.connectors.direct_scrapers.pornxp import PornXPScraper # noqa: E402 from app.connectors.direct_scrapers.shyfap import ShyfapScraper # noqa: E402, F401 +from app.connectors.direct_scrapers.fullmovies import FullmoviesScraper # noqa: E402 +from app.connectors.direct_scrapers.hdporngg import HDPornGGScraper # noqa: E402 ALL_BROWSE_SCRAPERS: list[type[BaseBrowseScraper]] = [ FreshpornoScraper, @@ -161,10 +163,17 @@ ALL_BROWSE_SCRAPERS: list[type[BaseBrowseScraper]] = [ # komplet sygnałów. Phash hit-rate niski (własne crop-thumbnaile), studio + # performer + date + duration nadrabiają. PornDoeScraper, + # FullmoviesScraper + HDPornGGScraper — dołączone 2026-06-01. KVS engine (sponsor_groups + # stack, `/videos/<slug>/` + `/latest-updates/`). 
Studio teraz z PREFIKSU tytułu + # ("Studio - Scene") — sidebar `/networks/` listował WSZYSTKIE sieci, więc pierwszy match + # zawsze Brazzers (mis-attribution, dlatego nigdy nie były włączone). Niosą paysite studio + # content (TeamSkeet/Dad Crush/Brazzers/...) z title+performer+duration → composite fuzzy. + # Nawet bez canonical match: grywalny content z inferred tagami (mission: daily tagged ingest). + FullmoviesScraper, + HDPornGGScraper, + # 4k69.com — NIE dołączony: homepage JS-rendered, brak og:/KVS markerów w surowym HTML + # (probe 2026-06-01). Wymagałby headless render — odłożony. # ShyfapScraper — wyłączony 2026-05-12 (pilot fail, 0% match — orphan factory). - # Follow-up: dorobić te tubey i sprawdzić phash distance: - # - fullmovies.xxx (channel/network/pornstars/categories, brak duration) - # - 4k69.com + hdporn.gg (klony freshporno — prawdopodobnie ten sam phash hit rate) ] __all__ = [ diff --git a/app/connectors/direct_scrapers/fullmovies.py b/app/connectors/direct_scrapers/fullmovies.py index 754f5db..e04fffe 100644 --- a/app/connectors/direct_scrapers/fullmovies.py +++ b/app/connectors/direct_scrapers/fullmovies.py @@ -21,6 +21,7 @@ from app.connectors.direct_scrapers._browse_base import ( compute_thumbnail_phash, meta_content, ) +from app.normalize.text import slugify _BASE = "https://www.fullmovies.xxx" _SCENE_URL_RE = re.compile(r'href="(https://www\.fullmovies\.xxx/videos/[a-z0-9\-]+/)"', re.IGNORECASE) @@ -68,23 +69,25 @@ class FullmoviesScraper(BaseBrowseScraper): if dur_meta and dur_meta.isdigit(): duration_sec = int(dur_meta) + # Studio z PREFIKSU tytułu ("Studio - Scene Title"), nie z sidebara /networks/. + # Sidebar listuje WSZYSTKIE sieci → `_NETWORK_LINK_RE.finditer().first()` zawsze + # zwracał pierwszą z listy (Brazzers) dla każdej sceny — mis-attribution. Tytuł + # po oczyszczeniu ma format "Studio - Opis" (np. "Fake Hostel - ..."). 
studio: RawStudio | None = None - for m in _NETWORK_LINK_RE.finditer(detail_html): - slug, name = m.group(1), m.group(2).strip() - if name.lower() in ("networks", ""): - continue - studio = RawStudio( - external_id=f"fullmoviesxxx:network:{slug}", - name=name, - slug=slug, - ) - break + if " - " in title: + studio_name = title.split(" - ", 1)[0].strip() + if studio_name and len(studio_name) <= 50: + studio = RawStudio( + external_id=f"fullmoviesxxx:studio:{slugify(studio_name)}", + name=studio_name, + slug=slugify(studio_name), + ) performers: list[RawPerformer] = [] seen_perf: set[str] = set() for m in _MODEL_LINK_RE.finditer(detail_html): slug, name = m.group(1), m.group(2).strip() - if slug in seen_perf or name.lower() in ("pornstars", "models"): + if not name or slug in seen_perf or name.lower() in ("pornstars", "models"): continue seen_perf.add(slug) performers.append( @@ -100,11 +103,11 @@ class FullmoviesScraper(BaseBrowseScraper): seen_tag.add(slug) tags.append(RawTag(external_id=f"fullmoviesxxx:tag:{slug}", name=name, slug=slug)) + # Phash WYŁĄCZONY (pilot 2026-06-01: 0% trafień ≤5, mediana Hamming 14 do + # canonical — auto-screenshoty img.fullmovies.xxx, nie hot-linkowane studio + # thumbnaile). Matching trzyma się na title+performer+duration (seed: 92% tagged), + # więc download thumbnaila pod phash to czysty narzut. thumbnail_url zostaje (display). 
fingerprints: list[RawFingerprint] = [] - if thumbnail_url: - ph = compute_thumbnail_phash(thumbnail_url, referer=_BASE + "/") - if ph: - fingerprints.append(RawFingerprint(kind="phash", value=ph)) playback_sources = [ RawPlaybackSource( diff --git a/app/connectors/direct_scrapers/hdporngg.py b/app/connectors/direct_scrapers/hdporngg.py index 9eaca8a..2edcb84 100644 --- a/app/connectors/direct_scrapers/hdporngg.py +++ b/app/connectors/direct_scrapers/hdporngg.py @@ -25,6 +25,7 @@ from app.connectors.direct_scrapers._browse_base import ( compute_thumbnail_phash, meta_content, ) +from app.normalize.text import slugify _BASE = "https://www.hdporn.gg" _SCENE_URL_RE = re.compile(r'href="(https://www\.hdporn\.gg/videos/[a-z0-9\-]+/)"', re.IGNORECASE) @@ -75,27 +76,24 @@ class HDPornGGScraper(BaseBrowseScraper): if dur_meta and dur_meta.isdigit(): duration_sec = int(dur_meta) - # Studio z /networks/. Skip nav anchors typu "Networks" / "Pornstars". + # Studio z PREFIKSU tytułu ("Studio - Scene Title"), nie z sidebara /networks/. + # Sidebar listuje WSZYSTKIE sieci → pierwszy match zawsze ten sam (Brazzers) dla + # każdej sceny. Tytuł po oczyszczeniu ma format "Studio - Opis" (np. "Dad Crush - ..."). studio: RawStudio | None = None - for m in _NETWORK_LINK_RE.finditer(detail_html): - slug, name = m.group(1), m.group(2).strip() - if name.lower() in ("networks", ""): - continue - # Pierwszy NETWORK link w body to studio sceny (nav sidebar też ma networks - # listę — bierzemy gdy `class="btn_sponsor_group"` lub po prostu pierwszy - # NIE z sidebara). hdporn.gg pokazuje btn_sponsor_group w main scene area. 
- studio = RawStudio( - external_id=f"hdporngg:network:{slug}", - name=name, - slug=slug, - ) - break + if " - " in title: + studio_name = title.split(" - ", 1)[0].strip() + if studio_name and len(studio_name) <= 50: + studio = RawStudio( + external_id=f"hdporngg:studio:{slugify(studio_name)}", + name=studio_name, + slug=slugify(studio_name), + ) performers: list[RawPerformer] = [] seen_perf: set[str] = set() for m in _MODEL_LINK_RE.finditer(detail_html): slug, name = m.group(1), m.group(2).strip() - if slug in seen_perf or name.lower() in ("pornstars", "models"): + if not name or slug in seen_perf or name.lower() in ("pornstars", "models"): continue seen_perf.add(slug) performers.append( @@ -113,11 +111,11 @@ class HDPornGGScraper(BaseBrowseScraper): RawTag(external_id=f"hdporngg:tag:{slug}", name=name, slug=slug) ) + # Phash WYŁĄCZONY (pilot 2026-06-01: 0% trafień ≤5, mediana Hamming 14 do + # canonical — auto-screenshoty img.hdporn.gg, nie hot-linkowane studio thumbnaile). + # Matching trzyma się na title+performer+duration (seed: 92% tagged), więc download + # thumbnaila pod phash to czysty narzut. thumbnail_url zostaje (display). 
fingerprints: list[RawFingerprint] = [] - if thumbnail_url: - ph = compute_thumbnail_phash(thumbnail_url, referer=_BASE + "/") - if ph: - fingerprints.append(RawFingerprint(kind="phash", value=ph)) playback_sources = [ RawPlaybackSource( diff --git a/app/ingest.py b/app/ingest.py index fcb8280..8eab289 100644 --- a/app/ingest.py +++ b/app/ingest.py @@ -18,6 +18,7 @@ from __future__ import annotations import hashlib import json import logging +import re import uuid from collections.abc import Iterable from datetime import UTC, datetime, timedelta @@ -26,6 +27,7 @@ from sqlalchemy import select from sqlalchemy.dialects.postgresql import insert as pg_insert from sqlalchemy.orm import Session +from app.config import get_settings from app.connectors.base import BaseConnector, BaseMovieConnector, RawMovie, RawScene from app.db import session_scope from app.models.external_record import EntityKind, ExternalRecord @@ -39,6 +41,31 @@ from app.resolve.scene_resolver import resolve_scene log = logging.getLogger(__name__) +# Clip-store studia (ManyVids/IWantClips/Clips4Sale/...) — content twórców z paywalla. +# Darmowe tube'y go nie hostują, więc z canonical (TPDB/StashDB) wjeżdża jako permanentny +# orphan (56% ingestu canonical, ~860/dzień, ~550k w DB). Skipujemy resolve dla tych scen +# gdy źródło jest canonical — oszczędza ~połowę resolve-time i nie zaśmieca katalogu. +# NIE skipujemy dla tube'ów: tube scena z clip-store studiem MA playback, więc jest grywalna. 
+_CLIP_STORE_RE = re.compile( + r"^\s*(manyvids|i ?want ?clips|clips4sale|fancentro|loyalfans|onlyfans|fansly|modelhub)\b", + re.IGNORECASE, +) + + +def is_clip_store_studio(name: str | None) -> bool: + return bool(name) and _CLIP_STORE_RE.match(name) is not None + + +def _skip_clip_store_canonical(session: Session, *, source_id: uuid.UUID, studio_name: str | None) -> bool: + """True gdy scena to clip-store content z canonical source → pomijamy resolve.""" + if not getattr(get_settings(), "skip_clip_store", True): + return False + if not is_clip_store_studio(studio_name): + return False + src = session.get(Source, source_id) + return src is not None and src.kind in (SourceKind.tpdb, SourceKind.stashdb) + + def _canonical_json(payload: dict) -> bytes: return json.dumps(payload, sort_keys=True, separators=(",", ":"), default=str).encode() @@ -214,6 +241,13 @@ def _process_scene(*, source_id: uuid.UUID, raw_scene: RawScene, counters: dict[ return norm = normalize_scene(raw_scene) + + if _skip_clip_store_canonical( + session, source_id=source_id, studio_name=norm.studio.name if norm.studio else None + ): + counters["skipped"] += 1 + return + result = resolve_scene(session, norm=norm, source_id=source_id) if result.was_created: diff --git a/app/normalize/scenes.py b/app/normalize/scenes.py index 563a193..ab99fb9 100644 --- a/app/normalize/scenes.py +++ b/app/normalize/scenes.py @@ -15,6 +15,7 @@ from app.connectors.base import ( RawStudio, RawTag, ) +from app.normalize.tag_inference import infer_tag_slugs from app.normalize.text import normalize, normalize_person, slugify @@ -22,7 +23,8 @@ from app.normalize.text import normalize, normalize_person, slugify class NormalizedTag: name: str slug: str - external_id: str | None + # Optional — `resolve_tag` keys on slug and `Tag` has no external_id column. 
+ external_id: str | None = None @dataclass @@ -87,6 +89,26 @@ def normalize_tag(raw: RawTag) -> NormalizedTag: ) +def _merge_inferred_tags(raw_tags: list[RawTag], *, title: str) -> list[NormalizedTag]: + """Tagi z connectora + tagi wywnioskowane z tytułu (`infer_tag_slugs`). + + Tube'y (96% grywalnego katalogu) przychodzą prawie bez tagów — tylko 16% + tube-only scen ma jakikolwiek tag, vs 99% scen zmatchowanych z TPDB/StashDB. + `infer_tag_slugs` mapuje phrasy z tytułu na canonical slugi (zgrane z DB), więc + resolve_tag trafia w istniejące Tag rows. Union, nie nadpisanie: gdy scena potem + zmergeuje się z TPDB, tagi się sumują. Inference odpalamy dla KAŻDEJ sceny — + dla canonical to no-op (już mają komplet), dla tube to wypełnienie braku. + """ + tags = [normalize_tag(t) for t in raw_tags] + seen = {t.slug for t in tags} + for slug in infer_tag_slugs(title): + if slug in seen: + continue + seen.add(slug) + tags.append(NormalizedTag(name=slug.replace("-", " ").title(), slug=slug, external_id=None)) + return tags + + def normalize_studio(raw: RawStudio) -> NormalizedStudio: return NormalizedStudio( name=raw.name, @@ -160,7 +182,7 @@ def normalize_scene(raw: RawScene) -> NormalizedScene: url=raw.url, studio=normalize_studio(raw.studio) if raw.studio else None, performers=[normalize_performer(p) for p in raw.performers], - tags=[normalize_tag(t) for t in raw.tags], + tags=_merge_inferred_tags(raw.tags, title=raw.title), fingerprints=[(fp.kind, fp.value) for fp in raw.fingerprints], playback_sources=list(raw.playback_sources), cross_source_refs=dict(raw.cross_source_refs), diff --git a/app/resolve/scene_match.py b/app/resolve/scene_match.py index d8f7b1e..34261b6 100644 --- a/app/resolve/scene_match.py +++ b/app/resolve/scene_match.py @@ -4,7 +4,7 @@ from __future__ import annotations import uuid from datetime import date, timedelta -from sqlalchemy import and_, or_, select +from sqlalchemy import and_, or_, select, text from sqlalchemy.orm import Session from 
app.config import get_settings @@ -82,19 +82,43 @@ def find_by_phash_within( ) -> tuple[Scene, int] | None: """Path 3b: pHash w obrębie max_hamming (Hamming distance bitów hex). - Implementacja: seq scan po wszystkich phashach. Akceptowalne dla self-hosted - rzędu 10⁵ scen; przy 10⁶+ można dodać locality-sensitive index (BK-tree, MinHash). + Hamming liczony server-side: `bit_count(a # b)` na 64-bitowych bit-stringach + (`('x'||hex)::bit(64)`), ORDER BY dist LIMIT 1 → najbliższy match. Postgres robi + popcount w C nad całym zbiorem phashy (~10⁵-10⁶) w kilkadziesiąt ms zamiast + Python-loop ~6s/scenę (był bottleneck zabijający długie ingest-runy: każda scena + z phashem skanowała wszystkie 277k fingerprintów po stronie aplikacji). + + Wymaga 64-bit (16 hex) phasha — `imagehash.phash(hash_size=8)` zawsze taki jest. Requires PostgreSQL 14+ (`bit_count` for bit strings). + Dla nietypowej długości fallback do Python-loop (rzadkie, np. legacy/uszkodzone). Zwraca (Scene, distance) dla najbliższego matcha ≤ max_hamming, albo None. """ if max_hamming is None: max_hamming = get_settings().fingerprint_hamming_max + if len(phash) == 16: + row = session.execute( + text( + "SELECT scene_id, " + "bit_count(('x'||value)::bit(64) # ('x'||:phash)::bit(64)) AS dist " + "FROM scene_fingerprints " + "WHERE kind = 'phash' AND length(value) = 16 " + "ORDER BY dist ASC LIMIT 1" + ), + {"phash": phash}, + ).first() + if row is None or row.dist > max_hamming: + return None + scene = session.get(Scene, row.scene_id) + if scene is None: + return None + return scene, int(row.dist) + + # Fallback dla phashy o nietypowej długości — Python-loop nad zgodnymi długościami. 
rows = session.execute( select(SceneFingerprint.scene_id, SceneFingerprint.value).where( SceneFingerprint.kind == "phash" ) ).all() - best: tuple[uuid.UUID, int] | None = None target_len = len(phash) for scene_id, value in rows: diff --git a/app/scheduler/worker.py b/app/scheduler/worker.py index acf08ea..763ce40 100644 --- a/app/scheduler/worker.py +++ b/app/scheduler/worker.py @@ -19,6 +19,7 @@ import signal import sys import time import uuid +from datetime import UTC, datetime, timedelta from app.config import get_settings from app.connectors.base import BaseConnector @@ -132,6 +133,42 @@ def run_once_strategy( raise SystemExit(f"unknown strategy: {strategy}. available={STRATEGIES}") +def reap_stuck_ingest_runs(max_age_hours: int = 2) -> int: + """Oznacza ingest_runs wiszące w status=running jako failed (killed_by_restart). + + Worker jest single-process: gdy zostanie ubity w trakcie runu (deploy, OOM, + restart obrazu), wiersz IngestRun zostaje na zawsze w `running` — `finished_at` + nigdy się nie ustawia. Bez reapowania zombie akumulują się (13 znalezionych + 2026-06-01) i fałszują metryki: `_last_successful_finished_at` ignoruje je + (OK), ale watchdog liczący stuck-runy i fail-rate widzi je jako szum. + + Reapujemy TYLKO runy starsze niż `max_age_hours` — healthy run kończy się w + sekundy/minuty (po SQL-fix phash), a movie-ingest ma własny 6-min cap, więc + cokolwiek `running` > 2h to na pewno trup po ubitym workerze. Margines chroni + równolegle odpalony ręczny `--once` (inny proces) przed omyłkowym reapem. 
+ """ + from sqlalchemy import update + + from app.db import session_scope + from app.models.ingest_run import IngestRun, IngestStatus + + now = datetime.now(UTC) + cutoff = now - timedelta(hours=max_age_hours) + with session_scope() as session: + result = session.execute( + update(IngestRun) + .where( + IngestRun.status == IngestStatus.running, + IngestRun.started_at < cutoff, + ) + .values(status=IngestStatus.failed, finished_at=now, errors={"message": "killed_by_restart"}) + ) + reaped = result.rowcount or 0 + if reaped: + log.warning("reaped %d stuck ingest_runs (running > %dh) on startup", reaped, max_age_hours) + return reaped + + def run_forever() -> int: """APScheduler scheduled mode — odpala joby cron-like wg config (env-driven). @@ -142,6 +179,9 @@ def run_forever() -> int: """ from app.scheduler.jobs import build_scheduler # opóźniony import (apscheduler) + # Sprzątanie po poprzednim (ubitym) workerze zanim wystartujemy joby. + reap_stuck_ingest_runs() + settings = get_settings() cfg = { "tpdb_hours": settings.sched_tpdb_hours or None, diff --git a/scripts/backfill_inferred_tags.py b/scripts/backfill_inferred_tags.py new file mode 100644 index 0000000..e796a81 --- /dev/null +++ b/scripts/backfill_inferred_tags.py @@ -0,0 +1,142 @@ +"""Backfill inferred tags dla scen bez tagów. + +Forward-fix (`normalize_scene._merge_inferred_tags`) taguje tylko nowe/zmienione +sceny — istniejące tube-only sceny bez tagów (16% tag coverage, ~868k grywalnych) +nie są re-resolvowane (skip przy niezmienionym raw_hash). Ten skrypt jednorazowo +przelatuje sceny bez `scene_tags`, infer_tag_slugs(title) → INSERT SceneTag. + +Bezpieczeństwo: + - Keyset pagination po `scenes.id` (stały RAM niezależnie od rozmiaru tabeli). + - Batch commit co `--batch` scen. + - `pg_insert(...).on_conflict_do_nothing` na (scene_id, tag_id) — idempotentne, + można puścić wielokrotnie. + - `--dry-run` liczy wpływ bez zapisu. 
+ - `source_id=NULL` (SceneTag.source_id nullable) — tag z inferencji nie ma źródła. + +Użycie: + python scripts/backfill_inferred_tags.py --dry-run # cały katalog, bez zapisu + python scripts/backfill_inferred_tags.py --only-playable --dry-run + python scripts/backfill_inferred_tags.py --batch 1000 # realny zapis + python scripts/backfill_inferred_tags.py --limit 500 # test na 500 scenach +""" +from __future__ import annotations + +import argparse +import logging +import sys +import time + +from sqlalchemy import select, text +from sqlalchemy.dialects.postgresql import insert as pg_insert + +from app.db import session_scope +from app.models.scene import SceneTag +from app.models.tag import Tag +from app.normalize.tag_inference import infer_tag_slugs + +logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s") +log = logging.getLogger("backfill_tags") + + +def _load_tag_cache(session) -> dict[str, "uuid.UUID"]: # type: ignore[name-defined] + return {slug: tid for slug, tid in session.execute(select(Tag.slug, Tag.id)).all()} + + +def _fetch_batch(session, *, last_id, batch: int, only_playable: bool): + """Keyset page: sceny bez scene_tags, z tytułem, id > last_id.""" + where_playable = ( + "AND EXISTS (SELECT 1 FROM playback_sources p WHERE p.scene_id = sc.id AND p.dead_at IS NULL)" + if only_playable + else "" + ) + sql = text( + f""" + SELECT sc.id, sc.title + FROM scenes sc + WHERE sc.id > :last_id + AND sc.title IS NOT NULL AND sc.title <> '' + AND NOT EXISTS (SELECT 1 FROM scene_tags st WHERE st.scene_id = sc.id) + {where_playable} + ORDER BY sc.id + LIMIT :batch + """ + ) + return session.execute(sql, {"last_id": last_id, "batch": batch}).all() + + +def main() -> int: + ap = argparse.ArgumentParser() + ap.add_argument("--dry-run", action="store_true", help="Licz wpływ, nie zapisuj") + ap.add_argument("--only-playable", action="store_true", help="Tylko sceny z żywym playbackiem") + ap.add_argument("--batch", type=int, default=1000, 
help="Scen na transakcję (default 1000)") + ap.add_argument("--limit", type=int, default=None, help="Max scen do przetworzenia (test)") + args = ap.parse_args() + + t0 = time.time() + scanned = 0 + scenes_tagged = 0 + links_added = 0 + last_id = "00000000-0000-0000-0000-000000000000" + + with session_scope() as session: + tag_cache = _load_tag_cache(session) + log.info("tag cache: %d slugs; dry_run=%s only_playable=%s", len(tag_cache), args.dry_run, args.only_playable) + + while True: + with session_scope() as session: + rows = _fetch_batch(session, last_id=last_id, batch=args.batch, only_playable=args.only_playable) + if not rows: + break + + pending: list[dict] = [] + for scene_id, title in rows: + last_id = str(scene_id) + scanned += 1 + slugs = infer_tag_slugs(title) + if not slugs: + continue + got = 0 + for slug in slugs: + tag_id = tag_cache.get(slug) + if tag_id is None: + # Slug nie istnieje jako Tag (rzadkie — inference slugi są DB-aligned). + # Tworzymy, by nie zgubić sygnału; cache by uniknąć powtórek. 
+ if args.dry_run: + continue + tag = Tag(name=slug.replace("-", " ").title(), slug=slug) + session.add(tag) + session.flush() + tag_id = tag.id + tag_cache[slug] = tag_id + pending.append({"scene_id": scene_id, "tag_id": tag_id, "source_id": None}) + got += 1 + if got: + scenes_tagged += 1 + links_added += got + + if pending and not args.dry_run: + session.execute( + pg_insert(SceneTag.__table__).values(pending).on_conflict_do_nothing( + index_elements=["scene_id", "tag_id"] + ) + ) + + if scanned % 20000 < args.batch: + rate = scanned / max(time.time() - t0, 0.1) + log.info("scanned=%d tagged=%d links=%d (%.0f scenes/s) last_id=%s", + scanned, scenes_tagged, links_added, rate, last_id) + + if args.limit and scanned >= args.limit: + log.info("limit %d reached", args.limit) + break + + log.info( + "DONE %s: scanned=%d scenes_tagged=%d links_added=%d elapsed=%.1fs", + "(dry-run)" if args.dry_run else "(written)", + scanned, scenes_tagged, links_added, time.time() - t0, + ) + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/scripts/smoke_test.py b/scripts/smoke_test.py index e758dfc..d14c55b 100644 --- a/scripts/smoke_test.py +++ b/scripts/smoke_test.py @@ -40,7 +40,7 @@ from dataclasses import dataclass, field from datetime import UTC, datetime, timedelta import httpx -from sqlalchemy import func, select +from sqlalchemy import func, select, text from app.config import get_settings from app.db import session_scope @@ -185,14 +185,24 @@ def _check_ingest_runs(report: Report) -> None: .group_by(IngestRun.status) ).all() counts = {str(status.value if hasattr(status, "value") else status): cnt for status, cnt in rows} + # killed_by_restart to NIE awaria connectora — to worker ubity mid-run + # (deploy/OOM). Liczony osobno (stuck-run check go pokrywa), żeby nie + # odpalać WARN przy każdym deployu. 
+ killed = s.execute( + select(func.count(IngestRun.id)) + .where(IngestRun.started_at >= past_24h) + .where(IngestRun.status == IngestStatus.failed) + .where(IngestRun.errors["message"].astext == "killed_by_restart") + ).scalar() or 0 failed = counts.get("failed", 0) + real_failed = failed - killed partial = counts.get("partial", 0) ok = counts.get("success", 0) running = counts.get("running", 0) - detail = f"success={ok} partial={partial} failed={failed} running={running}" + detail = f"success={ok} partial={partial} failed={real_failed} killed={killed} running={running}" - if failed > 0 or partial > 3: + if real_failed > 0 or partial > 3: report.db_metrics.append(CheckResult( "ingest runs (24h)", "WARN", detail, time.time() - t0 )) @@ -298,6 +308,121 @@ def _check_coverage(report: Report) -> None: )) +def _check_ingest_per_source(report: Report) -> None: + """Per-source health (7d). Agregat `_check_ingest_runs` topił sygnał: stashdb + 2026-06-01 miał 24 failed / 57 runów i 0 sukcesów przez ~7 dni (delta nigdy + nie szła naprzód, bo run nie kończył się przed restartem workera), a całość + i tak pokazywała trochę zielonego. 
Tu alarmujemy PER źródło: + - runs≥4 i 0 sukcesów → źródło działa, ale nigdy nie kończy (twardy WARN) + - ostatni sukces > 24h przy aktywnych runach → delta się zatrzymała + """ + t0 = time.time() + try: + with session_scope() as s: + now = datetime.now(UTC) + past_7d = now - timedelta(days=7) + rows = s.execute( + select( + Source.name, + func.count(IngestRun.id), + func.count(IngestRun.id).filter(IngestRun.status == IngestStatus.success), + func.max(IngestRun.finished_at).filter(IngestRun.status == IngestStatus.success), + ) + .join(IngestRun, IngestRun.source_id == Source.id) + .where(IngestRun.started_at >= past_7d) + .group_by(Source.name) + ).all() + + now = datetime.now(UTC) + warns: list[str] = [] + details: list[str] = [] + for name, runs, ok, last_ok in rows: + details.append(f"{name}:{ok}/{runs}") + if runs >= 4 and ok == 0: + warns.append(f"{name} 0 success/{runs} runs") + elif last_ok is not None and (now - last_ok).total_seconds() > 24 * 3600 and runs >= 2: + warns.append(f"{name} last-ok {(now - last_ok).total_seconds() / 3600:.0f}h ago") + status = "WARN" if warns else "OK" + detail = "; ".join(warns) if warns else (" ".join(details) or "no runs") + report.db_metrics.append(CheckResult("ingest per-source (7d)", status, detail, time.time() - t0)) + except Exception as e: + report.db_metrics.append(CheckResult( + "ingest per-source (7d)", "FAIL", f"{type(e).__name__}: {e}", time.time() - t0 + )) + + +def _check_stuck_runs(report: Report) -> None: + """Runy wiszące w 'running' > 2h. 
Reaper czyści je na restarcie workera, ale + hang MIĘDZY restartami (connector bez timeoutu) zostanie tu złapany.""" + t0 = time.time() + try: + with session_scope() as s: + cutoff = datetime.now(UTC) - timedelta(hours=2) + n = s.execute( + select(func.count(IngestRun.id)).where( + IngestRun.status == IngestStatus.running, + IngestRun.started_at < cutoff, + ) + ).scalar() or 0 + report.db_metrics.append(CheckResult( + "stuck ingest runs (>2h)", "WARN" if n else "OK", f"count={n}", time.time() - t0 + )) + except Exception as e: + report.db_metrics.append(CheckResult( + "stuck ingest runs (>2h)", "FAIL", f"{type(e).__name__}: {e}", time.time() - t0 + )) + + +def _check_browse_freshness(report: Report) -> None: + """Cicha śmierć browse-scrapera: gdy HTML tube'a się zmieni, `_extract_scene_urls` + zwraca [] → run kończy się SUCCESS z 0 scen, bez wyjątku, bez alarmu (bug-reporty + 93d3c485 / 2fbf1c73 wykryte przez userów, nie monitoring). Browse odpala codziennie + i RE-touchuje latest sceny, więc max(last_seen_at) per sitetag musi być świeży. 
+ Staleness > 36h dla aktywnego browse sitetagu = scraper przestał działać.""" + t0 = time.time() + try: + from app.connectors.direct_scrapers import ALL_BROWSE_SCRAPERS + + browse_sitetags: set[str] = set() + for cls in ALL_BROWSE_SCRAPERS: + try: + browse_sitetags.add(cls().sitetag) + except Exception: + pass + if not browse_sitetags: + report.db_metrics.append(CheckResult("browse scraper freshness", "SKIP", "no browse scrapers", time.time() - t0)) + return + + with session_scope() as s: + rows = s.execute(text( + "SELECT split_part(er.external_id, ':', 1) AS sitetag, max(er.last_seen_at) AS last_seen " + "FROM external_records er JOIN sources s ON s.id = er.source_id " + "WHERE s.kind = 'scraper' AND er.entity_kind = 'scene' " + "GROUP BY 1" + )).all() + seen = {st: ls for st, ls in rows} + + now = datetime.now(UTC) + warns: list[str] = [] + details: list[str] = [] + for st in sorted(browse_sitetags): + ls = seen.get(st) + if ls is None: + warns.append(f"{st} never") + continue + age_h = (now - ls).total_seconds() / 3600 + details.append(f"{st}:{age_h:.0f}h") + if age_h > 36: + warns.append(f"{st} stale {age_h:.0f}h") + status = "WARN" if warns else "OK" + detail = "; ".join(warns) if warns else (" ".join(details) or "no data") + report.db_metrics.append(CheckResult("browse scraper freshness", status, detail, time.time() - t0)) + except Exception as e: + report.db_metrics.append(CheckResult( + "browse scraper freshness", "FAIL", f"{type(e).__name__}: {e}", time.time() - t0 + )) + + # ---------- 2. CANARY EXTRACTORS ---------- @@ -636,6 +761,9 @@ def main() -> int: # 1. DB metrics _check_new_scenes_per_source(report) _check_ingest_runs(report) + _check_ingest_per_source(report) + _check_stuck_runs(report) + _check_browse_freshness(report) _check_dead_playbacks(report) _check_bug_reports(report) _check_coverage(report)