From d6798f6eed4c8ac8b0bf516de1ee4d3dfa00b5f3 Mon Sep 17 00:00:00 2001 From: adef Date: Sun, 11 Jan 2026 11:15:23 +0000 Subject: [PATCH] Delete SKST/__init__.py --- SKST/__init__.py | 1048 ---------------------------------------------- 1 file changed, 1048 deletions(-) delete mode 100644 SKST/__init__.py diff --git a/SKST/__init__.py b/SKST/__init__.py deleted file mode 100644 index 4849c36..0000000 --- a/SKST/__init__.py +++ /dev/null @@ -1,1048 +0,0 @@ -from __future__ import annotations - -import base64 -import hashlib -import hmac -import json -import re -import sys -import time -import uuid -from collections import defaultdict -from copy import deepcopy -from http.cookiejar import CookieJar -from typing import Any, Optional, Generator -from urllib.parse import urlparse, urlencode -from zlib import crc32 -from lxml import etree - -import click -from langcodes import Language - -from unshackle.core.search_result import SearchResult -from unshackle.core.constants import AnyTrack -from unshackle.core.credential import Credential -from unshackle.core.manifests import DASH -from unshackle.core.service import Service -from unshackle.core.titles import Movie, Movies, Series, Episode, Title_T, Titles_T -from unshackle.core.tracks import Chapter, Tracks, Subtitle, Track -from unshackle.core.utilities import is_close_match - - -class SkySignature: - """SkyShowtime API signature generator.""" - - def __init__(self, app_id: str, signature_key: str, version: str = "1.0"): - self.app_id = app_id - self.signature_key = signature_key.encode('utf-8') - self.sig_version = version - - def calculate_signature(self, method: str, url: str, headers: dict, - payload: bytes = b'', timestamp: Optional[int] = None) -> dict: - if timestamp is None: - timestamp = int(time.time()) - - if url.startswith('http'): - parsed_url = urlparse(url) - path = parsed_url.path - if parsed_url.query: - path += '?' + parsed_url.query - else: - path = url - - text_headers = '' - for key in sorted(headers.keys()): - if key.lower().startswith('x-skyott') or key.lower().startswith('x-showmax'): - text_headers += key.lower() + ': ' + str(headers[key]) + '\n' - - headers_md5 = hashlib.md5(text_headers.encode()).hexdigest() - - if isinstance(payload, str): - payload = payload.encode('utf-8') - payload_md5 = hashlib.md5(payload).hexdigest() - - to_hash = ( - f'{method}\n' - f'{path}\n' - f'\n' - f'{self.app_id}\n' - f'{self.sig_version}\n' - f'{headers_md5}\n' - f'{timestamp}\n' - f'{payload_md5}\n' - ) - - hashed = hmac.new(self.signature_key, to_hash.encode('utf8'), hashlib.sha1).digest() - signature = base64.b64encode(hashed).decode('utf8') - - return { - 'x-sky-signature': f'SkyOTT client="{self.app_id}",signature="{signature}",timestamp="{timestamp}",version="{self.sig_version}"' - } - - -class SKST(Service): - """ - \b - Service code for SkyShowtime streaming service (https://skyshowtime.com). - - \b - Author: FairTrade - Authorization: Cookies or Credentials - Robustness: - Widevine: - L3: 1080p - - \b - Tips: - - Use -t/--territory to specify your region (e.g., -t ES for Spain) - - Use -p/--profile to select a specific profile by name or ID - - Use --list-profiles to see all available profiles - """ - - ALIASES = ("skyshowtime", "sst") - TITLE_RE = r"^(?:https?://(?:www\.)?skyshowtime\.com)?/(?:[a-z]{2}/)?watch/asset/(?Ptv|movies?)/(?P[a-z0-9-]+)/(?P[a-f0-9-]+).*$" - - @staticmethod - @click.command(name="SKST", short_help="https://skyshowtime.com", help=__doc__) - @click.argument("title", type=str) - @click.option("-t", "--territory", type=str, default="PL", help="Territory code (e.g., PL, NL, ES)") - @click.option("-p", "--profile", type=str, default=None, help="Profile name or ID to use") - @click.option("--list-profiles", is_flag=True, default=False, help="List all available profiles and exit") - @click.pass_context - def cli(ctx, **kwargs): - return SKST(ctx, **kwargs) - - def __init__(self, ctx, title: str, territory: str = "PL", profile: Optional[str] = None, list_profiles: bool = False): - super().__init__(ctx) - - self.territory = territory.upper() - self.language = self._get_language_for_territory(territory) - self.requested_profile = profile - self.list_profiles_only = list_profiles - - # Initialize signature generator from config - sig_config = self.config.get("signature", {}) - self.signer = SkySignature( - app_id=sig_config.get("app_id", "SHOWMAX-ANDROID-v1"), - signature_key=sig_config.get("key", ""), - version=sig_config.get("version", "1.0") - ) - - m = re.match(self.TITLE_RE, title, re.IGNORECASE) - if not m: - self.search_term = title - self.title_url = None - self.content_slug = None - self.content_uuid = None - self.content_type = None - return - - content_type = m.group("type").lower() - self.content_type = "movie" if content_type.startswith("movie") else "tv" - self.content_slug = m.group("slug") - self.content_uuid = m.group("uuid") - self.title_url = title - self.search_term = None - - self.user_token: Optional[str] = None - self.device_id: Optional[str] = None - self.persona_id: Optional[str] = None - self.persona_data: Optional[dict] = None - self.all_personas: list[dict] = [] - - self.drm_license_url: Optional[str] = None - self.license_token: Optional[str] = None - - self.cdm = ctx.obj.cdm - - def _get_language_for_territory(self, territory: str) -> str: - territory_languages = { - "PL": "pl-PL", "NL": "nl-NL", "ES": "es-ES", "PT": "pt-PT", - "SE": "sv-SE", "NO": "nb-NO", "DK": "da-DK", "FI": "fi-FI", - "CZ": "cs-CZ", "SK": "sk-SK", "HU": "hu-HU", "RO": "ro-RO", - "BG": "bg-BG", "HR": "hr-HR", "SI": "sl-SI", "BA": "bs-BA", - "RS": "sr-RS", "ME": "sr-ME", "MK": "mk-MK", "AL": "sq-AL", - } - return territory_languages.get(territory.upper(), "en-US") - - def _get_common_headers(self) -> dict: - return { - "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:138.0) Gecko/20100101 Firefox/138.0", - "Accept": "*/*", - "Accept-Language": "en-US,en;q=0.5", - "Origin": "https://www.skyshowtime.com", - "Referer": "https://www.skyshowtime.com/", - "Sec-Fetch-Dest": "empty", - "Sec-Fetch-Mode": "cors", - "Sec-Fetch-Site": "same-site", - } - - def _get_skyott_headers(self, extra: Optional[dict] = None) -> dict: - params = self.config.get("params", {}) - headers = { - "X-SkyOTT-Provider": params.get("provider", "SKYSHOWTIME"), - "X-SkyOTT-Territory": self.territory, - "X-SkyOTT-Proposition": params.get("proposition", "SKYSHOWTIME"), - "X-SkyOTT-Platform": params.get("platform", "PC"), - "X-SkyOTT-Device": params.get("device", "COMPUTER"), - "X-SkyOTT-ActiveTerritory": self.territory, - } - if extra: - headers.update(extra) - return headers - - def _get_atom_headers(self) -> dict: - params = self.config.get("params", {}) - headers = self._get_common_headers() - headers.update(self._get_skyott_headers({ - "X-SkyOTT-Language": "en-US", - "X-SkyOTT-Client-Version": params.get("client_version", "6.11.21-gsp"), - })) - return headers - - def authenticate(self, cookies: Optional[CookieJar] = None, credential: Optional[Credential] = None) -> None: - super().authenticate(cookies, credential) - - self.device_id = self._get_cookie_value(cookies, "deviceid") or str(uuid.uuid4()) - - if cookies: - sky_umv = self._get_cookie_value(cookies, "skyUMV") - if sky_umv: - self.user_token = sky_umv - self.log.info("Using existing session from cookies.") - else: - raise PermissionError("skyUMV cookie not found. Please provide fresh cookies.") - elif credential: - self._authenticate_with_credentials(credential) - self._get_user_token() - else: - raise PermissionError("SKST requires either cookies or credentials for authentication.") - - self._fetch_personas() - - if self.list_profiles_only: - self._display_profiles() - sys.exit(0) - - self._select_profile() - - self.log.info("SkyShowtime authentication successful.") - - def _get_cookie_value(self, cookies: Optional[CookieJar], name: str) -> Optional[str]: - if not cookies: - return None - for cookie in cookies: - if cookie.name == name: - return cookie.value - return None - - def _authenticate_with_credentials(self, credential: Credential) -> None: - self.log.info(f"Logging in as {credential.username}...") - - signin_url = self.config["endpoints"]["signin"] - - headers = self._get_common_headers() - headers.update({ - "Accept": "application/vnd.siren+json", - "Content-Type": "application/x-www-form-urlencoded", - }) - headers.update(self._get_skyott_headers()) - - r = self.session.post(signin_url, headers=headers, data=urlencode({ - "userIdentifier": credential.username, - "password": credential.password, - "rememberMe": "true", - "isWeb": "true" - })) - - if r.status_code != 200: - raise PermissionError(f"Sign-in failed: {r.status_code} - {r.text}") - - signin_response = r.json() - - if signin_response.get("class") != ["success"]: - raise PermissionError(f"Sign-in failed: {signin_response}") - - if "properties" in signin_response and "data" in signin_response["properties"]: - self.device_id = signin_response["properties"]["data"].get("deviceid", self.device_id) - - self.log.debug(f"Sign-in successful. Device ID: {self.device_id}") - - def _get_user_token(self) -> None: - token_url = self.config["endpoints"]["tokens"] - params = self.config.get("params", {}) - - skyott_headers = self._get_skyott_headers({ - "X-SkyOTT-Language": self.language, - }) - - payload = { - "auth": { - "authScheme": "MESSO", - "authIssuer": "NOWTV", - "provider": params.get("provider", "SKYSHOWTIME"), - "providerTerritory": self.territory, - "proposition": params.get("proposition", "SKYSHOWTIME") - }, - "device": { - "type": params.get("device", "COMPUTER"), - "platform": params.get("platform", "PC"), - "id": self.device_id[:20] if self.device_id else str(uuid.uuid4())[:20], - "drmDeviceId": "UNKNOWN" - } - } - - payload_str = json.dumps(payload, separators=(',', ':')) - - sig_result = self.signer.calculate_signature( - method="POST", - url=token_url, - headers=skyott_headers, - payload=payload_str.encode('utf-8') - ) - - headers = self._get_common_headers() - headers.update({ - "Accept": "application/vnd.tokens.v1+json", - "Content-Type": "application/vnd.tokens.v1+json", - }) - headers.update(skyott_headers) - headers.update(sig_result) - - r = self.session.post(token_url, headers=headers, data=payload_str) - - if r.status_code != 200: - self.log.warning(f"Token request failed: {r.status_code}") - return - - token_data = r.json() - self.user_token = token_data.get("userToken") - - if self.user_token: - self.log.debug("User token obtained successfully.") - - def _fetch_personas(self) -> None: - persona_url = self.config["endpoints"]["personas"] - params = self.config.get("params", {}) - - query_params = { - "personaType": "Adult", - "in_setup": "false" - } - - headers = self._get_common_headers() - headers.update(self._get_skyott_headers({ - "X-SkyOTT-Language": "en-US", - "X-SkyOTT-Client-Version": params.get("client_version", "6.11.21-gsp"), - })) - headers["Accept"] = "application/json" - headers["content-type"] = "application/json" - - r = self.session.post(persona_url, headers=headers, params=query_params) - - if r.status_code != 200: - self.log.warning(f"Failed to get personas: {r.status_code}") - return - - persona_data = r.json() - self.all_personas = persona_data.get("personas", []) - - def _display_profiles(self) -> None: - if not self.all_personas: - self.log.info("No profiles available.") - return - - self.log.info("\n" + "=" * 60) - self.log.info("Available Profiles:") - self.log.info("=" * 60) - - for idx, persona in enumerate(self.all_personas, 1): - display_name = persona.get("displayName", "Unknown") - persona_id = persona.get("id", "N/A") - is_account_holder = persona.get("isAccountHolder", False) - - controls = persona.get("controls", {}) - maturity_rating = controls.get("maturityRatingLabel", controls.get("maturityRating", "N/A")) - - holder_badge = " [Account Holder]" if is_account_holder else "" - - self.log.info(f" [{idx}] {display_name}{holder_badge}") - self.log.info(f" ID: {persona_id}") - self.log.info(f" Maturity Rating: {maturity_rating}") - - self.log.info("=" * 60) - - def _select_profile(self) -> None: - if not self.all_personas: - return - - selected_persona = None - - if self.requested_profile: - for persona in self.all_personas: - if persona.get("displayName", "").lower() == self.requested_profile.lower(): - selected_persona = persona - break - - if not selected_persona: - for persona in self.all_personas: - if persona.get("id") == self.requested_profile: - selected_persona = persona - break - - if not selected_persona: - try: - idx = int(self.requested_profile) - 1 - if 0 <= idx < len(self.all_personas): - selected_persona = self.all_personas[idx] - except ValueError: - pass - - if not selected_persona: - self._display_profiles() - raise ValueError(f"Profile '{self.requested_profile}' not found.") - else: - selected_persona = self.all_personas[0] - - self.persona_id = selected_persona.get("id") - self.persona_data = selected_persona - - display_name = selected_persona.get("displayName", "Unknown") - self.log.info(f"Using profile: {display_name}") - - def get_titles(self) -> Titles_T: - if not self.content_uuid: - raise ValueError("No content UUID found.") - - headers = self._get_atom_headers() - atom_url = self.config["endpoints"]["atom_node"] - - if self.content_type == "movie": - slug_paths = [ - f"/movies/{self.content_slug}/{self.content_uuid}", - f"/movie/{self.content_slug}/{self.content_uuid}", - f"/film/{self.content_slug}/{self.content_uuid}", - ] - else: - slug_paths = [ - f"/tv/{self.content_slug}/{self.content_uuid}", - ] - - data = None - for slug_path in slug_paths: - params = {"slug": slug_path} - r = self.session.get(atom_url, headers=headers, params=params) - - if r.status_code == 200: - data = r.json() - break - - if not data: - alt_url = f"{atom_url}/provider_variant_id/{self.content_uuid}" - r = self.session.get(alt_url, headers=headers) - - if r.status_code == 200: - data = r.json() - else: - uuid_url = f"{atom_url}/uuid/{self.content_uuid}" - r = self.session.get(uuid_url, headers=headers) - - if r.status_code == 200: - data = r.json() - else: - raise RuntimeError(f"Failed to get content details. UUID: {self.content_uuid}") - - content_type = data.get("type", "") - - if "SERIES" in content_type: - return self._parse_series(data) - elif "MOVIE" in content_type or "FILM" in content_type: - return self._parse_movie(data) - else: - attrs = data.get("attributes", {}) - if attrs.get("availableSeasonCount") or attrs.get("availableEpisodeCount"): - return self._parse_series(data) - else: - return self._parse_movie(data) - - def _parse_movie(self, data: dict) -> Movies: - attrs = data.get("attributes", {}) - - title = attrs.get("title", attrs.get("titleMedium", "Unknown Title")) - year = attrs.get("year") - - formats = attrs.get("formats", {}) - content_id = None - - for fmt_key in ["HDSDR", "HD", "SD"]: - if fmt_key in formats: - fmt_data = formats[fmt_key] - if "contentId" in fmt_data: - content_id = fmt_data["contentId"] - break - - if not content_id: - for fmt_key, fmt_data in formats.items(): - if isinstance(fmt_data, dict) and "contentId" in fmt_data: - content_id = fmt_data["contentId"] - break - - provider_variant_id = attrs.get("providerVariantId", attrs.get("programmeUuid", self.content_uuid)) - - if not content_id: - content_id = data.get("id") - - original_lang = attrs.get("mainOriginalLanguage", attrs.get("productionLanguage", "en")) - if "-" in str(original_lang): - original_lang = original_lang.split("-")[0] - - return Movies([ - Movie( - id_=content_id, - service=self.__class__, - name=title, - year=year, - language=Language.get(original_lang), - data={ - "content_id": content_id, - "provider_variant_id": provider_variant_id, - "attrs": attrs - } - ) - ]) - - def _parse_series(self, data: dict) -> Series: - attrs = data.get("attributes", {}) - - series_title = attrs.get("title", attrs.get("titleMedium", "Unknown Series")) - series_uuid = attrs.get("seriesUuid", attrs.get("providerSeriesId", self.content_uuid)) - - original_lang = attrs.get("mainOriginalLanguage", attrs.get("productionLanguage", "en")) - if "-" in str(original_lang): - original_lang = original_lang.split("-")[0] - - episodes = self._fetch_all_episodes(series_uuid, series_title, original_lang) - - return Series(episodes) - - def _fetch_all_episodes(self, series_uuid: str, series_title: str, original_lang: str) -> list[Episode]: - episodes = [] - - atom_url = f"{self.config['endpoints']['atom_node']}/provider_series_id/{series_uuid}" - - params = { - "slug": f"/tv/{self.content_slug}/{series_uuid}", - "represent": "(items(items),recs[take=8],collections(items(items[take=8])),trailers)" - } - - headers = self._get_atom_headers() - r = self.session.get(atom_url, headers=headers, params=params) - - if r.status_code != 200: - self.log.warning(f"Failed to fetch series details: {r.status_code}") - return episodes - - data = r.json() - - relationships = data.get("relationships", {}) - items = relationships.get("items", {}).get("data", []) - - for season_data in items: - if season_data.get("type") != "CATALOGUE/SEASON": - continue - - season_attrs = season_data.get("attributes", {}) - season_number = season_attrs.get("seasonNumber", 1) - - season_relationships = season_data.get("relationships", {}) - season_items = season_relationships.get("items", {}).get("data", []) - - for ep_data in season_items: - if ep_data.get("type") != "ASSET/EPISODE": - continue - - ep_attrs = ep_data.get("attributes", {}) - ep_number = ep_attrs.get("episodeNumber", 1) - ep_title = ep_attrs.get("title", ep_attrs.get("episodeName", f"Episode {ep_number}")) - - formats = ep_attrs.get("formats", {}) - content_id = None - for fmt_key, fmt_data in formats.items(): - if isinstance(fmt_data, dict) and "contentId" in fmt_data: - content_id = fmt_data["contentId"] - break - - provider_variant_id = ep_attrs.get("providerVariantId", ep_attrs.get("programmeUuid")) - - episodes.append( - Episode( - id_=content_id or ep_data.get("id"), - service=self.__class__, - title=series_title, - season=season_number, - number=ep_number, - name=ep_title, - language=Language.get(original_lang), - data={ - "content_id": content_id, - "provider_variant_id": provider_variant_id, - "attrs": ep_attrs - } - ) - ) - - return episodes - - def get_tracks(self, title: Title_T) -> Tracks: - content_id = title.data.get("content_id") - provider_variant_id = title.data.get("provider_variant_id") - - if not content_id: - raise ValueError("No content_id found for this title") - - if self.content_type == "movie" and "_HDSDR" not in content_id and ":" in content_id: - content_id = content_id + "_HDSDR" - - playback_url = self.config["endpoints"]["playback"] - - skyott_headers = self._get_skyott_headers({ - "X-SkyOTT-PinOverride": "false", - "X-SkyOTT-UserToken": self.user_token, - "X-SkyOTT-COPPA": "false", - "X-SkyOTT-JourneyContext": "PRE_FETCH", - "X-SkyOTT-PrePlayout": "true", - "X-SkyOTT-Language": "en-US", - }) - - attrs = title.data.get("attrs", {}) - if attrs.get("isKidsContent", False): - skyott_headers["X-SkyOTT-COPPA"] = "true" - - persona_maturity = "9" - if self.persona_data: - controls = self.persona_data.get("controls", {}) - persona_maturity = controls.get("maturityRating", "9") - - payload = { - "device": { - "capabilities": [ - { - "protection": "WIDEVINE", - "container": "ISOBMFF", - "transport": "DASH", - "acodec": "AAC", - "vcodec": "H264" - }, - { - "protection": "NONE", - "container": "ISOBMFF", - "transport": "DASH", - "acodec": "AAC", - "vcodec": "H264" - } - ], - "maxVideoFormat": "HD", - "supportedColourSpaces": ["SDR"], - "model": "PC", - "hdcpEnabled": True - }, - "client": { - "thirdParties": ["FREEWHEEL", "MEDIATAILOR", "CONVIVA"], - "variantCapable": True - }, - "contentId": content_id, - "providerVariantId": provider_variant_id, - "parentalControlPin": None, - "personaParentalControlRating": persona_maturity - } - - payload_str = json.dumps(payload, separators=(',', ':')) - - sig_result = self.signer.calculate_signature( - method="POST", - url=playback_url, - headers=skyott_headers, - payload=payload_str.encode('utf-8') - ) - - headers = self._get_common_headers() - headers.update({ - "Accept": "application/vnd.playvod.v1+json", - "Content-Type": "application/vnd.playvod.v1+json", - }) - headers.update(skyott_headers) - headers.update(sig_result) - - r = self.session.post(playback_url, headers=headers, data=payload_str) - - if r.status_code != 200: - raise RuntimeError(f"Failed to get playback info: {r.status_code} - {r.text}") - - playback_data = r.json() - - asset = playback_data.get("asset", {}) - endpoints = asset.get("endpoints", []) - - manifest_url = None - for endpoint in endpoints: - if endpoint.get("cdn") == "CLOUDFRONT": - manifest_url = endpoint.get("url") - break - - if not manifest_url and endpoints: - manifest_url = endpoints[0].get("url") - - if not manifest_url: - raise ValueError("No manifest URL found in playback response") - - protection = playback_data.get("protection", {}) - self.drm_license_url = protection.get("licenceAcquisitionUrl") - self.license_token = protection.get("licenceToken") - - dash = DASH.from_url(manifest_url, session=self.session) - tracks = dash.to_tracks(language=title.language) - - # Remove default subtitle tracks and add properly processed ones - for track in list(tracks.subtitles): - tracks.subtitles.remove(track) - - subtitles = self._process_subtitles(dash, str(title.language)) - tracks.add(subtitles) - - return tracks - - @staticmethod - def _process_subtitles(dash: DASH, language: str) -> list[Subtitle]: - subtitle_groups = defaultdict(list) - manifest = dash.manifest - # Define namespace map for DASH MPD - nsmap = { - 'mpd': 'urn:mpeg:dash:schema:mpd:2011', - 'cenc': 'urn:mpeg:cenc:2013', - } - - # Try to find periods with and without namespace - periods = manifest.findall("Period", namespaces=None) - if not periods: - periods = manifest.findall("{urn:mpeg:dash:schema:mpd:2011}Period") - if not periods: - # Try xpath with namespace - periods = manifest.xpath("//mpd:Period", namespaces={'mpd': 'urn:mpeg:dash:schema:mpd:2011'}) - if not periods: - # Last resort: find all Period elements regardless of namespace - periods = manifest.iter() - periods = [el for el in manifest.iter() if el.tag.endswith('Period')] - - for period in periods: - # Find AdaptationSets - try multiple methods - adapt_sets = period.findall("AdaptationSet", namespaces=None) - if not adapt_sets: - adapt_sets = period.findall("{urn:mpeg:dash:schema:mpd:2011}AdaptationSet") - if not adapt_sets: - adapt_sets = [el for el in period.iter() if el.tag.endswith('AdaptationSet')] - - for adapt_set in adapt_sets: - content_type = adapt_set.get("contentType", "") - mime_type = adapt_set.get("mimeType", "") - lang = adapt_set.get("lang") - - # Check if this is a text/subtitle adaptation set - is_text = ( - content_type == "text" or - "text/vtt" in mime_type or - "application/ttml" in mime_type or - adapt_set.get("group") == "3" # Based on your MPD, group 3 is subtitles - ) - - if not is_text or not lang: - continue - - # Find Role element - role = adapt_set.find("Role", namespaces=None) - if role is None: - role = adapt_set.find("{urn:mpeg:dash:schema:mpd:2011}Role") - if role is None: - for el in adapt_set.iter(): - if el.tag.endswith('Role'): - role = el - break - - # Find Label element - label = adapt_set.find("Label", namespaces=None) - if label is None: - label = adapt_set.find("{urn:mpeg:dash:schema:mpd:2011}Label") - if label is None: - for el in adapt_set.iter(): - if el.tag.endswith('Label'): - label = el - break - - # Also check for Label attribute (some MPDs use it as attribute) - label_text = "" - if label is not None and label.text: - label_text = label.text - elif adapt_set.get("Label"): - label_text = adapt_set.get("Label") - - role_value = role.get("value") if role is not None else "subtitle" - - key = (lang, role_value, label_text) - subtitle_groups[key].append((period, adapt_set)) - - final_tracks = [] - for (lang, role_value, label_text), adapt_set_group in subtitle_groups.items(): - first_period, first_adapt = adapt_set_group[0] - - # Find Representation - rep = first_adapt.find("Representation", namespaces=None) - if rep is None: - rep = first_adapt.find("{urn:mpeg:dash:schema:mpd:2011}Representation") - if rep is None: - for el in first_adapt.iter(): - if el.tag.endswith('Representation'): - rep = el - break - - if rep is None: - continue - - s_elements_with_context = [] - for _, adapt_set in adapt_set_group: - # Find Representation in this adapt_set - current_rep = adapt_set.find("Representation", namespaces=None) - if current_rep is None: - current_rep = adapt_set.find("{urn:mpeg:dash:schema:mpd:2011}Representation") - if current_rep is None: - for el in adapt_set.iter(): - if el.tag.endswith('Representation'): - current_rep = el - break - - if current_rep is None: - continue - - # Find SegmentTemplate - check both Representation and AdaptationSet level - template = None - for parent in [current_rep, adapt_set]: - template = parent.find("SegmentTemplate", namespaces=None) - if template is None: - template = parent.find("{urn:mpeg:dash:schema:mpd:2011}SegmentTemplate") - if template is None: - for el in parent.iter(): - if el.tag.endswith('SegmentTemplate'): - template = el - break - if template is not None: - break - - if template is None: - continue - - # Find SegmentTimeline - timeline = template.find("SegmentTimeline", namespaces=None) - if timeline is None: - timeline = template.find("{urn:mpeg:dash:schema:mpd:2011}SegmentTimeline") - if timeline is None: - for el in template.iter(): - if el.tag.endswith('SegmentTimeline'): - timeline = el - break - - if timeline is not None: - start_num = int(template.get("startNumber", 0)) - - # Find S elements - s_elements = timeline.findall("S", namespaces=None) - if not s_elements: - s_elements = timeline.findall("{urn:mpeg:dash:schema:mpd:2011}S") - if not s_elements: - s_elements = [el for el in timeline.iter() if el.tag.endswith('}S') or el.tag == 'S'] - - s_elements_with_context.extend((start_num, s_elem) for s_elem in s_elements) - - if not s_elements_with_context: - # No timeline found, but we might still have a valid subtitle track - # Continue with empty timeline handling - pass - - s_elements_with_context.sort(key=lambda x: x[0]) - - combined_adapt = deepcopy(first_adapt) - - # Find combined_rep - combined_rep = combined_adapt.find("Representation", namespaces=None) - if combined_rep is None: - combined_rep = combined_adapt.find("{urn:mpeg:dash:schema:mpd:2011}Representation") - if combined_rep is None: - for el in combined_adapt.iter(): - if el.tag.endswith('Representation'): - combined_rep = el - break - - if combined_rep is None: - continue - - # Find or create SegmentTemplate - seg_template = None - for parent in [combined_rep, combined_adapt]: - seg_template = parent.find("SegmentTemplate", namespaces=None) - if seg_template is None: - seg_template = parent.find("{urn:mpeg:dash:schema:mpd:2011}SegmentTemplate") - if seg_template is None: - for el in parent.iter(): - if el.tag.endswith('SegmentTemplate'): - seg_template = el - break - if seg_template is not None: - break - - if seg_template is None: - # Try to find at AdaptationSet level and move to Representation - template_at_adapt = None - for el in combined_adapt.iter(): - if el.tag.endswith('SegmentTemplate'): - template_at_adapt = el - break - - if template_at_adapt is not None: - seg_template = deepcopy(template_at_adapt) - combined_rep.append(seg_template) - try: - combined_adapt.remove(template_at_adapt) - except ValueError: - pass - else: - continue - - # Remove existing SegmentTimeline if present - existing_timeline = None - for el in seg_template.iter(): - if el.tag.endswith('SegmentTimeline'): - existing_timeline = el - break - - if existing_timeline is not None: - try: - seg_template.remove(existing_timeline) - except ValueError: - pass - - # Create new timeline with collected S elements - if s_elements_with_context: - new_timeline = etree.Element("SegmentTimeline") - new_timeline.extend(deepcopy(s) for _, s in s_elements_with_context) - seg_template.append(new_timeline) - - seg_template.set("startNumber", "0") - if "endNumber" in seg_template.attrib: - del seg_template.attrib["endNumber"] - - track_id = hex(crc32(f"sub-{lang}-{role_value}-{label_text}".encode()) & 0xFFFFFFFF)[2:] - lang_obj = Language.get(lang) - track_name = "Original" if (language and is_close_match(lang_obj, [language])) else lang_obj.display_name() - - # Determine codec from mimeType - mime_type = first_adapt.get("mimeType", "text/vtt") - if "ttml" in mime_type.lower(): - codec = Subtitle.Codec.TimedTextMarkupLang - else: - codec = Subtitle.Codec.WebVTT - - final_tracks.append( - Subtitle( - id_=track_id, - url=dash.url, - codec=codec, - language=lang_obj, - is_original_lang=bool(language and is_close_match(lang_obj, [language])), - descriptor=Track.Descriptor.DASH, - sdh="sdh" in label_text.lower() or role_value == "caption", - forced="forced" in label_text.lower() or "forced" in role_value.lower(), - name=track_name, - data={ - "dash": { - "manifest": manifest, - "period": first_period, - "adaptation_set": combined_adapt, - "representation": combined_rep, - } - }, - ) - ) - - return final_tracks - - def get_widevine_license(self, *, challenge: bytes, title: Title_T, track: AnyTrack) -> bytes: - if not self.drm_license_url: - raise ValueError("DRM license URL not available.") - - sig_result = self.signer.calculate_signature( - method="POST", - url=self.drm_license_url, - headers={}, - payload=challenge - ) - - headers = self._get_common_headers() - headers.update({ - "Content-Type": "text/plain", - }) - headers.update(sig_result) - - r = self.session.post( - self.drm_license_url, - data=challenge, - headers=headers - ) - - if r.status_code != 200: - raise RuntimeError(f"License request failed: {r.status_code} - {r.text}") - - return r.content - - # def search(self) -> Generator[SearchResult, None, None]: - # if not self.search_term: - # return - - # search_url = self.config["endpoints"]["atom_search"] - - # params = { - # "q": self.search_term, - # "take": 20, - # "skip": 0 - # } - - # headers = self._get_atom_headers() - # r = self.session.get(search_url, headers=headers, params=params) - - # if r.status_code != 200: - # return - - # data = r.json() - # results = data.get("results", data.get("data", [])) - - # for result in results: - # attrs = result.get("attributes", {}) - # content_type = result.get("type", "").lower() - - # title = attrs.get("title", attrs.get("titleMedium", "Unknown")) - # year = attrs.get("year") - # slug = attrs.get("slug", "") - - # if "series" in content_type: - # type_str = "series" - # elif "movie" in content_type or "film" in content_type: - # type_str = "movie" - # else: - # type_str = "unknown" - - # yield SearchResult( - # id_=result.get("id"), - # title=title, - # year=year, - # type_=type_str, - # url=f"https://www.skyshowtime.com{slug}" if slug else None - # ) - - def get_chapters(self, title: Title_T) -> list[Chapter]: - return [] \ No newline at end of file