commit 3c8c01df954e9c90399527f2ec7c0c1d07260c05 Author: lambda <> Date: Sun Aug 3 01:47:10 2025 +0200 Initial import diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..8d35cb3 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +__pycache__ +*.pyc diff --git a/ARD/__init__.py b/ARD/__init__.py new file mode 100644 index 0000000..a2425e6 --- /dev/null +++ b/ARD/__init__.py @@ -0,0 +1,175 @@ +from __future__ import annotations + +from http.cookiejar import MozillaCookieJar +from typing import Any, Optional, Union +from functools import partial +from pathlib import Path +import sys +import re + +import click +import webvtt +import requests +from click import Context +from bs4 import BeautifulSoup + +from devine.core.credential import Credential +from devine.core.service import Service +from devine.core.titles import Movie, Movies, Episode, Series +from devine.core.tracks import Track, Chapter, Tracks, Video, Subtitle +from devine.core.manifests.hls import HLS +from devine.core.manifests.dash import DASH + + +class ARD(Service): + """ + Service code for ARD Mediathek (https://www.ardmediathek.de) + + \b + Version: 1.0.0 + Author: lambda + Authorization: None + Robustness: + Unencrypted: 2160p, AAC2.0 + """ + + GEOFENCE = ("de",) + TITLE_RE = r"^(https://www\.ardmediathek\.de/(?Pserie|video)/.+/)(?P[a-zA-Z0-9]{10,})(/[0-9]{1,3})?$" + EPISODE_NAME_RE = r"^(Folge [0-9]+:)?(?P[^\(]+) \(S(?P[0-9]+)/E(?P[0-9]+)\)$" + + @staticmethod + @click.command(name="ARD", short_help="https://www.ardmediathek.de", help=__doc__) + @click.argument("title", type=str) + @click.pass_context + def cli(ctx: Context, **kwargs: Any) -> ARD: + return ARD(ctx, **kwargs) + + def __init__(self, ctx: Context, title: str): + self.title = title + super().__init__(ctx) + + def authenticate(self, cookies: Optional[MozillaCookieJar] = None, credential: Optional[Credential] = None) -> None: + pass + + def get_titles(self) -> Union[Movies, Series]: + match = re.match(self.TITLE_RE, self.title) + if not match: + return + + item_id = match.group("item_id") + if match.group("item_type") == "video": + return self.load_player(item_id) + + r = self.session.get(self.config["endpoints"]["grouping"].format(item_id=item_id)) + item = r.json() + + for widget in item["widgets"]: + if widget["type"] == "gridlist" and widget.get("compilationType") == "itemsOfShow": + episodes = Series() + for teaser in widget["teasers"]: + if teaser["coreAssetType"] != "EPISODE": + continue + + if 'Hörfassung' in teaser['longTitle']: + continue + + episodes += self.load_player(teaser["id"]) + return episodes + + def get_tracks(self, title: Union[Episode, Movie]) -> Tracks: + if title.data["blockedByFsk"]: + self.log.error( + "This content is age-restricted and not currently available. " + "Try again after 10pm German time") + sys.exit(0) + + media_collection = title.data["mediaCollection"]["embedded"] + tracks = Tracks() + for stream_collection in media_collection["streams"]: + if stream_collection["kind"] != "main": + continue + + for stream in stream_collection["media"]: + if stream["mimeType"] == "application/vnd.apple.mpegurl": + tracks += Tracks(HLS.from_url(stream["url"]).to_tracks(stream["audios"][0]["languageCode"])) + break + + # Fetch tracks from HBBTV endpoint to check for potential H.265/2160p DASH + r = self.session.get(self.config["endpoints"]["hbbtv"].format(item_id=title.id)) + hbbtv = r.json() + for stream in hbbtv["video"]["streams"]: + for media in stream["media"]: + if media["mimeType"] == "application/dash+xml" and media["audios"][0]["kind"] == "standard": + tracks += Tracks(DASH.from_url(media["url"]).to_tracks(media["audios"][0]["languageCode"])) + break + + # for stream in title.data["video"]["streams"]: + # for media in stream["media"]: + # if media["mimeType"] != "video/mp4" or media["audios"][0]["kind"] != "standard": + # continue + + # tracks += Video( + # codec=Video.Codec.AVC, # Should check media["videoCodec"] + # range_=Video.Range.SDR, # Should check media["isHighDynamicRange"] + # width=media["maxHResolutionPx"], + # height=media["maxVResolutionPx"], + # url=media["url"], + # language=media["audios"][0]["languageCode"], + # fps=50, + # ) + + for sub in media_collection["subtitles"]: + for source in sub["sources"]: + if source["kind"] == "ebutt": + tracks.add(Subtitle( + codec=Subtitle.Codec.TimedTextMarkupLang, + language=sub["languageCode"], + url=source["url"] + )) + + return tracks + + def get_chapters(self, title: Union[Episode, Movie]) -> list[Chapter]: + return [] + + def load_player(self, item_id): + r = self.session.get(self.config["endpoints"]["item"].format(item_id=item_id)) + item = r.json() + + for widget in item["widgets"]: + if widget["type"] != "player_ondemand": + continue + + common_data = { + "id_": item_id, + "data": widget, + "service": self.__class__, + "language": "de", + "year": widget["broadcastedOn"][0:4], + } + + if widget["show"]["coreAssetType"] == "SINGLE" or not widget["show"].get("availableSeasons"): + return Movies([Movie( + name=widget["title"], + **common_data + )]) + else: + match = re.match(self.EPISODE_NAME_RE, widget["title"]) + if not match: + name = widget["title"] + season = 0 + episode = 0 + else: + name = match.group("name") + season = match.group("season") or 0 + episode = match.group("episode") or 0 + + return Series([Episode( + name=name, + title=widget["show"]["title"], + #season=widget["show"]["availableSeasons"][0], + season=season, + number=episode, + **common_data + )]) + diff --git a/ARD/config.yaml b/ARD/config.yaml new file mode 100644 index 0000000..e54c434 --- /dev/null +++ b/ARD/config.yaml @@ -0,0 +1,8 @@ +headers: + Accept-Language: de-DE,de;q=0.8 + User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:126.0) Gecko/20100101 Firefox/126.0 + +endpoints: + item: https://api.ardmediathek.de/page-gateway/pages/ard/item/{item_id}?embedded=true&mcV6=true + grouping: https://api.ardmediathek.de/page-gateway/pages/ard/grouping/{item_id}?seasoned=true&embedded=true + hbbtv: https://tv.ardmediathek.de/dyn/get?id=video:{item_id} diff --git a/MTSP/__init__.py b/MTSP/__init__.py new file mode 100644 index 0000000..eed5c56 --- /dev/null +++ b/MTSP/__init__.py @@ -0,0 +1,101 @@ +from __future__ import annotations + +from http.cookiejar import MozillaCookieJar +from typing import Any, Optional +import re + +import click +from click import Context +from bs4 import BeautifulSoup + +from devine.core.credential import Credential +from devine.core.service import Service +from devine.core.titles import Movie, Movies +from devine.core.tracks import Chapter, Tracks +from devine.core.manifests.dash import DASH + + +class MTSP(Service): + TITLE_RE = r"^(?:https?://(?:www\.)?magentasport\.de/event/[^/]+)?/[0-9]+/(?P[0-9]+)" + + @staticmethod + @click.command(name="MTSP", short_help="https://magentasport.de", help=__doc__) + @click.argument("title", type=str) + @click.pass_context + def cli(ctx: Context, **kwargs: Any) -> MTSP: + return MTSP(ctx, **kwargs) + + def __init__(self, ctx: Context, title: str): + self.title = title + super().__init__(ctx) + + def authenticate(self, cookies: Optional[MozillaCookieJar] = None, credential: Optional[Credential] = None) -> None: + cache = self.cache.get(f"session_{credential.sha1}") + if cache and not cache.expired: + self.session.cookies.update({ + "session": cache.data, + "entitled": "1", + }) + return + + self.log.info("No cached session cookie, logging in...") + r = self.session.get(self.config["endpoints"]["login_form"]) + r.raise_for_status() + + tid, xsrf_name, xsrf_value = self.get_login_tid_xsrf(r.text) + + data = { + "tid": tid, + xsrf_name: xsrf_value, + "pkc": "", + "webauthn_supported": "false", + "pw_usr": credential.username + } + r = self.session.post(self.config["endpoints"]["login_post"], data=data) + r.raise_for_status() + + tid, xsrf_name, xsrf_value = self.get_login_tid_xsrf(r.text) + + data = { + "tid": tid, + xsrf_name: xsrf_value, + "hidden_usr": credential.username, + "pw_pwd": credential.password, + "persist_session_displayed": "1", + "persist_session": "on" + } + r = self.session.post(self.config["endpoints"]["login_post"], data=data) + r.raise_for_status() + + session = self.session.cookies.get_dict().get('session') + cache.set(session) + + def get_titles(self) -> Movies: + video_id = re.match(self.TITLE_RE, self.title).group("video_id") + r = self.session.get(self.config["endpoints"]["video_config"].format(video_id=video_id)) + config = r.json() + + return Movies([Movie( + id_=video_id, + service=self.__class__, + name=config["title"], + language="de", + data=config, + )]) + + def get_tracks(self, title: Movie) -> Tracks: + r = self.session.post(title.data['streamAccess']) + access = r.json() + tracks = DASH.from_url(access["data"]["stream"]["dash"]).to_tracks(title.language) + return tracks + + def get_chapters(self, title: Movie) -> list[Chapter]: + return [ + ] + + def get_login_tid_xsrf(self, html): + soup = BeautifulSoup(html, "html.parser") + form = soup.find("form", id="login") + xsrf = form.find("input", {"name": re.compile("^xsrf_")}) + tid = form.find("input", {"name": "tid"}) + return tid.get("value"), xsrf.get('name'), xsrf.get("value") diff --git a/MTSP/config.yaml b/MTSP/config.yaml new file mode 100644 index 0000000..6a1f839 --- /dev/null +++ b/MTSP/config.yaml @@ -0,0 +1,8 @@ +headers: + Accept-Language: de-DE,de;q=0.8 + User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:126.0) Gecko/20100101 Firefox/126.0 + +endpoints: + login_form: https://www.magentasport.de/service/auth/web/login?headto=https://www.magentasport.de/home + login_post: https://accounts.login.idm.telekom.com/factorx + video_config: https://www.magentasport.de/service/player/v2/videoConfig?videoid={video_id}&partnerid=0&language=de&format=iphone&device=desktop&platform=web&cdn=telekom_cdn&userType=loggedin-entitled diff --git a/NBLA/__init__.py b/NBLA/__init__.py new file mode 100644 index 0000000..ce2b564 --- /dev/null +++ b/NBLA/__init__.py @@ -0,0 +1,297 @@ +from __future__ import annotations + +from http.cookiejar import MozillaCookieJar +from typing import Any, Optional, Union +from functools import partial +from pathlib import Path +import sys +import re + +import click +import webvtt +import requests +from click import Context +from bs4 import BeautifulSoup + +from devine.core.credential import Credential +from devine.core.service import Service +from devine.core.titles import Movie, Movies, Episode, Series +from devine.core.tracks import Track, Chapter, Tracks, Subtitle +from devine.core.manifests.hls import HLS + + +class NebulaSubtitle(Subtitle): + STYLE_RE = re.compile('::cue\\(v\\[voice="(.+)"\\]\\) { color: ([^;]+); (.*)}') + RGB_RE = re.compile("rgb\\((.+), ?(.+), ?(.+)\\)") + + def download( + self, + session: requests.Session, + prepare_drm: partial, + max_workers: Optional[int] = None, + progress: Optional[partial] = None + ): + # Track.download chooses file extension based on class name so use + # this hack to keep it happy + self.__class__.__name__ = "Subtitle" + + # Skip Subtitle.download and use Track.download directly. The pycaption + # calls in Subtitle.download are not needed here and mangle the WebVTT + # styling Nebula uses + Track.download(self, session, prepare_drm, max_workers, progress) + + def convert(self, codec: Subtitle.Codec) -> Path: + if codec != Subtitle.Codec.SubRip: + return super().convert(codec) + + output_path = self.path.with_suffix(f".{codec.value.lower()}") + vtt = webvtt.read(self.path) + + styles = dict() + for group in vtt.styles: + for style in group.text.splitlines(): + if match := self.STYLE_RE.match(style): + name, color, extra = match.groups() + + if "rgb" in color: + r, g, b = self.RGB_RE.match(color).groups() + color = "#{0:02x}{1:02x}{2:02x}".format(int(r), int(g), int(b)) + + bold = "bold" in extra + styles[name.lower()] = {"color": color, "bold": bold} + + count = 1 + new_subs = [] + for caption in vtt: + soup = BeautifulSoup(caption.raw_text, features="html.parser") + + for tag in soup.find_all("v"): + name = " ".join(tag.attrs.keys()) + + # Work around a few broken "Abolish Everything" subtitles + if ((name == "spectator" and "spectator" not in styles) or + (name == "spectators" and "spectators" not in styles)): + name = "audience" + + style = styles[name] + tag.name = "font" + tag.attrs = {"color": style["color"]} + + if style["bold"]: + tag.wrap(soup.new_tag("b")) + + text = str(soup) + new_subs.append(f"{count}") + new_subs.append(f"{caption.start} --> {caption.end}") + new_subs.append(f"{text}\n") + count += 1 + + output_path.write_text("\n".join(new_subs), encoding="utf8") + + self.path = output_path + self.codec = codec + + if callable(self.OnConverted): + self.OnConverted(codec) + + return output_path + + +class NBLA(Service): + """ + Service code for Nebula (https://nebula.tv) + + \b + Version: 1.0.0 + Author: lambda + Authorization: Credentials + Robustness: + Unencrypted: 2160p, AAC2.0 + """ + + VIDEO_RE = r"https?://(?:www\.)?nebula\.tv/videos/(?P.+)" + CHANNEL_RE = r"^https?://(?:www\.)?nebula\.tv/(?P.+)" + + @staticmethod + @click.command(name="NBLA", short_help="https://nebula.tv", help=__doc__) + @click.argument("title", type=str) + @click.pass_context + def cli(ctx: Context, **kwargs: Any) -> NBLA: + return NBLA(ctx, **kwargs) + + def __init__(self, ctx: Context, title: str): + self.title = title + super().__init__(ctx) + + def authenticate(self, cookies: Optional[MozillaCookieJar] = None, credential: Optional[Credential] = None) -> None: + cache = self.cache.get(f"key_{credential.sha1}") + if not cache or cache.expired: + self.log.info("Key is missing or expired, logging in...") + + data = { + "email": credential.username, + "password": credential.password, + } + r = self.session.post(self.config["endpoints"]["login"], json=data) + r.raise_for_status() + + key = r.json().get("key") + cache.set(key) + else: + key = cache.data + + r = self.session.post(self.config["endpoints"]["authorization"], headers={"Authorization": f"Token {key}"}) + r.raise_for_status() + + self.jwt = r.json()["token"] + self.session.headers.update({"Authorization": f"Bearer {self.jwt}"}) + + def get_titles(self) -> Union[Movies, Series]: + if video_match := re.match(self.VIDEO_RE, self.title): + r = self.session.get(self.config["endpoints"]["video"].format(slug=video_match.group("slug"))) + video = r.json() + + # Simplest scenario: This is a video on a non-episodic channel, return it as movie + if video["channel_type"] != "episodic": + return Movies([ + Movie( + id_=video["id"], + service=self.__class__, + name=video["title"], + year=video["published_at"][0:4], + language="en" + ) + ]) + + # For episodic videos, things are trickier: There is no way to get the season + # and episode number from the video endpoint, so we instead have to iterate + # through all seasons and filter for the video id. + return self.get_content(video["channel_slug"], video_id_filter=video["id"]) + + # If the link did not match the video regex, try using it as slug for the content + # API to fetch a whole channel/season + elif channel_match := re.match(self.CHANNEL_RE, self.title): + return self.get_content(channel_match.group("slug")) + + def get_tracks(self, title: Union[Episode, Movie]) -> Tracks: + r = self.session.get(self.config["endpoints"]["manifest"].format(video_id=title.id, jwt=self.jwt), allow_redirects=False) + manifest_url = r.headers["Location"] + tracks = HLS.from_url(manifest_url).to_tracks(title.language) + + subs = [] + for subtitle in tracks.subtitles: + subs.append(NebulaSubtitle( + id_=subtitle.id, + url=subtitle.url, + language=subtitle.language, + is_original_lang=subtitle.is_original_lang, + descriptor=subtitle.descriptor, + name=subtitle.name, + codec=subtitle.codec, + forced=subtitle.forced, + sdh=subtitle.sdh, + )) + + tracks.subtitles = subs + return tracks + + def get_chapters(self, title: Union[Episode, Movie]) -> list[Chapter]: + return [] + + + def search(self) -> Generator[SearchResult, None, None]: + pass + #self.title + r = self.session.get(self.config["endpoints"]["search"], params=params) + r.raise_for_status() + +# for result in results["results"]: +# yield SearchResult( +# id_=result["brand"].get("websafeTitle"), +# title=result["brand"].get("title"), +# description=result["brand"].get("description"), +# label=result.get("label"), +# url=result["brand"].get("href"), +# ) + + ### Service specific functions + def season_to_episodes(self, channel, season, video_id_filter): + try: + season_number = int(season["label"]) + except ValueError: + # Some shows such have some non-integer season numbers (Such as + # Jet Lag: The Game season 13.5). These are generally listed as specials + # (Season 0) on TMDB, so treat them the same way. + # + # Specials episode numbers will then likely be off, use caution and + # check TMDB for manual corrections. + season_number = 0 + self.log.warn(f"Could not extract season information, guessing season {season_number}") + + for episode_number, episode in enumerate(season["episodes"], start=1): + if not episode["video"] or (video_id_filter and video_id_filter != episode["video"]["id"]): + continue + + yield Episode( + id_=episode["video"]["id"], + service=self.__class__, + title=channel["title"], + name=episode["title"], + language="en", + year=episode["video"]["published_at"][0:4], + season=season_number, + number=episode_number, + ) + + + + def get_content(self, slug, video_id_filter=None): + r = self.session.get(self.config["endpoints"]["content"].format(slug=slug)) + content = r.json() + + if content["type"] == "season": + r = self.session.get(self.config["endpoints"]["content"].format(slug=content["video_channel_slug"])) + channel = r.json() + return Series(self.season_to_episodes(channel, content, video_id_filter)) + elif content["type"] == "video_channel" and content["channel_type"] == "episodic": + episodes = [] + for season_data in content["episodic"]["seasons"]: + # We could also use the generic content endpoint to retrieve + # seasons, but this is how the nebula web app does it. + r = self.session.get(self.config["endpoints"]["season"].format(id=season_data["id"])) + episodes.extend(self.season_to_episodes(content, r.json(), video_id_filter)) + + return Series(episodes) + elif content["type"] == "video_channel": + self.log.error("Non-episodic channel URL passed. Treating it as a show with a single season. If you want to download non-episodic content as a movie, pass the direct video URL instead.") + r = self.session.get(self.config["endpoints"]["video_channel_episodes"].format(id=content["id"])) + episodes = r.json()['results'] + + # Non-episodic channel names tend to have a format of "Creator Name — Show Name" + if " — " in content["title"]: + show_title = content["title"].split(" — ", maxsplit=1)[1] + else: + show_title = content["title"] + + season = [] + episode_number = 0 + for episode in episodes: + if 'trailer' in episode['title'].lower(): + continue + + episode_number += 1 + season.append(Episode( + id_=episode["id"], + service=self.__class__, + title=show_title, + name=episode["title"], + language="en", + year=episode["published_at"][0:4], + season=1, + number=episode_number, + )) + + return Series(season) + else: + self.log.error("Unsupported content type") + sys.exit(1) diff --git a/NBLA/config.yaml b/NBLA/config.yaml new file mode 100644 index 0000000..df16093 --- /dev/null +++ b/NBLA/config.yaml @@ -0,0 +1,13 @@ +headers: + Accept-Language: en-US,en;q=0.8 + User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:126.0) Gecko/20100101 Firefox/126.0 + +endpoints: + login: https://nebula.tv/auth/login/ + authorization: https://users.api.nebula.app/api/v1/authorization/ + content: https://content.api.nebula.app/content/{slug}/ + season: https://content.api.nebula.app/seasons/{id}/ + video: https://content.api.nebula.app/content/videos/{slug}/ + video_channel: https://content.api.nebula.app/video_channels/{id}/ + video_channel_episodes: https://content.api.nebula.app/video_channels/{id}/video_episodes/?ordering=published_at + manifest: https://content.api.nebula.app/video_episodes/{video_id}/manifest.m3u8?token={jwt}&app_version=25.2.1&platform=web diff --git a/NRK/__init__.py b/NRK/__init__.py new file mode 100644 index 0000000..5e108ef --- /dev/null +++ b/NRK/__init__.py @@ -0,0 +1,122 @@ +from __future__ import annotations + +from http.cookiejar import MozillaCookieJar +from typing import Any, Optional, Union +from functools import partial +from pathlib import Path +import sys +import re + +import click +import isodate +from click import Context + +from devine.core.credential import Credential +from devine.core.service import Service +from devine.core.titles import Movie, Movies, Episode, Series +from devine.core.tracks import Track, Chapter, Tracks, Video, Audio, Subtitle +from devine.core.manifests.hls import HLS +from devine.core.manifests.dash import DASH + + +class NRK(Service): + """ + Service code for NRK TV (https://tv.nrk.no) + + \b + Version: 1.0.0 + Author: lambda + Authorization: None + Robustness: + Unencrypted: 1080p, DD5.1 + """ + + GEOFENCE = ("no",) + TITLE_RE = r"^https://tv.nrk.no/serie/fengselseksperimentet/sesong/1/episode/(?P.+)$" + + @staticmethod + @click.command(name="NRK", short_help="https://tv.nrk.no", help=__doc__) + @click.argument("title", type=str) + @click.pass_context + def cli(ctx: Context, **kwargs: Any) -> NRK: + return NRK(ctx, **kwargs) + + def __init__(self, ctx: Context, title: str): + self.title = title + super().__init__(ctx) + + def authenticate(self, cookies: Optional[MozillaCookieJar] = None, credential: Optional[Credential] = None) -> None: + pass + + def get_titles(self) -> Union[Movies, Series]: + match = re.match(self.TITLE_RE, self.title) + if not match: + return + + content_id = match.group("content_id") + + r = self.session.get(self.config["endpoints"]["content"].format(content_id=content_id)) + item = r.json() + + episode, name = item["programInformation"]["titles"]["title"].split(". ", maxsplit=1) + return Series([Episode( + id_=content_id, + service=self.__class__, + language="nb", + year=item["moreInformation"]["productionYear"], + title=item["_links"]["seriesPage"]["title"], + name=name, + season=item["_links"]["season"]["name"], + number=episode, + )]) + + + def get_tracks(self, title: Union[Episode, Movie]) -> Tracks: + r = self.session.get(self.config["endpoints"]["manifest"].format(content_id=title.id)) + manifest = r.json() + tracks = Tracks() + + for asset in manifest["playable"]["assets"]: + if asset["format"] == "HLS": + tracks += Tracks(HLS.from_url(asset["url"], session=self.session).to_tracks("nb")) + + + for sub in manifest["playable"]["subtitles"]: + tracks.add(Subtitle( + codec=Subtitle.Codec.WebVTT, + language=sub["language"], + url=sub["webVtt"], + sdh=sub["type"] == "ttv", + )) + + + for track in tracks: + track.needs_proxy = True + +# if isinstance(track, Audio) and track.channels == 6.0: +# track.channels = 5.1 + + return tracks + + def get_chapters(self, title: Union[Episode, Movie]) -> list[Chapter]: + r = self.session.get(self.config["endpoints"]["metadata"].format(content_id=title.id)) + sdi = r.json()["skipDialogInfo"] + + chapters = [] + if sdi["endIntroInSeconds"]: + if sdi["startIntroInSeconds"]: + chapters.append(Chapter(timestamp=0)) + + chapters |= [ + Chapter(timestamp=sdi["startIntroInSeconds"], name="Intro"), + Chapter(timestamp=sdi["endIntroInSeconds"]) + ] + + if sdi["startCreditsInSeconds"]: + if not chapters: + chapters.append(Chapter(timestamp=0)) + + credits = isodate.parse_duration(sdi["startCredits"]) + chapters.append(Chapter(credits.total_seconds(), name="Credits")) + + return chapters diff --git a/NRK/config.yaml b/NRK/config.yaml new file mode 100644 index 0000000..21fc330 --- /dev/null +++ b/NRK/config.yaml @@ -0,0 +1,8 @@ +headers: + Accept-Language: nb-NO,de;q=0.8 + User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:126.0) Gecko/20100101 Firefox/126.0 + +endpoints: + content: https://psapi.nrk.no/tv/catalog/programs/{content_id}?contentGroup=adults&ageRestriction=None + metadata: https://psapi.nrk.no/playback/metadata/program/{content_id} + manifest: https://psapi.nrk.no/playback/manifest/program/{content_id} diff --git a/ZDF/__init__.py b/ZDF/__init__.py new file mode 100644 index 0000000..c3f0849 --- /dev/null +++ b/ZDF/__init__.py @@ -0,0 +1,213 @@ +from __future__ import annotations + +from http.cookiejar import MozillaCookieJar +from typing import Any, Optional, Union +import sys +import re + +import click +from click import Context + +from devine.core.credential import Credential +from devine.core.service import Service +from devine.core.titles import Movie, Movies, Episode, Series +from devine.core.tracks import Track, Chapter, Tracks, Video, Subtitle + + +class ZDF(Service): + """ + Service code for ZDF.de (https://www.zdf.de) + + \b + Version: 1.0.0 + Author: lambda + Authorization: None + Robustness: + Unencrypted: 2160p HLG, AAC2.0 + """ + + GEOFENCE = ("de",) + VIDEO_RE = r"^https://www\.zdf\.de/(play|video)/(?P.+)/(?P.+)/(?P[^\?]+)(\?.+)?$" + SERIES_RE = r"^https://www.zdf.de/serien/(?P[^\?]+)(\?.+)?$" + VIDEO_CODEC_MAP = { + "video/mp4": Video.Codec.AVC, + "video/webm": Video.Codec.VP9 + } + + @staticmethod + @click.command(name="ZDF", short_help="https://www.zdf.de", help=__doc__) + @click.argument("title", type=str) + @click.pass_context + def cli(ctx: Context, **kwargs: Any) -> ZDF: + return ZDF(ctx, **kwargs) + + def __init__(self, ctx: Context, title: str): + self.title = title + super().__init__(ctx) + + def authenticate(self, cookies: Optional[MozillaCookieJar] = None, credential: Optional[Credential] = None) -> None: + # This seems to be more or less static, but it's easy enough to fetch every time + r = self.session.get("http://hbbtv.zdf.de/zdfm3/index.php") + match = re.match(r'.+GLOBALS\.apikey += +"(?P
[^"\n]+).+";', r.text, re.DOTALL) + self.session.headers.update({"Api-Auth": match.group('header')}) + + def get_titles(self) -> Union[Movies, Series]: + if match := re.match(self.SERIES_RE, self.title): + return self.handle_series_page(match.group('slug')) + + if match := re.match(self.VIDEO_RE, self.title): + r = self.session.post(self.config["endpoints"]["graphql"], json={ + "operationName": "VideoByCanonical", + "query": self.config["queries"]["VideoByCanonical"], + "variables": {"canonical": match.group('item_slug'), "first": 1}, + }, headers={"content-type": "application/json"}) + + video = r.json()["data"]["videoByCanonical"] + return self.parse_video_data(video) + + def get_tracks(self, title: Union[Episode, Movie]) -> Tracks: + tracks = Tracks() + for node in title.data["nodes"]: + if node["vodMediaType"] != "DEFAULT": + continue + + for player_type in self.config["meta"]["player_types"]: + ptmd_url = (self.config["endpoints"]["ptmd_base"] + + node["ptmdTemplate"].format(playerId=player_type)) + + r = self.session.get(ptmd_url) + ptmd = r.json() + + for pl in ptmd["priorityList"]: + for media_format in pl["formitaeten"]: + if "restriction_useragent" in media_format["facets"] or media_format["mimeType"] not in self.VIDEO_CODEC_MAP.keys(): + continue + + if 'hdr_hlg' in media_format["facets"]: + video_range = Video.Range.HLG + video_codec = Video.Codec.HEVC + else: + video_range = Video.Range.SDR + video_codec = self.VIDEO_CODEC_MAP[media_format["mimeType"]] + + for quality in media_format["qualities"]: + for track in quality["audio"]["tracks"]: + if track["class"] not in ("main", "ot"): + continue + + track_id = f'{video_codec}-{track["language"]}-{quality["highestVerticalResolution"]}' + if tracks.exists(by_id=track_id): + continue + + tracks.add(Video( + id_=track_id, + codec=video_codec, + range_=video_range, + width=quality["highestVerticalResolution"] // 9 * 16, + height=quality["highestVerticalResolution"], + url=track["uri"], + language=track["language"], + fps=50, + )) + + for subs in ptmd["captions"]: + if subs["format"] == "ebu-tt-d-basic-de": + track_id = f'subs-{subs["language"]}-{subs["class"]}' + if tracks.exists(by_id=track_id): + continue + + tracks.add(Subtitle( + id_=track_id, + codec=Subtitle.Codec.TimedTextMarkupLang, + language=subs["language"], + sdh=subs["class"] == "hoh", + url=subs["uri"] + )) + + return tracks + + def get_chapters(self, title: Union[Episode, Movie]) -> list[Chapter]: + for node in title.data["nodes"]: + si = node.get("skipIntro") + + if si and node["vodMediaType"] == "DEFAULT": + if si["startIntroTimeOffset"] and si["stopIntroTimeOffset"]: + intro_start = float(si["startIntroTimeOffset"]) + intro_stop = float(si["stopIntroTimeOffset"]) + chapters = [] + + if intro_start != 0: + chapters.append(Chapter(timestamp=0)) + + return chapters + [ + Chapter(timestamp=intro_start), + Chapter(timestamp=intro_stop), + ] + break + return [] + + def parse_video_data(self, video): + common_data = { + "id_": video["id"], + "service": self.__class__, + "year": video["editorialDate"][0:4], + "data": video["currentMedia"], + } + + meta = video["structuralMetadata"] + if "publicationFormInfo" in meta and meta["publicationFormInfo"]["original"] == "Film": + return Movies([Movie( + name=video["title"], + **common_data + )]) + else: + name = video["title"] + series_title = video["smartCollection"].get("title", "DUMMY") + + # Ignore fake episode names like "Episode 123" or "Series Name (1/8)" + if re.match(fr"^(Folge \d+|{series_title} \(\d+/\d+\))$", name): + name = None + + return Series([Episode( + **common_data, + name=name, + title=series_title, + season=video["episodeInfo"]["seasonNumber"], + number=video["episodeInfo"]["episodeNumber"], + )]) + + def handle_series_page(self, slug): + extensions = { + "persistedQuery": { + "version": 1, + "sha256Hash": "9412a0f4ac55dc37d46975d461ec64bfd14380d815df843a1492348f77b5c99a" + } + } + + variables = { + "seasonIndex": 0, + "episodesPageSize": 24, + "canonical": slug, + "sortBy": [ + { + "field": "EDITORIAL_DATE", + "direction": "ASC" + } + ] + } + + r = self.session.get(self.config["endpoints"]["graphql"], params={ + "extensions": json.dumps(extensions), + "variables": json.dumps(variables) + }, headers={"content-type": "application/json"}) + + data = r.json()["data"]["smartCollectionByCanonical"] + if not data: + return + + series = Series() + for season in data["seasons"]["nodes"]: + for video in season["episodes"]["nodes"]: + series += self.parse_video_data(video) + + return series diff --git a/ZDF/config.yaml b/ZDF/config.yaml new file mode 100644 index 0000000..34f41f0 --- /dev/null +++ b/ZDF/config.yaml @@ -0,0 +1,117 @@ +headers: + Accept-Language: de-DE,de;q=0.8 + User-Agent: Mozilla/5.0 (Web0S; Linux/SmartTV) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.79 Safari/537.36 DMOST/2.0.0 (; LGE; webOSTV; WEBOS6.3.2 03.34.95; W6_lm21a;) + +endpoints: + graphql: https://api.zdf.de/graphql + ptmd_base: https://api.zdf.de + +meta: + # Known options: + # ngplayer_2_5 (Web player - H.264 1080p + VP9 1080p)) + # smarttv_6 (HBBTV - H.264 1080p + H.265 HLG 2160p) + # smarttv_7 (Unknown - same formats as smarttv_6) + # android_native_5 (Android - H.264 1080p + VP9 1080p + H.265 HLG 2160p) + player_types: + - android_native_5 + +queries: + VideoByCanonical: | + query VideoByCanonical($canonical: String!, $first: Int) { + videoByCanonical(canonical: $canonical) { + id + canonical + contentType + title + editorialDate + streamingOptions { + ad + ut + dgs + ov + ks + fsk + } + episodeInfo { + episodeNumber + seasonNumber + } + structuralMetadata { + isChildrenContent + publicationFormInfo { + original + transformed + } + visualDimension { + moods(first: $first) { + nodes { + mood + } + } + } + } + smartCollection { + id + canonical + title + collectionType + structuralMetadata { + contentFamily + publicationFormInfo { + original + transformed + } + } + } + seo { + title + } + availability { + fskBlocked + } + currentMediaType + subtitle + webUrl + publicationDate + currentMedia { + nodes { + ptmdTemplate + ... on VodMedia { + duration + aspectRatio + visible + geoLocation + highestVerticalResolution + streamAnchorTags { + nodes { + anchorOffset + anchorLabel + } + } + skipIntro { + startIntroTimeOffset + stopIntroTimeOffset + skipButtonDisplayTime + skipButtonLabel + } + vodMediaType + label + contentType + } + ... on LiveMedia { + geoLocation + tvService + title + start + stop + editorialStart + editorialStop + encryption + liveMediaType + label + } + id + } + } + } + }