Goon — self-hosted aggregator for adult-content scene metadata. Indexes scenes from TPDB, StashDB, and 30+ public adult tube sites. Cross-source deduplication via perceptual hash + Levenshtein distance. FastAPI backend + APScheduler worker + React Native (Expo) mobile client. FOSS, ad-free, donation-funded. See README for details.
148 lines
5.6 KiB
Python
148 lines
5.6 KiB
Python
"""Phash backfill for scenes that have a thumbnail_url but no fingerprint(kind='phash').

After this run:

- find_by_phash_within() finds exact and Hamming-near duplicates for tube scenes
- phash_dedup_scenes.py auto-merge detects exact-match duplicates
- the merge_candidates flow can use phash distance as a signal

Implementation:

- Cursor-based pagination over scene_id (no skip-on-fail bug)
- Thread-pool concurrency for the httpx fetches (I/O fetch + CPU-bound phash calculation)
- Per-scene SAVEPOINT — a failure in one scene does not spoil the whole batch
- Idempotent: a scene that already has a phash entry (kind='phash') is skipped

Run: `python /srv/scripts/backfill_phash_tube.py --limit 5000 --concurrency 10`
"""
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import logging
|
|
import sys
|
|
import time
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
|
|
import httpx
|
|
from sqlalchemy import select, func
|
|
from sqlalchemy.orm import Session
|
|
|
|
sys.path.insert(0, "/srv")
|
|
from app.connectors.direct_scrapers._browse_base import compute_thumbnail_phash
|
|
from app.db import SessionLocal
|
|
from app.models.playback_source import PlaybackSource
|
|
from app.models.scene import ScenePerformer, SceneFingerprint
|
|
|
|
# Root logging configuration for this CLI script: timestamped, INFO and above.
_LOG_FORMAT = "%(asctime)s %(levelname)s %(message)s"
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)

# Named logger used by the query, worker and main-loop code below.
log = logging.getLogger("backfill_phash")
|
|
|
|
|
|
def _scenes_needing_phash(session: Session, batch: int, after_id, performer_id=None) -> list[tuple]:
    """Return up to ``batch`` scenes that still need a phash fingerprint.

    Each returned row is ``(scene_id, thumb_url, page_url)`` (also reachable
    by those attribute names), ordered by ``scene_id``. Only alive playback
    sources (``dead_at IS NULL``) with a non-NULL ``thumbnail_url`` are
    considered. Scenes that already have a ``SceneFingerprint`` with
    ``kind == "phash"`` are excluded.

    Per scene, the source with the smallest ``thumbnail_url`` is chosen (ties
    broken by ``page_url``). ``page_url`` is taken from that *same* source
    row, so a Referer derived from it matches the site that serves the
    thumbnail. Aggregating both columns independently could mix two sources.

    Args:
        session: open SQLAlchemy session used to run the query.
        batch: maximum number of scenes to return (SQL ``LIMIT``).
        after_id: keyset-pagination cursor; only scenes with
            ``scene_id > after_id`` are returned. ``None`` starts from the
            beginning.
        performer_id: optional; restrict results to scenes featuring this
            performer (handy for per-performer benchmarks).

    Returns:
        List of result rows as described above.
    """
    # Correlated NOT EXISTS rather than NOT IN (subquery): it is NULL-safe and
    # lets the planner execute it as an anti-join.
    already_hashed = (
        select(SceneFingerprint.scene_id)
        .where(SceneFingerprint.scene_id == PlaybackSource.scene_id)
        .where(SceneFingerprint.kind == "phash")
        .exists()
    )

    candidates = (
        select(
            PlaybackSource.scene_id.label("scene_id"),
            PlaybackSource.thumbnail_url.label("thumb_url"),
            PlaybackSource.page_url.label("page_url"),
            # Rank sources within each scene so exactly one whole row is kept.
            func.row_number()
            .over(
                partition_by=PlaybackSource.scene_id,
                order_by=[PlaybackSource.thumbnail_url, PlaybackSource.page_url],
            )
            .label("rn"),
        )
        .where(PlaybackSource.dead_at.is_(None))
        .where(PlaybackSource.thumbnail_url.isnot(None))
        .where(~already_hashed)
    )
    if performer_id is not None:
        performer_scenes = select(ScenePerformer.scene_id).where(
            ScenePerformer.performer_id == performer_id
        )
        candidates = candidates.where(PlaybackSource.scene_id.in_(performer_scenes))
    if after_id is not None:
        candidates = candidates.where(PlaybackSource.scene_id > after_id)

    ranked = candidates.subquery()
    q = (
        select(ranked.c.scene_id, ranked.c.thumb_url, ranked.c.page_url)
        .where(ranked.c.rn == 1)
        .order_by(ranked.c.scene_id)
        .limit(batch)
    )
    return list(session.execute(q).all())
|
|
|
|
|
|
def _compute_one(item: tuple) -> tuple:
|
|
"""Worker thread fn — fetch + phash. Returns (scene_id, phash_or_None)."""
|
|
scene_id, thumb_url, page_url = item
|
|
if not thumb_url:
|
|
return (scene_id, None)
|
|
try:
|
|
from urllib.parse import urlparse
|
|
host = urlparse(page_url or "").hostname if page_url else None
|
|
referer = f"https://{host}/" if host else None
|
|
ph = compute_thumbnail_phash(thumb_url, referer=referer, timeout=8.0)
|
|
return (scene_id, ph)
|
|
except (httpx.HTTPError, OSError, ValueError) as e:
|
|
log.debug("phash fetch fail %s: %s", thumb_url, e)
|
|
return (scene_id, None)
|
|
except Exception as e:
|
|
log.warning("phash unexpected %s: %s", thumb_url, e)
|
|
return (scene_id, None)
|
|
|
|
|
|
def _parse_args() -> argparse.Namespace:
    """Parse and validate the command-line options."""
    import uuid as _uuid

    ap = argparse.ArgumentParser()
    ap.add_argument("--batch", type=int, default=200, help="scenes per DB query batch")
    ap.add_argument("--limit", type=int, default=10_000, help="max scenes total")
    ap.add_argument("--concurrency", type=int, default=10, help="parallel fetches")
    # type=UUID makes argparse report a clean usage error for malformed ids.
    ap.add_argument("--performer-id", type=_uuid.UUID, default=None, help="filter to scenes of this performer (UUID)")
    args = ap.parse_args()
    if args.batch < 1:
        ap.error("--batch must be >= 1")
    if args.concurrency < 1:
        ap.error("--concurrency must be >= 1")
    if args.limit < 0:
        ap.error("--limit must be >= 0")
    return args


def _compute_batch(pool: ThreadPoolExecutor, rows: list) -> list[tuple]:
    """Fetch and phash every row concurrently; return ``(scene_id, phash_or_None)`` pairs."""
    futures = [pool.submit(_compute_one, tuple(row)) for row in rows]
    return [fut.result() for fut in as_completed(futures)]


def _store_phashes(results: list[tuple]) -> tuple[int, int]:
    """Insert phash fingerprints with one SAVEPOINT per scene; return ``(ok, failed)``."""
    from sqlalchemy.exc import SQLAlchemyError

    ok = failed = 0
    with SessionLocal() as session:
        for scene_id, ph in results:
            try:
                # The context-manager form rolls the savepoint back on error.
                # A manual begin_nested()/commit() without rollback leaves the
                # session unusable (PendingRollbackError) after one failure.
                with session.begin_nested():
                    session.add(SceneFingerprint(scene_id=scene_id, kind="phash", value=ph))
            except SQLAlchemyError as e:
                log.warning("insert fail scene=%s: %s", scene_id, e)
                failed += 1
            else:
                ok += 1
        session.commit()
    return ok, failed


def main() -> None:
    """Backfill phash fingerprints for scenes with an alive thumbnail but no phash.

    Each iteration:
      1. reads one keyset-paginated batch of candidate scenes;
      2. closes the read session;
      3. fetches and hashes the thumbnails on a shared thread pool;
      4. inserts the results in a fresh session, one SAVEPOINT per scene.

    Stops when no candidates remain or ``--limit`` scenes have been processed.
    """
    args = _parse_args()

    total = ok = fail = 0
    cursor = None
    started = time.monotonic()

    with ThreadPoolExecutor(max_workers=args.concurrency) as pool:
        while total < args.limit:
            # Never request more scenes than what remains of --limit.
            want = min(args.batch, args.limit - total)
            # The read session is closed before network I/O, so no DB
            # connection sits idle-in-transaction while thumbnails download.
            with SessionLocal() as session:
                rows = _scenes_needing_phash(
                    session, batch=want, after_id=cursor, performer_id=args.performer_id
                )
            if not rows:
                log.info("no more scenes (cursor=%s, done)", cursor)
                break
            # Advance past the whole batch, including scenes whose phash fails,
            # so failing scenes are not retried forever.
            cursor = rows[-1].scene_id

            results = _compute_batch(pool, rows)
            total += len(results)
            hashed = [(scene_id, ph) for scene_id, ph in results if ph]
            fail += len(results) - len(hashed)
            if hashed:
                stored, insert_failed = _store_phashes(hashed)
                ok += stored
                fail += insert_failed

            elapsed = time.monotonic() - started
            rate = total / elapsed if elapsed > 0 else 0
            log.info(
                "progress total=%d ok=%d fail=%d rate=%.1f/s cursor=%s",
                total, ok, fail, rate, cursor,
            )

    elapsed = time.monotonic() - started
    log.info("done total=%d ok=%d fail=%d elapsed=%.1fs", total, ok, fail, elapsed)
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point; main() returns None, so the exit status stays 0.
    raise SystemExit(main())
|