"""Batch curl triage of 144 new theporndude tubes:
- HEAD root domain (200/4xx/5xx/timeout?)
- GET / → check landing markers: video listing, scenes, login wall, redirect
- GET /latest, /videos, /tube/recent → check which listing path works
- Result: per-slug status + landing markers + scene_url_pattern guess
"""
import asyncio
import json
import re
from pathlib import Path
from urllib.parse import urlparse
import httpx
COVERAGE_FILE = Path("theporndude_coverage.json")
OUT_FILE = Path("theporndude_triage.json")
UA = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0 Safari/537.36"
# Common candidate landing paths
LANDING_PATHS = ["/", "/latest", "/latest-videos", "/recent", "/new", "/videos", "/category/new", "/top-rated"]
# Markers w HTML
SCENE_LINK_PATTERNS = [
r']+href="(/(?:video|videos|watch|v|scene|scenes|stream|movie|movies|view|play|porn|tube)/[^"]+)"',
r']+href="((?:https?:)?//[^/"]+/(?:video|videos|watch|v|scene|scenes|stream|movie|movies|view|play|porn|tube)/[^"]+)"',
]
META_MARKERS = [
(r'jsonld|json-ld|"@type"\s*:\s*"VideoObject"', "jsonld_video"),
(r'[^<]*\b(?:404|not\s+found|gone|domain)\b[^<]*', "dead_404"),
(r']+http-equiv="refresh"[^>]+url=', "meta_refresh"),
]
async def fetch_one(cli: httpx.AsyncClient, url: str) -> tuple[int, str]:
    """Fetch ``url`` with a GET request and report the outcome as ``(status, text)``.

    Redirects are followed. On success the tuple holds the HTTP status code
    and the first 200,000 characters of the decoded body. Failures never
    raise; they are reported as negative pseudo-statuses:

    * ``-1`` with ``"conn_refused"`` for ``httpx.ConnectError``,
    * ``-2`` with ``"timeout"`` for ``httpx.TimeoutException``,
    * ``-9`` with the first 120 characters of the message for any other
      exception.
    """
    request_headers = {"User-Agent": UA}
    try:
        response = await cli.get(url, headers=request_headers, follow_redirects=True)
        # Body decoding stays inside the try so decode errors map to -9 too.
        outcome = (response.status_code, response.text[:200_000])
    except httpx.ConnectError:
        outcome = (-1, "conn_refused")
    except httpx.TimeoutException:
        outcome = (-2, "timeout")
    except Exception as exc:
        outcome = (-9, str(exc)[:120])
    return outcome
def analyze_html(html: str, *, meta_markers=None, scene_link_patterns=None) -> dict:
    """Scan landing-page HTML for known markers and scene-like links.

    Args:
        html: Page source to inspect.
        meta_markers: Optional iterable of ``(regex, name)`` pairs. Defaults
            to the module-level ``META_MARKERS``.
        scene_link_patterns: Optional iterable of regexes whose group 1
            captures a link. Defaults to the module-level
            ``SCENE_LINK_PATTERNS``.

    Returns:
        A dict holding ``name: True`` for every marker regex that matches
        (case-insensitively). When at least one scene link is found it also
        holds ``"scene_link_samples"`` (up to 3 links, each cut to 120
        characters) and ``"scene_path_prefixes"`` (sorted unique first path
        segments, each with a leading slash).
    """
    if meta_markers is None:
        meta_markers = META_MARKERS
    if scene_link_patterns is None:
        scene_link_patterns = SCENE_LINK_PATTERNS

    found = {
        name: True
        for pattern, name in meta_markers
        if re.search(pattern, html, re.IGNORECASE)
    }

    # Collect at most 5 links across all patterns, in pattern order.
    scene_links = []
    for pattern in scene_link_patterns:
        for match in re.finditer(pattern, html, re.IGNORECASE):
            scene_links.append(match.group(1)[:120])
            if len(scene_links) >= 5:
                break
        if len(scene_links) >= 5:
            break

    if scene_links:
        found["scene_link_samples"] = scene_links[:3]
        prefixes = set()
        for link in scene_links:
            # Take the prefix from the URL *path* so that absolute and
            # protocol-relative links yield "/video", not "/https:" or "/host".
            try:
                path = urlparse(link).path
            except ValueError:
                # Malformed authority (e.g. a stray "[" in the host): skip.
                continue
            first_segment = path.lstrip("/").split("/", 1)[0]
            if first_segment:
                prefixes.add("/" + first_segment)
        found["scene_path_prefixes"] = sorted(prefixes)
    return found
async def audit_one(cli: httpx.AsyncClient, slug: str, domain: str) -> dict:
    """Audit a single tube site and return its triage record.

    The record always carries ``slug`` and ``domain``. Depending on how far
    the audit gets it also carries:

    * ``error`` = ``"no_valid_domain"`` when ``domain`` is empty or does not
      look like a hostname (no request is made);
    * ``root_status`` and, for any status other than 200/301/302,
      ``root_error`` (first 80 characters of the returned text);
    * ``root_findings``, ``heuristic_score`` and ``reasons`` when the root
      page was fetched successfully.
    """
    report = {"slug": slug, "domain": domain}

    # Only probe https://<domain>/ when the domain looks like a hostname.
    looks_valid = bool(domain) and re.match(r"^[\w\.-]+\.\w+$", domain) is not None
    if not looks_valid:
        report["error"] = "no_valid_domain"
        return report

    status, body = await fetch_one(cli, f"https://{domain}/")
    report["root_status"] = status
    if status not in (200, 301, 302):
        report["root_error"] = body[:80] if isinstance(body, str) else None
        return report

    findings = analyze_html(body)
    report["root_findings"] = findings

    # Scoring rules: (finding keys, score delta, reason label or None).
    # A rule fires when any of its keys is truthy in the findings; rules
    # without a label change the score silently.
    bonus_rules = (
        (("jsonld_video",), 1, "jsonld_video"),
        (("og_video",), 1, "og_video"),
        (("video_card",), 1, "video_card"),
        (("performer_marker",), 1, "performer_marker"),
        (("studio_marker",), 1, "studio_marker"),
        (("duration_marker",), 0.5, None),
        (("hls_marker", "stream_url_marker"), 0.5, None),
    )
    penalty_rules = (
        (("auth_wall",), -2, "auth_wall"),
        (("dead_404",), -5, "dead_404"),
        (("meta_refresh",), -1, "meta_refresh"),
    )

    total = 0
    reasons = []

    for keys, delta, label in bonus_rules:
        if any(findings.get(key) for key in keys):
            total += delta
            if label is not None:
                reasons.append(label)

    # Scene-like link prefixes count as a bonus and are echoed in the reason.
    scene_prefixes = findings.get("scene_path_prefixes")
    if scene_prefixes:
        total += 1
        reasons.append(f"scene_paths={scene_prefixes}")

    for keys, delta, label in penalty_rules:
        if any(findings.get(key) for key in keys):
            total += delta
            if label is not None:
                reasons.append(label)

    report["heuristic_score"] = round(total, 1)
    report["reasons"] = reasons
    return report
async def main():
    """Audit every entry of ``new_candidates`` and write the triage report.

    Reads ``COVERAGE_FILE``, audits each candidate concurrently (at most 12
    audits in flight), writes the merged records to ``OUT_FILE`` as JSON and
    prints a histogram of heuristic scores.

    Each candidate is expected to be a dict with a ``"slug"`` key and an
    optional ``"domain"`` key.
    """
    # JSON is read and written as UTF-8 explicitly so the script does not
    # depend on the platform's locale encoding.
    cov = json.loads(COVERAGE_FILE.read_text(encoding="utf-8"))
    new_candidates = cov["new_candidates"]
    print(f"audytuję {len(new_candidates)} nowych kandydatów…")

    timeout = httpx.Timeout(15.0, connect=8.0)
    limits = httpx.Limits(max_keepalive_connections=20, max_connections=50)
    async with httpx.AsyncClient(timeout=timeout, limits=limits, http2=False) as cli:
        sem = asyncio.Semaphore(12)

        async def worker(r):
            """Audit one candidate and merge the audit result into its record."""
            async with sem:
                domain = r.get("domain") or ""
                # When the domain is missing or is the porndudecams
                # interstitial, fall back to guessing "<slug>.com".
                if not domain or "porndudecams" in domain:
                    domain = f"{r['slug'].lower()}.com"
                return {**r, **(await audit_one(cli, r["slug"], domain))}

        results = await asyncio.gather(*(worker(r) for r in new_candidates))

    OUT_FILE.write_text(json.dumps(results, indent=2), encoding="utf-8")

    # Stats: only per-bucket counts are needed, so count instead of
    # collecting the records. Records without a score land in "<1".
    by_score = {}
    for r in results:
        s = r.get("heuristic_score", 0)
        bucket = "5+" if s >= 5 else "3-5" if s >= 3 else "1-3" if s >= 1 else "<1"
        by_score[bucket] = by_score.get(bucket, 0) + 1

    print("\n=== Heurystyczny rozkład (canonical-fit) ===")
    for b in ["5+", "3-5", "1-3", "<1"]:
        if b in by_score:
            print(f" {b:<5} {by_score[b]} tubów")
    print(f"\n-> {OUT_FILE}")
# Script entry point: run the async triage only when executed directly.
if __name__ == "__main__":
    asyncio.run(main())