diff --git a/MUBI/__init__.py b/MUBI/__init__.py deleted file mode 100644 index 4cd1e2f..0000000 --- a/MUBI/__init__.py +++ /dev/null @@ -1,396 +0,0 @@ -import json -import re -import uuid -from http.cookiejar import CookieJar -from typing import Optional, Generator -from langcodes import Language -import base64 -import click -from unshackle.core.constants import AnyTrack -from unshackle.core.credential import Credential -from unshackle.core.manifests import DASH -from unshackle.core.service import Service -from unshackle.core.titles import Episode, Movie, Movies, Title_T, Titles_T, Series -from unshackle.core.tracks import Chapter, Tracks, Subtitle - - -class MUBI(Service): - """ - Service code for MUBI (mubi.com) - Version: 1.2.0 - - Authorization: Required cookies (lt token + session) - Security: FHD @ L3 (Widevine) - - Supports: - • Series ↦ https://mubi.com/en/nl/series/twin-peaks - • Movies ↦ https://mubi.com/en/nl/films/the-substance - - """ - SERIES_TITLE_RE = r"^https?://(?:www\.)?mubi\.com(?:/[^/]+)*?/series/(?P[^/]+)(?:/season/(?P[^/]+))?$" - TITLE_RE = r"^(?:https?://(?:www\.)?mubi\.com)(?:/[^/]+)*?/films/(?P[^/?#]+)$" - NO_SUBTITLES = False - - @staticmethod - @click.command(name="MUBI", short_help="https://mubi.com") - @click.argument("title", type=str) - @click.pass_context - def cli(ctx, **kwargs): - return MUBI(ctx, **kwargs) - - def __init__(self, ctx, title: str): - super().__init__(ctx) - - m_film = re.match(self.TITLE_RE, title) - m_series = re.match(self.SERIES_TITLE_RE, title) - - if not m_film and not m_series: - raise ValueError(f"Invalid MUBI URL: {title}") - - self.is_series = bool(m_series) - self.slug = m_film.group("slug") if m_film else None - self.series_slug = m_series.group("series_slug") if m_series else None - self.season_slug = m_series.group("season_slug") if m_series else None - - self.film_id: Optional[int] = None - self.lt_token: Optional[str] = None - self.session_token: Optional[str] = None - self.user_id: Optional[int] = None - self.country_code: Optional[str] = None - self.anonymous_user_id: Optional[str] = None - self.default_country: Optional[str] = None - self.reels_data: Optional[list] = None - - # Store CDM reference - self.cdm = ctx.obj.cdm - - if self.config is None: - raise EnvironmentError("Missing service config for MUBI.") - - def authenticate(self, cookies: Optional[CookieJar] = None, credential: Optional[Credential] = None) -> None: - super().authenticate(cookies, credential) - - try: - r_ip = self.session.get(self.config["endpoints"]["ip_geolocation"], timeout=5) - r_ip.raise_for_status() - ip_data = r_ip.json() - if ip_data.get("country"): - self.default_country = ip_data["country"] - self.log.debug(f"Detected country from IP: {self.default_country}") - else: - self.log.warning("IP geolocation response did not contain a country code.") - except Exception as e: - raise ValueError(f"Failed to fetch IP geolocation: {e}") - - if not cookies: - raise PermissionError("MUBI requires login cookies.") - - # Extract essential tokens - lt_cookie = next((c for c in cookies if c.name == "lt"), None) - session_cookie = next((c for c in cookies if c.name == "_mubi_session"), None) - snow_id_cookie = next((c for c in cookies if c.name == "_snow_id.c006"), None) - - if not lt_cookie: - raise PermissionError("Missing 'lt' cookie (Bearer token).") - if not session_cookie: - raise PermissionError("Missing '_mubi_session' cookie.") - - self.lt_token = lt_cookie.value - self.session_token = session_cookie.value - - # Extract anonymous_user_id from _snow_id.c006 - if snow_id_cookie and "." in snow_id_cookie.value: - self.anonymous_user_id = snow_id_cookie.value.split(".")[0] - else: - self.anonymous_user_id = str(uuid.uuid4()) - self.log.warning(f"No _snow_id.c006 cookie found — generated new anonymous_user_id: {self.anonymous_user_id}") - - base_headers = { - "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) Firefox/143.0", - "Origin": "https://mubi.com", - "Referer": "https://mubi.com/", - "CLIENT": "web", - "Client-Accept-Video-Codecs": "h265,vp9,h264", - "Client-Accept-Audio-Codecs": "aac", - "Authorization": f"Bearer {self.lt_token}", - "ANONYMOUS_USER_ID": self.anonymous_user_id, - "Client-Country": self.default_country, - "Sec-Fetch-Dest": "empty", - "Sec-Fetch-Mode": "cors", - "Sec-Fetch-Site": "same-site", - "Pragma": "no-cache", - "Cache-Control": "no-cache", - } - - self.session.headers.update(base_headers) - - r_account = self.session.get(self.config["endpoints"]["account"]) - if not r_account.ok: - raise PermissionError(f"Failed to fetch MUBI account: {r_account.status_code} {r_account.text}") - - account_data = r_account.json() - self.user_id = account_data.get("id") - self.country_code = (account_data.get("country") or {}).get("code", "NL") - - self.session.headers["Client-Country"] = self.country_code - self.GEOFENCE = (self.country_code,) - - self._bind_anonymous_user() - - self.log.info( - f"Authenticated as user {self.user_id}, " - f"country: {self.country_code}, " - f"anonymous_id: {self.anonymous_user_id}" - ) - - def _bind_anonymous_user(self): - try: - r = self.session.put( - self.config["endpoints"]["current_user"], - json={"anonymous_user_uuid": self.anonymous_user_id}, - headers={"Content-Type": "application/json"} - ) - if r.ok: - self.log.debug("Anonymous user ID successfully bound to account.") - else: - self.log.warning(f"Failed to bind anonymous_user_uuid: {r.status_code}") - except Exception as e: - self.log.warning(f"Exception while binding anonymous_user_uuid: {e}") - - def get_titles(self) -> Titles_T: - if self.is_series: - return self._get_series_titles() - else: - return self._get_film_title() - - def _get_film_title(self) -> Movies: - url = self.config["endpoints"]["film_by_slug"].format(slug=self.slug) - r = self.session.get(url) - r.raise_for_status() - data = r.json() - - self.film_id = data["id"] - - # Fetch reels to get definitive language code and cache the response - url_reels = self.config["endpoints"]["reels"].format(film_id=self.film_id) - r_reels = self.session.get(url_reels) - r_reels.raise_for_status() - self.reels_data = r_reels.json() - - # Extract original language from the first audio track of the first reel - original_language_code = "en" # Default fallback - if self.reels_data and self.reels_data[0].get("audio_tracks"): - first_audio_track = self.reels_data[0]["audio_tracks"][0] - if "language_code" in first_audio_track: - original_language_code = first_audio_track["language_code"] - self.log.debug(f"Detected original language from reels: '{original_language_code}'") - - genres = ", ".join(data.get("genres", [])) or "Unknown" - description = ( - data.get("default_editorial_html", "") - .replace("

", "").replace("

", "").replace("", "").replace("", "").strip() - ) - year = data.get("year") - name = data.get("title", "Unknown") - - movie = Movie( - id_=self.film_id, - service=self.__class__, - name=name, - year=year, - description=description, - language=Language.get(original_language_code), - data=data, - ) - - return Movies([movie]) - - def _get_series_titles(self) -> Titles_T: - # Fetch series metadata - series_url = self.config["endpoints"]["series"].format(series_slug=self.series_slug) - r_series = self.session.get(series_url) - r_series.raise_for_status() - series_data = r_series.json() - - episodes = [] - - # If season is explicitly specified, only fetch that season - if self.season_slug: - eps_url = self.config["endpoints"]["season_episodes"].format( - series_slug=self.series_slug, - season_slug=self.season_slug - ) - r_eps = self.session.get(eps_url) - if r_eps.status_code == 404: - raise ValueError(f"Season '{self.season_slug}' not found.") - r_eps.raise_for_status() - episodes_data = r_eps.json().get("episodes", []) - self._add_episodes_to_list(episodes, episodes_data, series_data) - else: - # No season specified fetch ALL seasons - seasons = series_data.get("seasons", []) - if not seasons: - raise ValueError("No seasons found for this series.") - - for season in seasons: - season_slug = season["slug"] - eps_url = self.config["endpoints"]["season_episodes"].format( - series_slug=self.series_slug, - season_slug=season_slug - ) - - self.log.debug(f"Fetching episodes for season: {season_slug}") - - r_eps = self.session.get(eps_url) - - # Stop if season returns 404 or empty - if r_eps.status_code == 404: - self.log.info(f"Season '{season_slug}' not available, skipping.") - continue - - r_eps.raise_for_status() - episodes_data = r_eps.json().get("episodes", []) - - if not episodes_data: - self.log.info(f"No episodes found in season '{season_slug}'.") - continue - - self._add_episodes_to_list(episodes, episodes_data, series_data) - - from unshackle.core.titles import Series - return Series(sorted(episodes, key=lambda x: (x.season, x.number))) - - def _add_episodes_to_list(self, episodes_list: list, episodes_data: list, series_data: dict): - """Helper to avoid code duplication when adding episodes.""" - for ep in episodes_data: - # Use episode's own language detection via its consumable.playback_languages - playback_langs = ep.get("consumable", {}).get("playback_languages", {}) - audio_langs = playback_langs.get("audio_options", ["English"]) - lang_code = audio_langs[0].split()[0].lower() if audio_langs else "en" - - try: - detected_lang = Language.get(lang_code) - except: - detected_lang = Language.get("en") - - episodes_list.append(Episode( - id_=ep["id"], - service=self.__class__, - title=series_data["title"], # Series title - season=ep["episode"]["season_number"], - number=ep["episode"]["number"], - name=ep["title"], # Episode title - description=ep.get("short_synopsis", ""), - language=detected_lang, - data=ep, # Full episode data for later use in get_tracks - )) - - def get_tracks(self, title: Title_T) -> Tracks: - film_id = getattr(title, "id", None) - if not film_id: - raise RuntimeError("Title ID not found.") - - # For series episodes, we don't have reels cached, so skip reel-based logic - url_view = self.config["endpoints"]["initiate_viewing"].format(film_id=film_id) - r_view = self.session.post(url_view, json={}, headers={"Content-Type": "application/json"}) - r_view.raise_for_status() - view_data = r_view.json() - reel_id = view_data["reel_id"] - - # For films, use reels data for language/audio mapping - if not self.is_series: - if not self.film_id: - raise RuntimeError("film_id not set. Call get_titles() first.") - - if not self.reels_data: - self.log.warning("Reels data not cached, fetching now.") - url_reels = self.config["endpoints"]["reels"].format(film_id=film_id) - r_reels = self.session.get(url_reels) - r_reels.raise_for_status() - reels = r_reels.json() - else: - reels = self.reels_data - - reel = next((r for r in reels if r["id"] == reel_id), reels[0]) - else: - # For episodes, we don’t need reel-based logic — just proceed - pass - - # Request secure streaming URL, works for both films and episodes - url_secure = self.config["endpoints"]["secure_url"].format(film_id=film_id) - r_secure = self.session.get(url_secure) - r_secure.raise_for_status() - secure_data = r_secure.json() - - manifest_url = None - for entry in secure_data.get("urls", []): - if entry.get("content_type") == "application/dash+xml": - manifest_url = entry["src"] - break - - if not manifest_url: - raise ValueError("No DASH manifest URL found.") - - # Parse DASH, use title.language as fallback - tracks = DASH.from_url(manifest_url, session=self.session).to_tracks(language=title.language) - - # Add subtitles - subtitles = [] - for sub in secure_data.get("text_track_urls", []): - lang_code = sub.get("language_code", "und") - vtt_url = sub.get("url") - if not vtt_url: - continue - - is_original = lang_code == title.language.language - - subtitles.append( - Subtitle( - id_=sub["id"], - url=vtt_url, - language=Language.get(lang_code), - is_original_lang=is_original, - codec=Subtitle.Codec.WebVTT, - name=sub.get("display_name", lang_code.upper()), - forced=False, - sdh=False, - ) - ) - tracks.subtitles = subtitles - - return tracks - - def get_chapters(self, title: Title_T) -> list[Chapter]: - return [] - - def get_widevine_license(self, challenge: bytes, title: Title_T, track: AnyTrack) -> bytes: - if not self.user_id: - raise RuntimeError("user_id not set — authenticate first.") - - dt_custom_data = { - "userId": self.user_id, - "sessionId": self.lt_token, - "merchant": "mubi" - } - - dt_custom_data_b64 = base64.b64encode(json.dumps(dt_custom_data).encode()).decode() - - headers = { - "User-Agent": "Mozilla/5.0 (X11; Linux x86_64; rv:143.0) Gecko/20100101 Firefox/143.0", - "Accept": "*/*", - "Origin": "https://mubi.com", - "Referer": "https://mubi.com/", - "dt-custom-data": dt_custom_data_b64, - } - - r = self.session.post( - self.config["endpoints"]["license"], - data=challenge, - headers=headers, - ) - r.raise_for_status() - license_data = r.json() - if license_data.get("status") != "OK": - raise PermissionError(f"DRM license error: {license_data}") - return base64.b64decode(license_data["license"]) -