diff --git a/README.md b/README.md index 71509f0..301efcc 100644 --- a/README.md +++ b/README.md @@ -31,6 +31,8 @@ 9. VRT: - Search functionality - Fixing few hickups + 10. SKST (the hardest service I ever dealt upon now): + - Subtitles is a litte bit hit or miss for movies and for series there's still no subtitles - Acknowledgment diff --git a/SKST/__init__.py b/SKST/__init__.py new file mode 100644 index 0000000..4849c36 --- /dev/null +++ b/SKST/__init__.py @@ -0,0 +1,1048 @@ +from __future__ import annotations + +import base64 +import hashlib +import hmac +import json +import re +import sys +import time +import uuid +from collections import defaultdict +from copy import deepcopy +from http.cookiejar import CookieJar +from typing import Any, Optional, Generator +from urllib.parse import urlparse, urlencode +from zlib import crc32 +from lxml import etree + +import click +from langcodes import Language + +from unshackle.core.search_result import SearchResult +from unshackle.core.constants import AnyTrack +from unshackle.core.credential import Credential +from unshackle.core.manifests import DASH +from unshackle.core.service import Service +from unshackle.core.titles import Movie, Movies, Series, Episode, Title_T, Titles_T +from unshackle.core.tracks import Chapter, Tracks, Subtitle, Track +from unshackle.core.utilities import is_close_match + + +class SkySignature: + """SkyShowtime API signature generator.""" + + def __init__(self, app_id: str, signature_key: str, version: str = "1.0"): + self.app_id = app_id + self.signature_key = signature_key.encode('utf-8') + self.sig_version = version + + def calculate_signature(self, method: str, url: str, headers: dict, + payload: bytes = b'', timestamp: Optional[int] = None) -> dict: + if timestamp is None: + timestamp = int(time.time()) + + if url.startswith('http'): + parsed_url = urlparse(url) + path = parsed_url.path + if parsed_url.query: + path += '?' + parsed_url.query + else: + path = url + + text_headers = '' + for key in sorted(headers.keys()): + if key.lower().startswith('x-skyott') or key.lower().startswith('x-showmax'): + text_headers += key.lower() + ': ' + str(headers[key]) + '\n' + + headers_md5 = hashlib.md5(text_headers.encode()).hexdigest() + + if isinstance(payload, str): + payload = payload.encode('utf-8') + payload_md5 = hashlib.md5(payload).hexdigest() + + to_hash = ( + f'{method}\n' + f'{path}\n' + f'\n' + f'{self.app_id}\n' + f'{self.sig_version}\n' + f'{headers_md5}\n' + f'{timestamp}\n' + f'{payload_md5}\n' + ) + + hashed = hmac.new(self.signature_key, to_hash.encode('utf8'), hashlib.sha1).digest() + signature = base64.b64encode(hashed).decode('utf8') + + return { + 'x-sky-signature': f'SkyOTT client="{self.app_id}",signature="{signature}",timestamp="{timestamp}",version="{self.sig_version}"' + } + + +class SKST(Service): + """ + \b + Service code for SkyShowtime streaming service (https://skyshowtime.com). + + \b + Author: FairTrade + Authorization: Cookies or Credentials + Robustness: + Widevine: + L3: 1080p + + \b + Tips: + - Use -t/--territory to specify your region (e.g., -t ES for Spain) + - Use -p/--profile to select a specific profile by name or ID + - Use --list-profiles to see all available profiles + """ + + ALIASES = ("skyshowtime", "sst") + TITLE_RE = r"^(?:https?://(?:www\.)?skyshowtime\.com)?/(?:[a-z]{2}/)?watch/asset/(?Ptv|movies?)/(?P[a-z0-9-]+)/(?P[a-f0-9-]+).*$" + + @staticmethod + @click.command(name="SKST", short_help="https://skyshowtime.com", help=__doc__) + @click.argument("title", type=str) + @click.option("-t", "--territory", type=str, default="PL", help="Territory code (e.g., PL, NL, ES)") + @click.option("-p", "--profile", type=str, default=None, help="Profile name or ID to use") + @click.option("--list-profiles", is_flag=True, default=False, help="List all available profiles and exit") + @click.pass_context + def cli(ctx, **kwargs): + return SKST(ctx, **kwargs) + + def __init__(self, ctx, title: str, territory: str = "PL", profile: Optional[str] = None, list_profiles: bool = False): + super().__init__(ctx) + + self.territory = territory.upper() + self.language = self._get_language_for_territory(territory) + self.requested_profile = profile + self.list_profiles_only = list_profiles + + # Initialize signature generator from config + sig_config = self.config.get("signature", {}) + self.signer = SkySignature( + app_id=sig_config.get("app_id", "SHOWMAX-ANDROID-v1"), + signature_key=sig_config.get("key", ""), + version=sig_config.get("version", "1.0") + ) + + m = re.match(self.TITLE_RE, title, re.IGNORECASE) + if not m: + self.search_term = title + self.title_url = None + self.content_slug = None + self.content_uuid = None + self.content_type = None + return + + content_type = m.group("type").lower() + self.content_type = "movie" if content_type.startswith("movie") else "tv" + self.content_slug = m.group("slug") + self.content_uuid = m.group("uuid") + self.title_url = title + self.search_term = None + + self.user_token: Optional[str] = None + self.device_id: Optional[str] = None + self.persona_id: Optional[str] = None + self.persona_data: Optional[dict] = None + self.all_personas: list[dict] = [] + + self.drm_license_url: Optional[str] = None + self.license_token: Optional[str] = None + + self.cdm = ctx.obj.cdm + + def _get_language_for_territory(self, territory: str) -> str: + territory_languages = { + "PL": "pl-PL", "NL": "nl-NL", "ES": "es-ES", "PT": "pt-PT", + "SE": "sv-SE", "NO": "nb-NO", "DK": "da-DK", "FI": "fi-FI", + "CZ": "cs-CZ", "SK": "sk-SK", "HU": "hu-HU", "RO": "ro-RO", + "BG": "bg-BG", "HR": "hr-HR", "SI": "sl-SI", "BA": "bs-BA", + "RS": "sr-RS", "ME": "sr-ME", "MK": "mk-MK", "AL": "sq-AL", + } + return territory_languages.get(territory.upper(), "en-US") + + def _get_common_headers(self) -> dict: + return { + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:138.0) Gecko/20100101 Firefox/138.0", + "Accept": "*/*", + "Accept-Language": "en-US,en;q=0.5", + "Origin": "https://www.skyshowtime.com", + "Referer": "https://www.skyshowtime.com/", + "Sec-Fetch-Dest": "empty", + "Sec-Fetch-Mode": "cors", + "Sec-Fetch-Site": "same-site", + } + + def _get_skyott_headers(self, extra: Optional[dict] = None) -> dict: + params = self.config.get("params", {}) + headers = { + "X-SkyOTT-Provider": params.get("provider", "SKYSHOWTIME"), + "X-SkyOTT-Territory": self.territory, + "X-SkyOTT-Proposition": params.get("proposition", "SKYSHOWTIME"), + "X-SkyOTT-Platform": params.get("platform", "PC"), + "X-SkyOTT-Device": params.get("device", "COMPUTER"), + "X-SkyOTT-ActiveTerritory": self.territory, + } + if extra: + headers.update(extra) + return headers + + def _get_atom_headers(self) -> dict: + params = self.config.get("params", {}) + headers = self._get_common_headers() + headers.update(self._get_skyott_headers({ + "X-SkyOTT-Language": "en-US", + "X-SkyOTT-Client-Version": params.get("client_version", "6.11.21-gsp"), + })) + return headers + + def authenticate(self, cookies: Optional[CookieJar] = None, credential: Optional[Credential] = None) -> None: + super().authenticate(cookies, credential) + + self.device_id = self._get_cookie_value(cookies, "deviceid") or str(uuid.uuid4()) + + if cookies: + sky_umv = self._get_cookie_value(cookies, "skyUMV") + if sky_umv: + self.user_token = sky_umv + self.log.info("Using existing session from cookies.") + else: + raise PermissionError("skyUMV cookie not found. Please provide fresh cookies.") + elif credential: + self._authenticate_with_credentials(credential) + self._get_user_token() + else: + raise PermissionError("SKST requires either cookies or credentials for authentication.") + + self._fetch_personas() + + if self.list_profiles_only: + self._display_profiles() + sys.exit(0) + + self._select_profile() + + self.log.info("SkyShowtime authentication successful.") + + def _get_cookie_value(self, cookies: Optional[CookieJar], name: str) -> Optional[str]: + if not cookies: + return None + for cookie in cookies: + if cookie.name == name: + return cookie.value + return None + + def _authenticate_with_credentials(self, credential: Credential) -> None: + self.log.info(f"Logging in as {credential.username}...") + + signin_url = self.config["endpoints"]["signin"] + + headers = self._get_common_headers() + headers.update({ + "Accept": "application/vnd.siren+json", + "Content-Type": "application/x-www-form-urlencoded", + }) + headers.update(self._get_skyott_headers()) + + r = self.session.post(signin_url, headers=headers, data=urlencode({ + "userIdentifier": credential.username, + "password": credential.password, + "rememberMe": "true", + "isWeb": "true" + })) + + if r.status_code != 200: + raise PermissionError(f"Sign-in failed: {r.status_code} - {r.text}") + + signin_response = r.json() + + if signin_response.get("class") != ["success"]: + raise PermissionError(f"Sign-in failed: {signin_response}") + + if "properties" in signin_response and "data" in signin_response["properties"]: + self.device_id = signin_response["properties"]["data"].get("deviceid", self.device_id) + + self.log.debug(f"Sign-in successful. Device ID: {self.device_id}") + + def _get_user_token(self) -> None: + token_url = self.config["endpoints"]["tokens"] + params = self.config.get("params", {}) + + skyott_headers = self._get_skyott_headers({ + "X-SkyOTT-Language": self.language, + }) + + payload = { + "auth": { + "authScheme": "MESSO", + "authIssuer": "NOWTV", + "provider": params.get("provider", "SKYSHOWTIME"), + "providerTerritory": self.territory, + "proposition": params.get("proposition", "SKYSHOWTIME") + }, + "device": { + "type": params.get("device", "COMPUTER"), + "platform": params.get("platform", "PC"), + "id": self.device_id[:20] if self.device_id else str(uuid.uuid4())[:20], + "drmDeviceId": "UNKNOWN" + } + } + + payload_str = json.dumps(payload, separators=(',', ':')) + + sig_result = self.signer.calculate_signature( + method="POST", + url=token_url, + headers=skyott_headers, + payload=payload_str.encode('utf-8') + ) + + headers = self._get_common_headers() + headers.update({ + "Accept": "application/vnd.tokens.v1+json", + "Content-Type": "application/vnd.tokens.v1+json", + }) + headers.update(skyott_headers) + headers.update(sig_result) + + r = self.session.post(token_url, headers=headers, data=payload_str) + + if r.status_code != 200: + self.log.warning(f"Token request failed: {r.status_code}") + return + + token_data = r.json() + self.user_token = token_data.get("userToken") + + if self.user_token: + self.log.debug("User token obtained successfully.") + + def _fetch_personas(self) -> None: + persona_url = self.config["endpoints"]["personas"] + params = self.config.get("params", {}) + + query_params = { + "personaType": "Adult", + "in_setup": "false" + } + + headers = self._get_common_headers() + headers.update(self._get_skyott_headers({ + "X-SkyOTT-Language": "en-US", + "X-SkyOTT-Client-Version": params.get("client_version", "6.11.21-gsp"), + })) + headers["Accept"] = "application/json" + headers["content-type"] = "application/json" + + r = self.session.post(persona_url, headers=headers, params=query_params) + + if r.status_code != 200: + self.log.warning(f"Failed to get personas: {r.status_code}") + return + + persona_data = r.json() + self.all_personas = persona_data.get("personas", []) + + def _display_profiles(self) -> None: + if not self.all_personas: + self.log.info("No profiles available.") + return + + self.log.info("\n" + "=" * 60) + self.log.info("Available Profiles:") + self.log.info("=" * 60) + + for idx, persona in enumerate(self.all_personas, 1): + display_name = persona.get("displayName", "Unknown") + persona_id = persona.get("id", "N/A") + is_account_holder = persona.get("isAccountHolder", False) + + controls = persona.get("controls", {}) + maturity_rating = controls.get("maturityRatingLabel", controls.get("maturityRating", "N/A")) + + holder_badge = " [Account Holder]" if is_account_holder else "" + + self.log.info(f" [{idx}] {display_name}{holder_badge}") + self.log.info(f" ID: {persona_id}") + self.log.info(f" Maturity Rating: {maturity_rating}") + + self.log.info("=" * 60) + + def _select_profile(self) -> None: + if not self.all_personas: + return + + selected_persona = None + + if self.requested_profile: + for persona in self.all_personas: + if persona.get("displayName", "").lower() == self.requested_profile.lower(): + selected_persona = persona + break + + if not selected_persona: + for persona in self.all_personas: + if persona.get("id") == self.requested_profile: + selected_persona = persona + break + + if not selected_persona: + try: + idx = int(self.requested_profile) - 1 + if 0 <= idx < len(self.all_personas): + selected_persona = self.all_personas[idx] + except ValueError: + pass + + if not selected_persona: + self._display_profiles() + raise ValueError(f"Profile '{self.requested_profile}' not found.") + else: + selected_persona = self.all_personas[0] + + self.persona_id = selected_persona.get("id") + self.persona_data = selected_persona + + display_name = selected_persona.get("displayName", "Unknown") + self.log.info(f"Using profile: {display_name}") + + def get_titles(self) -> Titles_T: + if not self.content_uuid: + raise ValueError("No content UUID found.") + + headers = self._get_atom_headers() + atom_url = self.config["endpoints"]["atom_node"] + + if self.content_type == "movie": + slug_paths = [ + f"/movies/{self.content_slug}/{self.content_uuid}", + f"/movie/{self.content_slug}/{self.content_uuid}", + f"/film/{self.content_slug}/{self.content_uuid}", + ] + else: + slug_paths = [ + f"/tv/{self.content_slug}/{self.content_uuid}", + ] + + data = None + for slug_path in slug_paths: + params = {"slug": slug_path} + r = self.session.get(atom_url, headers=headers, params=params) + + if r.status_code == 200: + data = r.json() + break + + if not data: + alt_url = f"{atom_url}/provider_variant_id/{self.content_uuid}" + r = self.session.get(alt_url, headers=headers) + + if r.status_code == 200: + data = r.json() + else: + uuid_url = f"{atom_url}/uuid/{self.content_uuid}" + r = self.session.get(uuid_url, headers=headers) + + if r.status_code == 200: + data = r.json() + else: + raise RuntimeError(f"Failed to get content details. UUID: {self.content_uuid}") + + content_type = data.get("type", "") + + if "SERIES" in content_type: + return self._parse_series(data) + elif "MOVIE" in content_type or "FILM" in content_type: + return self._parse_movie(data) + else: + attrs = data.get("attributes", {}) + if attrs.get("availableSeasonCount") or attrs.get("availableEpisodeCount"): + return self._parse_series(data) + else: + return self._parse_movie(data) + + def _parse_movie(self, data: dict) -> Movies: + attrs = data.get("attributes", {}) + + title = attrs.get("title", attrs.get("titleMedium", "Unknown Title")) + year = attrs.get("year") + + formats = attrs.get("formats", {}) + content_id = None + + for fmt_key in ["HDSDR", "HD", "SD"]: + if fmt_key in formats: + fmt_data = formats[fmt_key] + if "contentId" in fmt_data: + content_id = fmt_data["contentId"] + break + + if not content_id: + for fmt_key, fmt_data in formats.items(): + if isinstance(fmt_data, dict) and "contentId" in fmt_data: + content_id = fmt_data["contentId"] + break + + provider_variant_id = attrs.get("providerVariantId", attrs.get("programmeUuid", self.content_uuid)) + + if not content_id: + content_id = data.get("id") + + original_lang = attrs.get("mainOriginalLanguage", attrs.get("productionLanguage", "en")) + if "-" in str(original_lang): + original_lang = original_lang.split("-")[0] + + return Movies([ + Movie( + id_=content_id, + service=self.__class__, + name=title, + year=year, + language=Language.get(original_lang), + data={ + "content_id": content_id, + "provider_variant_id": provider_variant_id, + "attrs": attrs + } + ) + ]) + + def _parse_series(self, data: dict) -> Series: + attrs = data.get("attributes", {}) + + series_title = attrs.get("title", attrs.get("titleMedium", "Unknown Series")) + series_uuid = attrs.get("seriesUuid", attrs.get("providerSeriesId", self.content_uuid)) + + original_lang = attrs.get("mainOriginalLanguage", attrs.get("productionLanguage", "en")) + if "-" in str(original_lang): + original_lang = original_lang.split("-")[0] + + episodes = self._fetch_all_episodes(series_uuid, series_title, original_lang) + + return Series(episodes) + + def _fetch_all_episodes(self, series_uuid: str, series_title: str, original_lang: str) -> list[Episode]: + episodes = [] + + atom_url = f"{self.config['endpoints']['atom_node']}/provider_series_id/{series_uuid}" + + params = { + "slug": f"/tv/{self.content_slug}/{series_uuid}", + "represent": "(items(items),recs[take=8],collections(items(items[take=8])),trailers)" + } + + headers = self._get_atom_headers() + r = self.session.get(atom_url, headers=headers, params=params) + + if r.status_code != 200: + self.log.warning(f"Failed to fetch series details: {r.status_code}") + return episodes + + data = r.json() + + relationships = data.get("relationships", {}) + items = relationships.get("items", {}).get("data", []) + + for season_data in items: + if season_data.get("type") != "CATALOGUE/SEASON": + continue + + season_attrs = season_data.get("attributes", {}) + season_number = season_attrs.get("seasonNumber", 1) + + season_relationships = season_data.get("relationships", {}) + season_items = season_relationships.get("items", {}).get("data", []) + + for ep_data in season_items: + if ep_data.get("type") != "ASSET/EPISODE": + continue + + ep_attrs = ep_data.get("attributes", {}) + ep_number = ep_attrs.get("episodeNumber", 1) + ep_title = ep_attrs.get("title", ep_attrs.get("episodeName", f"Episode {ep_number}")) + + formats = ep_attrs.get("formats", {}) + content_id = None + for fmt_key, fmt_data in formats.items(): + if isinstance(fmt_data, dict) and "contentId" in fmt_data: + content_id = fmt_data["contentId"] + break + + provider_variant_id = ep_attrs.get("providerVariantId", ep_attrs.get("programmeUuid")) + + episodes.append( + Episode( + id_=content_id or ep_data.get("id"), + service=self.__class__, + title=series_title, + season=season_number, + number=ep_number, + name=ep_title, + language=Language.get(original_lang), + data={ + "content_id": content_id, + "provider_variant_id": provider_variant_id, + "attrs": ep_attrs + } + ) + ) + + return episodes + + def get_tracks(self, title: Title_T) -> Tracks: + content_id = title.data.get("content_id") + provider_variant_id = title.data.get("provider_variant_id") + + if not content_id: + raise ValueError("No content_id found for this title") + + if self.content_type == "movie" and "_HDSDR" not in content_id and ":" in content_id: + content_id = content_id + "_HDSDR" + + playback_url = self.config["endpoints"]["playback"] + + skyott_headers = self._get_skyott_headers({ + "X-SkyOTT-PinOverride": "false", + "X-SkyOTT-UserToken": self.user_token, + "X-SkyOTT-COPPA": "false", + "X-SkyOTT-JourneyContext": "PRE_FETCH", + "X-SkyOTT-PrePlayout": "true", + "X-SkyOTT-Language": "en-US", + }) + + attrs = title.data.get("attrs", {}) + if attrs.get("isKidsContent", False): + skyott_headers["X-SkyOTT-COPPA"] = "true" + + persona_maturity = "9" + if self.persona_data: + controls = self.persona_data.get("controls", {}) + persona_maturity = controls.get("maturityRating", "9") + + payload = { + "device": { + "capabilities": [ + { + "protection": "WIDEVINE", + "container": "ISOBMFF", + "transport": "DASH", + "acodec": "AAC", + "vcodec": "H264" + }, + { + "protection": "NONE", + "container": "ISOBMFF", + "transport": "DASH", + "acodec": "AAC", + "vcodec": "H264" + } + ], + "maxVideoFormat": "HD", + "supportedColourSpaces": ["SDR"], + "model": "PC", + "hdcpEnabled": True + }, + "client": { + "thirdParties": ["FREEWHEEL", "MEDIATAILOR", "CONVIVA"], + "variantCapable": True + }, + "contentId": content_id, + "providerVariantId": provider_variant_id, + "parentalControlPin": None, + "personaParentalControlRating": persona_maturity + } + + payload_str = json.dumps(payload, separators=(',', ':')) + + sig_result = self.signer.calculate_signature( + method="POST", + url=playback_url, + headers=skyott_headers, + payload=payload_str.encode('utf-8') + ) + + headers = self._get_common_headers() + headers.update({ + "Accept": "application/vnd.playvod.v1+json", + "Content-Type": "application/vnd.playvod.v1+json", + }) + headers.update(skyott_headers) + headers.update(sig_result) + + r = self.session.post(playback_url, headers=headers, data=payload_str) + + if r.status_code != 200: + raise RuntimeError(f"Failed to get playback info: {r.status_code} - {r.text}") + + playback_data = r.json() + + asset = playback_data.get("asset", {}) + endpoints = asset.get("endpoints", []) + + manifest_url = None + for endpoint in endpoints: + if endpoint.get("cdn") == "CLOUDFRONT": + manifest_url = endpoint.get("url") + break + + if not manifest_url and endpoints: + manifest_url = endpoints[0].get("url") + + if not manifest_url: + raise ValueError("No manifest URL found in playback response") + + protection = playback_data.get("protection", {}) + self.drm_license_url = protection.get("licenceAcquisitionUrl") + self.license_token = protection.get("licenceToken") + + dash = DASH.from_url(manifest_url, session=self.session) + tracks = dash.to_tracks(language=title.language) + + # Remove default subtitle tracks and add properly processed ones + for track in list(tracks.subtitles): + tracks.subtitles.remove(track) + + subtitles = self._process_subtitles(dash, str(title.language)) + tracks.add(subtitles) + + return tracks + + @staticmethod + def _process_subtitles(dash: DASH, language: str) -> list[Subtitle]: + subtitle_groups = defaultdict(list) + manifest = dash.manifest + # Define namespace map for DASH MPD + nsmap = { + 'mpd': 'urn:mpeg:dash:schema:mpd:2011', + 'cenc': 'urn:mpeg:cenc:2013', + } + + # Try to find periods with and without namespace + periods = manifest.findall("Period", namespaces=None) + if not periods: + periods = manifest.findall("{urn:mpeg:dash:schema:mpd:2011}Period") + if not periods: + # Try xpath with namespace + periods = manifest.xpath("//mpd:Period", namespaces={'mpd': 'urn:mpeg:dash:schema:mpd:2011'}) + if not periods: + # Last resort: find all Period elements regardless of namespace + periods = manifest.iter() + periods = [el for el in manifest.iter() if el.tag.endswith('Period')] + + for period in periods: + # Find AdaptationSets - try multiple methods + adapt_sets = period.findall("AdaptationSet", namespaces=None) + if not adapt_sets: + adapt_sets = period.findall("{urn:mpeg:dash:schema:mpd:2011}AdaptationSet") + if not adapt_sets: + adapt_sets = [el for el in period.iter() if el.tag.endswith('AdaptationSet')] + + for adapt_set in adapt_sets: + content_type = adapt_set.get("contentType", "") + mime_type = adapt_set.get("mimeType", "") + lang = adapt_set.get("lang") + + # Check if this is a text/subtitle adaptation set + is_text = ( + content_type == "text" or + "text/vtt" in mime_type or + "application/ttml" in mime_type or + adapt_set.get("group") == "3" # Based on your MPD, group 3 is subtitles + ) + + if not is_text or not lang: + continue + + # Find Role element + role = adapt_set.find("Role", namespaces=None) + if role is None: + role = adapt_set.find("{urn:mpeg:dash:schema:mpd:2011}Role") + if role is None: + for el in adapt_set.iter(): + if el.tag.endswith('Role'): + role = el + break + + # Find Label element + label = adapt_set.find("Label", namespaces=None) + if label is None: + label = adapt_set.find("{urn:mpeg:dash:schema:mpd:2011}Label") + if label is None: + for el in adapt_set.iter(): + if el.tag.endswith('Label'): + label = el + break + + # Also check for Label attribute (some MPDs use it as attribute) + label_text = "" + if label is not None and label.text: + label_text = label.text + elif adapt_set.get("Label"): + label_text = adapt_set.get("Label") + + role_value = role.get("value") if role is not None else "subtitle" + + key = (lang, role_value, label_text) + subtitle_groups[key].append((period, adapt_set)) + + final_tracks = [] + for (lang, role_value, label_text), adapt_set_group in subtitle_groups.items(): + first_period, first_adapt = adapt_set_group[0] + + # Find Representation + rep = first_adapt.find("Representation", namespaces=None) + if rep is None: + rep = first_adapt.find("{urn:mpeg:dash:schema:mpd:2011}Representation") + if rep is None: + for el in first_adapt.iter(): + if el.tag.endswith('Representation'): + rep = el + break + + if rep is None: + continue + + s_elements_with_context = [] + for _, adapt_set in adapt_set_group: + # Find Representation in this adapt_set + current_rep = adapt_set.find("Representation", namespaces=None) + if current_rep is None: + current_rep = adapt_set.find("{urn:mpeg:dash:schema:mpd:2011}Representation") + if current_rep is None: + for el in adapt_set.iter(): + if el.tag.endswith('Representation'): + current_rep = el + break + + if current_rep is None: + continue + + # Find SegmentTemplate - check both Representation and AdaptationSet level + template = None + for parent in [current_rep, adapt_set]: + template = parent.find("SegmentTemplate", namespaces=None) + if template is None: + template = parent.find("{urn:mpeg:dash:schema:mpd:2011}SegmentTemplate") + if template is None: + for el in parent.iter(): + if el.tag.endswith('SegmentTemplate'): + template = el + break + if template is not None: + break + + if template is None: + continue + + # Find SegmentTimeline + timeline = template.find("SegmentTimeline", namespaces=None) + if timeline is None: + timeline = template.find("{urn:mpeg:dash:schema:mpd:2011}SegmentTimeline") + if timeline is None: + for el in template.iter(): + if el.tag.endswith('SegmentTimeline'): + timeline = el + break + + if timeline is not None: + start_num = int(template.get("startNumber", 0)) + + # Find S elements + s_elements = timeline.findall("S", namespaces=None) + if not s_elements: + s_elements = timeline.findall("{urn:mpeg:dash:schema:mpd:2011}S") + if not s_elements: + s_elements = [el for el in timeline.iter() if el.tag.endswith('}S') or el.tag == 'S'] + + s_elements_with_context.extend((start_num, s_elem) for s_elem in s_elements) + + if not s_elements_with_context: + # No timeline found, but we might still have a valid subtitle track + # Continue with empty timeline handling + pass + + s_elements_with_context.sort(key=lambda x: x[0]) + + combined_adapt = deepcopy(first_adapt) + + # Find combined_rep + combined_rep = combined_adapt.find("Representation", namespaces=None) + if combined_rep is None: + combined_rep = combined_adapt.find("{urn:mpeg:dash:schema:mpd:2011}Representation") + if combined_rep is None: + for el in combined_adapt.iter(): + if el.tag.endswith('Representation'): + combined_rep = el + break + + if combined_rep is None: + continue + + # Find or create SegmentTemplate + seg_template = None + for parent in [combined_rep, combined_adapt]: + seg_template = parent.find("SegmentTemplate", namespaces=None) + if seg_template is None: + seg_template = parent.find("{urn:mpeg:dash:schema:mpd:2011}SegmentTemplate") + if seg_template is None: + for el in parent.iter(): + if el.tag.endswith('SegmentTemplate'): + seg_template = el + break + if seg_template is not None: + break + + if seg_template is None: + # Try to find at AdaptationSet level and move to Representation + template_at_adapt = None + for el in combined_adapt.iter(): + if el.tag.endswith('SegmentTemplate'): + template_at_adapt = el + break + + if template_at_adapt is not None: + seg_template = deepcopy(template_at_adapt) + combined_rep.append(seg_template) + try: + combined_adapt.remove(template_at_adapt) + except ValueError: + pass + else: + continue + + # Remove existing SegmentTimeline if present + existing_timeline = None + for el in seg_template.iter(): + if el.tag.endswith('SegmentTimeline'): + existing_timeline = el + break + + if existing_timeline is not None: + try: + seg_template.remove(existing_timeline) + except ValueError: + pass + + # Create new timeline with collected S elements + if s_elements_with_context: + new_timeline = etree.Element("SegmentTimeline") + new_timeline.extend(deepcopy(s) for _, s in s_elements_with_context) + seg_template.append(new_timeline) + + seg_template.set("startNumber", "0") + if "endNumber" in seg_template.attrib: + del seg_template.attrib["endNumber"] + + track_id = hex(crc32(f"sub-{lang}-{role_value}-{label_text}".encode()) & 0xFFFFFFFF)[2:] + lang_obj = Language.get(lang) + track_name = "Original" if (language and is_close_match(lang_obj, [language])) else lang_obj.display_name() + + # Determine codec from mimeType + mime_type = first_adapt.get("mimeType", "text/vtt") + if "ttml" in mime_type.lower(): + codec = Subtitle.Codec.TimedTextMarkupLang + else: + codec = Subtitle.Codec.WebVTT + + final_tracks.append( + Subtitle( + id_=track_id, + url=dash.url, + codec=codec, + language=lang_obj, + is_original_lang=bool(language and is_close_match(lang_obj, [language])), + descriptor=Track.Descriptor.DASH, + sdh="sdh" in label_text.lower() or role_value == "caption", + forced="forced" in label_text.lower() or "forced" in role_value.lower(), + name=track_name, + data={ + "dash": { + "manifest": manifest, + "period": first_period, + "adaptation_set": combined_adapt, + "representation": combined_rep, + } + }, + ) + ) + + return final_tracks + + def get_widevine_license(self, *, challenge: bytes, title: Title_T, track: AnyTrack) -> bytes: + if not self.drm_license_url: + raise ValueError("DRM license URL not available.") + + sig_result = self.signer.calculate_signature( + method="POST", + url=self.drm_license_url, + headers={}, + payload=challenge + ) + + headers = self._get_common_headers() + headers.update({ + "Content-Type": "text/plain", + }) + headers.update(sig_result) + + r = self.session.post( + self.drm_license_url, + data=challenge, + headers=headers + ) + + if r.status_code != 200: + raise RuntimeError(f"License request failed: {r.status_code} - {r.text}") + + return r.content + + # def search(self) -> Generator[SearchResult, None, None]: + # if not self.search_term: + # return + + # search_url = self.config["endpoints"]["atom_search"] + + # params = { + # "q": self.search_term, + # "take": 20, + # "skip": 0 + # } + + # headers = self._get_atom_headers() + # r = self.session.get(search_url, headers=headers, params=params) + + # if r.status_code != 200: + # return + + # data = r.json() + # results = data.get("results", data.get("data", [])) + + # for result in results: + # attrs = result.get("attributes", {}) + # content_type = result.get("type", "").lower() + + # title = attrs.get("title", attrs.get("titleMedium", "Unknown")) + # year = attrs.get("year") + # slug = attrs.get("slug", "") + + # if "series" in content_type: + # type_str = "series" + # elif "movie" in content_type or "film" in content_type: + # type_str = "movie" + # else: + # type_str = "unknown" + + # yield SearchResult( + # id_=result.get("id"), + # title=title, + # year=year, + # type_=type_str, + # url=f"https://www.skyshowtime.com{slug}" if slug else None + # ) + + def get_chapters(self, title: Title_T) -> list[Chapter]: + return [] \ No newline at end of file diff --git a/SKST/config.yaml b/SKST/config.yaml new file mode 100644 index 0000000..a202614 --- /dev/null +++ b/SKST/config.yaml @@ -0,0 +1,42 @@ +endpoints: + signin: "https://rango.id.skyshowtime.com/signin/service/international" + tokens: "https://ovp.skyshowtime.com/auth/tokens" + personas: "https://web.clients.skyshowtime.com/bff/personas/v2" + atom_node: "https://atom.skyshowtime.com/adapter-calypso/v3/query/node" + atom_search: "https://atom.skyshowtime.com/adapter-calypso/v3/query/search" + playback: "https://ovp.skyshowtime.com/video/playouts/vod" + +params: + provider: "SKYSHOWTIME" + proposition: "SKYSHOWTIME" + platform: "PC" + device: "COMPUTER" + client_version: "6.11.21-gsp" + +signature: + app_id: "SHOWMAX-ANDROID-v1" + key: "kC2UFjsH6PHrc5ENGfyTgC5bPA7aBVZ4aJAyqBBP" + version: "1.0" + +territories: + - NL + - PL + - ES + - PT + - SE + - NO + - DK + - FI + - CZ + - SK + - HU + - RO + - BG + - HR + - SI + - BA + - RS + - ME + - MK + - AL + - XK \ No newline at end of file