Update NPO.py

This commit is contained in:
adef17286-sudo 2025-12-17 17:51:23 +00:00 committed by GitHub
parent b3e4842a6d
commit 298f1179f7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

291
NPO.py
View File

@ -1,199 +1,122 @@
#!/usr/bin/env python3
import subprocess
import re
import requests
from pathlib import Path
import re
import json
import argparse
import os
import sys
import traceback
from pywidevine.cdm import Cdm
from pywidevine.device import Device
from pywidevine.pssh import PSSH
def run_npo_get_output(link):
def load_cookies(cookie_file):
"""Load cookies from a Netscape format cookie file and return a Cookie header string."""
cookie_header = []
try:
script_dir = Path(sys.argv[0]).parent.resolve()
npo_path = script_dir / "NPO.py"
p = subprocess.run([sys.executable, str(npo_path), link],
stdout=subprocess.PIPE, stderr=subprocess.PIPE,
text=True)
return p.stdout or "", p.stderr or "", p.returncode
with open(cookie_file, 'r') as f:
for line in f:
if line.startswith('#') or not line.strip():
continue # Skip comments and empty lines
parts = line.strip().split('\t')
if len(parts) >= 7:
domain = parts[0]
if domain.startswith('.'):
domain = domain[1:] # Remove leading dot
cookie_name = parts[5]
cookie_value = parts[6]
cookie_header.append(f"{cookie_name}={cookie_value}")
except Exception as e:
print("[!] Failed to run NPO:", e)
return "", str(e), 1
print(f"Error loading cookies: {str(e)}")
return '; '.join(cookie_header)
def get_stream_url(url):
# Validate the new URL structure
if url.startswith("https://npo.nl/start/afspelen/"):
try:
# Load cookies from cookies.txt if it exists
cookie_file = 'cookies.txt'
cookie_header = load_cookies(cookie_file) if os.path.exists(cookie_file) else None
def extract_mpd_url(text):
m = re.search(r'(https?://[^\s\'"]+?\.mpd(?:[^\s\'"]*)?)', text, re.IGNORECASE)
return m.group(1).strip() if m else None
# Step 1: Make a request to the input URL
headers = {'Cookie': cookie_header} if cookie_header else {}
response = requests.get(url, headers=headers)
response.raise_for_status()
# Extract the JSON data embedded in the HTML
match = re.search(r'<script id="__NEXT_DATA__" type="application/json">(.*?)</script>', response.text, re.DOTALL)
if match:
json_data = match.group(1)
data = json.loads(json_data)
def extract_drm_token(text):
match = re.search(r'DRM Token:\s*([A-Za-z0-9_\-\.=]+)', text)
if match:
return match.group(1).strip()
return None
product_info = None
slug = url.split('/')[-1]
for item in data.get('props', {}).get('pageProps', {}).get('dehydratedState', {}).get('queries', []):
state = item.get('state', {})
if state:
episode_data = state.get('data', {})
if isinstance(episode_data, dict):
if episode_data.get('slug') == slug:
product_info = {
'productId': episode_data.get('productId'),
'guid': episode_data.get('guid')
}
break
def download_mpd(mpd_url, out_path="manifest.mpd"):
try:
r = requests.get(mpd_url, timeout=20)
r.raise_for_status()
Path(out_path).write_text(r.text, encoding='utf-8', errors='ignore')
return out_path
except requests.RequestException as e:
print("[!] Failed to download MPD:", e)
return None
def extract_pssh_blocks_from_mpd(path, max_len=200):
text = Path(path).read_text(encoding='utf-8', errors='ignore')
pattern = re.compile(r"<cenc:pssh>(.*?)</cenc:pssh>", re.DOTALL | re.IGNORECASE)
found = pattern.findall(text)
short = [f.strip() for f in found if len(f.strip()) <= max_len]
return short
def to_hex(val):
if isinstance(val, bytes):
return val.hex()
elif isinstance(val, str):
return val
else:
return str(val)
def kid_to_nodash_hex(kid_val):
if isinstance(kid_val, bytes):
return kid_val.hex()
if isinstance(kid_val, str):
return kid_val.replace("-", "").lower()
return str(kid_val)
def process_pssh_with_pywidevine(pssh_b64, provision_path, license_url):
try:
pssh = PSSH(pssh_b64)
device = Device.load(provision_path)
cdm = Cdm.from_device(device)
session_id = cdm.open()
challenge = cdm.get_license_challenge(session_id, pssh)
headers = {'Content-Type': 'application/octet-stream'}
resp = requests.post(license_url, data=challenge, headers=headers, timeout=30)
resp.raise_for_status()
cdm.parse_license(session_id, resp.content)
keys = cdm.get_keys(session_id)
if not keys:
print("[!] No keys returned.")
cdm.close(session_id)
return []
keypairs = []
for key in keys:
kid_raw = kid_to_nodash_hex(key.kid).lower()
key_hex = to_hex(key.key).lower()
kid_raw = kid_raw.replace("-", "")
output = f"{kid_raw}:{key_hex}"
if len(output) < 70:
print(output)
keypairs.append(output)
cdm.close(session_id)
return keypairs
except Exception:
traceback.print_exc()
return []
def main():
if len(sys.argv) < 2:
print(f"Usage: {sys.argv[0]} <link>")
sys.exit(1)
link = sys.argv[1]
provision_path = "cdm.wvd"
if not Path(provision_path).is_file():
print(f"[!] Provisioning file '{provision_path}' not found.")
sys.exit(1)
out, err, code = run_npo_get_output(link)
if code != 0:
print("[!] NPO.py exited with code", code)
sys.exit(1)
combined = out + "\n" + err
mpd_url = extract_mpd_url(combined)
if not mpd_url:
print("[!] Could not find .mpd URL in output.")
sys.exit(1)
drm_token = extract_drm_token(combined)
if not drm_token:
print("[!] Could not find DRM token.")
sys.exit(1)
license_url = f"https://npo-drm-gateway.samgcloud.nepworldwide.nl/authentication?custom_data={drm_token}"
mpd_file = download_mpd(mpd_url)
if not mpd_file:
sys.exit(1)
psshs = extract_pssh_blocks_from_mpd(mpd_file, max_len=200)
try:
os.remove(mpd_file)
except Exception:
pass
if not psshs:
print("[!] No suitable PSSH blocks found.")
sys.exit(1)
all_keypairs = []
for pssh in psshs:
p_clean = "".join(pssh.split())
keys = process_pssh_with_pywidevine(p_clean, provision_path, license_url)
if not keys:
sys.exit(1)
all_keypairs.extend(keys)
script_dir = Path(sys.argv[0]).parent.resolve()
if sys.platform == "win32":
n_m3u8dl_re_path = script_dir / "N_m3u8DL-RE.exe"
mp4decrypt_path = script_dir / "mp4decrypt.exe"
else:
n_m3u8dl_re_path = script_dir / "N_m3u8DL-RE"
mp4decrypt_path = script_dir / "mp4decrypt"
# Build command for N_m3u8dl-re
cmd = [str(n_m3u8dl_re_path), mpd_url]
for keypair in all_keypairs:
cmd += ["--key", keypair]
# Append fixed options
cmd += ["--decryption-engine", "MP4DECRYPT"]
cmd += ["--decryption-binary-path", str(mp4decrypt_path)]
cmd += ["-sv", "best", "-sa", "best", "-M", "mkv"]
print("\n[+] Running command:")
print(" ".join(cmd))
# Run the command
try:
subprocess.run(cmd, check=True)
except subprocess.CalledProcessError as e:
print("[!] N_m3u8dl-re failed:", e)
sys.exit(1)
if product_info:
# Step 2: Get JWT using the same cookies
token_url = f"https://npo.nl/start/api/domain/player-token?productId={product_info['productId']}"
token_response = requests.get(token_url, headers=headers)
token_response.raise_for_status()
jwt = token_response.json().get('jwt')
if jwt:
# Step 3: Make POST request to get stream link
post_headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/141.0.0.0 Safari/537.36",
"Authorization": jwt,
"Content-Type": "application/json",
"Accept": "*/*",
"Referer": "https://npo.nl/",
'Cookie': cookie_header
}
body = {
"profileName": "dash",
"drmType": "widevine",
"referrerUrl": url,
"ster": {
"identifier": "npo-app-desktop",
"deviceType": 4,
"player": "web"
}
}
# Send the POST request to get the stream link
stream_response = requests.post("https://prod.npoplayer.nl/stream-link", headers=post_headers, json=body)
stream_response.raise_for_status()
# Step 4: Extract streams URL and drmToken
stream_data = stream_response.json().get('stream', {})
stream_url = stream_data.get('streamURL', "streamURL not found in response.")
drm_token = stream_data.get('drmToken', "drmToken not found in response.")
return (stream_url, drm_token) # Return both if needed
return "Product ID and GUID not found for the given slug."
return "JSON script not found in the response."
except requests.exceptions.RequestException as e:
return f"An error occurred while making the request: {str(e)}"
except json.JSONDecodeError:
return "Failed to decode JSON data."
return "Invalid URL. Please provide a URL that starts with 'https://npo.nl/start/afspelen/'."
if __name__ == "__main__":
main()
parser = argparse.ArgumentParser(description="Get the streaming URL from an NPO series page.")
parser.add_argument("url", type=str, help="The URL of the NPO series page.")
args = parser.parse_args()
stream_url_response = get_stream_url(args.url)
# Print the final result
if isinstance(stream_url_response, tuple):
print(f"Stream URL: {stream_url_response[0]}")
print(f"DRM Token: {stream_url_response[1]}")
else:
print(stream_url_response)