trafficdeposit poster tokens live ~1h (hour-bucketed), so stored URLs can't persist.
New GET /proxy/sxyprn-thumb/{post_id}: resolves the current og:image from the live
/post/<id> page (cache resolved poster URL ~40min), streams bytes with Referer +
long client Cache-Control (URL is stable per post_id → client disk-caches the image,
backend fetches each post ~once). Deleted posts ("Post Not Found") → 404.
Scene grid now emits /proxy/sxyprn-thumb/<id> for sxyprn sources (derived from
page_url) instead of the dead stored trafficdeposit URL. Verified: live post → 200
image, deleted → 404, grid emits resolver URL. Backend-only, no OTA.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
687 lines
28 KiB
Python
687 lines
28 KiB
Python
"""Stream proxy — pomost VPS↔phone dla podpisanych URL-i CDN-ów.
|
|
|
|
Wiele hosterów (luluvids/medixiru/cdnvids/bigcdn) bindą podpisany URL do IP klienta
|
|
który fetchował embed page. Gdy backend ekstraktuje URL z VPS-a, signature
|
|
weryfikuje VPS IP — telefon dostaje 403. Player na phonie kieruje requesty
|
|
*przez backend* (tym samym IP co podczas extracji) → CDN sprawdza signature
|
|
poprawnie i serwuje content.
|
|
|
|
Flow:
|
|
1. /resolve packuje (url, referer) w token (HMAC-podpisany).
|
|
2. Mobile dostaje `stream_url = /proxy/{token}/master.m3u8` (lub `.mp4`).
|
|
3. ExoPlayer woła backend → backend strumieniuje content z origin URL.
|
|
4. HLS: m3u8 manifest jest rewrited tak, że dziecięce segmenty/playlisty
|
|
też idą przez proxy (chained tokens).
|
|
|
|
Token: base64url(json({u: url, r: referer, exp: unix_ts})) + HMAC-SHA256
|
|
podpisany shared secret z env (`STREAM_PROXY_SECRET`). TTL 4h żeby gracz mógł
|
|
oglądać dłuższe sceny + pause/seek bez ryzyka expired token.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import base64
|
|
import hashlib
|
|
import hmac
|
|
import json
|
|
import logging
|
|
import os
|
|
import re
|
|
import time
|
|
from typing import Annotated
|
|
from urllib.parse import urljoin, urlparse
|
|
|
|
import httpx
|
|
from fastapi import APIRouter, Depends, HTTPException, Query, Request
|
|
from fastapi.responses import Response, StreamingResponse
|
|
|
|
from app.auth import require_api_key
|
|
|
|
# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Router for every endpoint declared in this file; all paths share the /proxy prefix.
router = APIRouter(prefix="/proxy", tags=["proxy"])
|
|
|
|
|
|
# In-memory bandwidth accounting: bytes sent out, keyed by CDN domain and then by
# hour bucket (unix time // 3600).
# Restarting the API resets the counters, which is acceptable: this is an
# operational metric, not billing data.
# It is critical for seeing where VPS bandwidth leaks before a Hetzner overage.
from collections import defaultdict
from threading import Lock

# Held while _bw_counters is read or modified.
_bw_lock = Lock()

# {cdn_domain: {hour_bucket: bytes_out}}; missing domains and buckets start at 0.
_bw_counters: dict[str, dict[int, int]] = defaultdict(lambda: defaultdict(int))
|
|
|
|
|
|
def _record_proxy_bytes(target_url: str, n_bytes: int) -> None:
    """Add ``n_bytes`` to the current hour bucket of the target's CDN host.

    The host is taken from ``target_url``; when it cannot be determined the bytes
    are booked under ``"unknown"``. Non-positive byte counts are ignored. After
    the update, buckets of that host older than 168 hours (7 days) are dropped.
    The counter map is only touched while ``_bw_lock`` is held.
    """
    if n_bytes <= 0:
        return

    try:
        domain = urlparse(target_url).hostname or "unknown"
    except Exception:
        # An unparsable URL must never break the response path.
        domain = "unknown"

    current_hour = int(time.time() // 3600)
    with _bw_lock:
        per_hour = _bw_counters[domain]
        per_hour[current_hour] += n_bytes
        # Prune >7d so the counter map stays small.
        oldest_kept = current_hour - 168
        for stale_hour in [bucket for bucket in per_hour if bucket < oldest_kept]:
            del per_hour[stale_hour]
|
|
|
|
|
|
def get_bandwidth_stats(hours: int = 24) -> dict[str, int]:
    """Return ``{cdn_domain: bytes_out}`` for the last ``hours`` hour buckets.

    Only buckets strictly newer than ``current_hour - hours`` are summed. Domains
    whose total is zero are left out, and the result is ordered by descending
    byte count.
    """
    oldest_excluded = int(time.time() // 3600) - hours
    with _bw_lock:
        totals = {
            domain: sum(n for bucket, n in per_hour.items() if bucket > oldest_excluded)
            for domain, per_hour in _bw_counters.items()
        }
    ranked = sorted(
        ((domain, total) for domain, total in totals.items() if total > 0),
        key=lambda pair: -pair[1],
    )
    return dict(ranked)
|
|
|
|
# Default token lifetime in seconds (4 hours).
TOKEN_TTL_SEC = 4 * 3600

# Desktop Chrome User-Agent string used for upstream requests.
DEFAULT_UA = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
    "AppleWebKit/537.36 (KHTML, like Gecko) "
    "Chrome/140.0.0.0 Safari/537.36"
)
|
|
|
|
|
|
# URL-level redirect cache: target_url -> (final_resolved_url, expires_ts).
#
# The mobile ExoPlayer issues range requests on every seek/preload. Each of them
# hits the proxy with the same token, and the proxy GETs target_url. For
# `porntrex.com/get_file/...` (and also fpoxxx, freshporno) that URL is
# **single-use**: the first GET → 302 → CDN URL (time-bound), the second GET → 410.
# Without the cache: second range = 410 → ExoPlayer fails → mobile falls back to
# `Linking.openURL(page_url)` → an ad (bug reports `cee51c76`, `e2e365e3`,
# 2026-05-22).
#
# With the cache: the first GET follows the redirect and the final URL is cached.
# Later range requests go directly to the CDN URL, which is time-bound (~1-2h)
# rather than single-use, so mobile plays to the end without the fallback.
#
# TTL 1800s = 30 min: shorter than a typical CDN signed-URL lifetime (~1h+), so
# stale entries do not cause 403 spam. After expiry mobile retries /resolve and
# gets a fresh token.
_REDIRECT_CACHE_MAX = 1000
_REDIRECT_CACHE_TTL_SEC = 1800
_REDIRECT_CACHE: dict[str, tuple[str, float]] = {}
|
|
|
|
|
|
def _redirect_cache_get(target_url: str) -> str | None:
    """Return the cached final URL for ``target_url``, or ``None``.

    An entry whose expiry timestamp is already in the past is removed from the
    cache and treated as a miss.
    """
    cached = _REDIRECT_CACHE.get(target_url)
    if cached:
        resolved_url, expires_at = cached
        if not expires_at < time.time():
            return resolved_url
        # Expired: drop it so the next request resolves the redirect again.
        _REDIRECT_CACHE.pop(target_url, None)
    return None
|
|
|
|
|
|
def _redirect_cache_put(target_url: str, final_url: str) -> None:
    """Remember that ``target_url`` resolved to ``final_url``.

    Nothing is stored when ``final_url`` is empty or equal to ``target_url``
    (no redirect happened). The entry expires ``_REDIRECT_CACHE_TTL_SEC`` seconds
    from now.

    When the cache grows past ``_REDIRECT_CACHE_MAX`` entries, expired entries
    are dropped first. If that is not enough (every entry is still live), the
    entries closest to expiry are evicted until the limit holds, so the cache
    can never grow without bound.
    """
    if not final_url or target_url == final_url:
        return

    now = time.time()
    _REDIRECT_CACHE[target_url] = (final_url, now + _REDIRECT_CACHE_TTL_SEC)
    if len(_REDIRECT_CACHE) <= _REDIRECT_CACHE_MAX:
        return

    # Pass 1: drop everything that has already expired.
    for key in [k for k, entry in _REDIRECT_CACHE.items() if entry[1] < now]:
        _REDIRECT_CACHE.pop(key, None)

    # Pass 2: still over the limit, so evict the entries with the earliest
    # expiry. The TTL is constant, which makes these the oldest writes; the
    # entry stored above has the latest expiry and therefore survives.
    overflow = len(_REDIRECT_CACHE) - _REDIRECT_CACHE_MAX
    if overflow > 0:
        by_expiry = sorted(_REDIRECT_CACHE, key=lambda k: _REDIRECT_CACHE[k][1])
        for key in by_expiry[:overflow]:
            _REDIRECT_CACHE.pop(key, None)
|
|
|
|
|
|
def _redirect_cache_invalidate(target_url: str) -> None:
    """Forget the cached final URL for ``target_url``; a missing entry is fine."""
    if target_url in _REDIRECT_CACHE:
        del _REDIRECT_CACHE[target_url]
|
|
|
|
|
|
# Lower-case names of upstream response headers that are NOT copied onto the
# proxied response (the streaming endpoints filter with `k.lower() not in HOP_BY_HOP`).
HOP_BY_HOP = {
    # Connection-level headers.
    "connection",
    "keep-alive",
    "te",
    "trailers",
    "transfer-encoding",
    "upgrade",
    # Proxy authentication headers.
    "proxy-authenticate",
    "proxy-authorization",
    # Body framing/encoding headers.
    # NOTE(review): presumably dropped because the proxy re-frames the body itself — confirm.
    "content-encoding",
    "content-length",
}
|
|
|
|
|
|
def _secret() -> bytes:
    """Return the HMAC signing key as UTF-8 bytes.

    ``STREAM_PROXY_SECRET`` is preferred; when it is unset or empty the value of
    ``API_KEYS`` is used instead.

    Raises:
        RuntimeError: neither environment variable holds a non-empty value.
    """
    for variable in ("STREAM_PROXY_SECRET", "API_KEYS"):
        value = os.environ.get(variable)
        if value:
            return value.encode("utf-8")
    raise RuntimeError("STREAM_PROXY_SECRET (or API_KEYS) must be set")
|
|
|
|
|
|
def make_token(
    url: str,
    referer: str | None = None,
    ttl_sec: int = TOKEN_TTL_SEC,
    *,
    refresh: str | None = None,
    refresh_hoster: str | None = None,
    impersonate: bool = False,
    stable_bucket_sec: int | None = None,
) -> str:
    """Build a signed proxy token for ``url``.

    The token is ``<body>.<sig>``: the base64url-encoded compact JSON payload and
    the base64url-encoded HMAC-SHA256 of that JSON (key from ``_secret()``), both
    without ``=`` padding.

    Args:
        url: Upstream URL, stored under ``u``.
        referer: Referer for upstream requests, stored under ``r`` (``""`` when
            falsy).
        ttl_sec: Lifetime in seconds; added to the base time to produce ``e``.
        refresh: Embed-page URL (stored under ``rf``) intended for re-fetching a
            fresh stream URL from the embed (e.g. mixdrop MDCore.wurl) when the
            original one has expired.
        refresh_hoster: Hoster name (stored under ``rh``) used to dispatch to the
            dedicated re-extract logic (mixdrop etc.).
        impersonate: When true, ``i`` is set to 1 so the proxy uses curl_cffi
            chrome120 instead of httpx (for hosters with JA3 bot detection —
            mxcontent, cloudflare-protected).
        stable_bucket_sec: Quantizes the base time of the expiry to an N-second
            window, so the token (→ proxied URL) is IDENTICAL for all requests
            inside that window. Without it ``e`` contains the current timestamp
            → a different URL on every list fetch → expo-image cache miss →
            thumbnails re-downloaded every time. For thumbnails (stable) a 7d
            bucket keeps the URL constant for a week → disk-cache hit.
            ``e = bucket_start + ttl_sec``, which is only in the future while
            ``ttl_sec`` exceeds the time already elapsed in the bucket.

    Returns:
        The ``body.sig`` token string.
    """
    def b64url(data: bytes) -> str:
        # URL-safe base64 with the "=" padding stripped.
        return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")

    issued_at = int(time.time())
    if stable_bucket_sec:
        # Snap to the start of the current bucket so the token text is stable.
        issued_at = (issued_at // stable_bucket_sec) * stable_bucket_sec

    claims: dict = {"u": url, "r": referer or "", "e": issued_at + ttl_sec}
    # Optional claims are only present when set; insertion order is part of the
    # serialized (and signed) JSON.
    optional_claims = (
        ("rf", refresh),
        ("rh", refresh_hoster),
        ("i", 1 if impersonate else None),
    )
    for key, value in optional_claims:
        if value:
            claims[key] = value

    serialized = json.dumps(claims, separators=(",", ":")).encode("utf-8")
    body = b64url(serialized)
    signature = b64url(hmac.new(_secret(), serialized, hashlib.sha256).digest())
    return f"{body}.{signature}"
|
|
|
|
|
|
def parse_token(token: str) -> dict:
    """Verify a proxy token and return its payload.

    Args:
        token: ``<body>.<sig>`` string as produced by ``make_token``.

    Returns:
        The decoded JSON payload.

    Raises:
        HTTPException: 400 when the token has no ``.`` separator or its body is
            not valid base64; 403 when the signature does not match; 410 when
            the ``e`` timestamp is in the past.
    """
    body_b64, separator, sig_b64 = token.partition(".")
    if not separator:
        raise HTTPException(status_code=400, detail="malformed token")

    try:
        # Padding was stripped when the token was built; "==" restores enough of it.
        raw = base64.urlsafe_b64decode(body_b64 + "==")
    except ValueError:
        # binascii.Error (bad base64) is a ValueError subclass, as is the error
        # for non-ASCII input. Client-supplied garbage is a 400, not a 500.
        raise HTTPException(status_code=400, detail="malformed token") from None

    expected = base64.urlsafe_b64encode(
        hmac.new(_secret(), raw, hashlib.sha256).digest()
    ).rstrip(b"=")
    # Compare bytes with bytes: compare_digest raises TypeError for str arguments
    # containing non-ASCII characters, which an attacker-controlled signature may.
    provided = sig_b64.encode("utf-8", errors="replace")
    if not hmac.compare_digest(expected, provided):
        raise HTTPException(status_code=403, detail="bad token sig")

    payload = json.loads(raw)
    if int(payload.get("e", 0)) < int(time.time()):
        raise HTTPException(status_code=410, detail="token expired")
    return payload
|
|
|
|
|
|
def _ascii_safe_url(url: str) -> str:
    """Percent-encode non-ASCII characters in ``url`` while keeping URI structure.

    httpx requires ASCII headers — a Referer with Polish/Cyrillic/other Unicode
    characters (e.g. hqporner `Honies_№2.html`) used to raise UnicodeEncodeError
    (GOON-A). The URI reserved characters, ``%`` and ``~`` are passed to ``quote``
    as safe, so existing delimiters and percent-escapes are left as they are.

    If encoding fails for any reason the input is returned unchanged.
    """
    try:
        from urllib.parse import quote as percent_encode

        encoded = percent_encode(url, safe=":/?#[]@!$&'()*+,;=%~")
    except Exception:
        encoded = url
    return encoded
|
|
|
|
|
|
def _build_headers(referer: str | None) -> dict[str, str]:
    """Build the request headers sent to an upstream host.

    Always contains ``User-Agent``, ``Accept`` and ``Accept-Language``. When
    ``referer`` is given, an ASCII-safe ``Referer`` is added, plus an ``Origin``
    of ``https://<referer host>`` when a host can be extracted from it.
    """
    headers = {
        "User-Agent": DEFAULT_UA,
        "Accept": "*/*",
        "Accept-Language": "en-US,en;q=0.9",
    }
    if not referer:
        return headers

    headers["Referer"] = _ascii_safe_url(referer)
    origin = None
    try:
        referer_host = urlparse(referer).hostname
        if referer_host:
            origin = _ascii_safe_url("https://" + referer_host)
    except Exception:
        # Origin is best-effort: a referer that cannot be parsed simply gets none.
        origin = None
    if origin is not None:
        headers["Origin"] = origin
    return headers
|
|
|
|
|
|
# Matches a quoted URI="..." attribute inside an HLS tag line.
_M3U8_URI_RE = re.compile(r'(URI=")([^"]+)(")', re.IGNORECASE)


def _rewrite_m3u8(content: str, base_url: str, referer: str | None) -> str:
    """Rewrite an m3u8 manifest so that every sub-resource goes through the proxy.

    An HLS manifest contains:
      - URI lines (.ts segments / .m3u8 sub-playlists), relative or absolute;
      - tags such as `#EXT-X-KEY:METHOD=AES-128,URI="key.bin"`, whose URI
        attribute needs rewriting as well.

    Each URL is resolved against ``base_url`` and replaced by
    ``/proxy/{token}/seg.<ext>`` (``/proxy/{token}/seg`` for tag attributes),
    where the token carries the resolved URL and ``referer``. Blank lines are
    kept as they are. The result always ends with a newline.
    """
    def proxied_attribute(match: re.Match) -> str:
        # URI="..." inside #EXT-X-KEY / #EXT-X-MEDIA / etc.
        resolved = urljoin(base_url, match.group(2))
        attr_token = make_token(resolved, referer)
        return f'{match.group(1)}/proxy/{attr_token}/seg{match.group(3)}'

    rewritten: list[str] = []
    for original_line in content.splitlines():
        stripped = original_line.strip()
        if not stripped:
            rewritten.append(original_line)
        elif stripped.startswith("#"):
            rewritten.append(_M3U8_URI_RE.sub(proxied_attribute, original_line))
        else:
            # Resource URI line.
            resource_url = urljoin(base_url, stripped)
            resource_token = make_token(resource_url, referer)
            # Keep the extension so ExoPlayer can recognise the content type.
            suffix = os.path.splitext(urlparse(resource_url).path)[1].lstrip(".") or "ts"
            rewritten.append(f"/proxy/{resource_token}/seg.{suffix}")
    return "\n".join(rewritten) + "\n"
|
|
|
|
|
|
@router.get("/sign")
def sign_url(
    _api: Annotated[None, Depends(require_api_key)],
    url: str = Query(...),
    referer: str | None = Query(default=None),
) -> dict:
    """Helper endpoint for mobile to obtain a fresh token (e.g. after expiry).

    Normally /resolve already returns a proxy URL — this is a fallback.
    Returns the token together with its lifetime in seconds.
    """
    fresh_token = make_token(url, referer)
    return {"token": fresh_token, "expires_in": TOKEN_TTL_SEC}
|
|
|
|
|
|
@router.get("/img/{token}/{_basename:path}")
async def proxy_image(
    token: str,
    _basename: str,
    request: Request,
) -> Response:
    """Image proxy — used for thumbnails from CDNs that require a Referer
    (hqporner and other sources). Mobile expo-image does not send a Referer by
    default, so the CDN answers 403. The backend adds the Referer and returns
    the image.

    Successful responses carry ``Cache-Control: public, max-age=86400`` —
    thumbnails are stable, so the client may cache them.

    Failure mapping:
      - connect error / timeout → empty 503;
      - any other fetch exception → HTTPException 502;
      - upstream 5xx → empty 502; upstream 4xx → empty body with the same status.
    """
    claims = parse_token(token)
    image_url = claims["u"]
    request_headers = _build_headers(claims["r"] or None)
    limits = httpx.Timeout(connect=10.0, read=30.0, write=15.0, pool=5.0)

    async with httpx.AsyncClient(timeout=limits, follow_redirects=True) as http:
        try:
            upstream = await http.get(image_url, headers=request_headers)
        except (httpx.ConnectError, httpx.ConnectTimeout, httpx.ReadTimeout) as exc:
            # CDN connect/timeout — transient (e.g. Cloudflare 523 origin
            # unreachable when the upstream host is off). Log at INFO and answer
            # 503; mobile renders a placeholder. Without this Sentry received
            # hundreds of ERRORs (GOON-D/6) for every broken tube, drowning out
            # real issues.
            log.info("img proxy connect/timeout for %s: %s", image_url, exc)
            return Response(content=b"", status_code=503, media_type="image/jpeg")
        except Exception as exc:
            log.warning("img proxy fetch failed for %s: %s", image_url, exc)
            raise HTTPException(status_code=502, detail=f"img fetch failed: {exc}") from exc

        upstream_status = upstream.status_code
        if upstream_status < 400:
            return Response(
                content=upstream.content,
                media_type=upstream.headers.get("content-type", "image/jpeg"),
                headers={"Cache-Control": "public, max-age=86400"},
            )

        # Upstream 4xx/5xx for a thumbnail — degrade instead of raising
        # (placeholder on mobile). GOON-5 (Cloudflare 523) and GOON-D were
        # pointless noise in Sentry; an INFO log plus an empty response is enough.
        log.info("img proxy upstream %d for %s", upstream_status, image_url)
        relayed_status = upstream_status if upstream_status < 500 else 502
        return Response(content=b"", status_code=relayed_status, media_type="image/jpeg")
|
|
|
|
|
|
# sxyprn on-demand thumbnail resolution (bug 2026-06-10). The trafficdeposit poster
# token lives ~1h (hour-bucketed), so poster URLs can NOT be stored — the current
# og:image is resolved from the /post/<id> page at serving time. The resolved poster
# URL is cached for ~40 min (< 1h TTL). The client gets a STABLE
# /proxy/sxyprn-thumb/<id> URL → it caches the bytes for good (poster content does
# not change), so the post page is fetched roughly once per post.
_SXYPRN_POSTER_CACHE: dict[str, tuple[str, float]] = {}  # pid -> (poster_url, expires_ts)
_SXYPRN_POSTER_TTL = 2400
_SXYPRN_POSTER_CACHE_MAX = 8000
_OG_IMG_RE = re.compile(r"og:image[\"'][^>]*content=[\"']([^\"']+)", re.IGNORECASE)
_OG_IMG_RE2 = re.compile(r"content=[\"']([^\"']+)[\"'][^>]*property=[\"']og:image", re.IGNORECASE)
_VID_POSTER_RE = re.compile(r"<video[^>]*poster=[\"']([^\"']+)", re.IGNORECASE)
_SXYPRN_PID_RE = re.compile(r"^[0-9a-f]{6,40}$")


def _sxyprn_empty_response(status: int) -> Response:
    """Empty image/jpeg response used for every sxyprn-thumb failure."""
    return Response(content=b"", status_code=status, media_type="image/jpeg")


def _sxyprn_cache_store(pid: str, poster: str) -> None:
    """Cache ``poster`` for ``pid`` and keep the cache within its size limit."""
    now = time.time()
    _SXYPRN_POSTER_CACHE[pid] = (poster, now + _SXYPRN_POSTER_TTL)
    if len(_SXYPRN_POSTER_CACHE) <= _SXYPRN_POSTER_CACHE_MAX:
        return
    # Drop expired entries first.
    for key in [k for k, v in _SXYPRN_POSTER_CACHE.items() if v[1] < now]:
        _SXYPRN_POSTER_CACHE.pop(key, None)
    # If every entry is still live, evict the ones closest to expiry so the
    # cache cannot grow without bound.
    overflow = len(_SXYPRN_POSTER_CACHE) - _SXYPRN_POSTER_CACHE_MAX
    if overflow > 0:
        by_expiry = sorted(_SXYPRN_POSTER_CACHE, key=lambda k: _SXYPRN_POSTER_CACHE[k][1])
        for key in by_expiry[:overflow]:
            _SXYPRN_POSTER_CACHE.pop(key, None)


async def _sxyprn_resolve_poster(
    client: httpx.AsyncClient, pid: str
) -> tuple[str | None, int]:
    """Resolve the current poster URL from the live post page.

    Returns ``(poster_url, 200)`` on success, otherwise ``(None, status)`` where
    ``status`` is the HTTP status the endpoint should answer with: 404 for a
    deleted post or a page without a poster, 502 when the page could not be
    fetched or upstream answered with an error page.
    """
    try:
        page = await client.get(
            f"https://sxyprn.com/post/{pid}.html", headers=_build_headers(None)
        )
    except Exception as e:
        log.info("sxyprn-thumb page fetch failed %s: %s", pid, e)
        return None, 502

    markup = page.text
    if "Post Not Found" in markup:
        return None, 404

    m = (
        _OG_IMG_RE.search(markup)
        or _OG_IMG_RE2.search(markup)
        or _VID_POSTER_RE.search(markup)
    )
    if not m:
        if page.status_code >= 400 and page.status_code != 404:
            # An upstream error page (5xx, 403, 429, ...) has no og:image either.
            # That is a transient failure, not a deleted post, so do not claim 404.
            log.info("sxyprn-thumb page upstream %d for %s", page.status_code, pid)
            return None, 502
        return None, 404

    poster = m.group(1).strip()
    if poster.startswith("//"):
        poster = "https:" + poster
    return poster, 200


@router.get("/sxyprn-thumb/{post_id}")
async def sxyprn_thumb(post_id: str) -> Response:
    """On-demand sxyprn poster.

    The URL is stable per ``post_id`` (the client caches the bytes); the backend
    resolves the current og:image (token lives ~1h) and fetches it with a Referer.

    A poster URL taken from the cache may already be dead (tokens are
    hour-bucketed). When upstream rejects a cached URL, it is re-resolved from
    the post page and fetched once more within the same request instead of
    failing with 502.

    Raises:
        HTTPException: 400 when ``post_id`` is not 6-40 lower-case hex characters
            (an optional ``.ext`` suffix is ignored).
    """
    pid = post_id.split(".")[0]  # strip an optional .jpg
    # fullmatch: `$` with match() would also accept a trailing newline.
    if not _SXYPRN_PID_RE.fullmatch(pid):
        raise HTTPException(status_code=400, detail="bad post_id")

    cached = _SXYPRN_POSTER_CACHE.get(pid)
    poster = cached[0] if (cached and cached[1] > time.time()) else None
    from_cache = poster is not None

    timeout = httpx.Timeout(connect=10.0, read=20.0, write=10.0, pool=5.0)
    async with httpx.AsyncClient(timeout=timeout, follow_redirects=True) as client:
        # At most two rounds: the second one only runs when the first used a
        # cached poster URL that upstream rejected.
        for _attempt in range(2):
            if poster is None:
                poster, status = await _sxyprn_resolve_poster(client, pid)
                if poster is None:
                    return _sxyprn_empty_response(status)
                _sxyprn_cache_store(pid, poster)

            try:
                pr = await client.get(poster, headers=_build_headers("https://sxyprn.com/"))
            except Exception as e:
                log.info("sxyprn-thumb poster fetch failed %s: %s", pid, e)
                return _sxyprn_empty_response(502)

            if pr.status_code < 400:
                return Response(
                    content=pr.content,
                    media_type=pr.headers.get("content-type", "image/jpeg"),
                    headers={"Cache-Control": "public, max-age=604800"},
                )

            # Stale token → forget the URL so it is resolved again.
            _SXYPRN_POSTER_CACHE.pop(pid, None)
            if not from_cache:
                break
            from_cache = False
            poster = None

    return _sxyprn_empty_response(502)
|
|
|
|
|
|
async def _refetch_mixdrop_url(session: "AsyncSession", embed_url: str) -> str | None:
    """Re-fetch the mixdrop embed page and extract a fresh ``MDCore.wurl``.

    The page's P.A.C.K.E.R. script is decoded and the ``.mp4`` URL assigned to
    ``MDCore.wurl`` is returned (protocol-relative URLs get an ``https:``
    prefix). The request goes through ``session``, so cookies set by the embed
    page stay in that session for the later mp4 GET (same-session bind).
    UA + Accept are required — without them mixdrop returns a minimal body
    (no packer).

    Returns ``None`` when the page is not a 200, the packed script or the URL is
    missing, or any exception occurs (logged as a warning).
    """
    import re

    from yt_dlp.utils import decode_packed_codes

    browser_headers = {
        "User-Agent": DEFAULT_UA,
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
        "Accept-Language": "en-US,en;q=0.9",
    }
    try:
        page = await session.get(
            embed_url,
            headers=browser_headers,
            impersonate="chrome120",
            timeout=15,
            allow_redirects=True,
        )
        if page.status_code != 200:
            return None

        packed = re.search(
            r"eval\(function\(p,a,c,k,e,d\)\{.+?\}\(.+?\)\)", page.text, re.DOTALL
        )
        if not packed:
            return None

        unpacked = decode_packed_codes(packed.group(0))
        wurl = re.search(r'MDCore\.wurl\s*=\s*"([^"]+\.mp4[^"]*)"', unpacked)
        if not wurl:
            return None

        fresh_url = wurl.group(1)
        return "https:" + fresh_url if fresh_url.startswith("//") else fresh_url
    except Exception as exc:
        log.warning("refetch mixdrop failed for %s: %s", embed_url, exc)
        return None
|
|
|
|
|
|
async def _curl_cffi_stream(
    target: str,
    headers: dict,
    *,
    refetch_url: str | None = None,
    refetch_hoster: str | None = None,
) -> Response:
    """Stream ``target`` through curl_cffi with Chrome impersonation.

    Fallback for hosters that detect the plain httpx JA3 fingerprint as a bot
    (mxcontent, cloudflare-protected CDNs). curl_cffi async with chrome120
    impersonation has the same TLS fingerprint as a real Chrome, so the CDN lets
    it through.

    When ``refetch_url`` is set and ``refetch_hoster`` is ``"mixdrop"``, the embed
    page is re-fetched first, in the SAME session, to refresh cookies and obtain
    a new mp4 URL (same-session bind for mxcontent). Without that the mixdrop
    mp4 token expires and, with no cookies, the CDN answers 403.

    Returns:
        A streaming response relaying the upstream body, or the response built
        by ``_upstream_error_response`` for an upstream status >= 400.

    Raises:
        HTTPException: 502 on any unexpected failure; an HTTPException raised by
            ``_upstream_error_response`` is propagated unchanged.
    """
    from curl_cffi.requests import AsyncSession

    session = AsyncSession()

    async def close_quietly() -> None:
        # Cleanup must never mask the error that is being handled.
        try:
            await session.close()
        except Exception:
            pass

    try:
        # For mixdrop: ALWAYS re-fetch the embed FIRST (before the mp4) so the
        # session has fresh cookies. An initial mp4 attempt with an expired/old
        # token and no cookies = 403 + an anti-bot flag in the cookies, which
        # blocks the retry as well.
        if refetch_url and refetch_hoster == "mixdrop":
            new_mp4 = await _refetch_mixdrop_url(session, refetch_url)
            if new_mp4:
                target = new_mp4
                log.info("mixdrop fresh-extract mp4 %s", new_mp4[:80])

        upstream = await session.get(
            target,
            headers=headers,
            impersonate="chrome120",
            stream=True,
            timeout=120,
            allow_redirects=True,
        )
        log.info("mixdrop mp4 fetch %s → %d", target[:60], upstream.status_code)
        if upstream.status_code >= 400:
            await session.close()
            return _upstream_error_response(upstream.status_code, dict(upstream.headers), target)

        out_headers = {
            k: v for k, v in upstream.headers.items() if k.lower() not in HOP_BY_HOP
        }

        async def streamer():
            bytes_out = 0
            try:
                async for chunk in upstream.aiter_content():
                    bytes_out += len(chunk)
                    yield chunk
            finally:
                # Account for the bytes even when closing the session fails.
                try:
                    await session.close()
                finally:
                    _record_proxy_bytes(target, bytes_out)

        return StreamingResponse(
            streamer(),
            status_code=upstream.status_code,
            headers=out_headers,
            media_type=upstream.headers.get("content-type", "application/octet-stream"),
        )
    except HTTPException:
        # Raised on purpose by _upstream_error_response (e.g. upstream 403). It
        # already carries the intended status and detail, so do not log it as a
        # proxy failure or re-wrap it — same handling as in proxy_stream.
        await close_quietly()
        raise
    except Exception as e:
        await close_quietly()
        log.warning("curl_cffi proxy failed for %s: %s", target, e)
        raise HTTPException(status_code=502, detail=f"proxy error: {e}") from e
|
|
|
|
|
|
@router.get("/{token}/{_basename:path}")
async def proxy_stream(
    token: str,
    _basename: str,
    request: Request,
) -> Response:
    """Stream the upstream resource named by ``token`` back to the player.

    The token payload supplies the target URL (``u``), the Referer (``r``) and
    the optional ``i`` / ``rf`` / ``rh`` flags. The client's ``Range`` header is
    forwarded upstream. HLS manifests are buffered and rewritten so that their
    sub-resources also go through the proxy; everything else is streamed.

    Returns:
        - the rewritten manifest (``application/vnd.apple.mpegurl``, no-store);
        - a streaming response relaying the upstream status, headers (minus
          ``HOP_BY_HOP``) and body;
        - a plain-text 503 with ``Retry-After: 5`` on connect errors/timeouts;
        - whatever ``_upstream_error_response`` maps an upstream status >= 400 to.

    Raises:
        HTTPException: from ``parse_token`` for a bad token; 502 when the
            manifest rewrite or the upstream request fails unexpectedly.
    """
    payload = parse_token(token)
    original_target = payload["u"]
    referer = payload["r"] or None
    use_impersonate = bool(payload.get("i"))
    refetch_url = payload.get("rf")
    refetch_hoster = payload.get("rh")

    # If this target was already redirect-followed earlier, use the cached final
    # URL. Reason: porntrex `get_file/` answers 410 on reuse — see the comment
    # above `_REDIRECT_CACHE`.
    cached_target = _redirect_cache_get(original_target)
    target = cached_target or original_target

    # Forward the Range header (HLS/MP4 players do byte-range fetches for
    # seek/preload).
    headers = _build_headers(referer)
    range_h = request.headers.get("range")
    if range_h:
        headers["Range"] = range_h

    # Hosters that require a Chrome JA3 fingerprint (mxcontent /
    # cloudflare-protected CDNs) go straight to curl_cffi instead of httpx, to
    # avoid a 403→retry round trip. The `i=1` token flag is set by the extractor
    # for those hosts (mixdrop.py).
    if use_impersonate:
        return await _curl_cffi_stream(
            target, headers,
            refetch_url=refetch_url, refetch_hoster=refetch_hoster,
        )

    # Short connect timeout, but a long read timeout so streaming is not cut off.
    timeout = httpx.Timeout(connect=15.0, read=120.0, write=30.0, pool=10.0)
    # The path is only a hint; the FINAL decision is made on the response
    # content-type. Reason: pornhat `get_file/.../<id>.mp4/` 302 → CDN m3u8
    # manifest despite `.mp4` in the path. Without the content-type check the
    # proxy treats it as binary and mobile gets an m3u8 with RAW CDN URLs
    # (IP-bound to the VPS) → "no extractors" in ExoPlayer.
    path_suggests_m3u8 = urlparse(target).path.lower().endswith(".m3u8")

    client = httpx.AsyncClient(timeout=timeout, follow_redirects=True)
    try:
        # Send in streaming mode FIRST — inspect the headers, then decide:
        # rewrite the manifest or stream the binary. ExoPlayer mostly issues GET.
        upstream = await client.send(
            client.build_request("GET", target, headers=headers),
            stream=True,
            follow_redirects=True,
        )
        if upstream.status_code >= 400:
            status = upstream.status_code
            ups_headers = dict(upstream.headers)
            await upstream.aclose()
            await client.aclose()
            # The cached final URL returned an error (e.g. CDN signed URL
            # expired, 403/410) — invalidate the cache and let mobile retry via a
            # fresh /resolve. Otherwise the stale cache would keep a dead CDN URL
            # for 30 min (TTL).
            if cached_target is not None and status in (401, 403, 404, 410):
                _redirect_cache_invalidate(original_target)
            return _upstream_error_response(status, ups_headers, target)

        # First successful pass for single-use targets (e.g. porntrex get_file):
        # cache the resolved final URL (after follow_redirects). Later range
        # requests go directly to the CDN URL — get_file is not hit a second time.
        if cached_target is None:
            final_url = str(upstream.url)
            if final_url != original_target:
                _redirect_cache_put(original_target, final_url)

        ct = (upstream.headers.get("content-type") or "").lower()
        # "mpegurl" covers both application/x-mpegurl and
        # application/vnd.apple.mpegurl.
        is_m3u8 = path_suggests_m3u8 or "mpegurl" in ct
        if is_m3u8:
            # Manifest content — buffer fully, rewrite, return as m3u8.
            body = await upstream.aread()
            await upstream.aclose()
            await client.aclose()
            try:
                rewritten = _rewrite_m3u8(
                    body.decode("utf-8", errors="replace"),
                    base_url=str(upstream.url),
                    referer=referer,
                )
            except Exception as e:
                log.warning("m3u8 rewrite failed for %s: %s", target, e)
                raise HTTPException(status_code=502, detail="manifest rewrite failed") from e
            return Response(
                content=rewritten,
                media_type="application/vnd.apple.mpegurl",
                headers={"Cache-Control": "no-store"},
            )

        out_headers = {
            k: v for k, v in upstream.headers.items() if k.lower() not in HOP_BY_HOP
        }

        async def streamer():
            bytes_out = 0
            try:
                # aiter_bytes() applies content decoding. HOP_BY_HOP strips
                # `content-encoding` from the relayed headers, so relaying the
                # raw (possibly gzip/deflate-compressed) body would hand the
                # player bytes it cannot interpret. For uncompressed bodies the
                # output is identical to the raw stream.
                async for chunk in upstream.aiter_bytes():
                    bytes_out += len(chunk)
                    yield chunk
            finally:
                # Release both resources and record the bytes even if one of
                # the close calls raises.
                try:
                    await upstream.aclose()
                finally:
                    try:
                        await client.aclose()
                    finally:
                        _record_proxy_bytes(target, bytes_out)

        return StreamingResponse(
            streamer(),
            status_code=upstream.status_code,
            headers=out_headers,
            media_type=upstream.headers.get("content-type", "application/octet-stream"),
        )
    except HTTPException:
        await client.aclose()
        raise
    except (httpx.ConnectError, httpx.ConnectTimeout, httpx.ReadTimeout) as e:
        # CDN connect failure / timeout — transient, logged at INFO (not an ERROR
        # for Sentry). Answer 503 + Retry-After instead of 502 so mobile can
        # retry without panicking.
        await client.aclose()
        log.info("proxy connect/timeout for %s: %s", target, e)
        return Response(
            content=f"upstream unreachable: {type(e).__name__}",
            status_code=503,
            headers={"Retry-After": "5"},
            media_type="text/plain",
        )
    except Exception as e:
        await client.aclose()
        log.warning("proxy failed for %s: %s", target, e)
        raise HTTPException(status_code=502, detail=f"proxy error: {e}") from e
|
|
|
|
|
|
def _upstream_error_response(
    status: int,
    upstream_headers: dict,
    target: str,
) -> Response:
    """Map an upstream HTTP error onto the proxy's own response.

    Rationale per status:
      - **429 Too Many Requests**: CDN rate limit (e.g. fpo.xxx when the proxy
        hammers get_file/). Passed through as 429 with ``Retry-After`` (the
        upstream value, or 10 when absent) so mobile backs off. Logged at INFO
        (not ERROR) — this is expected CDN behaviour, not our bug.
      - **404/410**: video deleted / token expired. Passed through so the player
        knows.
      - **5xx upstream**: comes from the CDN, not from our code. Answered as 502
        with ``Retry-After: 5`` and logged at INFO.
      - **anything else (e.g. 403)**: raised as a 502 HTTPException so it gets
        reported — it may be our fault (bad referer etc.).

    Raises:
        HTTPException: 502 for every status not covered by the cases above.
    """
    retry_after = upstream_headers.get("retry-after") or upstream_headers.get("Retry-After")

    if status == 429:
        log.info("proxy upstream 429 for %s (Retry-After=%s)", target, retry_after)
        return Response(
            content="upstream rate limited",
            status_code=429,
            headers={
                "Cache-Control": "no-store",
                "Retry-After": str(retry_after) if retry_after else "10",
            },
            media_type="text/plain",
        )

    gone = status in (404, 410)
    cdn_failure = 500 <= status < 600
    if not (gone or cdn_failure):
        # Other 4xx (403 etc.) — raise so it is registered as an error (may be a
        # bug in our code).
        raise HTTPException(status_code=502, detail=f"upstream {status}")

    log.info("proxy upstream %d for %s", status, target)
    if gone:
        return Response(
            content=f"upstream {status}",
            status_code=status,
            media_type="text/plain",
        )
    # CDN-side error (e.g. Cloudflare 523 — origin unreachable): answered as 502,
    # logged at INFO because it is not our fault.
    return Response(
        content=f"upstream {status}",
        status_code=502,
        headers={"Retry-After": "5"},
        media_type="text/plain",
    )
|