goon/scripts/backfill_phash_tube.py
goon-foss ad0284585b Initial commit
Goon — self-hosted aggregator for adult-content scene metadata.

Indexes scenes from TPDB, StashDB, and 30+ public adult tube sites.
Cross-source deduplication via perceptual hash + Levenshtein distance.
FastAPI backend + APScheduler worker + React Native (Expo) mobile client.

FOSS, ad-free, donation-funded. See README for details.
2026-05-20 10:10:22 +02:00

148 lines
5.6 KiB
Python

"""Phash backfill dla scen z thumbnail_url ale bez fingerprint(kind='phash').
Po this run:
- find_by_phash_within() znajduje exact + Hamming-near duplicates dla tube scen
- phash_dedup_scenes.py auto-merge wykryje exact match duplicaty
- merge_candidates flow może użyć phash distance jako signal
Implementacja:
- Cursor-based pagination po scene_id (no skip-on-fail bug)
- Threadpool concurrency dla httpx (CPU-bound phash calc + I/O fetch)
- Per-scene SAVEPOINT — failure w jednej scenie nie psuje całego batch'a
- Idempotent: scena z istniejącym phash entry (kind='phash') jest skip'owana
Run: `python /srv/scripts/backfill_phash_tube.py --limit 5000 --concurrency 10`
"""
from __future__ import annotations
import argparse
import logging
import sys
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
import httpx
from sqlalchemy import select, func
from sqlalchemy.orm import Session
sys.path.insert(0, "/srv")
from app.connectors.direct_scrapers._browse_base import compute_thumbnail_phash
from app.db import SessionLocal
from app.models.playback_source import PlaybackSource
from app.models.scene import ScenePerformer, SceneFingerprint
# Configure root logging (INFO and above, timestamped lines) and obtain the
# named logger that every function in this script writes through.
logging.basicConfig(
    format="%(asctime)s %(levelname)s %(message)s",
    level=logging.INFO,
)
log = logging.getLogger("backfill_phash")
def _scenes_needing_phash(session: Session, batch: int, after_id, performer_id=None) -> list[tuple]:
    """Fetch the next page of scenes that still lack a phash fingerprint.

    Only playback sources that are not marked dead (``dead_at IS NULL``) and
    that carry a thumbnail URL are considered, and any scene that already has
    a ``SceneFingerprint`` row with ``kind == "phash"`` is excluded. Results
    are grouped per scene, ordered by ``scene_id`` and capped at ``batch``.

    NOTE(review): ``thumb_url`` and ``page_url`` are two separate ``MIN()``
    aggregates over the scene's eligible sources, so for a scene with several
    sources they may originate from different rows — confirm that is
    acceptable for the referer derived from ``page_url``.

    Args:
        session: Session used to execute the query.
        batch: Maximum number of rows to return.
        after_id: Keyset cursor; when not None only scenes with a
            ``scene_id`` greater than this value are returned.
        performer_id: Optional filter restricting the result to scenes that
            have a ``ScenePerformer`` row for this performer (handy for
            per-performer benchmarks).

    Returns:
        A list of result rows ``(scene_id, thumb_url, page_url)``.
    """
    already_hashed = (
        select(SceneFingerprint.scene_id)
        .where(SceneFingerprint.kind == "phash")
        .scalar_subquery()
    )

    # Collect the WHERE criteria first; the optional filters are appended in
    # the same order the query would otherwise chain them.
    criteria = [
        PlaybackSource.dead_at.is_(None),
        PlaybackSource.thumbnail_url.isnot(None),
        PlaybackSource.scene_id.notin_(already_hashed),
    ]
    if performer_id is not None:
        scenes_with_performer = (
            select(ScenePerformer.scene_id)
            .where(ScenePerformer.performer_id == performer_id)
            .scalar_subquery()
        )
        criteria.append(PlaybackSource.scene_id.in_(scenes_with_performer))
    if after_id is not None:
        criteria.append(PlaybackSource.scene_id > after_id)

    columns = (
        PlaybackSource.scene_id,
        func.min(PlaybackSource.thumbnail_url).label("thumb_url"),
        func.min(PlaybackSource.page_url).label("page_url"),
    )
    stmt = (
        select(*columns)
        .where(*criteria)
        .group_by(PlaybackSource.scene_id)
        .order_by(PlaybackSource.scene_id)
        .limit(batch)
    )
    rows = session.execute(stmt).all()
    return list(rows)
def _compute_one(item: tuple) -> tuple:
"""Worker thread fn — fetch + phash. Returns (scene_id, phash_or_None)."""
scene_id, thumb_url, page_url = item
if not thumb_url:
return (scene_id, None)
try:
from urllib.parse import urlparse
host = urlparse(page_url or "").hostname if page_url else None
referer = f"https://{host}/" if host else None
ph = compute_thumbnail_phash(thumb_url, referer=referer, timeout=8.0)
return (scene_id, ph)
except (httpx.HTTPError, OSError, ValueError) as e:
log.debug("phash fetch fail %s: %s", thumb_url, e)
return (scene_id, None)
except Exception as e:
log.warning("phash unexpected %s: %s", thumb_url, e)
return (scene_id, None)
def _store_phash(session: Session, scene_id, phash) -> bool:
    """Insert one ``kind='phash'`` fingerprint inside its own SAVEPOINT.

    Args:
        session: Open session; the caller commits the outer transaction.
        scene_id: Scene the fingerprint belongs to.
        phash: Hash value to store, as returned by ``_compute_one``.

    Returns:
        True when the row was written, False when the insert failed. A
        failure is logged at DEBUG and swallowed so the rest of the batch
        can continue.
    """
    try:
        # The context manager releases the SAVEPOINT on success and rolls it
        # back if anything inside (or the flush on exit) raises, so a failed
        # insert cannot leave a dangling nested transaction behind.
        with session.begin_nested():
            session.add(SceneFingerprint(scene_id=scene_id, kind="phash", value=phash))
    except Exception as e:
        log.debug("insert fail scene=%s: %s", scene_id, e)
        return False
    return True


def main() -> None:
    """Command-line entry point: backfill phash fingerprints batch by batch.

    Flags:
        --batch: scenes fetched per DB query (default 200).
        --limit: hard cap on the number of scenes processed (default 10000).
        --concurrency: worker threads used for thumbnail fetches (default 10).
        --performer-id: optional performer UUID restricting the scenes.

    Each iteration opens a session, pages forward by ``scene_id``, computes
    the hashes on a thread pool, stores every successful hash in its own
    SAVEPOINT and commits once per batch. Progress is logged after every
    batch and a summary at the end.
    """
    import uuid as _uuid

    ap = argparse.ArgumentParser()
    ap.add_argument("--batch", type=int, default=200, help="scenes per DB query batch")
    ap.add_argument("--limit", type=int, default=10_000, help="max scenes total")
    ap.add_argument("--concurrency", type=int, default=10, help="parallel fetches")
    ap.add_argument("--performer-id", type=str, default=None, help="filter to scenes of this performer (UUID)")
    args = ap.parse_args()

    try:
        performer_id = _uuid.UUID(args.performer_id) if args.performer_id else None
    except ValueError:
        # Report a malformed UUID as a usage error instead of a traceback.
        ap.error(f"--performer-id is not a valid UUID: {args.performer_id!r}")

    total = ok = fail = 0
    cursor = None
    started = time.monotonic()

    while total < args.limit:
        # Never ask for more rows than the remaining budget, so --limit is a
        # real upper bound rather than being rounded up to a whole batch.
        batch_size = min(args.batch, args.limit - total)

        with SessionLocal() as session:
            batch = _scenes_needing_phash(
                session, batch=batch_size, after_id=cursor, performer_id=performer_id,
            )
            if not batch:
                log.info("no more scenes (cursor=%s, done)", cursor)
                break

            # Advance the cursor before processing: scenes that fail are not
            # retried within this run, which keeps the loop from spinning on
            # the same rows.
            cursor = batch[-1].scene_id

            with ThreadPoolExecutor(max_workers=args.concurrency) as pool:
                futures = [pool.submit(_compute_one, tuple(row)) for row in batch]
                # Workers only fetch and hash; all DB writes stay on this
                # thread, as each future completes.
                for fut in as_completed(futures):
                    scene_id, ph = fut.result()
                    total += 1
                    if ph and _store_phash(session, scene_id, ph):
                        ok += 1
                    else:
                        fail += 1

            session.commit()

        elapsed = time.monotonic() - started
        rate = total / elapsed if elapsed > 0 else 0
        log.info(
            "progress total=%d ok=%d fail=%d rate=%.1f/s cursor=%s",
            total, ok, fail, rate, cursor,
        )

    elapsed = time.monotonic() - started
    log.info("done total=%d ok=%d fail=%d elapsed=%.1fs", total, ok, fail, elapsed)
# Run the backfill only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()