"""Admin API: lista pending merge candidates + side-by-side detail + resolve.""" from __future__ import annotations import uuid from typing import Annotated, Literal from fastapi import APIRouter, Depends, HTTPException, Query from pydantic import BaseModel, ConfigDict from sqlalchemy import func, select from sqlalchemy.orm import Session from app.api.scenes import _build_scene_out from app.api.schemas import SceneOut from app.auth import require_api_key from app.db import get_session from app.models.external_record import ExternalRecord from app.models.merge_candidate import MergeCandidate, MergeKind, MergeStatus from app.models.playback_source import PlaybackSource from app.models.scene import Scene, SceneExternalRef from app.models.source import Source, SourceKind from app.resolve.scene_merge import MergeError, resolve_candidate router = APIRouter( prefix="/admin", tags=["admin"], dependencies=[Depends(require_api_key)], ) def _raw_to_thumb(raw: dict, kind: SourceKind) -> str | None: """Wyciąga thumbnail URL z external_records.raw dla danego źródła. TPDB ma `image`/`poster`/`background.large`. StashDB raw nie zawiera image (osobny query do StashDB potrzebny — tu zwracamy None).""" if kind == SourceKind.tpdb: for k in ("image", "poster"): v = raw.get(k) if isinstance(v, str) and v.startswith("http"): return v bg = raw.get("background") if isinstance(bg, dict): v = bg.get("large") or bg.get("medium") or bg.get("full") if isinstance(v, str) and v.startswith("http"): return v elif kind == SourceKind.stashdb: # StashDB scene response includes images via separate query — nie trzymamy # tego w raw obecnie. TODO: dorzucić mirror do `paths.screenshot` przy ingest. 
paths = raw.get("paths") if isinstance(paths, dict): for k in ("screenshot", "image", "preview"): v = paths.get(k) if isinstance(v, str) and v.startswith("http"): return v return None # ---- schemas -------------------------------------------------------------- class MergeCandidateSummary(BaseModel): model_config = ConfigDict(from_attributes=True) id: uuid.UUID kind: str left_id: uuid.UUID right_id: uuid.UUID score: float status: str left_title: str | None = None right_title: str | None = None left_thumbnail_url: str | None = None left_animated_thumbnail_url: str | None = None right_thumbnail_url: str | None = None right_animated_thumbnail_url: str | None = None class MergeCandidateListOut(BaseModel): items: list[MergeCandidateSummary] total: int page: int per_page: int class MergeCandidateDetail(BaseModel): id: uuid.UUID kind: str score: float status: str reasons: dict left: SceneOut | None right: SceneOut | None class ResolveBody(BaseModel): action: Literal["merge", "reject"] keep: Literal["left", "right"] = "left" resolved_by: str | None = None class ResolveResult(BaseModel): id: uuid.UUID status: str keep_id: uuid.UUID | None = None drop_id: uuid.UUID | None = None # ---- endpoints ------------------------------------------------------------ @router.get("/merge-candidates", response_model=MergeCandidateListOut) def list_candidates( session: Annotated[Session, Depends(get_session)], status: Annotated[str, Query(pattern="^(pending|auto_merged|merged|rejected|all)$")] = "pending", kind: Annotated[str, Query(pattern="^(scene|performer|studio|all)$")] = "scene", page: Annotated[int, Query(ge=1)] = 1, per_page: Annotated[int, Query(ge=1, le=200)] = 50, ) -> MergeCandidateListOut: base = select(MergeCandidate) if status != "all": base = base.where(MergeCandidate.status == MergeStatus(status)) if kind != "all": base = base.where(MergeCandidate.kind == MergeKind(kind)) total = session.execute(select(func.count()).select_from(base.subquery())).scalar_one() rows = ( 
session.execute( base.order_by(MergeCandidate.score.desc(), MergeCandidate.created_at.desc()) .offset((page - 1) * per_page) .limit(per_page) ) .scalars() .all() ) # Pre-fetch tytułów scen (gdy kind=scene) dla wygodnego podglądu titles: dict[uuid.UUID, str] = {} scene_ids = {r.left_id for r in rows if r.kind == MergeKind.scene} | { r.right_id for r in rows if r.kind == MergeKind.scene } if scene_ids: for sid, title in session.execute( select(Scene.id, Scene.title).where(Scene.id.in_(scene_ids)) ): titles[sid] = title # Pre-fetch po jednym statycznym i animowanym thumbnailu per scenę (mobile queue # używa statycznego do listy + animowanego po hold-to-preview). Wybieramy najpierw # napotkany niepusty URL — kolejność rzędów playback_sources nie jest gwarantowana, # ale dla triage to wystarcza. thumbs: dict[uuid.UUID, str] = {} animated_thumbs: dict[uuid.UUID, str] = {} if scene_ids: for sid, static_url, animated_url in session.execute( select( PlaybackSource.scene_id, PlaybackSource.thumbnail_url, PlaybackSource.animated_thumbnail_url, ).where(PlaybackSource.scene_id.in_(scene_ids)) ): if static_url and sid not in thumbs: thumbs[sid] = static_url if animated_url and sid not in animated_thumbs: animated_thumbs[sid] = animated_url # Fallback: dla scen TPDB/StashDB-only (brak playback_source) wyciągamy # poster URL z external_records.raw['image' | 'poster' | 'paths.screenshot']. # Bez tego merge queue ma 70%+ wpisów bez thumb (canonical TPDB↔StashDB pary). 
missing = [sid for sid in scene_ids if sid not in thumbs] if missing: ext_rows = session.execute( select(SceneExternalRef.scene_id, ExternalRecord.raw, Source.kind) .join( ExternalRecord, (ExternalRecord.source_id == SceneExternalRef.source_id) & (ExternalRecord.external_id == SceneExternalRef.external_id), ) .join(Source, Source.id == SceneExternalRef.source_id) .where(SceneExternalRef.scene_id.in_(missing)) .where(ExternalRecord.entity_kind == "scene") ).all() for sid, raw, kind in ext_rows: if sid in thumbs or not isinstance(raw, dict): continue url = _raw_to_thumb(raw, kind) if url: thumbs[sid] = url items = [ MergeCandidateSummary( id=r.id, kind=r.kind.value, left_id=r.left_id, right_id=r.right_id, score=r.score, status=r.status.value, left_title=titles.get(r.left_id), right_title=titles.get(r.right_id), left_thumbnail_url=thumbs.get(r.left_id), right_thumbnail_url=thumbs.get(r.right_id), left_animated_thumbnail_url=animated_thumbs.get(r.left_id), right_animated_thumbnail_url=animated_thumbs.get(r.right_id), ) for r in rows ] return MergeCandidateListOut(items=items, total=total, page=page, per_page=per_page) @router.get("/merge-candidates/{candidate_id}", response_model=MergeCandidateDetail) def get_candidate( candidate_id: uuid.UUID, session: Annotated[Session, Depends(get_session)], ) -> MergeCandidateDetail: cand = session.get(MergeCandidate, candidate_id) if cand is None: raise HTTPException(status_code=404, detail="merge candidate not found") left_out = right_out = None if cand.kind == MergeKind.scene: left_scene = session.get(Scene, cand.left_id) right_scene = session.get(Scene, cand.right_id) if left_scene is not None: left_out = _build_scene_out(session, left_scene) if right_scene is not None and right_scene.id != cand.left_id: right_out = _build_scene_out(session, right_scene) return MergeCandidateDetail( id=cand.id, kind=cand.kind.value, score=cand.score, status=cand.status.value, reasons=cand.reasons or {}, left=left_out, right=right_out, ) 
@router.post("/merge-candidates/{candidate_id}/resolve", response_model=ResolveResult)
def resolve(
    candidate_id: uuid.UUID,
    body: ResolveBody,
    session: Annotated[Session, Depends(get_session)],
) -> ResolveResult:
    """Merge or reject one candidate via ``resolve_candidate``.

    For ``action == "merge"``, ``keep_id`` / ``drop_id`` report which side was
    kept (``body.keep``) and which was dropped. For ``"reject"`` both are None.

    Raises HTTPException(400) carrying the message of any ``MergeError``.
    """
    keep_left = body.keep == "left"
    try:
        cand = resolve_candidate(
            session,
            candidate_id=candidate_id,
            action=body.action,
            keep_left=keep_left,
            resolved_by=body.resolved_by,
        )
    except MergeError as exc:
        raise HTTPException(status_code=400, detail=str(exc)) from exc

    keep_id = drop_id = None
    if body.action == "merge":
        keep_id, drop_id = (
            (cand.left_id, cand.right_id) if keep_left else (cand.right_id, cand.left_id)
        )
    return ResolveResult(id=cand.id, status=cand.status.value, keep_id=keep_id, drop_id=drop_id)


# ---- Bandwidth monitor -----------------------------------------------------


class BandwidthCdnRow(BaseModel):
    cdn: str
    bytes: int
    pretty: str


class BandwidthStats(BaseModel):
    """Per-CDN bytes served by the VPS proxy, over rolling time buckets.

    Per the original author, the counters reset when the API restarts.
    ``hetzner`` is only filled when both HETZNER_API_TOKEN and
    HETZNER_SERVER_ID are configured and a valid traffic cache file exists.
    """

    last_1h: list[BandwidthCdnRow]
    last_24h: list[BandwidthCdnRow]
    last_7d: list[BandwidthCdnRow]
    total_bytes_1h: int
    total_bytes_24h: int
    total_bytes_7d: int
    hetzner: dict | None = None


def _fmt_bytes(b: int) -> str:
    """Format a byte count with binary (1024-based) units, e.g. ``1.50 KB``.

    Values below 1024 are printed as whole bytes. Larger values are scaled to
    the largest unit (up to PB) that keeps the number below 1024, with two
    decimals. Anything at or above 1024 PB stays expressed in PB.
    """
    if b < 1024:
        return f"{b} B"
    val = float(b)
    for unit in ("KB", "MB", "GB", "TB", "PB"):
        val /= 1024
        if val < 1024:
            break
    # If the loop never breaks, `unit` is "PB" and `val` has been divided once
    # more. The previous version labelled a TB-scaled value as PB.
    return f"{val:.2f} {unit}"


# Cache file written by the check_hetzner_traffic.py cron job.
_HETZNER_CACHE_PATH = "/tmp/hetzner_traffic.json"


def _load_hetzner_cache() -> dict | None:
    """Best-effort read of the cached Hetzner traffic JSON.

    Returns the parsed object when the file holds a JSON object. Returns None
    when the file is missing, unreadable, not valid JSON, or not an object;
    every case except a missing file is logged as a warning.

    A non-object payload is rejected because ``BandwidthStats.hetzner`` is typed
    ``dict | None`` and would otherwise fail response validation.
    """
    import json
    import logging
    from pathlib import Path

    log = logging.getLogger(__name__)
    try:
        text = Path(_HETZNER_CACHE_PATH).read_text(encoding="utf-8")
    except FileNotFoundError:
        return None
    except (OSError, ValueError):  # ValueError covers UnicodeDecodeError
        log.warning("could not read Hetzner traffic cache %s", _HETZNER_CACHE_PATH, exc_info=True)
        return None
    try:
        data = json.loads(text)
    except ValueError:  # json.JSONDecodeError subclasses ValueError
        log.warning("invalid JSON in Hetzner traffic cache %s", _HETZNER_CACHE_PATH, exc_info=True)
        return None
    if not isinstance(data, dict):
        log.warning("Hetzner traffic cache %s is not a JSON object", _HETZNER_CACHE_PATH)
        return None
    return data


@router.get("/bandwidth", response_model=BandwidthStats)
def bandwidth_stats() -> BandwidthStats:
    """Per-CDN VPS proxy bytes-out for 1h / 24h / 7d, plus cached Hetzner traffic.

    Meant for spotting which CDNs (e.g. Mixdrop) consume the most VPS bandwidth
    before Hetzner overage charges apply. Hetzner data is read from the cron
    cache file only when both Hetzner settings are configured.
    """
    from app.api.stream_proxy import get_bandwidth_stats
    from app.config import get_settings

    def _rows(stats: dict[str, int]) -> list[BandwidthCdnRow]:
        return [
            BandwidthCdnRow(cdn=cdn, bytes=b, pretty=_fmt_bytes(b))
            for cdn, b in stats.items()
        ]

    s_1h = get_bandwidth_stats(1)
    s_24h = get_bandwidth_stats(24)
    s_7d = get_bandwidth_stats(168)  # 7 days * 24 h

    settings = get_settings()
    hetzner_data = (
        _load_hetzner_cache()
        if settings.hetzner_api_token and settings.hetzner_server_id
        else None
    )

    return BandwidthStats(
        last_1h=_rows(s_1h),
        last_24h=_rows(s_24h),
        last_7d=_rows(s_7d),
        total_bytes_1h=sum(s_1h.values()),
        total_bytes_24h=sum(s_24h.values()),
        total_bytes_7d=sum(s_7d.values()),
        hetzner=hetzner_data,
    )