The umbrella Source.name for all direct tube scrapers (deep-crawl, browse-latest, performer-driven) was "pornapp" — a misleading leftover from the removed external porn-app API. It read like a dependency on a third-party "pornapp" service; it is not — these are our own scrapers hitting 25+ tubes directly (kind=scraper, origin tube:<sitetag>). Renamed to "tube-scraper" via a single SCRAPER_SOURCE_NAME constant; DB row renamed in place (UPDATE name, same id) so all ingest_runs + external_records history stays linked. No behavior change — external_id keying (sitetag:url) and dedup are unaffected. NOTE: playback_sources.origin "pornapp:<sitetag>" prefix is a separate legacy format (resolve_playback parses it) and is intentionally left untouched. Verified on prod: row renamed (0 stray "pornapp"), new runs land on "tube-scraper". Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
158 lines
6.1 KiB
Python
158 lines
6.1 KiB
Python
"""Deep-crawl pełnych katalogów browse-tube'ów (Faza 2a — "ingest-all").
|
|
|
|
Browse scrapery (ALL_BROWSE_SCRAPERS) mają pełne listingi (np. porndoe >62k scen),
|
|
a my mieliśmy ~3% katalogu (search-by-performer + top-N browse). Ten job paginuje
|
|
DEEP: per tube trzyma kursor `last_page`, co run crawluje kolejne N stron od kursora,
|
|
idempotentnie (resolver pomija znane po raw_hash). Po dojściu do końca katalogu
|
|
(pusty listing) tube jest `exhausted`; gdy wszystkie exhausted — reset kursorów i
|
|
re-sweep od page 1 (incremental: łapie nowe + potwierdza istniejące).
|
|
|
|
Pilot 2026-06-03 (porndoe ogon, strony 64-110): 1119 nowych scen, 100% grywalne +
|
|
100% otagowane, 0% canonical-overlap (czysto addytywny content, nie duplikuje TPDB/
|
|
StashDB). ~1.2s/scenę.
|
|
|
|
Stan w JSON (mounted `app/_state/deepcrawl_state.json`) — wznawia między runami bez
|
|
migracji DB. Round-robin po `updated_at` → wszystkie tube'y postępują równomiernie.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import logging
|
|
import time
|
|
from pathlib import Path
|
|
|
|
from app.config import get_settings
|
|
from app.connectors.direct_scrapers import ALL_BROWSE_SCRAPERS, SCRAPER_SOURCE_NAME
|
|
from app.db import session_scope
|
|
from app.ingest import _process_scene, get_or_create_source
|
|
from app.models.source import SourceKind
|
|
|
|
# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Default location of the JSON cursor file: `_state/deepcrawl_state.json` inside the
# parent of this file's directory. `_state_path()` falls back to it when the settings
# object does not provide `deepcrawl_state_path`.
_DEFAULT_STATE = Path(__file__).resolve().parent.parent / "_state" / "deepcrawl_state.json"

# Per-tube depth cap (in pages). Mega-tubes (xvideos ~13M scenes) crawled to the very end
# would monopolize the round-robin and flood the database — so we cap them at roughly the
# newest N pages, then mark them exhausted → reset (incremental re-sweep of fresh content).
# Finite tubes (porndoe/eporner) have no entry here (lookup yields None) → they run to the
# natural end of the catalog. xvideos /new/ has ~27 scenes/page → 1800 pages ≈ ~50k scenes.
_PAGE_CAP: dict[str, int] = {
    "xvideoscom": 1800,
}
|
|
|
|
|
|
def _state_path() -> Path:
    """Return the path of the deep-crawl state file.

    Uses the `deepcrawl_state_path` attribute of the settings object when it is
    present and truthy; otherwise falls back to `_DEFAULT_STATE`.
    """
    configured = getattr(get_settings(), "deepcrawl_state_path", None)
    chosen = configured or _DEFAULT_STATE
    return Path(chosen)
|
|
|
|
|
|
def _load_state() -> dict:
    """Load the persisted crawl state from the JSON state file.

    Returns:
        The decoded state dict, or an empty dict when the file does not exist,
        cannot be read or parsed, or does not contain a JSON object at the top
        level. Callers use `state.get(...)`, so anything other than a dict would
        crash them later — such files are treated as corrupt.
    """
    p = _state_path()
    try:
        state = json.loads(p.read_text(encoding="utf-8"))
    except FileNotFoundError:
        # No state yet (first run) — not worth a warning.
        return {}
    except Exception as e:  # pragma: no cover - defensive
        log.warning("deep-crawl: bad state file %s: %s — starting fresh", p, e)
        return {}
    if not isinstance(state, dict):
        # Valid JSON but wrong shape (e.g. a list or null): start over rather than
        # fail later on `state.get`.
        log.warning(
            "deep-crawl: bad state file %s: %s — starting fresh",
            p,
            f"expected a JSON object, got {type(state).__name__}",
        )
        return {}
    return state
|
|
|
|
|
|
def _save_state(state: dict) -> None:
    """Persist `state` as indented JSON to the state file.

    The payload is written to a sibling file with a `.tmp` suffix first and then
    moved over the real file, so readers never see a half-written state file.
    Missing parent directories are created.
    """
    target = _state_path()
    target.parent.mkdir(parents=True, exist_ok=True)

    payload = json.dumps(state, indent=2)
    scratch = target.with_suffix(".tmp")
    scratch.write_text(payload, encoding="utf-8")
    # Swap the finished temp file into place (atomic rename).
    scratch.replace(target)
|
|
|
|
|
|
def _browse_scrapers() -> dict:
    """Build a `{sitetag: scraper_cls}` map of the registered browse scrapers.

    Each class in `ALL_BROWSE_SCRAPERS` is instantiated once to read its
    `sitetag`. A class that fails during that step is skipped with a warning.
    """
    registry: dict = {}
    for scraper_cls in ALL_BROWSE_SCRAPERS:
        try:
            tag = scraper_cls().sitetag
            registry[tag] = scraper_cls
        except Exception as err:  # pragma: no cover
            log.warning("deep-crawl: skip scraper %s: %s", scraper_cls.__name__, err)
    return registry
|
|
|
|
|
|
def _pick_target(state: dict, targets: list[str]) -> str | None:
|
|
"""Wybierz tube do crawla: najmniej-ostatnio-crawlowany, pomijając exhausted.
|
|
Gdy wszystkie exhausted → reset (incremental re-sweep od page 1)."""
|
|
live = [t for t in targets if not state.get(t, {}).get("exhausted")]
|
|
if not live:
|
|
if not targets:
|
|
return None
|
|
log.info("deep-crawl: all tubes exhausted → reset cursors for incremental re-sweep")
|
|
for t in targets:
|
|
state.setdefault(t, {})
|
|
state[t]["exhausted"] = False
|
|
state[t]["last_page"] = 0
|
|
live = targets
|
|
live.sort(key=lambda t: state.get(t, {}).get("updated_at", 0))
|
|
return live[0]
|
|
|
|
|
|
def run_deep_crawl(*, pages_per_run: int = 60, sitetags: list[str] | None = None) -> dict:
    """Run one deep-crawl pass over a single tube and persist its cursor.

    Picks a tube via `_pick_target`, crawls up to `pages_per_run` listing pages
    starting right after that tube's stored `last_page`, hands every scene to
    `_process_scene`, then saves the updated cursor to the state file.

    Args:
        pages_per_run: Maximum number of listing pages to crawl in this run.
        sitetags: Optional allow-list of sitetags; unknown tags are ignored.
            When empty or None, all registered browse scrapers are candidates.

    Returns:
        A summary dict with `sitetag`, `start`, `end` (last page completed),
        `exhausted` and the counters, or an empty dict when there is nothing
        to crawl.
    """
    scrapers = _browse_scrapers()
    targets = [t for t in (sitetags or list(scrapers)) if t in scrapers]
    if not targets:
        log.warning("deep-crawl: no browse scrapers / matching sitetags")
        return {}

    state = _load_state()
    sitetag = _pick_target(state, targets)
    if sitetag is None:
        return {}

    scraper = scrapers[sitetag]()
    cap = _PAGE_CAP.get(sitetag)  # mega-tube depth cap (None = crawl to the end of the catalog)
    start = int(state.get(sitetag, {}).get("last_page", 0)) + 1
    end = start + pages_per_run - 1
    if cap is not None:
        end = min(end, cap)

    # Only the id is needed later, so read it while the session is still open.
    with session_scope() as session:
        src = get_or_create_source(session, kind=SourceKind.scraper, name=SCRAPER_SOURCE_NAME)
        source_id = src.id

    # NOTE(review): "new"/"updated"/"skipped" are presumably incremented inside
    # _process_scene (it receives `counters`) — confirm against app.ingest.
    counters = {"seen": 0, "new": 0, "updated": 0, "skipped": 0, "errors": 0}
    t0 = time.monotonic()  # monotonic clock: elapsed time is immune to wall-clock jumps
    last_done = start - 1
    exhausted = False

    if cap is not None and start > cap:
        # Cursor already reached the per-tube cap → treat as end of catalog
        # (the reset in _pick_target re-sweeps from page 1).
        exhausted = True
    else:
        for page in range(start, end + 1):
            scenes = scraper.crawl_page(page)
            if scenes is None:
                # Transient listing fetch failure — do NOT advance the cursor;
                # the next run retries this page.
                break
            if not scenes:
                log.info("deep-crawl %s: empty page %d → catalog end (exhausted)", sitetag, page)
                exhausted = True
                last_done = page
                break
            for raw in scenes:
                counters["seen"] += 1
                try:
                    _process_scene(source_id=source_id, raw_scene=raw, counters=counters)
                except Exception:
                    # Best-effort: one bad scene must not abort the run, but keep
                    # the traceback in the log instead of discarding it.
                    counters["errors"] += 1
                    log.exception(
                        "deep-crawl %s: failed to ingest scene on page %d", sitetag, page
                    )
            last_done = page
        if cap is not None and last_done >= cap:
            log.info("deep-crawl %s: reached page cap %d (exhausted)", sitetag, cap)
            exhausted = True

    st = state.setdefault(sitetag, {})
    st["last_page"] = last_done
    st["exhausted"] = exhausted
    st["updated_at"] = int(time.time())  # wall-clock timestamp used for round-robin ordering
    _save_state(state)

    log.info(
        "deep-crawl %s pages %d-%d: %s exhausted=%s (%.0fs)",
        sitetag, start, last_done, counters, exhausted, time.monotonic() - t0,
    )
    return {"sitetag": sitetag, "start": start, "end": last_done, "exhausted": exhausted, **counters}
|