diff --git a/custom_functions/decrypt/api_decrypt.py b/custom_functions/decrypt/api_decrypt.py index 694098b..4d08d49 100644 --- a/custom_functions/decrypt/api_decrypt.py +++ b/custom_functions/decrypt/api_decrypt.py @@ -5,6 +5,7 @@ import ast import glob import json import os +import re from urllib.parse import urlparse import binascii @@ -113,6 +114,11 @@ def is_url_and_split(input_str): return False, None, None +def sanitize_username(username): + """Sanitize the username.""" + return re.sub(r"[^a-zA-Z0-9_\-]", "_", username).lower() + + def load_device(device_type, device, username, config): """Load the appropriate device file for PlayReady or Widevine.""" if device_type == "PR": @@ -126,12 +132,21 @@ def load_device(device_type, device, username, config): base_name = config[config_key] if not base_name.endswith(ext): base_name += ext - search_path = f"{os.getcwd()}/configs/CDMs/{base_dir}/{base_name}" + search_path = os.path.join(os.getcwd(), "configs", "CDMs", base_dir, base_name) else: base_name = device if not base_name.endswith(ext): base_name += ext - search_path = f"{os.getcwd()}/configs/CDMs/{username}/{base_dir}/{base_name}" + safe_username = sanitize_username(username) + search_path = os.path.join( + os.getcwd(), + "configs", + "CDMs", + "users_uploaded", + safe_username, + base_dir, + base_name, + ) files = glob.glob(search_path) if not files: diff --git a/custom_functions/user_checks/device_allowed.py b/custom_functions/user_checks/device_allowed.py index 1e2d283..57fd09e 100644 --- a/custom_functions/user_checks/device_allowed.py +++ b/custom_functions/user_checks/device_allowed.py @@ -2,11 +2,19 @@ import os import glob +import re + + +def sanitize_username(username): + """Sanitize the username.""" + return re.sub(r"[^a-zA-Z0-9_\-]", "_", username).lower() def user_allowed_to_use_device(device, username): """Check if the user is allowed to use the device.""" - base_path = os.path.join(os.getcwd(), "configs", "CDMs", username) + base_path = os.path.join( + os.getcwd(), "configs", "CDMs", "users_uploaded", sanitize_username(username) + ) # Get filenames with extensions pr_files = [ diff --git a/routes/remote_device_pr.py b/routes/remote_device_pr.py index 77c50e9..fbe7944 100644 --- a/routes/remote_device_pr.py +++ b/routes/remote_device_pr.py @@ -263,7 +263,7 @@ def remote_cdm_playready_get_license_challenge(device): return make_response( "Success", "Successfully got the License Challenge", - {"challenge_b64": base64.b64encode(license_request).decode()}, + {"challenge_b64": base64.b64encode(license_request.encode("utf-8")).decode()}, http_status=200, ) @@ -317,24 +317,17 @@ def remote_cdm_playready_get_keys(device): if missing_field: return missing_field session_id = bytes.fromhex(body["session_id"]) - key_type = body.get("key_type", None) cdm = get_cdm_or_error(device) if isinstance(cdm, tuple): # error response return cdm try: - keys = cdm.get_keys(session_id, key_type) + keys = cdm.get_keys(session_id) except InvalidSession: return make_response( "Error", f"Invalid Session ID '{session_id.hex()}', it may have expired.", http_status=400, ) - except ValueError as error: - return make_response( - "Error", - f"The Key Type value '{key_type}' is invalid, {error}", - http_status=400, - ) keys_json = [ { "key_id": key.key_id.hex, @@ -344,7 +337,6 @@ def remote_cdm_playready_get_keys(device): "key_length": key.key_length, } for key in keys - if not key_type or key.type == key_type ] return make_response( "Success", diff --git a/routes/remote_device_wv.py b/routes/remote_device_wv.py index 565f34f..8d1ac2c 100644 --- a/routes/remote_device_wv.py +++ b/routes/remote_device_wv.py @@ -52,6 +52,19 @@ def check_required_fields(body, required_fields): ) return None + +def get_cdm_or_error(device: str): + """Get the CDM or return an error response.""" + cdm = current_app.config.get("CDM") + if not cdm: + return make_response( + "Error", + f'No CDM session for "{device}" has been opened yet. No session to use', + http_status=400, + ) + return cdm + + @remotecdm_wv_bp.route("/remotecdm/widevine", methods=["GET", "HEAD"]) def remote_cdm_widevine(): """Handle the remote device Widevine.""" @@ -84,9 +97,7 @@ def remote_cdm_widevine_deviceinfo(): os.path.join(os.getcwd(), "configs", "CDMs", "WV", base_name) ) cdm = widevineCDM.from_device(device) - return make_response( - "Success", - "Successfully got the Widevine CDM device info", + return jsonify( { "device_type": cdm.device_type.name, "system_id": cdm.system_id, @@ -94,8 +105,7 @@ def remote_cdm_widevine_deviceinfo(): "host": f'{config["fqdn"]}/remotecdm/widevine', "secret": f'{config["remote_cdm_secret"]}', "device_name": Path(base_name).stem, - }, - http_status=200, + } ) @@ -110,22 +120,19 @@ def remote_cdm_widevine_deviceinfo_specific(device): base_name = Path(device).with_suffix(".wvd").name api_key = request.headers["X-Secret-Key"] username = fetch_username_by_api_key(api_key) - safe_username = sanitize_username(username) device = widevineDevice.load( os.path.join( os.getcwd(), "configs", "CDMs", "users_uploaded", - safe_username, + sanitize_username(username), "WV", base_name, ) ) cdm = widevineCDM.from_device(device) - return make_response( - "Success", - "Successfully got the Widevine CDM device info (by user)", + return jsonify( { "device_type": cdm.device_type.name, "system_id": cdm.system_id, @@ -133,77 +140,69 @@ def remote_cdm_widevine_deviceinfo_specific(device): "host": f'{config["fqdn"]}/remotecdm/widevine', "secret": f"{api_key}", "device_name": Path(base_name).stem, - }, - http_status=200, + } ) -def load_widevine_device(device_name, api_key=None): - """Load a Widevine device, either default or user-uploaded.""" - try: - if device_name.lower() == config["default_wv_cdm"].lower(): - path = os.path.join( - os.getcwd(), "configs", "CDMs", "WV", config["default_wv_cdm"] + ".wvd" - ) - else: - if not api_key: - return None - username = fetch_username_by_api_key(api_key) - if not username or not user_allowed_to_use_device( - device=device_name, username=username - ): - return None - safe_username = sanitize_username(username) - path = os.path.join( - os.getcwd(), - "configs", - "CDMs", - "users_uploaded", - safe_username, - "WV", - device_name + ".wvd", - ) - return widevineDevice.load(path) - except (FileNotFoundError, ValueError): - return None - - -def get_cdm_or_error(device): - """Get the CDM or return an error response.""" - cdm = current_app.config.get("CDM") - if not cdm: - return make_response( - "Error", - f'No CDM session for "{device}" has been opened yet. No session to use', - http_status=400, - ) - return cdm - - @remotecdm_wv_bp.route("/remotecdm/widevine//open", methods=["GET"]) def remote_cdm_widevine_open(device): """Handle the remote device Widevine open.""" - api_key = request.headers.get("X-Secret-Key") - wv_device = load_widevine_device(device, api_key) - if not wv_device: + if str(device).lower() == config["default_wv_cdm"].lower(): + wv_device = widevineDevice.load( + os.path.join( + os.getcwd(), "configs", "CDMs", "WV", config["default_wv_cdm"] + ".wvd" + ) + ) + cdm = current_app.config["CDM"] = widevineCDM.from_device(wv_device) + session_id = cdm.open() + return make_response( + "Success", + "Successfully opened the Widevine Session", + { + "session_id": session_id.hex(), + "device": { + "system_id": cdm.system_id, + "security_level": cdm.security_level, + }, + }, + http_status=200, + ) + if ( + request.headers["X-Secret-Key"] + and str(device).lower() != config["default_wv_cdm"].lower() + ): + api_key = request.headers["X-Secret-Key"] + user = fetch_username_by_api_key(api_key=api_key) + if user and user_allowed_to_use_device(device=device, username=user): + wv_device = widevineDevice.load( + os.path.join( + os.getcwd(), "configs", "CDMs", user, "WV", device + ".wvd" + ) + ) + cdm = current_app.config["CDM"] = widevineCDM.from_device(wv_device) + session_id = cdm.open() + return make_response( + "Success", + "Successfully opened the Widevine Session", + { + "session_id": session_id.hex(), + "device": { + "system_id": cdm.system_id, + "security_level": cdm.security_level, + }, + }, + http_status=200, + ) return make_response( "Error", f"Device '{device}' is not found or you are not authorized to use it.", http_status=403, ) - cdm = current_app.config["CDM"] = widevineCDM.from_device(wv_device) - session_id = cdm.open() + return make_response( - "Success", - "Successfully opened the Widevine CDM session", - { - "session_id": session_id.hex(), - "device": { - "system_id": cdm.system_id, - "security_level": cdm.security_level, - }, - }, - http_status=200, + "Error", + f"Device '{device}' is not found or you are not authorized to use it.", + http_status=403, ) @@ -216,6 +215,7 @@ def remote_cdm_widevine_close(device, session_id): cdm = get_cdm_or_error(device) if isinstance(cdm, tuple): # error response return cdm + try: cdm.close(session_id) except InvalidSession: @@ -257,22 +257,27 @@ def remote_cdm_widevine_set_service_certificate(device): f'Invalid session id: "{session_id.hex()}", it may have expired', http_status=400, ) + except DecodeError as error: return make_response( "Error", f"Invalid Service Certificate, {error}", http_status=400, ) + except SignatureMismatch: return make_response( "Error", "Signature Validation failed on the Service Certificate, rejecting", http_status=400, ) + return make_response( "Success", f"Successfully {['set', 'unset'][not certificate]} the Service Certificate.", - {"provider_id": provider_id}, + { + "provider_id": provider_id, + }, http_status=200, ) @@ -301,6 +306,7 @@ def remote_cdm_widevine_get_service_certificate(device): f'Invalid Session ID "{session_id.hex()}", it may have expired', http_status=400, ) + if service_certificate: service_certificate_b64 = base64.b64encode( service_certificate.SerializeToString() @@ -325,18 +331,24 @@ def remote_cdm_widevine_get_license_challenge(device, license_type): missing_field = check_required_fields(body, ("session_id", "init_data")) if missing_field: return missing_field + session_id = bytes.fromhex(body["session_id"]) privacy_mode = body.get("privacy_mode", True) cdm = get_cdm_or_error(device) if isinstance(cdm, tuple): # error response return cdm + if current_app.config.get("force_privacy_mode"): privacy_mode = True if not cdm.get_service_certificate(session_id): - return make_response( - "Error", - "No Service Certificate set but Privacy Mode is Enforced.", - http_status=403, + return ( + jsonify( + { + "status": 403, + "message": "No Service Certificate set but Privacy Mode is Enforced.", + } + ), + 403, ) current_app.config["pssh"] = body["init_data"] @@ -355,18 +367,21 @@ def remote_cdm_widevine_get_license_challenge(device, license_type): f'Invalid Session ID "{session_id.hex()}", it may have expired', http_status=400, ) + except InvalidInitData as error: return make_response( "Error", f"Invalid Init Data, {error}", http_status=400, ) + except InvalidLicenseType: return make_response( "Error", f"Invalid License Type {license_type}", http_status=400, ) + return make_response( "Success", "Successfully got the License Challenge", @@ -382,10 +397,13 @@ def remote_cdm_widevine_parse_license(device): missing_field = check_required_fields(body, ("session_id", "license_message")) if missing_field: return missing_field + session_id = bytes.fromhex(body["session_id"]) + cdm = get_cdm_or_error(device) if isinstance(cdm, tuple): # error response return cdm + try: cdm.parse_license(session_id, body["license_message"]) except InvalidLicenseMessage as error: @@ -394,24 +412,23 @@ def remote_cdm_widevine_parse_license(device): f"Invalid License Message, {error}", http_status=400, ) + except InvalidContext as error: - return make_response( - "Error", - f"Invalid Context, {error}", - http_status=400, - ) + return jsonify({"status": 400, "message": f"Invalid Context, {error}"}), 400 except InvalidSession: return make_response( "Error", f'Invalid Session ID "{session_id.hex()}", it may have expired', http_status=400, ) + except SignatureMismatch: return make_response( "Error", "Signature Validation failed on the License Message, rejecting.", http_status=400, ) + return make_response( "Success", "Successfully parsed and loaded the Keys from the License message.", @@ -428,12 +445,14 @@ def remote_cdm_widevine_get_keys(device, key_type): missing_field = check_required_fields(body, ("session_id",)) if missing_field: return missing_field + session_id = bytes.fromhex(body["session_id"]) if key_type == "ALL": key_type = None cdm = get_cdm_or_error(device) if isinstance(cdm, tuple): # error response return cdm + try: keys = cdm.get_keys(session_id, key_type) except InvalidSession: