"""Współdzielony resolver dla KVS (kt_player) tube'ów z `function/0/` + `license_code`. Wzorzec (yespornvip, pornditt, ... — wszystkie KVS na tym samym silniku): flashvars: `video_url` / `video_alt_url` / `video_alt_url2` = `function/0/https:/// get_file///.../.mp4/` (480/720/1080p) + `license_code: '$...'`. kt_player dekoduje HASH (permutacja pierwszych 32 znaków algorytmem license_code — algo zgodny z yt-dlp KVS `_kvs_get_real_url`, zweryfikowany 2026-05-31 że odtwarza output kt_player). Zdekodowany `get_file` 302-redirectuje do CDN (cdn5.yesporn.vip / twa.tgprn.com / ...) który serwuje wideo (206). Finalny URL jest **time-bound signed, NIE IP/cookie-bound** — portable cross-IP. get_file token jest session-bound, więc 302 MUSI być rozwiązany w tej samej sesji curl_cffi co fetch strony. Oddajemy finalny portable CDN url per jakość → mobile gra direct natywnie, multi-quality, zero WebView/reklam/proxy. Dlaczego server-side (nie WebView): WebView ładuje ad-heavy stronę + VAST preroll, a scrape łapie reklamę zamiast contentu. Direct get_file→CDN omija player/reklamę. """ from __future__ import annotations import logging import re import time import urllib.parse as _up from app.extractors._fetch import _DEFAULT_IMPERSONATE, _DEFAULT_UA, _HAS_CURL_CFFI from app.extractors._models import StreamSource log = logging.getLogger(__name__) _URL_RE = re.compile( r"(video(?:_alt)?_url\d*)\s*:\s*[\"'](function/0/[^\"']+)[\"']", re.IGNORECASE, ) _TEXT_RE = re.compile( r"(video(?:_alt)?_url\d*)_text\s*:\s*[\"']([^\"']*)[\"']", re.IGNORECASE, ) _LICENSE_RE = re.compile(r"license_code\s*:\s*[\"'](\$[^\"']+)[\"']", re.IGNORECASE) _HASH_LENGTH = 32 # get_file 302-follow timeout. Zdrowy CDN odpowiada <1s; gdy wideo zostało usunięte # z CDN (strona istnieje, ale get_file stalluje) request wisi do timeoutu. Trzymamy # go NISKO i osobno od page-fetch timeoutu — bug 6ec1960e 2026-06-02: yesporn.vip # przeszedł na cdn4/remote_control.php i dla martwych scen wszystkie 3 jakości # wisiały po 60s = 180s → mobile "resolving w nieskończoność". _GETFILE_TIMEOUT = 10.0 def _license_token(license_code: str) -> list[int]: license_code = license_code.replace("$", "") license_values = [int(c) for c in license_code] modlicense = license_code.replace("0", "1") center = len(modlicense) // 2 modlicense = str(4 * abs(int(modlicense[:center + 1]) - int(modlicense[center:])))[:center + 1] return [ (license_values[index + offset] + current) % 10 for index, current in enumerate(int(c) for c in modlicense) for offset in range(4) ] def real_url(video_url: str, license_code: str) -> str: """Dekoduje `function/0/...get_file/N//...` permutując pierwsze 32 znaki HASH.""" if not video_url.startswith("function/0/"): return video_url parsed = _up.urlparse(video_url[len("function/0/"):]) lt = _license_token(license_code) parts = parsed.path.split("/") h = parts[3][:_HASH_LENGTH] idx = list(range(_HASH_LENGTH)) acc = 0 for src in reversed(range(_HASH_LENGTH)): acc += lt[src] dest = (src + acc) % _HASH_LENGTH idx[src], idx[dest] = idx[dest], idx[src] parts[3] = "".join(h[i] for i in idx) + parts[3][_HASH_LENGTH:] return _up.urlunparse(parsed._replace(path="/".join(parts))) def _quality_rank(label: str | None) -> int: if not label: return -1 m = re.search(r"(\d{3,4})\s*p", label, re.IGNORECASE) return int(m.group(1)) if m else -1 def _resolve_get_file(session, base_url: str, get_file_url: str, timeout: float) -> str | None: sep = "&" if "?" in get_file_url else "?" url = f"{get_file_url}{sep}rnd={int(time.time() * 1000)}" try: r = session.get( url, timeout=timeout, allow_redirects=True, stream=True, headers={"Referer": base_url + "/", "Range": "bytes=0-1"}, ) final = str(r.url) status = r.status_code r.close() except Exception as e: log.info("kvs: get_file resolve failed (%s): %s", get_file_url[:60], e) return None if status >= 400 or "/get_file/" in final: log.info("kvs: get_file resolve bad status=%s final=%s", status, final[:70]) return None return final def resolve_kvs(page_url: str, *, base_url: str, timeout: float = 60.0) -> list[StreamSource] | None: """Fetch KVS page → decode function/0 get_file (per jakość) → follow 302 → portable CDN. Zwraca StreamSource'y posortowane malejąco po jakości, lub None gdy nic nie wyszło. base_url: scheme+host hosta (np. 'https://yesporn.vip') — do Referera i logów. """ if not _HAS_CURL_CFFI: log.info("kvs: curl_cffi unavailable — cannot resolve %s", page_url) return None from curl_cffi import requests as _cf_requests session = _cf_requests.Session(impersonate=_DEFAULT_IMPERSONATE) try: resp = session.get( page_url, headers={"User-Agent": _DEFAULT_UA, "Accept": "text/html,application/xhtml+xml"}, timeout=timeout, allow_redirects=True, ) html = resp.text if resp.status_code < 400 else "" except Exception as e: log.info("kvs: page fetch failed %s: %s", page_url, e) return None if not html: log.info("kvs: empty page %s", page_url) return None licm = _LICENSE_RE.search(html) if not licm: log.info("kvs: no license_code on %s", page_url) return None license_code = licm.group(1) quality_by_var: dict[str, str] = {} for m in _TEXT_RE.finditer(html): quality_by_var[m.group(1).lower()] = m.group(2).strip() # get_file timeout NISKI (osobny od page-fetch) + early-break: gdy 2 pierwsze # jakości nie rozwiążą się (przy 0 dotychczasowych wyników) scena jest martwa na # CDN — nie ma sensu czekać na trzecią (oszczędza kolejne _GETFILE_TIMEOUT). gf_timeout = min(timeout, _GETFILE_TIMEOUT) seen_dec: set[str] = set() result: list[StreamSource] = [] fails = 0 for m in _URL_RE.finditer(html): var_name = m.group(1).lower() decoded = real_url(m.group(2), license_code) if decoded in seen_dec: continue seen_dec.add(decoded) final = _resolve_get_file(session, base_url, decoded, gf_timeout) if not final: fails += 1 if fails >= 2 and not result: log.info("kvs: %d get_file fails, 0 results — martwa scena? %s", fails, page_url) break continue result.append( StreamSource( link=final, type="mp4", quality=quality_by_var.get(var_name) or None, referer=base_url + "/", raw={"mobile_direct_ok": True}, ) ) if not result: log.info("kvs: no resolvable get_file on %s", page_url) return None result.sort(key=lambda s: _quality_rank(s.quality), reverse=True) return result