"""Batch triage of new theporndude tube candidates.

For every candidate listed under ``new_candidates`` in ``COVERAGE_FILE``:

- GET ``https://<domain>/`` (redirects followed) and record the HTTP status,
  or a negative pseudo-status for connection errors and timeouts.
- Scan the landing HTML for markers (``META_MARKERS``) and for links that
  look like scene/video pages (``SCENE_LINK_PATTERNS``).
- Derive a heuristic "canonical fit" score plus the reasons behind it.

Per-slug results are written to ``OUT_FILE`` and a score histogram is printed.

NOTE: only the root page is fetched at the moment. ``LANDING_PATHS`` lists
further candidate listing paths, but nothing in this module probes them yet.
"""
import asyncio
import json
import re
from pathlib import Path
from urllib.parse import urlparse

import httpx

COVERAGE_FILE = Path("theporndude_coverage.json")
OUT_FILE = Path("theporndude_triage.json")

UA = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0 Safari/537.36"

# Common candidate landing paths (currently unused, see module docstring).
LANDING_PATHS = ["/", "/latest", "/latest-videos", "/recent", "/new", "/videos", "/category/new", "/top-rated"]

# HTML markers.
# Anchor links whose href points at a scene-like path. The first pattern
# matches site-relative hrefs, the second absolute / protocol-relative ones.
# Group 1 is always the href value.
SCENE_LINK_PATTERNS = [
    r'<a\b[^>]+href="(/(?:video|videos|watch|v|scene|scenes|stream|movie|movies|view|play|porn|tube)/[^"]+)"',
    r'<a\b[^>]+href="((?:https?:)?//[^/"]+/(?:video|videos|watch|v|scene|scenes|stream|movie|movies|view|play|porn|tube)/[^"]+)"',
]

# (regex, finding name) pairs; every pattern is searched case-insensitively.
# The dead-page and refresh patterns are anchored to their tags so ordinary
# body text (e.g. the word "domain" in a footer) cannot trigger them.
# NOTE(review): the scoring in _score_findings also reads og_video,
# video_card, performer_marker, studio_marker, duration_marker, hls_marker,
# stream_url_marker and auth_wall. No pattern here produces those names, so
# they contribute nothing until matching patterns are added.
META_MARKERS = [
    (r'jsonld|json-ld|"@type"\s*:\s*"VideoObject"', "jsonld_video"),
    (r'<title[^>]*>[^<]*\b(?:404|not\s+found|gone|domain)\b[^<]*</title>', "dead_404"),
    (r'<meta[^>]+http-equiv="refresh"[^>]+url=', "meta_refresh"),
]

# Findings that each add one point and are listed among the reasons.
_POSITIVE_MARKERS = (
    "jsonld_video",
    "og_video",
    "video_card",
    "performer_marker",
    "studio_marker",
)

# Findings that subtract points: (finding name, penalty).
_PENALTIES = (
    ("auth_wall", 2),
    ("dead_404", 5),
    ("meta_refresh", 1),
)

MAX_BODY_CHARS = 200_000  # cap on how much of a response body is kept
MAX_SCENE_LINKS = 5  # stop collecting scene links after this many
MAX_LINK_CHARS = 120  # each collected link is truncated to this length


async def fetch_one(cli: httpx.AsyncClient, url: str) -> tuple[int, str]:
    """GET *url* (following redirects) and return ``(status, text)``.

    On success ``status`` is the HTTP status code and ``text`` is the response
    body truncated to ``MAX_BODY_CHARS``. On failure nothing is raised;
    instead a negative pseudo-status is returned with a short description:

    - ``-1, "conn_refused"`` for ``httpx.ConnectError``
    - ``-2, "timeout"`` for any ``httpx.TimeoutException``
    - ``-9, "<ExceptionType>: <message>"`` (max 120 chars) for anything else
    """
    try:
        r = await cli.get(url, headers={"User-Agent": UA}, follow_redirects=True)
        return r.status_code, r.text[:MAX_BODY_CHARS]
    except httpx.ConnectError:
        return -1, "conn_refused"
    except httpx.TimeoutException:
        return -2, "timeout"
    except Exception as e:
        # Deliberate best-effort: one broken site must not abort the batch.
        # Include the class name because str(e) is often empty.
        return -9, f"{type(e).__name__}: {e}"[:120]


def _path_prefix(link: str) -> str:
    """Return the first URL path segment of *link* as ``"/segment"``.

    Works for site-relative, absolute and protocol-relative links. Returns
    ``""`` when the link has no path segment or cannot be parsed.
    """
    try:
        path = urlparse(link).path
    except ValueError:
        # urlparse rejects some malformed hosts (e.g. unbalanced brackets).
        return ""
    first = path.lstrip("/").split("/", 1)[0]
    return f"/{first}" if first else ""


def analyze_html(html: str) -> dict:
    """Scan *html* for known markers and scene-like links.

    Returns a dict that may contain:

    - one ``name: True`` entry per ``META_MARKERS`` pattern that matched;
    - ``scene_link_samples``: up to three matched hrefs (each truncated to
      ``MAX_LINK_CHARS``);
    - ``scene_path_prefixes``: sorted unique first path segments (such as
      ``"/video"``) of the collected links.

    The two ``scene_*`` keys are present only when at least one link matched.
    """
    found = {}
    for pattern, name in META_MARKERS:
        if re.search(pattern, html, re.IGNORECASE):
            found[name] = True

    # Collect at most MAX_SCENE_LINKS links across all patterns.
    scene_links = []
    for pattern in SCENE_LINK_PATTERNS:
        for m in re.finditer(pattern, html, re.IGNORECASE):
            scene_links.append(m.group(1)[:MAX_LINK_CHARS])
            if len(scene_links) >= MAX_SCENE_LINKS:
                break
        if len(scene_links) >= MAX_SCENE_LINKS:
            break

    if scene_links:
        found["scene_link_samples"] = scene_links[:3]
        # Take the prefix from the URL *path*, so absolute links yield
        # "/video" rather than "/https:" or the host name.
        prefixes = {_path_prefix(link) for link in scene_links}
        prefixes.discard("")
        found["scene_path_prefixes"] = sorted(prefixes)
    return found


def _score_findings(findings: dict) -> tuple[float, list]:
    """Turn ``analyze_html`` findings into ``(score, reasons)``.

    Positive markers and detected scene paths add points, while auth walls,
    dead pages and meta refreshes subtract them. The score can be negative.
    """
    score = 0
    reasons = []
    for name in _POSITIVE_MARKERS:
        if findings.get(name):
            score += 1
            reasons.append(name)
    # Weak signals: half a point each, not listed among the reasons.
    if findings.get("duration_marker"):
        score += 0.5
    if findings.get("hls_marker") or findings.get("stream_url_marker"):
        score += 0.5
    if findings.get("scene_path_prefixes"):
        score += 1
        reasons.append(f"scene_paths={findings['scene_path_prefixes']}")
    for name, penalty in _PENALTIES:
        if findings.get(name):
            score -= penalty
            reasons.append(name)
    return score, reasons


async def audit_one(cli: httpx.AsyncClient, slug: str, domain: str) -> dict:
    """Audit a single tube site.

    Returns a dict that always holds ``slug`` and ``domain`` and, depending
    on how far the audit got:

    - ``error = "no_valid_domain"`` when *domain* is empty or malformed;
    - ``root_status`` plus ``root_error`` (first 80 chars of the body or of
      the error description) when the root page did not return 200/301/302;
    - ``root_status``, ``root_findings``, ``heuristic_score`` and ``reasons``
      otherwise.
    """
    out = {"slug": slug, "domain": domain}
    # Only try https://<domain>/ for something that looks like a host name.
    if not domain or not re.match(r"^[\w\.-]+\.\w+$", domain):
        out["error"] = "no_valid_domain"
        return out

    status, html = await fetch_one(cli, f"https://{domain}/")
    out["root_status"] = status
    # fetch_one follows redirects, so 301/302 normally never surface here;
    # they are still accepted in case a redirect ends without a target.
    if status not in (200, 301, 302):
        out["root_error"] = html[:80]
        return out

    findings = analyze_html(html)
    out["root_findings"] = findings
    score, reasons = _score_findings(findings)
    out["heuristic_score"] = round(score, 1)
    out["reasons"] = reasons
    return out


def _score_bucket(score: float) -> str:
    """Map a heuristic score to its histogram bucket label."""
    if score >= 5:
        return "5+"
    if score >= 3:
        return "3-5"
    if score >= 1:
        return "1-3"
    return "<1"


async def main():
    """Audit every new candidate, write ``OUT_FILE`` and print a histogram."""
    # Explicit UTF-8 so the script behaves the same regardless of locale.
    cov = json.loads(COVERAGE_FILE.read_text(encoding="utf-8"))
    new_candidates = cov["new_candidates"]
    print(f"audytuję {len(new_candidates)} nowych kandydatów…")

    timeout = httpx.Timeout(15.0, connect=8.0)
    limits = httpx.Limits(max_keepalive_connections=20, max_connections=50)
    async with httpx.AsyncClient(timeout=timeout, limits=limits, http2=False) as cli:
        sem = asyncio.Semaphore(12)  # at most 12 audits in flight

        async def worker(r):
            async with sem:
                # Use the recorded domain, or guess "<slug>.com".
                domain = r.get("domain") or ""
                # If pdude.link resolved to the porndudecams.com interstitial,
                # fall back to the "<slug>.com" guess as well.
                if not domain or "porndudecams" in domain:
                    domain = f"{r['slug'].lower()}.com"
                return {**r, **(await audit_one(cli, r["slug"], domain))}

        results = await asyncio.gather(*[worker(r) for r in new_candidates])

    OUT_FILE.write_text(json.dumps(results, indent=2), encoding="utf-8")

    # Stats: entries without a score (invalid domain, fetch failure) count as 0.
    by_score = {}
    for r in results:
        bucket = _score_bucket(r.get("heuristic_score", 0))
        by_score.setdefault(bucket, []).append(r)

    print("\n=== Heurystyczny rozkład (canonical-fit) ===")
    for b in ["5+", "3-5", "1-3", "<1"]:
        if b in by_score:
            print(f" {b:<5} {len(by_score[b])} tubów")
    print(f"\n-> {OUT_FILE}")


if __name__ == "__main__":
    asyncio.run(main())