import base64
import hashlib
import json
import re
from codecs import Codec
from collections.abc import Generator
from datetime import datetime, timedelta
from http.cookiejar import CookieJar
from typing import Optional, Union

import click
from langcodes import Language

from unshackle.core.console import console
from unshackle.core.constants import AnyTrack
from unshackle.core.credential import Credential
from unshackle.core.manifests import DASH
from unshackle.core.search_result import SearchResult
from unshackle.core.service import Service
from unshackle.core.session import session
from unshackle.core.titles import Episode, Movie, Movies, Series, Title_T, Titles_T
from unshackle.core.tracks import Chapter, Subtitle, Tracks, Video, Chapters


class CR(Service):
    """
    Service code for Crunchyroll
    Author: TPD94
    Version: 1.0.0
    Authorization: Cookies for web endpoints, credentials for TV endpoints, both for both
    Security: FHD@L3
    Binary Requirements: SubtitleEdit, put the portable version in a folder named SubtitleEdit in /unshackle/binaries
    Use Series ID/URL (for example - https://www.crunchyroll.com/series/GG5H5XQ7D/kaiju-no-8) or Series ID (for example - GG5H5XQ7D).
    """

    # Basic authorization value sent with web token requests.
    # This is generated by Crunchyroll and is occasionally updated.
    _WEB_BASIC_AUTH = 'Basic bm9haWhkZXZtXzZpeWcwYThsMHE6'
    # Default Firefox device ID used for web token requests.
    _WEB_DEVICE_ID = 'b74d25a3-c507-4d89-b0d8-8276577b916f'
    # Client identity used for TV (Android TV) token requests.
    _TV_CLIENT_ID = 'anydazwaxclrocanwho3'
    _TV_CLIENT_SECRET = '88gnIsucV-Q7sYrY29uOW_JGlMqx1mBN'
    _TV_DEVICE_ID = 'b70699f1-94ae-4cc9-970b-1f58f3dff32c'
    _TV_USER_AGENT = 'Crunchyroll/ANDROIDTV/3.46.0_22275 (Android 16; en-US; AOSP TV on x86)'
    # Tokens are re-requested once this much time has passed since they were issued.
    _TOKEN_REFRESH_INTERVAL = timedelta(minutes=5)
    # Keys of the chapter response that are turned into chapters, in lookup order.
    _CHAPTER_SECTIONS = ('intro', 'credits', 'preview', 'recap')

    @staticmethod
    @click.command(name="CR", short_help="https://crunchyroll.com/", help="""
    Service code for Crunchyroll\n
    Author: TPD94\n
    Version: 1.0.0\n
    Authorization: Cookies for web endpoints, Credentials for TV endpoints, Cookies/Credentials for both. Cookies required.\n
    Security: FHD@L3\n
    Binary Requirements: SubtitleEdit, put the portable version in a folder named SubtitleEdit in /unshackle/binaries\n
    Use Series ID/URL (for example - https://www.crunchyroll.com/series/GG5H5XQ7D/kaiju-no-8) or Series ID (for example - GG5H5XQ7D).
    """
                   )
    @click.argument("title", type=str)
    @click.pass_context
    def cli(ctx, **kwargs):
        """Click entry point: build the service from the CLI context and arguments."""
        return CR(ctx, **kwargs)

    def __init__(self, ctx, title):
        """
        Store the series ID and initialise empty authorization state.

        Parameters:
            ctx: Click context, forwarded to the base Service.
            title: Series URL or bare series ID as typed by the user.
        """
        super().__init__(ctx)

        # Extract Series ID from URL if applicable, otherwise use the input as-is
        match = re.search(r'crunchyroll\.com/series/([^/]+)', title)
        self.title = match.group(1) if match else title

        # Authorization token and its refresh deadline for web requests
        self.auth_token_web = None
        self.auth_token_expiry_web = None

        # Authorization token and its refresh deadline for TV requests
        self.auth_token_tv = None
        self.auth_token_expiry_tv = None

        # Remembered so authenticate() can be re-run without arguments
        self.cookies = None
        self.credential = None

    def get_session(self):
        """Return the HTTP session used by the service."""
        # Create a session using curl_cffi as it can impersonate browsers and avoid bot detection by Crunchyroll
        return session("chrome124")

    # ------------------------------------------------------------------ #
    # Authentication
    # ------------------------------------------------------------------ #

    def _request_web_token(self) -> str:
        """POST to the token endpoint with the cookie grant and return the access token."""
        return self.session.post(
            url=self.config['endpoints']['token'],
            headers={
                'Authorization': self._WEB_BASIC_AUTH
            },
            data={
                'device_id': self._WEB_DEVICE_ID,
                'device_type': 'Firefox on Windows',  # Device type really doesn't matter here, just using this to match
                'grant_type': 'etp_rt_cookie',
            }
        ).json()['access_token']

    def _request_tv_token(self, credential: Credential) -> str:
        """POST to the token endpoint with the password grant and return the access token."""
        return self.session.post(
            url=self.config['endpoints']['token'],
            headers={
                'Content-Type': 'application/x-www-form-urlencoded',
            },
            data={
                'grant_type': 'password',
                'username': credential.username,
                'password': credential.password,
                'scope': 'offline_access',
                'client_id': self._TV_CLIENT_ID,
                'client_secret': self._TV_CLIENT_SECRET,
                'device_type': 'ANDROIDTV',
                'device_id': self._TV_DEVICE_ID,
                'device_name': 'emulator_x86_arm'
            }
        ).json()['access_token']

    def authenticate(self, cookies: Optional[CookieJar] = None, credential: Optional[Credential] = None) -> None:
        """
        Obtain or refresh the web and TV authorization tokens.

        Cookies and credentials passed in are remembered on the instance, so a
        later call without arguments (see on_track_downloaded) reuses them.

        Parameters:
            cookies: Cookie jar for the web endpoints. Required, either here or
                from an earlier call.
            credential: Username/password for the TV endpoints. Optional.

        Raises:
            EnvironmentError: If no cookies are available.
        """
        if cookies:
            self.cookies = cookies
        elif hasattr(self, 'cookies'):
            cookies = self.cookies

        if credential:
            self.credential = credential
        elif hasattr(self, 'credential'):
            credential = self.credential

        # Run the super method to load the cookies without writing redundant code
        super().authenticate(cookies, credential)

        # Crunchyroll has implemented recaptcha, so web authorization via credentials is not implemented
        if not cookies:
            raise EnvironmentError("Service requires cookies for authentication.")

        # Web token: acquire on the first call, re-request once the refresh deadline passed
        if self.auth_token_web is None:
            # Update the session with the loaded cookies before the first token request
            self.session.cookies.update(cookies)
            self.auth_token_web = self._request_web_token()
            self.auth_token_expiry_web = datetime.now() + self._TOKEN_REFRESH_INTERVAL
            if not credential:
                console.log("Only cookies detected, can only fetch web manifests")
        elif datetime.now() > self.auth_token_expiry_web:
            self.auth_token_web = self._request_web_token()
            self.auth_token_expiry_web = datetime.now() + self._TOKEN_REFRESH_INTERVAL

        # TV token: same policy, only when credentials are available
        if credential and (self.auth_token_tv is None or datetime.now() > self.auth_token_expiry_tv):
            self.auth_token_tv = self._request_tv_token(credential)
            self.auth_token_expiry_tv = datetime.now() + self._TOKEN_REFRESH_INTERVAL

    # ------------------------------------------------------------------ #
    # Request helpers
    # ------------------------------------------------------------------ #

    def _tv_headers(self) -> dict:
        """Return the request headers used for the Android/Google TV endpoints."""
        return {
            'Accept': 'application/json',
            'Accept-Charset': 'UTF-8',
            'Accept-Encoding': 'gzip',
            'Authorization': f'Bearer {self.auth_token_tv}',
            'User-Agent': self._TV_USER_AGENT,
        }

    def _web_headers(self) -> dict:
        """Return the request headers used for the web endpoints."""
        return {
            'Authorization': f'Bearer {self.auth_token_web}'
        }

    def _playback_headers(self, endpoint_type: str) -> dict:
        """Return the headers for the given endpoint type ('web' or 'tv')."""
        return self._tv_headers() if endpoint_type == 'tv' else self._web_headers()

    def _open_stream(self, guid: str, endpoint_type: str) -> dict:
        """
        GET the playback response for an episode GUID.

        Opening this request counts as a concurrent stream; the returned
        response carries the 'token' needed to close it again.
        """
        return self.session.get(
            url=self.config['endpoints'][f'episode_{endpoint_type}'].format(episode=guid),
            headers=self._playback_headers(endpoint_type),
        ).json()

    def _close_stream(self, guid: str, deletion_token: str, endpoint_type: str) -> None:
        """Send the DELETE request that closes a previously opened stream."""
        bearer = self.auth_token_tv if endpoint_type == 'tv' else self.auth_token_web
        self.session.delete(
            url=self.config['endpoints']['delete'].format(episode=guid, delete_token=deletion_token),
            headers={
                'Authorization': f'Bearer {bearer}'
            }
        )

    # ------------------------------------------------------------------ #
    # Titles
    # ------------------------------------------------------------------ #

    def get_titles(self) -> Titles_T:
        """
        Fetch every season of the series and return its episodes.

        Episodes without a usable episode number (or in a season without a
        display number other than season 1) are numbered from a special
        counter starting above 1000, because Devine/Unshackle does not allow
        float episode numbers such as 24.9 or 12.5.
        """
        special_counter = 1000

        # Send a GET request to get the series info
        series_response = self.session.get(
            url=self.config['endpoints']['seasons'].format(series_id=self.title),
            headers=self._web_headers()
        ).json()

        season_ids = [season['id'] for season in series_response['data']]

        episodes = []
        for season_id in season_ids:
            # Send a GET request for the season to get episode information
            episodes_response = self.session.get(
                url=self.config['endpoints']['episodes'].format(season_id=season_id),
                headers=self._web_headers()
            ).json()

            for episode in episodes_response['data']:
                episode_number = episode['episode_number']
                display_number = episode['season_display_number']
                has_display_number = display_number != ''
                is_first_season = episode['season_number'] == 1

                # If the episode doesn't have a number, or it is in a season not displayed
                # in chronological order, advance the special counter
                if episode_number is None or not has_display_number:
                    special_counter += 1

                if has_display_number:
                    season = int(display_number)
                elif is_first_season:
                    season = episode['season_number']
                else:
                    season = 0

                if has_display_number and episode_number:
                    number = episode_number
                elif not has_display_number and is_first_season and episode_number is not None:
                    number = episode_number
                else:
                    # Also covers an unnumbered episode in an undisplayed first season,
                    # which previously ended up with number=None
                    number = special_counter

                # A missing air date yields year=None instead of a TypeError on slicing
                air_date = episode.get('episode_air_date') or ''

                episodes.append(Episode(
                    id_=episode['id'],
                    service=self.__class__,
                    title=episode['series_title'],
                    season=season,
                    number=number,
                    name=episode['title'] or episode['season_title'],
                    year=air_date[:4] or None,
                    language=episode['audio_locale']
                ))

        return Series(episodes)

    # ------------------------------------------------------------------ #
    # Tracks
    # ------------------------------------------------------------------ #

    def _read_manifest(self, episode_response: dict, guid: str, endpoint_type: str) -> Tracks:
        """
        Parse the DASH manifest of an opened stream and close the stream.

        The session headers are set for the manifest request and cleared
        afterwards. The stream is closed even when parsing fails, because
        Crunchyroll only allows 3 concurrent streams to be open at once.
        Subtitles found in the MPD are dropped; they are retrieved manually.
        """
        deletion_token = episode_response['token']
        try:
            # Update the session headers so that session may be passed to DASH.from_url()
            self.session.headers.update(self._playback_headers(endpoint_type))
            manifest_tracks = DASH.from_url(
                url=episode_response['url'],
                session=self.session
            ).to_tracks(language=episode_response['audioLocale'])
            manifest_tracks.subtitles.clear()
            return manifest_tracks
        finally:
            try:
                self._close_stream(guid, deletion_token, endpoint_type)
            finally:
                self.session.headers.clear()

    @staticmethod
    def _merge_new_tracks(tracks: Tracks, new_tracks: Tracks, seen_track_ids: set,
                          endpoint_type: str, guid: str) -> None:
        """Add every not-yet-seen track to `tracks`, tagged with its endpoint type and GUID."""
        for track in new_tracks:
            if track.id in seen_track_ids:
                continue
            seen_track_ids.add(track.id)
            # Needed later by get_widevine_license to pick the matching license endpoint
            track.data['endpoint_type'] = endpoint_type
            track.data['GUID'] = guid
            tracks.add(track)

    @staticmethod
    def _add_new_subtitles(tracks: Tracks, episode_response: dict, seen_subtitles: set,
                           endpoint_type: str, guid: str) -> None:
        """Add every subtitle whose language has not been added yet to `tracks`."""
        for key, entry in episode_response['subtitles'].items():
            if key == 'none':
                continue
            language = entry['language']
            # Dedupe on the language value (not the dict key) in every code path
            if language in seen_subtitles:
                continue
            seen_subtitles.add(language)
            subtitle = Subtitle(
                url=entry['url'],
                codec=Subtitle.Codec.from_mime(entry['format']),
                language=language,
            )
            subtitle.data['endpoint_type'] = endpoint_type
            subtitle.data['GUID'] = guid
            tracks.add(subtitle)

    def _collect_stream(self, tracks: Tracks, episode_response: dict, guid: str, endpoint_type: str,
                        seen_track_ids: set, seen_subtitles: set, include_subtitles: bool = True) -> None:
        """Read one opened stream's manifest and merge its new tracks and subtitles into `tracks`."""
        new_tracks = self._read_manifest(episode_response, guid, endpoint_type)
        self._merge_new_tracks(tracks, new_tracks, seen_track_ids, endpoint_type, guid)
        if include_subtitles:
            self._add_new_subtitles(tracks, episode_response, seen_subtitles, endpoint_type, guid)

    def get_tracks(self, title: Title_T) -> Tracks:
        """
        Collect video, audio and subtitle tracks for an episode.

        The episode itself and every version listed in its playback response
        are queried on the web endpoint and, when a TV token exists, on the TV
        endpoint as well. Tracks are de-duplicated by track ID and subtitles
        by language.

        Parameters:
            title: The episode to fetch tracks for; `title.id` is its GUID.

        Returns:
            Tracks holding everything found across all endpoints and versions.
        """
        tracks = Tracks()
        seen_track_ids = set()
        seen_subtitles = set()

        # Requested episode, web endpoint
        episode_response = self._open_stream(title.id, 'web')

        # 'versions' may be absent or null when there is only one version
        available_versions = [version['guid'] for version in episode_response.get('versions') or []]

        # For the requested episode, subtitles are only taken when the audio is not en-US
        self._collect_stream(
            tracks, episode_response, title.id, 'web', seen_track_ids, seen_subtitles,
            include_subtitles=episode_response['audioLocale'] != 'en-US',
        )

        # Requested episode, TV endpoint
        if self.auth_token_tv is not None:
            episode_response = self._open_stream(title.id, 'tv')
            self._collect_stream(
                tracks, episode_response, title.id, 'tv', seen_track_ids, seen_subtitles,
                include_subtitles=episode_response['audioLocale'] != 'en-US',
            )

        # Every other version of the episode
        for version in available_versions:
            episode_response = self._open_stream(version, 'web')
            try:
                self._collect_stream(tracks, episode_response, version, 'web', seen_track_ids, seen_subtitles)
            except Exception as error:
                # Best effort: a version that cannot be read is skipped entirely (web and TV),
                # but the failure is reported instead of being silently swallowed
                console.log(f"Skipping version {version}: {error!r}")
                continue

            if self.auth_token_tv is not None:
                episode_response = self._open_stream(version, 'tv')
                self._collect_stream(tracks, episode_response, version, 'tv', seen_track_ids, seen_subtitles)

        return tracks

    # ------------------------------------------------------------------ #
    # Chapters
    # ------------------------------------------------------------------ #

    def get_chapters(self, title: Title_T) -> Chapters:
        """
        Build chapters from the intro/credits/preview/recap markers of an episode.

        All things created equal, this should be the same for all video tracks.
        Markers that are missing or malformed are skipped; a response that is
        not a JSON object yields no chapters.
        """
        chapters = Chapters()

        try:
            chapter_response = self.session.get(
                url=self.config['endpoints']['chapters'].format(episode=title.id),
            ).json()
        except ValueError:
            # Body was not valid JSON (json.JSONDecodeError is a ValueError)
            return chapters

        if not isinstance(chapter_response, dict):
            return chapters

        for section in self._CHAPTER_SECTIONS:
            marker = chapter_response.get(section)
            if not marker:
                continue
            try:
                chapters.add(Chapter(
                    timestamp=marker['start'] * 1000,
                    name=marker['type'].capitalize(),
                ))
            except (KeyError, TypeError, AttributeError, ValueError):
                # The marker may exist without usable 'start'/'type' values; move on to the next
                continue

        return chapters

    # ------------------------------------------------------------------ #
    # Licensing
    # ------------------------------------------------------------------ #

    def get_widevine_license(self, *, challenge: bytes, title: Title_T, track: AnyTrack) -> Optional[Union[bytes, str]]:
        """
        Request a Widevine license for a track from the endpoint it came from.

        A playback stream is opened to obtain the video token the license
        server requires, and closed again even if the license request fails.
        Afterwards the session headers are updated to match the endpoint type.

        Parameters:
            challenge: The Widevine license challenge.
            title: The title being downloaded (unused here).
            track: The track to license; reads data['endpoint_type'] and data['GUID'].

        Returns:
            Raw license bytes for TV tracks, the 'license' field of the JSON
            response for web tracks, or None for any other endpoint type.
        """
        endpoint_type = track.data['endpoint_type']
        guid = track.data['GUID']

        if endpoint_type == 'tv':
            episode_response = self._open_stream(guid, 'tv')
            deletion_token = episode_response['token']
            try:
                license_response = self.session.post(
                    url=self.config['endpoints']['license_tv'],
                    headers={
                        'Accept': 'application/octet-stream',
                        'Accept-Encoding': 'gzip',
                        'Authorization': f'Bearer {self.auth_token_tv}',
                        'Content-Type': 'application/octet-stream',
                        'Host': 'cr-license-proxy.prd.crunchyrollsvc.com',
                        'x-cr-content-id': guid,
                        'x-cr-video-token': deletion_token
                    },
                    data=challenge
                )
            finally:
                # Now that the license request is done, close the "stream"
                self._close_stream(guid, deletion_token, 'tv')

            # Update the headers to reflect TV device
            self.session.headers.update(self._tv_headers())
            return license_response.content

        if endpoint_type == 'web':
            episode_response = self._open_stream(guid, 'web')
            deletion_token = episode_response['token']
            try:
                license_response = self.session.post(
                    url=self.config['endpoints']['license_web'],
                    headers={
                        'Accept': '*/*',
                        'content-type': 'application/octet-stream',
                        'x-cr-content-id': guid,
                        'x-cr-video-token': deletion_token,
                        'authorization': f'Bearer {self.auth_token_web}'
                    },
                    data=challenge
                )
            finally:
                # Now that the license request is done, close the "stream"
                self._close_stream(guid, deletion_token, 'web')

            # Update the headers to reflect web
            self.session.headers.update({
                'Authorization': f'Bearer {self.auth_token_web}',
            })
            return license_response.json()['license']

        return None

    def on_track_downloaded(self, track: AnyTrack) -> None:
        """Convert downloaded subtitles to SubRip and refresh tokens if they are due."""
        if isinstance(track, Subtitle):
            track.convert(codec=Subtitle.Codec.SubRip)

        # Re-run authentication with the remembered cookies/credential
        self.authenticate()

    # ------------------------------------------------------------------ #
    # Search
    # ------------------------------------------------------------------ #

    def search(self) -> Generator[SearchResult, None, None]:
        """Search Crunchyroll for the title keyword and yield every series result."""
        search_results = self.session.get(
            url=self.config['endpoints']['search'].format(search_keyword=self.title),
            headers={
                'Authorization': f'Bearer {self.auth_token_web}',
            }
        ).json()

        for result_type in search_results['data']:
            if result_type['type'] != 'series':
                continue
            for series_result in result_type['items']:
                yield SearchResult(
                    id_=series_result['id'],
                    title=series_result['title'],
                    description=series_result['description']
                )