From 7e46e5ac4858f52c66cfe0d9446dd36b37024271 Mon Sep 17 00:00:00 2001 From: jtrzupek Date: Wed, 3 Jun 2026 09:26:44 +0200 Subject: [PATCH] =?UTF-8?q?feat(scheduler):=20deep-crawl=20full=20tube=20c?= =?UTF-8?q?atalogs=20(Phase=202a=20=E2=80=94=20ingest-all)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit We ingested only ~3% of each browse tube's catalog (porndoe >62k scenes; we had 1959) because tubes were hit only by performer-search + top-N browse. Pilot (porndoe pages 64-110): 1119 new scenes, 100% playable + 100% tagged, 0% canonical overlap (purely additive — content not in TPDB/StashDB). - app/scheduler/deep_crawl.py: round-robin over ALL_BROWSE_SCRAPERS, per-tube page cursor in app/_state/deepcrawl_state.json (no DB migration), deep-paginate from the cursor, idempotent (resolver skips known by raw_hash), mark 'exhausted' at catalog end then reset cursors for an incremental re-sweep. - _job_deep_crawl: hourly, 60 pages/run (~1860 scenes, ~22 min), wrapped in the 1h hard-timeout; registered in build_scheduler (jobs=10). - config: sched_deep_crawl_hours=1, deep_crawl_pages_per_run=60, deepcrawl_state_path. - scripts/pilot_porndoe_deepcrawl.py: one-off pilot used to validate the approach. Co-Authored-By: Claude Opus 4.8 (1M context) --- app/config.py | 6 ++ app/scheduler/deep_crawl.py | 153 +++++++++++++++++++++++++++++ app/scheduler/jobs.py | 29 ++++++ app/scheduler/worker.py | 3 + scripts/pilot_porndoe_deepcrawl.py | 73 ++++++++++++++ 5 files changed, 264 insertions(+) create mode 100644 app/scheduler/deep_crawl.py create mode 100644 scripts/pilot_porndoe_deepcrawl.py diff --git a/app/config.py b/app/config.py index 3bfa3b3..f7e4c42 100644 --- a/app/config.py +++ b/app/config.py @@ -88,6 +88,12 @@ class Settings(BaseSettings): sched_browse_latest_max_pages: int = Field( default=5, validation_alias="GOON_SCHED_BROWSE_LATEST_MAX_PAGES" ) + # Deep-crawl (Faza 2a) — pełne katalogi browse-tube'ów (porndoe ~62k itd.), nie tylko + # top-N. Round-robin po tube'ach, wznawialny kursor (app/_state/deepcrawl_state.json). + # 0 = wyłączony. 60 stron/run × ~31 scen ≈ 1860 scen/run (~22 min, hard-timeout 1h). + sched_deep_crawl_hours: int = Field(default=1, validation_alias="GOON_SCHED_DEEP_CRAWL_HOURS") + deep_crawl_pages_per_run: int = Field(default=60, validation_alias="GOON_DEEP_CRAWL_PAGES_PER_RUN") + deepcrawl_state_path: str = Field(default="", validation_alias="GOON_DEEPCRAWL_STATE_PATH") # Bulk-dedup performers safety net — auto-merge duplikatów które resolver-time # scoring pominął. 12h cadence: leci 2x dziennie (po porannym browse-latest run). sched_bulk_dedup_hours: int = Field( diff --git a/app/scheduler/deep_crawl.py b/app/scheduler/deep_crawl.py new file mode 100644 index 0000000..b89d9b8 --- /dev/null +++ b/app/scheduler/deep_crawl.py @@ -0,0 +1,153 @@ +"""Deep-crawl pełnych katalogów browse-tube'ów (Faza 2a — "ingest-all"). + +Browse scrapery (ALL_BROWSE_SCRAPERS) mają pełne listingi (np. porndoe >62k scen), +a my mieliśmy ~3% katalogu (search-by-performer + top-N browse). Ten job paginuje +DEEP: per tube trzyma kursor `last_page`, co run crawluje kolejne N stron od kursora, +idempotentnie (resolver pomija znane po raw_hash). Po dojściu do końca katalogu +(pusty listing) tube jest `exhausted`; gdy wszystkie exhausted — reset kursorów i +re-sweep od page 1 (incremental: łapie nowe + potwierdza istniejące). + +Pilot 2026-06-03 (porndoe ogon, strony 64-110): 1119 nowych scen, 100% grywalne + +100% otagowane, 0% canonical-overlap (czysto addytywny content, nie duplikuje TPDB/ +StashDB). ~1.2s/scenę. + +Stan w JSON (mounted `app/_state/deepcrawl_state.json`) — wznawia między runami bez +migracji DB. Round-robin po `updated_at` → wszystkie tube'y postępują równomiernie. +""" +from __future__ import annotations + +import json +import logging +import time +from pathlib import Path + +from app.config import get_settings +from app.connectors.direct_scrapers import ALL_BROWSE_SCRAPERS +from app.db import session_scope +from app.extractors import browser_get +from app.ingest import _process_scene, get_or_create_source +from app.models.source import SourceKind + +log = logging.getLogger(__name__) + +_DEFAULT_STATE = Path(__file__).resolve().parent.parent / "_state" / "deepcrawl_state.json" + + +def _state_path() -> Path: + return Path(getattr(get_settings(), "deepcrawl_state_path", None) or _DEFAULT_STATE) + + +def _load_state() -> dict: + p = _state_path() + if p.exists(): + try: + return json.loads(p.read_text(encoding="utf-8")) + except Exception as e: # pragma: no cover - obronnie + log.warning("deep-crawl: bad state file %s: %s — starting fresh", p, e) + return {} + + +def _save_state(state: dict) -> None: + p = _state_path() + p.parent.mkdir(parents=True, exist_ok=True) + tmp = p.with_suffix(".tmp") + tmp.write_text(json.dumps(state, indent=2), encoding="utf-8") + tmp.replace(p) # atomic + + +def _browse_scrapers() -> dict: + """{sitetag: scraper_cls} dla zarejestrowanych browse-scraperów.""" + out: dict = {} + for cls in ALL_BROWSE_SCRAPERS: + try: + out[cls().sitetag] = cls + except Exception as e: # pragma: no cover + log.warning("deep-crawl: skip scraper %s: %s", cls.__name__, e) + return out + + +def _pick_target(state: dict, targets: list[str]) -> str | None: + """Wybierz tube do crawla: najmniej-ostatnio-crawlowany, pomijając exhausted. + Gdy wszystkie exhausted → reset (incremental re-sweep od page 1).""" + live = [t for t in targets if not state.get(t, {}).get("exhausted")] + if not live: + if not targets: + return None + log.info("deep-crawl: all tubes exhausted → reset cursors for incremental re-sweep") + for t in targets: + state.setdefault(t, {}) + state[t]["exhausted"] = False + state[t]["last_page"] = 0 + live = targets + live.sort(key=lambda t: state.get(t, {}).get("updated_at", 0)) + return live[0] + + +def run_deep_crawl(*, pages_per_run: int = 60, sitetags: list[str] | None = None) -> dict: + """Jeden run: wybierz tube, crawl kolejne `pages_per_run` stron od kursora, ingest. + Zwraca podsumowanie (sitetag, zakres stron, counters, exhausted).""" + scrapers = _browse_scrapers() + targets = [t for t in (sitetags or list(scrapers)) if t in scrapers] + if not targets: + log.warning("deep-crawl: no browse scrapers / matching sitetags") + return {} + + state = _load_state() + sitetag = _pick_target(state, targets) + if sitetag is None: + return {} + + scraper = scrapers[sitetag]() + start = int(state.get(sitetag, {}).get("last_page", 0)) + 1 + end = start + pages_per_run - 1 + + with session_scope() as session: + src = get_or_create_source(session, kind=SourceKind.scraper, name="pornapp") + source_id = src.id + + counters = {"seen": 0, "new": 0, "updated": 0, "skipped": 0, "errors": 0} + t0 = time.time() + last_done = start - 1 + exhausted = False + + for page in range(start, end + 1): + try: + res = browser_get(scraper._listing_url(page), timeout=30) + html = res.text if hasattr(res, "text") else res + except Exception as e: + log.warning("deep-crawl %s listing page %d failed: %s", sitetag, page, e) + break # nie awansuj kursora przez błąd sieci — następny run powtórzy + urls = scraper._extract_scene_urls(html) + if not urls: + log.info("deep-crawl %s: empty page %d → catalog end (exhausted)", sitetag, page) + exhausted = True + last_done = page + break + for u in urls: + try: + r = browser_get(u, timeout=30) + dh = r.text if hasattr(r, "text") else r + raw = scraper._parse_detail(u, dh) + except Exception: + counters["errors"] += 1 + continue + if raw is None: + continue + counters["seen"] += 1 + try: + _process_scene(source_id=source_id, raw_scene=raw, counters=counters) + except Exception: + counters["errors"] += 1 + last_done = page + + st = state.setdefault(sitetag, {}) + st["last_page"] = last_done + st["exhausted"] = exhausted + st["updated_at"] = int(time.time()) + _save_state(state) + + log.info( + "deep-crawl %s pages %d-%d: %s exhausted=%s (%.0fs)", + sitetag, start, last_done, counters, exhausted, time.time() - t0, + ) + return {"sitetag": sitetag, "start": start, "end": last_done, "exhausted": exhausted, **counters} diff --git a/app/scheduler/jobs.py b/app/scheduler/jobs.py index 811e9db..23eb24f 100644 --- a/app/scheduler/jobs.py +++ b/app/scheduler/jobs.py @@ -107,6 +107,19 @@ def _job_reap_stuck() -> None: log.exception("[scheduler] periodic reaper failed") +def _job_deep_crawl(pages_per_run: int) -> None: + """Deep-crawl pełnych katalogów browse-tube'ów (Faza 2a — ingest-all). Round-robin + po sitetagu, wznawialny kursor (app/scheduler/deep_crawl.py). Hard-timeout 1h.""" + log.info("[scheduler] deep-crawl starting (pages_per_run=%d)", pages_per_run) + from app.scheduler.deep_crawl import run_deep_crawl + + _run_with_timeout( + lambda: run_deep_crawl(pages_per_run=pages_per_run), + label="deep-crawl", + timeout_sec=3600, + ) + + def _job_browse_latest(max_pages: int) -> None: """Browse-latest — scrap newest scenes z rich-metadata tubes (shyfap + ...). Komplementarny do performer-driven: forward-fill (new scenes) vs backward (known performers). @@ -334,6 +347,18 @@ def build_scheduler(cfg: dict[str, Any]) -> BlockingScheduler: seconds, refresh_days, ) + if cfg.get("deep_crawl_hours"): + pages = cfg.get("deep_crawl_pages_per_run") or 60 + sched.add_job( + lambda: _job_deep_crawl(pages), + IntervalTrigger(hours=cfg["deep_crawl_hours"], start_date=INTERVAL_ANCHOR), + id="deep_crawl", + replace_existing=True, + max_instances=1, + coalesce=True, + ) + log.info("scheduler: deep-crawl every %dh (%d pages/run)", cfg["deep_crawl_hours"], pages) + # Periodic reaper — czyści zombie 'running' runy co godzinę. Domyślnie ZAWSZE on # (cfg.get(...,1)), bo startup-only reaper nie łapie zawisów gdy worker długo żyje. reap_hours = cfg.get("reap_stuck_hours", 1) @@ -377,4 +402,8 @@ DEFAULT_CONFIG: dict[str, Any] = { "taxonomy_counts_hours": 3, # Periodic reaper zombie 'running' runów — co 1h (próg 'running'>2h w funkcji). "reap_stuck_hours": 1, + # Deep-crawl pełnych katalogów browse-tube'ów (Faza 2a). Co 1h, 60 stron/run, + # round-robin po tube'ach. Backfill katalogów (porndoe ~62k itd.) przez dni. + "deep_crawl_hours": 1, + "deep_crawl_pages_per_run": 60, } diff --git a/app/scheduler/worker.py b/app/scheduler/worker.py index bb353ef..eb0377f 100644 --- a/app/scheduler/worker.py +++ b/app/scheduler/worker.py @@ -201,6 +201,9 @@ def run_forever() -> int: # 93d3c485 (2026-05-19) "brak freshporno". "browse_latest_hours": getattr(settings, "sched_browse_latest_hours", 6) or None, "browse_latest_max_pages": getattr(settings, "sched_browse_latest_max_pages", 5), + # Deep-crawl pełnych katalogów browse-tube'ów (Faza 2a) — round-robin, wznawialny. + "deep_crawl_hours": getattr(settings, "sched_deep_crawl_hours", 1) or None, + "deep_crawl_pages_per_run": getattr(settings, "deep_crawl_pages_per_run", 60), # Bulk-dedup performers — safety net dla duplikatów które resolver # pominął (np. freshporno scen przed fixem release_date). Run 12h. "bulk_dedup_hours": getattr(settings, "sched_bulk_dedup_hours", 12) or None, diff --git a/scripts/pilot_porndoe_deepcrawl.py b/scripts/pilot_porndoe_deepcrawl.py new file mode 100644 index 0000000..06f6e3f --- /dev/null +++ b/scripts/pilot_porndoe_deepcrawl.py @@ -0,0 +1,73 @@ +"""Pilot (Faza 1) — deep-crawl porndoe poza najnowsze strony, żeby zmierzyć WARTOŚĆ +pełnego crawlu tube'a (vs obecne search+top-N). Mamy ~3% katalogu porndoe (1959/62k+). + +Crawluje strony START..END (domyślnie 64+, czyli ogon którego jeszcze nie mamy), +przepuszcza przez normalny `_process_scene` (resolver: match canonical / orphan + tagi ++ duration). Mierzy counters. NIE modyfikuje produkcyjnych jobów — to ad-hoc pomiar. + +Użycie: python scripts/pilot_porndoe_deepcrawl.py --start 64 --end 110 +""" +from __future__ import annotations + +import argparse +import logging +import sys +import time + +from app.connectors.direct_scrapers.porndoe import PornDoeScraper +from app.db import session_scope +from app.extractors import browser_get +from app.ingest import _process_scene, get_or_create_source +from app.models.source import SourceKind + +logging.basicConfig(level=logging.WARNING, format="%(asctime)s %(levelname)s %(message)s") +log = logging.getLogger("pilot_porndoe") + + +def main() -> int: + ap = argparse.ArgumentParser() + ap.add_argument("--start", type=int, default=64) + ap.add_argument("--end", type=int, default=110) + args = ap.parse_args() + + s = PornDoeScraper() + with session_scope() as session: + src = get_or_create_source(session, kind=SourceKind.scraper, name="pornapp") + source_id = src.id + + counters = {"seen": 0, "new": 0, "updated": 0, "skipped": 0, "errors": 0} + t0 = time.time() + for page in range(args.start, args.end + 1): + try: + res = browser_get(s._listing_url(page), timeout=30) + html = res.text if hasattr(res, "text") else res + except Exception as e: + log.warning("listing page %d failed: %s", page, e) + continue + urls = s._extract_scene_urls(html) + if not urls: + print(f"empty listing page {page}, stop") + break + for u in urls: + try: + r = browser_get(u, timeout=30) + dh = r.text if hasattr(r, "text") else r + raw = s._parse_detail(u, dh) + except Exception: + counters["errors"] += 1 + continue + if raw is None: + continue + counters["seen"] += 1 + try: + _process_scene(source_id=source_id, raw_scene=raw, counters=counters) + except Exception: + counters["errors"] += 1 + print(f"page {page}: {counters} ({time.time() - t0:.0f}s)", flush=True) + + print(f"PILOT DONE pages {args.start}-{args.end}: {counters} elapsed={time.time() - t0:.0f}s") + return 0 + + +if __name__ == "__main__": + sys.exit(main())