feat(scheduler): deep-crawl full tube catalogs (Phase 2a — ingest-all)
We ingested only ~3% of each browse tube's catalog (porndoe >62k scenes; we had 1959) because tubes were hit only by performer-search + top-N browse. Pilot (porndoe pages 64-110): 1119 new scenes, 100% playable + 100% tagged, 0% canonical overlap (purely additive — content not in TPDB/StashDB). - app/scheduler/deep_crawl.py: round-robin over ALL_BROWSE_SCRAPERS, per-tube page cursor in app/_state/deepcrawl_state.json (no DB migration), deep-paginate from the cursor, idempotent (resolver skips known by raw_hash), mark 'exhausted' at catalog end then reset cursors for an incremental re-sweep. - _job_deep_crawl: hourly, 60 pages/run (~1860 scenes, ~22 min), wrapped in the 1h hard-timeout; registered in build_scheduler (jobs=10). - config: sched_deep_crawl_hours=1, deep_crawl_pages_per_run=60, deepcrawl_state_path. - scripts/pilot_porndoe_deepcrawl.py: one-off pilot used to validate the approach. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
5e74195878
commit
7e46e5ac48
5 changed files with 264 additions and 0 deletions
|
|
@ -88,6 +88,12 @@ class Settings(BaseSettings):
|
||||||
sched_browse_latest_max_pages: int = Field(
|
sched_browse_latest_max_pages: int = Field(
|
||||||
default=5, validation_alias="GOON_SCHED_BROWSE_LATEST_MAX_PAGES"
|
default=5, validation_alias="GOON_SCHED_BROWSE_LATEST_MAX_PAGES"
|
||||||
)
|
)
|
||||||
|
# Deep-crawl (Faza 2a) — pełne katalogi browse-tube'ów (porndoe ~62k itd.), nie tylko
|
||||||
|
# top-N. Round-robin po tube'ach, wznawialny kursor (app/_state/deepcrawl_state.json).
|
||||||
|
# 0 = wyłączony. 60 stron/run × ~31 scen ≈ 1860 scen/run (~22 min, hard-timeout 1h).
|
||||||
|
sched_deep_crawl_hours: int = Field(default=1, validation_alias="GOON_SCHED_DEEP_CRAWL_HOURS")
|
||||||
|
deep_crawl_pages_per_run: int = Field(default=60, validation_alias="GOON_DEEP_CRAWL_PAGES_PER_RUN")
|
||||||
|
deepcrawl_state_path: str = Field(default="", validation_alias="GOON_DEEPCRAWL_STATE_PATH")
|
||||||
# Bulk-dedup performers safety net — auto-merge duplikatów które resolver-time
|
# Bulk-dedup performers safety net — auto-merge duplikatów które resolver-time
|
||||||
# scoring pominął. 12h cadence: leci 2x dziennie (po porannym browse-latest run).
|
# scoring pominął. 12h cadence: leci 2x dziennie (po porannym browse-latest run).
|
||||||
sched_bulk_dedup_hours: int = Field(
|
sched_bulk_dedup_hours: int = Field(
|
||||||
|
|
|
||||||
153
app/scheduler/deep_crawl.py
Normal file
153
app/scheduler/deep_crawl.py
Normal file
|
|
@ -0,0 +1,153 @@
|
||||||
|
"""Deep-crawl pełnych katalogów browse-tube'ów (Faza 2a — "ingest-all").
|
||||||
|
|
||||||
|
Browse scrapery (ALL_BROWSE_SCRAPERS) mają pełne listingi (np. porndoe >62k scen),
|
||||||
|
a my mieliśmy ~3% katalogu (search-by-performer + top-N browse). Ten job paginuje
|
||||||
|
DEEP: per tube trzyma kursor `last_page`, co run crawluje kolejne N stron od kursora,
|
||||||
|
idempotentnie (resolver pomija znane po raw_hash). Po dojściu do końca katalogu
|
||||||
|
(pusty listing) tube jest `exhausted`; gdy wszystkie exhausted — reset kursorów i
|
||||||
|
re-sweep od page 1 (incremental: łapie nowe + potwierdza istniejące).
|
||||||
|
|
||||||
|
Pilot 2026-06-03 (porndoe ogon, strony 64-110): 1119 nowych scen, 100% grywalne +
|
||||||
|
100% otagowane, 0% canonical-overlap (czysto addytywny content, nie duplikuje TPDB/
|
||||||
|
StashDB). ~1.2s/scenę.
|
||||||
|
|
||||||
|
Stan w JSON (mounted `app/_state/deepcrawl_state.json`) — wznawia między runami bez
|
||||||
|
migracji DB. Round-robin po `updated_at` → wszystkie tube'y postępują równomiernie.
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import json
|
||||||
|
import logging
|
||||||
|
import time
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
from app.config import get_settings
|
||||||
|
from app.connectors.direct_scrapers import ALL_BROWSE_SCRAPERS
|
||||||
|
from app.db import session_scope
|
||||||
|
from app.extractors import browser_get
|
||||||
|
from app.ingest import _process_scene, get_or_create_source
|
||||||
|
from app.models.source import SourceKind
|
||||||
|
|
||||||
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
_DEFAULT_STATE = Path(__file__).resolve().parent.parent / "_state" / "deepcrawl_state.json"
|
||||||
|
|
||||||
|
|
||||||
|
def _state_path() -> Path:
    """Return the location of the deep-crawl state file.

    The ``deepcrawl_state_path`` setting wins when it holds a non-empty value;
    otherwise the module-level default ``_DEFAULT_STATE`` is used.
    """
    configured = getattr(get_settings(), "deepcrawl_state_path", None)
    if configured:
        return Path(configured)
    return Path(_DEFAULT_STATE)
|
||||||
|
|
||||||
|
|
||||||
|
def _load_state() -> dict:
    """Load the persisted crawl state.

    Returns:
        The mapping stored in the state file, or an empty dict when the file
        does not exist, cannot be read or parsed, or does not contain a JSON
        object at the top level.
    """
    p = _state_path()
    try:
        loaded = json.loads(p.read_text(encoding="utf-8"))
    except FileNotFoundError:
        # First run: no state yet, nothing worth warning about.
        return {}
    except Exception as e:  # pragma: no cover - defensive
        # Deliberately broad: a damaged state file must never stop the crawl.
        log.warning("deep-crawl: bad state file %s: %s — starting fresh", p, e)
        return {}
    if not isinstance(loaded, dict):
        # Valid JSON but the wrong shape (list, null, number, ...). Callers use
        # dict methods on the result, so treat it like a corrupt file.
        log.warning(
            "deep-crawl: bad state file %s: %s — starting fresh",
            p,
            f"expected a JSON object, got {type(loaded).__name__}",
        )
        return {}
    return loaded
|
||||||
|
|
||||||
|
|
||||||
|
def _save_state(state: dict) -> None:
    """Persist ``state`` as indented JSON at the configured state path.

    The payload is written to a sibling ``.tmp`` file first and then moved
    over the real file, so the state file is swapped in one step rather than
    being rewritten in place.
    """
    target = _state_path()
    target.parent.mkdir(parents=True, exist_ok=True)
    scratch = target.with_suffix(".tmp")
    payload = json.dumps(state, indent=2)
    scratch.write_text(payload, encoding="utf-8")
    # Path.replace overwrites the destination if it already exists.
    scratch.replace(target)
|
||||||
|
|
||||||
|
|
||||||
|
def _browse_scrapers() -> dict:
    """Build a ``{sitetag: scraper_cls}`` map of the registered browse scrapers.

    Each class in ``ALL_BROWSE_SCRAPERS`` is instantiated once to read its
    ``sitetag``. A class that fails here is logged and left out of the map.
    """
    registry: dict = {}
    for scraper_cls in ALL_BROWSE_SCRAPERS:
        try:
            tag = scraper_cls().sitetag
            registry[tag] = scraper_cls
        except Exception as exc:  # pragma: no cover
            log.warning("deep-crawl: skip scraper %s: %s", scraper_cls.__name__, exc)
    return registry
|
||||||
|
|
||||||
|
|
||||||
|
def _pick_target(state: dict, targets: list[str]) -> str | None:
|
||||||
|
"""Wybierz tube do crawla: najmniej-ostatnio-crawlowany, pomijając exhausted.
|
||||||
|
Gdy wszystkie exhausted → reset (incremental re-sweep od page 1)."""
|
||||||
|
live = [t for t in targets if not state.get(t, {}).get("exhausted")]
|
||||||
|
if not live:
|
||||||
|
if not targets:
|
||||||
|
return None
|
||||||
|
log.info("deep-crawl: all tubes exhausted → reset cursors for incremental re-sweep")
|
||||||
|
for t in targets:
|
||||||
|
state.setdefault(t, {})
|
||||||
|
state[t]["exhausted"] = False
|
||||||
|
state[t]["last_page"] = 0
|
||||||
|
live = targets
|
||||||
|
live.sort(key=lambda t: state.get(t, {}).get("updated_at", 0))
|
||||||
|
return live[0]
|
||||||
|
|
||||||
|
|
||||||
|
def _fetch_html(url: str, *, timeout: int = 30):
    """Fetch ``url`` through ``browser_get`` and return the page markup.

    The result is unwrapped via ``.text`` when it has that attribute and is
    used as-is otherwise.
    """
    res = browser_get(url, timeout=timeout)
    return res.text if hasattr(res, "text") else res


def _ingest_listing(scraper, urls, *, sitetag: str, source_id, counters: dict) -> None:
    """Fetch, parse and ingest every scene URL found on one listing page.

    A failure on a single scene is counted in ``counters["errors"]`` and
    logged, and never aborts the rest of the page.
    """
    for u in urls:
        try:
            raw = scraper._parse_detail(u, _fetch_html(u))
        except Exception as e:
            counters["errors"] += 1
            log.warning("deep-crawl %s: detail fetch/parse failed for %s: %s", sitetag, u, e)
            continue
        if raw is None:
            continue
        counters["seen"] += 1
        try:
            _process_scene(source_id=source_id, raw_scene=raw, counters=counters)
        except Exception as e:
            counters["errors"] += 1
            log.warning("deep-crawl %s: ingest failed for %s: %s", sitetag, u, e)


def run_deep_crawl(*, pages_per_run: int = 60, sitetags: list[str] | None = None) -> dict:
    """Run one deep-crawl pass over a single tube.

    Picks a tube, crawls up to ``pages_per_run`` listing pages starting right
    after that tube's stored cursor, and ingests every scene found.

    Args:
        pages_per_run: Maximum number of listing pages to crawl in this run.
        sitetags: Optional allow-list of sitetags; entries with no registered
            browse scraper are ignored. Defaults to all registered scrapers.

    Returns:
        A summary dict with ``sitetag``, ``start``, ``end`` (last page handled),
        ``exhausted`` and the ingest counters, or ``{}`` when there is nothing
        to crawl.
    """
    scrapers = _browse_scrapers()
    targets = [t for t in (sitetags or list(scrapers)) if t in scrapers]
    if not targets:
        log.warning("deep-crawl: no browse scrapers / matching sitetags")
        return {}

    state = _load_state()
    sitetag = _pick_target(state, targets)
    if sitetag is None:
        return {}

    scraper = scrapers[sitetag]()
    st = state.setdefault(sitetag, {})
    # `or 0` keeps a null/absent cursor from crashing int().
    start = int(st.get("last_page") or 0) + 1
    end = start + pages_per_run - 1

    with session_scope() as session:
        src = get_or_create_source(session, kind=SourceKind.scraper, name="pornapp")
        source_id = src.id

    counters = {"seen": 0, "new": 0, "updated": 0, "skipped": 0, "errors": 0}
    t0 = time.time()
    last_done = start - 1
    exhausted = False

    def _record(last_page: int, is_exhausted: bool) -> None:
        # Write the cursor into the in-memory state and persist all of it.
        st["last_page"] = last_page
        st["exhausted"] = is_exhausted
        st["updated_at"] = int(time.time())
        _save_state(state)

    for page in range(start, end + 1):
        try:
            html = _fetch_html(scraper._listing_url(page))
        except Exception as e:
            log.warning("deep-crawl %s listing page %d failed: %s", sitetag, page, e)
            break  # do not advance the cursor past a fetch error; the next run retries
        urls = scraper._extract_scene_urls(html)
        if not urls:
            log.info("deep-crawl %s: empty page %d → catalog end (exhausted)", sitetag, page)
            exhausted = True
            last_done = page
            break
        _ingest_listing(scraper, urls, sitetag=sitetag, source_id=source_id, counters=counters)
        last_done = page
        if page < end:
            # Checkpoint after each finished page so a run that is cut short
            # does not redo every page on the next run. Best effort only:
            # the final save below remains the authoritative one.
            try:
                _record(last_done, False)
            except OSError as e:
                log.warning("deep-crawl %s: checkpoint after page %d failed: %s", sitetag, page, e)

    _record(last_done, exhausted)

    log.info(
        "deep-crawl %s pages %d-%d: %s exhausted=%s (%.0fs)",
        sitetag, start, last_done, counters, exhausted, time.time() - t0,
    )
    return {"sitetag": sitetag, "start": start, "end": last_done, "exhausted": exhausted, **counters}
|
||||||
|
|
@ -107,6 +107,19 @@ def _job_reap_stuck() -> None:
|
||||||
log.exception("[scheduler] periodic reaper failed")
|
log.exception("[scheduler] periodic reaper failed")
|
||||||
|
|
||||||
|
|
||||||
|
def _job_deep_crawl(pages_per_run: int) -> None:
    """Scheduler job: run one deep-crawl pass over the browse-tube catalogs.

    Delegates to ``app.scheduler.deep_crawl.run_deep_crawl`` through
    ``_run_with_timeout`` with ``timeout_sec=3600`` (one hour).
    """
    log.info("[scheduler] deep-crawl starting (pages_per_run=%d)", pages_per_run)
    # Imported when the job fires, not when this module is loaded.
    from app.scheduler.deep_crawl import run_deep_crawl

    def _crawl_once():
        return run_deep_crawl(pages_per_run=pages_per_run)

    _run_with_timeout(_crawl_once, label="deep-crawl", timeout_sec=3600)
|
||||||
|
|
||||||
|
|
||||||
def _job_browse_latest(max_pages: int) -> None:
|
def _job_browse_latest(max_pages: int) -> None:
|
||||||
"""Browse-latest — scrap newest scenes z rich-metadata tubes (shyfap + ...).
|
"""Browse-latest — scrap newest scenes z rich-metadata tubes (shyfap + ...).
|
||||||
Komplementarny do performer-driven: forward-fill (new scenes) vs backward (known performers).
|
Komplementarny do performer-driven: forward-fill (new scenes) vs backward (known performers).
|
||||||
|
|
@ -334,6 +347,18 @@ def build_scheduler(cfg: dict[str, Any]) -> BlockingScheduler:
|
||||||
seconds, refresh_days,
|
seconds, refresh_days,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
if cfg.get("deep_crawl_hours"):
|
||||||
|
pages = cfg.get("deep_crawl_pages_per_run") or 60
|
||||||
|
sched.add_job(
|
||||||
|
lambda: _job_deep_crawl(pages),
|
||||||
|
IntervalTrigger(hours=cfg["deep_crawl_hours"], start_date=INTERVAL_ANCHOR),
|
||||||
|
id="deep_crawl",
|
||||||
|
replace_existing=True,
|
||||||
|
max_instances=1,
|
||||||
|
coalesce=True,
|
||||||
|
)
|
||||||
|
log.info("scheduler: deep-crawl every %dh (%d pages/run)", cfg["deep_crawl_hours"], pages)
|
||||||
|
|
||||||
# Periodic reaper — czyści zombie 'running' runy co godzinę. Domyślnie ZAWSZE on
|
# Periodic reaper — czyści zombie 'running' runy co godzinę. Domyślnie ZAWSZE on
|
||||||
# (cfg.get(...,1)), bo startup-only reaper nie łapie zawisów gdy worker długo żyje.
|
# (cfg.get(...,1)), bo startup-only reaper nie łapie zawisów gdy worker długo żyje.
|
||||||
reap_hours = cfg.get("reap_stuck_hours", 1)
|
reap_hours = cfg.get("reap_stuck_hours", 1)
|
||||||
|
|
@ -377,4 +402,8 @@ DEFAULT_CONFIG: dict[str, Any] = {
|
||||||
"taxonomy_counts_hours": 3,
|
"taxonomy_counts_hours": 3,
|
||||||
# Periodic reaper zombie 'running' runów — co 1h (próg 'running'>2h w funkcji).
|
# Periodic reaper zombie 'running' runów — co 1h (próg 'running'>2h w funkcji).
|
||||||
"reap_stuck_hours": 1,
|
"reap_stuck_hours": 1,
|
||||||
|
# Deep-crawl pełnych katalogów browse-tube'ów (Faza 2a). Co 1h, 60 stron/run,
|
||||||
|
# round-robin po tube'ach. Backfill katalogów (porndoe ~62k itd.) przez dni.
|
||||||
|
"deep_crawl_hours": 1,
|
||||||
|
"deep_crawl_pages_per_run": 60,
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -201,6 +201,9 @@ def run_forever() -> int:
|
||||||
# 93d3c485 (2026-05-19) "brak freshporno".
|
# 93d3c485 (2026-05-19) "brak freshporno".
|
||||||
"browse_latest_hours": getattr(settings, "sched_browse_latest_hours", 6) or None,
|
"browse_latest_hours": getattr(settings, "sched_browse_latest_hours", 6) or None,
|
||||||
"browse_latest_max_pages": getattr(settings, "sched_browse_latest_max_pages", 5),
|
"browse_latest_max_pages": getattr(settings, "sched_browse_latest_max_pages", 5),
|
||||||
|
# Deep-crawl pełnych katalogów browse-tube'ów (Faza 2a) — round-robin, wznawialny.
|
||||||
|
"deep_crawl_hours": getattr(settings, "sched_deep_crawl_hours", 1) or None,
|
||||||
|
"deep_crawl_pages_per_run": getattr(settings, "deep_crawl_pages_per_run", 60),
|
||||||
# Bulk-dedup performers — safety net dla duplikatów które resolver
|
# Bulk-dedup performers — safety net dla duplikatów które resolver
|
||||||
# pominął (np. freshporno scen przed fixem release_date). Run 12h.
|
# pominął (np. freshporno scen przed fixem release_date). Run 12h.
|
||||||
"bulk_dedup_hours": getattr(settings, "sched_bulk_dedup_hours", 12) or None,
|
"bulk_dedup_hours": getattr(settings, "sched_bulk_dedup_hours", 12) or None,
|
||||||
|
|
|
||||||
73
scripts/pilot_porndoe_deepcrawl.py
Normal file
73
scripts/pilot_porndoe_deepcrawl.py
Normal file
|
|
@ -0,0 +1,73 @@
|
||||||
|
"""Pilot (Faza 1) — deep-crawl porndoe poza najnowsze strony, żeby zmierzyć WARTOŚĆ
|
||||||
|
pełnego crawlu tube'a (vs obecne search+top-N). Mamy ~3% katalogu porndoe (1959/62k+).
|
||||||
|
|
||||||
|
Crawluje strony START..END (domyślnie 64+, czyli ogon którego jeszcze nie mamy),
|
||||||
|
przepuszcza przez normalny `_process_scene` (resolver: match canonical / orphan + tagi
|
||||||
|
+ duration). Mierzy counters. NIE modyfikuje produkcyjnych jobów — to ad-hoc pomiar.
|
||||||
|
|
||||||
|
Użycie: python scripts/pilot_porndoe_deepcrawl.py --start 64 --end 110
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import argparse
|
||||||
|
import logging
|
||||||
|
import sys
|
||||||
|
import time
|
||||||
|
|
||||||
|
from app.connectors.direct_scrapers.porndoe import PornDoeScraper
|
||||||
|
from app.db import session_scope
|
||||||
|
from app.extractors import browser_get
|
||||||
|
from app.ingest import _process_scene, get_or_create_source
|
||||||
|
from app.models.source import SourceKind
|
||||||
|
|
||||||
|
logging.basicConfig(level=logging.WARNING, format="%(asctime)s %(levelname)s %(message)s")
|
||||||
|
log = logging.getLogger("pilot_porndoe")
|
||||||
|
|
||||||
|
|
||||||
|
def main() -> int:
    """Crawl porndoe listing pages ``--start``..``--end`` and ingest each scene.

    Every scene goes through ``_process_scene``; the running counters are
    printed after each page and once more at the end. The crawl stops early at
    the first listing page that yields no scene URLs.

    Returns:
        Process exit code; always 0.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--start", type=int, default=64)
    ap.add_argument("--end", type=int, default=110)
    args = ap.parse_args()

    s = PornDoeScraper()
    with session_scope() as session:
        src = get_or_create_source(session, kind=SourceKind.scraper, name="pornapp")
        source_id = src.id

    counters = {"seen": 0, "new": 0, "updated": 0, "skipped": 0, "errors": 0}
    t0 = time.time()
    for page in range(args.start, args.end + 1):
        try:
            res = browser_get(s._listing_url(page), timeout=30)
            html = res.text if hasattr(res, "text") else res
        except Exception as e:
            log.warning("listing page %d failed: %s", page, e)
            continue
        urls = s._extract_scene_urls(html)
        if not urls:
            print(f"empty listing page {page}, stop")
            break
        for u in urls:
            try:
                r = browser_get(u, timeout=30)
                dh = r.text if hasattr(r, "text") else r
                raw = s._parse_detail(u, dh)
            except Exception as e:
                counters["errors"] += 1
                # Log as well as count, so the cause of a failure is visible.
                log.warning("detail fetch/parse failed for %s: %s", u, e)
                continue
            if raw is None:
                continue
            counters["seen"] += 1
            try:
                _process_scene(source_id=source_id, raw_scene=raw, counters=counters)
            except Exception as e:
                counters["errors"] += 1
                log.warning("ingest failed for %s: %s", u, e)
        print(f"page {page}: {counters} ({time.time() - t0:.0f}s)", flush=True)

    print(f"PILOT DONE pages {args.start}-{args.end}: {counters} elapsed={time.time() - t0:.0f}s")
    return 0
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
sys.exit(main())
|
||||||
Loading…
Add table
Reference in a new issue