Compare commits

..

No commits in common. "db7bea79513c5411ff6dab59150b441b3f6aab45" and "7f84542cfb99f4486ecec6bb3982d3d3cde3d179" have entirely different histories.

3 changed files with 565 additions and 519 deletions

View File

@ -314,8 +314,8 @@ def api_decrypt(
} }
# Prepare proxies # Prepare proxies
proxies = "" proxies = None
if proxy != "": if proxy is not None:
is_url, protocol, fqdn = is_url_and_split(proxy) is_url, protocol, fqdn = is_url_and_split(proxy)
if is_url: if is_url:
proxies = {"http": proxy, "https": proxy} proxies = {"http": proxy, "https": proxy}
@ -380,7 +380,7 @@ def api_decrypt(
json_data, json_data,
is_widevine=(not is_pr), is_widevine=(not is_pr),
) )
if returned_keys == "": if returned_keys is None:
return {"status": "error", "message": key_err} return {"status": "error", "message": key_err}
# Close session # Close session

View File

@ -1,75 +1,44 @@
"""Module to handle the remote device PlayReady."""
import base64 import base64
import os
from pathlib import Path
import re
import yaml
from flask import Blueprint, jsonify, request, current_app, Response
from flask import Blueprint, jsonify, request, current_app, Response
import os
import yaml
from pyplayready.device import Device as PlayReadyDevice from pyplayready.device import Device as PlayReadyDevice
from pyplayready.cdm import Cdm as PlayReadyCDM from pyplayready.cdm import Cdm as PlayReadyCDM
from pyplayready import PSSH as PlayReadyPSSH from pyplayready import PSSH as PlayReadyPSSH
from pyplayready.exceptions import ( from pyplayready.exceptions import (
InvalidSession, InvalidSession,
TooManySessions,
InvalidLicense, InvalidLicense,
InvalidPssh, InvalidPssh,
) )
from custom_functions.database.user_db import fetch_username_by_api_key from custom_functions.database.user_db import fetch_username_by_api_key
from custom_functions.decrypt.api_decrypt import is_base64 from custom_functions.decrypt.api_decrypt import is_base64
from custom_functions.user_checks.device_allowed import user_allowed_to_use_device from custom_functions.user_checks.device_allowed import user_allowed_to_use_device
from pathlib import Path
remotecdm_pr_bp = Blueprint("remotecdm_pr", __name__) remotecdm_pr_bp = Blueprint("remotecdm_pr", __name__)
with open( with open(f"{os.getcwd()}/configs/config.yaml", "r") as file:
os.path.join(os.getcwd(), "configs", "config.yaml"), "r", encoding="utf-8"
) as file:
config = yaml.safe_load(file) config = yaml.safe_load(file)
def make_response(status, message, data=None, http_status=200):
"""Make a response."""
resp = {"status": status, "message": message}
if data is not None:
resp["data"] = data
return jsonify(resp), http_status
def check_required_fields(body, required_fields):
"""Return a response tuple if a required field is missing, else None."""
for field in required_fields:
if not body.get(field):
return make_response(
"Error",
f'Missing required field "{field}" in JSON body',
http_status=400,
)
return None
@remotecdm_pr_bp.route("/remotecdm/playready", methods=["GET", "HEAD"]) @remotecdm_pr_bp.route("/remotecdm/playready", methods=["GET", "HEAD"])
def remote_cdm_playready(): def remote_cdm_playready():
"""Handle the remote device PlayReady."""
if request.method == "GET": if request.method == "GET":
return make_response( return jsonify({"message": "OK"})
"Success",
"OK",
http_status=200,
)
if request.method == "HEAD": if request.method == "HEAD":
response = Response(status=200) response = Response(status=200)
response.headers["Server"] = "playready serve" response.headers["Server"] = "playready serve"
return response return response
return make_response("Failed", "Method not allowed", http_status=405)
@remotecdm_pr_bp.route("/remotecdm/playready/deviceinfo", methods=["GET"]) @remotecdm_pr_bp.route("/remotecdm/playready/deviceinfo", methods=["GET"])
def remote_cdm_playready_deviceinfo(): def remote_cdm_playready_deviceinfo():
"""Handle the remote device PlayReady device info."""
base_name = config["default_pr_cdm"] base_name = config["default_pr_cdm"]
device = PlayReadyDevice.load( if not base_name.endswith(".prd"):
os.path.join(os.getcwd(), "configs", "CDMs", "PR", base_name + ".prd") full_file_name = base_name + ".prd"
) device = PlayReadyDevice.load(f"{os.getcwd()}/configs/CDMs/PR/{full_file_name}")
cdm = PlayReadyCDM.from_device(device) cdm = PlayReadyCDM.from_device(device)
return jsonify( return jsonify(
{ {
@ -81,36 +50,20 @@ def remote_cdm_playready_deviceinfo():
) )
def sanitize_username(username):
"""Sanitize the username."""
return re.sub(r"[^a-zA-Z0-9_\-]", "_", username).lower()
@remotecdm_pr_bp.route("/remotecdm/playready/deviceinfo/<device>", methods=["GET"]) @remotecdm_pr_bp.route("/remotecdm/playready/deviceinfo/<device>", methods=["GET"])
def remote_cdm_playready_deviceinfo_specific(device): def remote_cdm_playready_deviceinfo_specific(device):
"""Handle the remote device PlayReady device info specific.""" if request.method == "GET":
base_name = Path(device).with_suffix(".prd").name base_name = Path(device).with_suffix(".prd").name
api_key = request.headers["X-Secret-Key"] api_key = request.headers["X-Secret-Key"]
username = fetch_username_by_api_key(api_key) username = fetch_username_by_api_key(api_key)
if not username:
return jsonify({"message": "Invalid or missing API key."}), 403
safe_username = sanitize_username(username)
device = PlayReadyDevice.load( device = PlayReadyDevice.load(
os.path.join( f"{os.getcwd()}/configs/CDMs/{username}/PR/{base_name}"
os.getcwd(),
"configs",
"CDMs",
"users_uploaded",
safe_username,
"PR",
base_name,
)
) )
cdm = PlayReadyCDM.from_device(device) cdm = PlayReadyCDM.from_device(device)
return jsonify( return jsonify(
{ {
"security_level": cdm.security_level, "security_level": cdm.security_level,
"host": f'{config["fqdn"]}/remotecdm/playready', "host": f'{config["fqdn"]}/remotecdm/widevine',
"secret": f"{api_key}", "secret": f"{api_key}",
"device_name": Path(base_name).stem, "device_name": Path(base_name).stem,
} }
@ -119,103 +72,111 @@ def remote_cdm_playready_deviceinfo_specific(device):
@remotecdm_pr_bp.route("/remotecdm/playready/<device>/open", methods=["GET"]) @remotecdm_pr_bp.route("/remotecdm/playready/<device>/open", methods=["GET"])
def remote_cdm_playready_open(device): def remote_cdm_playready_open(device):
"""Handle the remote device PlayReady open."""
unauthorized_msg = {
"message": f"Device '{device}' is not found or you are not authorized to use it."
}
# Default device logic
if str(device).lower() == config["default_pr_cdm"].lower(): if str(device).lower() == config["default_pr_cdm"].lower():
pr_device = PlayReadyDevice.load( pr_device = PlayReadyDevice.load(
os.path.join( f'{os.getcwd()}/configs/CDMs/PR/{config["default_pr_cdm"]}.prd'
os.getcwd(), "configs", "CDMs", "PR", config["default_pr_cdm"] + ".prd"
)
) )
cdm = current_app.config["CDM"] = PlayReadyCDM.from_device(pr_device) cdm = current_app.config["CDM"] = PlayReadyCDM.from_device(pr_device)
session_id = cdm.open() session_id = cdm.open()
return make_response( return jsonify(
"Success",
"Successfully opened the PlayReady CDM session",
{ {
"message": "Success",
"data": {
"session_id": session_id.hex(), "session_id": session_id.hex(),
"device": {"security_level": cdm.security_level}, "device": {"security_level": cdm.security_level},
}, },
http_status=200, }
) )
if (
# User device logic request.headers["X-Secret-Key"]
api_key = request.headers.get("X-Secret-Key") and str(device).lower() != config["default_pr_cdm"].lower()
if api_key and str(device).lower() != config["default_pr_cdm"].lower(): ):
api_key = request.headers["X-Secret-Key"]
user = fetch_username_by_api_key(api_key=api_key) user = fetch_username_by_api_key(api_key=api_key)
safe_username = sanitize_username(user) if user:
if user and user_allowed_to_use_device(device=device, username=user): if user_allowed_to_use_device(device=device, username=user):
pr_device = PlayReadyDevice.load( pr_device = PlayReadyDevice.load(
os.path.join( f"{os.getcwd()}/configs/CDMs/{user}/PR/{device}.prd"
os.getcwd(),
"configs",
"CDMs",
"users_uploaded",
safe_username,
"PR",
device + ".prd",
)
) )
cdm = current_app.config["CDM"] = PlayReadyCDM.from_device(pr_device) cdm = current_app.config["CDM"] = PlayReadyCDM.from_device(pr_device)
session_id = cdm.open() session_id = cdm.open()
return make_response( return jsonify(
"Success",
"Successfully opened the PlayReady CDM session",
{ {
"message": "Success",
"data": {
"session_id": session_id.hex(), "session_id": session_id.hex(),
"device": {"security_level": cdm.security_level}, "device": {"security_level": cdm.security_level},
}, },
http_status=200, }
) )
return make_response("Failed", unauthorized_msg, http_status=403) else:
return (
return make_response("Failed", unauthorized_msg, http_status=403) jsonify(
{
"message": f"Device '{device}' is not found or you are not authorized to use it.",
def get_cdm_or_error(device): }
"""Get the CDM or return an error response.""" ),
cdm = current_app.config.get("CDM") 403,
if not cdm: )
return make_response( else:
"Error", return (
f'No CDM session for "{device}" has been opened yet. No session to use', jsonify(
http_status=400, {
"message": f"Device '{device}' is not found or you are not authorized to use it.",
}
),
403,
)
else:
return (
jsonify(
{
"message": f"Device '{device}' is not found or you are not authorized to use it.",
}
),
403,
) )
return cdm
@remotecdm_pr_bp.route( @remotecdm_pr_bp.route(
"/remotecdm/playready/<device>/close/<session_id>", methods=["GET"] "/remotecdm/playready/<device>/close/<session_id>", methods=["GET"]
) )
def remote_cdm_playready_close(device, session_id): def remote_cdm_playready_close(device, session_id):
"""Handle the remote device PlayReady close."""
try: try:
session_id = bytes.fromhex(session_id) session_id = bytes.fromhex(session_id)
cdm = get_cdm_or_error(device) cdm = current_app.config["CDM"]
if isinstance(cdm, tuple): # error response if not cdm:
return cdm return (
jsonify(
{
"message": f'No CDM for "{device}" has been opened yet. No session to close'
}
),
400,
)
try: try:
cdm.close(session_id) cdm.close(session_id)
except InvalidSession: except InvalidSession:
return make_response( return (
"Error", jsonify(
f'Invalid session ID "{session_id.hex()}", it may have expired', {
http_status=400, "message": f'Invalid session ID "{session_id.hex()}", it may have expired'
}
),
400,
) )
return make_response( return (
"Success", jsonify(
f'Successfully closed Session "{session_id.hex()}".', {
http_status=200, "message": f'Successfully closed Session "{session_id.hex()}".',
}
),
200,
) )
except Exception as error: except Exception as e:
return make_response( return (
"Error", jsonify({"message": f'Failed to close Session "{session_id.hex()}".'}),
f'Failed to close Session "{session_id.hex()}", {error}.', 400,
http_status=400,
) )
@ -223,14 +184,18 @@ def remote_cdm_playready_close(device, session_id):
"/remotecdm/playready/<device>/get_license_challenge", methods=["POST"] "/remotecdm/playready/<device>/get_license_challenge", methods=["POST"]
) )
def remote_cdm_playready_get_license_challenge(device): def remote_cdm_playready_get_license_challenge(device):
"""Handle the remote device PlayReady get license challenge."""
body = request.get_json() body = request.get_json()
missing_field = check_required_fields(body, ("session_id", "init_data")) for required_field in ("session_id", "init_data"):
if missing_field: if not body.get(required_field):
return missing_field return (
cdm = get_cdm_or_error(device) jsonify(
if isinstance(cdm, tuple): # error response {
return cdm "message": f'Missing required field "{required_field}" in JSON body'
}
),
400,
)
cdm = current_app.config["CDM"]
session_id = bytes.fromhex(body["session_id"]) session_id = bytes.fromhex(body["session_id"])
init_data = body["init_data"] init_data = body["init_data"]
if not init_data.startswith("<WRMHEADER"): if not init_data.startswith("<WRMHEADER"):
@ -238,46 +203,38 @@ def remote_cdm_playready_get_license_challenge(device):
pssh = PlayReadyPSSH(init_data) pssh = PlayReadyPSSH(init_data)
if pssh.wrm_headers: if pssh.wrm_headers:
init_data = pssh.wrm_headers[0] init_data = pssh.wrm_headers[0]
except InvalidPssh as error: except InvalidPssh as e:
return make_response( return jsonify({"message": f"Unable to parse base64 PSSH, {e}"})
"Error",
f"Unable to parse base64 PSSH, {error}",
http_status=400,
)
try: try:
license_request = cdm.get_license_challenge( license_request = cdm.get_license_challenge(
session_id=session_id, wrm_header=init_data session_id=session_id, wrm_header=init_data
) )
except InvalidSession: except InvalidSession:
return make_response( return jsonify(
"Error", {
f"Invalid Session ID '{session_id.hex()}', it may have expired.", "message": f"Invalid Session ID '{session_id.hex()}', it may have expired."
http_status=400, }
)
except ValueError as error:
return make_response(
"Error",
f"Invalid License, {error}",
http_status=400,
)
return make_response(
"Success",
"Successfully got the License Challenge",
{"challenge_b64": base64.b64encode(license_request).decode()},
http_status=200,
) )
except Exception as e:
return jsonify({"message": f"Error, {e}"})
return jsonify({"message": "success", "data": {"challenge": license_request}})
@remotecdm_pr_bp.route("/remotecdm/playready/<device>/parse_license", methods=["POST"]) @remotecdm_pr_bp.route("/remotecdm/playready/<device>/parse_license", methods=["POST"])
def remote_cdm_playready_parse_license(device): def remote_cdm_playready_parse_license(device):
"""Handle the remote device PlayReady parse license."""
body = request.get_json() body = request.get_json()
missing_field = check_required_fields(body, ("license_message", "session_id")) for required_field in ("license_message", "session_id"):
if missing_field: if not body.get(required_field):
return missing_field return jsonify(
cdm = get_cdm_or_error(device) {"message": f'Missing required field "{required_field}" in JSON body'}
if isinstance(cdm, tuple): # error response )
return cdm cdm = current_app.config["CDM"]
if not cdm:
return jsonify(
{
"message": f"No Cdm session for {device} has been opened yet. No session to use."
}
)
session_id = bytes.fromhex(body["session_id"]) session_id = bytes.fromhex(body["session_id"])
license_message = body["license_message"] license_message = body["license_message"]
if is_base64(license_message): if is_base64(license_message):
@ -285,56 +242,44 @@ def remote_cdm_playready_parse_license(device):
try: try:
cdm.parse_license(session_id, license_message) cdm.parse_license(session_id, license_message)
except InvalidSession: except InvalidSession:
return make_response( return jsonify(
"Error", {
f"Invalid Session ID '{session_id.hex()}', it may have expired.", "message": f"Invalid Session ID '{session_id.hex()}', it may have expired."
http_status=400, }
) )
except InvalidLicense as e: except InvalidLicense as e:
return make_response( return jsonify({"message": f"Invalid License, {e}"})
"Error",
f"Invalid License, {e}",
http_status=400,
)
except Exception as e: except Exception as e:
return make_response( return jsonify({"message": f"Error, {e}"})
"Error", return jsonify(
f"Error, {e}", {"message": "Successfully parsed and loaded the Keys from the License message"}
http_status=400,
)
return make_response(
"Success",
"Successfully parsed and loaded the Keys from the License message",
http_status=200,
) )
@remotecdm_pr_bp.route("/remotecdm/playready/<device>/get_keys", methods=["POST"]) @remotecdm_pr_bp.route("/remotecdm/playready/<device>/get_keys", methods=["POST"])
def remote_cdm_playready_get_keys(device): def remote_cdm_playready_get_keys(device):
"""Handle the remote device PlayReady get keys."""
body = request.get_json() body = request.get_json()
missing_field = check_required_fields(body, ("session_id",)) for required_field in ("session_id",):
if missing_field: if not body.get(required_field):
return missing_field return jsonify(
{"message": f'Missing required field "{required_field}" in JSON body'}
)
session_id = bytes.fromhex(body["session_id"]) session_id = bytes.fromhex(body["session_id"])
key_type = body.get("key_type", None) cdm = current_app.config["CDM"]
cdm = get_cdm_or_error(device) if not cdm:
if isinstance(cdm, tuple): # error response return jsonify(
return cdm {"message": f"Missing required field '{required_field}' in JSON body."}
)
try: try:
keys = cdm.get_keys(session_id, key_type) keys = cdm.get_keys(session_id)
except InvalidSession: except InvalidSession:
return make_response( return jsonify(
"Error", {
f"Invalid Session ID '{session_id.hex()}', it may have expired.", "message": f"Invalid Session ID '{session_id.hex()}', it may have expired."
http_status=400, }
)
except ValueError as error:
return make_response(
"Error",
f"The Key Type value '{key_type}' is invalid, {error}",
http_status=400,
) )
except Exception as e:
return jsonify({"message": f"Error, {e}"})
keys_json = [ keys_json = [
{ {
"key_id": key.key_id.hex, "key_id": key.key_id.hex,
@ -344,11 +289,5 @@ def remote_cdm_playready_get_keys(device):
"key_length": key.key_length, "key_length": key.key_length,
} }
for key in keys for key in keys
if not key_type or key.type == key_type
] ]
return make_response( return jsonify({"message": "success", "data": {"keys": keys_json}})
"Success",
"Successfully got the Keys",
{"keys": keys_json},
http_status=200,
)

View File

@ -1,12 +1,7 @@
"""Module to handle the remote device Widevine."""
import os import os
import base64
import re
from pathlib import Path
import yaml
from flask import Blueprint, jsonify, request, current_app, Response from flask import Blueprint, jsonify, request, current_app, Response
import base64
from typing import Any, Optional, Union
from google.protobuf.message import DecodeError from google.protobuf.message import DecodeError
from pywidevine.pssh import PSSH as widevinePSSH from pywidevine.pssh import PSSH as widevinePSSH
from pywidevine import __version__ from pywidevine import __version__
@ -19,47 +14,24 @@ from pywidevine.exceptions import (
InvalidLicenseType, InvalidLicenseType,
InvalidSession, InvalidSession,
SignatureMismatch, SignatureMismatch,
TooManySessions,
) )
from custom_functions.database.user_db import fetch_username_by_api_key import yaml
from custom_functions.database.unified_db_ops import cache_to_db from custom_functions.database.user_db import fetch_api_key, fetch_username_by_api_key
from custom_functions.user_checks.device_allowed import user_allowed_to_use_device from custom_functions.user_checks.device_allowed import user_allowed_to_use_device
from pathlib import Path
remotecdm_wv_bp = Blueprint("remotecdm_wv", __name__) remotecdm_wv_bp = Blueprint("remotecdm_wv", __name__)
with open( with open(f"{os.getcwd()}/configs/config.yaml", "r") as file:
os.path.join(os.getcwd(), "configs", "config.yaml"), "r", encoding="utf-8"
) as file:
config = yaml.safe_load(file) config = yaml.safe_load(file)
def make_response(status, message, data=None, http_status=200):
"""Make a response."""
resp = {"status": status, "message": message}
if data is not None:
resp["data"] = data
return jsonify(resp), http_status
def check_required_fields(body, required_fields):
"""Return a response if a required field is missing, else None."""
for field in required_fields:
if not body.get(field):
return make_response(
"Error",
f'Missing required field "{field}" in JSON body',
http_status=400,
)
return None
@remotecdm_wv_bp.route("/remotecdm/widevine", methods=["GET", "HEAD"]) @remotecdm_wv_bp.route("/remotecdm/widevine", methods=["GET", "HEAD"])
def remote_cdm_widevine(): def remote_cdm_widevine():
"""Handle the remote device Widevine."""
if request.method == "GET": if request.method == "GET":
return make_response( return jsonify(
"Success", {"status": 200, "message": f"{config['fqdn'].upper()} Remote Widevine CDM."}
f"{config['fqdn'].upper()} Remote Widevine CDM.",
http_status=200,
) )
if request.method == "HEAD": if request.method == "HEAD":
response = Response(status=200) response = Response(status=200)
@ -67,26 +39,17 @@ def remote_cdm_widevine():
f"https://github.com/devine-dl/pywidevine serve v{__version__}" f"https://github.com/devine-dl/pywidevine serve v{__version__}"
) )
return response return response
return make_response(
"Error",
"Invalid request method",
http_status=405,
)
@remotecdm_wv_bp.route("/remotecdm/widevine/deviceinfo", methods=["GET"]) @remotecdm_wv_bp.route("/remotecdm/widevine/deviceinfo", methods=["GET"])
def remote_cdm_widevine_deviceinfo(): def remote_cdm_widevine_deviceinfo():
"""Handle the remote device Widevine device info.""" if request.method == "GET":
base_name = config["default_wv_cdm"] base_name = config["default_wv_cdm"]
if not base_name.endswith(".wvd"): if not base_name.endswith(".wvd"):
base_name = base_name + ".wvd" base_name = base_name + ".wvd"
device = widevineDevice.load( device = widevineDevice.load(f"{os.getcwd()}/configs/CDMs/WV/{base_name}")
os.path.join(os.getcwd(), "configs", "CDMs", "WV", base_name)
)
cdm = widevineCDM.from_device(device) cdm = widevineCDM.from_device(device)
return make_response( return jsonify(
"Success",
"Successfully got the Widevine CDM device info",
{ {
"device_type": cdm.device_type.name, "device_type": cdm.device_type.name,
"system_id": cdm.system_id, "system_id": cdm.system_id,
@ -94,38 +57,21 @@ def remote_cdm_widevine_deviceinfo():
"host": f'{config["fqdn"]}/remotecdm/widevine', "host": f'{config["fqdn"]}/remotecdm/widevine',
"secret": f'{config["remote_cdm_secret"]}', "secret": f'{config["remote_cdm_secret"]}',
"device_name": Path(base_name).stem, "device_name": Path(base_name).stem,
}, }
http_status=200,
) )
def sanitize_username(username):
"""Sanitize the username."""
return re.sub(r"[^a-zA-Z0-9_\-]", "_", username).lower()
@remotecdm_wv_bp.route("/remotecdm/widevine/deviceinfo/<device>", methods=["GET"]) @remotecdm_wv_bp.route("/remotecdm/widevine/deviceinfo/<device>", methods=["GET"])
def remote_cdm_widevine_deviceinfo_specific(device): def remote_cdm_widevine_deviceinfo_specific(device):
"""Handle the remote device Widevine device info specific.""" if request.method == "GET":
base_name = Path(device).with_suffix(".wvd").name base_name = Path(device).with_suffix(".wvd").name
api_key = request.headers["X-Secret-Key"] api_key = request.headers["X-Secret-Key"]
username = fetch_username_by_api_key(api_key) username = fetch_username_by_api_key(api_key)
safe_username = sanitize_username(username)
device = widevineDevice.load( device = widevineDevice.load(
os.path.join( f"{os.getcwd()}/configs/CDMs/{username}/WV/{base_name}"
os.getcwd(),
"configs",
"CDMs",
"users_uploaded",
safe_username,
"WV",
base_name,
)
) )
cdm = widevineCDM.from_device(device) cdm = widevineCDM.from_device(device)
return make_response( return jsonify(
"Success",
"Successfully got the Widevine CDM device info (by user)",
{ {
"device_type": cdm.device_type.name, "device_type": cdm.device_type.name,
"system_id": cdm.system_id, "system_id": cdm.system_id,
@ -133,77 +79,92 @@ def remote_cdm_widevine_deviceinfo_specific(device):
"host": f'{config["fqdn"]}/remotecdm/widevine', "host": f'{config["fqdn"]}/remotecdm/widevine',
"secret": f"{api_key}", "secret": f"{api_key}",
"device_name": Path(base_name).stem, "device_name": Path(base_name).stem,
}, }
http_status=200,
) )
def load_widevine_device(device_name, api_key=None):
"""Load a Widevine device, either default or user-uploaded."""
try:
if device_name.lower() == config["default_wv_cdm"].lower():
path = os.path.join(
os.getcwd(), "configs", "CDMs", "WV", config["default_wv_cdm"] + ".wvd"
)
else:
if not api_key:
return None
username = fetch_username_by_api_key(api_key)
if not username or not user_allowed_to_use_device(
device=device_name, username=username
):
return None
safe_username = sanitize_username(username)
path = os.path.join(
os.getcwd(),
"configs",
"CDMs",
"users_uploaded",
safe_username,
"WV",
device_name + ".wvd",
)
return widevineDevice.load(path)
except (FileNotFoundError, ValueError):
return None
def get_cdm_or_error(device):
"""Get the CDM or return an error response."""
cdm = current_app.config.get("CDM")
if not cdm:
return make_response(
"Error",
f'No CDM session for "{device}" has been opened yet. No session to use',
http_status=400,
)
return cdm
@remotecdm_wv_bp.route("/remotecdm/widevine/<device>/open", methods=["GET"]) @remotecdm_wv_bp.route("/remotecdm/widevine/<device>/open", methods=["GET"])
def remote_cdm_widevine_open(device): def remote_cdm_widevine_open(device):
"""Handle the remote device Widevine open.""" if str(device).lower() == config["default_wv_cdm"].lower():
api_key = request.headers.get("X-Secret-Key") wv_device = widevineDevice.load(
wv_device = load_widevine_device(device, api_key) f'{os.getcwd()}/configs/CDMs/WV/{config["default_wv_cdm"]}.wvd'
if not wv_device:
return make_response(
"Error",
f"Device '{device}' is not found or you are not authorized to use it.",
http_status=403,
) )
cdm = current_app.config["CDM"] = widevineCDM.from_device(wv_device) cdm = current_app.config["CDM"] = widevineCDM.from_device(wv_device)
session_id = cdm.open() session_id = cdm.open()
return make_response( return (
"Success", jsonify(
"Successfully opened the Widevine CDM session",
{ {
"status": 200,
"message": "Success",
"data": {
"session_id": session_id.hex(), "session_id": session_id.hex(),
"device": { "device": {
"system_id": cdm.system_id, "system_id": cdm.system_id,
"security_level": cdm.security_level, "security_level": cdm.security_level,
}, },
}, },
http_status=200, }
),
200,
)
if (
request.headers["X-Secret-Key"]
and str(device).lower() != config["default_wv_cdm"].lower()
):
api_key = request.headers["X-Secret-Key"]
user = fetch_username_by_api_key(api_key=api_key)
if user:
if user_allowed_to_use_device(device=device, username=user):
wv_device = widevineDevice.load(
f"{os.getcwd()}/configs/CDMs/{user}/WV/{device}.wvd"
)
cdm = current_app.config["CDM"] = widevineCDM.from_device(wv_device)
session_id = cdm.open()
return (
jsonify(
{
"status": 200,
"message": "Success",
"data": {
"session_id": session_id.hex(),
"device": {
"system_id": cdm.system_id,
"security_level": cdm.security_level,
},
},
}
),
200,
)
else:
return (
jsonify(
{
"message": f"Device '{device}' is not found or you are not authorized to use it.",
"status": 403,
}
),
403,
)
else:
return (
jsonify(
{
"message": f"Device '{device}' is not found or you are not authorized to use it.",
"status": 403,
}
),
403,
)
else:
return (
jsonify(
{
"message": f"Device '{device}' is not found or you are not authorized to use it.",
"status": 403,
}
),
403,
) )
@ -211,24 +172,38 @@ def remote_cdm_widevine_open(device):
"/remotecdm/widevine/<device>/close/<session_id>", methods=["GET"] "/remotecdm/widevine/<device>/close/<session_id>", methods=["GET"]
) )
def remote_cdm_widevine_close(device, session_id): def remote_cdm_widevine_close(device, session_id):
"""Handle the remote device Widevine close."""
session_id = bytes.fromhex(session_id) session_id = bytes.fromhex(session_id)
cdm = get_cdm_or_error(device) cdm = current_app.config["CDM"]
if isinstance(cdm, tuple): # error response if not cdm:
return cdm return (
jsonify(
{
"status": 400,
"message": f'No CDM for "{device}" has been opened yet. No session to close',
}
),
400,
)
try: try:
cdm.close(session_id) cdm.close(session_id)
except InvalidSession: except InvalidSession:
return make_response( return (
"Error", jsonify(
f'Invalid session ID "{session_id.hex()}", it may have expired', {
http_status=400, "status": 400,
"message": f'Invalid session ID "{session_id.hex()}", it may have expired',
}
),
400,
) )
return (
return make_response( jsonify(
"Success", {
f'Successfully closed Session "{session_id.hex()}".', "status": 200,
http_status=200, "message": f'Successfully closed Session "{session_id.hex()}".',
}
),
200,
) )
@ -236,44 +211,80 @@ def remote_cdm_widevine_close(device, session_id):
"/remotecdm/widevine/<device>/set_service_certificate", methods=["POST"] "/remotecdm/widevine/<device>/set_service_certificate", methods=["POST"]
) )
def remote_cdm_widevine_set_service_certificate(device): def remote_cdm_widevine_set_service_certificate(device):
"""Handle the remote device Widevine set service certificate."""
body = request.get_json() body = request.get_json()
missing_field = check_required_fields(body, ("session_id", "certificate")) for required_field in ("session_id", "certificate"):
if missing_field: if required_field == "certificate":
return missing_field has_field = (
required_field in body
) # it needs the key, but can be empty/null
else:
has_field = body.get(required_field)
if not has_field:
return (
jsonify(
{
"status": 400,
"message": f'Missing required field "{required_field}" in JSON body',
}
),
400,
)
session_id = bytes.fromhex(body["session_id"]) session_id = bytes.fromhex(body["session_id"])
cdm = get_cdm_or_error(device) cdm = current_app.config["CDM"]
if isinstance(cdm, tuple): # error response if not cdm:
return cdm return (
jsonify(
{
"status": 400,
"message": f'No CDM session for "{device}" has been opened yet. No session to use',
}
),
400,
)
certificate = body["certificate"] certificate = body["certificate"]
try: try:
provider_id = cdm.set_service_certificate(session_id, certificate) provider_id = cdm.set_service_certificate(session_id, certificate)
except InvalidSession: except InvalidSession:
return make_response( return (
"Error", jsonify(
f'Invalid session id: "{session_id.hex()}", it may have expired', {
http_status=400, "status": 400,
"message": f'Invalid session id: "{session_id.hex()}", it may have expired',
}
),
400,
) )
except DecodeError as error: except DecodeError as error:
return make_response( return (
"Error", jsonify(
f"Invalid Service Certificate, {error}", {"status": 400, "message": f"Invalid Service Certificate, {error}"}
http_status=400, ),
400,
) )
except SignatureMismatch: except SignatureMismatch:
return make_response( return (
"Error", jsonify(
"Signature Validation failed on the Service Certificate, rejecting", {
http_status=400, "status": 400,
"message": "Signature Validation failed on the Service Certificate, rejecting",
}
),
400,
) )
return make_response( return (
"Success", jsonify(
f"Successfully {['set', 'unset'][not certificate]} the Service Certificate.", {
{"provider_id": provider_id}, "status": 200,
http_status=200, "message": f"Successfully {['set', 'unset'][not certificate]} the Service Certificate.",
"data": {
"provider_id": provider_id,
},
}
),
200,
) )
@ -281,25 +292,45 @@ def remote_cdm_widevine_set_service_certificate(device):
"/remotecdm/widevine/<device>/get_service_certificate", methods=["POST"] "/remotecdm/widevine/<device>/get_service_certificate", methods=["POST"]
) )
def remote_cdm_widevine_get_service_certificate(device): def remote_cdm_widevine_get_service_certificate(device):
"""Handle the remote device Widevine get service certificate."""
body = request.get_json() body = request.get_json()
missing_field = check_required_fields(body, ("session_id",)) for required_field in ("session_id",):
if missing_field: if not body.get(required_field):
return missing_field return (
jsonify(
{
"status": 400,
"message": f'Missing required field "{required_field}" in JSON body',
}
),
400,
)
session_id = bytes.fromhex(body["session_id"]) session_id = bytes.fromhex(body["session_id"])
cdm = get_cdm_or_error(device) cdm = current_app.config["CDM"]
if isinstance(cdm, tuple): # error response
return cdm if not cdm:
return (
jsonify(
{
"status": 400,
"message": f'No CDM session for "{device}" has been opened yet. No session to use',
}
),
400,
)
try: try:
service_certificate = cdm.get_service_certificate(session_id) service_certificate = cdm.get_service_certificate(session_id)
except InvalidSession: except InvalidSession:
return make_response( return (
"Error", jsonify(
f'Invalid Session ID "{session_id.hex()}", it may have expired', {
http_status=400, "status": 400,
"message": f'Invalid Session ID "{session_id.hex()}", it may have expired',
}
),
400,
) )
if service_certificate: if service_certificate:
service_certificate_b64 = base64.b64encode( service_certificate_b64 = base64.b64encode(
@ -307,11 +338,17 @@ def remote_cdm_widevine_get_service_certificate(device):
).decode() ).decode()
else: else:
service_certificate_b64 = None service_certificate_b64 = None
return make_response( return (
"Success", jsonify(
"Successfully got the Service Certificate", {
{"service_certificate": service_certificate_b64}, "status": 200,
http_status=200, "message": "Successfully got the Service Certificate",
"data": {
"service_certificate": service_certificate_b64,
},
}
),
200,
) )
@ -320,23 +357,42 @@ def remote_cdm_widevine_get_service_certificate(device):
methods=["POST"], methods=["POST"],
) )
def remote_cdm_widevine_get_license_challenge(device, license_type): def remote_cdm_widevine_get_license_challenge(device, license_type):
"""Handle the remote device Widevine get license challenge."""
body = request.get_json() body = request.get_json()
missing_field = check_required_fields(body, ("session_id", "init_data")) for required_field in ("session_id", "init_data"):
if missing_field: if not body.get(required_field):
return missing_field return (
jsonify(
{
"status": 400,
"message": f'Missing required field "{required_field}" in JSON body',
}
),
400,
)
session_id = bytes.fromhex(body["session_id"]) session_id = bytes.fromhex(body["session_id"])
privacy_mode = body.get("privacy_mode", True) privacy_mode = body.get("privacy_mode", True)
cdm = get_cdm_or_error(device) cdm = current_app.config["CDM"]
if isinstance(cdm, tuple): # error response if not cdm:
return cdm return (
jsonify(
{
"status": 400,
"message": f'No CDM session for "{device}" has been opened yet. No session to use',
}
),
400,
)
if current_app.config.get("force_privacy_mode"): if current_app.config.get("force_privacy_mode"):
privacy_mode = True privacy_mode = True
if not cdm.get_service_certificate(session_id): if not cdm.get_service_certificate(session_id):
return make_response( return (
"Error", jsonify(
"No Service Certificate set but Privacy Mode is Enforced.", {
http_status=403, "status": 403,
"message": "No Service Certificate set but Privacy Mode is Enforced.",
}
),
403,
) )
current_app.config["pssh"] = body["init_data"] current_app.config["pssh"] = body["init_data"]
@ -350,72 +406,97 @@ def remote_cdm_widevine_get_license_challenge(device, license_type):
privacy_mode=privacy_mode, privacy_mode=privacy_mode,
) )
except InvalidSession: except InvalidSession:
return make_response( return (
"Error", jsonify(
f'Invalid Session ID "{session_id.hex()}", it may have expired', {
http_status=400, "status": 400,
"message": f'Invalid Session ID "{session_id.hex()}", it may have expired',
}
),
400,
) )
except InvalidInitData as error: except InvalidInitData as error:
return make_response( return jsonify({"status": 400, "message": f"Invalid Init Data, {error}"}), 400
"Error",
f"Invalid Init Data, {error}",
http_status=400,
)
except InvalidLicenseType: except InvalidLicenseType:
return make_response( return (
"Error", jsonify({"status": 400, "message": f"Invalid License Type {license_type}"}),
f"Invalid License Type {license_type}", 400,
http_status=400,
) )
return make_response( return (
"Success", jsonify(
"Successfully got the License Challenge", {
{"challenge_b64": base64.b64encode(license_request).decode()}, "status": 200,
http_status=200, "message": "Success",
"data": {"challenge_b64": base64.b64encode(license_request).decode()},
}
),
200,
) )
@remotecdm_wv_bp.route("/remotecdm/widevine/<device>/parse_license", methods=["POST"]) @remotecdm_wv_bp.route("/remotecdm/widevine/<device>/parse_license", methods=["POST"])
def remote_cdm_widevine_parse_license(device): def remote_cdm_widevine_parse_license(device):
"""Handle the remote device Widevine parse license."""
body = request.get_json() body = request.get_json()
missing_field = check_required_fields(body, ("session_id", "license_message")) for required_field in ("session_id", "license_message"):
if missing_field: if not body.get(required_field):
return missing_field return (
jsonify(
{
"status": 400,
"message": f'Missing required field "{required_field}" in JSON body',
}
),
400,
)
session_id = bytes.fromhex(body["session_id"]) session_id = bytes.fromhex(body["session_id"])
cdm = get_cdm_or_error(device) cdm = current_app.config["CDM"]
if isinstance(cdm, tuple): # error response if not cdm:
return cdm return (
jsonify(
{
"status": 400,
"message": f'No CDM session for "{device}" has been opened yet. No session to use',
}
),
400,
)
try: try:
cdm.parse_license(session_id, body["license_message"]) cdm.parse_license(session_id, body["license_message"])
except InvalidLicenseMessage as error: except InvalidLicenseMessage as error:
return make_response( return (
"Error", jsonify({"status": 400, "message": f"Invalid License Message, {error}"}),
f"Invalid License Message, {error}", 400,
http_status=400,
) )
except InvalidContext as error: except InvalidContext as error:
return make_response( return jsonify({"status": 400, "message": f"Invalid Context, {error}"}), 400
"Error",
f"Invalid Context, {error}",
http_status=400,
)
except InvalidSession: except InvalidSession:
return make_response( return (
"Error", jsonify(
f'Invalid Session ID "{session_id.hex()}", it may have expired', {
http_status=400, "status": 400,
"message": f'Invalid Session ID "{session_id.hex()}", it may have expired',
}
),
400,
) )
except SignatureMismatch: except SignatureMismatch:
return make_response( return (
"Error", jsonify(
"Signature Validation failed on the License Message, rejecting.", {
http_status=400, "status": 400,
"message": f"Signature Validation failed on the License Message, rejecting.",
}
),
400,
) )
return make_response( return (
"Success", jsonify(
"Successfully parsed and loaded the Keys from the License message.", {
http_status=200, "status": 200,
"message": "Successfully parsed and loaded the Keys from the License message.",
}
),
200,
) )
@ -423,30 +504,54 @@ def remote_cdm_widevine_parse_license(device):
"/remotecdm/widevine/<device>/get_keys/<key_type>", methods=["POST"] "/remotecdm/widevine/<device>/get_keys/<key_type>", methods=["POST"]
) )
def remote_cdm_widevine_get_keys(device, key_type): def remote_cdm_widevine_get_keys(device, key_type):
"""Handle the remote device Widevine get keys."""
body = request.get_json() body = request.get_json()
missing_field = check_required_fields(body, ("session_id",)) for required_field in ("session_id",):
if missing_field: if not body.get(required_field):
return missing_field return (
jsonify(
{
"status": 400,
"message": f'Missing required field "{required_field}" in JSON body',
}
),
400,
)
session_id = bytes.fromhex(body["session_id"]) session_id = bytes.fromhex(body["session_id"])
key_type: Optional[str] = key_type
if key_type == "ALL": if key_type == "ALL":
key_type = None key_type = None
cdm = get_cdm_or_error(device) cdm = current_app.config["CDM"]
if isinstance(cdm, tuple): # error response if not cdm:
return cdm return (
jsonify(
{
"status": 400,
"message": f'No CDM session for "{device}" has been opened yet. No session to use',
}
),
400,
)
try: try:
keys = cdm.get_keys(session_id, key_type) keys = cdm.get_keys(session_id, key_type)
except InvalidSession: except InvalidSession:
return make_response( return (
"Error", jsonify(
f'Invalid Session ID "{session_id.hex()}", it may have expired', {
http_status=400, "status": 400,
"message": f'Invalid Session ID "{session_id.hex()}", it may have expired',
}
),
400,
) )
except ValueError as error: except ValueError as error:
return make_response( return (
"Error", jsonify(
f'The Key Type value "{key_type}" is invalid, {error}', {
http_status=400, "status": 400,
"message": f'The Key Type value "{key_type}" is invalid, {error}',
}
),
400,
) )
keys_json = [ keys_json = [
{ {
@ -459,6 +564,10 @@ def remote_cdm_widevine_get_keys(device, key_type):
if not key_type or key.type == key_type if not key_type or key.type == key_type
] ]
for entry in keys_json: for entry in keys_json:
if config["database_type"].lower() != "mariadb":
from custom_functions.database.cache_to_db_sqlite import cache_to_db
elif config["database_type"].lower() == "mariadb":
from custom_functions.database.cache_to_db_mariadb import cache_to_db
if entry["type"] != "SIGNING": if entry["type"] != "SIGNING":
cache_to_db( cache_to_db(
pssh=str(current_app.config["pssh"]), pssh=str(current_app.config["pssh"]),
@ -466,9 +575,7 @@ def remote_cdm_widevine_get_keys(device, key_type):
key=entry["key"], key=entry["key"],
) )
return make_response( return (
"Success", jsonify({"status": 200, "message": "Success", "data": {"keys": keys_json}}),
"Successfully got the Keys", 200,
{"keys": keys_json},
http_status=200,
) )