goon/scripts/theporndude_curl_triage.py
https://github.com/goon-foss/goon 642f1ab8b8 Mobile 0.1.9: OTA enable, WebView cookie-dismiss fix, porndoe connector
Mobile / OTA:
- Enable Expo Updates (app.json + AndroidManifest) → api.goon-foss.org
- Bump 0.1.6 → 0.1.9 (build.gradle, app.json, appVersion.ts, main.py /version)
- backend.ts: default public backend auto-connect (no manual login)

WebView fallback fix (PlayerScreen INJECTED_JS):
- Auto-dismiss cookie/consent gates (hqporner et al. blocked kt_player init)
- Context-scoped: only clicks consent buttons inside cookie/gdpr containers
- Retry window for <source>.src polling raised 5→15 ticks (post-dismiss init)

Resolver:
- Series-position + modifier mismatch detector (Episode 2≠4, BTS/unedited)
  → composite_score hard-reject / cap; wired into scene_score + bulk_dedup
- aggregator-mode candidate query: LIMIT 500 + title-match ordering

Connectors:
- porndoe.com browse scraper (JSON-LD VideoObject) — theporndude audit pilot

landing: APK links → goon-v0.1.9.apk

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-22 11:20:57 +02:00

176 lines
6.4 KiB
Python

"""Batch curl triage 144 nowych theporndude tubes:
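- HEAD root domain (200/4xx/5xx/timeout?)
- GET / → check landing markers: video listing, scenes, login wall, redirect
- GET /latest, /videos, /tube/recent → check which listing path works
- Result: per-slug status + landing markers + scene_url_pattern guess

NOTE: the code in this file currently issues only ``GET /`` per domain; the
HEAD probe and the extra listing-path probes listed above are not performed.
"""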
import asyncio
import json
import re
from pathlib import Path
from urllib.parse import urlparse
import httpx
# Input file: main() loads it as JSON and reads its "new_candidates" list.
COVERAGE_FILE = Path("theporndude_coverage.json")
# Output file: main() writes one JSON record per audited candidate here.
OUT_FILE = Path("theporndude_triage.json")
# Desktop-browser User-Agent header sent by fetch_one() with every request.
UA = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0 Safari/537.36"
# Common candidate landing paths.
# NOTE(review): not referenced by any function in the visible part of this
# file; only the root path "/" is actually fetched — confirm whether probing
# these paths is still planned.
LANDING_PATHS = ["/", "/latest", "/latest-videos", "/recent", "/new", "/videos", "/category/new", "/top-rated"]
# Markers in HTML.
# Each pattern captures (group 1) the href of an <a> tag whose path starts with
# a video-like segment. The first pattern matches site-relative links
# ("/video/..."), the second matches absolute or scheme-relative links
# ("https://host/video/...", "//host/video/...").
SCENE_LINK_PATTERNS = [
    r'<a[^>]+href="(/(?:video|videos|watch|v|scene|scenes|stream|movie|movies|view|play|porn|tube)/[^"]+)"',
    r'<a[^>]+href="((?:https?:)?//[^/"]+/(?:video|videos|watch|v|scene|scenes|stream|movie|movies|view|play|porn|tube)/[^"]+)"',
]
# (regex, marker name) pairs. analyze_html() searches each regex
# case-insensitively and records ``found[name] = True`` on a match; capture
# groups (e.g. in "meta_desc") are not stored. audit_one() adds to the score
# for the content markers and subtracts for "auth_wall", "dead_404" and
# "meta_refresh".
META_MARKERS = [
    (r'jsonld|json-ld|"@type"\s*:\s*"VideoObject"', "jsonld_video"),
    (r'<meta\s+property="og:type"\s+content="video', "og_video"),
    (r'<meta\s+name="description"\s+content="([^"]+)"', "meta_desc"),
    (r'class="[^"]*\b(?:video|scene|episode)-?(?:item|card|tile|thumb)\b', "video_card"),
    (r'class="[^"]*\b(?:performer|actress|model|pornstar)\b', "performer_marker"),
    (r'class="[^"]*\b(?:studio|production|brand|channel)\b', "studio_marker"),
    (r'class="[^"]*\b(?:duration|runtime|length)\b|<time\s+datetime=', "duration_marker"),
    (r'\b(?:HLS|hls|m3u8|application/x-mpegURL)\b', "hls_marker"),
    (r'(?:hlsmanifest|videoUrl|video_url|stream_url|streamUrl)\s*[:=]\s*["\']', "stream_url_marker"),
    (r'login\s*required|create\s+account|sign\s+(?:in|up)|members\s+only|join\s+now\s+to\s+watch', "auth_wall"),
    (r'<title>[^<]*\b(?:404|not\s+found|gone|domain)\b[^<]*</title>', "dead_404"),
    (r'<meta[^>]+http-equiv="refresh"[^>]+url=', "meta_refresh"),
]
async def fetch_one(cli: httpx.AsyncClient, url: str) -> tuple[int, str]:
    """GET *url* and report the outcome as a ``(status, text)`` pair.

    Any ``Exception`` raised by the request is converted into a negative
    pseudo-status instead of propagating to the caller.

    Args:
        cli: HTTP client used to issue the request.
        url: Absolute URL to fetch; redirects are followed.

    Returns:
        ``(status_code, body)`` for a completed request, where ``body`` is at
        most the first 200,000 characters of the response text. Otherwise:
        ``(-1, "conn_refused")`` on ``httpx.ConnectError``,
        ``(-2, "timeout")`` on ``httpx.TimeoutException``, and
        ``(-9, message)`` for any other ``Exception``, with the message cut
        to 120 characters.
    """
    try:
        response = await cli.get(
            url,
            headers={"User-Agent": UA},
            follow_redirects=True,
        )
        # Cap the body so one oversized page cannot bloat the analysis.
        outcome = (response.status_code, response.text[:200_000])
    except httpx.ConnectError:
        outcome = (-1, "conn_refused")
    except httpx.TimeoutException:
        outcome = (-2, "timeout")
    except Exception as exc:
        outcome = (-9, str(exc)[:120])
    return outcome
def _scene_path_prefix(link: str) -> str:
    """Return ``"/<first path segment>"`` of *link*, or ``""`` if it has none.

    Only the URL path is considered, so absolute (``https://host/video/x``)
    and scheme-relative (``//host/video/x``) links yield the same prefix as
    the site-relative form ``/video/x``.
    """
    try:
        path = urlparse(link).path
    except ValueError:
        # urlparse rejects some malformed hosts (e.g. unbalanced brackets).
        return ""
    segment = path.lstrip("/").split("/", 1)[0]
    return f"/{segment}" if segment else ""


def analyze_html(html: str, *, meta_markers=None, scene_link_patterns=None) -> dict:
    """Scan *html* for known markers and scene links.

    Args:
        html: Page source to inspect.
        meta_markers: Optional iterable of ``(regex, name)`` pairs; defaults
            to the module-level ``META_MARKERS``.
        scene_link_patterns: Optional iterable of regexes whose group 1 is a
            link href; defaults to the module-level ``SCENE_LINK_PATTERNS``.

    Returns:
        A dict with ``name: True`` for every marker regex that matched
        (case-insensitive). When at least one scene link was found it also
        holds ``scene_link_samples`` (up to 3 hrefs, each cut to 120
        characters) and ``scene_path_prefixes`` (sorted unique first path
        segments such as ``"/video"``).
    """
    if meta_markers is None:
        meta_markers = META_MARKERS
    if scene_link_patterns is None:
        scene_link_patterns = SCENE_LINK_PATTERNS

    found = {
        name: True
        for pattern, name in meta_markers
        if re.search(pattern, html, re.IGNORECASE)
    }

    # Collect at most `limit` links in total across all patterns.
    limit = 5
    scene_links = []
    for pattern in scene_link_patterns:
        for match in re.finditer(pattern, html, re.IGNORECASE):
            scene_links.append(match.group(1)[:120])
            if len(scene_links) >= limit:
                break
        if len(scene_links) >= limit:
            break

    if scene_links:
        found["scene_link_samples"] = scene_links[:3]
        # Derive the prefix from the URL *path*: stripping slashes from the
        # raw href would turn "https://host/video/x" into "/https:".
        prefixes = {_scene_path_prefix(link) for link in scene_links}
        prefixes.discard("")
        found["scene_path_prefixes"] = sorted(prefixes)
    return found
async def audit_one(cli: httpx.AsyncClient, slug: str, domain: str) -> dict:
    """Audit a single tube: fetch its root page and score the markers found.

    Args:
        cli: HTTP client handed to ``fetch_one``.
        slug: Identifier of the candidate; copied into the result.
        domain: Bare host name, e.g. ``example.com``.

    Returns:
        A dict that always contains ``slug`` and ``domain`` plus one of:

        * ``error = "no_valid_domain"`` when *domain* is empty or does not
          look like a host name (no request is made);
        * ``root_status`` and ``root_error`` (first 80 characters of the
          fetch text) when the status is not 200, 301 or 302;
        * ``root_status``, ``root_findings``, ``heuristic_score`` and
          ``reasons`` otherwise.
    """
    report = {"slug": slug, "domain": domain}

    # Try https://<domain>/ only for values shaped like a host name.
    if not (domain and re.match(r"^[\w\.-]+\.\w+$", domain)):
        report["error"] = "no_valid_domain"
        return report

    status, body = await fetch_one(cli, f"https://{domain}/")
    report["root_status"] = status
    if status not in (200, 301, 302):
        report["root_error"] = body[:80] if isinstance(body, str) else None
        return report

    findings = analyze_html(body)
    report["root_findings"] = findings

    # Heuristic score: a sum of per-marker weights (penalties can push it
    # below zero).
    score = 0
    reasons = []

    # Strong content signals: +1 each, and listed in the reasons.
    strong_markers = (
        "jsonld_video",
        "og_video",
        "video_card",
        "performer_marker",
        "studio_marker",
    )
    for marker in strong_markers:
        if findings.get(marker):
            score += 1
            reasons.append(marker)

    # Weak signals: +0.5 each, counted but not listed in the reasons.
    if findings.get("duration_marker"):
        score += 0.5
    if findings.get("hls_marker") or findings.get("stream_url_marker"):
        score += 0.5

    scene_prefixes = findings.get("scene_path_prefixes")
    if scene_prefixes:
        score += 1
        reasons.append(f"scene_paths={scene_prefixes}")

    # Negative signals: subtract the penalty and list the marker.
    penalties = (("auth_wall", 2), ("dead_404", 5), ("meta_refresh", 1))
    for marker, penalty in penalties:
        if findings.get(marker):
            score -= penalty
            reasons.append(marker)

    report["heuristic_score"] = round(score, 1)
    report["reasons"] = reasons
    return report
async def main():
    """Audit every new candidate and write the triage report.

    Loads ``COVERAGE_FILE``, audits each entry of its ``new_candidates`` list
    with ``audit_one`` (at most 12 audits in flight), writes the merged
    records to ``OUT_FILE`` as indented JSON and prints a histogram of
    ``heuristic_score`` buckets.

    Raises:
        OSError: If the coverage file cannot be read or the report cannot be
            written.
        KeyError: If the coverage file has no ``new_candidates`` key or a
            candidate without a usable domain has no ``slug``.
    """
    # JSON is UTF-8 by specification; do not depend on the platform locale.
    cov = json.loads(COVERAGE_FILE.read_text(encoding="utf-8"))
    new_candidates = cov["new_candidates"]
    print(f"audytuję {len(new_candidates)} nowych kandydatów…")
    timeout = httpx.Timeout(15.0, connect=8.0)
    limits = httpx.Limits(max_keepalive_connections=20, max_connections=50)
    async with httpx.AsyncClient(timeout=timeout, limits=limits, http2=False) as cli:
        sem = asyncio.Semaphore(12)

        async def worker(r):
            async with sem:
                # Use the recorded domain, or guess one (most are <slug>.com).
                domain = r.get("domain") or ""
                # If pdude.link resolved to porndudecams.com (an
                # interstitial), fall back to <slug>.com.
                if not domain or "porndudecams" in domain:
                    domain = f"{r['slug'].lower()}.com"
                # Audit fields override same-named keys of the input record.
                return {**r, **(await audit_one(cli, r["slug"], domain))}

        results = await asyncio.gather(*[worker(r) for r in new_candidates])

    OUT_FILE.write_text(json.dumps(results, indent=2), encoding="utf-8")

    # Stats: records without a score (invalid domain, failed fetch) count
    # as 0 and therefore land in the "<1" bucket.
    bucket_counts = {}
    for r in results:
        s = r.get("heuristic_score", 0)
        if s >= 5:
            bucket = "5+"
        elif s >= 3:
            bucket = "3-5"
        elif s >= 1:
            bucket = "1-3"
        else:
            bucket = "<1"
        bucket_counts[bucket] = bucket_counts.get(bucket, 0) + 1

    print("\n=== Heurystyczny rozkład (canonical-fit) ===")
    for b in ["5+", "3-5", "1-3", "<1"]:
        if b in bucket_counts:
            print(f" {b:<5} {bucket_counts[b]} tubów")
    print(f"\n-> {OUT_FILE}")


if __name__ == "__main__":
    asyncio.run(main())