diff --git a/KNPY/__init__.py b/KNPY/__init__.py deleted file mode 100644 index 8647d03..0000000 --- a/KNPY/__init__.py +++ /dev/null @@ -1,407 +0,0 @@ -import base64 -import json -import re -from datetime import datetime, timezone -from http.cookiejar import CookieJar -from typing import List, Optional - -import click -import jwt -from langcodes import Language - -from unshackle.core.constants import AnyTrack -from unshackle.core.credential import Credential -from unshackle.core.manifests import DASH -from unshackle.core.search_result import SearchResult -from unshackle.core.service import Service -from unshackle.core.titles import Episode, Movie, Movies, Series, Title_T, Titles_T -from unshackle.core.tracks import Subtitle, Tracks - - -class KNPY(Service): - """ - Service code for Kanopy (kanopy.com). - Version: 1.0.0 - - Auth: Credential (username + password) - Security: FHD@L3 - - Handles both Movies and Series (Playlists). - Detects and stops for movies that require tickets. - Caching included - """ - - # Updated regex to match the new URL structure with library subdomain and path - TITLE_RE = r"^https?://(?:www\.)?kanopy\.com/.+/(?P\d+)$" - GEOFENCE = () - NO_SUBTITLES = False - - @staticmethod - @click.command(name="KNPY", short_help="https://kanopy.com") - @click.argument("title", type=str) - @click.pass_context - def cli(ctx, **kwargs): - return KNPY(ctx, **kwargs) - - def __init__(self, ctx, title: str): - super().__init__(ctx) - if not self.config: - raise ValueError("KNPY configuration not found. Ensure config.yaml exists.") - - self.cdm = ctx.obj.cdm - - match = re.match(self.TITLE_RE, title) - if match: - self.content_id = match.group("id") - else: - self.content_id = None - self.search_query = title - - self.API_VERSION = self.config["client"]["api_version"] - self.USER_AGENT = self.config["client"]["user_agent"] - self.WIDEVINE_UA = self.config["client"]["widevine_ua"] - - self.session.headers.update({ - "x-version": self.API_VERSION, - "user-agent": self.USER_AGENT - }) - - self._jwt = None - self._visitor_id = None - self._user_id = None - self._domain_id = None - self.widevine_license_url = None - - def authenticate(self, cookies: Optional[CookieJar] = None, credential: Optional[Credential] = None) -> None: - if not credential or not credential.username or not credential.password: - raise ValueError("Kanopy requires email and password for authentication.") - - cache = self.cache.get("auth_token") - - if cache and not cache.expired: - cached_data = cache.data - valid_token = None - - if isinstance(cached_data, dict) and "token" in cached_data: - if cached_data.get("username") == credential.username: - valid_token = cached_data["token"] - self.log.info("Using cached authentication token") - else: - self.log.info(f"Cached token belongs to '{cached_data.get('username')}', but logging in as '{credential.username}'. Re-authenticating.") - - elif isinstance(cached_data, str): - self.log.info("Found legacy cached token format. Re-authenticating to ensure correct user.") - - if valid_token: - self._jwt = valid_token - self.session.headers.update({"authorization": f"Bearer {self._jwt}"}) - - if not self._user_id or not self._domain_id or not self._visitor_id: - try: - decoded_jwt = jwt.decode(self._jwt, options={"verify_signature": False}) - self._user_id = decoded_jwt["data"]["uid"] - self._visitor_id = decoded_jwt["data"]["visitor_id"] - self.log.info(f"Extracted user_id and visitor_id from cached token.") - self._fetch_user_details() - return - except (KeyError, jwt.DecodeError) as e: - self.log.error(f"Could not decode cached token: {e}. Re-authenticating.") - - self.log.info("Performing handshake to get visitor token...") - r = self.session.get(self.config["endpoints"]["handshake"]) - r.raise_for_status() - handshake_data = r.json() - self._visitor_id = handshake_data["visitorId"] - initial_jwt = handshake_data["jwt"] - - self.log.info(f"Logging in as {credential.username}...") - login_payload = { - "credentialType": "email", - "emailUser": { - "email": credential.username, - "password": credential.password - } - } - r = self.session.post( - self.config["endpoints"]["login"], - json=login_payload, - headers={"authorization": f"Bearer {initial_jwt}"} - ) - r.raise_for_status() - login_data = r.json() - self._jwt = login_data["jwt"] - self._user_id = login_data["userId"] - - self.session.headers.update({"authorization": f"Bearer {self._jwt}"}) - self.log.info(f"Successfully authenticated as {credential.username}") - - self._fetch_user_details() - - try: - decoded_jwt = jwt.decode(self._jwt, options={"verify_signature": False}) - exp_timestamp = decoded_jwt.get("exp") - - cache_payload = { - "token": self._jwt, - "username": credential.username - } - - if exp_timestamp: - expiration_in_seconds = int(exp_timestamp - datetime.now(timezone.utc).timestamp()) - self.log.info(f"Caching token for {expiration_in_seconds / 60:.2f} minutes.") - cache.set(data=cache_payload, expiration=expiration_in_seconds) - else: - self.log.warning("JWT has no 'exp' claim, caching for 1 hour as a fallback.") - cache.set(data=cache_payload, expiration=3600) - except Exception as e: - self.log.error(f"Failed to decode JWT for caching: {e}. Caching for 1 hour as a fallback.") - cache.set( - data={"token": self._jwt, "username": credential.username}, - expiration=3600 - ) - - def _fetch_user_details(self): - self.log.info("Fetching user library memberships...") - r = self.session.get(self.config["endpoints"]["memberships"].format(user_id=self._user_id)) - r.raise_for_status() - memberships = r.json() - - for membership in memberships.get("list", []): - if membership.get("status") == "active" and membership.get("isDefault", False): - self._domain_id = str(membership["domainId"]) - self.log.info(f"Using default library domain: {membership.get('sitename', 'Unknown')} (ID: {self._domain_id})") - return - - if memberships.get("list"): - self._domain_id = str(memberships["list"][0]["domainId"]) - self.log.warning(f"No default library found. Using first active domain: {self._domain_id}") - else: - raise ValueError("No active library memberships found for this user.") - - def get_titles(self) -> Titles_T: - if not self.content_id: - raise ValueError("A content ID is required to get titles. Use a URL or run a search first.") - if not self._domain_id: - raise ValueError("Domain ID not set. Authentication may have failed.") - - r = self.session.get(self.config["endpoints"]["video_info"].format(video_id=self.content_id, domain_id=self._domain_id)) - r.raise_for_status() - content_data = r.json() - - content_type = content_data.get("type") - - def parse_lang(data): - try: - langs = data.get("languages", []) - if langs and isinstance(langs, list) and len(langs) > 0: - return Language.find(langs[0]) - except: - pass - return Language.get("en") - - if content_type == "video": - video_data = content_data["video"] - movie = Movie( - id_=str(video_data["videoId"]), - service=self.__class__, - name=video_data["title"], - year=video_data.get("productionYear"), - description=video_data.get("descriptionHtml", ""), - language=parse_lang(video_data), - data=video_data, - ) - return Movies([movie]) - - elif content_type == "playlist": - playlist_data = content_data["playlist"] - series_title = playlist_data["title"] - series_year = playlist_data.get("productionYear") - - season_match = re.search(r'(?:Season|S)\s*(\d+)', series_title, re.IGNORECASE) - season_num = int(season_match.group(1)) if season_match else 1 - - r = self.session.get(self.config["endpoints"]["video_items"].format(video_id=self.content_id, domain_id=self._domain_id)) - r.raise_for_status() - items_data = r.json() - - episodes = [] - for i, item in enumerate(items_data.get("list", [])): - if item.get("type") != "video": - continue - - video_data = item["video"] - ep_num = i + 1 - - ep_title = video_data.get("title", "") - ep_match = re.search(r'Ep(?:isode)?\.?\s*(\d+)', ep_title, re.IGNORECASE) - if ep_match: - ep_num = int(ep_match.group(1)) - - episodes.append( - Episode( - id_=str(video_data["videoId"]), - service=self.__class__, - title=series_title, - season=season_num, - number=ep_num, - name=video_data["title"], - description=video_data.get("descriptionHtml", ""), - year=video_data.get("productionYear", series_year), - language=parse_lang(video_data), - data=video_data, - ) - ) - - series = Series(episodes) - series.name = series_title - series.description = playlist_data.get("descriptionHtml", "") - series.year = series_year - return series - - else: - raise ValueError(f"Unsupported content type: {content_type}") - - def get_tracks(self, title: Title_T) -> Tracks: - play_payload = { - "videoId": int(title.id), - "domainId": int(self._domain_id), - "userId": int(self._user_id), - "visitorId": self._visitor_id - } - - self.session.headers.setdefault("authorization", f"Bearer {self._jwt}") - self.session.headers.setdefault("x-version", self.API_VERSION) - self.session.headers.setdefault("user-agent", self.USER_AGENT) - - r = self.session.post(self.config["endpoints"]["plays"], json=play_payload) - response_json = None - try: - response_json = r.json() - except Exception: - pass - - # Handle known errors gracefully - if r.status_code == 403: - if response_json and response_json.get("errorSubcode") == "playRegionRestricted": - self.log.error("Kanopy reports: This video is not available in your country.") - raise PermissionError( - "Playback blocked by region restriction. Try connecting through a supported country or verify your library’s access region." - ) - else: - self.log.error(f"Access forbidden (HTTP 403). Response: {response_json}") - raise PermissionError("Kanopy denied access to this video. It may require a different library membership or authentication.") - - # Raise for any other HTTP errors - r.raise_for_status() - play_data = response_json or r.json() - - manifest_url = None - for manifest in play_data.get("manifests", []): - if manifest["manifestType"] == "dash": - url = manifest["url"] - manifest_url = f"https://kanopy.com{url}" if url.startswith("/") else url - drm_type = manifest.get("drmType") - if drm_type == "kanopyDrm": - play_id = play_data.get("playId") - self.widevine_license_url = self.config["endpoints"]["widevine_license"].format(license_id=f"{play_id}-0") - elif drm_type == "studioDrm": - license_id = manifest.get("drmLicenseID", f"{play_data.get('playId')}-1") - self.widevine_license_url = self.config["endpoints"]["widevine_license"].format(license_id=license_id) - else: - self.log.warning(f"Unknown drmType: {drm_type}") - self.widevine_license_url = None - break - - if not manifest_url: - raise ValueError("Could not find a DASH manifest for this title.") - if not self.widevine_license_url: - raise ValueError("Could not construct Widevine license URL.") - - self.log.info(f"Fetching DASH manifest from: {manifest_url}") - r = self.session.get(manifest_url) - r.raise_for_status() - - # Refresh headers for manifest parsing - self.session.headers.clear() - self.session.headers.update({ - "User-Agent": self.WIDEVINE_UA, - "Accept": "*/*", - "Accept-Encoding": "gzip, deflate", - "Connection": "keep-alive", - }) - - tracks = DASH.from_text(r.text, url=manifest_url).to_tracks(language=title.language) - for caption_data in play_data.get("captions", []): - lang = caption_data.get("language", "en") - for file_info in caption_data.get("files", []): - if file_info.get("type") == "webvtt": - tracks.add(Subtitle( - id_=f"caption-{lang}", - url=file_info["url"], - codec=Subtitle.Codec.WebVTT, - language=Language.get(lang) - )) - break - - return tracks - - - def get_widevine_license(self, *, challenge: bytes, title: Title_T, track: AnyTrack) -> bytes: - if not self.widevine_license_url: - raise ValueError("Widevine license URL was not set. Call get_tracks first.") - - license_headers = { - "Content-Type": "application/octet-stream", - "User-Agent": self.WIDEVINE_UA, - "Authorization": f"Bearer {self._jwt}", - "X-Version": self.API_VERSION - } - - r = self.session.post( - self.widevine_license_url, - data=challenge, - headers=license_headers - ) - r.raise_for_status() - return r.content - - # def search(self) -> List[SearchResult]: - # if not hasattr(self, 'search_query'): - # self.log.error("Search query not set. Cannot search.") - # return [] - - # self.log.info(f"Searching for '{self.search_query}'...") - # params = { - # "query": self.search_query, - # "sort": "relevance", - # "domainId": self._domain_id, - # "page": 0, - # "perPage": 20 - # } - # r = self.session.get(self.config["endpoints"]["search"], params=params) - # r.raise_for_status() - # search_data = r.json() - - # results = [] - # for item in search_data.get("list", []): - # item_type = item.get("type") - # if item_type not in ["playlist", "video"]: - # continue - - # video_id = item.get("videoId") - # title = item.get("title", "No Title") - # label = "Series" if item_type == "playlist" else "Movie" - - # results.append( - # SearchResult( - # id_=str(video_id), - # title=title, - # description="", - # label=label, - # url=f"https://www.kanopy.com/watch/{video_id}" - # ) - # ) - # return results - - def get_chapters(self, title: Title_T) -> list: - return []