goon/app/api/taxonomies.py
jtrzupek 2163fee245 perf(taxonomy): denormalize scene_count for tags/performers/studios
Counts for /tags, /performers, /studios and /favorites were computed live
per-request by aggregating scene_tags / scene_performers with an EXISTS to
playback_sources. As the catalog grew to ~1.7M scenes (6.3M scene_tags) this
ran ~4.3s for /tags?order=popular (x2 incl. the total count) and ~950ms for
the default /scenes count, making those screens load in several seconds.

- migration 0019: add scene_count (+ DESC index) to tags/performers/studios
- background job _job_refresh_taxonomy_counts (every 3h) recomputes the counts
  in one UPDATE..FROM each (IS DISTINCT FROM to skip unchanged rows)
- /tags, /performers, /studios scenes path now read the column + ORDER BY the
  indexed scene_count; for_movies paths keep live aggregation (small tables)
- favorites read denormalized scene_count instead of a grouped EXISTS aggregate
- /scenes default count: 10-min in-process TTL cache (header is approximate)

Measured: /tags?order=popular&per_page=500 ~8s -> 66ms incl. serialization.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-05-31 17:53:48 +02:00

599 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""GET /tags, /performers, /studios — listy taxonomies do filtrów na mobile.
Każdy endpoint wspiera:
- q: substring search po name_normalized (trgm fallback ilike)
- order: 'name' (alfabetycznie) | 'popular' lub 'scene_count' (po liczbie scen desc)
- page/per_page
Zwraca też scene_count żeby UI pokazywał "(123)" przy każdym tagu/performerze/studio.
"""
from __future__ import annotations
import uuid
from typing import Annotated
from fastapi import APIRouter, Depends, HTTPException, Query
from pydantic import BaseModel, ConfigDict
from sqlalchemy import and_, exists, func, select
from sqlalchemy.orm import Session
from app.auth import require_api_key
from app.db import get_session
from app.models.movie import Movie, MovieTag
from app.models.movie_playback_source import MoviePlaybackSource
from app.models.performer import Performer
from app.models.scene import ScenePerformer
from app.models.studio import Studio
from app.models.tag import Tag
router = APIRouter(tags=["taxonomies"], dependencies=[Depends(require_api_key)])
# ---- Schemas ----------------------------------------------------------
class TagCount(BaseModel):
    """A single tag entry returned by ``GET /tags``, with its content count."""

    id: uuid.UUID
    name: str
    slug: str
    # Scenes carrying the tag (or movies, when the list was requested with for_movies).
    scene_count: int = 0

    # Allows model_validate() to read attributes off ORM objects.
    model_config = ConfigDict(from_attributes=True)
class TagListOut(BaseModel):
    """One page of ``GET /tags`` results plus pagination metadata."""

    items: list[TagCount]
    # Number of tags matching the filters, before pagination.
    total: int
    page: int
    per_page: int
class PerformerCount(BaseModel):
    """A single performer entry returned by ``GET /performers``."""

    id: uuid.UUID
    canonical_name: str
    slug: str
    # Enum value of the performer's gender, or None when unknown.
    gender: str | None = None
    scene_count: int = 0

    # Allows model_validate() to read attributes off ORM objects.
    model_config = ConfigDict(from_attributes=True)
class PerformerListOut(BaseModel):
    """One page of ``GET /performers`` results plus pagination metadata."""

    items: list[PerformerCount]
    # Number of performers matching the filters, before pagination.
    total: int
    page: int
    per_page: int
class StudioCount(BaseModel):
    """A single studio entry returned by ``GET /studios``, with its content count."""

    id: uuid.UUID
    name: str
    slug: str
    network: str | None = None
    # Scenes of the studio (or movies, when the list was requested with for_movies).
    scene_count: int = 0

    # Allows model_validate() to read attributes off ORM objects.
    model_config = ConfigDict(from_attributes=True)
class StudioListOut(BaseModel):
    """One page of ``GET /studios`` results plus pagination metadata."""

    items: list[StudioCount]
    # Number of studios matching the filters, before pagination.
    total: int
    page: int
    per_page: int
# ---- Endpoints --------------------------------------------------------
@router.get("/tags", response_model=TagListOut)
def list_tags(
    session: Annotated[Session, Depends(get_session)],
    q: str | None = Query(default=None),
    order: str = Query(default="popular", description="popular|name"),
    page: int = Query(default=1, ge=1),
    per_page: int = Query(default=50, ge=1, le=500),
    for_movies: bool = Query(
        default=False,
        description=(
            "True: zlicza wystąpienia tagu w movies (z live MoviePlaybackSource) "
            "zamiast w scenes. UI używa do filtrowania movie genres."
        ),
    ),
    only_with_content: bool = Query(
        default=False,
        description=(
            "True: ukrywa tagi z 0 wystąpieniami w wybranym typie (scenes/movies)."
            " Filtruje krótkie listy filtrów żeby nie pokazywać tagów-sierot."
        ),
    ),
) -> TagListOut:
    """List tags with a per-tag content count, paginated.

    Args:
        session: DB session provided by ``get_session``.
        q: Optional case-insensitive substring filter on ``Tag.name`` (ILIKE).
        order: ``popular`` / ``scene_count`` sort by count descending, then name;
            ``name`` sorts alphabetically. ``Tag.id`` breaks remaining ties so
            OFFSET pagination is stable.
        page: 1-based page number.
        per_page: Page size (1..500).
        for_movies: When true, the count is the number of movies tagged with
            the tag that have at least one alive (``dead_at IS NULL``)
            MoviePlaybackSource, aggregated live. When false, the denormalized
            ``Tag.scene_count`` column is read.
        only_with_content: When true, tags with a zero count are hidden.

    Returns:
        TagListOut; ``total`` counts all matching tags before pagination.

    Raises:
        HTTPException: 400 when ``order`` is not an accepted value.
    """
    if order not in ("popular", "scene_count", "name"):
        raise HTTPException(status_code=400, detail="order must be 'popular' or 'name'")
    by_count = order in ("popular", "scene_count")
    offset = (page - 1) * per_page

    if for_movies:
        # Movie path: count only movies with >=1 alive MoviePlaybackSource. A tag
        # with no such movie gets 0 via LEFT OUTER JOIN + coalesce. Movies are a
        # small table (~41k per the original note), so a live aggregate is fine;
        # the scenes path below reads the denormalized column instead.
        movie_is_live = exists().where(
            and_(
                MoviePlaybackSource.movie_id == MovieTag.movie_id,
                MoviePlaybackSource.dead_at.is_(None),
            )
        )
        count_sub = (
            select(MovieTag.tag_id, func.count(MovieTag.movie_id).label("c"))
            .where(movie_is_live)
            .group_by(MovieTag.tag_id)
            .subquery()
        )
        movie_count = func.coalesce(count_sub.c.c, 0)
        base = select(Tag, movie_count.label("scene_count")).outerjoin(
            count_sub, count_sub.c.tag_id == Tag.id
        )
        if q:
            base = base.where(Tag.name.ilike(f"%{q}%"))
        if only_with_content:
            base = base.where(count_sub.c.tag_id.is_not(None))
        total = session.execute(
            select(func.count()).select_from(base.subquery())
        ).scalar_one()
        # Tag.id is the final tie-breaker: nothing guarantees the earlier sort
        # keys are unique, and LIMIT/OFFSET pages are only consistent under a
        # total order (otherwise tied rows may repeat or vanish between pages).
        if by_count:
            ordered = base.order_by(movie_count.desc(), Tag.name.asc(), Tag.id.asc())
        else:
            ordered = base.order_by(Tag.name.asc(), Tag.id.asc())
        rows = session.execute(ordered.offset(offset).limit(per_page)).all()
        items = [
            TagCount(id=t.id, name=t.name, slug=t.slug, scene_count=int(c))
            for t, c in rows
        ]
        return TagListOut(items=items, total=total, page=page, per_page=per_page)

    # Scenes path: read the denormalized Tag.scene_count, which the original notes
    # say is refreshed in the background by _job_refresh_taxonomy_counts. It
    # replaced a per-request aggregate over scene_tags + playback EXISTS (~4.3s
    # x2). See migration 0019.
    base = select(Tag)
    if q:
        base = base.where(Tag.name.ilike(f"%{q}%"))
    if only_with_content:
        base = base.where(Tag.scene_count > 0)
    total = session.execute(
        select(func.count()).select_from(base.subquery())
    ).scalar_one()
    # Same stable-pagination tie-breaker as above.
    if by_count:
        ordered = base.order_by(Tag.scene_count.desc(), Tag.name.asc(), Tag.id.asc())
    else:
        ordered = base.order_by(Tag.name.asc(), Tag.id.asc())
    tags_page = session.execute(
        ordered.offset(offset).limit(per_page)
    ).scalars().all()
    items = [
        TagCount(id=t.id, name=t.name, slug=t.slug, scene_count=t.scene_count)
        for t in tags_page
    ]
    return TagListOut(items=items, total=total, page=page, per_page=per_page)
@router.get("/performers", response_model=PerformerListOut)
def list_performers(
    session: Annotated[Session, Depends(get_session)],
    q: str | None = Query(default=None, description="substring po name_normalized"),
    order: str = Query(default="scene_count", description="scene_count|name"),
    page: int = Query(default=1, ge=1),
    per_page: int = Query(default=50, ge=1, le=500),
) -> PerformerListOut:
    """List performers with their denormalized scene counts, paginated.

    Args:
        session: DB session provided by ``get_session``.
        q: Optional substring filter; lower-cased and matched with ILIKE
            against ``Performer.name_normalized``.
        order: ``scene_count`` / ``popular`` sort by scene count descending,
            then canonical name; ``name`` sorts by canonical name.
            ``Performer.id`` breaks remaining ties so OFFSET pagination is stable.
        page: 1-based page number.
        per_page: Page size (1..500).

    Returns:
        PerformerListOut; ``total`` counts all matching performers before
        pagination.

    Raises:
        HTTPException: 400 when ``order`` is not an accepted value.
    """
    if order not in ("scene_count", "popular", "name"):
        raise HTTPException(status_code=400, detail="order must be 'scene_count' or 'name'")
    # Reads the denormalized Performer.scene_count (refreshed in the background)
    # instead of aggregating scene_performers + a playback EXISTS per request.
    # See migration 0019.
    base = select(Performer)
    if q:
        base = base.where(Performer.name_normalized.ilike(f"%{q.lower()}%"))
    total = session.execute(
        select(func.count()).select_from(base.subquery())
    ).scalar_one()
    # Nothing guarantees canonical_name is unique (two performers can share a
    # name), so Performer.id is the final tie-breaker: LIMIT/OFFSET pages are
    # only consistent under a total order.
    if order in ("scene_count", "popular"):
        ordered = base.order_by(
            Performer.scene_count.desc(),
            Performer.canonical_name.asc(),
            Performer.id.asc(),
        )
    else:
        ordered = base.order_by(Performer.canonical_name.asc(), Performer.id.asc())
    perfs_page = session.execute(
        ordered.offset((page - 1) * per_page).limit(per_page)
    ).scalars().all()
    items = [
        PerformerCount(
            id=p.id,
            canonical_name=p.canonical_name,
            slug=p.slug,
            gender=p.gender.value if p.gender else None,
            scene_count=p.scene_count,
        )
        for p in perfs_page
    ]
    return PerformerListOut(items=items, total=total, page=page, per_page=per_page)
@router.get("/studios", response_model=StudioListOut)
def list_studios(
    session: Annotated[Session, Depends(get_session)],
    q: str | None = Query(default=None),
    order: str = Query(default="name", description="name|scene_count"),
    page: int = Query(default=1, ge=1),
    per_page: int = Query(default=50, ge=1, le=500),
    for_movies: bool = Query(
        default=False,
        description="True: zlicza tylko studia mające ≥1 movie z live playback.",
    ),
    only_with_content: bool = Query(
        default=False,
        description="True: ukrywa studia z 0 wystąpieniami w wybranym typie.",
    ),
) -> StudioListOut:
    """List studios with a per-studio content count, paginated.

    Args:
        session: DB session provided by ``get_session``.
        q: Optional case-insensitive substring filter on ``Studio.name`` (ILIKE).
        order: ``scene_count`` / ``popular`` sort by count descending, then
            name; ``name`` sorts by ``name_normalized``. ``Studio.id`` breaks
            remaining ties so OFFSET pagination is stable.
        page: 1-based page number.
        per_page: Page size (1..500).
        for_movies: When true, the count is the number of the studio's movies
            that have at least one alive (``dead_at IS NULL``)
            MoviePlaybackSource, aggregated live. When false, the denormalized
            ``Studio.scene_count`` column is read.
        only_with_content: When true, studios with a zero count are hidden.

    Returns:
        StudioListOut; ``total`` counts all matching studios before pagination.

    Raises:
        HTTPException: 400 when ``order`` is not an accepted value.
    """
    if order not in ("name", "scene_count", "popular"):
        raise HTTPException(status_code=400, detail="order must be 'name' or 'scene_count'")
    by_count = order in ("scene_count", "popular")
    offset = (page - 1) * per_page

    if for_movies:
        # Movies are a small table (~41k per the original note): live aggregate
        # is fine. The scenes path below reads the denormalized column.
        movie_is_live = exists().where(
            and_(
                MoviePlaybackSource.movie_id == Movie.id,
                MoviePlaybackSource.dead_at.is_(None),
            )
        )
        count_sub = (
            select(Movie.studio_id, func.count(Movie.id).label("c"))
            .where(Movie.studio_id.is_not(None))
            .where(movie_is_live)
            .group_by(Movie.studio_id)
            .subquery()
        )
        movie_count = func.coalesce(count_sub.c.c, 0)
        base = select(Studio, movie_count.label("scene_count")).outerjoin(
            count_sub, count_sub.c.studio_id == Studio.id
        )
        if q:
            base = base.where(Studio.name.ilike(f"%{q}%"))
        if only_with_content:
            base = base.where(count_sub.c.studio_id.is_not(None))
        total = session.execute(
            select(func.count()).select_from(base.subquery())
        ).scalar_one()
        # Studio.id is the final tie-breaker: LIMIT/OFFSET pages are only
        # consistent under a total order, and names are not shown to be unique.
        if by_count:
            ordered = base.order_by(movie_count.desc(), Studio.name.asc(), Studio.id.asc())
        else:
            ordered = base.order_by(Studio.name_normalized.asc(), Studio.id.asc())
        rows = session.execute(ordered.offset(offset).limit(per_page)).all()
        items = [
            StudioCount(id=s.id, name=s.name, slug=s.slug, network=s.network, scene_count=int(c))
            for s, c in rows
        ]
        return StudioListOut(items=items, total=total, page=page, per_page=per_page)

    # Scenes path: read the denormalized Studio.scene_count (refreshed in the
    # background). It replaced a per-request aggregate over scenes + playback
    # EXISTS. See migration 0019.
    base = select(Studio)
    if q:
        base = base.where(Studio.name.ilike(f"%{q}%"))
    if only_with_content:
        base = base.where(Studio.scene_count > 0)
    total = session.execute(
        select(func.count()).select_from(base.subquery())
    ).scalar_one()
    # Same stable-pagination tie-breaker as above.
    if by_count:
        ordered = base.order_by(Studio.scene_count.desc(), Studio.name.asc(), Studio.id.asc())
    else:
        ordered = base.order_by(Studio.name_normalized.asc(), Studio.id.asc())
    studios_page = session.execute(
        ordered.offset(offset).limit(per_page)
    ).scalars().all()
    items = [
        StudioCount(
            id=s.id,
            name=s.name,
            slug=s.slug,
            network=s.network,
            scene_count=s.scene_count,
        )
        for s in studios_page
    ]
    return StudioListOut(items=items, total=total, page=page, per_page=per_page)
# ---- Performer refresh on-demand --------------------------------------
class PerformerRefreshOut(BaseModel):
    """Response body of ``POST /performers/{performer_id}/refresh``."""

    performer_id: uuid.UUID
    canonical_name: str
    # Per-source counter mappings, as reported by the performer-driven search run.
    counters: dict[str, dict[str, int]]
    # Sum of the "new" counter across all sources.
    new_scenes: int
    # ISO-8601 string (datetime.isoformat()) of the search just recorded, or None.
    last_searched_at: str | None
class PerformerRescrapeOut(BaseModel):
    """Response body of ``POST /performers/{performer_id}/rescrape``."""

    performer_id: uuid.UUID
    canonical_name: str
    # Scenes selected for this call (bounded by the per-call scene cap).
    scenes_total: int
    # Selected scenes that had at least one alive tube source and were attempted.
    scenes_processed: int
    # Playback sources that received a thumbnail URL.
    thumbs_added: int
    # New scene-tag links inserted.
    tags_added: int
    failures: int
    # True when a cap stopped the run early; cap_reason then explains which one.
    capped: bool
    cap_reason: str | None = None
# Hard caps for the bulk rescrape so the request does not hang and the reverse
# proxy (assumed nginx with a 60s read timeout) does not answer 504 while partial
# commits are in flight: at most _RESCRAPE_WALL_SEC (55s) of wall-clock and
# _RESCRAPE_MAX_SCENES scenes per call. Larger rescrapes can be triggered again
# (idempotent thanks to the has-thumbnail / tag-count checks).
_RESCRAPE_WALL_SEC = 55.0  # 60s proxy read timeout minus a 5s margin for building the response
_RESCRAPE_MAX_SCENES = 50
# Re-fetch tags for scenes with fewer than N tags. Some performers legitimately have
# only 1-2 tags (niche); checking them once does no harm, and repeated calls are
# idempotent because tag links are inserted with INSERT ... ON CONFLICT DO NOTHING.
_TAG_RESCRAPE_THRESHOLD = 3
# Mainstream tube sitetags, highest priority first, preferred as the tag source
# because their pages carry rich metadata.
_TAG_PRIORITY = [
    "xhamstercom", "porntrexcom", "epornercom", "youporncom",
    "xvideoscom", "xnxxcom", "redtubecom", "pornhatcom",
]
def _pick_tag_source(sources, supported_sitetags):
    """Choose the playback source whose page tags should be scraped from.

    The first source whose origin is ``tube:<sitetag>`` for the
    highest-priority sitetag in ``_TAG_PRIORITY`` wins. Failing that, the first
    source whose sitetag (the part after ``:``) is in ``supported_sitetags`` is
    returned. Returns ``None`` when nothing qualifies.

    Every ``origin`` is expected to contain ``:`` (callers pass only
    ``tube:%`` origins).
    """
    first_by_origin = {}
    for src in sources:
        first_by_origin.setdefault(src.origin, src)
    for sitetag in _TAG_PRIORITY:
        src = first_by_origin.get(f"tube:{sitetag}")
        if src is not None:
            return src
    return next(
        (src for src in sources if src.origin.split(":", 1)[1] in supported_sitetags),
        None,
    )


@router.post("/performers/{performer_id}/rescrape", response_model=PerformerRescrapeOut)
def rescrape_performer_scenes(
    performer_id: uuid.UUID,
    session: Annotated[Session, Depends(get_session)],
) -> PerformerRescrapeOut:
    """Bulk re-scrape thumbnails and tags from tube pages for a performer's scenes.

    Bug report 2026-05-16 (6fcaa5f4): per-scene enrichment works on demand, but
    for a whole list (e.g. 200 xhamster scenes) the user would have to click
    every scene separately.

    Only scenes that still need work are selected. A scene qualifies when it
    has at least one alive ``tube:%`` playback source, and either none of those
    sources has a thumbnail or it has fewer than ``_TAG_RESCRAPE_THRESHOLD``
    tags. Finished scenes drop out of the selection, so repeated calls advance
    through the backlog. Scenes whose scraping keeps failing stay eligible and
    are retried.

    Capped at ``_RESCRAPE_MAX_SCENES`` scenes and ``_RESCRAPE_WALL_SEC`` seconds
    per call. ``capped`` / ``cap_reason`` report which cap was hit, so the
    client knows to call again. If both caps are hit, the wall-clock reason is
    reported.

    Each scene runs inside its own SAVEPOINT and is committed on success, so
    one failing scene does not discard the others' work. The returned counters
    only include work from scenes that committed.

    Raises:
        HTTPException: 404 when the performer does not exist.
    """
    import logging as _logging
    import time as _time

    import httpx as _httpx
    from sqlalchemy import or_
    from sqlalchemy.dialects.postgresql import insert as pg_insert

    from app.extractors._fetch import browser_get
    from app.extractors._models import TubePageError
    from app.extractors.tag_extract import EXTRACTORS as TAG_EXTRACTORS, extract_tags
    from app.extractors.thumb_extract import extract_thumbnail_url
    from app.models.playback_source import PlaybackSource
    from app.models.scene import Scene, SceneTag
    from app.normalize.scenes import NormalizedTag
    from app.normalize.text import slugify
    from app.resolve.tag_resolver import resolve_tag

    # Function-local logger for the debug/warning calls below. Previously `log`
    # was referenced without being defined, so the first logged failure raised
    # NameError and turned the whole request into a 500.
    log = _logging.getLogger(__name__)

    perf = session.get(Performer, performer_id)
    if perf is None:
        raise HTTPException(status_code=404, detail="performer not found")
    # Read before the per-scene commits below, which may expire ORM instances.
    canonical_name = perf.canonical_name

    # 1) ID-only query: this performer's scenes with >=1 alive tube playback
    #    that still need work. Filtering here, and not only inside the loop,
    #    is what lets repeated calls get past the first _RESCRAPE_MAX_SCENES
    #    scenes. One extra row is fetched to detect that the cap was hit.
    has_alive_tube = exists().where(
        PlaybackSource.scene_id == Scene.id,
        PlaybackSource.dead_at.is_(None),
        PlaybackSource.origin.like("tube:%"),
    )
    has_alive_tube_thumb = exists().where(
        PlaybackSource.scene_id == Scene.id,
        PlaybackSource.dead_at.is_(None),
        PlaybackSource.origin.like("tube:%"),
        PlaybackSource.thumbnail_url.is_not(None),
        PlaybackSource.thumbnail_url != "",
    )
    scene_tag_count = (
        select(func.count())
        .select_from(SceneTag)
        .where(SceneTag.scene_id == Scene.id)
        .scalar_subquery()
    )
    candidate_ids = session.execute(
        select(Scene.id)
        .join(ScenePerformer, ScenePerformer.scene_id == Scene.id)
        .where(ScenePerformer.performer_id == performer_id)
        .where(has_alive_tube)
        .where(or_(~has_alive_tube_thumb, scene_tag_count < _TAG_RESCRAPE_THRESHOLD))
        .limit(_RESCRAPE_MAX_SCENES + 1)
    ).scalars().all()

    capped = len(candidate_ids) > _RESCRAPE_MAX_SCENES
    cap_reason: str | None = (
        f"max {_RESCRAPE_MAX_SCENES} scenes per call reached" if capped else None
    )
    scene_ids = list(candidate_ids[:_RESCRAPE_MAX_SCENES])
    scenes_total = len(scene_ids)
    if not scene_ids:
        return PerformerRescrapeOut(
            performer_id=performer_id,
            canonical_name=canonical_name,
            scenes_total=0, scenes_processed=0,
            thumbs_added=0, tags_added=0, failures=0,
            capped=False,
        )

    # 2) Batch fetch all alive tube playback sources for these scenes in one
    #    query. Plain column rows (not ORM entities) are not expired by the
    #    per-scene commits, so no per-source reload queries happen later.
    pb_rows = session.execute(
        select(
            PlaybackSource.id,
            PlaybackSource.scene_id,
            PlaybackSource.origin,
            PlaybackSource.page_url,
            PlaybackSource.thumbnail_url,
        )
        .where(PlaybackSource.scene_id.in_(scene_ids))
        .where(PlaybackSource.dead_at.is_(None))
        .where(PlaybackSource.origin.like("tube:%"))
    ).all()
    sources_by_scene: dict = {}
    for row in pb_rows:
        sources_by_scene.setdefault(row.scene_id, []).append(row)

    # 3) Batch fetch tag counts per scene (1 query instead of N).
    tag_counts = dict(session.execute(
        select(SceneTag.scene_id, func.count())
        .where(SceneTag.scene_id.in_(scene_ids))
        .group_by(SceneTag.scene_id)
    ).all())

    thumbs_added = 0
    tags_added = 0
    failures = 0
    scenes_processed = 0
    started = _time.monotonic()
    # Narrow exception set: only expected network/parse failures are swallowed
    # per fetch. A catch-all here used to mask things like pool exhaustion.
    NET_EXC = (TubePageError, _httpx.HTTPError, OSError, ValueError)
    for scene_id in scene_ids:
        # NOTE: the budget is checked only between scenes; one scene may still
        # run several 10s fetches past it (one per source for the thumbnail,
        # plus one for tags).
        if _time.monotonic() - started > _RESCRAPE_WALL_SEC:
            capped = True
            cap_reason = f"wall-clock {_RESCRAPE_WALL_SEC}s reached"
            break
        sources = sources_by_scene.get(scene_id, [])
        if not sources:
            continue
        scenes_processed += 1
        has_thumb = any(src.thumbnail_url for src in sources)
        existing_tag_count = tag_counts.get(scene_id, 0)
        # Per-scene counters, folded into the totals only once the scene commits,
        # so a rolled-back scene does not inflate thumbs_added/tags_added.
        scene_thumbs = 0
        scene_tags = 0
        scene_failures = 0
        # SAVEPOINT for failure isolation: an error (e.g. FK violation on a
        # SceneTag insert) rolls back just this scene instead of leaving the
        # session in PendingRollbackError for every following scene.
        sp = session.begin_nested()
        try:
            if not has_thumb:
                thumb_added_here = False
                for src in sources:
                    try:
                        r = browser_get(src.page_url, timeout=10.0, follow_redirects=True)
                    except NET_EXC as e:
                        log.debug("rescrape thumb fetch fail %s: %s", src.page_url, e)
                        continue
                    if r.status_code >= 400:
                        continue
                    thumb = extract_thumbnail_url(r.text)
                    if thumb:
                        # Update only the source the thumbnail came from. Applying
                        # it to every sibling caused wrong-CDN cross-attribution
                        # (e.g. an xhamster thumb on a porntrex entry); one
                        # thumbnail per scene is enough for the UI.
                        session.execute(
                            PlaybackSource.__table__.update()
                            .where(PlaybackSource.id == src.id)
                            .where(PlaybackSource.thumbnail_url.is_(None))
                            .values(thumbnail_url=thumb)
                        )
                        scene_thumbs += 1
                        thumb_added_here = True
                        break
                if not thumb_added_here:
                    scene_failures += 1
            if existing_tag_count < _TAG_RESCRAPE_THRESHOLD:
                chosen = _pick_tag_source(sources, TAG_EXTRACTORS)
                if chosen is not None:
                    sitetag = chosen.origin.split(":", 1)[1]
                    try:
                        r = browser_get(chosen.page_url, timeout=10.0, follow_redirects=True)
                        tag_names = extract_tags(sitetag, r.text) if r.status_code < 400 else []
                    except NET_EXC as e:
                        log.debug("rescrape tags fetch fail %s: %s", chosen.page_url, e)
                        tag_names = []
                    seen_tag_ids: set = set()
                    for name in tag_names:
                        norm = NormalizedTag(name=name, slug=slugify(name), external_id=None)
                        tag = resolve_tag(session, norm=norm)
                        if tag is None or tag.id in seen_tag_ids:
                            continue
                        seen_tag_ids.add(tag.id)
                        result = session.execute(
                            pg_insert(SceneTag.__table__)
                            .values(scene_id=scene_id, tag_id=tag.id)
                            .on_conflict_do_nothing(index_elements=["scene_id", "tag_id"])
                        )
                        if result.rowcount:
                            scene_tags += 1
            sp.commit()
            session.commit()
        except Exception as e:
            sp.rollback()
            log.warning("rescrape scene %s failed: %s", scene_id, e, exc_info=True)
            failures += 1
        else:
            thumbs_added += scene_thumbs
            tags_added += scene_tags
            failures += scene_failures
    return PerformerRescrapeOut(
        performer_id=performer_id,
        canonical_name=canonical_name,
        scenes_total=scenes_total,
        scenes_processed=scenes_processed,
        thumbs_added=thumbs_added,
        tags_added=tags_added,
        failures=failures,
        capped=capped,
        cap_reason=cap_reason,
    )
@router.post("/performers/{performer_id}/refresh", response_model=PerformerRefreshOut)
def refresh_performer(
    performer_id: uuid.UUID,
    session: Annotated[Session, Depends(get_session)],
) -> PerformerRefreshOut:
    """Run an on-demand search across all tubes for a single performer.

    Synchronous: the request blocks until the search finishes (the mobile
    client shows a spinner meanwhile).

    Rate guard: if the performer's ``last_searched_at`` is less than 60 seconds
    old, the request is rejected with HTTP 429 and no search is run.

    Raises:
        HTTPException: 404 when the performer does not exist; 429 when it was
            searched less than 60 seconds ago.
    """
    from datetime import UTC as _UTC, datetime as _dt, timedelta as _td

    performer = session.get(Performer, performer_id)
    if performer is None:
        raise HTTPException(status_code=404, detail="performer not found")

    previous_search = performer.last_searched_at
    if previous_search is not None:
        since_last = _dt.now(_UTC) - previous_search
        if since_last < _td(seconds=60):
            seconds_ago = int(since_last.total_seconds())
            raise HTTPException(
                status_code=429,
                detail=f"recently searched {seconds_ago}s ago, try in a bit",
            )

    # Imported lazily: performer_driven pulls in a heavy connector tree.
    from app.scheduler.performer_driven import run_performer_driven

    # NOTE: this blocks the API request thread for the whole search (the
    # original note estimates 30-90s across ~25 tubes). Acceptable for a
    # self-hosted single-user setup; move it to a task queue if that changes.
    run_result = run_performer_driven(
        performer_ids=[performer_id],
        top_n=0,
        per_performer_limit=200,
    )

    # Same bookkeeping as the continuous background worker.
    performer.last_searched_at = _dt.now(_UTC)
    performer.search_run_count = (performer.search_run_count or 0) + 1
    session.commit()

    per_source = run_result.per_source
    new_scenes = sum(stats.get("new", 0) for stats in per_source.values())
    searched_at = performer.last_searched_at
    return PerformerRefreshOut(
        performer_id=performer_id,
        canonical_name=performer.canonical_name,
        counters=per_source,
        new_scenes=new_scenes,
        last_searched_at=searched_at.isoformat() if searched_at else None,
    )