Initial upload
- Added Crunchyroll service
This commit is contained in:
commit
c6f3ade6c6
1
.gitignore
vendored
Normal file
1
.gitignore
vendored
Normal file
@ -0,0 +1 @@
|
||||
*.pyc
|
712
CR/__init__.py
Normal file
712
CR/__init__.py
Normal file
@ -0,0 +1,712 @@
|
||||
import base64
|
||||
import hashlib
|
||||
import json
|
||||
import re
|
||||
from codecs import Codec
|
||||
from collections.abc import Generator
|
||||
from datetime import datetime, timedelta
|
||||
from http.cookiejar import CookieJar
|
||||
from typing import Optional, Union
|
||||
import click
|
||||
from langcodes import Language
|
||||
|
||||
from unshackle.core.console import console
|
||||
from unshackle.core.constants import AnyTrack
|
||||
from unshackle.core.credential import Credential
|
||||
from unshackle.core.manifests import DASH
|
||||
from unshackle.core.search_result import SearchResult
|
||||
from unshackle.core.service import Service
|
||||
from unshackle.core.session import session
|
||||
from unshackle.core.titles import Episode, Movie, Movies, Series, Title_T, Titles_T
|
||||
from unshackle.core.tracks import Chapter, Subtitle, Tracks, Video, Chapters
|
||||
|
||||
class CR(Service):
|
||||
|
||||
"""
|
||||
Service code for Crunchyroll
|
||||
Author: TPD94
|
||||
Version: 1.0.0
|
||||
Authorization: Cookies for web endpoints, credentials for TV endpoints, both for both
|
||||
Security: FHD@L3
|
||||
Binary Requirements: SubtitleEdit, put the portable version in a folder named SubtitleEdit in /unshackle/binaries
|
||||
Use Series ID/URL (for example - https://www.crunchyroll.com/series/GG5H5XQ7D/kaiju-no-8) or Series ID (for example - GG5H5XQ7D).
|
||||
"""
|
||||
|
||||
@staticmethod
|
||||
@click.command(name="CR", short_help="https://crunchyroll.com/",
|
||||
help="""
|
||||
Service code for Crunchyroll\n
|
||||
Author: TPD94\n
|
||||
Version: 1.0.0\n
|
||||
Authorization: Cookies for web endpoints, Credentials for TV endpoints, Cookies/Credentials for both. Cookies required.\n
|
||||
Security: FHD@L3\n
|
||||
Binary Requirements: SubtitleEdit, put the portable version in a folder named SubtitleEdit in /unshackle/binaries\n
|
||||
Use Series ID/URL (for example - https://www.crunchyroll.com/series/GG5H5XQ7D/kaiju-no-8) or Series ID (for example - GG5H5XQ7D).
|
||||
"""
|
||||
)
|
||||
@click.argument("title", type=str)
|
||||
@click.pass_context
|
||||
|
||||
def cli(ctx, **kwargs):
|
||||
return CR(ctx, **kwargs)
|
||||
|
||||
def __init__(self, ctx, title):
|
||||
super().__init__(ctx)
|
||||
|
||||
# Extract Series ID from URL if applicable
|
||||
match = re.search(r'crunchyroll\.com/series/([^/]+)', title)
|
||||
|
||||
# Get match if found, otherwise just use the users input for title
|
||||
self.title = match.group(1) if match else title
|
||||
|
||||
# Initiate empty authorization token for network web requests
|
||||
self.auth_token_web = None
|
||||
|
||||
# Initiate empty token expiry to store expiry time of authorization token above
|
||||
self.auth_token_expiry_web = None
|
||||
|
||||
# Initiate empty authorization token for network TV requests
|
||||
self.auth_token_tv = None
|
||||
|
||||
# Initiate empty token expiry to store expiry time of authorization token above
|
||||
self.auth_token_expiry_tv = None
|
||||
|
||||
self.cookies = None
|
||||
|
||||
self.credential = None
|
||||
|
||||
def get_session(self):
|
||||
|
||||
# Create a session using curl_cffi as it can impersonate browsers and avoid bot detection by Crunchyroll
|
||||
return session("chrome124")
|
||||
|
||||
def authenticate(self, cookies: Optional[CookieJar] = None, credential: Optional[Credential] = None) -> None:
|
||||
if cookies:
|
||||
self.cookies = cookies
|
||||
elif hasattr(self, 'cookies'):
|
||||
cookies = self.cookies
|
||||
|
||||
if credential:
|
||||
self.credential = credential
|
||||
elif hasattr(self, 'credential'):
|
||||
credential = self.credential
|
||||
|
||||
# Run the super method to load the cookies without writing redundant code
|
||||
super().authenticate(cookies, credential)
|
||||
|
||||
# Raise error if no cookies, Crunchyroll has implemented recaptcha, so authorization via credentials is not implemented
|
||||
if not cookies:
|
||||
raise EnvironmentError("Service requires cookies for authentication.")
|
||||
|
||||
# If authenticate is being called for the first time and cookies are present, retrieve an authorization token
|
||||
if cookies and self.auth_token_web is None:
|
||||
|
||||
# Update the session with the loaded cookies.
|
||||
self.session.cookies.update(cookies)
|
||||
|
||||
# Send the POST request for an authorization token
|
||||
self.auth_token_web = self.session.post(
|
||||
url=self.config['endpoints']['token'],
|
||||
headers={
|
||||
'Authorization': 'Basic bm9haWhkZXZtXzZpeWcwYThsMHE6' # This is generated by crunchyroll and is occasionally updated
|
||||
},
|
||||
data={
|
||||
'device_id': 'b74d25a3-c507-4d89-b0d8-8276577b916f', # This is the default firefox device ID
|
||||
'device_type': 'Firefox on Windows', # Device type really doesn't matter here, just using this to match
|
||||
'grant_type': 'etp_rt_cookie',
|
||||
}
|
||||
).json()['access_token']
|
||||
|
||||
# Update the token expiry, Crunchyroll offers 15 minutes between expiry.
|
||||
self.auth_token_expiry_web = datetime.now() + timedelta(minutes=5)
|
||||
|
||||
if not credential:
|
||||
console.log("Only cookies detected, can only fetch web manifests")
|
||||
|
||||
if credential and self.auth_token_tv is None:
|
||||
# Send the POST request for an authorization token
|
||||
self.auth_token_tv = self.session.post(
|
||||
url=self.config['endpoints']['token'],
|
||||
headers={
|
||||
'Content-Type': 'application/x-www-form-urlencoded',
|
||||
},
|
||||
data = {
|
||||
'grant_type': 'password',
|
||||
'username': credential.username,
|
||||
'password': credential.password,
|
||||
'scope': 'offline_access',
|
||||
'client_id': 'anydazwaxclrocanwho3',
|
||||
'client_secret': '88gnIsucV-Q7sYrY29uOW_JGlMqx1mBN',
|
||||
'device_type': 'ANDROIDTV',
|
||||
'device_id': 'b70699f1-94ae-4cc9-970b-1f58f3dff32c',
|
||||
'device_name': 'emulator_x86_arm'
|
||||
}
|
||||
).json()['access_token']
|
||||
|
||||
# Update the token expiry, Crunchyroll offers 15 minutes between expiry.
|
||||
self.auth_token_expiry_tv = datetime.now() + timedelta(minutes=5)
|
||||
|
||||
|
||||
# If there is already an authorization token for web, and it is expired, get a new one
|
||||
if self.auth_token_web is not None and datetime.now() > self.auth_token_expiry_web:
|
||||
|
||||
# Send the POST request for an authorization token
|
||||
refresh_response = self.session.post(
|
||||
url=self.config['endpoints']['token'],
|
||||
headers={
|
||||
'Authorization': 'Basic bm9haWhkZXZtXzZpeWcwYThsMHE6' # This is generated by crunchyroll and is occasionally updated
|
||||
},
|
||||
data={
|
||||
'device_id': 'b74d25a3-c507-4d89-b0d8-8276577b916f', # This is the default firefox device ID
|
||||
'device_type': 'Firefox on Windows', # Device type really doesn't matter here, just using this to match
|
||||
'grant_type': 'etp_rt_cookie',
|
||||
}
|
||||
)
|
||||
|
||||
# Update the authorization token
|
||||
self.auth_token_web = refresh_response.json()['access_token']
|
||||
|
||||
# Update the token expiry time
|
||||
self.auth_token_expiry_web = datetime.now() + timedelta(minutes=5)
|
||||
|
||||
# If there is already an authorization token for TV, and it is expired, get a new one
|
||||
if self.auth_token_tv is not None and datetime.now() > self.auth_token_expiry_tv:
|
||||
|
||||
# Send a POST request for an authorization token
|
||||
refresh_response = self.session.post(
|
||||
url=self.config['endpoints']['token'],
|
||||
headers={
|
||||
'Content-Type': 'application/x-www-form-urlencoded',
|
||||
},
|
||||
data={
|
||||
'grant_type': 'password',
|
||||
'username': credential.username,
|
||||
'password': credential.password,
|
||||
'scope': 'offline_access',
|
||||
'client_id': 'anydazwaxclrocanwho3',
|
||||
'client_secret': '88gnIsucV-Q7sYrY29uOW_JGlMqx1mBN',
|
||||
'device_type': 'ANDROIDTV',
|
||||
'device_id': 'b70699f1-94ae-4cc9-970b-1f58f3dff32c',
|
||||
'device_name': 'emulator_x86_arm'
|
||||
}
|
||||
)
|
||||
|
||||
# Update the authorization token
|
||||
self.auth_token_tv = refresh_response.json()['access_token']
|
||||
|
||||
# Update the token expiry time
|
||||
self.auth_token_expiry_tv = datetime.now() + timedelta(minutes=5)
|
||||
|
||||
def get_titles(self) -> Titles_T:
|
||||
|
||||
# Initialize empty list for all season IDs of series.
|
||||
season_ids = []
|
||||
|
||||
# Initialize a special counter for episodes labeled as 24.9, 12.5, etc. Devine/Unshackle does not allow floats.
|
||||
special_counter = 1000
|
||||
|
||||
# Send a GET request to get the series info
|
||||
series_response = self.session.get(
|
||||
url=self.config['endpoints']['seasons'].format(series_id=self.title),
|
||||
headers={
|
||||
'Authorization': f'Bearer {self.auth_token_web}'
|
||||
}
|
||||
).json()
|
||||
|
||||
# Append each season to the season_ids list
|
||||
for season in series_response['data']:
|
||||
season_ids.append(season['id'])
|
||||
|
||||
# Initialize empty list for episodes
|
||||
episodes = []
|
||||
|
||||
# Iterate through the season_ids
|
||||
for season_id in season_ids:
|
||||
|
||||
# Send a GET request for the season to get episode information
|
||||
episodes_response = self.session.get(
|
||||
url=self.config['endpoints']['episodes'].format(season_id=season_id),
|
||||
headers={
|
||||
'Authorization': f'Bearer {self.auth_token_web}'
|
||||
}
|
||||
).json()
|
||||
|
||||
# Iterate through each episode in the season response
|
||||
for episode in episodes_response['data']:
|
||||
|
||||
# If the episode doesn't have a number, or it is in a season not displayed in chronological order, add 1 to the special counter
|
||||
if episode['episode_number'] is None or episode['episode_number'] is not None and episode['season_display_number'] == '':
|
||||
special_counter += 1
|
||||
|
||||
# Append the episode to the episodes list
|
||||
episodes.append(Episode(
|
||||
id_=episode['id'],
|
||||
service=self.__class__,
|
||||
title=episode['series_title'],
|
||||
season=int(episode['season_display_number']) if episode['season_display_number'] != '' else episode['season_number'] if episode['season_display_number'] == '' and episode['season_number'] == 1 else 0,
|
||||
number=episode['episode_number'] if episode['episode_number'] and episode['season_display_number'] != '' else episode['episode_number'] if episode['season_display_number'] == '' and episode['season_number'] == 1 else special_counter,
|
||||
name=episode['title'] if episode['title'] else episode['season_title'],
|
||||
year=episode['episode_air_date'][:4],
|
||||
language=episode['audio_locale']
|
||||
))
|
||||
|
||||
# Return the series
|
||||
return Series(episodes)
|
||||
|
||||
def get_tracks(self, title: Title_T) -> Tracks:
|
||||
|
||||
# Initialize tracks class
|
||||
tracks = Tracks()
|
||||
|
||||
# Initialize current_tracks list to keep track of tracks already added to the tracks class
|
||||
current_tracks = []
|
||||
|
||||
# Initialize current_subtitles to avoid duplicated when iterating over all available versions of episodes
|
||||
current_subtitles = []
|
||||
|
||||
# Send a GET request to the web endpoint to get episode response for web devices
|
||||
episode_response = self.session.get(
|
||||
url=self.config['endpoints']['episode_web'].format(episode=title.id),
|
||||
headers={
|
||||
'Authorization': f'Bearer {self.auth_token_web}'
|
||||
}
|
||||
).json()
|
||||
|
||||
# Grab the deletion token, opening the GET request above counts as a concurrent stream
|
||||
# Crunchyroll only allows 3 concurrent streams to be open at once, so we most "close" the stream when all information needed has been extracted
|
||||
deletion_token = episode_response['token']
|
||||
|
||||
# Initialize an empty list for different versions of the episode
|
||||
available_versions = []
|
||||
|
||||
# Append all the available versions to the available_versions list
|
||||
for version in episode_response['versions']:
|
||||
available_versions.append(version['guid'])
|
||||
|
||||
# Update the session headers so that session may be passed to DASH.from_url()
|
||||
self.session.headers.update({
|
||||
'Authorization': f'Bearer {self.auth_token_web}'
|
||||
})
|
||||
|
||||
# Add the original response tracks
|
||||
tracks.add(DASH.from_url(url=episode_response['url'], session=self.session).to_tracks(language=episode_response['audioLocale']))
|
||||
|
||||
# Clear the sessions headers
|
||||
self.session.headers.clear()
|
||||
|
||||
# Clear any subtitles included in the MPD, they will be retrieved manually
|
||||
tracks.subtitles.clear()
|
||||
|
||||
# Iterate through subtitles available in the original response
|
||||
for subtitle in episode_response['subtitles']:
|
||||
|
||||
# If the subtitle isn't none
|
||||
if subtitle != 'none' and episode_response['audioLocale'] != 'en-US':
|
||||
|
||||
# Append the language to the current_subtitles list to keep track of which are already added to tracks
|
||||
current_subtitles.append(episode_response['subtitles'][subtitle]['language'])
|
||||
|
||||
# Add the subtitle to the tracks object
|
||||
tracks.add(Subtitle(
|
||||
url=episode_response['subtitles'][subtitle]['url'],
|
||||
codec=Subtitle.Codec.from_mime(episode_response['subtitles'][subtitle]['format']),
|
||||
language=episode_response['subtitles'][subtitle]['language'],
|
||||
))
|
||||
|
||||
# Update current_tracks with the tracks now in tracks object
|
||||
for track in tracks:
|
||||
current_tracks.append(track.id)
|
||||
track.data['endpoint_type'] = 'web'
|
||||
track.data['GUID'] = title.id
|
||||
|
||||
# Now that tracks have been extracted, close the "stream"
|
||||
self.session.delete(
|
||||
url=self.config['endpoints']['delete'].format(episode=title.id, delete_token=deletion_token),
|
||||
headers={
|
||||
'Authorization': f'Bearer {self.auth_token_web}'
|
||||
}
|
||||
)
|
||||
|
||||
if self.auth_token_tv is not None:
|
||||
# Send a GET request to the Android/Google TV endpoint to get episode response for TV devices
|
||||
episode_response = self.session.get(
|
||||
url=self.config['endpoints']['episode_tv'].format(episode=title.id),
|
||||
headers={
|
||||
'Accept': 'application/json',
|
||||
'Accept-Charset': 'UTF-8',
|
||||
'Accept-Encoding': 'gzip',
|
||||
'Authorization': f'Bearer {self.auth_token_tv}',
|
||||
'User-Agent': 'Crunchyroll/ANDROIDTV/3.46.0_22275 (Android 16; en-US; AOSP TV on x86)',
|
||||
},
|
||||
).json()
|
||||
|
||||
# Grab the deletion token, opening the GET request above counts as a concurrent stream
|
||||
# Crunchyroll only allows 3 concurrent streams to be open at once, so we most "close" the stream when all information needed has been extracted
|
||||
deletion_token = episode_response['token']
|
||||
|
||||
## Update the headers to reflect TV device
|
||||
self.session.headers.update({
|
||||
'Accept': 'application/json',
|
||||
'Accept-Charset': 'UTF-8',
|
||||
'Accept-Encoding': 'gzip',
|
||||
'Authorization': f'Bearer {self.auth_token_tv}',
|
||||
'User-Agent': 'Crunchyroll/ANDROIDTV/3.46.0_22275 (Android 16; en-US; AOSP TV on x86)',
|
||||
})
|
||||
|
||||
# Grab the new tracks into a variable
|
||||
new_tracks = DASH.from_url(url=episode_response['url'], session=self.session).to_tracks(
|
||||
language=episode_response['audioLocale'])
|
||||
|
||||
# Clear the subtitles
|
||||
new_tracks.subtitles.clear()
|
||||
|
||||
# Iterate through the new_tracks
|
||||
for track in new_tracks:
|
||||
# See if the track already exists in current_tracks
|
||||
if track.id not in current_tracks:
|
||||
# Append the track ID if it doesn't
|
||||
current_tracks.append(track.id)
|
||||
# Add track endpoint type
|
||||
track.data['endpoint_type'] = 'tv'
|
||||
track.data['GUID'] = title.id
|
||||
# Add the track to the tracks object
|
||||
tracks.add(track)
|
||||
|
||||
# Iterate through subtitles available in the original response
|
||||
for subtitle in episode_response['subtitles']:
|
||||
|
||||
# If the subtitle isn't none
|
||||
if subtitle != 'none' and subtitle not in current_subtitles and episode_response['audioLocale'] != 'en-US':
|
||||
current_subtitles.append(episode_response['subtitles'][subtitle]['language'])
|
||||
|
||||
# Add the subtitle to the tracks object
|
||||
tracks.add(Subtitle(
|
||||
url=episode_response['subtitles'][subtitle]['url'],
|
||||
codec=Subtitle.Codec.from_mime(episode_response['subtitles'][subtitle]['format']),
|
||||
language=episode_response['subtitles'][subtitle]['language'],
|
||||
))
|
||||
|
||||
# Now that tracks have been extracted, close the "stream"
|
||||
self.session.delete(
|
||||
url=self.config['endpoints']['delete'].format(episode=title.id, delete_token=deletion_token),
|
||||
headers={
|
||||
'Authorization': f'Bearer {self.auth_token_tv}'
|
||||
}
|
||||
)
|
||||
|
||||
# Clear the headers for next iterations below
|
||||
self.session.headers.clear()
|
||||
|
||||
for version in available_versions:
|
||||
# Send a GET request to the web endpoint to get episode response for web devices
|
||||
episode_response = self.session.get(
|
||||
url=self.config['endpoints']['episode_web'].format(episode=version),
|
||||
headers={
|
||||
'Authorization': f'Bearer {self.auth_token_web}'
|
||||
}
|
||||
).json()
|
||||
|
||||
### TRY STATEMENT HERE FOR DEBUGGING ###
|
||||
try:
|
||||
# Grab the deletion token, opening the GET request above counts as a concurrent stream
|
||||
# Crunchyroll only allows 3 concurrent streams to be open at once, so we most "close" the stream when all information needed has been extracted
|
||||
deletion_token = episode_response['token']
|
||||
|
||||
# Update the session headers so that session may be passed to DASH.from_url()
|
||||
self.session.headers.update({
|
||||
'Authorization': f'Bearer {self.auth_token_web}'
|
||||
})
|
||||
|
||||
# Grab the new tracks into a variable
|
||||
new_tracks = DASH.from_url(url=episode_response['url'], session=self.session).to_tracks(language=episode_response['audioLocale'])
|
||||
|
||||
# Clear the subtitles on the tracks
|
||||
new_tracks.subtitles.clear()
|
||||
|
||||
# Iterate through available subtitles
|
||||
for subtitle in episode_response['subtitles']:
|
||||
# If the subtitle isn't empty, continue
|
||||
if subtitle != 'none':
|
||||
# If the subtitle is not in current subtitles continue
|
||||
if episode_response['subtitles'][subtitle]['language'] not in current_subtitles:
|
||||
# Append the language to current subtitles list
|
||||
current_subtitles.append(episode_response['subtitles'][subtitle]['language'])
|
||||
# Add the subtitle to the tracks
|
||||
tracks.add(Subtitle(
|
||||
url=episode_response['subtitles'][subtitle]['url'],
|
||||
codec=Subtitle.Codec.from_mime(episode_response['subtitles'][subtitle]['format']),
|
||||
language=episode_response['subtitles'][subtitle]['language'],
|
||||
))
|
||||
# Iterate through the tracks
|
||||
for track in new_tracks:
|
||||
# if the track ID isn't in the current tracks continue
|
||||
if track.id not in current_tracks:
|
||||
# Append the new track ad
|
||||
current_tracks.append(track.id)
|
||||
# Add track endpoint type
|
||||
track.data['endpoint_type'] = 'web'
|
||||
# Add track GUID
|
||||
track.data['GUID'] = version
|
||||
# Add it
|
||||
tracks.add(track)
|
||||
# Now that tracks have been extracted, close the "stream"
|
||||
self.session.delete(
|
||||
url=self.config['endpoints']['delete'].format(episode=version, delete_token=deletion_token),
|
||||
headers={
|
||||
'Authorization': f'Bearer {self.auth_token_web}'
|
||||
}
|
||||
)
|
||||
# Clear headers for the TV endpoint iteration
|
||||
self.session.headers.clear()
|
||||
except:
|
||||
continue
|
||||
if self.auth_token_tv is not None:
|
||||
# Send a GET request to the Android/Google TV endpoint to get episode response for TV devices
|
||||
episode_response = self.session.get(
|
||||
url=self.config['endpoints']['episode_tv'].format(episode=version),
|
||||
headers={
|
||||
'Accept': 'application/json',
|
||||
'Accept-Charset': 'UTF-8',
|
||||
'Accept-Encoding': 'gzip',
|
||||
'Authorization': f'Bearer {self.auth_token_tv}',
|
||||
'User-Agent': 'Crunchyroll/ANDROIDTV/3.46.0_22275 (Android 16; en-US; AOSP TV on x86)',
|
||||
},
|
||||
).json()
|
||||
|
||||
# Grab the deletion token, opening the GET request above counts as a concurrent stream
|
||||
# Crunchyroll only allows 3 concurrent streams to be open at once, so we most "close" the stream when all information needed has been extracted
|
||||
deletion_token = episode_response['token']
|
||||
|
||||
## Update the headers to reflect TV device
|
||||
self.session.headers.update({
|
||||
'Accept': 'application/json',
|
||||
'Accept-Charset': 'UTF-8',
|
||||
'Accept-Encoding': 'gzip',
|
||||
'Authorization': f'Bearer {self.auth_token_tv}',
|
||||
'User-Agent': 'Crunchyroll/ANDROIDTV/3.46.0_22275 (Android 16; en-US; AOSP TV on x86)',
|
||||
})
|
||||
|
||||
# Grab the new tracks into a variable
|
||||
new_tracks = DASH.from_url(url=episode_response['url'], session=self.session).to_tracks(
|
||||
language=episode_response['audioLocale'])
|
||||
|
||||
# Clear subtitles
|
||||
new_tracks.subtitles.clear()
|
||||
|
||||
# Iterate through the new_tracks
|
||||
for track in new_tracks:
|
||||
# See if the track already exists in current_tracks
|
||||
if track.id not in current_tracks:
|
||||
# Append the track ID if it doesn't
|
||||
current_tracks.append(track.id)
|
||||
# Add the track endpoint type
|
||||
track.data['endpoint_type'] = 'tv'
|
||||
# Add track guid
|
||||
track.data['GUID'] = version
|
||||
# Add the track to the tracks object
|
||||
tracks.add(track)
|
||||
|
||||
# Iterate through subtitles available in the original response
|
||||
for subtitle in episode_response['subtitles']:
|
||||
|
||||
# If the subtitle isn't none
|
||||
if subtitle != 'none' and subtitle not in current_subtitles:
|
||||
# Add the subtitle to the tracks object
|
||||
tracks.add(Subtitle(
|
||||
url=episode_response['subtitles'][subtitle]['url'],
|
||||
codec=Subtitle.Codec.from_mime(episode_response['subtitles'][subtitle]['format']),
|
||||
language=episode_response['subtitles'][subtitle]['language'],
|
||||
))
|
||||
|
||||
# Now that tracks have been extracted, close the "stream"
|
||||
self.session.delete(
|
||||
url=self.config['endpoints']['delete'].format(episode=version, delete_token=deletion_token),
|
||||
headers={
|
||||
'Authorization': f'Bearer {self.auth_token_tv}'
|
||||
}
|
||||
)
|
||||
|
||||
# Clear the headers for next iterations below
|
||||
self.session.headers.clear()
|
||||
|
||||
return tracks
|
||||
|
||||
def get_chapters(self, title: Title_T) -> Chapters:
|
||||
# All things created equal, this should be the same for all video tracks
|
||||
|
||||
# Initialize chapters class
|
||||
chapters = Chapters()
|
||||
|
||||
# Send GET request for the chapters
|
||||
chapter_response = self.session.get(
|
||||
url=self.config['endpoints']['chapters'].format(episode=title.id),
|
||||
).json()
|
||||
|
||||
# Check for intro chapter
|
||||
if chapter_response.get('intro'):
|
||||
try:
|
||||
# Add the chapter, may not exist
|
||||
chapters.add(Chapter(
|
||||
timestamp=chapter_response['intro']['start'] * 1000,
|
||||
name=chapter_response['intro']['type'].capitalize(),
|
||||
))
|
||||
# If it doesn't exist, move on to the next
|
||||
except:
|
||||
pass
|
||||
|
||||
# Check for the credits chapter
|
||||
if chapter_response.get('credits'):
|
||||
try:
|
||||
# Add the chapter, may not exist
|
||||
chapters.add(Chapter(
|
||||
timestamp=chapter_response['credits']['start'] * 1000,
|
||||
name=chapter_response['credits']['type'].capitalize(),
|
||||
))
|
||||
# If it doesn't exist, move on to the next
|
||||
except:
|
||||
pass
|
||||
|
||||
# Check for the preview chapter
|
||||
if chapter_response.get('preview'):
|
||||
try:
|
||||
# Add the chapter, may not exist
|
||||
chapters.add(Chapter(
|
||||
timestamp=chapter_response['preview']['start'] * 1000,
|
||||
name=chapter_response['preview']['type'].capitalize(),
|
||||
))
|
||||
# If it doesn't exist, move on to the next
|
||||
except:
|
||||
pass
|
||||
|
||||
# Check for recap chapter
|
||||
if chapter_response.get('recap'):
|
||||
try:
|
||||
# Add the chapter, may not exist
|
||||
chapters.add(Chapter(
|
||||
timestamp=chapter_response['recap']['start'] * 1000,
|
||||
name=chapter_response['recap']['type'].capitalize(),
|
||||
))
|
||||
# If it doesn't exist, move on to return statement
|
||||
except:
|
||||
pass
|
||||
|
||||
# Return the chapters
|
||||
return chapters
|
||||
|
||||
def get_widevine_license(self, *, challenge: bytes, title: Title_T, track: AnyTrack) -> Optional[Union[bytes, str]]:
|
||||
if track.data['endpoint_type'] == 'tv':
|
||||
# Get the episode response
|
||||
episode_response = self.session.get(
|
||||
url=self.config['endpoints']['episode_tv'].format(episode=track.data['GUID']),
|
||||
headers={
|
||||
'Accept': 'application/json',
|
||||
'Accept-Charset': 'UTF-8',
|
||||
'Accept-Encoding': 'gzip',
|
||||
'Authorization': f'Bearer {self.auth_token_tv}',
|
||||
'User-Agent': 'Crunchyroll/ANDROIDTV/3.46.0_22275 (Android 16; en-US; AOSP TV on x86)',
|
||||
},
|
||||
).json()
|
||||
|
||||
# Grab the deletion token, opening the GET request above counts as a concurrent stream
|
||||
# Crunchyroll only allows 3 concurrent streams to be open at once, so we most "close" the stream when all information needed has been extracted
|
||||
deletion_token = episode_response['token']
|
||||
|
||||
# Send a post request to the license server
|
||||
license_response = self.session.post(
|
||||
url=self.config['endpoints']['license_tv'],
|
||||
headers={
|
||||
'Accept': 'application/octet-stream',
|
||||
'Accept-Encoding': 'gzip',
|
||||
'Authorization': f'Bearer {self.auth_token_tv}',
|
||||
'Content-Type': 'application/octet-stream',
|
||||
'Host': 'cr-license-proxy.prd.crunchyrollsvc.com',
|
||||
'x-cr-content-id': track.data['GUID'],
|
||||
'x-cr-video-token': deletion_token
|
||||
},
|
||||
data=challenge
|
||||
)
|
||||
|
||||
# Now that license has been extracted, close the "stream"
|
||||
self.session.delete(
|
||||
url=self.config['endpoints']['delete'].format(episode=track.data['GUID'], delete_token=deletion_token),
|
||||
headers={
|
||||
'Authorization': f'Bearer {self.auth_token_tv}'
|
||||
}
|
||||
)
|
||||
|
||||
## Update the headers to reflect TV device
|
||||
self.session.headers.update({
|
||||
'Accept': 'application/json',
|
||||
'Accept-Charset': 'UTF-8',
|
||||
'Accept-Encoding': 'gzip',
|
||||
'Authorization': f'Bearer {self.auth_token_tv}',
|
||||
'User-Agent': 'Crunchyroll/ANDROIDTV/3.46.0_22275 (Android 16; en-US; AOSP TV on x86)',
|
||||
})
|
||||
|
||||
return license_response.content
|
||||
|
||||
if track.data['endpoint_type'] == 'web':
|
||||
# Get the episode response
|
||||
episode_response = self.session.get(
|
||||
url=self.config['endpoints']['episode_web'].format(episode=track.data['GUID']),
|
||||
headers={
|
||||
'Authorization': f'Bearer {self.auth_token_web}'
|
||||
}
|
||||
).json()
|
||||
|
||||
# Grab the deletion token, opening the GET request above counts as a concurrent stream
|
||||
# Crunchyroll only allows 3 concurrent streams to be open at once, so we most "close" the stream when all information needed has been extracted
|
||||
deletion_token = episode_response['token']
|
||||
|
||||
# Get the license
|
||||
license_response = self.session.post(
|
||||
url=self.config['endpoints']['license_web'],
|
||||
headers={
|
||||
'Accept': '*/*',
|
||||
'content-type': 'application/octet-stream',
|
||||
'x-cr-content-id': track.data['GUID'],
|
||||
'x-cr-video-token': deletion_token,
|
||||
'authorization': f'Bearer {self.auth_token_web}'
|
||||
},
|
||||
data=challenge
|
||||
).json()
|
||||
|
||||
# Now that license has been extracted, close the "stream"
|
||||
self.session.delete(
|
||||
url=self.config['endpoints']['delete'].format(episode=track.data['GUID'], delete_token=deletion_token),
|
||||
headers={
|
||||
'Authorization': f'Bearer {self.auth_token_web}'
|
||||
}
|
||||
)
|
||||
|
||||
# Update the headers to reflect web
|
||||
self.session.headers.update({
|
||||
'Authorization': f'Bearer {self.auth_token_web}',
|
||||
})
|
||||
|
||||
return license_response['license']
|
||||
|
||||
def on_track_downloaded(self, track: AnyTrack) -> None:
|
||||
if isinstance(track, Subtitle):
|
||||
track.convert(codec=Subtitle.Codec.SubRip)
|
||||
self.authenticate()
|
||||
|
||||
def search(self) -> Generator[SearchResult, None, None]:
|
||||
|
||||
# Get the search results
|
||||
search_results = self.session.get(
|
||||
url=self.config['endpoints']['search'].format(search_keyword=self.title),
|
||||
headers={
|
||||
'Authorization': f'Bearer {self.auth_token_web}',
|
||||
}
|
||||
).json()
|
||||
|
||||
# Iterate through series responses, create generator for results.
|
||||
for result_type in search_results['data']:
|
||||
if result_type['type'] == 'series':
|
||||
for series_results in result_type['items']:
|
||||
yield SearchResult(
|
||||
id_=series_results['id'],
|
||||
title=series_results['title'],
|
||||
description=series_results['description']
|
||||
)
|
11
CR/config.yaml
Normal file
11
CR/config.yaml
Normal file
@ -0,0 +1,11 @@
|
||||
endpoints:
|
||||
token: 'https://www.crunchyroll.com/auth/v1/token'
|
||||
seasons: 'https://www.crunchyroll.com/content/v2/cms/series/{series_id}/seasons'
|
||||
episodes: 'https://www.crunchyroll.com/content/v2/cms/seasons/{season_id}/episodes'
|
||||
episode_web: 'https://www.crunchyroll.com/playback/v3/{episode}/web/firefox/play'
|
||||
episode_tv: 'https://www.crunchyroll.com/playback/v2/{episode}/tv/android_tv/play'
|
||||
delete: 'https://www.crunchyroll.com/playback/v1/token/{episode}/{delete_token}'
|
||||
chapters: 'https://static.crunchyroll.com/skip-events/production/{episode}.json'
|
||||
license_tv: 'https://cr-license-proxy.prd.crunchyrollsvc.com/v1/license/widevine?specConform=true'
|
||||
license_web: 'https://www.crunchyroll.com/license/v1/license/widevine'
|
||||
search: 'https://www.crunchyroll.com/content/v2/discover/search?q={search_keyword}'
|
Loading…
x
Reference in New Issue
Block a user