diff --git a/VRT/__init__.py b/VRT/__init__.py deleted file mode 100644 index cb8f6dd..0000000 --- a/VRT/__init__.py +++ /dev/null @@ -1,264 +0,0 @@ -import json -import re -import time -import base64 -import warnings # Added -from http.cookiejar import CookieJar -from typing import Optional, List -from langcodes import Language - -import click -import jwt -from bs4 import XMLParsedAsHTMLWarning # Added -from collections.abc import Generator -from unshackle.core.search_result import SearchResult -from unshackle.core.constants import AnyTrack -from unshackle.core.credential import Credential -from unshackle.core.manifests import DASH -from unshackle.core.service import Service -from unshackle.core.titles import Episode, Movie, Movies, Series, Title_T, Titles_T -from unshackle.core.tracks import Chapter, Tracks, Subtitle - -# Ignore the BeautifulSoup XML warning caused by STPP subtitles -warnings.filterwarnings("ignore", category=XMLParsedAsHTMLWarning) - -# GraphQL Fragments and Queries -FRAGMENTS = """ -fragment tileFragment on Tile { - ... on ITile { - title - action { ... on LinkAction { link } } - } -} -""" - -QUERY_PROGRAM = """ -query VideoProgramPage($pageId: ID!) { - page(id: $pageId) { - ... on ProgramPage { - title - components { - __typename - ... on PaginatedTileList { listId title } - ... on StaticTileList { listId title } - ... on ContainerNavigation { - items { - title - components { - __typename - ... on PaginatedTileList { listId } - ... on StaticTileList { listId } - } - } - } - } - } - } -} -""" - -QUERY_PAGINATED_LIST = FRAGMENTS + """ -query PaginatedTileListPage($listId: ID!, $after: ID) { - list(listId: $listId) { - ... on PaginatedTileList { - paginatedItems(first: 50, after: $after) { - edges { node { ...tileFragment } } - pageInfo { endCursor hasNextPage } - } - } - ... on StaticTileList { - items { ...tileFragment } - } - } -} -""" - -QUERY_PLAYBACK = """ -query EpisodePage($pageId: ID!) { - page(id: $pageId) { - ... on PlaybackPage { - title - player { modes { streamId } } - } - } -} -""" - -class VRT(Service): - """ - Service code for VRT MAX (vrt.be) - Version: 2.1.1 - Auth: Gigya + OIDC flow - Security: FHD @ L3 (Widevine) - Supports: - - Movies: https://www.vrt.be/vrtmax/a-z/rikkie-de-ooievaar-2/ - Series: https://www.vrt.be/vrtmax/a-z/schaar-steen-papier/ - """ - - TITLE_RE = r"^(?:https?://(?:www\.)?vrt\.be/vrtmax/a-z/)?(?P[^/]+)(?:/(?P\d+)/(?P[^/]+))?/?$" - - @staticmethod - @click.command(name="VRT", short_help="https://www.vrt.be/vrtmax/") - @click.argument("title", type=str) - @click.pass_context - def cli(ctx, **kwargs): - return VRT(ctx, **kwargs) - - def __init__(self, ctx, title: str): - super().__init__(ctx) - self.cdm = ctx.obj.cdm - - m = re.match(self.TITLE_RE, title) - if m: - self.slug = m.group("slug") - self.is_series_root = m.group("episode_slug") is None - if "vrtmax/a-z" in title: - self.page_id = "/" + title.split("vrt.be/")[1].split("?")[0] - else: - self.page_id = f"/vrtmax/a-z/{self.slug}/" - else: - self.search_term = title - - self.access_token = None - self.video_token = None - - def authenticate(self, cookies: Optional[CookieJar] = None, credential: Optional[Credential] = None) -> None: - cache = self.cache.get("auth_data") - if cache and not cache.expired: - self.log.info("Using cached VRT session.") - self.access_token = cache.data["access_token"] - self.video_token = cache.data["video_token"] - return - - if not credential or not credential.username or not credential.password: return - - self.log.info(f"Logging in to VRT as {credential.username}...") - login_params = { - "apiKey": self.config["settings"]["api_key"], - "loginID": credential.username, - "password": credential.password, - "format": "json", - "sdk": "Android_6.1.0" - } - r = self.session.post(self.config["endpoints"]["gigya_login"], data=login_params) - gigya_data = r.json() - if gigya_data.get("errorCode") != 0: raise PermissionError("Gigya login failed") - - sso_params = {"UID": gigya_data["UID"], "UIDSignature": gigya_data["UIDSignature"], "signatureTimestamp": gigya_data["signatureTimestamp"]} - r = self.session.get(self.config["endpoints"]["vrt_sso"], params=sso_params) - - match = re.search(r'var response = "(.*?)";', r.text) - token_data = json.loads(match.group(1).replace('\\"', '"')) - self.access_token = token_data["tokens"]["access_token"] - self.video_token = token_data["tokens"]["video_token"] - - decoded = jwt.decode(self.access_token, options={"verify_signature": False}) - cache.set(data={"access_token": self.access_token, "video_token": self.video_token}, expiration=int(decoded["exp"] - time.time()) - 300) - - def _get_gql_headers(self): - return { - "x-vrt-client-name": self.config["settings"]["client_name"], - "x-vrt-client-version": self.config["settings"]["client_version"], - "x-vrt-zone": "default", - "authorization": f"Bearer {self.access_token}" if self.access_token else None, - "Content-Type": "application/json" - } - - def get_titles(self) -> Titles_T: - if not self.is_series_root: - r = self.session.post(self.config["endpoints"]["graphql"], json={"query": QUERY_PLAYBACK, "variables": {"pageId": self.page_id}}, headers=self._get_gql_headers()) - data = r.json()["data"]["page"] - return Movies([Movie(id_=data["player"]["modes"][0]["streamId"], service=self.__class__, name=data["title"], language=Language.get("nl"), data={"page_id": self.page_id})]) - - r = self.session.post(self.config["endpoints"]["graphql"], json={"query": QUERY_PROGRAM, "variables": {"pageId": self.page_id}}, headers=self._get_gql_headers()) - program_data = r.json().get("data", {}).get("page") - if not program_data: - raise ValueError(f"Series page not found: {self.page_id}") - - series_name = program_data["title"] - episodes = [] - list_ids = [] - - for comp in program_data.get("components", []): - typename = comp.get("__typename") - if typename in ("PaginatedTileList", "StaticTileList") and "listId" in comp: - list_ids.append((comp.get("title") or "Episodes", comp["listId"])) - elif typename == "ContainerNavigation": - for item in comp.get("items", []): - item_title = item.get("title", "Episodes") - for sub in item.get("components", []): - if "listId" in sub: - list_ids.append((item_title, sub["listId"])) - - seen_lists = set() - unique_list_ids = [] - for title, lid in list_ids: - if lid not in seen_lists: - unique_list_ids.append((title, lid)) - seen_lists.add(lid) - - for season_title, list_id in unique_list_ids: - after = None - while True: - r_list = self.session.post(self.config["endpoints"]["graphql"], json={"query": QUERY_PAGINATED_LIST, "variables": {"listId": list_id, "after": after}}, headers=self._get_gql_headers()) - list_resp = r_list.json().get("data", {}).get("list") - if not list_resp: break - - items_container = list_resp.get("paginatedItems") - nodes = [e["node"] for e in items_container["edges"]] if items_container else list_resp.get("items", []) - - for node in nodes: - if not node.get("action"): continue - link = node["action"]["link"] - s_match = re.search(r'/(\d+)/.+s(\d+)a(\d+)', link) - episodes.append(Episode( - id_=link, - service=self.__class__, - title=series_name, - season=int(s_match.group(2)) if s_match else 1, - number=int(s_match.group(3)) if s_match else 0, - name=node["title"], - language=Language.get("nl"), - data={"page_id": link} - )) - - if items_container and items_container["pageInfo"]["hasNextPage"]: - after = items_container["pageInfo"]["endCursor"] - else: - break - - if not episodes: - raise ValueError("No episodes found for this series.") - - return Series(episodes) - - def get_tracks(self, title: Title_T) -> Tracks: - page_id = title.data["page_id"] - r_meta = self.session.post(self.config["endpoints"]["graphql"], json={"query": QUERY_PLAYBACK, "variables": {"pageId": page_id}}, headers=self._get_gql_headers()) - stream_id = r_meta.json()["data"]["page"]["player"]["modes"][0]["streamId"] - - p_info = base64.urlsafe_b64encode(json.dumps(self.config["player_info"]).encode()).decode().replace("=", "") - r_tok = self.session.post(self.config["endpoints"]["player_token"], json={"identityToken": self.video_token, "playerInfo": f"eyJhbGciOiJIUzI1NiJ9.{p_info}."}) - vrt_player_token = r_tok.json()["vrtPlayerToken"] - - r_agg = self.session.get(self.config["endpoints"]["aggregator"].format(stream_id=stream_id), params={"client": self.config["settings"]["client_id"], "vrtPlayerToken": vrt_player_token}) - agg_data = r_agg.json() - - dash_url = next(u["url"] for u in agg_data["targetUrls"] if u["type"] == "mpeg_dash") - tracks = DASH.from_url(dash_url, session=self.session).to_tracks(language=title.language) - self.drm_token = agg_data["drm"] - - for sub in agg_data.get("subtitleUrls", []): - tracks.add(Subtitle(id_=sub.get("label", "nl"), url=sub["url"], codec=Subtitle.Codec.WebVTT, language=Language.get(sub.get("language", "nl")))) - - for tr in tracks.videos + tracks.audio: - if tr.drm: tr.drm.license = lambda challenge, **kw: self.get_widevine_license(challenge, title, tr) - - return tracks - - def get_widevine_license(self, challenge: bytes, title: Title_T, track: AnyTrack) -> bytes: - r = self.session.post(self.config["endpoints"]["license"], data=challenge, headers={"x-vudrm-token": self.drm_token, "Origin": "https://www.vrt.be", "Referer": "https://www.vrt.be/"}) - return r.content - - def get_chapters(self, title: Title_T) -> list[Chapter]: - return [] \ No newline at end of file