From 2c09bce845e67267ad1ce2d3f65586759e5ac0ed Mon Sep 17 00:00:00 2001 From: FairTrade Date: Mon, 2 Feb 2026 15:21:08 +0100 Subject: [PATCH] Added HPLA --- HPLA/__init__.py | 509 +++++++++++++++++++++++++++++++++++++++++++++++ HPLA/config.yaml | 22 ++ README.md | 3 + 3 files changed, 534 insertions(+) create mode 100644 HPLA/__init__.py create mode 100644 HPLA/config.yaml diff --git a/HPLA/__init__.py b/HPLA/__init__.py new file mode 100644 index 0000000..e5d4131 --- /dev/null +++ b/HPLA/__init__.py @@ -0,0 +1,509 @@ +import base64 +import hashlib +import json +import re +from typing import Optional, Union, Generator + +import click +from langcodes import Language +from lxml import etree + +from unshackle.core.constants import AnyTrack +from unshackle.core.credential import Credential +from unshackle.core.manifests import DASH +from unshackle.core.search_result import SearchResult +from unshackle.core.service import Service +from unshackle.core.titles import Movie, Movies, Title_T, Titles_T, Song, Album +from unshackle.core.tracks import Chapter, Subtitle, Tracks, Audio + + +class HPLA(Service): + """ + Service code for Hoopla Digital (https://www.hoopladigital.com) + Version: 1.0.7 + + Authorization: Credentials (Email & Password) + + Security: + - SL2K/SL3K/L1/L3: SD/360p + + They are using the license server of DRMToday with encoded streams from CastLabs. + Supports movie and music (but kinda broken) at the moment + Television kinda sucks since you need to borrow it one by one, idk why people would want this shit quality series anyways + + Use full URL (for example - https://www.hoopladigital.com/movie/title-name/10979706) or content ID. + """ + + ALIASES = ("HPLA", "hoopla") + TITLE_RE = r"^(?:https?://(?:www\.)?hoopladigital\.com/[^/]*/[^/]*/)?(?P\d+)" + GEOFENCE = ("US",) + + @staticmethod + @click.command(name="HPLA", short_help="https://www.hoopladigital.com") + @click.argument("title", type=str) + @click.option("-m", "--movie", is_flag=True, default=False, help="Specify if it's a movie") + @click.pass_context + def cli(ctx, **kwargs): + return HPLA(ctx, **kwargs) + + def __init__(self, ctx, title, movie): + super().__init__(ctx) + self.title = title + self.movie = movie + + if self.config is None: + raise Exception("Config is missing!") + + profile_name = ctx.parent.params.get("profile") + self.profile = profile_name if profile_name else "default" + + self.platform = self.config["platform"]["amazon"] + + def authenticate(self, cookies: Optional[any] = None, credential: Optional[Credential] = None) -> None: + super().authenticate(cookies, credential) + if not credential or not credential.username or not credential.password: + raise EnvironmentError("Service requires Credentials for Authentication.") + + self.credential = credential + + self.session.headers.update(self.platform["headers"]) + + cache_key = f"tokens_{self.profile}" + + cache = self.cache.get(cache_key) + + if cache and not cache.expired: + cached_data = cache.data + if isinstance(cached_data, dict) and cached_data.get("username") == credential.username: + self.log.info("Using cached tokens") + self._restore_from_cache(cached_data) + return + + self.log.info("Logging in...") + self._do_login(credential) + + self._cache_tokens(credential.username, cache_key) + + def _restore_from_cache(self, cached_data: dict) -> None: + """Restore authentication state from cached data.""" + self.access_token = cached_data["access_token"] + self.patron_id = cached_data["patron_id"] + self.session.headers.update({ + "Authorization": f"Bearer {self.access_token}", + "patron-id": self.patron_id, + }) + + def _cache_tokens(self, username: str, cache_key: str) -> None: + """Cache the current authentication tokens.""" + cache = self.cache.get(cache_key) + cache.set( + data={ + "username": username, + "access_token": self.access_token, + "patron_id": self.patron_id, + }, + expiration=3600 + ) + + def _is_music_mpd(self, mpd: etree._Element) -> bool: + """ + Detect if MPD represents a single-file music asset. + """ + adaptation_sets = mpd.findall(".//AdaptationSet") + + for aset in adaptation_sets: + if aset.get("contentType") == "video": + return False + + audio_reps = mpd.findall(".//AdaptationSet[@contentType='audio']/Representation") + if len(audio_reps) != 1: + return False + + if mpd.find(".//SegmentTemplate") is not None: + return False + + return mpd.find(".//BaseURL") is not None + + def _extract_music_audio(self, mpd: etree._Element, manifest_url: str) -> str: + base = mpd.find(".//BaseURL") + if base is None or not base.text: + raise ValueError("Music MPD has no BaseURL") + + return manifest_url.rsplit("/", 1)[0] + "/" + base.text + + + def _do_login(self, credential: Credential) -> None: + """Perform full login flow.""" + # Step 1: Get Bearer Token + login_response = self.session.post( + url=self.config["endpoints"]["login"], + data={ + "username": credential.username, + "password": credential.password, + }, + headers={"Content-Type": "application/x-www-form-urlencoded"} + ).json() + + if login_response.get("tokenStatus") != "SUCCESS": + raise EnvironmentError(f"Login failed: {login_response.get('tokenStatus', 'Unknown error')}") + + self.access_token = login_response["token"] + self.session.headers.update({"Authorization": f"Bearer {self.access_token}"}) + + # Step 2: Get Patron ID + self.log.info("Fetching Patron ID...") + query = 'query { patron { id email } }' + patron_data = self.session.post( + url=self.config["endpoints"]["graphql"], + json={"query": query}, + headers={"Content-Type": "application/json"} + ).json() + + self.patron_id = patron_data["data"]["patron"]["id"] + self.session.headers.update({"patron-id": self.patron_id}) + self.log.debug(f"Logged in as Patron ID: {self.patron_id}") + + def search(self) -> Generator[SearchResult, None, None]: + query = """ + query GetFilterSearchQuery($criteria: SearchCriteria!, $sort: Sort) { + search(criteria: $criteria, sort: $sort) { + hits { + id + title + kind { name } + } + } + } + """ + + payload = { + "operationName": "GetFilterSearchQuery", + "variables": { + "criteria": { + "q": self.title, + "availability": "ALL_TITLES", + "pagination": { + "page": 1, + "pageSize": 48, + }, + } + }, + "query": query, + } + + resp = self.session.post( + self.config["endpoints"]["graphql"], + json=payload, + headers={"Content-Type": "application/json"}, + ).json() + + hits = ( + resp + .get("data", {}) + .get("search", {}) + .get("hits", []) + ) + + for hit in hits: + kind = hit["kind"]["name"] + + label = { + "MOVIE": "MOVIE", + "TVSHOW": "SERIES", + "MUSIC": "ALBUM", + "AUDIOBOOK": "AUDIOBOOK", + "EBOOK": "BOOK", + "COMIC": "COMIC", + }.get(kind, kind) + + yield SearchResult( + id_=hit["id"], + title=hit["title"], + label=label, + url=f"https://www.hoopladigital.com/title/{hit['id']}", + ) + + def get_titles(self) -> Titles_T: + title_match = re.match(self.TITLE_RE, self.title) + if not title_match: + raise ValueError(f"Invalid title format: {self.title}") + + content_id = title_match.group("title_id") + + query = """ + query { + contents(criteria:{contentIds:[%s]}) { + contents { + id + title + kind { id name } + mediaKey + circulation { id dueDate } + year + seconds + primaryArtist { name } + tracks { + id + mediaKey + name + seconds + segmentNumber + } + } + } + } + """ % content_id + + data = self.session.post( + url=self.config["endpoints"]["graphql"], + json={"query": query}, + headers={"Content-Type": "application/json"} + ).json() + + contents = data.get("data", {}).get("contents", {}).get("contents", []) + if not contents: + raise ValueError("Content not found") + + meta = contents[0] + kind_name = meta["kind"]["name"] + + if not meta.get("circulation"): + raise ValueError("You must borrow this title on your Hoopla account before downloading.") + + if kind_name == "MOVIE": + return Movies([ + Movie( + id_=meta["id"], + service=self.__class__, + name=meta["title"], + year=int(meta["year"]) if meta.get("year") else None, + language=Language.get("en"), + data={ + "mediaKey": meta["mediaKey"], + "circulationId": meta["circulation"]["id"], + "is_music": False, + }, + ) + ]) + + elif kind_name == "MUSIC": + if not meta.get("tracks"): + # Single-track album? Use main mediaKey + songs = [ + Song( + id_=meta["id"], + service=self.__class__, + name=meta["title"], + artist=meta.get("primaryArtist", {}).get("name", "Unknown Artist"), + album=meta["title"], + track=1, + disc=1, + year=int(meta["year"]) if meta.get("year") else None, + data={ + "mediaKey": meta["mediaKey"], + "circulationId": meta["circulation"]["id"], + "is_music": True, + } + ) + ] + else: + songs = [] + for idx, track in enumerate(meta["tracks"], start=1): + songs.append( + Song( + id_=track["id"], + service=self.__class__, + name=track["name"], + artist=meta.get("primaryArtist", {}).get("name", "Unknown Artist"), + album=meta["title"], + track=track.get("segmentNumber", idx), + disc=1, + year=int(meta["year"]) if meta.get("year") else None, + data={ + "mediaKey": track["mediaKey"], # ← Per-track mediaKey! + "circulationId": meta["circulation"]["id"], + "is_music": True, + } + ) + ) + return Album(songs) + + else: + raise ValueError(f"Unsupported content type: {kind_name}. Only MOVIE and MUSIC are supported.") + + def get_tracks(self, title: Title_T) -> Tracks: + media_key = title.data["mediaKey"] + circulation_id = title.data["circulationId"] + + # --- DRM bootstrap --- + self.asset_id = self.session.get( + self.config["endpoints"]["license_asset"].format(media_key=media_key) + ).text.strip() + + self.auth_token = self.session.get( + self.config["endpoints"]["license_token"].format( + media_key=media_key, + patron_id=self.patron_id, + circulation_id=circulation_id, + ) + ).text.strip() + + self.custom_data = self._extract_custom_data(self.auth_token) + + manifest_url = self.config["endpoints"]["manifest"].format(media_key=media_key) + mpd_xml = self.session.get(manifest_url).text + mpd_xml = self._strip_namespaces(mpd_xml) + mpd = etree.fromstring(mpd_xml.encode("utf-8")) + + if self._is_music_mpd(mpd): + self.log.info("Detected Hoopla music MPD") + + audio_url = self._extract_music_audio(mpd, manifest_url) + + tracks = Tracks() + tracks.add( + Audio( + url=audio_url, + drm=[], + codec=Audio.Codec.AAC, + language=title.language or "en", + channels=2, + ) + ) + return tracks + + self.log.info("Detected Hoopla movie MPD") + + tracks = DASH(mpd, manifest_url).to_tracks( + language=title.language or Language.get("en") + ) + + self._add_subtitles(tracks, manifest_url, media_key) + return tracks + + + def _strip_namespaces(self, xml_string: str) -> str: + """ + Strip namespace declarations and prefixes from XML string. + This is needed because unshackle's DASH parser expects plain 'MPD' tag, + not '{urn:mpeg:dash:schema:mpd:2011}MPD'. + """ + # Remove xmlns declarations (both default and prefixed) + xml_string = re.sub(r'\s+xmlns(:\w+)?="[^"]+"', '', xml_string) + + # Remove namespace prefixes from element tags (e.g., -> ) + xml_string = re.sub(r'<(/?)(\w+):', r'<\1', xml_string) + + # Remove namespace prefixes from attributes (e.g., cenc:default_KID -> default_KID) + xml_string = re.sub(r'\s+\w+:(\w+)=', r' \1=', xml_string) + + # Remove urn: prefixed attributes entirely (e.g., urn:assetId="...") + xml_string = re.sub(r'\s+urn:\w+="[^"]+"', '', xml_string) + + return xml_string + + def _extract_custom_data(self, jwt_token: str) -> str: + """Extract and encode optData from JWT for dt-custom-data header.""" + try: + jwt_parts = jwt_token.split(".") + padded_payload = jwt_parts[1] + "=" * (-len(jwt_parts[1]) % 4) + payload_json = json.loads(base64.urlsafe_b64decode(padded_payload)) + + opt_data_str = payload_json.get("optData") + if not opt_data_str: + raise ValueError("optData not found in JWT") + + return base64.b64encode(opt_data_str.encode("utf-8")).decode("utf-8") + + except Exception as e: + raise ValueError(f"Failed to process license token: {e}") + + def _add_subtitles(self, tracks: Tracks, manifest_url: str, media_key: str) -> None: + """Add VTT subtitles from manifest if available.""" + base_url = manifest_url.rsplit('/', 1)[0] + + vtt_patterns = [ + f"{base_url}/{media_key}-8784525650515056532-en/{media_key}-8784525650515056532-en.vtt", + ] + + for vtt_url in vtt_patterns: + try: + response = self.session.head(vtt_url) + if response.status_code == 200: + tracks.add( + Subtitle( + id_=hashlib.md5(vtt_url.encode()).hexdigest()[0:6], + url=vtt_url, + codec=Subtitle.Codec.WebVTT, + language=Language.get("en"), + sdh=True, + ) + ) + break + except Exception: + pass + + def get_chapters(self, title: Title_T) -> list[Chapter]: + return [] + + def get_widevine_service_certificate(self, **_) -> Optional[str]: + return self.config.get("certificate") + + def get_widevine_license(self, *, challenge: bytes, title: Title_T, track: AnyTrack) -> Optional[Union[bytes, str]]: + response = self.session.post( + url=self.config["endpoints"]["license_wv"], + params={ + "logRequestId": "unshackle", + "assetId": self.asset_id, + }, + headers={ + "dt-custom-data": self.custom_data, + "x-dt-auth-token": self.auth_token, + "Content-Type": "text/xml", + }, + data=challenge, + ) + + if response.status_code != 200: + self.log.error(f"License Error: {response.text}") + raise ValueError(f"Failed to get Widevine license: {response.status_code}") + + return response.json().get("license") + + def get_playready_license(self, *, challenge: bytes | str, title: Title_T, track: AnyTrack) -> bytes: + if not hasattr(self, 'auth_token') or not hasattr(self, 'custom_data'): + raise RuntimeError("Authentication tokens missing. Call get_tracks() first.") + + if isinstance(challenge, str): + request_body = challenge.encode('utf-8') + else: + request_body = challenge + + headers = { + "Accept": "*/*", + "Accept-Language": "nl", + "Cache-Control": "no-cache", + "Content-Type": "text/xml; charset=utf-8", + "dt-custom-data": self.custom_data, + "x-dt-auth-token": self.auth_token, + "soapaction": '"http://schemas.microsoft.com/DRM/2007/03/protocols/AcquireLicense"', + "Origin": "https://www.hoopladigital.com", + "Referer": "https://www.hoopladigital.com/", + "Pragma": "no-cache", + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/144.0.0.0 Safari/537.36 Edg/144.0.0.0", + } + + + response = self.session.post( + url=self.config["endpoints"]["license_pr"], + data=request_body, + headers=headers, + timeout=30 + ) + + if response.status_code != 200: + self.log.error(f"PlayReady license failed: {response.status_code}") + self.log.error(f"Response: {response.text[:1000]}") + raise ValueError(f"PlayReady license failed: HTTP {response.status_code}") + + return response.content diff --git a/HPLA/config.yaml b/HPLA/config.yaml new file mode 100644 index 0000000..65470ef --- /dev/null +++ b/HPLA/config.yaml @@ -0,0 +1,22 @@ +endpoints: + login: https://patron-api-gateway.hoopladigital.com/core/tokens + graphql: https://patron-api-gateway.hoopladigital.com/graphql + manifest: https://dash.hoopladigital.com/{media_key}/Manifest.mpd + license_asset: https://patron-api-gateway.hoopladigital.com/license/castlabs/asset-id/{media_key} + license_token: https://patron-api-gateway.hoopladigital.com/license/castlabs/upfront-auth-tokens/{media_key}/{patron_id}/{circulation_id} + license_wv: https://lic.drmtoday.com/license-proxy-widevine/cenc/ + license_pr: https://lic.drmtoday.com/license-proxy-headerauth/drmtoday/RightsManager.asmx?persistent=false + +platform: + amazon: + headers: + app: AMAZON + device-model: SM-A525F + os: AMAZON + User-Agent: Hoopla Amazon/4.84.1 + app-version: "4.84.1" + os-version: "15" + ws-api: "2.1" + device-version: a52q + hoopla-version: "4.84.1" + Accept-Language: en-US \ No newline at end of file diff --git a/README.md b/README.md index ebb04ce..37b5abe 100644 --- a/README.md +++ b/README.md @@ -34,6 +34,9 @@ - Subtitle has been fixed, hopefully no issue 11. VLD: - Token isn't cached so that's a major problem with series + 12. HPLA: + - No support for Television yet + - Music needs to be fixed since the output is a mp4 instead of m4a - Acknowledgment