diff --git a/NPO/__init__.py b/NPO/__init__.py index 98c9b74..261efcf 100644 --- a/NPO/__init__.py +++ b/NPO/__init__.py @@ -1,5 +1,8 @@ import json import re +import time +import base64 +import hashlib from http.cookiejar import CookieJar from typing import Optional from langcodes import Language @@ -12,7 +15,7 @@ from unshackle.core.credential import Credential from unshackle.core.manifests import DASH from unshackle.core.service import Service from unshackle.core.titles import Episode, Movie, Movies, Series, Title_T, Titles_T -from unshackle.core.tracks import Chapter, Tracks, Subtitle +from unshackle.core.tracks import Chapter, Tracks, Subtitle, Chapters class NPO(Service): @@ -27,19 +30,18 @@ class NPO(Service): Supports: • Series ↦ https://npo.nl/start/serie/{slug} - • Movies ↦ https://npo.nl/start/video/{slug} + • Movies ↦ https://npo.nl/start/start/video/{slug} Note: Movie inside a series can be downloaded as movie by converting URL to: - https://npo.nl/start/video/slug + https://npo.nl/start/start/video/slug To change between Widevine and Playready, you need to change the DrmType in config.yaml to either widevine or playready """ TITLE_RE = ( r"^(?:https?://(?:www\.)?npo\.nl/start/)?" - r"(?:(?Pvideo|serie)/(?P[^/]+)" - r"(?:/afleveringen)?" - r"(?:/seizoen-(?P[^/]+)/(?P[^/]+)/afspelen)?)?$" + r"(?:(?Pvideo|serie|afspelen)/(?P[^/]+)" + r"(?:/(?P.*))?)?$" ) GEOFENCE = ("NL",) NO_SUBTITLES = False @@ -54,6 +56,11 @@ class NPO(Service): def __init__(self, ctx, title: str): super().__init__(ctx) + self.slug = None + self.kind = None + self.season_slug = None + self.episode_slug = None + m = re.match(self.TITLE_RE, title) if not m: self.search_term = title @@ -61,12 +68,35 @@ class NPO(Service): self.slug = m.group("slug") self.kind = m.group("type") or "video" - self.season_slug = m.group("season") - self.episode_slug = m.group("episode") + path = m.group("path") or "" + + if self.kind == "afspelen": + self.kind = "video" + + if "afleveringen" in path: + self.kind = "serie" + season_match = re.search(r"seizoen-([^/]+)", path) + if season_match: + self.season_slug = season_match.group(1) + + episode_match = re.search(r"seizoen-([^/]+)/([^/]+)/afspelen", path) + if episode_match: + self.season_slug = episode_match.group(1) + self.episode_slug = episode_match.group(2) + + self.original_title_url = title # Store the original URL for later use if self.config is None: raise EnvironmentError("Missing service config.") + # Construct X-Nos header + salt = int(time.time()) + user_agent = f"nos;{salt};Google/Nexus;Android/6.0;nl.nos.app/5.1.1" + string_to_hash = f";UB}}7Gaji==JPHtjX3@c{user_agent}" + md5_hash = hashlib.md5(string_to_hash.encode('utf-8')).hexdigest() + xnos = md5_hash + base64.b64encode(user_agent.encode('utf-8')).decode('utf-8') + self.session.headers['X-Nos'] = xnos + # Store CDM reference self.cdm = ctx.obj.cdm @@ -93,9 +123,12 @@ class NPO(Service): else: self.log.warning("NPO auth check failed.") - def _fetch_next_data(self, slug: str) -> dict: + def _fetch_next_data(self, slug: str, full_url: Optional[str] = None) -> dict: """Fetch and parse __NEXT_DATA__ from video/series page.""" - url = f"https://npo.nl/start/{'video' if self.kind == 'video' else 'serie'}/{slug}" + if full_url: + url = full_url + else: + url = f"https://npo.nl/start/{'video' if self.kind == 'video' else 'serie'}/{slug}" r = self.session.get(url) r.raise_for_status() match = re.search(r'', r.text, re.DOTALL) @@ -103,58 +136,256 @@ class NPO(Service): raise RuntimeError("Failed to extract __NEXT_DATA__") return json.loads(match.group(1)) + def get_widevine_license(self, *, challenge: bytes, title: Title_T, track: AnyTrack) -> bytes: + license_url_base = self.config["endpoints"]["license"] + # Extract drmToken from track.data where the stream response was stored in get_tracks + npo_stream_data = track.data.get("npo_stream_data", {}) + stream_details = npo_stream_data.get("stream", {}) + drm_token = stream_details.get("drmToken") or stream_details.get("token") + + if not drm_token: + raise ValueError("DRM token not found in title data for license request.") + + # Construct the license_url with custom_data query parameter + license_url = f"{license_url_base}?custom_data={drm_token}" + + # As per working DL.py script, only Content-Type is sent for license request + headers = {'Content-Type': 'application/octet-stream'} + + self.log.debug(f"Requesting Widevine license from {license_url} (with custom_data) using minimal headers...") + + # The challenge (Widevine PSSH) needs to be sent as the raw binary data. + r = self.session.post(license_url, data=challenge, headers=headers) + r.raise_for_status() # Raise an exception for HTTP errors + + self.log.debug(f"Received Widevine license response (status: {r.status_code}, size: {len(r.content)} bytes)") + + # The license response should be returned as raw bytes. + return r.content + def get_titles(self) -> Titles_T: - next_data = self._fetch_next_data(self.slug) - build_id = next_data["buildId"] # keep if needed elsewhere + # Handle 'afspelen' URLs directly for specific episodes + if self.kind == "video" and not self.season_slug and not self.episode_slug and self.original_title_url: + try: + # Use the original URL to fetch __NEXT_DATA__ + next_data = self._fetch_next_data(self.slug, full_url=self.original_title_url) - page_props = next_data["props"]["pageProps"] - queries = page_props["dehydratedState"]["queries"] + product_info = None + # Check the main program data in pageProps + page_props = next_data.get("props", {}).get("pageProps", {}) + if page_props: + program_data = page_props.get("program", {}) + if program_data and program_data.get("productId"): + product_info = program_data + else: + # Fallback for video data, if not found in program + video_data = page_props.get("video", {}) + if video_data and video_data.get("productId"): + product_info = video_data - def get_data(fragment: str): - return next((q["state"]["data"] for q in queries if fragment in str(q.get("queryKey", ""))), None) + # Fallback to dehydrated state queries if not found in pageProps directly + if product_info is None: + queries = next_data.get("props", {}).get("pageProps", {}).get("dehydratedState", {}).get("queries", []) + for item in queries: + state = item.get("state", {}) + if state: + episode_data = state.get('data', {}) + if isinstance(episode_data, dict) and episode_data.get('productId'): + product_info = episode_data + break - if self.kind == "serie": - series_data = get_data("series:detail-") - if not series_data: - raise ValueError("Series metadata not found") + if product_info and product_info.get("productId"): + # Check if it's part of a series + if product_info.get("series"): + season_number = product_info.get("season", {}).get("seasonKey") + if season_number is None and product_info.get("season", {}).get("slug"): + season_match = re.search(r"seizoen-(\d+)", product_info["season"]["slug"]) + if season_match: + season_number = int(season_match.group(1)) + return Series([ + Episode( + id_=product_info["productId"], + service=self.__class__, + title=product_info["series"]["title"], + season=season_number, + number=product_info.get("programKey"), + name=product_info["title"], + description=(product_info.get("synopsis", {}) or {}).get("long", ""), + language=Language.get("nl"), + data=product_info, + ) + ]) + else: + # It's a standalone movie/video + return Movies([ + Movie( + id_=product_info["productId"], + service=self.__class__, + name=product_info["title"], + description=(product_info.get("synopsis", {}) or {}).get("long", ""), + year=(int(product_info["firstBroadcastDate"]) // 31536000 + 1970) if product_info.get("firstBroadcastDate") else None, + language=Language.get("nl"), + data=product_info, + ) + ]) + except Exception as e: + self.log.debug(f"Direct __NEXT_DATA__ fetch for afspelen URL failed: {e}") + + # Prioritize broadcast search for /afspelen/ URLs + if self.kind != 'serie' and not self.season_slug and not self.episode_slug: + search_url_broadcasts = f"https://npo.nl/start/api/domain/search-collection-items?searchType=broadcasts&searchQuery={self.slug}&subscriptionType=anonymous" + broadcast_data = self.session.get(search_url_broadcasts).json() + + if broadcast_data.get("items"): + item_data = broadcast_data["items"][0] + + # If the item has a 'series' key, it's an episode of a series + if item_data.get("series"): + season_number = item_data.get("season", {}).get("seasonKey") + if season_number is None and item_data.get("season", {}).get("slug"): + # Fallback: Extract season number from slug like "seizoen-5" + season_match = re.search(r"seizoen-(\d+)", item_data["season"]["slug"]) + if season_match: + season_number = int(season_match.group(1)) + + return Series([ + Episode( + id_=item_data["productId"], + service=self.__class__, + title=item_data["series"]["title"], # Use series title as main title + season=season_number, + number=item_data.get("programKey"), + name=item_data["title"], # Use episode title as episode name + description=(item_data.get("synopsis", {}) or {}).get("long", ""), + language=Language.get("nl"), + data=item_data, + ) + ]) + else: + # Otherwise, it's a standalone movie + return Movies([ + Movie( + id_=item_data["productId"], + service=self.__class__, + name=item_data["title"], + description=(item_data.get("synopsis", {}) or {}).get("long", ""), + year=(int(item_data["firstBroadcastDate"]) // 31536000 + 1970) if item_data.get("firstBroadcastDate") else None, + language=Language.get("nl"), + data=item_data, + ) + ]) + + # Fallback to series search if not an /afspelen/ single item or if season/episode slugs are present + search_url_series = f"https://npo.nl/start/api/domain/search-collection-items?searchType=series&searchQuery={self.slug}&subscriptionType=anonymous" + series_data = self.session.get(search_url_series).json() + + if series_data.get("items"): + # It's a series + series_info = series_data["items"][0] + series_slug = series_info["slug"] + series_type = series_info["type"] + series_guid = series_info["guid"] + + seasons_url = f"https://npo.nl/start/api/domain/series-seasons?slug={series_slug}&type={series_type}" + seasons_data = self.session.get(seasons_url).json() + episodes = [] - seasons = get_data("series:seasons-") or [] - for season in seasons: - eps = get_data(f"programs:season-{season['guid']}") or [] - for e in eps: + for season in seasons_data: + if self.season_slug and str(season.get("seasonKey")) != self.season_slug and season.get('slug') != f'seizoen-{self.season_slug}': + continue + + season_guid = season["guid"] + episodes_url = f"https://npo.nl/start/api/domain/programs-by-season?guid={season_guid}" + episodes_data = self.session.get(episodes_url).json() + + for episode_data in episodes_data: episodes.append( Episode( - id_=e["guid"], + id_=episode_data["productId"], service=self.__class__, - title=series_data["title"], - season=int(season["seasonKey"]), - number=int(e["programKey"]), - name=e["title"], - description=(e.get("synopsis", {}) or {}).get("long", ""), + title=series_info["title"], + season=episode_data.get("season", {}).get("seasonKey"), + number=episode_data.get("programKey"), + name=episode_data["title"], + description=(episode_data.get("synopsis", {}) or {}).get("long", ""), language=Language.get("nl"), - data=e, + data=episode_data, ) ) - return Series(episodes) + + if self.episode_slug: + # Filter for the specific episode requested + filtered_episodes = [ep for ep in episodes if ep.data.get("slug") == self.episode_slug] + return Series(filtered_episodes) + else: + return Series(episodes) + + # Fallback: If neither broadcast nor series search returned items, + # try to fetch __NEXT_DATA__ for the video page (assuming it's a movie/standalone video) + try: + # Ensure self.kind is set to 'video' for _fetch_next_data to construct the correct URL + original_kind = self.kind + self.kind = "video" + next_data = self._fetch_next_data(self.slug) + self.kind = original_kind # Restore original kind - # Movie - item = get_data("program:detail-") or queries[0]["state"]["data"] - synopsis = item.get("synopsis", {}) - desc = synopsis.get("long") or synopsis.get("short", "") if isinstance(synopsis, dict) else str(synopsis) - year = (int(item["firstBroadcastDate"]) // 31536000 + 1970) if item.get("firstBroadcastDate") else None + # Try to find the product info in the dehydrated state + product_info = None + queries = next_data.get("props", {}).get("pageProps", {}).get("dehydratedState", {}).get("queries", []) + for item in queries: + state = item.get("state", {}) + if state: + episode_data = state.get('data', {}) + if isinstance(episode_data, dict): + # NPO.py uses slug to find, let's use it as well + if episode_data.get('slug') == self.slug: + product_info = episode_data + break + + # Fallback if not found in dehydratedState queries (different Next.js version or structure) + if product_info is None: + page_props = next_data.get("props", {}).get("pageProps", {}) + if page_props: + # Check for program data + program_data = page_props.get("program", {}) + if program_data and program_data.get("slug") == self.slug: + product_info = program_data + else: + # Check for direct video data + video_data = page_props.get("video", {}) + if video_data and video_data.get("slug") == self.slug: + product_info = video_data - return Movies([ - Movie( - id_=item["guid"], - service=self.__class__, - name=item["title"], - description=desc, - year=year, - language=Language.get("nl"), - data=item, - ) - ]) + + if product_info and product_info.get("productId"): + # If it has 'series' key, it's likely a series episode, not a standalone movie + if not product_info.get("series"): + return Movies([ + Movie( + id_=product_info["productId"], + service=self.__class__, + name=product_info.get("title", self.slug), # Use slug as fallback title + description=product_info.get("synopsis", {}).get("long", ""), + year=(int(product_info["firstBroadcastDate"]) // 31536000 + 1970) if product_info.get("firstBroadcastDate") else None, + language=Language.get("nl"), # NPO is Dutch + data=product_info, + ) + ]) + else: + self.log.debug(f"Content for {self.slug} identified as a series episode via __NEXT_DATA__ fallback, not a standalone movie.") + # If it's a series episode, we don't want to treat it as a movie here. + # The series search path should handle it, or this fallback should be for strict movies. + # For now, let's return empty if it's a series episode. + return [] + except Exception as e: + self.log.debug(f"Fallback to __NEXT_DATA__ for video failed: {e}") + + # If neither broadcast, series, nor __NEXT_DATA__ fallback returned items, return an empty list + return [] + + def get_chapters(self, title: Title_T) -> Chapters: + return [] def get_tracks(self, title: Title_T) -> Tracks: product_id = title.data.get("productId") @@ -193,119 +424,30 @@ class NPO(Service): if not manifest_url: raise ValueError("No stream URL in response") - is_unencrypted = "unencrypted" in manifest_url.lower() or not any(k in stream for k in ["drmToken", "token"]) - # Parse DASH tracks = DASH.from_url(manifest_url, session=self.session).to_tracks(language=title.language) + # Store the entire stream response data into track.data so it's accessible later by get_widevine_license + for tr in tracks: + tr.data["npo_stream_data"] = data # Always store stream data for all tracks + + # HACK: NPO reports some Dutch audio tracks as English for older content. + # If the title language is Dutch, assume any English audio tracks are also Dutch. + if title.language == Language.get("nl"): + for track in tracks.audio: + if track.language == Language.get("en"): + self.log.debug("Correcting 'en' audio track to 'nl' for Dutch title.") + track.language = Language.get("nl") + # Subtitles subtitles = [] - for sub in (data.get("assets", {}) or {}).get("subtitles", []) or []: - if not isinstance(sub, dict): - continue - lang = sub.get("iso", "und") - location = sub.get("location") - if not location: - continue # skip if no URL provided - subtitles.append( - Subtitle( - id_=sub.get("name", lang), - url=location.strip(), - language=Language.get(lang), - is_original_lang=lang == "nl", - codec=Subtitle.Codec.WebVTT, - name=sub.get("name", "Unknown"), - forced=False, - sdh=False, - ) - ) - tracks.subtitles = subtitles - - # DRM - if is_unencrypted: - for tr in tracks.videos + tracks.audio: - if hasattr(tr, "drm") and tr.drm: - tr.drm.clear() - else: - self.drm_token = stream.get("drmToken") or stream.get("token") or stream.get("drm_token") - if not self.drm_token: - raise ValueError(f"No DRM token found. Available keys: {list(stream.keys())}") - - for tr in tracks.videos + tracks.audio: - if getattr(tr, "drm", None): - if drm_type == "playready": - tr.drm.license = lambda challenge, **kw: self.get_playready_license( - challenge=challenge, title=title, track=tr - ) - else: - tr.drm.license = lambda challenge, **kw: self.get_widevine_license( - challenge=challenge, title=title, track=tr - ) - - return tracks - - def get_chapters(self, title: Title_T) -> list[Chapter]: - return [] - - def get_widevine_license(self, challenge: bytes, title: Title_T, track: AnyTrack) -> bytes: - if not self.drm_token: - raise ValueError("DRM token not set, login or paid content may be required.") - r = self.session.post( - self.config["endpoints"]["license"], - params={"custom_data": self.drm_token}, - data=challenge, - ) - r.raise_for_status() - return r.content - - def get_playready_license(self, challenge: bytes, title: Title_T, track: AnyTrack) -> bytes: - if not self.drm_token: - raise ValueError("DRM token not set, login or paid content may be required.") - headers = { - "Content-Type": "text/xml; charset=utf-8", - "SOAPAction": "http://schemas.microsoft.com/DRM/2007/03/protocols/AcquireLicense", - "Origin": "https://npo.nl", - "Referer": "https://npo.nl/", - "User-Agent": ( - "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " - "AppleWebKit/537.36 (KHTML, like Gecko) " - "Chrome/141.0.0.0 Safari/537.36 Edg/141.0.0.0" - ), - } - r = self.session.post( - self.config["endpoints"]["license"], - params={"custom_data": self.drm_token}, - data=challenge, - headers=headers, - ) - r.raise_for_status() - return r.content - - def search(self) -> Generator[SearchResult, None, None]: - query = getattr(self, "search_term", None) or getattr(self, "title", None) - search = self.session.get( - url=self.config["endpoints"]["search"], - params={ - "searchQuery": query, # always use the correct attribute - "searchType": "series", - "subscriptionType": "premium", - "includePremiumContent": "true", - }, - headers={ - "User-Agent": "Mozilla/5.0 (X11; Linux x86_64; rv:143.0) Gecko/20100101 Firefox/143.0", - "Accept": "application/json, text/plain, */*", - "Origin": "https://npo.nl", - "Referer": f"https://npo.nl/start/zoeken?zoekTerm={query}", - } - ).json() - for result in search.get("items", []): - yield SearchResult( - id_=result.get("guid"), - title=result.get("title"), - label=result.get("type", "SERIES").upper() if result.get("type") else "SERIES", - url=f"https://npo.nl/start/serie/{result.get('slug')}" if result.get("type") == "timeless_series" else - f"https://npo.nl/start/video/{result.get('slug')}" - ) - + for sub in (data.get("assets", {}) or {}).get("subtitle", []): + if sub["format"] == "webvtt": + subtitles.append(Subtitle(url=sub["url"], language=Language.get(sub["lang"]))) + else: + self.log.warning(f"Unsupported subtitle format: {sub['format']}") + if not self.NO_SUBTITLES: + tracks.subtitles.extend(subtitles) + return tracks \ No newline at end of file