From 56759f06ec8d1c92cca61a78102ea2bd5b8aad20 Mon Sep 17 00:00:00 2001 From: adef Date: Sun, 11 Jan 2026 11:15:39 +0000 Subject: [PATCH] Delete VIDO/__init__.py --- VIDO/__init__.py | 452 ----------------------------------------------- 1 file changed, 452 deletions(-) delete mode 100644 VIDO/__init__.py diff --git a/VIDO/__init__.py b/VIDO/__init__.py deleted file mode 100644 index 6a0c930..0000000 --- a/VIDO/__init__.py +++ /dev/null @@ -1,452 +0,0 @@ -import re -import uuid -import xml.etree.ElementTree as ET -from urllib.parse import urljoin -from hashlib import md5 -from typing import Optional, Union -from http.cookiejar import CookieJar -from langcodes import Language - -import click - -from unshackle.core.credential import Credential -from unshackle.core.manifests import HLS, DASH -from unshackle.core.service import Service -from unshackle.core.titles import Episode, Movie, Movies, Series, Title_T, Titles_T -from unshackle.core.tracks import Chapter, Tracks, Subtitle -from unshackle.core.constants import AnyTrack -from datetime import datetime, timezone - - -class VIDO(Service): - """ - Vidio.com service, Series and Movies, login required. - Version: 2.3.0 - - Supports URLs like: - • https://www.vidio.com/premier/2978/giligilis (Series) - • https://www.vidio.com/watch/7454613-marantau-short-movie (Movie) - - Security: HD@L3 (Widevine DRM when available) - """ - - TITLE_RE = r"^https?://(?:www\.)?vidio\.com/(?:premier|series|watch)/(?P\d+)" - GEOFENCE = ("ID",) - - @staticmethod - @click.command(name="VIDO", short_help="https://vidio.com (login required)") - @click.argument("title", type=str) - @click.pass_context - def cli(ctx, **kwargs): - return VIDO(ctx, **kwargs) - - def __init__(self, ctx, title: str): - super().__init__(ctx) - - match = re.match(self.TITLE_RE, title) - if not match: - raise ValueError(f"Unsupported or invalid Vidio URL: {title}") - self.content_id = match.group("id") - - self.is_movie = "watch" in title - - # Static app identifiers from Android traffic - self.API_AUTH = "laZOmogezono5ogekaso5oz4Mezimew1" - self.USER_AGENT = "vidioandroid/7.14.6-e4d1de87f2 (3191683)" - self.API_APP_INFO = "android/15/7.14.6-e4d1de87f2-3191683" - self.VISITOR_ID = str(uuid.uuid4()) - - # Auth state - self._email = None - self._user_token = None - self._access_token = None - - # DRM state - self.license_url = None - self.custom_data = None - self.cdm = ctx.obj.cdm - - def authenticate(self, cookies: Optional[CookieJar] = None, credential: Optional[Credential] = None) -> None: - if not credential or not credential.username or not credential.password: - raise ValueError("Vidio requires email and password login.") - - self._email = credential.username - password = credential.password - - cache_key = f"auth_tokens_{self._email}" - cache = self.cache.get(cache_key) - - # Check if valid tokens are already in the cache - if cache and not cache.expired: - self.log.info("Using cached authentication tokens") - cached_data = cache.data - self._user_token = cached_data.get("user_token") - self._access_token = cached_data.get("access_token") - if self._user_token and self._access_token: - return - - # If no valid cache, proceed with login - self.log.info("Authenticating with username and password") - headers = { - "referer": "android-app://com.vidio.android", - "x-api-platform": "app-android", - "x-api-auth": self.API_AUTH, - "user-agent": self.USER_AGENT, - "x-api-app-info": self.API_APP_INFO, - "accept-language": "en", - "content-type": "application/x-www-form-urlencoded", - "x-visitor-id": self.VISITOR_ID, - } - - data = f"login={self._email}&password={password}" - r = self.session.post("https://api.vidio.com/api/login", headers=headers, data=data) - r.raise_for_status() - - auth_data = r.json() - self._user_token = auth_data["auth"]["authentication_token"] - self._access_token = auth_data["auth_tokens"]["access_token"] - self.log.info(f"Authenticated as {self._email}") - - try: - expires_at_str = auth_data["auth_tokens"]["access_token_expires_at"] - expires_at_dt = datetime.fromisoformat(expires_at_str) - now_utc = datetime.now(timezone.utc) - expiration_in_seconds = max(0, int((expires_at_dt - now_utc).total_seconds())) - self.log.info(f"Token expires in {expiration_in_seconds / 60:.2f} minutes.") - except (KeyError, ValueError) as e: - self.log.warning(f"Could not parse token expiration: {e}. Defaulting to 1 hour.") - expiration_in_seconds = 3600 - - cache.set({ - "user_token": self._user_token, - "access_token": self._access_token - }, expiration=expiration_in_seconds) - - def _headers(self): - if not self._user_token or not self._access_token: - raise RuntimeError("Not authenticated. Call authenticate() first.") - return { - "referer": "android-app://com.vidio.android", - "x-api-platform": "app-android", - "x-api-auth": self.API_AUTH, - "user-agent": self.USER_AGENT, - "x-api-app-info": self.API_APP_INFO, - "x-visitor-id": self.VISITOR_ID, - "x-user-email": self._email, - "x-user-token": self._user_token, - "x-authorization": self._access_token, - "accept-language": "en", - "accept": "application/json", - "accept-charset": "UTF-8", - "content-type": "application/vnd.api+json", - } - - def _extract_subtitles_from_mpd(self, mpd_url: str) -> list[Subtitle]: - """ - Manually parse the MPD to extract subtitle tracks. - Handles plain VTT format (for free content). - """ - subtitles = [] - - try: - r = self.session.get(mpd_url) - r.raise_for_status() - mpd_content = r.text - - # Get base URL for resolving relative paths - base_url = mpd_url.rsplit('/', 1)[0] + '/' - - # Remove namespace for easier parsing - mpd_content_clean = re.sub(r'\sxmlns="[^"]+"', '', mpd_content) - root = ET.fromstring(mpd_content_clean) - - for adaptation_set in root.findall('.//AdaptationSet'): - content_type = adaptation_set.get('contentType', '') - - if content_type != 'text': - continue - - lang = adaptation_set.get('lang', 'und') - - for rep in adaptation_set.findall('Representation'): - mime_type = rep.get('mimeType', '') - - # Handle plain VTT (free content) - if mime_type == 'text/vtt': - segment_list = rep.find('SegmentList') - if segment_list is not None: - for segment_url in segment_list.findall('SegmentURL'): - media = segment_url.get('media') - if media: - full_url = urljoin(base_url, media) - - # Determine if auto-generated - is_auto = '-auto' in lang - clean_lang = lang.replace('-auto', '') - - subtitle = Subtitle( - id_=md5(full_url.encode()).hexdigest()[0:16], - url=full_url, - codec=Subtitle.Codec.WebVTT, - language=Language.get(clean_lang), - forced=False, - sdh=False, - ) - - subtitles.append(subtitle) - self.log.debug(f"Found VTT subtitle: {lang} -> {full_url}") - - except Exception as e: - self.log.warning(f"Failed to extract subtitles from MPD: {e}") - - return subtitles - - def get_titles(self) -> Titles_T: - headers = self._headers() - - if self.is_movie: - r = self.session.get(f"https://api.vidio.com/api/videos/{self.content_id}/detail", headers=headers) - r.raise_for_status() - video_data = r.json()["video"] - year = None - if video_data.get("publish_date"): - try: - year = int(video_data["publish_date"][:4]) - except (ValueError, TypeError): - pass - return Movies([ - Movie( - id_=video_data["id"], - service=self.__class__, - name=video_data["title"], - description=video_data.get("description", ""), - year=year, - language=Language.get("id"), - data=video_data, - ) - ]) - else: - r = self.session.get(f"https://api.vidio.com/content_profiles/{self.content_id}", headers=headers) - r.raise_for_status() - root = r.json()["data"] - series_title = root["attributes"]["title"] - - r_playlists = self.session.get( - f"https://api.vidio.com/content_profiles/{self.content_id}/playlists", - headers=headers - ) - r_playlists.raise_for_status() - playlists_data = r_playlists.json() - - # Use metadata to identify season playlists - season_playlist_ids = set() - if "meta" in playlists_data and "playlist_group" in playlists_data["meta"]: - for group in playlists_data["meta"]["playlist_group"]: - if group.get("type") == "season": - season_playlist_ids.update(group.get("playlist_ids", [])) - - season_playlists = [] - for pl in playlists_data["data"]: - playlist_id = int(pl["id"]) - name = pl["attributes"]["name"].lower() - - if season_playlist_ids: - if playlist_id in season_playlist_ids: - season_playlists.append(pl) - else: - if ("season" in name or name == "episode" or name == "episodes") and \ - "trailer" not in name and "extra" not in name: - season_playlists.append(pl) - - if not season_playlists: - raise ValueError("No season playlists found for this series.") - - def extract_season_number(pl): - name = pl["attributes"]["name"] - match = re.search(r"season\s*(\d+)", name, re.IGNORECASE) - if match: - return int(match.group(1)) - elif name.lower() in ["season", "episodes", "episode"]: - return 1 - else: - return 0 - - season_playlists.sort(key=extract_season_number) - - all_episodes = [] - - for playlist in season_playlists: - playlist_id = playlist["id"] - season_number = extract_season_number(playlist) - - if season_number == 0: - season_number = 1 - - self.log.debug(f"Processing playlist '{playlist['attributes']['name']}' as Season {season_number}") - - page = 1 - while True: - r_eps = self.session.get( - f"https://api.vidio.com/content_profiles/{self.content_id}/playlists/{playlist_id}/videos", - params={ - "page[number]": page, - "page[size]": 20, - "sort": "order", - "included": "upcoming_videos" - }, - headers=headers, - ) - r_eps.raise_for_status() - page_data = r_eps.json() - - for raw_ep in page_data["data"]: - attrs = raw_ep["attributes"] - ep_number = len([e for e in all_episodes if e.season == season_number]) + 1 - all_episodes.append( - Episode( - id_=int(raw_ep["id"]), - service=self.__class__, - title=series_title, - season=season_number, - number=ep_number, - name=attrs["title"], - description=attrs.get("description", ""), - language=Language.get("id"), - data=raw_ep, - ) - ) - - if not page_data["links"].get("next"): - break - page += 1 - - if not all_episodes: - raise ValueError("No episodes found in any season.") - - return Series(all_episodes) - - def get_tracks(self, title: Title_T) -> Tracks: - headers = self._headers() - headers.update({ - "x-device-brand": "samsung", - "x-device-model": "SM-A525F", - "x-device-form-factor": "phone", - "x-device-soc": "Qualcomm SM7125", - "x-device-os": "Android 15 (API 35)", - "x-device-android-mpc": "0", - "x-device-cpu-arch": "arm64-v8a", - "x-device-platform": "android", - "x-app-version": "7.14.6-e4d1de87f2-3191683", - }) - - video_id = str(title.id) - url = f"https://api.vidio.com/api/stream/v1/video_data/{video_id}?initialize=true" - - r = self.session.get(url, headers=headers) - r.raise_for_status() - stream = r.json() - - if not isinstance(stream, dict): - raise ValueError("Vidio returned invalid stream data.") - - # Extract DRM info - custom_data = stream.get("custom_data") or {} - license_servers = stream.get("license_servers") or {} - widevine_data = custom_data.get("widevine") if isinstance(custom_data, dict) else None - license_url = license_servers.get("drm_license_url") if isinstance(license_servers, dict) else None - - # Get stream URLs, check all possible HLS and DASH fields - # HLS URLs (prefer in this order) - hls_url = ( - stream.get("stream_hls_url") or - stream.get("stream_token_hls_url") or - stream.get("stream_token_url") # This is also HLS (m3u8) - ) - - # DASH URLs - dash_url = stream.get("stream_dash_url") or stream.get("stream_token_dash_url") - - has_drm = widevine_data and license_url and dash_url and isinstance(widevine_data, str) - - if has_drm: - # DRM content: must use DASH - self.log.info("Widevine DRM detected, using DASH") - self.custom_data = widevine_data - self.license_url = license_url - tracks = DASH.from_url(dash_url, session=self.session).to_tracks(language=title.language) - - elif hls_url: - # Non-DRM: prefer HLS (H.264, proper frame_rate metadata) - self.log.info("No DRM detected, using HLS") - self.custom_data = None - self.license_url = None - tracks = HLS.from_url(hls_url, session=self.session).to_tracks(language=title.language) - - # Clear HLS subtitles (they're segmented and incompatible) - if tracks.subtitles: - self.log.debug("Clearing HLS subtitles (incompatible format)") - tracks.subtitles.clear() - - # Get subtitles from DASH manifest (plain VTT) if available - if dash_url: - self.log.debug("Extracting subtitles from DASH manifest") - manual_subs = self._extract_subtitles_from_mpd(dash_url) - if manual_subs: - for sub in manual_subs: - tracks.add(sub) - self.log.info(f"Added {len(manual_subs)} subtitle tracks from DASH") - - elif dash_url: - # Fallback to DASH only if no HLS available - self.log.warning("No HLS available, using DASH (VP9 codec - may have issues)") - self.custom_data = None - self.license_url = None - tracks = DASH.from_url(dash_url, session=self.session).to_tracks(language=title.language) - - # Try manual subtitle extraction for non-DRM DASH - if not tracks.subtitles: - manual_subs = self._extract_subtitles_from_mpd(dash_url) - if manual_subs: - for sub in manual_subs: - tracks.add(sub) - else: - raise ValueError("No playable stream (DASH or HLS) available.") - - self.log.info(f"Found {len(tracks.videos)} video tracks, {len(tracks.audio)} audio tracks, {len(tracks.subtitles)} subtitle tracks") - - return tracks - - def get_chapters(self, title: Title_T) -> list[Chapter]: - return [] - - def search(self): - raise NotImplementedError("Search not implemented for Vidio.") - - def get_widevine_service_certificate(self, **_) -> Union[bytes, str, None]: - return None - - def get_widevine_license(self, *, challenge: bytes, title: Title_T, track: AnyTrack) -> bytes: - if not self.license_url or not self.custom_data: - raise ValueError("DRM license info missing.") - - headers = { - "User-Agent": "Mozilla/5.0 (X11; Linux x86_64; rv:143.0) Gecko/20100101 Firefox/143.0", - "Referer": "https://www.vidio.com/", - "Origin": "https://www.vidio.com", - "pallycon-customdata-v2": self.custom_data, - "Content-Type": "application/octet-stream", - } - - self.log.debug(f"Requesting Widevine license from: {self.license_url}") - response = self.session.post( - self.license_url, - data=challenge, - headers=headers - ) - - if not response.ok: - error_summary = response.text[:200] if response.text else "No response body" - raise Exception(f"License request failed ({response.status_code}): {error_summary}") - - return response.content -