goon/app/scheduler/worker.py
jtrzupek cd12348782 fix(movies): paradisehill delta date-granularity + browse cadence docs
- paradisehill.fetch_movies compared release_date coerced to midnight against the
  `since` timestamp, so the chronological crawl stopped at the first upload dated
  the same calendar day as `since` and silently dropped most new movies (0-2 seen
  per run; Movies tab stalled). Compare by DATE with a 1-day grace instead; idempotent
  external_records upsert dedups the re-fetched recent window.
- scripts/backfill_paradisehill_movies.py: one-off no-delta deep crawl to recover the
  backlog missed during the bug (idempotent, resumable).
- docs: correct stale 'raz dziennie/24h' browse-latest comments to 6h (4x/day), the
  actual configured cadence (config.py sched_browse_latest_hours=6).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-01 17:00:10 +02:00

338 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Worker process.
Tryby:
--once --source=tpdb|stashdb [--limit=N] [--no-delta]
Klasyczny ingest z jednego connectora.
--once --strategy=performer-driven [--top-n=N] [--performers="Lola,Mia"]
Strategia performer-driven: dla top-N performerów (lub explicit listy)
pobierz wszystkie sceny z TPDB/StashDB + szukaj w direct tube scrapach.
Patrz `app/scheduler/performer_driven.py`.
Pełen scheduler (APScheduler + cron) — patrz `jobs.py`.
"""
from __future__ import annotations
import argparse
import logging
import signal
import sys
import time
import uuid
from datetime import UTC, datetime, timedelta
from app.config import get_settings
from app.connectors.base import BaseConnector
from app.connectors.stashdb import StashDBConnector
from app.connectors.tpdb import TPDBConnector
from app.ingest import ingest_from_connector
from app.scheduler.performer_driven import run_performer_driven
log = logging.getLogger(__name__)

# Scene connectors selectable via `--once --source=<name>`; insertion order is
# the order shown in the "available=..." error message.
CONNECTORS: dict[str, type[BaseConnector]] = dict(
    tpdb=TPDBConnector,
    stashdb=StashDBConnector,
)

# Accepted values of `--strategy`; also echoed in the "unknown strategy" error.
STRATEGIES = ("performer-driven", "bulk-dedup", "movies")
def _movie_connectors() -> dict[str, type]:
    """Return a fresh ``{name: connector_class}`` copy of the movie-connector registry.

    The registry lives in ``app/connectors/__init__.py`` so the scheduler job and
    the manual ``--strategy=movies`` path share one source of truth. The import
    is resolved at call time, not at module import.
    """
    from app.connectors import get_movie_connectors

    registry = get_movie_connectors()
    return dict(registry)
def _build_connector(name: str) -> BaseConnector:
    """Instantiate the scene connector registered under ``name`` in ``CONNECTORS``.

    Raises:
        SystemExit: when ``name`` is not a registered source.
    """
    try:
        connector_cls = CONNECTORS[name]
    except KeyError:
        raise SystemExit(f"unknown source: {name}. available={list(CONNECTORS)}") from None
    return connector_cls()
def run_once_source(*, source: str, limit: int | None, use_delta: bool) -> int:
    """Run a single ingest cycle for one scene connector.

    Args:
        source: key of ``CONNECTORS``.
        limit: forwarded to ``ingest_from_connector`` as ``limit``.
        use_delta: forwarded to ``ingest_from_connector`` as ``use_delta``.

    Returns:
        Process exit code: 0 when ``counters["errors"] == 0``, otherwise 1.
    """
    counters = ingest_from_connector(
        _build_connector(source),
        limit=limit,
        use_delta=use_delta,
    )
    log.info("run_once finished counters=%s", counters)
    if counters["errors"] == 0:
        return 0
    return 1
def _run_performer_driven_strategy(
    *,
    top_n: int,
    performers: list[str] | None,
    performer_ids: list[uuid.UUID] | None,
    sitetags: list[str] | None,
    per_performer_limit: int | None,
    skip_tubes: bool,
) -> int:
    """Run the performer-driven strategy; return 1 if any per-source counter reports errors."""
    counters = run_performer_driven(
        performer_names=performers,
        performer_ids=performer_ids,
        top_n=top_n,
        sitetags=sitetags,
        per_performer_limit=per_performer_limit,
        skip_tubes=skip_tubes,
    )
    log.info(
        "performer-driven done: processed=%d skipped_no_refs=%d per_source=%s",
        counters.performers_processed,
        counters.performers_skipped_no_refs,
        counters.per_source,
    )
    total_errors = sum(s.get("errors", 0) for s in counters.per_source.values())
    return 0 if total_errors == 0 else 1


def _run_movies_strategy(
    sources: list[str] | None,
    *,
    limit: int | None,
    use_delta: bool,
) -> int:
    """Ingest each requested movie source; return 1 if any source is unknown, errored or raised."""
    from app.ingest import ingest_movies_from_connector

    registry = _movie_connectors()
    # None / empty -> every registered source; "all" anywhere in the list does the same.
    requested = sources or ["all"]
    targets: list[str] = list(registry.keys()) if "all" in requested else requested
    # De-duplicate while keeping CLI order so `paradisehill,paradisehill` does not
    # run the same crawl twice.
    targets = list(dict.fromkeys(targets))
    any_error = False
    for tgt in targets:
        cls = registry.get(tgt)
        if cls is None:
            log.error("unknown movie source: %s (available: %s)", tgt, list(registry))
            any_error = True
            continue
        log.info("movie ingest %s starting (delta=%s, limit=%s)", tgt, use_delta, limit)
        try:
            counters = ingest_movies_from_connector(
                cls(),
                use_delta=use_delta,
                limit=limit,
            )
            log.info("movie ingest %s done: %s", tgt, counters)
            if counters.get("errors", 0) > 0:
                any_error = True
        except Exception:
            # Per-source boundary: one failing source must not abort the others.
            log.exception("movie ingest %s failed", tgt)
            any_error = True
    return 1 if any_error else 0


def _run_bulk_dedup_strategy(
    *,
    dedup_strategy: str,
    dry_run: bool,
    cross_source_only: bool,
) -> int:
    """Run the bulk-dedup pass; its result is only logged, so the exit code is always 0."""
    from app.scheduler.bulk_dedup import run_bulk_dedup

    bc = run_bulk_dedup(
        strategy=dedup_strategy,
        dry_run=dry_run,
        cross_source_only=cross_source_only,
    )
    log.info("bulk-dedup done: %s", bc)
    return 0


def run_once_strategy(
    *,
    strategy: str,
    top_n: int,
    performers: list[str] | None,
    performer_ids: list[uuid.UUID] | None,
    sitetags: list[str] | None,
    per_performer_limit: int | None,
    skip_tubes: bool = False,
    dedup_strategy: str = "all",
    cross_source_only: bool = False,
    dry_run: bool = False,
    use_delta: bool = True,
) -> int:
    """Run one cycle of the named ingest strategy and return a process exit code.

    Args:
        strategy: one of ``STRATEGIES``.
        top_n: performer-driven: number of top performers to process.
        performers: performer-driven: explicit performer names. For ``movies``
            this field is reused as the list of movie source names; ``None`` or
            a list containing ``"all"`` selects every registered movie connector.
        performer_ids: performer-driven: explicit canonical performer UUIDs.
        sitetags: performer-driven: sitetags used for tube search.
        per_performer_limit: performer-driven per-connector limit; for ``movies``
            it is passed as the ingest ``limit``.
        skip_tubes: performer-driven: skip tube discovery.
        dedup_strategy: bulk-dedup scope (``phash`` / ``performers`` / ``all``).
        cross_source_only: bulk-dedup: only cross-source pairs.
        dry_run: bulk-dedup: log only, write nothing.
        use_delta: movies only. ``False`` ingests without the delta filter
            (full re-pull). Defaults to ``True``, the previously hard-coded value.

    Returns:
        0 on success, 1 if any source reported errors or failed.

    Raises:
        SystemExit: for an unknown ``strategy``.
    """
    if strategy == "performer-driven":
        return _run_performer_driven_strategy(
            top_n=top_n,
            performers=performers,
            performer_ids=performer_ids,
            sitetags=sitetags,
            per_performer_limit=per_performer_limit,
            skip_tubes=skip_tubes,
        )
    if strategy == "movies":
        return _run_movies_strategy(
            performers,
            limit=per_performer_limit,
            use_delta=use_delta,
        )
    if strategy == "bulk-dedup":
        return _run_bulk_dedup_strategy(
            dedup_strategy=dedup_strategy,
            dry_run=dry_run,
            cross_source_only=cross_source_only,
        )
    raise SystemExit(f"unknown strategy: {strategy}. available={STRATEGIES}")
def reap_stuck_ingest_runs(max_age_hours: int = 2) -> int:
    """Mark ``ingest_runs`` rows stuck in ``status=running`` as failed (``killed_by_restart``).

    The worker is single-process. If it is killed mid-run (deploy, OOM, image
    restart), its IngestRun row stays ``running`` forever because ``finished_at``
    is never set. Without reaping, these zombies accumulate (13 were found on
    2026-06-01) and skew metrics. ``_last_successful_finished_at`` ignores them,
    but the watchdog counting stuck runs and the fail rate see them as noise.

    Only rows whose ``started_at`` is older than ``max_age_hours`` are reaped.
    A healthy run finishes in seconds or minutes, and movie ingest has its own
    6-minute cap, so anything ``running`` for more than 2h is a leftover of a
    killed worker. The margin also protects a manual ``--once`` run started in
    parallel (another process) from being reaped by mistake.

    Args:
        max_age_hours: minimum age, by ``started_at``, of a ``running`` row before it is reaped.

    Returns:
        Number of updated rows (``rowcount``, or 0 when the driver reports none).
    """
    from sqlalchemy import update

    from app.db import session_scope
    from app.models.ingest_run import IngestRun, IngestStatus

    reaped_at = datetime.now(UTC)
    started_before = reaped_at - timedelta(hours=max_age_hours)
    # Consecutive .where() calls are AND-ed together.
    stmt = (
        update(IngestRun)
        .where(IngestRun.status == IngestStatus.running)
        .where(IngestRun.started_at < started_before)
        .values(
            status=IngestStatus.failed,
            finished_at=reaped_at,
            errors={"message": "killed_by_restart"},
        )
    )
    with session_scope() as session:
        reaped_count = session.execute(stmt).rowcount or 0
    if reaped_count:
        log.warning(
            "reaped %d stuck ingest_runs (running > %dh) on startup",
            reaped_count,
            max_age_hours,
        )
    return reaped_count
def run_forever() -> int:
    """Run the APScheduler scheduled mode (cron-like jobs driven by env config).

    First reaps ingest runs left ``running`` by a previously killed worker, then
    builds the job config from settings and hands it to ``build_scheduler``.
    Falsy interval settings (0 / None) are normalised to ``None``; presumably
    that disables the job (TODO confirm in ``jobs.py``).

    Configured jobs:
        tpdb / stashdb        every N hours, delta since the last successful run
        performer-driven      every N hours, top-N performers from the DB
        performer-continuous  tick every N seconds, one performer per tick (tube backfill)
        movie ingest          every N hours (default 24)
        browse-latest         every N hours (default 6)
        bulk-dedup            every N hours (default 12)
        taxonomy counts       every N hours (default 3)

    Returns:
        0 after ``start()`` returns or is interrupted by KeyboardInterrupt/SystemExit.
    """
    from app.scheduler.jobs import build_scheduler  # deferred import (apscheduler)

    # Clean up after a previous (killed) worker before any job starts.
    reap_stuck_ingest_runs()

    settings = get_settings()

    def _or_disabled(value):
        # A falsy interval (0 / None) is passed on as None.
        return value or None

    def _setting(name, default):
        return getattr(settings, name, default)

    cfg = {
        "tpdb_hours": _or_disabled(settings.sched_tpdb_hours),
        "stashdb_hours": _or_disabled(settings.sched_stashdb_hours),
        "performer_driven_hours": _or_disabled(settings.sched_performer_driven_hours),
        "performer_driven_top_n": settings.sched_performer_driven_top_n,
        "performer_continuous_seconds": _or_disabled(
            _setting("sched_performer_continuous_seconds", 60)
        ),
        "performer_continuous_refresh_days": _setting(
            "sched_performer_continuous_refresh_days", 30
        ),
        "movie_ingest_hours": _or_disabled(_setting("sched_movie_ingest_hours", 24)),
        # Browse-latest: browse the newest scenes every 6h / 4x per day
        # (~100 scenes per tube per run). This key was once missing from the
        # worker config for about a week, so browse mode never fired (the 15k
        # freshporno rows from 2026-05-13 were a one-off bulk import).
        # Bug report 93d3c485 (2026-05-19) "brak freshporno".
        "browse_latest_hours": _or_disabled(_setting("sched_browse_latest_hours", 6)),
        "browse_latest_max_pages": _setting("sched_browse_latest_max_pages", 5),
        # Bulk-dedup performers: safety net for duplicates the resolver missed
        # (e.g. freshporno scenes ingested before the release_date fix). Every 12h.
        "bulk_dedup_hours": _or_disabled(_setting("sched_bulk_dedup_hours", 12)),
        # Taxonomy scene_count refresh: denormalised counters (perf fix 0019).
        "taxonomy_counts_hours": _or_disabled(_setting("sched_taxonomy_counts_hours", 3)),
    }

    scheduler = build_scheduler(cfg)
    log.info("worker scheduled mode starting (jobs=%d)", len(scheduler.get_jobs()))
    try:
        scheduler.start()
    except (KeyboardInterrupt, SystemExit):
        log.info("worker received shutdown")
    return 0
def _split_csv(s: str | None) -> list[str]:
if not s:
return []
return [x.strip() for x in s.split(",") if x.strip()]
def _init_sentry(settings) -> None:
    """Initialise Sentry for the worker; a no-op when ``settings.sentry_dsn`` is empty.

    Worker errors (failed ingest, dead connector, lost connection to
    TPDB/StashDB) are tagged with the ``goon-worker@...`` release, which
    separates them from API errors in the Sentry UI.
    """
    if not settings.sentry_dsn:
        return
    import sentry_sdk
    from sentry_sdk.integrations.sqlalchemy import SqlalchemyIntegration

    sentry_sdk.init(
        dsn=settings.sentry_dsn,
        environment=settings.sentry_environment,
        traces_sample_rate=settings.sentry_traces_sample_rate,
        integrations=[SqlalchemyIntegration()],
        release="goon-worker@0.1.0",
    )


def _build_parser() -> argparse.ArgumentParser:
    """Build the ``goon-worker`` CLI parser (help texts are user-facing and kept verbatim)."""
    parser = argparse.ArgumentParser(prog="goon-worker")
    parser.add_argument("--once", action="store_true", help="Uruchom jeden cykl ingest i zakończ")
    parser.add_argument("--source", choices=sorted(CONNECTORS), help="Nazwa źródła (np. tpdb)")
    parser.add_argument(
        "--strategy",
        choices=STRATEGIES,
        help="Strategia ingest (alternatywa do --source)",
    )
    parser.add_argument("--limit", type=int, default=None, help="Limit scen do pobrania")
    parser.add_argument(
        "--no-delta",
        action="store_true",
        help="Pomija filtr od ostatniego successful run (full re-pull)",
    )
    # Strategy-specific options
    parser.add_argument("--top-n", type=int, default=20, help="Liczba top performerów (performer-driven)")
    parser.add_argument(
        "--performers",
        type=str,
        default=None,
        help="Lista nazwisk performerów (CSV) — overrides top-N",
    )
    parser.add_argument(
        "--performer-ids",
        type=str,
        default=None,
        help="Lista naszych kanonicznych UUID-ów (CSV) — overrides top-N i --performers",
    )
    parser.add_argument(
        "--sitetags",
        type=str,
        default=None,
        help="CSV sitetagów do tube search (default: wszystkie z ALL_DIRECT_SCRAPERS)",
    )
    parser.add_argument(
        "--skip-tubes",
        action="store_true",
        help="Pomija tube discovery (canonical-only catch-up: tylko TPDB+StashDB)",
    )
    parser.add_argument(
        "--per-performer-limit",
        type=int,
        default=None,
        help="Limit scen z jednego connectora per performer (debug)",
    )
    parser.add_argument(
        "--dedup-strategy",
        choices=("phash", "performers", "all"),
        default="all",
        help="bulk-dedup zakres: phash | performers | all",
    )
    parser.add_argument(
        "--cross-source-only",
        action="store_true",
        help="bulk-dedup: tylko pary (tpdb-only ↔ stashdb-only) — najwyższa wartość,"
        " pomija pary już zmergowane lub same-source",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Tylko loguj, nic nie zapisuj (dla bulk-dedup preview)",
    )
    return parser


def main(argv: list[str] | None = None) -> int:
    """CLI entry point: configure logging/Sentry, parse args, dispatch to a mode.

    Without ``--once`` the scheduler runs via ``run_forever()``. With ``--once``,
    ``--strategy`` takes precedence over ``--source``.

    Args:
        argv: arguments to parse; ``None`` lets argparse read ``sys.argv[1:]``.

    Returns:
        The exit code returned by the selected mode.
    """
    settings = get_settings()
    logging.basicConfig(
        level=settings.log_level,
        format="%(asctime)s %(levelname)s %(name)s %(message)s",
    )
    _init_sentry(settings)

    parser = _build_parser()
    args = parser.parse_args(argv)

    if not args.once:
        return run_forever()

    if args.strategy:
        # `--limit` and `--no-delta` only feed run_once_source(); the strategy
        # path never receives them. Warn instead of silently ignoring them, since
        # a user asking for a full re-pull would otherwise get a delta run.
        ignored_flags = [
            flag
            for flag, given in (
                ("--limit", args.limit is not None),
                ("--no-delta", args.no_delta),
            )
            if given
        ]
        if ignored_flags:
            log.warning(
                "%s ignored with --strategy=%s (only used with --source)",
                ", ".join(ignored_flags),
                args.strategy,
            )
        performer_ids = None
        if args.performer_ids:
            try:
                performer_ids = [uuid.UUID(s) for s in _split_csv(args.performer_ids)]
            except ValueError as e:
                parser.error(f"invalid --performer-ids: {e}")
        return run_once_strategy(
            strategy=args.strategy,
            top_n=args.top_n,
            performers=_split_csv(args.performers) or None,
            performer_ids=performer_ids,
            sitetags=_split_csv(args.sitetags) or None,
            per_performer_limit=args.per_performer_limit,
            skip_tubes=args.skip_tubes,
            dedup_strategy=args.dedup_strategy,
            cross_source_only=args.cross_source_only,
            dry_run=args.dry_run,
        )

    if not args.source:
        parser.error("--once wymaga --source albo --strategy")
    return run_once_source(source=args.source, limit=args.limit, use_delta=not args.no_delta)
if __name__ == "__main__":
sys.exit(main())