"""Pełny pipeline dla theporndude /full-porn-movies-sites (94 tubes): 1. Resolve real domains (pdude.link follow, ale follow only 1 hop) 2. Coverage match vs nasze 25+ origins 3. Curl triage HTML markers 4. Per-tube scorecard """ import asyncio import json import re from pathlib import Path from urllib.parse import urlparse import httpx UA = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0 Safari/537.36" OUR_ORIGINS = [ "tube:0dayxxcom", "tube:epornercom", "tube:fpoxxx", "tube:freshpornoorg", "tube:hqpornercom", "tube:latestpornvideocom", "tube:mypornerleakcom", "tube:perverzijacom", "tube:porn00org", "tube:porndishcom", "tube:porndittcom", "tube:pornhatcom", "tube:pornhubcom", "tube:porntrexcom", "tube:pornxpph", "tube:redtubecom", "tube:sxylandcom", "tube:sxyprncom", "tube:xhamstercom", "tube:xnxxcom", "tube:xvideoscom", "tube:youporncom", "tube:latestleaksco", "tube:siskavideo", "tube:hdporn92com", "tube:xmoviesforyoucom", "tube:watchporn", "tube:porn4dayspw", "tube:paradisehillcc", ] _TLD_RE = re.compile(r"(com|net|org|tv|cc|pw|co|to|ws|me|sx|info|biz)$") def _strip_tld(s: str) -> str: return _TLD_RE.sub("", s) def match(slug: str, domain: str) -> str | None: candidates = [] if slug: candidates.append(slug.lower().replace("-", "")) if domain: candidates.append(domain.lower().replace(".", "").replace("-", "")) for o in OUR_ORIGINS: st = o.replace("tube:", "") st_no_tld = _strip_tld(st) for c in candidates: c_no_tld = _strip_tld(c) if c_no_tld == st_no_tld and len(c_no_tld) >= 3: return o return None SCENE_PATH_RE = re.compile( r']+href="((?:https?:)?//?[^"]*?/(?:video|videos|watch|v|scene|movie|movies|play|view|stream)/[^"]+)"', re.IGNORECASE, ) META_MARKERS = [ (r'"@type"\s*:\s*"VideoObject"', "jsonld_video"), (r'[^<]*\b(?:404|not\s+found|domain\s+for\s+sale|gone)\b', "dead_404"), ] async def fetch_one(cli: httpx.AsyncClient, url: str, *, max_redirects: int = 5) -> tuple[int, str, str]: try: r = await cli.get(url, headers={"User-Agent": UA}, follow_redirects=False) # Follow up to max_redirects but stop on cross-domain redirect-out (to detect pdude.link → ad) hops = 0 first_external_domain = None cur = r cur_url = url while cur.status_code in (301, 302, 303, 307, 308) and hops < max_redirects: loc = cur.headers.get("location") if not loc: break if loc.startswith("/"): p = urlparse(cur_url) loc = f"{p.scheme}://{p.netloc}{loc}" cur_url = loc hops += 1 # Track first external (non-pdude, non-theporndude) host = urlparse(loc).hostname or "" if first_external_domain is None and not host.endswith("pdude.link") and not host.endswith("theporndude.com"): first_external_domain = host.replace("www.", "") cur = await cli.get(loc, headers={"User-Agent": UA}, follow_redirects=False) return cur.status_code, cur.text[:200_000] if hasattr(cur, "text") else "", first_external_domain or (urlparse(cur_url).hostname or "").replace("www.", "") except httpx.ConnectError: return -1, "conn_refused", "" except httpx.TimeoutException: return -2, "timeout", "" except Exception as e: return -9, str(e)[:120], "" async def resolve_domain(cli: httpx.AsyncClient, slug: str) -> str: """Pdude.link follow z early-exit dla first external.""" try: r = await cli.get(f"https://pdude.link/{slug}", headers={"User-Agent": UA}, follow_redirects=False) loc = r.headers.get("location", "") if loc: host = urlparse(loc).hostname or "" host = host.replace("www.", "") # Jeśli pdude.link redirectuje na affiliate (anexo.link/awejmp.com/etc) — wyciągnij subAffId if "anexo.link" in host or "awejmp.com" in host or "porndudecams" in host: # Try slug.com fallback return "" return host except Exception: pass return "" def analyze_html(html: str) -> dict: found = {} for pattern, name in META_MARKERS: if re.search(pattern, html, re.IGNORECASE): found[name] = True prefixes = set() sample = [] for m in SCENE_PATH_RE.finditer(html): link = m.group(1) sample.append(link[:100]) # Wyciągnij prefix # Normalize: //host/path → /path; otherwise full match if link.startswith("//"): link = "/" + link.split("/", 3)[3] if "/" in link[2:] else "/" if link.startswith("/"): parts = link.lstrip("/").split("/", 2) if parts: prefixes.add("/" + parts[0]) if len(sample) >= 5: break if prefixes: found["scene_path_prefixes"] = sorted(prefixes) if sample: found["scene_link_samples"] = sample[:3] return found def score_findings(f: dict) -> tuple[float, list]: score, reasons = 0.0, [] if f.get("jsonld_video"): score += 1.5; reasons.append("jsonld_video") if f.get("og_video"): score += 0.5; reasons.append("og_video") if f.get("video_card"): score += 1; reasons.append("video_card") if f.get("performer_marker"): score += 1; reasons.append("performer_marker") if f.get("studio_marker"): score += 1; reasons.append("studio_marker") if f.get("duration_marker"): score += 0.5; reasons.append("duration_marker") if f.get("hls_marker") or f.get("stream_url_marker"): score += 0.5 if f.get("scene_path_prefixes"): score += 1; reasons.append(f"paths={f['scene_path_prefixes']}") if f.get("auth_wall"): score -= 2; reasons.append("auth_wall") if f.get("dead_404"): score -= 5; reasons.append("dead_404") return round(score, 1), reasons async def main(): movies = json.loads(Path("theporndude_movies.json").read_text())["all"] print(f"audyt {len(movies)} tubów z full-porn-movies-sites…") timeout = httpx.Timeout(15.0, connect=8.0) async with httpx.AsyncClient(timeout=timeout, http2=False) as cli: sem = asyncio.Semaphore(12) async def worker(r): async with sem: slug = r["slug"] # Resolve real domain z pdude.link first hop domain = await resolve_domain(cli, slug) if not domain or any(x in domain for x in ["anexo.link", "awejmp.com", "porndudecams"]): domain = f"{slug.lower()}.com" # Curl root + scene path heurystyka status, html, _ = await fetch_one(cli, f"https://{domain}/") findings = analyze_html(html) if status == 200 else {} score, reasons = score_findings(findings) our = match(slug, domain) return { **r, "domain": domain, "root_status": status, "findings": findings, "score": score, "reasons": reasons, "our_origin": our, } results = await asyncio.gather(*[worker(r) for r in movies]) # Aggregate have = [r for r in results if r["our_origin"]] new_promising = [r for r in results if not r["our_origin"] and r["score"] >= 2.5] new_low = [r for r in results if not r["our_origin"] and 1 <= r["score"] < 2.5] new_zero = [r for r in results if not r["our_origin"] and 0 < r["score"] < 1] new_dead = [r for r in results if not r["our_origin"] and (r["root_status"] <= 0 or r["score"] < 0)] new_no_signal = [r for r in results if not r["our_origin"] and r["score"] == 0 and r["root_status"] == 200] print(f"\n=== Coverage /full-porn-movies-sites ({len(results)} tubes) ===") print(f" already have: {len(have):>3}") print(f" promising: {len(new_promising):>3}") print(f" low value: {len(new_low):>3}") print(f" no signal: {len(new_no_signal):>3}") print(f" dead: {len(new_dead):>3}") print() print("ALREADY HAVE:") for r in have: print(f" {r['slug']:<20} -> {r['our_origin']}") print() print("PROMISING (score >= 2.5):") for r in sorted(new_promising, key=lambda x: -x["score"]): print(f" score={r['score']:>4} {r['domain']:<25} ({r['slug']:<20}) reasons={','.join(r['reasons'])[:60]}") print() print("LOW VALUE (1-2.5):") for r in sorted(new_low, key=lambda x: -x["score"]): print(f" score={r['score']:>4} {r['domain']:<25} ({r['slug']:<20}) reasons={','.join(r['reasons'])[:60]}") Path("theporndude_movies_scorecard.json").write_text(json.dumps(results, indent=2)) if __name__ == "__main__": asyncio.run(main())