From b3e4842a6d6b005a702f273d7a131c760cf429d4 Mon Sep 17 00:00:00 2001 From: adef17286-sudo Date: Wed, 17 Dec 2025 17:50:49 +0000 Subject: [PATCH] NPO_win_en_linux --- NPO.py | 291 ++++++++++++++++++++++++++++++++++++--------------------- 1 file changed, 184 insertions(+), 107 deletions(-) diff --git a/NPO.py b/NPO.py index 62bda20..c268cef 100644 --- a/NPO.py +++ b/NPO.py @@ -1,122 +1,199 @@ -import requests +#!/usr/bin/env python3 + +import subprocess import re -import json -import argparse +import requests +from pathlib import Path import os +import sys +import traceback -def load_cookies(cookie_file): - """Load cookies from a Netscape format cookie file and return a Cookie header string.""" - cookie_header = [] +from pywidevine.cdm import Cdm +from pywidevine.device import Device +from pywidevine.pssh import PSSH + + +def run_npo_get_output(link): try: - with open(cookie_file, 'r') as f: - for line in f: - if line.startswith('#') or not line.strip(): - continue # Skip comments and empty lines - parts = line.strip().split('\t') - if len(parts) >= 7: - domain = parts[0] - if domain.startswith('.'): - domain = domain[1:] # Remove leading dot - - cookie_name = parts[5] - cookie_value = parts[6] - cookie_header.append(f"{cookie_name}={cookie_value}") + script_dir = Path(sys.argv[0]).parent.resolve() + npo_path = script_dir / "NPO.py" + p = subprocess.run([sys.executable, str(npo_path), link], + stdout=subprocess.PIPE, stderr=subprocess.PIPE, + text=True) + return p.stdout or "", p.stderr or "", p.returncode except Exception as e: - print(f"Error loading cookies: {str(e)}") - return '; '.join(cookie_header) + print("[!] Failed to run NPO:", e) + return "", str(e), 1 -def get_stream_url(url): - # Validate the new URL structure - if url.startswith("https://npo.nl/start/afspelen/"): - try: - # Load cookies from cookies.txt if it exists - cookie_file = 'cookies.txt' - cookie_header = load_cookies(cookie_file) if os.path.exists(cookie_file) else None - # Step 1: Make a request to the input URL - headers = {'Cookie': cookie_header} if cookie_header else {} - response = requests.get(url, headers=headers) - response.raise_for_status() +def extract_mpd_url(text): + m = re.search(r'(https?://[^\s\'"]+?\.mpd(?:[^\s\'"]*)?)', text, re.IGNORECASE) + return m.group(1).strip() if m else None - # Extract the JSON data embedded in the HTML - match = re.search(r'', response.text, re.DOTALL) - if match: - json_data = match.group(1) - data = json.loads(json_data) - product_info = None - slug = url.split('/')[-1] +def extract_drm_token(text): + match = re.search(r'DRM Token:\s*([A-Za-z0-9_\-\.=]+)', text) + if match: + return match.group(1).strip() + return None - for item in data.get('props', {}).get('pageProps', {}).get('dehydratedState', {}).get('queries', []): - state = item.get('state', {}) - if state: - episode_data = state.get('data', {}) - if isinstance(episode_data, dict): - if episode_data.get('slug') == slug: - product_info = { - 'productId': episode_data.get('productId'), - 'guid': episode_data.get('guid') - } - break - if product_info: - # Step 2: Get JWT using the same cookies - token_url = f"https://npo.nl/start/api/domain/player-token?productId={product_info['productId']}" - token_response = requests.get(token_url, headers=headers) - token_response.raise_for_status() - jwt = token_response.json().get('jwt') +def download_mpd(mpd_url, out_path="manifest.mpd"): + try: + r = requests.get(mpd_url, timeout=20) + r.raise_for_status() + Path(out_path).write_text(r.text, encoding='utf-8', errors='ignore') + return out_path + except requests.RequestException as e: + print("[!] Failed to download MPD:", e) + return None + + +def extract_pssh_blocks_from_mpd(path, max_len=200): + text = Path(path).read_text(encoding='utf-8', errors='ignore') + pattern = re.compile(r"(.*?)", re.DOTALL | re.IGNORECASE) + found = pattern.findall(text) + short = [f.strip() for f in found if len(f.strip()) <= max_len] + return short + + +def to_hex(val): + if isinstance(val, bytes): + return val.hex() + elif isinstance(val, str): + return val + else: + return str(val) + + +def kid_to_nodash_hex(kid_val): + if isinstance(kid_val, bytes): + return kid_val.hex() + if isinstance(kid_val, str): + return kid_val.replace("-", "").lower() + return str(kid_val) + + +def process_pssh_with_pywidevine(pssh_b64, provision_path, license_url): + try: + pssh = PSSH(pssh_b64) + device = Device.load(provision_path) + cdm = Cdm.from_device(device) + session_id = cdm.open() + + challenge = cdm.get_license_challenge(session_id, pssh) + + headers = {'Content-Type': 'application/octet-stream'} + resp = requests.post(license_url, data=challenge, headers=headers, timeout=30) + resp.raise_for_status() + + cdm.parse_license(session_id, resp.content) + + keys = cdm.get_keys(session_id) + if not keys: + print("[!] No keys returned.") + cdm.close(session_id) + return [] + + keypairs = [] + for key in keys: + kid_raw = kid_to_nodash_hex(key.kid).lower() + key_hex = to_hex(key.key).lower() + kid_raw = kid_raw.replace("-", "") + output = f"{kid_raw}:{key_hex}" + if len(output) < 70: + print(output) + keypairs.append(output) + + cdm.close(session_id) + return keypairs + + except Exception: + traceback.print_exc() + return [] + + +def main(): + if len(sys.argv) < 2: + print(f"Usage: {sys.argv[0]} ") + sys.exit(1) + + link = sys.argv[1] + provision_path = "cdm.wvd" + if not Path(provision_path).is_file(): + print(f"[!] Provisioning file '{provision_path}' not found.") + sys.exit(1) + + out, err, code = run_npo_get_output(link) + if code != 0: + print("[!] NPO.py exited with code", code) + sys.exit(1) + + combined = out + "\n" + err + mpd_url = extract_mpd_url(combined) + if not mpd_url: + print("[!] Could not find .mpd URL in output.") + sys.exit(1) + + drm_token = extract_drm_token(combined) + if not drm_token: + print("[!] Could not find DRM token.") + sys.exit(1) + + license_url = f"https://npo-drm-gateway.samgcloud.nepworldwide.nl/authentication?custom_data={drm_token}" + + mpd_file = download_mpd(mpd_url) + if not mpd_file: + sys.exit(1) + + psshs = extract_pssh_blocks_from_mpd(mpd_file, max_len=200) + + try: + os.remove(mpd_file) + except Exception: + pass + + if not psshs: + print("[!] No suitable PSSH blocks found.") + sys.exit(1) + + all_keypairs = [] + for pssh in psshs: + p_clean = "".join(pssh.split()) + keys = process_pssh_with_pywidevine(p_clean, provision_path, license_url) + if not keys: + sys.exit(1) + all_keypairs.extend(keys) + + script_dir = Path(sys.argv[0]).parent.resolve() + if sys.platform == "win32": + n_m3u8dl_re_path = script_dir / "N_m3u8DL-RE.exe" + mp4decrypt_path = script_dir / "mp4decrypt.exe" + else: + n_m3u8dl_re_path = script_dir / "N_m3u8DL-RE" + mp4decrypt_path = script_dir / "mp4decrypt" + + # Build command for N_m3u8dl-re + cmd = [str(n_m3u8dl_re_path), mpd_url] + for keypair in all_keypairs: + cmd += ["--key", keypair] + + # Append fixed options + cmd += ["--decryption-engine", "MP4DECRYPT"] + cmd += ["--decryption-binary-path", str(mp4decrypt_path)] + cmd += ["-sv", "best", "-sa", "best", "-M", "mkv"] + + print("\n[+] Running command:") + print(" ".join(cmd)) + + # Run the command + try: + subprocess.run(cmd, check=True) + except subprocess.CalledProcessError as e: + print("[!] N_m3u8dl-re failed:", e) + sys.exit(1) - if jwt: - # Step 3: Make POST request to get stream link - post_headers = { - "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/141.0.0.0 Safari/537.36", - "Authorization": jwt, - "Content-Type": "application/json", - "Accept": "*/*", - "Referer": "https://npo.nl/", - 'Cookie': cookie_header - } - - body = { - "profileName": "dash", - "drmType": "widevine", - "referrerUrl": url, - "ster": { - "identifier": "npo-app-desktop", - "deviceType": 4, - "player": "web" - } - } - - # Send the POST request to get the stream link - stream_response = requests.post("https://prod.npoplayer.nl/stream-link", headers=post_headers, json=body) - stream_response.raise_for_status() - - # Step 4: Extract streams URL and drmToken - stream_data = stream_response.json().get('stream', {}) - stream_url = stream_data.get('streamURL', "streamURL not found in response.") - drm_token = stream_data.get('drmToken', "drmToken not found in response.") - - return (stream_url, drm_token) # Return both if needed - - return "Product ID and GUID not found for the given slug." - return "JSON script not found in the response." - except requests.exceptions.RequestException as e: - return f"An error occurred while making the request: {str(e)}" - except json.JSONDecodeError: - return "Failed to decode JSON data." - return "Invalid URL. Please provide a URL that starts with 'https://npo.nl/start/afspelen/'." if __name__ == "__main__": - parser = argparse.ArgumentParser(description="Get the streaming URL from an NPO series page.") - parser.add_argument("url", type=str, help="The URL of the NPO series page.") - args = parser.parse_args() + main() - stream_url_response = get_stream_url(args.url) - - # Print the final result - if isinstance(stream_url_response, tuple): - print(f"Stream URL: {stream_url_response[0]}") - print(f"DRM Token: {stream_url_response[1]}") - else: - print(stream_url_response)