"""Backfill perceptual hashes for scenes that have a thumbnail_url but no
fingerprint(kind='phash').

After this run:
- find_by_phash_within() finds exact and Hamming-near duplicates for tube scenes,
- phash_dedup_scenes.py auto-merge can detect exact-match duplicates,
- the merge_candidates flow can use phash distance as a signal.

Implementation:
- Cursor (keyset) pagination over scene_id: the cursor always advances past the
  last scene of a batch, so scenes whose fetch fails are not re-queried forever.
- Thread-pool concurrency for the thumbnail fetch (I/O) + phash computation.
- The batch read and the inserts use short-lived sessions, so no DB transaction
  is held open while thumbnails are being downloaded.
- Per-scene SAVEPOINT: a failing insert is rolled back on its own and does not
  spoil the rest of the batch.
- Idempotent: scenes that already have a phash fingerprint (kind='phash') are
  excluded by the query.

Run: `python /srv/scripts/backfill_phash_tube.py --limit 5000 --concurrency 10`
"""
from __future__ import annotations

import argparse
import logging
import sys
import time
import uuid
from concurrent.futures import ThreadPoolExecutor, as_completed
from urllib.parse import urlparse

import httpx
from sqlalchemy import select, func
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import Session

sys.path.insert(0, "/srv")

from app.connectors.direct_scrapers._browse_base import compute_thumbnail_phash  # noqa: E402
from app.db import SessionLocal  # noqa: E402
from app.models.playback_source import PlaybackSource  # noqa: E402
from app.models.scene import ScenePerformer, SceneFingerprint  # noqa: E402

log = logging.getLogger("backfill_phash")
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")

# Per-thumbnail HTTP timeout in seconds, passed to compute_thumbnail_phash().
_FETCH_TIMEOUT_S = 8.0


def _scenes_needing_phash(
    session: Session,
    batch: int,
    after_id,
    performer_id: uuid.UUID | None = None,
) -> list[tuple]:
    """Return up to `batch` rows of (scene_id, thumb_url, page_url) for scenes
    that have an alive playback source with a thumbnail and no phash yet.

    Exactly one playback source is chosen per scene: the alive one with the
    smallest thumbnail_url. Its *own* page_url is returned with it, so a
    Referer derived from page_url belongs to the same source as the thumbnail
    (independent MIN() aggregates could pair columns from different sources).

    Args:
        session: open DB session used for the read.
        batch: maximum number of scenes to return.
        after_id: keyset cursor; only scenes with scene_id > after_id are
            returned. None for the first page.
        performer_id: optional filter restricting results to scenes this
            performer appears in (useful for per-performer benchmarks).

    Returns:
        Result rows ordered by scene_id ascending.
    """
    # Correlated NOT EXISTS instead of NOT IN (subquery): NOT IN matches no
    # rows at all if the subquery ever yields NULL. Planners such as
    # PostgreSQL's can also run NOT EXISTS as an anti-join instead of
    # probing a materialised list.
    has_phash = (
        select(SceneFingerprint.scene_id)
        .where(SceneFingerprint.scene_id == PlaybackSource.scene_id)
        .where(SceneFingerprint.kind == "phash")
        .exists()
    )

    candidates = (
        select(
            PlaybackSource.scene_id.label("scene_id"),
            PlaybackSource.thumbnail_url.label("thumb_url"),
            PlaybackSource.page_url.label("page_url"),
            # rn == 1 marks the source with the smallest thumbnail_url per scene.
            func.row_number()
            .over(
                partition_by=PlaybackSource.scene_id,
                order_by=PlaybackSource.thumbnail_url,
            )
            .label("rn"),
        )
        .where(PlaybackSource.dead_at.is_(None))
        .where(PlaybackSource.thumbnail_url.isnot(None))
        .where(~has_phash)
    )
    if performer_id is not None:
        performer_scenes = (
            select(ScenePerformer.scene_id)
            .where(ScenePerformer.performer_id == performer_id)
            .scalar_subquery()
        )
        candidates = candidates.where(PlaybackSource.scene_id.in_(performer_scenes))
    if after_id is not None:
        candidates = candidates.where(PlaybackSource.scene_id > after_id)

    ranked = candidates.subquery()
    q = (
        select(ranked.c.scene_id, ranked.c.thumb_url, ranked.c.page_url)
        .where(ranked.c.rn == 1)
        .order_by(ranked.c.scene_id)
        .limit(batch)
    )
    return list(session.execute(q).all())


def _compute_one(item: tuple) -> tuple:
    """Worker-thread function: fetch one thumbnail and compute its phash.

    Args:
        item: (scene_id, thumb_url, page_url) as returned by
            _scenes_needing_phash().

    Returns:
        (scene_id, phash) on success, or (scene_id, None) on any failure.
        Errors are logged and never raised, so one bad thumbnail cannot
        abort the batch.
    """
    scene_id, thumb_url, page_url = item
    if not thumb_url:
        return (scene_id, None)
    try:
        # Send the scene page's origin as Referer (presumably to get past
        # hotlink protection on thumbnail hosts). urlparse stays inside the
        # try because it can raise ValueError on malformed URLs.
        host = urlparse(page_url).hostname if page_url else None
        referer = f"https://{host}/" if host else None
        ph = compute_thumbnail_phash(thumb_url, referer=referer, timeout=_FETCH_TIMEOUT_S)
    except (httpx.HTTPError, OSError, ValueError) as e:
        log.debug("phash fetch fail %s: %s", thumb_url, e)
        return (scene_id, None)
    except Exception as e:  # worker boundary: one scene must not kill the run
        log.warning("phash unexpected %s: %s", thumb_url, e)
        return (scene_id, None)
    return (scene_id, ph)


def _store_phashes(results: list[tuple]) -> tuple[int, int]:
    """Insert computed phashes, one SAVEPOINT per scene, then commit once.

    Args:
        results: (scene_id, phash_or_None) pairs from _compute_one().

    Returns:
        (ok, fail): ok counts inserted rows. fail counts scenes without a
        phash plus scenes whose insert was rejected by the DB.
    """
    ok = fail = 0
    with SessionLocal() as session:
        for scene_id, ph in results:
            if not ph:
                fail += 1
                continue
            try:
                # The context manager flushes on exit and rolls the SAVEPOINT
                # back if that flush fails. The outer transaction stays usable
                # for the remaining scenes and the final commit().
                with session.begin_nested():
                    session.add(SceneFingerprint(
                        scene_id=scene_id,
                        kind="phash",
                        value=ph,
                    ))
            except SQLAlchemyError as e:
                log.debug("insert fail scene=%s: %s", scene_id, e)
                fail += 1
            else:
                ok += 1
        session.commit()
    return ok, fail


def _parse_args() -> argparse.Namespace:
    """Parse and validate CLI arguments.

    `performer_id` is replaced by a uuid.UUID, or None when not given or empty.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--batch", type=int, default=200, help="scenes per DB query batch")
    ap.add_argument("--limit", type=int, default=10_000, help="max scenes total")
    ap.add_argument("--concurrency", type=int, default=10, help="parallel fetches")
    ap.add_argument("--performer-id", type=str, default=None,
                    help="filter to scenes of this performer (UUID)")
    args = ap.parse_args()
    if args.batch < 1:
        ap.error("--batch must be >= 1")
    if args.concurrency < 1:
        ap.error("--concurrency must be >= 1")
    if args.limit < 0:
        ap.error("--limit must be >= 0")

    performer_id = None
    if args.performer_id:
        try:
            performer_id = uuid.UUID(args.performer_id)
        except ValueError:
            ap.error(f"--performer-id is not a valid UUID: {args.performer_id!r}")
    args.performer_id = performer_id
    return args


def main() -> None:
    """Backfill phash fingerprints in cursor-paginated batches.

    Stops when --limit scenes have been processed or no candidates remain.
    """
    args = _parse_args()
    performer_id = args.performer_id

    total = ok = fail = 0
    cursor = None
    started = time.monotonic()
    # One pool for the whole run instead of one per batch.
    with ThreadPoolExecutor(max_workers=args.concurrency) as pool:
        while total < args.limit:
            # Cap the last batch so --limit is a hard upper bound rather than
            # being overshot by up to batch-1 scenes.
            want = min(args.batch, args.limit - total)
            # Short-lived read session: the connection is released before the
            # (slow) thumbnail downloads start.
            with SessionLocal() as session:
                batch = _scenes_needing_phash(
                    session, batch=want, after_id=cursor, performer_id=performer_id,
                )
            if not batch:
                log.info("no more scenes (cursor=%s, done)", cursor)
                break
            # Advance past the whole batch, failures included, so failing
            # scenes are not re-queried forever.
            cursor = batch[-1].scene_id

            futures = [pool.submit(_compute_one, tuple(row)) for row in batch]
            results = [fut.result() for fut in as_completed(futures)]
            total += len(results)

            inserted, failed = _store_phashes(results)
            ok += inserted
            fail += failed

            elapsed = time.monotonic() - started
            rate = total / elapsed if elapsed > 0 else 0
            log.info(
                "progress total=%d ok=%d fail=%d rate=%.1f/s cursor=%s",
                total, ok, fail, rate, cursor,
            )

    elapsed = time.monotonic() - started
    log.info("done total=%d ok=%d fail=%d elapsed=%.1fs", total, ok, fail, elapsed)


if __name__ == "__main__":
    main()