unshackle-services/HPLA/__init__.py

510 lines
18 KiB
Python
Raw Normal View History

2026-02-02 15:21:08 +01:00
import base64
import hashlib
import json
import re
from typing import Optional, Union, Generator
import click
from langcodes import Language
from lxml import etree
from unshackle.core.constants import AnyTrack
from unshackle.core.credential import Credential
from unshackle.core.manifests import DASH
from unshackle.core.search_result import SearchResult
from unshackle.core.service import Service
from unshackle.core.titles import Movie, Movies, Title_T, Titles_T, Song, Album
from unshackle.core.tracks import Chapter, Subtitle, Tracks, Audio
class HPLA(Service):
    """
    Service code for Hoopla Digital (https://www.hoopladigital.com)
    Version: 1.0.7

    Authorization: Credentials (Email & Password)
    Security:
      - SL2K/SL3K/L1/L3: SD/360p

    Licenses are served by the DRMToday license server; the streams are
    encoded by castLabs.

    Supports movies and music (music support is still unreliable).
    Television is not supported: episodes have to be borrowed one at a time.

    Accepts a full URL, for example
    https://www.hoopladigital.com/movie/title-name/10979706 or
    https://www.hoopladigital.com/title/10979706, or a bare content ID.
    """

    ALIASES = ("HPLA", "hoopla")
    # One or two path segments may precede the numeric ID, so both the
    # "/<kind>/<slug>/<id>" form and the "/title/<id>" form that search()
    # emits are accepted. A bare numeric ID matches as well.
    TITLE_RE = r"^(?:https?://(?:www\.)?hoopladigital\.com/(?:[^/]*/){1,2})?(?P<title_id>\d+)"
    GEOFENCE = ("US",)
@staticmethod
@click.command(name="HPLA", short_help="https://www.hoopladigital.com")
@click.argument("title", type=str)
@click.option("-m", "--movie", is_flag=True, default=False, help="Specify if it's a movie")
@click.pass_context
def cli(ctx, **kwargs):
    # Click entry point for this service: forwards the click context together
    # with the parsed "title" argument and "--movie" flag to the HPLA
    # constructor and returns the new instance.
    # Documented with "#" comments on purpose: click uses a command
    # callback's docstring as the command's help text.
    return HPLA(ctx, **kwargs)
def __init__(self, ctx, title, movie):
    """
    Prepare the service for a single CLI invocation.

    Parameters:
        ctx: click context passed in by ``cli``; its parent's params are
            read for the "profile" entry.
        title: title argument from the command line (URL, content ID, or
            search text).
        movie: value of the ``--movie`` flag.

    Raises:
        Exception: if the service config is ``None``.
    """
    super().__init__(ctx)

    self.title, self.movie = title, movie

    if self.config is None:
        raise Exception("Config is missing!")

    # Any falsy profile (missing key, None, empty string) falls back to "default".
    self.profile = ctx.parent.params.get("profile") or "default"

    # NOTE(review): the config key is literally "amazon" - presumably the
    # device/platform profile whose headers are sent on every request; confirm
    # against the service's config file.
    self.platform = self.config["platform"]["amazon"]
def authenticate(self, cookies: Optional[any] = None, credential: Optional[Credential] = None) -> None:
    """
    Authenticate the session, reusing cached tokens when they are still valid.

    The cache entry is keyed by profile name and is only reused when it has
    not expired and was stored for the same username; otherwise a full login
    is performed and its result cached.

    Raises:
        EnvironmentError: if no credential, username or password is supplied.
    """
    # NOTE(review): ``Optional[any]`` refers to the builtin ``any`` function;
    # ``typing.Any`` was probably intended. Left untouched to keep the
    # signature identical.
    super().authenticate(cookies, credential)

    if not (credential and credential.username and credential.password):
        raise EnvironmentError("Service requires Credentials for Authentication.")

    self.credential = credential
    self.session.headers.update(self.platform["headers"])

    cache_key = f"tokens_{self.profile}"
    entry = self.cache.get(cache_key)
    stored = entry.data if entry and not entry.expired else None

    if isinstance(stored, dict) and stored.get("username") == credential.username:
        self.log.info("Using cached tokens")
        self._restore_from_cache(stored)
        return

    self.log.info("Logging in...")
    self._do_login(credential)
    self._cache_tokens(credential.username, cache_key)
def _restore_from_cache(self, cached_data: dict) -> None:
    """
    Re-apply a previously cached login to this instance.

    Copies the "access_token" and "patron_id" entries of *cached_data* onto
    the instance and installs the matching ``Authorization`` and
    ``patron-id`` headers on the session.

    Raises:
        KeyError: if either entry is missing from *cached_data*.
    """
    self.access_token = cached_data["access_token"]
    self.patron_id = cached_data["patron_id"]

    auth_headers = {
        "Authorization": f"Bearer {self.access_token}",
        "patron-id": self.patron_id,
    }
    self.session.headers.update(auth_headers)
def _cache_tokens(self, username: str, cache_key: str) -> None:
    """
    Store the current access token and patron ID under *cache_key*.

    The username is saved alongside the tokens so that ``authenticate`` can
    tell whether a cached entry belongs to the credential in use.
    """
    entry = self.cache.get(cache_key)
    snapshot = {
        "username": username,
        "access_token": self.access_token,
        "patron_id": self.patron_id,
    }
    # NOTE(review): 3600 is presumably a lifetime in seconds (one hour);
    # confirm against the cache API.
    entry.set(data=snapshot, expiration=3600)
def _is_music_mpd(self, mpd: etree._Element) -> bool:
    """
    Tell whether *mpd* describes a single-file music asset.

    An MPD counts as music when all of the following hold:
      - no AdaptationSet has contentType "video";
      - exactly one Representation sits under an audio AdaptationSet;
      - there is no SegmentTemplate anywhere in the document;
      - a BaseURL element is present.

    The element paths are un-namespaced, so *mpd* must already have had its
    namespaces stripped.
    """
    has_video = any(
        adaptation.get("contentType") == "video"
        for adaptation in mpd.findall(".//AdaptationSet")
    )
    if has_video:
        return False

    audio_rep_count = len(mpd.findall(".//AdaptationSet[@contentType='audio']/Representation"))
    is_segmented = mpd.find(".//SegmentTemplate") is not None
    has_base_url = mpd.find(".//BaseURL") is not None

    return audio_rep_count == 1 and not is_segmented and has_base_url
def _extract_music_audio(self, mpd: etree._Element, manifest_url: str) -> str:
    """
    Resolve the audio file URL of a single-file music MPD.

    The text of the first BaseURL element is resolved against
    *manifest_url* with standard URL-reference resolution. A relative
    BaseURL therefore lands next to the manifest, an absolute BaseURL is
    returned as-is, and a query string on the manifest URL does not leak
    into the result.

    Parameters:
        mpd: parsed, namespace-stripped MPD root element.
        manifest_url: URL the MPD was downloaded from.

    Returns:
        Absolute URL of the audio file.

    Raises:
        ValueError: if the MPD has no BaseURL or it is empty/whitespace.
    """
    # Function-scope import keeps this block self-contained.
    from urllib.parse import urljoin

    base = mpd.find(".//BaseURL")
    # Element text may be None, and pretty-printed XML can wrap it in whitespace.
    location = (base.text or "").strip() if base is not None else ""
    if not location:
        raise ValueError("Music MPD has no BaseURL")
    return urljoin(manifest_url, location)
def _do_login(self, credential: Credential) -> None:
    """
    Run the two-step login and leave the session ready for API calls.

    Step 1 posts the username/password form to the login endpoint and keeps
    the returned bearer token. Step 2 asks the GraphQL endpoint for the
    patron record and keeps its ID. Both values are stored on the instance
    and installed as session headers.

    Raises:
        EnvironmentError: if the login reply's "tokenStatus" is not "SUCCESS".
    """
    endpoints = self.config["endpoints"]

    # Step 1: exchange the credentials for a bearer token.
    form = {
        "username": credential.username,
        "password": credential.password,
    }
    login_reply = self.session.post(
        url=endpoints["login"],
        data=form,
        headers={"Content-Type": "application/x-www-form-urlencoded"},
    ).json()

    if login_reply.get("tokenStatus") != "SUCCESS":
        raise EnvironmentError(f"Login failed: {login_reply.get('tokenStatus', 'Unknown error')}")

    self.access_token = login_reply["token"]
    # The token must be on the session before the patron query is sent.
    self.session.headers.update({"Authorization": f"Bearer {self.access_token}"})

    # Step 2: look up the patron ID that later requests carry as a header.
    self.log.info("Fetching Patron ID...")
    patron_reply = self.session.post(
        url=endpoints["graphql"],
        json={"query": 'query { patron { id email } }'},
        headers={"Content-Type": "application/json"},
    ).json()

    self.patron_id = patron_reply["data"]["patron"]["id"]
    self.session.headers.update({"patron-id": self.patron_id})
    self.log.debug(f"Logged in as Patron ID: {self.patron_id}")
def search(self) -> Generator[SearchResult, None, None]:
    """
    Search the Hoopla catalogue for ``self.title`` and yield the hits.

    Sends one GraphQL search request (first page, 48 results, all titles
    regardless of availability) and yields a SearchResult per hit. The
    service's kind names are mapped to display labels; unknown kinds are
    passed through unchanged.

    A reply without usable search data (for example a GraphQL error reply
    whose "data" is null) yields nothing instead of raising.
    """
    query = """
    query GetFilterSearchQuery($criteria: SearchCriteria!, $sort: Sort) {
      search(criteria: $criteria, sort: $sort) {
        hits {
          id
          title
          kind { name }
        }
      }
    }
    """
    payload = {
        "operationName": "GetFilterSearchQuery",
        "variables": {
            "criteria": {
                "q": self.title,
                "availability": "ALL_TITLES",
                "pagination": {
                    "page": 1,
                    "pageSize": 48,
                },
            }
        },
        "query": query,
    }
    resp = self.session.post(
        self.config["endpoints"]["graphql"],
        json=payload,
        headers={"Content-Type": "application/json"},
    ).json()

    # ``dict.get(key, default)`` only covers a missing key; JSON null comes
    # back as None, so each level is guarded with ``or``.
    search_data = (resp.get("data") or {}).get("search") or {}
    hits = search_data.get("hits") or []

    # Built once, outside the loop: service kind name -> display label.
    labels = {
        "MOVIE": "MOVIE",
        "TVSHOW": "SERIES",
        "MUSIC": "ALBUM",
        "AUDIOBOOK": "AUDIOBOOK",
        "EBOOK": "BOOK",
        "COMIC": "COMIC",
    }

    for hit in hits:
        kind = hit["kind"]["name"]
        yield SearchResult(
            id_=hit["id"],
            title=hit["title"],
            label=labels.get(kind, kind),
            url=f"https://www.hoopladigital.com/title/{hit['id']}",
        )
def get_titles(self) -> Titles_T:
    """
    Look up ``self.title`` on Hoopla and return it as unshackle titles.

    ``self.title`` is matched against ``TITLE_RE`` to get the content ID,
    and the content's metadata is fetched over GraphQL. A MOVIE becomes a
    one-element ``Movies``. A MUSIC title becomes an ``Album`` with one
    ``Song`` per track, or a single ``Song`` using the album's own mediaKey
    when no tracks are listed.

    Fields the service returns as JSON null (``data``, ``contents``,
    ``primaryArtist``, ``tracks``, ``segmentNumber``) are treated the same
    as missing fields.

    Raises:
        ValueError: if the title does not match ``TITLE_RE``, the content is
            not found, the title has not been borrowed (no circulation), or
            the kind is neither MOVIE nor MUSIC.
    """
    title_match = re.match(self.TITLE_RE, self.title)
    if not title_match:
        raise ValueError(f"Invalid title format: {self.title}")
    # The captured group is digits only (\d+), so interpolating it into the
    # query text below cannot inject GraphQL syntax.
    content_id = title_match.group("title_id")

    query = """
    query {
      contents(criteria:{contentIds:[%s]}) {
        contents {
          id
          title
          kind { id name }
          mediaKey
          circulation { id dueDate }
          year
          seconds
          primaryArtist { name }
          tracks {
            id
            mediaKey
            name
            seconds
            segmentNumber
          }
        }
      }
    }
    """ % content_id

    data = self.session.post(
        url=self.config["endpoints"]["graphql"],
        json={"query": query},
        headers={"Content-Type": "application/json"},
    ).json()

    # ``.get(key, {})`` does not help when the key is present with a null
    # value, so every level is guarded with ``or``.
    contents = ((data.get("data") or {}).get("contents") or {}).get("contents") or []
    if not contents:
        raise ValueError("Content not found")

    meta = contents[0]
    kind_name = meta["kind"]["name"]

    circulation = meta.get("circulation")
    if not circulation:
        raise ValueError("You must borrow this title on your Hoopla account before downloading.")

    if kind_name not in ("MOVIE", "MUSIC"):
        raise ValueError(f"Unsupported content type: {kind_name}. Only MOVIE and MUSIC are supported.")

    circulation_id = circulation["id"]
    year = int(meta["year"]) if meta.get("year") else None

    if kind_name == "MOVIE":
        return Movies([
            Movie(
                id_=meta["id"],
                service=self.__class__,
                name=meta["title"],
                year=year,
                language=Language.get("en"),
                data={
                    "mediaKey": meta["mediaKey"],
                    "circulationId": circulation_id,
                    "is_music": False,
                },
            )
        ])

    # MUSIC
    artist = (meta.get("primaryArtist") or {}).get("name") or "Unknown Artist"
    album_tracks = meta.get("tracks") or []

    if not album_tracks:
        # No track list: treat the album as a single song using its own mediaKey.
        songs = [
            Song(
                id_=meta["id"],
                service=self.__class__,
                name=meta["title"],
                artist=artist,
                album=meta["title"],
                track=1,
                disc=1,
                year=year,
                data={
                    "mediaKey": meta["mediaKey"],
                    "circulationId": circulation_id,
                    "is_music": True,
                },
            )
        ]
    else:
        songs = []
        for idx, track in enumerate(album_tracks, start=1):
            segment_number = track.get("segmentNumber")
            songs.append(
                Song(
                    id_=track["id"],
                    service=self.__class__,
                    name=track["name"],
                    artist=artist,
                    album=meta["title"],
                    # Fall back to list position only when the service gave no number.
                    track=idx if segment_number is None else segment_number,
                    disc=1,
                    year=year,
                    data={
                        # Each track is downloaded through its own mediaKey.
                        "mediaKey": track["mediaKey"],
                        "circulationId": circulation_id,
                        "is_music": True,
                    },
                )
            )
    return Album(songs)
def get_tracks(self, title: Title_T) -> Tracks:
    """
    Fetch the licence bootstrap data and the manifest for *title*.

    Three values are stored on the instance for the later licence calls:
    ``asset_id``, ``auth_token`` and ``custom_data`` (derived from the
    token). The manifest is then downloaded, stripped of XML namespaces and
    parsed.

    A music MPD yields a ``Tracks`` holding one AAC stereo ``Audio`` track
    with an empty DRM list. Any other MPD is converted via ``DASH`` and
    subtitles are probed for.
    """
    media_key = title.data["mediaKey"]
    circulation_id = title.data["circulationId"]
    endpoints = self.config["endpoints"]

    # --- DRM bootstrap: both endpoints answer with a plain-text body ---
    asset_url = endpoints["license_asset"].format(media_key=media_key)
    self.asset_id = self.session.get(asset_url).text.strip()

    token_url = endpoints["license_token"].format(
        media_key=media_key,
        patron_id=self.patron_id,
        circulation_id=circulation_id,
    )
    self.auth_token = self.session.get(token_url).text.strip()
    self.custom_data = self._extract_custom_data(self.auth_token)

    # --- Manifest ---
    manifest_url = endpoints["manifest"].format(media_key=media_key)
    raw_manifest = self.session.get(manifest_url).text
    plain_manifest = self._strip_namespaces(raw_manifest)
    mpd = etree.fromstring(plain_manifest.encode("utf-8"))

    if not self._is_music_mpd(mpd):
        self.log.info("Detected Hoopla movie MPD")
        movie_tracks = DASH(mpd, manifest_url).to_tracks(
            language=title.language or Language.get("en")
        )
        self._add_subtitles(movie_tracks, manifest_url, media_key)
        return movie_tracks

    self.log.info("Detected Hoopla music MPD")
    audio_url = self._extract_music_audio(mpd, manifest_url)
    music_tracks = Tracks()
    music_tracks.add(
        Audio(
            url=audio_url,
            drm=[],
            codec=Audio.Codec.AAC,
            language=title.language or "en",
            channels=2,
        )
    )
    return music_tracks
def _strip_namespaces(self, xml_string: str) -> str:
    """
    Strip namespace declarations and prefixes from an XML string.

    This is needed because unshackle's DASH parser expects a plain 'MPD'
    tag, not '{urn:mpeg:dash:schema:mpd:2011}MPD'.

    The substitutions run in this order:
      1. drop ``xmlns`` / ``xmlns:prefix`` declarations;
      2. drop prefixes from element tags (``<cenc:pssh>`` -> ``<pssh>``);
      3. drop ``urn:``-prefixed attributes entirely;
      4. drop prefixes from the remaining attributes
         (``cenc:default_KID`` -> ``default_KID``).

    Step 3 must run before step 4. Otherwise step 4 rewrites
    ``urn:name="..."`` to ``name="..."`` first, and the removal never
    matches anything.
    """
    # Remove xmlns declarations (both default and prefixed)
    xml_string = re.sub(r'\s+xmlns(:\w+)?="[^"]+"', '', xml_string)
    # Remove namespace prefixes from element tags (e.g., <cenc:pssh> -> <pssh>)
    xml_string = re.sub(r'<(/?)(\w+):', r'<\1', xml_string)
    # Remove urn: prefixed attributes entirely (e.g., urn:assetId="...")
    xml_string = re.sub(r'\s+urn:\w+="[^"]+"', '', xml_string)
    # Remove namespace prefixes from attributes (e.g., cenc:default_KID -> default_KID)
    xml_string = re.sub(r'\s+\w+:(\w+)=', r' \1=', xml_string)
    return xml_string
def _extract_custom_data(self, jwt_token: str) -> str:
    """
    Build the ``dt-custom-data`` header value from a licence JWT.

    The token's payload (second dot-separated segment) is base64url-decoded
    and parsed as JSON. Its "optData" string is then re-encoded as standard
    base64 and returned.

    Raises:
        ValueError: for any failure - malformed token, undecodable payload,
            or a missing/empty "optData" - with the underlying message
            appended.
    """
    try:
        payload_segment = jwt_token.split(".")[1]
        # JWT segments omit base64 padding; restore it to a multiple of four.
        pad = "=" * (-len(payload_segment) % 4)
        claims = json.loads(base64.urlsafe_b64decode(payload_segment + pad))

        opt_data = claims.get("optData")
        if opt_data:
            return base64.b64encode(opt_data.encode("utf-8")).decode("utf-8")
        raise ValueError("optData not found in JWT")
    except Exception as exc:
        raise ValueError(f"Failed to process license token: {exc}")
def _add_subtitles(self, tracks: Tracks, manifest_url: str, media_key: str) -> None:
    """
    Best-effort: add an English WebVTT subtitle track if one can be found.

    Candidate URLs are built from the manifest's directory and *media_key*
    and probed with a HEAD request. The first one answering 200 is added to
    *tracks* as an English SDH subtitle. A failure while probing or adding
    never propagates to the caller; it is logged as a warning and the next
    candidate is tried.
    """
    base_url = manifest_url.rsplit('/', 1)[0]
    # NOTE(review): the numeric infix is hard-coded; presumably it is the same
    # for every title, but that is not verifiable from here.
    vtt_patterns = [
        f"{base_url}/{media_key}-8784525650515056532-en/{media_key}-8784525650515056532-en.vtt",
    ]
    for vtt_url in vtt_patterns:
        try:
            response = self.session.head(vtt_url)
            if response.status_code == 200:
                tracks.add(
                    Subtitle(
                        # Short ID derived from the URL.
                        id_=hashlib.md5(vtt_url.encode()).hexdigest()[0:6],
                        url=vtt_url,
                        codec=Subtitle.Codec.WebVTT,
                        language=Language.get("en"),
                        sdh=True,
                    )
                )
                break
        except Exception as exc:
            # Subtitles are optional, so keep going - but leave a trace
            # instead of hiding the failure.
            self.log.warning(f"Subtitle lookup failed for {vtt_url}: {exc}")
def get_chapters(self, title: Title_T) -> list[Chapter]:
    """Return the chapters of *title*; this service provides none, so the list is always empty."""
    return list()
def get_widevine_service_certificate(self, **_) -> Optional[str]:
    """
    Return the "certificate" entry of the service config.

    Keyword arguments are accepted and ignored. The result is ``None`` when
    the config has no such entry.
    """
    certificate = self.config.get("certificate")
    return certificate
def get_widevine_license(self, *, challenge: bytes, title: Title_T, track: AnyTrack) -> Optional[Union[bytes, str]]:
    """
    Request a Widevine licence for *challenge*.

    Uses the asset ID, auth token and custom data that ``get_tracks()``
    stored on the instance. *title* and *track* are not used.

    Returns:
        The "license" field of the JSON reply, or ``None`` if it is absent.

    Raises:
        RuntimeError: if ``get_tracks()`` has not populated the required
            state yet (same guard as the PlayReady path).
        ValueError: if the licence server does not answer with HTTP 200.
    """
    required_state = ("asset_id", "auth_token", "custom_data")
    if not all(hasattr(self, name) for name in required_state):
        raise RuntimeError("Authentication tokens missing. Call get_tracks() first.")

    response = self.session.post(
        url=self.config["endpoints"]["license_wv"],
        params={
            "logRequestId": "unshackle",
            "assetId": self.asset_id,
        },
        headers={
            "dt-custom-data": self.custom_data,
            "x-dt-auth-token": self.auth_token,
            "Content-Type": "text/xml",
        },
        data=challenge,
        # Same bound as the PlayReady request, so a stalled licence server
        # cannot hang the download indefinitely.
        timeout=30,
    )
    if response.status_code != 200:
        self.log.error(f"License Error: {response.text}")
        raise ValueError(f"Failed to get Widevine license: {response.status_code}")
    return response.json().get("license")
def get_playready_license(self, *, challenge: bytes | str, title: Title_T, track: AnyTrack) -> bytes:
    """
    Request a PlayReady licence for *challenge*.

    A ``str`` challenge is UTF-8 encoded before sending; ``bytes`` are sent
    unchanged. The request carries the auth token and custom data that
    ``get_tracks()`` stored on the instance, together with a fixed set of
    browser-like headers. *title* and *track* are not used.

    Returns:
        The raw body of the licence server's reply.

    Raises:
        RuntimeError: if ``auth_token`` or ``custom_data`` is not set yet.
        ValueError: if the licence server does not answer with HTTP 200.
    """
    if not (hasattr(self, 'auth_token') and hasattr(self, 'custom_data')):
        raise RuntimeError("Authentication tokens missing. Call get_tracks() first.")

    request_body = challenge.encode('utf-8') if isinstance(challenge, str) else challenge

    request_headers = {
        "Accept": "*/*",
        "Accept-Language": "nl",
        "Cache-Control": "no-cache",
        "Content-Type": "text/xml; charset=utf-8",
        "dt-custom-data": self.custom_data,
        "x-dt-auth-token": self.auth_token,
        "soapaction": '"http://schemas.microsoft.com/DRM/2007/03/protocols/AcquireLicense"',
        "Origin": "https://www.hoopladigital.com",
        "Referer": "https://www.hoopladigital.com/",
        "Pragma": "no-cache",
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/144.0.0.0 Safari/537.36 Edg/144.0.0.0",
    }

    response = self.session.post(
        url=self.config["endpoints"]["license_pr"],
        data=request_body,
        headers=request_headers,
        timeout=30,
    )

    if response.status_code == 200:
        return response.content

    self.log.error(f"PlayReady license failed: {response.status_code}")
    # Only the first 1000 characters of the body are logged.
    self.log.error(f"Response: {response.text[:1000]}")
    raise ValueError(f"PlayReady license failed: HTTP {response.status_code}")