feat(ingest): SQL phash match, tag inference + backfill, clip-store skip, browse tubes, watchdog
Resolver/perf: - find_by_phash_within: nearest match via Postgres bit_count over bit(64) XOR instead of Python scan of all phash fingerprints (~20x faster per scene; unblocks long delta runs that were killed mid-run before since advanced). Scheduler/reliability: - reap ingest_runs stuck in 'running' on worker startup (killed_by_restart). - smoke_test: per-source ingest health, stuck-run and browse-freshness checks -> Sentry; exclude killed_by_restart from the failed-run alarm. Tags (ingest with tags + fill blanks): - wire infer_tag_slugs into normalize_scene so tube scenes get title-inferred tags (was dead code); union with connector tags. - scripts/backfill_inferred_tags.py: keyset/batched/idempotent backfill for existing tagless scenes (playable tag coverage 16% -> ~52%). Clip-store: - skip ManyVids/IWantClips/Clips4Sale/... from canonical sources at ingest (GOON_SKIP_CLIP_STORE, default on) — permanent orphans, ~56% of canonical ingest, never have a free-tube playback source. Browse tubes: - enable fullmovies + hdporn.gg: studio parsed from title prefix instead of the /networks/ sidebar (which always yielded the first listed network); drop phash compute (pilot: 0% canonical hit within Hamming 5 — auto-screenshots), matching relies on title/performer/duration. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
ee83ae5e97
commit
da7fcda132
10 changed files with 451 additions and 46 deletions
|
|
@ -48,6 +48,11 @@ class Settings(BaseSettings):
|
|||
title_token_set_min: int = 88
|
||||
date_window_days: int = 7
|
||||
|
||||
# Skip ingestu clip-store (ManyVids/IWantClips/Clips4Sale/...) z canonical source —
|
||||
# to permanentne orphany (free tubes nie hostują), ~56% ingestu TPDB/StashDB.
|
||||
# False = wciągaj jak dawniej. Tube'y z clip-store studiem NIE są skipowane (mają playback).
|
||||
skip_clip_store: bool = Field(default=True, validation_alias="GOON_SKIP_CLIP_STORE")
|
||||
|
||||
# APScheduler (M5). Każdy 0/None = job wyłączony.
|
||||
sched_tpdb_hours: int = Field(default=6, validation_alias="GOON_SCHED_TPDB_HOURS")
|
||||
sched_stashdb_hours: int = Field(default=6, validation_alias="GOON_SCHED_STASHDB_HOURS")
|
||||
|
|
|
|||
|
|
@ -141,6 +141,8 @@ from app.connectors.direct_scrapers.porn00 import Porn00Scraper # noqa: E402
|
|||
from app.connectors.direct_scrapers.porndoe import PornDoeScraper # noqa: E402
|
||||
from app.connectors.direct_scrapers.pornxp import PornXPScraper # noqa: E402
|
||||
from app.connectors.direct_scrapers.shyfap import ShyfapScraper # noqa: E402, F401
|
||||
from app.connectors.direct_scrapers.fullmovies import FullmoviesScraper # noqa: E402
|
||||
from app.connectors.direct_scrapers.hdporngg import HDPornGGScraper # noqa: E402
|
||||
|
||||
ALL_BROWSE_SCRAPERS: list[type[BaseBrowseScraper]] = [
|
||||
FreshpornoScraper,
|
||||
|
|
@ -161,10 +163,17 @@ ALL_BROWSE_SCRAPERS: list[type[BaseBrowseScraper]] = [
|
|||
# komplet sygnałów. Phash hit-rate niski (własne crop-thumbnaile), studio +
|
||||
# performer + date + duration nadrabiają.
|
||||
PornDoeScraper,
|
||||
# FullmoviesScraper + HDPornGGScraper — dołączone 2026-06-01. KVS engine (sponsor_groups
|
||||
# stack, `/videos/<slug>/` + `/latest-updates/`). Studio teraz z PREFIKSU tytułu
|
||||
# ("Studio - Scene") — sidebar `/networks/` listował WSZYSTKIE sieci, więc pierwszy match
|
||||
# zawsze Brazzers (mis-attribution, dlatego nigdy nie były włączone). Niosą paysite studio
|
||||
# content (TeamSkeet/Dad Crush/Brazzers/...) z title+performer+duration → composite fuzzy.
|
||||
# Nawet bez canonical match: grywalny content z inferred tagami (mission: daily tagged ingest).
|
||||
FullmoviesScraper,
|
||||
HDPornGGScraper,
|
||||
# 4k69.com — NIE dołączony: homepage JS-rendered, brak og:/KVS markerów w surowym HTML
|
||||
# (probe 2026-06-01). Wymagałby headless render — odłożony.
|
||||
# ShyfapScraper — wyłączony 2026-05-12 (pilot fail, 0% match — orphan factory).
|
||||
# Follow-up: dorobić te tubey i sprawdzić phash distance:
|
||||
# - fullmovies.xxx (channel/network/pornstars/categories, brak duration)
|
||||
# - 4k69.com + hdporn.gg (klony freshporno — prawdopodobnie ten sam phash hit rate)
|
||||
]
|
||||
|
||||
__all__ = [
|
||||
|
|
|
|||
|
|
@ -21,6 +21,7 @@ from app.connectors.direct_scrapers._browse_base import (
|
|||
compute_thumbnail_phash,
|
||||
meta_content,
|
||||
)
|
||||
from app.normalize.text import slugify
|
||||
|
||||
_BASE = "https://www.fullmovies.xxx"
|
||||
_SCENE_URL_RE = re.compile(r'href="(https://www\.fullmovies\.xxx/videos/[a-z0-9\-]+/)"', re.IGNORECASE)
|
||||
|
|
@ -68,23 +69,25 @@ class FullmoviesScraper(BaseBrowseScraper):
|
|||
if dur_meta and dur_meta.isdigit():
|
||||
duration_sec = int(dur_meta)
|
||||
|
||||
# Studio z PREFIKSU tytułu ("Studio - Scene Title"), nie z sidebara /networks/.
|
||||
# Sidebar listuje WSZYSTKIE sieci → `_NETWORK_LINK_RE.finditer().first()` zawsze
|
||||
# zwracał pierwszą z listy (Brazzers) dla każdej sceny — mis-attribution. Tytuł
|
||||
# po oczyszczeniu ma format "Studio - Opis" (np. "Fake Hostel - ...").
|
||||
studio: RawStudio | None = None
|
||||
for m in _NETWORK_LINK_RE.finditer(detail_html):
|
||||
slug, name = m.group(1), m.group(2).strip()
|
||||
if name.lower() in ("networks", ""):
|
||||
continue
|
||||
if " - " in title:
|
||||
studio_name = title.split(" - ", 1)[0].strip()
|
||||
if studio_name and len(studio_name) <= 50:
|
||||
studio = RawStudio(
|
||||
external_id=f"fullmoviesxxx:network:{slug}",
|
||||
name=name,
|
||||
slug=slug,
|
||||
external_id=f"fullmoviesxxx:studio:{slugify(studio_name)}",
|
||||
name=studio_name,
|
||||
slug=slugify(studio_name),
|
||||
)
|
||||
break
|
||||
|
||||
performers: list[RawPerformer] = []
|
||||
seen_perf: set[str] = set()
|
||||
for m in _MODEL_LINK_RE.finditer(detail_html):
|
||||
slug, name = m.group(1), m.group(2).strip()
|
||||
if slug in seen_perf or name.lower() in ("pornstars", "models"):
|
||||
if not name or slug in seen_perf or name.lower() in ("pornstars", "models"):
|
||||
continue
|
||||
seen_perf.add(slug)
|
||||
performers.append(
|
||||
|
|
@ -100,11 +103,11 @@ class FullmoviesScraper(BaseBrowseScraper):
|
|||
seen_tag.add(slug)
|
||||
tags.append(RawTag(external_id=f"fullmoviesxxx:tag:{slug}", name=name, slug=slug))
|
||||
|
||||
# Phash WYŁĄCZONY (pilot 2026-06-01: 0% trafień ≤5, mediana Hamming 14 do
|
||||
# canonical — auto-screenshoty img.fullmovies.xxx, nie hot-linkowane studio
|
||||
# thumbnaile). Matching trzyma się na title+performer+duration (seed: 92% tagged),
|
||||
# więc download thumbnaila pod phash to czysty narzut. thumbnail_url zostaje (display).
|
||||
fingerprints: list[RawFingerprint] = []
|
||||
if thumbnail_url:
|
||||
ph = compute_thumbnail_phash(thumbnail_url, referer=_BASE + "/")
|
||||
if ph:
|
||||
fingerprints.append(RawFingerprint(kind="phash", value=ph))
|
||||
|
||||
playback_sources = [
|
||||
RawPlaybackSource(
|
||||
|
|
|
|||
|
|
@ -25,6 +25,7 @@ from app.connectors.direct_scrapers._browse_base import (
|
|||
compute_thumbnail_phash,
|
||||
meta_content,
|
||||
)
|
||||
from app.normalize.text import slugify
|
||||
|
||||
_BASE = "https://www.hdporn.gg"
|
||||
_SCENE_URL_RE = re.compile(r'href="(https://www\.hdporn\.gg/videos/[a-z0-9\-]+/)"', re.IGNORECASE)
|
||||
|
|
@ -75,27 +76,24 @@ class HDPornGGScraper(BaseBrowseScraper):
|
|||
if dur_meta and dur_meta.isdigit():
|
||||
duration_sec = int(dur_meta)
|
||||
|
||||
# Studio z /networks/. Skip nav anchors typu "Networks" / "Pornstars".
|
||||
# Studio z PREFIKSU tytułu ("Studio - Scene Title"), nie z sidebara /networks/.
|
||||
# Sidebar listuje WSZYSTKIE sieci → pierwszy match zawsze ten sam (Brazzers) dla
|
||||
# każdej sceny. Tytuł po oczyszczeniu ma format "Studio - Opis" (np. "Dad Crush - ...").
|
||||
studio: RawStudio | None = None
|
||||
for m in _NETWORK_LINK_RE.finditer(detail_html):
|
||||
slug, name = m.group(1), m.group(2).strip()
|
||||
if name.lower() in ("networks", ""):
|
||||
continue
|
||||
# Pierwszy NETWORK link w body to studio sceny (nav sidebar też ma networks
|
||||
# listę — bierzemy gdy `class="btn_sponsor_group"` lub po prostu pierwszy
|
||||
# NIE z sidebara). hdporn.gg pokazuje btn_sponsor_group w main scene area.
|
||||
if " - " in title:
|
||||
studio_name = title.split(" - ", 1)[0].strip()
|
||||
if studio_name and len(studio_name) <= 50:
|
||||
studio = RawStudio(
|
||||
external_id=f"hdporngg:network:{slug}",
|
||||
name=name,
|
||||
slug=slug,
|
||||
external_id=f"hdporngg:studio:{slugify(studio_name)}",
|
||||
name=studio_name,
|
||||
slug=slugify(studio_name),
|
||||
)
|
||||
break
|
||||
|
||||
performers: list[RawPerformer] = []
|
||||
seen_perf: set[str] = set()
|
||||
for m in _MODEL_LINK_RE.finditer(detail_html):
|
||||
slug, name = m.group(1), m.group(2).strip()
|
||||
if slug in seen_perf or name.lower() in ("pornstars", "models"):
|
||||
if not name or slug in seen_perf or name.lower() in ("pornstars", "models"):
|
||||
continue
|
||||
seen_perf.add(slug)
|
||||
performers.append(
|
||||
|
|
@ -113,11 +111,11 @@ class HDPornGGScraper(BaseBrowseScraper):
|
|||
RawTag(external_id=f"hdporngg:tag:{slug}", name=name, slug=slug)
|
||||
)
|
||||
|
||||
# Phash WYŁĄCZONY (pilot 2026-06-01: 0% trafień ≤5, mediana Hamming 14 do
|
||||
# canonical — auto-screenshoty img.hdporn.gg, nie hot-linkowane studio thumbnaile).
|
||||
# Matching trzyma się na title+performer+duration (seed: 92% tagged), więc download
|
||||
# thumbnaila pod phash to czysty narzut. thumbnail_url zostaje (display).
|
||||
fingerprints: list[RawFingerprint] = []
|
||||
if thumbnail_url:
|
||||
ph = compute_thumbnail_phash(thumbnail_url, referer=_BASE + "/")
|
||||
if ph:
|
||||
fingerprints.append(RawFingerprint(kind="phash", value=ph))
|
||||
|
||||
playback_sources = [
|
||||
RawPlaybackSource(
|
||||
|
|
|
|||
|
|
@ -18,6 +18,7 @@ from __future__ import annotations
|
|||
import hashlib
|
||||
import json
|
||||
import logging
|
||||
import re
|
||||
import uuid
|
||||
from collections.abc import Iterable
|
||||
from datetime import UTC, datetime, timedelta
|
||||
|
|
@ -26,6 +27,7 @@ from sqlalchemy import select
|
|||
from sqlalchemy.dialects.postgresql import insert as pg_insert
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from app.config import get_settings
|
||||
from app.connectors.base import BaseConnector, BaseMovieConnector, RawMovie, RawScene
|
||||
from app.db import session_scope
|
||||
from app.models.external_record import EntityKind, ExternalRecord
|
||||
|
|
@ -39,6 +41,31 @@ from app.resolve.scene_resolver import resolve_scene
|
|||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# Clip-store studia (ManyVids/IWantClips/Clips4Sale/...) — content twórców z paywalla.
|
||||
# Darmowe tube'y go nie hostują, więc z canonical (TPDB/StashDB) wjeżdża jako permanentny
|
||||
# orphan (56% ingestu canonical, ~860/dzień, ~550k w DB). Skipujemy resolve dla tych scen
|
||||
# gdy źródło jest canonical — oszczędza ~połowę resolve-time i nie zaśmieca katalogu.
|
||||
# NIE skipujemy dla tube'ów: tube scena z clip-store studiem MA playback, więc jest grywalna.
|
||||
_CLIP_STORE_RE = re.compile(
|
||||
r"^\s*(manyvids|i ?want ?clips|clips4sale|fancentro|loyalfans|onlyfans|fansly|modelhub)\b",
|
||||
re.IGNORECASE,
|
||||
)
|
||||
|
||||
|
||||
def is_clip_store_studio(name: str | None) -> bool:
|
||||
return bool(name) and _CLIP_STORE_RE.match(name) is not None
|
||||
|
||||
|
||||
def _skip_clip_store_canonical(session: Session, *, source_id: uuid.UUID, studio_name: str | None) -> bool:
|
||||
"""True gdy scena to clip-store content z canonical source → pomijamy resolve."""
|
||||
if not getattr(get_settings(), "skip_clip_store", True):
|
||||
return False
|
||||
if not is_clip_store_studio(studio_name):
|
||||
return False
|
||||
src = session.get(Source, source_id)
|
||||
return src is not None and src.kind in (SourceKind.tpdb, SourceKind.stashdb)
|
||||
|
||||
|
||||
def _canonical_json(payload: dict) -> bytes:
|
||||
return json.dumps(payload, sort_keys=True, separators=(",", ":"), default=str).encode()
|
||||
|
||||
|
|
@ -214,6 +241,13 @@ def _process_scene(*, source_id: uuid.UUID, raw_scene: RawScene, counters: dict[
|
|||
return
|
||||
|
||||
norm = normalize_scene(raw_scene)
|
||||
|
||||
if _skip_clip_store_canonical(
|
||||
session, source_id=source_id, studio_name=norm.studio.name if norm.studio else None
|
||||
):
|
||||
counters["skipped"] += 1
|
||||
return
|
||||
|
||||
result = resolve_scene(session, norm=norm, source_id=source_id)
|
||||
|
||||
if result.was_created:
|
||||
|
|
|
|||
|
|
@ -15,6 +15,7 @@ from app.connectors.base import (
|
|||
RawStudio,
|
||||
RawTag,
|
||||
)
|
||||
from app.normalize.tag_inference import infer_tag_slugs
|
||||
from app.normalize.text import normalize, normalize_person, slugify
|
||||
|
||||
|
||||
|
|
@ -22,7 +23,8 @@ from app.normalize.text import normalize, normalize_person, slugify
|
|||
class NormalizedTag:
|
||||
name: str
|
||||
slug: str
|
||||
external_id: str | None
|
||||
# Optional — `resolve_tag` keys on slug and `Tag` has no external_id column.
|
||||
external_id: str | None = None
|
||||
|
||||
|
||||
@dataclass
|
||||
|
|
@ -87,6 +89,26 @@ def normalize_tag(raw: RawTag) -> NormalizedTag:
|
|||
)
|
||||
|
||||
|
||||
def _merge_inferred_tags(raw_tags: list[RawTag], *, title: str) -> list[NormalizedTag]:
|
||||
"""Tagi z connectora + tagi wywnioskowane z tytułu (`infer_tag_slugs`).
|
||||
|
||||
Tube'y (96% grywalnego katalogu) przychodzą prawie bez tagów — tylko 16%
|
||||
tube-only scen ma jakikolwiek tag, vs 99% scen zmatchowanych z TPDB/StashDB.
|
||||
`infer_tag_slugs` mapuje phrasy z tytułu na canonical slugi (zgrane z DB), więc
|
||||
resolve_tag trafia w istniejące Tag rows. Union, nie nadpisanie: gdy scena potem
|
||||
zmergeuje się z TPDB, tagi się sumują. Inference odpalamy dla KAŻDEJ sceny —
|
||||
dla canonical to no-op (już mają komplet), dla tube to wypełnienie braku.
|
||||
"""
|
||||
tags = [normalize_tag(t) for t in raw_tags]
|
||||
seen = {t.slug for t in tags}
|
||||
for slug in infer_tag_slugs(title):
|
||||
if slug in seen:
|
||||
continue
|
||||
seen.add(slug)
|
||||
tags.append(NormalizedTag(name=slug.replace("-", " ").title(), slug=slug, external_id=None))
|
||||
return tags
|
||||
|
||||
|
||||
def normalize_studio(raw: RawStudio) -> NormalizedStudio:
|
||||
return NormalizedStudio(
|
||||
name=raw.name,
|
||||
|
|
@ -160,7 +182,7 @@ def normalize_scene(raw: RawScene) -> NormalizedScene:
|
|||
url=raw.url,
|
||||
studio=normalize_studio(raw.studio) if raw.studio else None,
|
||||
performers=[normalize_performer(p) for p in raw.performers],
|
||||
tags=[normalize_tag(t) for t in raw.tags],
|
||||
tags=_merge_inferred_tags(raw.tags, title=raw.title),
|
||||
fingerprints=[(fp.kind, fp.value) for fp in raw.fingerprints],
|
||||
playback_sources=list(raw.playback_sources),
|
||||
cross_source_refs=dict(raw.cross_source_refs),
|
||||
|
|
|
|||
|
|
@ -4,7 +4,7 @@ from __future__ import annotations
|
|||
import uuid
|
||||
from datetime import date, timedelta
|
||||
|
||||
from sqlalchemy import and_, or_, select
|
||||
from sqlalchemy import and_, or_, select, text
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from app.config import get_settings
|
||||
|
|
@ -82,19 +82,43 @@ def find_by_phash_within(
|
|||
) -> tuple[Scene, int] | None:
|
||||
"""Path 3b: pHash w obrębie max_hamming (Hamming distance bitów hex).
|
||||
|
||||
Implementacja: seq scan po wszystkich phashach. Akceptowalne dla self-hosted
|
||||
rzędu 10⁵ scen; przy 10⁶+ można dodać locality-sensitive index (BK-tree, MinHash).
|
||||
Hamming liczony server-side: `bit_count(a # b)` na 64-bitowych bit-stringach
|
||||
(`('x'||hex)::bit(64)`), ORDER BY dist LIMIT 1 → najbliższy match. Postgres robi
|
||||
popcount w C nad całym zbiorem phashy (~10⁵-10⁶) w kilkadziesiąt ms zamiast
|
||||
Python-loop ~6s/scenę (był bottleneck zabijający długie ingest-runy: każda scena
|
||||
z phashem skanowała wszystkie 277k fingerprintów po stronie aplikacji).
|
||||
|
||||
Wymaga 64-bit (16 hex) phasha — `imagehash.phash(hash_size=8)` zawsze taki jest.
|
||||
Dla nietypowej długości fallback do Python-loop (rzadkie, np. legacy/uszkodzone).
|
||||
Zwraca (Scene, distance) dla najbliższego matcha ≤ max_hamming, albo None.
|
||||
"""
|
||||
if max_hamming is None:
|
||||
max_hamming = get_settings().fingerprint_hamming_max
|
||||
|
||||
if len(phash) == 16:
|
||||
row = session.execute(
|
||||
text(
|
||||
"SELECT scene_id, "
|
||||
"bit_count(('x'||value)::bit(64) # ('x'||:phash)::bit(64)) AS dist "
|
||||
"FROM scene_fingerprints "
|
||||
"WHERE kind = 'phash' AND length(value) = 16 "
|
||||
"ORDER BY dist ASC LIMIT 1"
|
||||
),
|
||||
{"phash": phash},
|
||||
).first()
|
||||
if row is None or row.dist > max_hamming:
|
||||
return None
|
||||
scene = session.get(Scene, row.scene_id)
|
||||
if scene is None:
|
||||
return None
|
||||
return scene, int(row.dist)
|
||||
|
||||
# Fallback dla phashy o nietypowej długości — Python-loop nad zgodnymi długościami.
|
||||
rows = session.execute(
|
||||
select(SceneFingerprint.scene_id, SceneFingerprint.value).where(
|
||||
SceneFingerprint.kind == "phash"
|
||||
)
|
||||
).all()
|
||||
|
||||
best: tuple[uuid.UUID, int] | None = None
|
||||
target_len = len(phash)
|
||||
for scene_id, value in rows:
|
||||
|
|
|
|||
|
|
@ -19,6 +19,7 @@ import signal
|
|||
import sys
|
||||
import time
|
||||
import uuid
|
||||
from datetime import UTC, datetime, timedelta
|
||||
|
||||
from app.config import get_settings
|
||||
from app.connectors.base import BaseConnector
|
||||
|
|
@ -132,6 +133,42 @@ def run_once_strategy(
|
|||
raise SystemExit(f"unknown strategy: {strategy}. available={STRATEGIES}")
|
||||
|
||||
|
||||
def reap_stuck_ingest_runs(max_age_hours: int = 2) -> int:
|
||||
"""Oznacza ingest_runs wiszące w status=running jako failed (killed_by_restart).
|
||||
|
||||
Worker jest single-process: gdy zostanie ubity w trakcie runu (deploy, OOM,
|
||||
restart obrazu), wiersz IngestRun zostaje na zawsze w `running` — `finished_at`
|
||||
nigdy się nie ustawia. Bez reapowania zombie akumulują się (13 znalezionych
|
||||
2026-06-01) i fałszują metryki: `_last_successful_finished_at` ignoruje je
|
||||
(OK), ale watchdog liczący stuck-runy i fail-rate widzi je jako szum.
|
||||
|
||||
Reapujemy TYLKO runy starsze niż `max_age_hours` — healthy run kończy się w
|
||||
sekundy/minuty (po SQL-fix phash), a movie-ingest ma własny 6-min cap, więc
|
||||
cokolwiek `running` > 2h to na pewno trup po ubitym workerze. Margines chroni
|
||||
równolegle odpalony ręczny `--once` (inny proces) przed omyłkowym reapem.
|
||||
"""
|
||||
from sqlalchemy import update
|
||||
|
||||
from app.db import session_scope
|
||||
from app.models.ingest_run import IngestRun, IngestStatus
|
||||
|
||||
now = datetime.now(UTC)
|
||||
cutoff = now - timedelta(hours=max_age_hours)
|
||||
with session_scope() as session:
|
||||
result = session.execute(
|
||||
update(IngestRun)
|
||||
.where(
|
||||
IngestRun.status == IngestStatus.running,
|
||||
IngestRun.started_at < cutoff,
|
||||
)
|
||||
.values(status=IngestStatus.failed, finished_at=now, errors={"message": "killed_by_restart"})
|
||||
)
|
||||
reaped = result.rowcount or 0
|
||||
if reaped:
|
||||
log.warning("reaped %d stuck ingest_runs (running > %dh) on startup", reaped, max_age_hours)
|
||||
return reaped
|
||||
|
||||
|
||||
def run_forever() -> int:
|
||||
"""APScheduler scheduled mode — odpala joby cron-like wg config (env-driven).
|
||||
|
||||
|
|
@ -142,6 +179,9 @@ def run_forever() -> int:
|
|||
"""
|
||||
from app.scheduler.jobs import build_scheduler # opóźniony import (apscheduler)
|
||||
|
||||
# Sprzątanie po poprzednim (ubitym) workerze zanim wystartujemy joby.
|
||||
reap_stuck_ingest_runs()
|
||||
|
||||
settings = get_settings()
|
||||
cfg = {
|
||||
"tpdb_hours": settings.sched_tpdb_hours or None,
|
||||
|
|
|
|||
142
scripts/backfill_inferred_tags.py
Normal file
142
scripts/backfill_inferred_tags.py
Normal file
|
|
@ -0,0 +1,142 @@
|
|||
"""Backfill inferred tags dla scen bez tagów.
|
||||
|
||||
Forward-fix (`normalize_scene._merge_inferred_tags`) taguje tylko nowe/zmienione
|
||||
sceny — istniejące tube-only sceny bez tagów (16% tag coverage, ~868k grywalnych)
|
||||
nie są re-resolvowane (skip przy niezmienionym raw_hash). Ten skrypt jednorazowo
|
||||
przelatuje sceny bez `scene_tags`, infer_tag_slugs(title) → INSERT SceneTag.
|
||||
|
||||
Bezpieczeństwo:
|
||||
- Keyset pagination po `scenes.id` (stały RAM niezależnie od rozmiaru tabeli).
|
||||
- Batch commit co `--batch` scen.
|
||||
- `pg_insert(...).on_conflict_do_nothing` na (scene_id, tag_id) — idempotentne,
|
||||
można puścić wielokrotnie.
|
||||
- `--dry-run` liczy wpływ bez zapisu.
|
||||
- `source_id=NULL` (SceneTag.source_id nullable) — tag z inferencji nie ma źródła.
|
||||
|
||||
Użycie:
|
||||
python scripts/backfill_inferred_tags.py --dry-run # cały katalog, bez zapisu
|
||||
python scripts/backfill_inferred_tags.py --only-playable --dry-run
|
||||
python scripts/backfill_inferred_tags.py --batch 1000 # realny zapis
|
||||
python scripts/backfill_inferred_tags.py --limit 500 # test na 500 scenach
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import logging
|
||||
import sys
|
||||
import time
|
||||
|
||||
from sqlalchemy import select, text
|
||||
from sqlalchemy.dialects.postgresql import insert as pg_insert
|
||||
|
||||
from app.db import session_scope
|
||||
from app.models.scene import SceneTag
|
||||
from app.models.tag import Tag
|
||||
from app.normalize.tag_inference import infer_tag_slugs
|
||||
|
||||
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
|
||||
log = logging.getLogger("backfill_tags")
|
||||
|
||||
|
||||
def _load_tag_cache(session) -> dict[str, "uuid.UUID"]: # type: ignore[name-defined]
|
||||
return {slug: tid for slug, tid in session.execute(select(Tag.slug, Tag.id)).all()}
|
||||
|
||||
|
||||
def _fetch_batch(session, *, last_id, batch: int, only_playable: bool):
|
||||
"""Keyset page: sceny bez scene_tags, z tytułem, id > last_id."""
|
||||
where_playable = (
|
||||
"AND EXISTS (SELECT 1 FROM playback_sources p WHERE p.scene_id = sc.id AND p.dead_at IS NULL)"
|
||||
if only_playable
|
||||
else ""
|
||||
)
|
||||
sql = text(
|
||||
f"""
|
||||
SELECT sc.id, sc.title
|
||||
FROM scenes sc
|
||||
WHERE sc.id > :last_id
|
||||
AND sc.title IS NOT NULL AND sc.title <> ''
|
||||
AND NOT EXISTS (SELECT 1 FROM scene_tags st WHERE st.scene_id = sc.id)
|
||||
{where_playable}
|
||||
ORDER BY sc.id
|
||||
LIMIT :batch
|
||||
"""
|
||||
)
|
||||
return session.execute(sql, {"last_id": last_id, "batch": batch}).all()
|
||||
|
||||
|
||||
def main() -> int:
|
||||
ap = argparse.ArgumentParser()
|
||||
ap.add_argument("--dry-run", action="store_true", help="Licz wpływ, nie zapisuj")
|
||||
ap.add_argument("--only-playable", action="store_true", help="Tylko sceny z żywym playbackiem")
|
||||
ap.add_argument("--batch", type=int, default=1000, help="Scen na transakcję (default 1000)")
|
||||
ap.add_argument("--limit", type=int, default=None, help="Max scen do przetworzenia (test)")
|
||||
args = ap.parse_args()
|
||||
|
||||
t0 = time.time()
|
||||
scanned = 0
|
||||
scenes_tagged = 0
|
||||
links_added = 0
|
||||
last_id = "00000000-0000-0000-0000-000000000000"
|
||||
|
||||
with session_scope() as session:
|
||||
tag_cache = _load_tag_cache(session)
|
||||
log.info("tag cache: %d slugs; dry_run=%s only_playable=%s", len(tag_cache), args.dry_run, args.only_playable)
|
||||
|
||||
while True:
|
||||
with session_scope() as session:
|
||||
rows = _fetch_batch(session, last_id=last_id, batch=args.batch, only_playable=args.only_playable)
|
||||
if not rows:
|
||||
break
|
||||
|
||||
pending: list[dict] = []
|
||||
for scene_id, title in rows:
|
||||
last_id = str(scene_id)
|
||||
scanned += 1
|
||||
slugs = infer_tag_slugs(title)
|
||||
if not slugs:
|
||||
continue
|
||||
got = 0
|
||||
for slug in slugs:
|
||||
tag_id = tag_cache.get(slug)
|
||||
if tag_id is None:
|
||||
# Slug nie istnieje jako Tag (rzadkie — inference slugi są DB-aligned).
|
||||
# Tworzymy, by nie zgubić sygnału; cache by uniknąć powtórek.
|
||||
if args.dry_run:
|
||||
continue
|
||||
tag = Tag(name=slug.replace("-", " ").title(), slug=slug)
|
||||
session.add(tag)
|
||||
session.flush()
|
||||
tag_id = tag.id
|
||||
tag_cache[slug] = tag_id
|
||||
pending.append({"scene_id": scene_id, "tag_id": tag_id, "source_id": None})
|
||||
got += 1
|
||||
if got:
|
||||
scenes_tagged += 1
|
||||
links_added += got
|
||||
|
||||
if pending and not args.dry_run:
|
||||
session.execute(
|
||||
pg_insert(SceneTag.__table__).values(pending).on_conflict_do_nothing(
|
||||
index_elements=["scene_id", "tag_id"]
|
||||
)
|
||||
)
|
||||
|
||||
if scanned % 20000 < args.batch:
|
||||
rate = scanned / max(time.time() - t0, 0.1)
|
||||
log.info("scanned=%d tagged=%d links=%d (%.0f scenes/s) last_id=%s",
|
||||
scanned, scenes_tagged, links_added, rate, last_id)
|
||||
|
||||
if args.limit and scanned >= args.limit:
|
||||
log.info("limit %d reached", args.limit)
|
||||
break
|
||||
|
||||
log.info(
|
||||
"DONE %s: scanned=%d scenes_tagged=%d links_added=%d elapsed=%.1fs",
|
||||
"(dry-run)" if args.dry_run else "(written)",
|
||||
scanned, scenes_tagged, links_added, time.time() - t0,
|
||||
)
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
||||
|
|
@ -40,7 +40,7 @@ from dataclasses import dataclass, field
|
|||
from datetime import UTC, datetime, timedelta
|
||||
|
||||
import httpx
|
||||
from sqlalchemy import func, select
|
||||
from sqlalchemy import func, select, text
|
||||
|
||||
from app.config import get_settings
|
||||
from app.db import session_scope
|
||||
|
|
@ -185,14 +185,24 @@ def _check_ingest_runs(report: Report) -> None:
|
|||
.group_by(IngestRun.status)
|
||||
).all()
|
||||
counts = {str(status.value if hasattr(status, "value") else status): cnt for status, cnt in rows}
|
||||
# killed_by_restart to NIE awaria connectora — to worker ubity mid-run
|
||||
# (deploy/OOM). Liczony osobno (stuck-run check go pokrywa), żeby nie
|
||||
# odpalać WARN przy każdym deployu.
|
||||
killed = s.execute(
|
||||
select(func.count(IngestRun.id))
|
||||
.where(IngestRun.started_at >= past_24h)
|
||||
.where(IngestRun.status == IngestStatus.failed)
|
||||
.where(IngestRun.errors["message"].astext == "killed_by_restart")
|
||||
).scalar() or 0
|
||||
|
||||
failed = counts.get("failed", 0)
|
||||
real_failed = failed - killed
|
||||
partial = counts.get("partial", 0)
|
||||
ok = counts.get("success", 0)
|
||||
running = counts.get("running", 0)
|
||||
detail = f"success={ok} partial={partial} failed={failed} running={running}"
|
||||
detail = f"success={ok} partial={partial} failed={real_failed} killed={killed} running={running}"
|
||||
|
||||
if failed > 0 or partial > 3:
|
||||
if real_failed > 0 or partial > 3:
|
||||
report.db_metrics.append(CheckResult(
|
||||
"ingest runs (24h)", "WARN", detail, time.time() - t0
|
||||
))
|
||||
|
|
@ -298,6 +308,121 @@ def _check_coverage(report: Report) -> None:
|
|||
))
|
||||
|
||||
|
||||
def _check_ingest_per_source(report: Report) -> None:
|
||||
"""Per-source health (7d). Agregat `_check_ingest_runs` topił sygnał: stashdb
|
||||
2026-06-01 miał 24 failed / 57 runów i 0 sukcesów przez ~7 dni (delta nigdy
|
||||
nie szła naprzód, bo run nie kończył się przed restartem workera), a całość
|
||||
i tak pokazywała trochę zielonego. Tu alarmujemy PER źródło:
|
||||
- runs≥4 i 0 sukcesów → źródło działa, ale nigdy nie kończy (twardy WARN)
|
||||
- ostatni sukces > 24h przy aktywnych runach → delta się zatrzymała
|
||||
"""
|
||||
t0 = time.time()
|
||||
try:
|
||||
with session_scope() as s:
|
||||
now = datetime.now(UTC)
|
||||
past_7d = now - timedelta(days=7)
|
||||
rows = s.execute(
|
||||
select(
|
||||
Source.name,
|
||||
func.count(IngestRun.id),
|
||||
func.count(IngestRun.id).filter(IngestRun.status == IngestStatus.success),
|
||||
func.max(IngestRun.finished_at).filter(IngestRun.status == IngestStatus.success),
|
||||
)
|
||||
.join(IngestRun, IngestRun.source_id == Source.id)
|
||||
.where(IngestRun.started_at >= past_7d)
|
||||
.group_by(Source.name)
|
||||
).all()
|
||||
|
||||
now = datetime.now(UTC)
|
||||
warns: list[str] = []
|
||||
details: list[str] = []
|
||||
for name, runs, ok, last_ok in rows:
|
||||
details.append(f"{name}:{ok}/{runs}")
|
||||
if runs >= 4 and ok == 0:
|
||||
warns.append(f"{name} 0 success/{runs} runs")
|
||||
elif last_ok is not None and (now - last_ok).total_seconds() > 24 * 3600 and runs >= 2:
|
||||
warns.append(f"{name} last-ok {(now - last_ok).total_seconds() / 3600:.0f}h ago")
|
||||
status = "WARN" if warns else "OK"
|
||||
detail = "; ".join(warns) if warns else (" ".join(details) or "no runs")
|
||||
report.db_metrics.append(CheckResult("ingest per-source (7d)", status, detail, time.time() - t0))
|
||||
except Exception as e:
|
||||
report.db_metrics.append(CheckResult(
|
||||
"ingest per-source (7d)", "FAIL", f"{type(e).__name__}: {e}", time.time() - t0
|
||||
))
|
||||
|
||||
|
||||
def _check_stuck_runs(report: Report) -> None:
|
||||
"""Runy wiszące w 'running' > 2h. Reaper czyści je na restarcie workera, ale
|
||||
hang MIĘDZY restartami (connector bez timeoutu) zostanie tu złapany."""
|
||||
t0 = time.time()
|
||||
try:
|
||||
with session_scope() as s:
|
||||
cutoff = datetime.now(UTC) - timedelta(hours=2)
|
||||
n = s.execute(
|
||||
select(func.count(IngestRun.id)).where(
|
||||
IngestRun.status == IngestStatus.running,
|
||||
IngestRun.started_at < cutoff,
|
||||
)
|
||||
).scalar() or 0
|
||||
report.db_metrics.append(CheckResult(
|
||||
"stuck ingest runs (>2h)", "WARN" if n else "OK", f"count={n}", time.time() - t0
|
||||
))
|
||||
except Exception as e:
|
||||
report.db_metrics.append(CheckResult(
|
||||
"stuck ingest runs (>2h)", "FAIL", f"{type(e).__name__}: {e}", time.time() - t0
|
||||
))
|
||||
|
||||
|
||||
def _check_browse_freshness(report: Report) -> None:
|
||||
"""Cicha śmierć browse-scrapera: gdy HTML tube'a się zmieni, `_extract_scene_urls`
|
||||
zwraca [] → run kończy się SUCCESS z 0 scen, bez wyjątku, bez alarmu (bug-reporty
|
||||
93d3c485 / 2fbf1c73 wykryte przez userów, nie monitoring). Browse odpala codziennie
|
||||
i RE-touchuje latest sceny, więc max(last_seen_at) per sitetag musi być świeży.
|
||||
Staleness > 36h dla aktywnego browse sitetagu = scraper przestał działać."""
|
||||
t0 = time.time()
|
||||
try:
|
||||
from app.connectors.direct_scrapers import ALL_BROWSE_SCRAPERS
|
||||
|
||||
browse_sitetags: set[str] = set()
|
||||
for cls in ALL_BROWSE_SCRAPERS:
|
||||
try:
|
||||
browse_sitetags.add(cls().sitetag)
|
||||
except Exception:
|
||||
pass
|
||||
if not browse_sitetags:
|
||||
report.db_metrics.append(CheckResult("browse scraper freshness", "SKIP", "no browse scrapers", time.time() - t0))
|
||||
return
|
||||
|
||||
with session_scope() as s:
|
||||
rows = s.execute(text(
|
||||
"SELECT split_part(er.external_id, ':', 1) AS sitetag, max(er.last_seen_at) AS last_seen "
|
||||
"FROM external_records er JOIN sources s ON s.id = er.source_id "
|
||||
"WHERE s.kind = 'scraper' AND er.entity_kind = 'scene' "
|
||||
"GROUP BY 1"
|
||||
)).all()
|
||||
seen = {st: ls for st, ls in rows}
|
||||
|
||||
now = datetime.now(UTC)
|
||||
warns: list[str] = []
|
||||
details: list[str] = []
|
||||
for st in sorted(browse_sitetags):
|
||||
ls = seen.get(st)
|
||||
if ls is None:
|
||||
warns.append(f"{st} never")
|
||||
continue
|
||||
age_h = (now - ls).total_seconds() / 3600
|
||||
details.append(f"{st}:{age_h:.0f}h")
|
||||
if age_h > 36:
|
||||
warns.append(f"{st} stale {age_h:.0f}h")
|
||||
status = "WARN" if warns else "OK"
|
||||
detail = "; ".join(warns) if warns else (" ".join(details) or "no data")
|
||||
report.db_metrics.append(CheckResult("browse scraper freshness", status, detail, time.time() - t0))
|
||||
except Exception as e:
|
||||
report.db_metrics.append(CheckResult(
|
||||
"browse scraper freshness", "FAIL", f"{type(e).__name__}: {e}", time.time() - t0
|
||||
))
|
||||
|
||||
|
||||
# ---------- 2. CANARY EXTRACTORS ----------
|
||||
|
||||
|
||||
|
|
@ -636,6 +761,9 @@ def main() -> int:
|
|||
# 1. DB metrics
|
||||
_check_new_scenes_per_source(report)
|
||||
_check_ingest_runs(report)
|
||||
_check_ingest_per_source(report)
|
||||
_check_stuck_runs(report)
|
||||
_check_browse_freshness(report)
|
||||
_check_dead_playbacks(report)
|
||||
_check_bug_reports(report)
|
||||
_check_coverage(report)
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue