goon/app/api/stream_proxy.py
jtrzupek 08079787da feat(sxyprn): on-demand thumbnail resolver (live posters, ~1h-TTL workaround)
trafficdeposit poster tokens live ~1h (hour-bucketed), so stored URLs can't persist.
New GET /proxy/sxyprn-thumb/{post_id}: resolves the current og:image from the live
/post/<id> page (cache resolved poster URL ~40min), streams bytes with Referer +
long client Cache-Control (URL is stable per post_id → client disk-caches the image,
backend fetches each post ~once). Deleted posts ("Post Not Found") → 404.

Scene grid now emits /proxy/sxyprn-thumb/<id> for sxyprn sources (derived from
page_url) instead of the dead stored trafficdeposit URL. Verified: live post → 200
image, deleted → 404, grid emits resolver URL. Backend-only, no OTA.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-10 15:02:49 +02:00

687 lines
28 KiB
Python

"""Stream proxy — pomost VPS↔phone dla podpisanych URL-i CDN-ów.
Many hosters (luluvids/medixiru/cdnvids/bigcdn) bind the signed URL to the IP of the
client that fetched the embed page. When the backend extracts the URL from the VPS, the
signature is verified against the VPS IP — the phone gets a 403. The player on the phone
therefore routes its requests *through the backend* (the same IP as during extraction), so
the CDN validates the signature correctly and serves the content.
Flow:
1. /resolve packs (url, referer) into an HMAC-signed token.
2. Mobile receives `stream_url = /proxy/{token}/master.m3u8` (or `.mp4`).
3. ExoPlayer calls the backend → the backend streams the content from the origin URL.
4. HLS: the m3u8 manifest is rewritten so that child segments/playlists
also go through the proxy (chained tokens).
Token: base64url(json({u: url, r: referer, e: unix_ts})) + "." + base64url(HMAC-SHA256),
signed with a shared secret from the environment (`STREAM_PROXY_SECRET`). The TTL is 4h so the
player can watch longer scenes and pause/seek without risking an expired token.
"""
from __future__ import annotations
import base64
import hashlib
import hmac
import json
import logging
import os
import re
import time
from typing import Annotated
from urllib.parse import urljoin, urlparse
import httpx
from fastapi import APIRouter, Depends, HTTPException, Query, Request
from fastapi.responses import Response, StreamingResponse
from app.auth import require_api_key
# Endpoints in this module register on this router under the /proxy prefix.
router = APIRouter(prefix="/proxy", tags=["proxy"])
log = logging.getLogger(__name__)
# In-memory bandwidth counter — bytes-out per CDN domain per hour bucket.
# Restarting the API resets the counter (acceptable — it is an operational metric, not billing).
# Critical for seeing where VPS bandwidth leaks before a Hetzner overage.
from collections import defaultdict
from threading import Lock
# host -> {hour bucket (unix time // 3600) -> bytes}; mutate/read only while holding _bw_lock.
_bw_counters: dict[str, dict[int, int]] = defaultdict(lambda: defaultdict(int))
_bw_lock = Lock()
def _record_proxy_bytes(target_url: str, n_bytes: int) -> None:
    """Add `n_bytes` to the current hour bucket of the host found in `target_url`.

    Non-positive byte counts are ignored. A URL that cannot be parsed, or that has no
    hostname, is accounted under "unknown". While holding the lock, buckets of that same
    host older than 168 hours (7 days) are dropped to keep the map small.
    """
    if n_bytes <= 0:
        return
    try:
        domain = urlparse(target_url).hostname
    except Exception:
        domain = None
    if not domain:
        domain = "unknown"
    current_bucket = int(time.time() // 3600)
    oldest_kept = current_bucket - 168
    with _bw_lock:
        per_host = _bw_counters[domain]
        per_host[current_bucket] += n_bytes
        # Collect first, delete afterwards: the dict must not change size while iterated.
        for stale_bucket in [b for b in per_host if b < oldest_kept]:
            del per_host[stale_bucket]
def get_bandwidth_stats(hours: int = 24) -> dict[str, int]:
    """Return `{cdn_domain: bytes_out}` summed over the last `hours` hour buckets.

    Domains with no traffic in the window are omitted; the mapping is ordered by
    byte count, largest first.
    """
    # Buckets strictly newer than this index fall inside the window.
    oldest_excluded = int(time.time() // 3600) - hours
    totals: dict[str, int] = {}
    with _bw_lock:
        for domain, per_hour in _bw_counters.items():
            volume = sum(
                count for bucket, count in per_hour.items() if bucket > oldest_excluded
            )
            if volume > 0:
                totals[domain] = volume
    ranked = sorted(totals.items(), key=lambda item: -item[1])
    return dict(ranked)
# Browser-like (desktop Chrome) User-Agent used for upstream requests built in this module.
DEFAULT_UA = (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
"(KHTML, like Gecko) Chrome/140.0.0.0 Safari/537.36"
)
# Default lifetime of a proxy token, in seconds.
TOKEN_TTL_SEC = 4 * 60 * 60 # 4h
# URL-level redirect cache: target_url -> (final_resolved_url, expires_ts).
# The mobile ExoPlayer issues range requests per seek/preload — each one hits the proxy with
# the same token, and the proxy GETs target_url. For `porntrex.com/get_file/...` (and also
# fpoxxx, freshporno) the URL is **single-use**: first GET → 302 → CDN URL (time-bound),
# second GET → 410. Without the cache: second range = 410 → ExoPlayer fails → mobile falls
# back to `Linking.openURL(page_url)` → ad (bug reports `cee51c76`, `e2e365e3`, 2026-05-22).
#
# With the cache: the first GET follows the redirect and the final URL is cached. Subsequent
# range requests go straight to the CDN URL, which is time-bound (~1-2h), not single-use.
# Mobile plays to the end without the fallback.
#
# TTL 1800s = 30 min: shorter than the typical CDN signed-URL lifetime (~1h+), so stale
# entries do not cause 403 spam. After expiry mobile retries /resolve → fresh token.
_REDIRECT_CACHE: dict[str, tuple[str, float]] = {}
_REDIRECT_CACHE_TTL_SEC = 1800
# Size threshold above which _redirect_cache_put starts pruning entries.
_REDIRECT_CACHE_MAX = 1000
def _redirect_cache_get(target_url: str) -> str | None:
    """Return the cached final URL for `target_url`, or None when absent or expired.

    An expired entry is removed from the cache as a side effect of the lookup.
    """
    cached = _REDIRECT_CACHE.get(target_url)
    if not cached:
        return None
    resolved_url, expires_at = cached
    if expires_at >= time.time():
        return resolved_url
    # Past its expiry: evict lazily so the next lookup is a plain miss.
    _REDIRECT_CACHE.pop(target_url, None)
    return None
def _redirect_cache_put(target_url: str, final_url: str) -> None:
    """Remember that `target_url` resolved (after redirects) to `final_url`.

    Nothing is stored when `final_url` is empty or equal to `target_url`. The entry
    expires `_REDIRECT_CACHE_TTL_SEC` seconds from now.

    When the cache exceeds `_REDIRECT_CACHE_MAX` entries, expired entries are dropped
    first. If that is not enough (every entry is still live), the entries closest to
    expiry are evicted until the cache is back at the cap, so the dict cannot grow
    without bound under a burst of distinct targets.
    """
    if not final_url or target_url == final_url:
        return
    now = time.time()
    _REDIRECT_CACHE[target_url] = (final_url, now + _REDIRECT_CACHE_TTL_SEC)
    if len(_REDIRECT_CACHE) <= _REDIRECT_CACHE_MAX:
        return
    # Pass 1: drop everything that has already expired.
    for key in [k for k, (_, exp) in _REDIRECT_CACHE.items() if exp < now]:
        _REDIRECT_CACHE.pop(key, None)
    # Pass 2: still over the cap -> evict the soonest-expiring entries. The entry just
    # inserted carries the latest expiry, so it is the last candidate for eviction.
    overflow = len(_REDIRECT_CACHE) - _REDIRECT_CACHE_MAX
    if overflow > 0:
        by_expiry = sorted(_REDIRECT_CACHE, key=lambda k: _REDIRECT_CACHE[k][1])
        for key in by_expiry[:overflow]:
            _REDIRECT_CACHE.pop(key, None)
def _redirect_cache_invalidate(target_url: str) -> None:
    """Forget the cached final URL for `target_url`; a no-op when nothing is cached."""
    try:
        del _REDIRECT_CACHE[target_url]
    except KeyError:
        pass
# Lower-cased names of upstream response headers that are NOT copied onto proxied
# responses (the streaming paths filter `upstream.headers` against this set). Besides the
# hop-by-hop headers it also lists content-encoding and content-length.
HOP_BY_HOP = {
"connection",
"keep-alive",
"proxy-authenticate",
"proxy-authorization",
"te",
"trailers",
"transfer-encoding",
"upgrade",
"content-encoding",
"content-length",
}
def _secret() -> bytes:
s = os.environ.get("STREAM_PROXY_SECRET") or os.environ.get("API_KEYS", "")
if not s:
raise RuntimeError("STREAM_PROXY_SECRET (or API_KEYS) must be set")
return s.encode("utf-8")
def make_token(
    url: str,
    referer: str | None = None,
    ttl_sec: int = TOKEN_TTL_SEC,
    *,
    refresh: str | None = None,
    refresh_hoster: str | None = None,
    impersonate: bool = False,
    stable_bucket_sec: int | None = None,
) -> str:
    """Build a signed proxy token of the form `<base64url(json)>.<base64url(hmac)>`.

    `refresh`: embed-page URL to re-fetch; stored under `rf` so the proxy can obtain a
    fresh stream URL from the embed (e.g. mixdrop MDCore.wurl).
    `refresh_hoster`: hoster name for the refresh logic (mixdrop / etc.), stored under
    `rh` so the proxy can dispatch to the dedicated re-extract logic.
    `impersonate`: stored as `i=1`; asks the proxy to use curl_cffi chrome120 instead of
    httpx (for hosters with JA3 bot detection — mxcontent, cloudflare-protected).
    `stable_bucket_sec`: quantizes the base time of the expiry to a window of N seconds,
    so the token (→ proxied URL) is IDENTICAL for all requests inside the window. Without
    it `e` embeds the current timestamp → the URL differs on every list fetch → image
    cache miss on the client. The expiry is then `bucket_start + ttl_sec`.
    """
    issued_at = int(time.time())
    if stable_bucket_sec:
        # Snap down to the start of the current bucket.
        issued_at -= issued_at % stable_bucket_sec
    # Key insertion order is part of the signed bytes: u, r, e, then the optional flags.
    claims: dict = {"u": url, "r": referer or "", "e": issued_at + ttl_sec}
    optional_claims = (
        ("rf", refresh, refresh),
        ("rh", refresh_hoster, refresh_hoster),
        ("i", impersonate, 1),
    )
    for key, enabled, stored in optional_claims:
        if enabled:
            claims[key] = stored
    serialized = json.dumps(claims, separators=(",", ":")).encode("utf-8")
    body_part = base64.urlsafe_b64encode(serialized).rstrip(b"=").decode("ascii")
    mac = hmac.new(_secret(), serialized, hashlib.sha256).digest()
    sig_part = base64.urlsafe_b64encode(mac).rstrip(b"=").decode("ascii")
    return f"{body_part}.{sig_part}"
def parse_token(token: str) -> dict:
    """Verify a proxy token and return its decoded payload dict.

    Raises:
        HTTPException(400): the token has no `.` separator or its body is not valid
            base64.
        HTTPException(403): the signature does not match.
        HTTPException(410): the `e` (expiry) timestamp is in the past.
    """
    try:
        body_b64, sig_b64 = token.split(".", 1)
    except ValueError:
        raise HTTPException(status_code=400, detail="malformed token") from None
    try:
        # Padding was stripped when the token was built; surplus "=" is tolerated.
        raw = base64.urlsafe_b64decode(body_b64 + "==")
    except ValueError:
        # binascii.Error (bad length/padding) subclasses ValueError, and non-ASCII text
        # raises ValueError as well — both are client errors, not server faults.
        raise HTTPException(status_code=400, detail="malformed token") from None
    expected = base64.urlsafe_b64encode(
        hmac.new(_secret(), raw, hashlib.sha256).digest()
    ).rstrip(b"=")
    # Compare as bytes: compare_digest raises TypeError for str arguments that contain
    # non-ASCII characters, which an attacker-supplied signature may well do.
    supplied = sig_b64.encode("utf-8", errors="replace")
    if not hmac.compare_digest(expected, supplied):
        raise HTTPException(status_code=403, detail="bad token sig")
    # Only reached for payloads this service signed itself.
    payload = json.loads(raw)
    if int(payload.get("e", 0)) < int(time.time()):
        raise HTTPException(status_code=410, detail="token expired")
    return payload
def _ascii_safe_url(url: str) -> str:
"""Encode non-ASCII chars w URL path/query, zachowując reserved chars dla URI.
httpx wymaga ASCII headers — Referer z polskim/cyrillic/unicode (np. hqporner
`Honies_№2.html`) wcześniej throw'ował UnicodeEncodeError (GOON-A). `quote`
z `safe=":/?#[]@!$&'()*+,;=%"` zostawia URI structure nietkniętą, tylko
enkoduje znaki spoza ASCII."""
try:
from urllib.parse import quote
return quote(url, safe=":/?#[]@!$&'()*+,;=%~")
except Exception:
return url
def _build_headers(referer: str | None) -> dict[str, str]:
    """Build the request headers sent upstream.

    Always sets User-Agent, Accept and Accept-Language. When `referer` is given, adds an
    ASCII-safe `Referer` and, if a hostname can be parsed out of it, an `Origin` of the
    form `https://<host>`. Failure to derive the Origin is ignored on purpose.
    """
    headers = {
        "User-Agent": DEFAULT_UA,
        "Accept": "*/*",
        "Accept-Language": "en-US,en;q=0.9",
    }
    if not referer:
        return headers
    headers["Referer"] = _ascii_safe_url(referer)
    try:
        referer_host = urlparse(referer).hostname
        if referer_host:
            headers["Origin"] = _ascii_safe_url(f"https://{referer_host}")
    except Exception:
        # Best effort: a Referer without a derivable Origin is still usable.
        pass
    return headers
# Matches a quoted URI="..." attribute inside an HLS tag line; groups 1 and 3 keep the
# surrounding `URI="` and closing quote so that only the URL itself is replaced.
_M3U8_URI_RE = re.compile(r'(URI=")([^"]+)(")', re.IGNORECASE)


def _rewrite_m3u8(content: str, base_url: str, referer: str | None) -> str:
    """Rewrite an m3u8 manifest so that every sub-resource goes through the proxy.

    An HLS manifest contains:
    - URI lines (.ts segments / .m3u8 sub-playlists) — relative or absolute;
    - tags such as `#EXT-X-KEY:METHOD=AES-128,URI="key.bin"` — these need rewriting too.

    Every URL is resolved against `base_url`, wrapped in a fresh token and replaced by
    `/proxy/{token}/seg.<ext>` (plain `/proxy/{token}/seg` for URIs inside tags). Blank
    lines are kept as they are; the result always ends with a newline.
    """

    def proxied_attribute(match: re.Match) -> str:
        # URI="..." inside #EXT-X-KEY / #EXT-X-MEDIA / etc.
        resolved = urljoin(base_url, match.group(2))
        signed = make_token(resolved, referer)
        return f'{match.group(1)}/proxy/{signed}/seg{match.group(3)}'

    rewritten: list[str] = []
    for original_line in content.splitlines():
        stripped = original_line.strip()
        if not stripped:
            rewritten.append(original_line)
        elif stripped.startswith("#"):
            rewritten.append(_M3U8_URI_RE.sub(proxied_attribute, original_line))
        else:
            # Resource URI line.
            resource_url = urljoin(base_url, stripped)
            signed = make_token(resource_url, referer)
            # Keep the extension so ExoPlayer can recognise the content type.
            suffix = os.path.splitext(urlparse(resource_url).path)[1].lstrip(".") or "ts"
            rewritten.append(f"/proxy/{signed}/seg.{suffix}")
    return "\n".join(rewritten) + "\n"
@router.get("/sign")
def sign_url(
    _api: Annotated[None, Depends(require_api_key)],
    url: str = Query(...),
    referer: str | None = Query(default=None),
) -> dict:
    """Helper endpoint that lets mobile obtain a fresh token (e.g. after expiry).

    Normally /resolve already returns a proxy URL — this is the fallback. The response
    carries the token and its lifetime in seconds.
    """
    fresh_token = make_token(url, referer)
    return {"token": fresh_token, "expires_in": TOKEN_TTL_SEC}
@router.get("/img/{token}/{_basename:path}")
async def proxy_image(
    token: str,
    _basename: str,
    request: Request,
) -> Response:
    """Image proxy — used for thumbnails from CDNs that require a Referer.

    The mobile image component does not send a Referer by default and the CDN answers
    403; the backend adds the Referer from the token and returns the image bytes with
    `Cache-Control: public, max-age=86400` so the client may cache them.

    Failure mapping (all with an empty `image/jpeg` body unless noted):
    - connect error / connect or read timeout → 503;
    - any other fetch exception → HTTPException 502;
    - upstream 5xx → 502, upstream 4xx → same status passed through.
    """
    claims = parse_token(token)
    image_url = claims["u"]
    upstream_headers = _build_headers(claims["r"] or None)
    limits = httpx.Timeout(connect=10.0, read=30.0, write=15.0, pool=5.0)

    def placeholder(status: int) -> Response:
        # Empty body: the mobile app renders its own placeholder.
        return Response(content=b"", status_code=status, media_type="image/jpeg")

    async with httpx.AsyncClient(timeout=limits, follow_redirects=True) as client:
        try:
            upstream = await client.get(image_url, headers=upstream_headers)
        except (httpx.ConnectError, httpx.ConnectTimeout, httpx.ReadTimeout) as exc:
            # CDN connect/timeout — transient (e.g. Cloudflare 523, origin unreachable).
            # Logged at INFO and answered with 503 so that broken tubes do not flood
            # Sentry with ERRORs (GOON-D/6).
            log.info("img proxy connect/timeout for %s: %s", image_url, exc)
            return placeholder(503)
        except Exception as exc:
            log.warning("img proxy fetch failed for %s: %s", image_url, exc)
            raise HTTPException(status_code=502, detail=f"img fetch failed: {exc}") from exc
        upstream_status = upstream.status_code
        if upstream_status >= 400:
            # Upstream 4xx/5xx for a thumbnail — degrade instead of raising (GOON-5,
            # GOON-D): INFO log and an empty response, no exception.
            log.info("img proxy upstream %d for %s", upstream_status, image_url)
            return placeholder(upstream_status if upstream_status < 500 else 502)
        return Response(
            content=upstream.content,
            media_type=upstream.headers.get("content-type", "image/jpeg"),
            headers={"Cache-Control": "public, max-age=86400"},
        )
# sxyprn on-demand thumbnail resolve (bug 2026-06-10). The trafficdeposit poster token
# lives ~1h (hourly bucket), so the URLs can NOT be stored — the current og:image is
# resolved from the /post/<id> page at serve time. The resolved poster URL is cached ~40min
# (< 1h TTL). The client gets a STABLE /proxy/sxyprn-thumb/<id> → it caches the bytes
# permanently (poster content does not change), so the post page is fetched ~once per post.
# post id -> (resolved poster URL, expiry unix timestamp)
_SXYPRN_POSTER_CACHE: dict[str, tuple[str, float]] = {}
# Cache lifetime of a resolved poster URL, in seconds (40 min).
_SXYPRN_POSTER_TTL = 2400
# og:image meta tag with the property before the content attribute ...
_OG_IMG_RE = re.compile(r"og:image[\"'][^>]*content=[\"']([^\"']+)", re.IGNORECASE)
# ... and with the content attribute before the property.
_OG_IMG_RE2 = re.compile(r"content=[\"']([^\"']+)[\"'][^>]*property=[\"']og:image", re.IGNORECASE)
# Last resort: the poster attribute of a <video> tag.
_VID_POSTER_RE = re.compile(r"<video[^>]*poster=[\"']([^\"']+)", re.IGNORECASE)
# Accepted post ids: 6-40 lowercase hex characters.
_SXYPRN_PID_RE = re.compile(r"^[0-9a-f]{6,40}$")
async def _resolve_sxyprn_poster(
    client: "httpx.AsyncClient", pid: str
) -> "tuple[str | None, int]":
    """Resolve the current poster URL of post `pid` from its live page.

    Returns `(poster_url, 0)` on success, after storing the URL in
    `_SXYPRN_POSTER_CACHE`. On failure returns `(None, status)` where `status` is the
    HTTP status the endpoint should answer with: 404 when the post is gone or has no
    poster, 502 when the page could not be fetched or the site answered with an error.
    """
    try:
        page = await client.get(
            f"https://sxyprn.com/post/{pid}.html", headers=_build_headers(None)
        )
    except Exception as e:
        log.info("sxyprn-thumb page fetch failed %s: %s", pid, e)
        return None, 502
    html = page.text
    if page.status_code == 404 or "Post Not Found" in html:
        return None, 404
    if page.status_code >= 400:
        # Blocked / upstream failure: do NOT report it as 404, which means "deleted post".
        log.info("sxyprn-thumb page upstream %d for %s", page.status_code, pid)
        return None, 502
    m = _OG_IMG_RE.search(html) or _OG_IMG_RE2.search(html) or _VID_POSTER_RE.search(html)
    if not m:
        return None, 404
    poster = m.group(1).strip()
    if poster.startswith("//"):
        poster = "https:" + poster
    now = time.time()
    _SXYPRN_POSTER_CACHE[pid] = (poster, now + _SXYPRN_POSTER_TTL)
    if len(_SXYPRN_POSTER_CACHE) > 8000:
        for k in [k for k, v in list(_SXYPRN_POSTER_CACHE.items()) if v[1] < now]:
            _SXYPRN_POSTER_CACHE.pop(k, None)
    return poster, 0


@router.get("/sxyprn-thumb/{post_id}")
async def sxyprn_thumb(post_id: str) -> Response:
    """On-demand sxyprn poster.

    The URL is stable per post_id (the client caches the bytes); the backend resolves the
    current og:image (token lives ~1h) and fetches it with a Referer.

    Responses: 200 with the image and a 7-day Cache-Control; 400 for a malformed post
    id; 404 when the post is gone or exposes no poster; 502 (empty body) when the page
    or the poster could not be fetched. When a *cached* poster URL is rejected with a
    4xx, the poster is re-resolved and fetched once more within the same request, so an
    expired poster token does not cost the client a failed image load.
    """
    pid = post_id.split(".")[0]  # strip a possible .jpg suffix
    # fullmatch: `$` alone would also accept an id followed by a trailing newline.
    if not _SXYPRN_PID_RE.fullmatch(pid):
        raise HTTPException(status_code=400, detail="bad post_id")

    def empty(status: int) -> Response:
        return Response(content=b"", status_code=status, media_type="image/jpeg")

    cached = _SXYPRN_POSTER_CACHE.get(pid)
    poster = cached[0] if (cached and cached[1] > time.time()) else None
    from_cache = poster is not None
    timeout = httpx.Timeout(connect=10.0, read=20.0, write=10.0, pool=5.0)
    async with httpx.AsyncClient(timeout=timeout, follow_redirects=True) as client:
        # At most two attempts; the second runs only after a cached URL proved stale.
        for _attempt in range(2):
            if poster is None:
                poster, error_status = await _resolve_sxyprn_poster(client, pid)
                if poster is None:
                    return empty(error_status)
            try:
                pr = await client.get(poster, headers=_build_headers("https://sxyprn.com/"))
            except Exception as e:
                log.info("sxyprn-thumb poster fetch failed %s: %s", pid, e)
                return empty(502)
            if pr.status_code < 400:
                return Response(
                    content=pr.content,
                    media_type=pr.headers.get("content-type", "image/jpeg"),
                    headers={"Cache-Control": "public, max-age=604800"},
                )
            _SXYPRN_POSTER_CACHE.pop(pid, None)  # stale token → never reuse this URL
            if not from_cache or pr.status_code >= 500:
                # A freshly resolved URL failed, or the CDN itself is erroring:
                # resolving again would not help.
                break
            poster, from_cache = None, False
    return empty(502)
async def _refetch_mixdrop_url(session: "AsyncSession", embed_url: str) -> str | None:
    """Re-fetch a mixdrop embed page and extract a fresh `MDCore.wurl` mp4 URL.

    The page's P.A.C.K.E.R.-packed script is decoded and searched for the URL. The
    request goes through `session`, so whatever cookies it receives stay on that session
    for the later mp4 GET (same-session bind). User-Agent + Accept are required —
    without them mixdrop returns a minimal body (no packer).

    Returns the URL (scheme-relative `//` URLs get an `https:` prefix), or None when the
    page is not a 200, nothing could be extracted, or any exception occurred (logged as
    a warning).
    """
    import re
    from yt_dlp.utils import decode_packed_codes

    browser_headers = {
        "User-Agent": DEFAULT_UA,
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
        "Accept-Language": "en-US,en;q=0.9",
    }
    try:
        page = await session.get(
            embed_url,
            headers=browser_headers,
            impersonate="chrome120",
            timeout=15,
            allow_redirects=True,
        )
        if page.status_code != 200:
            return None
        packed = re.search(
            r"eval\(function\(p,a,c,k,e,d\)\{.+?\}\(.+?\)\)", page.text, re.DOTALL
        )
        if packed is None:
            return None
        unpacked = decode_packed_codes(packed.group(0))
        found = re.search(r'MDCore\.wurl\s*=\s*"([^"]+\.mp4[^"]*)"', unpacked)
        if found is None:
            return None
        fresh_url = found.group(1)
        return "https:" + fresh_url if fresh_url.startswith("//") else fresh_url
    except Exception as exc:
        log.warning("refetch mixdrop failed for %s: %s", embed_url, exc)
        return None
async def _curl_cffi_stream(
    target: str,
    headers: dict,
    *,
    refetch_url: str | None = None,
    refetch_hoster: str | None = None,
) -> Response:
    """Stream `target` through curl_cffi with Chrome impersonation.

    Fallback for hosters that detect the plain httpx JA3 fingerprint as a bot
    (mxcontent, cloudflare-protected CDNs); the request is sent with
    `impersonate="chrome120"`.

    When `refetch_url` is set and `refetch_hoster` is "mixdrop", the embed page is
    re-fetched FIRST, in the SAME session, to refresh cookies and obtain a new mp4 URL
    (same-session bind for mxcontent). Without this an expired mp4 token plus missing
    cookies yields a 403.

    Returns a StreamingResponse on success or the response produced by
    `_upstream_error_response` for upstream statuses >= 400.

    Raises:
        HTTPException: 502 on transport failure; an HTTPException raised by
            `_upstream_error_response` propagates unchanged.
    """
    from curl_cffi.requests import AsyncSession

    session = AsyncSession()
    try:
        # For mixdrop: ALWAYS re-fetch the embed before the mp4 so the session has fresh
        # cookies. An initial mp4 attempt with an expired/old token and no cookies gives
        # a 403 plus an anti-bot flag in the cookies, which then blocks the retry too.
        if refetch_url and refetch_hoster == "mixdrop":
            new_mp4 = await _refetch_mixdrop_url(session, refetch_url)
            if new_mp4:
                target = new_mp4
                log.info("mixdrop fresh-extract mp4 %s", new_mp4[:80])
        upstream = await session.get(
            target,
            headers=headers,
            impersonate="chrome120",
            stream=True,
            timeout=120,
            allow_redirects=True,
        )
        # Separator added: "%s%d" glued the truncated URL and the status code together.
        log.info("mixdrop mp4 fetch %s -> %d", target[:60], upstream.status_code)
        if upstream.status_code >= 400:
            await session.close()
            return _upstream_error_response(upstream.status_code, dict(upstream.headers), target)
        out_headers = {
            k: v for k, v in upstream.headers.items() if k.lower() not in HOP_BY_HOP
        }

        async def streamer():
            bytes_out = 0
            try:
                async for chunk in upstream.aiter_content():
                    bytes_out += len(chunk)
                    yield chunk
            finally:
                # Runs on completion and on client disconnect alike.
                await session.close()
                _record_proxy_bytes(target, bytes_out)

        return StreamingResponse(
            streamer(),
            status_code=upstream.status_code,
            headers=out_headers,
            media_type=upstream.headers.get("content-type", "application/octet-stream"),
        )
    except HTTPException:
        # Deliberate mapping from _upstream_error_response (session already closed
        # above): pass it through instead of logging it as a proxy failure and
        # re-wrapping its detail, matching proxy_stream.
        raise
    except Exception as e:
        try:
            await session.close()
        except Exception:
            pass
        log.warning("curl_cffi proxy failed for %s: %s", target, e)
        raise HTTPException(status_code=502, detail=f"proxy error: {e}") from e
@router.get("/{token}/{_basename:path}")
async def proxy_stream(
    token: str,
    _basename: str,
    request: Request,
) -> Response:
    """Stream the upstream resource referenced by a signed proxy token.

    The token supplies the target URL, the Referer and optional flags (`i` → use
    curl_cffi impersonation, `rf`/`rh` → refresh hints). The client's Range header is
    forwarded. HLS manifests (detected by a `.m3u8` path or an mpegurl content type) are
    buffered and rewritten so their children go through the proxy too; everything else
    is streamed through chunk by chunk.

    Returns the streamed/rewritten response, the mapping produced by
    `_upstream_error_response` for upstream statuses >= 400, or a 503 with
    `Retry-After` when the upstream is unreachable or times out.

    Raises:
        HTTPException: from `parse_token`, from `_upstream_error_response`, or 502 on
            manifest-rewrite / unexpected proxy failures.
    """
    payload = parse_token(token)
    original_target = payload["u"]
    referer = payload["r"] or None
    use_impersonate = bool(payload.get("i"))
    refetch_url = payload.get("rf")
    refetch_hoster = payload.get("rh")
    # If this target was already redirect-followed earlier, use the cached final URL.
    # Reason: single-use `get_file/` URLs answer 410 on reuse.
    cached_target = _redirect_cache_get(original_target)
    target = cached_target or original_target
    # Forward the Range header (HLS/MP4 players issue byte-range fetches for seek/preload).
    headers = _build_headers(referer)
    range_h = request.headers.get("range")
    if range_h:
        headers["Range"] = range_h
    method = "GET"  # ExoPlayer mostly uses GET; HEAD is not needed — the proxy returns full responses
    # Hosters that require a Chrome JA3 fingerprint (mxcontent / cloudflare-protected
    # CDNs): go straight to curl_cffi instead of httpx to avoid a 403→retry round trip.
    # The token's `i=1` flag is set by the extractor for those hosts.
    if use_impersonate:
        return await _curl_cffi_stream(
            target, headers,
            refetch_url=refetch_url, refetch_hoster=refetch_hoster,
        )
    # Short connect timeout, long read timeout so streaming does not get cut off.
    timeout = httpx.Timeout(connect=15.0, read=120.0, write=30.0, pool=10.0)
    # The path is only a first hint; the FINAL decision uses the response content type.
    # Reason: a `get_file/.../<id>.mp4/` URL can 302 to a CDN m3u8 manifest despite the
    # `.mp4` in its path. Without the content-type check the manifest would be passed
    # through as binary, with RAW CDN URLs (IP-bound to the VPS) inside.
    path_suggests_m3u8 = urlparse(target).path.lower().endswith(".m3u8")
    client = httpx.AsyncClient(timeout=timeout, follow_redirects=True)
    try:
        # Send in streaming mode FIRST, inspect the headers, then decide: rewrite a
        # manifest or stream binary.
        upstream = await client.send(
            client.build_request(method, target, headers=headers),
            stream=True,
            follow_redirects=True,
        )
        if upstream.status_code >= 400:
            status = upstream.status_code
            ups_headers = dict(upstream.headers)
            await upstream.aclose()
            await client.aclose()
            # A cached final URL returned an error (e.g. expired CDN signed URL,
            # 403/410): invalidate it so mobile can retry via a fresh /resolve instead
            # of hitting a dead CDN URL for the rest of the cache TTL.
            if cached_target is not None and status in (401, 403, 404, 410):
                _redirect_cache_invalidate(original_target)
            return _upstream_error_response(status, ups_headers, target)
        # First successful pass for single-use targets: cache the final URL reached
        # after redirects, so later range requests go straight to the CDN URL.
        if cached_target is None:
            final_url = str(upstream.url)
            if final_url != original_target:
                _redirect_cache_put(original_target, final_url)
        ct = (upstream.headers.get("content-type") or "").lower()
        # "mpegurl" covers both application/x-mpegurl and application/vnd.apple.mpegurl.
        is_m3u8 = path_suggests_m3u8 or "mpegurl" in ct
        if is_m3u8:
            # Manifest content — buffer fully, rewrite, return as m3u8.
            body = await upstream.aread()
            await upstream.aclose()
            await client.aclose()
            try:
                rewritten = _rewrite_m3u8(
                    body.decode("utf-8", errors="replace"),
                    base_url=str(upstream.url),
                    referer=referer,
                )
            except Exception as e:
                log.warning("m3u8 rewrite failed for %s: %s", target, e)
                raise HTTPException(status_code=502, detail="manifest rewrite failed") from e
            return Response(
                content=rewritten,
                media_type="application/vnd.apple.mpegurl",
                headers={"Cache-Control": "no-store"},
            )
        out_headers = {
            k: v for k, v in upstream.headers.items() if k.lower() not in HOP_BY_HOP
        }

        async def streamer():
            bytes_out = 0
            try:
                # aiter_bytes() yields the content-DECODED body. Content-Encoding is
                # stripped from out_headers, so forwarding raw (possibly gzip/br) bytes
                # would hand the client compressed data with no header announcing it.
                # For identity-encoded media the bytes are unchanged.
                async for chunk in upstream.aiter_bytes():
                    bytes_out += len(chunk)
                    yield chunk
            finally:
                await upstream.aclose()
                await client.aclose()
                _record_proxy_bytes(target, bytes_out)

        return StreamingResponse(
            streamer(),
            status_code=upstream.status_code,
            headers=out_headers,
            media_type=upstream.headers.get("content-type", "application/octet-stream"),
        )
    except HTTPException:
        await client.aclose()
        raise
    except (httpx.ConnectError, httpx.ConnectTimeout, httpx.ReadTimeout) as e:
        # CDN connect failure / timeout — transient, logged at INFO (not an ERROR for
        # Sentry). Answer 503 + Retry-After so mobile can simply retry.
        await client.aclose()
        log.info("proxy connect/timeout for %s: %s", target, e)
        return Response(
            content=f"upstream unreachable: {type(e).__name__}",
            status_code=503,
            headers={"Retry-After": "5"},
            media_type="text/plain",
        )
    except Exception as e:
        await client.aclose()
        log.warning("proxy failed for %s: %s", target, e)
        raise HTTPException(status_code=502, detail=f"proxy error: {e}") from e
def _upstream_error_response(
    status: int,
    upstream_headers: dict,
    target: str,
) -> Response:
    """Map an upstream HTTP error onto this proxy's response.

    Rationale per status:
    - **429 Too Many Requests**: CDN rate limit. Passed through as 429 with the
      upstream Retry-After (10 when absent) so mobile backs off. Logged at INFO —
      expected CDN behaviour, not a bug here.
    - **404/410**: video deleted / token expired. Passed through so the player knows.
    - **5xx upstream**: originates at the CDN, not in this code. Answered as 502 with
      `Retry-After: 5`, logged at INFO.
    - **other 4xx**: raised as HTTPException 502 — possibly a fault on this side (bad
      referer etc.), so it should surface in error reporting.
    """
    retry_after = upstream_headers.get("retry-after") or upstream_headers.get("Retry-After")
    if status == 429:
        log.info("proxy upstream 429 for %s (Retry-After=%s)", target, retry_after)
        return Response(
            content="upstream rate limited",
            status_code=429,
            headers={
                "Cache-Control": "no-store",
                "Retry-After": str(retry_after) if retry_after else "10",
            },
            media_type="text/plain",
        )
    gone = status in (404, 410)
    cdn_failure = 500 <= status < 600
    if not (gone or cdn_failure):
        # Other 4xx (403 etc.) — raise so error reporting registers it.
        raise HTTPException(status_code=502, detail=f"upstream {status}")
    log.info("proxy upstream %d for %s", status, target)
    if gone:
        return Response(
            content=f"upstream {status}",
            status_code=status,
            media_type="text/plain",
        )
    # CDN-side error (e.g. Cloudflare 523 — origin unreachable): reported as 502.
    return Response(
        content=f"upstream {status}",
        status_code=502,
        headers={"Retry-After": "5"},
        media_type="text/plain",
    )