192 lines
7.0 KiB
Python
192 lines
7.0 KiB
Python
import json
|
|
import re
|
|
from typing import Optional
|
|
from http.cookiejar import CookieJar
|
|
from langcodes import Language
|
|
import click
|
|
|
|
from unshackle.core.constants import AnyTrack
|
|
from unshackle.core.credential import Credential
|
|
from unshackle.core.manifests import DASH
|
|
from unshackle.core.service import Service
|
|
from unshackle.core.titles import Movie, Movies, Title_T, Titles_T
|
|
from unshackle.core.tracks import Tracks
|
|
|
|
|
|
class PTHS(Service):
|
|
"""
|
|
Service code for Pathé Thuis (pathe-thuis.nl)
|
|
Version: 1.1.0 (PlayReady Support Added)
|
|
|
|
Security: SD/FHD @ L1/L3 (Widevine)
|
|
SD/FHD @ SL2K/SL3K (Playready)
|
|
Authorization: Cookies with authenticationToken + XSRF-TOKEN
|
|
|
|
Supported:
|
|
• Movies → https://www.pathe-thuis.nl/film/{id}
|
|
|
|
Note:
|
|
Pathé Thuis does not have episodic content, only movies.
|
|
Subtitles are hardcoded here so yeah I can't do anything about it
|
|
The quality is depend on what you rented for, is it SD or HD?
|
|
"""
|
|
|
|
TITLE_RE = (
|
|
r"^(?:https?://(?:www\.)?pathe-thuis\.nl/film/)?(?P<id>\d+)(?:/[^/]+)?$"
|
|
)
|
|
GEOFENCE = ("NL",)
|
|
NO_SUBTITLES = True
|
|
|
|
@staticmethod
|
|
@click.command(name="PTHS", short_help="https://www.pathe-thuis.nl")
|
|
@click.argument("title", type=str)
|
|
@click.pass_context
|
|
def cli(ctx, **kwargs):
|
|
return PTHS(ctx, **kwargs)
|
|
|
|
def __init__(self, ctx, title: str):
|
|
super().__init__(ctx)
|
|
m = re.match(self.TITLE_RE, title)
|
|
if not m:
|
|
raise ValueError(
|
|
f"Unsupported Pathé Thuis URL or ID: {title}\n"
|
|
"Use e.g. https://www.pathe-thuis.nl/film/30591"
|
|
)
|
|
self.movie_id = m.group("id")
|
|
self.drm_token = None
|
|
self.license_url = None
|
|
if self.config is None:
|
|
raise EnvironmentError("Missing service config for Pathé Thuis.")
|
|
|
|
def authenticate(self, cookies: Optional[CookieJar] = None, credential: Optional[Credential] = None) -> None:
|
|
super().authenticate(cookies, credential)
|
|
|
|
if not cookies:
|
|
self.log.warning("No cookies provided, proceeding unauthenticated.")
|
|
return
|
|
|
|
# Extract critical cookies
|
|
auth_token = next((c.value for c in cookies if c.name == "authenticationToken"), None)
|
|
xsrf_token = next((c.value for c in cookies if c.name == "XSRF-TOKEN"), None)
|
|
|
|
if not auth_token:
|
|
self.log.info("No authenticationToken cookie found, unauthenticated mode.")
|
|
return
|
|
|
|
headers = {
|
|
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/144.0.0.0 Safari/537.36 Edg/144.0.0.0",
|
|
"X-Pathe-Device-Identifier": "web-1",
|
|
"X-Pathe-Auth-Session-Token": auth_token,
|
|
}
|
|
|
|
if xsrf_token:
|
|
headers["X-XSRF-TOKEN"] = xsrf_token
|
|
self.log.debug(f"XSRF-TOKEN header set: {xsrf_token[:10]}...")
|
|
|
|
self.session.headers.update(headers)
|
|
auth_status = "with XSRF" if xsrf_token else "without XSRF"
|
|
self.log.info(f"Authentication token attached ({auth_status}).")
|
|
|
|
def get_titles(self) -> Titles_T:
|
|
url = self.config["endpoints"]["metadata"].format(movie_id=self.movie_id)
|
|
r = self.session.get(url)
|
|
r.raise_for_status()
|
|
data = r.json()
|
|
|
|
movie = Movie(
|
|
id_=str(data["id"]),
|
|
service=self.__class__,
|
|
name=data["name"],
|
|
description=data.get("intro", ""),
|
|
year=data.get("year"),
|
|
language=Language.get(data.get("language", "nl")), # Default to Dutch
|
|
data=data,
|
|
)
|
|
return Movies([movie])
|
|
|
|
def get_tracks(self, title: Title_T) -> Tracks:
|
|
ticket_id = self._get_ticket_id(title)
|
|
base_url = self.config["endpoints"]["ticket"].format(ticket_id=ticket_id)
|
|
url = f"{base_url}?drmType=dash-widevine"
|
|
|
|
r = self.session.get(url)
|
|
r.raise_for_status()
|
|
data = r.json()
|
|
stream = data["stream"]
|
|
|
|
manifest_url = stream.get("url") or stream.get("drmurl")
|
|
if not manifest_url:
|
|
raise ValueError("No stream manifest URL found in ticket response.")
|
|
|
|
# Store DRM context for license acquisition
|
|
self.drm_token = stream["token"]
|
|
self.license_url = stream["rawData"]["licenseserver"]
|
|
drm_type = stream["rawData"].get("type", "unknown")
|
|
self.log.info(f"Acquired {drm_type.upper()} stream manifest. License URL set.")
|
|
|
|
tracks = DASH.from_url(manifest_url, session=self.session).to_tracks(language=title.language)
|
|
return tracks
|
|
|
|
def _get_ticket_id(self, title: Title_T) -> str:
|
|
"""Fetch the user's owned ticket ID if present."""
|
|
data = title.data
|
|
for t in (data.get("tickets") or []):
|
|
if t.get("playable") and str(t.get("movieId")) == str(self.movie_id):
|
|
return str(t["id"])
|
|
raise ValueError("No valid ticket found for this movie. Ensure purchase or login.")
|
|
|
|
def get_chapters(self, title: Title_T):
|
|
return []
|
|
|
|
def get_playready_license(self, challenge: bytes, title: Title_T, track: AnyTrack) -> bytes:
|
|
"""
|
|
Acquire PlayReady license using the authentication token.
|
|
Matches the license request pattern observed in browser traffic.
|
|
"""
|
|
if not self.license_url or not self.drm_token:
|
|
raise ValueError("Missing license URL or DRM token. Call get_tracks() first.")
|
|
|
|
headers = {
|
|
"Content-Type": "application/octet-stream",
|
|
"Authorization": f"Bearer {self.drm_token}",
|
|
|
|
}
|
|
params = {"custom_data": self.drm_token}
|
|
|
|
self.log.debug(f"Requesting PlayReady license from {self.license_url}")
|
|
r = self.session.post(
|
|
self.license_url,
|
|
params=params,
|
|
data=challenge,
|
|
headers=headers,
|
|
timeout=10
|
|
)
|
|
r.raise_for_status()
|
|
|
|
if not r.content or len(r.content) < 10:
|
|
raise ValueError(
|
|
"Invalid PlayReady license response. "
|
|
"Check: 1) Valid session 2) XSRF token 3) Active rental/purchase"
|
|
)
|
|
|
|
self.log.info(f"Successfully acquired PlayReady license ({len(r.content)} bytes)")
|
|
return r.content
|
|
|
|
def get_widevine_license(self, challenge: bytes, title: Title_T, track: AnyTrack) -> bytes:
|
|
"""Widevine license acquisition . """
|
|
if not self.license_url or not self.drm_token:
|
|
raise ValueError("Missing license URL or token.")
|
|
|
|
headers = {
|
|
"Content-Type": "application/octet-stream",
|
|
"Authorization": f"Bearer {self.drm_token}",
|
|
}
|
|
params = {"custom_data": self.drm_token}
|
|
|
|
r = self.session.post(self.license_url, params=params, data=challenge, headers=headers)
|
|
r.raise_for_status()
|
|
|
|
if not r.content:
|
|
raise ValueError("Empty license response, likely invalid or expired token.")
|
|
return r.content
|