# unshackle-services/NPO/__init__.py
# adef b796a820b0 Update NPO/__init__.py
# user experience improvements and some stability fixes
# 2026-01-11 11:19:31 +00:00
#
# 453 lines
# 21 KiB
# Python

import json
import re
import time
import base64
import hashlib
from http.cookiejar import CookieJar
from typing import Optional
from langcodes import Language
import click
from collections.abc import Generator
from unshackle.core.search_result import SearchResult
from unshackle.core.constants import AnyTrack
from unshackle.core.credential import Credential
from unshackle.core.manifests import DASH
from unshackle.core.service import Service
from unshackle.core.titles import Episode, Movie, Movies, Series, Title_T, Titles_T
from unshackle.core.tracks import Chapter, Tracks, Subtitle, Chapters
class NPO(Service):
    """
    Service code for NPO Start (npo.nl)
    Version: 1.1.0

    Authorization: optional cookies (free/paid content supported)
    Security: FHD @ L3
              FHD @ SL3000
              (Widevine and PlayReady support)

    Supports:
      • Series ↦ https://npo.nl/start/serie/{slug}
      • Movies ↦ https://npo.nl/start/video/{slug}

    Note: A movie inside a series can be downloaded as a movie by converting the URL to:
          https://npo.nl/start/video/{slug}

    To switch between Widevine and PlayReady, set DrmType in config.yaml to
    either widevine or playready.
    """

    TITLE_RE = (
        r"^(?:https?://(?:www\.)?npo\.nl/start/)?"
        r"(?:(?P<type>video|serie|afspelen)/(?P<slug>[^/]+)"
        r"(?:/(?P<path>.*))?)?$"
    )
    GEOFENCE = ("NL",)
    NO_SUBTITLES = False

    # Shared URL pieces used by the lookup helpers below.
    _API_BASE = "https://npo.nl/start/api/domain"
    _VIDEO_PAGE = "https://npo.nl/start/video/{slug}"

    @staticmethod
    @click.command(name="NPO", short_help="https://npo.nl")
    @click.argument("title", type=str)
    @click.pass_context
    def cli(ctx, **kwargs):
        """Click entry point: build the service from the CLI context and arguments."""
        return NPO(ctx, **kwargs)

    def __init__(self, ctx, title: str):
        """
        Parse ``title`` (an NPO Start URL or ``type/slug[/path]`` fragment) and
        prepare the session.

        A title that does not match ``TITLE_RE`` is kept as ``search_term``;
        ``get_titles`` then uses it as the search query instead of a slug.

        Raises:
            EnvironmentError: if the title parsed but no service config is loaded.
        """
        super().__init__(ctx)

        # Give every attribute that other methods read a defined value up
        # front, so the early return below cannot leave the instance
        # half-initialised (previously this caused AttributeError later on).
        self.slug = None
        self.kind = None
        self.season_slug = None
        self.episode_slug = None
        self.search_term = None
        self.original_title_url = None
        self.cdm = ctx.obj.cdm
        self.session.headers["X-Nos"] = self._build_x_nos_header()

        m = re.match(self.TITLE_RE, title)
        if not m:
            self.search_term = title
            return

        self.slug = m.group("slug")
        self.kind = m.group("type") or "video"
        path = m.group("path") or ""

        if self.kind == "afspelen":
            self.kind = "video"
        if "afleveringen" in path:
            self.kind = "serie"

        # A "seizoen-<x>" path segment narrows the listing to that season.
        season_match = re.search(r"seizoen-([^/]+)", path)
        if season_match:
            self.season_slug = season_match.group(1)

        # ".../seizoen-<x>/<episode>/afspelen" pins one specific episode.
        episode_match = re.search(r"seizoen-([^/]+)/([^/]+)/afspelen", path)
        if episode_match:
            self.season_slug = episode_match.group(1)
            self.episode_slug = episode_match.group(2)

        # Kept so get_titles can fetch exactly the page the user pointed at.
        self.original_title_url = title

        if self.config is None:
            raise EnvironmentError("Missing service config.")

    @staticmethod
    def _build_x_nos_header() -> str:
        """Return the X-Nos header: md5(secret + agent) followed by base64(agent)."""
        salt = int(time.time())
        user_agent = f"nos;{salt};Google/Nexus;Android/6.0;nl.nos.app/5.1.1"
        string_to_hash = ";UB}7Gaji==JPHtjX3@c" + user_agent
        md5_hash = hashlib.md5(string_to_hash.encode("utf-8")).hexdigest()
        return md5_hash + base64.b64encode(user_agent.encode("utf-8")).decode("utf-8")

    def authenticate(self, cookies: Optional[CookieJar] = None, credential: Optional[Credential] = None) -> None:
        """
        Attach browser-like headers when a session cookie is present and log
        whether the user-profiles endpoint accepts it.

        The profile check is informational only: a non-JSON or unexpected
        response is reported as a warning and never raises.
        """
        super().authenticate(cookies, credential)
        if not cookies:
            self.log.info("No cookies, proceeding anonymously.")
            return

        token = next((c.value for c in cookies if c.name == "__Secure-next-auth.session-token"), None)
        if not token:
            self.log.info("No session token, proceeding unauthenticated.")
            return

        self.session.headers.update({
            "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) Firefox/143.0",
            "Origin": "https://npo.nl",
            "Referer": "https://npo.nl/",
        })

        r = self.session.get("https://npo.nl/start/api/domain/user-profiles", cookies=cookies)
        profiles = None
        if r.ok:
            try:
                # Parse once; an HTML error page here must not abort the run.
                profiles = r.json()
            except ValueError:
                profiles = None

        if isinstance(profiles, list) and profiles:
            names = [p.get("name") for p in profiles if isinstance(p, dict)]
            self.log.info(f"NPO login OK, profiles: {names}")
        else:
            self.log.warning("NPO auth check failed.")

    def _fetch_next_data(self, slug: str, full_url: Optional[str] = None) -> dict:
        """
        Fetch a video/series page and return its parsed ``__NEXT_DATA__`` JSON.

        Args:
            slug: Used to build the page URL when ``full_url`` is not given.
            full_url: Exact page URL to fetch instead of the one built from ``slug``.

        Raises:
            RuntimeError: if the page contains no ``__NEXT_DATA__`` script tag.
        """
        if full_url:
            url = full_url
        else:
            page_kind = "video" if self.kind == "video" else "serie"
            url = f"https://npo.nl/start/{page_kind}/{slug}"

        r = self.session.get(url)
        r.raise_for_status()

        match = re.search(r'<script id="__NEXT_DATA__" type="application/json">({.*?})</script>', r.text, re.DOTALL)
        if not match:
            raise RuntimeError("Failed to extract __NEXT_DATA__")
        return json.loads(match.group(1))

    def get_widevine_license(self, *, challenge: bytes, title: Title_T, track: AnyTrack) -> bytes:
        """
        Request a Widevine license for ``track`` and return the raw response bytes.

        The DRM token is read from the stream response that ``get_tracks``
        stored in ``track.data["npo_stream_data"]``.

        Raises:
            ValueError: if no DRM token is stored on the track.
        """
        license_url_base = self.config["endpoints"]["license"]

        stream_details = (track.data.get("npo_stream_data") or {}).get("stream") or {}
        drm_token = stream_details.get("drmToken") or stream_details.get("token")
        if not drm_token:
            raise ValueError("DRM token not found in title data for license request.")

        license_url = f"{license_url_base}?custom_data={drm_token}"
        # Only Content-Type is sent, per the note carried over from the original code.
        headers = {"Content-Type": "application/octet-stream"}

        # Log the endpoint without the token so debug logs do not leak it.
        self.log.debug(f"Requesting Widevine license from {license_url_base} (with custom_data) using minimal headers...")

        # The license challenge is posted as the raw binary request body.
        r = self.session.post(license_url, data=challenge, headers=headers)
        r.raise_for_status()
        self.log.debug(f"Received Widevine license response (status: {r.status_code}, size: {len(r.content)} bytes)")
        return r.content

    # ------------------------------------------------------------------
    # Title construction helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _year_from_timestamp(timestamp) -> Optional[int]:
        """
        Return the UTC calendar year of a Unix timestamp, or None if unusable.

        Assumes ``firstBroadcastDate`` is in seconds, as the previous
        ``// 31536000`` arithmetic did — TODO confirm against the API. That
        arithmetic ignored leap days and reported late-December dates as the
        following year. Epoch + timedelta is used so pre-1970 values also work.
        """
        from datetime import datetime, timedelta, timezone

        if not timestamp:
            return None
        try:
            epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
            return (epoch + timedelta(seconds=int(timestamp))).year
        except (TypeError, ValueError, OverflowError):
            return None

    @staticmethod
    def _synopsis(info: dict) -> str:
        """Return the long synopsis, tolerating a missing or null ``synopsis``."""
        return (info.get("synopsis") or {}).get("long", "")

    @staticmethod
    def _season_number(info: dict):
        """Return ``season.seasonKey``, falling back to the number in a ``seizoen-<n>`` slug."""
        season = info.get("season") or {}
        number = season.get("seasonKey")
        if number is None and season.get("slug"):
            slug_match = re.search(r"seizoen-(\d+)", season["slug"])
            if slug_match:
                number = int(slug_match.group(1))
        return number

    def _build_episode(self, info: dict, series_title: str) -> Episode:
        """Build an Episode from an NPO program dict, titled after its series."""
        return Episode(
            id_=info["productId"],
            service=self.__class__,
            title=series_title,
            season=self._season_number(info),
            number=info.get("programKey"),
            name=info["title"],
            description=self._synopsis(info),
            language=Language.get("nl"),
            data=info,
        )

    def _build_movie(self, info: dict) -> Movie:
        """Build a Movie from an NPO program dict; the slug stands in for a missing title."""
        return Movie(
            id_=info["productId"],
            service=self.__class__,
            name=info.get("title") or self.slug or self.search_term,
            description=self._synopsis(info),
            year=self._year_from_timestamp(info.get("firstBroadcastDate")),
            language=Language.get("nl"),
            data=info,
        )

    def _titles_from_product(self, info: dict) -> Titles_T:
        """Wrap one program dict as a one-episode Series (if it has a series) or a Movies list."""
        series = info.get("series")
        if series:
            return Series([self._build_episode(info, series["title"])])
        return Movies([self._build_movie(info)])

    # ------------------------------------------------------------------
    # Lookup helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _page_props(next_data: dict) -> dict:
        """Return ``props.pageProps`` from ``__NEXT_DATA__`` (empty dict if absent)."""
        return (next_data.get("props") or {}).get("pageProps") or {}

    @classmethod
    def _page_candidates(cls, next_data: dict) -> list:
        """Return the ``program`` and ``video`` dicts of pageProps, in that order."""
        page_props = cls._page_props(next_data)
        candidates = (page_props.get("program"), page_props.get("video"))
        return [c for c in candidates if isinstance(c, dict)]

    @classmethod
    def _query_candidates(cls, next_data: dict) -> list:
        """Return every dict-valued ``state.data`` from the dehydrated query cache."""
        dehydrated = cls._page_props(next_data).get("dehydratedState") or {}
        found = []
        for query in dehydrated.get("queries") or []:
            state = query.get("state") if isinstance(query, dict) else None
            data = state.get("data") if isinstance(state, dict) else None
            if isinstance(data, dict):
                found.append(data)
        return found

    def _search_items(self, search_type: str, query: str) -> list:
        """Query the search-collection-items endpoint and return its ``items`` list."""
        response = self.session.get(
            f"{self._API_BASE}/search-collection-items",
            # Passed as params so the query is URL-encoded (slugs or free text).
            params={
                "searchType": search_type,
                "searchQuery": query,
                "subscriptionType": "anonymous",
            },
        )
        return response.json().get("items") or []

    @staticmethod
    def _pick_item(items: list, slug: str) -> dict:
        """Prefer the hit whose slug equals ``slug``; otherwise keep the first hit."""
        for item in items:
            if isinstance(item, dict) and item.get("slug") == slug:
                return item
        return items[0]

    def _series_titles(self, series_info: dict) -> Titles_T:
        """List a series' episodes, honouring the season/episode parsed from the URL."""
        seasons_data = self.session.get(
            f"{self._API_BASE}/series-seasons",
            params={"slug": series_info["slug"], "type": series_info["type"]},
        ).json()

        episodes = []
        for season in seasons_data:
            # The URL's season part may be either the season key or the slug suffix.
            if (
                self.season_slug
                and str(season.get("seasonKey")) != self.season_slug
                and season.get("slug") != f"seizoen-{self.season_slug}"
            ):
                continue

            episodes_data = self.session.get(
                f"{self._API_BASE}/programs-by-season",
                params={"guid": season["guid"]},
            ).json()
            episodes.extend(
                self._build_episode(episode_data, series_info["title"])
                for episode_data in episodes_data
            )

        if self.episode_slug:
            episodes = [ep for ep in episodes if ep.data.get("slug") == self.episode_slug]
        return Series(episodes)

    def get_titles(self) -> Titles_T:
        """
        Resolve the requested title to a Series or Movies collection.

        Strategies, first success wins:
          1. ``__NEXT_DATA__`` of the exact page the user supplied (single videos).
          2. Broadcast search (single videos).
          3. Series search, filtered by the requested season/episode.
          4. ``__NEXT_DATA__`` of the video page built from the slug (movies only).

        Returns an empty list when nothing matches.
        """
        query = self.slug or self.search_term
        if not query:
            self.log.warning("No NPO slug or search term to resolve.")
            return []

        single_item = not self.season_slug and not self.episode_slug

        # 1. Direct page lookup for single-video URLs.
        if self.kind == "video" and single_item and self.original_title_url:
            # A bare "video/<slug>" fragment is not fetchable; let
            # _fetch_next_data build the URL from the slug instead.
            full_url = self.original_title_url if self.original_title_url.startswith("http") else None
            try:
                next_data = self._fetch_next_data(self.slug, full_url=full_url)
                candidates = self._page_candidates(next_data) + self._query_candidates(next_data)
                product_info = next((c for c in candidates if c.get("productId")), None)
                if product_info:
                    return self._titles_from_product(product_info)
            except Exception as e:
                # Deliberate best effort: the strategies below are the fallback.
                self.log.debug(f"Direct __NEXT_DATA__ fetch for afspelen URL failed: {e}")

        # 2. Broadcast search for single videos.
        if self.kind != "serie" and single_item:
            broadcast_items = self._search_items("broadcasts", query)
            if broadcast_items:
                return self._titles_from_product(self._pick_item(broadcast_items, query))

        # 3. Series search.
        series_items = self._search_items("series", query)
        if series_items:
            return self._series_titles(self._pick_item(series_items, query))

        # 4. Last resort: the video page for this slug. The URL is passed
        #    explicitly so self.kind does not have to be mutated temporarily.
        try:
            next_data = self._fetch_next_data(query, full_url=self._VIDEO_PAGE.format(slug=query))
            candidates = self._query_candidates(next_data) + self._page_candidates(next_data)
            product_info = next((c for c in candidates if c.get("slug") == query), None)
            if product_info and product_info.get("productId"):
                if not product_info.get("series"):
                    return Movies([self._build_movie(product_info)])
                # A series episode is intentionally not returned as a movie here.
                self.log.debug(f"Content for {query} identified as a series episode via __NEXT_DATA__ fallback, not a standalone movie.")
                return []
        except Exception as e:
            self.log.debug(f"Fallback to __NEXT_DATA__ for video failed: {e}")

        return []

    def get_chapters(self, title: Title_T) -> Chapters:
        """Return no chapters; this service does not produce any."""
        return []

    def get_tracks(self, title: Title_T) -> Tracks:
        """
        Request a player token and stream for ``title`` and return its DASH tracks.

        The full stream response is stored on every track under
        ``data["npo_stream_data"]`` so ``get_widevine_license`` can read the
        DRM token later.

        Raises:
            ValueError: if the title has no productId or the response has no stream URL.
            PermissionError: if the stream response contains an ``error`` key.
        """
        product_id = title.data.get("productId")
        if not product_id:
            raise ValueError("no productId detected.")

        page_url = f"https://npo.nl/start/video/{self.slug or self.search_term}"

        token_url = self.config["endpoints"]["player_token"].format(product_id=product_id)
        r_tok = self.session.get(token_url, headers={"Referer": page_url})
        r_tok.raise_for_status()
        jwt = r_tok.json()["jwt"]

        # Request stream
        r_stream = self.session.post(
            self.config["endpoints"]["streams"],
            json={
                "profileName": "dash",
                "drmType": self.config["DrmType"],
                "referrerUrl": page_url,
                "ster": {"identifier": "npo-app-desktop", "deviceType": 4, "player": "web"},
            },
            headers={
                "Authorization": jwt,
                "Content-Type": "application/json",
                "Origin": "https://npo.nl",
                "Referer": page_url,
            },
        )
        r_stream.raise_for_status()
        data = r_stream.json()
        if "error" in data:
            raise PermissionError(f"Stream error: {data['error']}")

        stream = data["stream"]
        manifest_url = stream.get("streamURL") or stream.get("url")
        if not manifest_url:
            raise ValueError("No stream URL in response")

        # Parse DASH
        tracks = DASH.from_url(manifest_url, session=self.session).to_tracks(language=title.language)

        # Keep the stream response on each track for the license request.
        for tr in tracks:
            tr.data["npo_stream_data"] = data

        # HACK: NPO reports some Dutch audio tracks as English for older content.
        # If the title language is Dutch, assume any English audio tracks are also Dutch.
        if title.language == Language.get("nl"):
            for track in tracks.audio:
                if track.language == Language.get("en"):
                    self.log.debug("Correcting 'en' audio track to 'nl' for Dutch title.")
                    track.language = Language.get("nl")

        # Subtitles
        if not self.NO_SUBTITLES:
            subtitles = []
            # "assets" or its "subtitle" list may be missing or null.
            for sub in (data.get("assets") or {}).get("subtitle") or []:
                if sub.get("format") == "webvtt":
                    subtitles.append(Subtitle(url=sub["url"], language=Language.get(sub["lang"])))
                else:
                    self.log.warning(f"Unsupported subtitle format: {sub.get('format')}")
            tracks.subtitles.extend(subtitles)

        return tracks