Implement username sanitization in device handling modules and improve path management for user-uploaded devices. Enhance error handling and response formatting in remote device routes for better consistency

This commit is contained in:
voldemort 2025-07-24 21:05:41 +07:00
parent f83d22c09e
commit 8697342e9c
4 changed files with 126 additions and 92 deletions

View File

@ -5,6 +5,7 @@ import ast
import glob import glob
import json import json
import os import os
import re
from urllib.parse import urlparse from urllib.parse import urlparse
import binascii import binascii
@ -113,6 +114,11 @@ def is_url_and_split(input_str):
return False, None, None return False, None, None
def sanitize_username(username):
"""Sanitize the username."""
return re.sub(r"[^a-zA-Z0-9_\-]", "_", username).lower()
def load_device(device_type, device, username, config): def load_device(device_type, device, username, config):
"""Load the appropriate device file for PlayReady or Widevine.""" """Load the appropriate device file for PlayReady or Widevine."""
if device_type == "PR": if device_type == "PR":
@ -126,12 +132,21 @@ def load_device(device_type, device, username, config):
base_name = config[config_key] base_name = config[config_key]
if not base_name.endswith(ext): if not base_name.endswith(ext):
base_name += ext base_name += ext
search_path = f"{os.getcwd()}/configs/CDMs/{base_dir}/{base_name}" search_path = os.path.join(os.getcwd(), "configs", "CDMs", base_dir, base_name)
else: else:
base_name = device base_name = device
if not base_name.endswith(ext): if not base_name.endswith(ext):
base_name += ext base_name += ext
search_path = f"{os.getcwd()}/configs/CDMs/{username}/{base_dir}/{base_name}" safe_username = sanitize_username(username)
search_path = os.path.join(
os.getcwd(),
"configs",
"CDMs",
"users_uploaded",
safe_username,
base_dir,
base_name,
)
files = glob.glob(search_path) files = glob.glob(search_path)
if not files: if not files:

View File

@ -2,11 +2,19 @@
import os import os
import glob import glob
import re
def sanitize_username(username):
"""Sanitize the username."""
return re.sub(r"[^a-zA-Z0-9_\-]", "_", username).lower()
def user_allowed_to_use_device(device, username): def user_allowed_to_use_device(device, username):
"""Check if the user is allowed to use the device.""" """Check if the user is allowed to use the device."""
base_path = os.path.join(os.getcwd(), "configs", "CDMs", username) base_path = os.path.join(
os.getcwd(), "configs", "CDMs", "users_uploaded", sanitize_username(username)
)
# Get filenames with extensions # Get filenames with extensions
pr_files = [ pr_files = [

View File

@ -263,7 +263,7 @@ def remote_cdm_playready_get_license_challenge(device):
return make_response( return make_response(
"Success", "Success",
"Successfully got the License Challenge", "Successfully got the License Challenge",
{"challenge_b64": base64.b64encode(license_request).decode()}, {"challenge_b64": base64.b64encode(license_request.encode("utf-8")).decode()},
http_status=200, http_status=200,
) )
@ -317,24 +317,17 @@ def remote_cdm_playready_get_keys(device):
if missing_field: if missing_field:
return missing_field return missing_field
session_id = bytes.fromhex(body["session_id"]) session_id = bytes.fromhex(body["session_id"])
key_type = body.get("key_type", None)
cdm = get_cdm_or_error(device) cdm = get_cdm_or_error(device)
if isinstance(cdm, tuple): # error response if isinstance(cdm, tuple): # error response
return cdm return cdm
try: try:
keys = cdm.get_keys(session_id, key_type) keys = cdm.get_keys(session_id)
except InvalidSession: except InvalidSession:
return make_response( return make_response(
"Error", "Error",
f"Invalid Session ID '{session_id.hex()}', it may have expired.", f"Invalid Session ID '{session_id.hex()}', it may have expired.",
http_status=400, http_status=400,
) )
except ValueError as error:
return make_response(
"Error",
f"The Key Type value '{key_type}' is invalid, {error}",
http_status=400,
)
keys_json = [ keys_json = [
{ {
"key_id": key.key_id.hex, "key_id": key.key_id.hex,
@ -344,7 +337,6 @@ def remote_cdm_playready_get_keys(device):
"key_length": key.key_length, "key_length": key.key_length,
} }
for key in keys for key in keys
if not key_type or key.type == key_type
] ]
return make_response( return make_response(
"Success", "Success",

View File

@ -52,6 +52,19 @@ def check_required_fields(body, required_fields):
) )
return None return None
def get_cdm_or_error(device: str):
"""Get the CDM or return an error response."""
cdm = current_app.config.get("CDM")
if not cdm:
return make_response(
"Error",
f'No CDM session for "{device}" has been opened yet. No session to use',
http_status=400,
)
return cdm
@remotecdm_wv_bp.route("/remotecdm/widevine", methods=["GET", "HEAD"]) @remotecdm_wv_bp.route("/remotecdm/widevine", methods=["GET", "HEAD"])
def remote_cdm_widevine(): def remote_cdm_widevine():
"""Handle the remote device Widevine.""" """Handle the remote device Widevine."""
@ -84,9 +97,7 @@ def remote_cdm_widevine_deviceinfo():
os.path.join(os.getcwd(), "configs", "CDMs", "WV", base_name) os.path.join(os.getcwd(), "configs", "CDMs", "WV", base_name)
) )
cdm = widevineCDM.from_device(device) cdm = widevineCDM.from_device(device)
return make_response( return jsonify(
"Success",
"Successfully got the Widevine CDM device info",
{ {
"device_type": cdm.device_type.name, "device_type": cdm.device_type.name,
"system_id": cdm.system_id, "system_id": cdm.system_id,
@ -94,8 +105,7 @@ def remote_cdm_widevine_deviceinfo():
"host": f'{config["fqdn"]}/remotecdm/widevine', "host": f'{config["fqdn"]}/remotecdm/widevine',
"secret": f'{config["remote_cdm_secret"]}', "secret": f'{config["remote_cdm_secret"]}',
"device_name": Path(base_name).stem, "device_name": Path(base_name).stem,
}, }
http_status=200,
) )
@ -110,22 +120,19 @@ def remote_cdm_widevine_deviceinfo_specific(device):
base_name = Path(device).with_suffix(".wvd").name base_name = Path(device).with_suffix(".wvd").name
api_key = request.headers["X-Secret-Key"] api_key = request.headers["X-Secret-Key"]
username = fetch_username_by_api_key(api_key) username = fetch_username_by_api_key(api_key)
safe_username = sanitize_username(username)
device = widevineDevice.load( device = widevineDevice.load(
os.path.join( os.path.join(
os.getcwd(), os.getcwd(),
"configs", "configs",
"CDMs", "CDMs",
"users_uploaded", "users_uploaded",
safe_username, sanitize_username(username),
"WV", "WV",
base_name, base_name,
) )
) )
cdm = widevineCDM.from_device(device) cdm = widevineCDM.from_device(device)
return make_response( return jsonify(
"Success",
"Successfully got the Widevine CDM device info (by user)",
{ {
"device_type": cdm.device_type.name, "device_type": cdm.device_type.name,
"system_id": cdm.system_id, "system_id": cdm.system_id,
@ -133,69 +140,24 @@ def remote_cdm_widevine_deviceinfo_specific(device):
"host": f'{config["fqdn"]}/remotecdm/widevine', "host": f'{config["fqdn"]}/remotecdm/widevine',
"secret": f"{api_key}", "secret": f"{api_key}",
"device_name": Path(base_name).stem, "device_name": Path(base_name).stem,
}, }
http_status=200,
) )
def load_widevine_device(device_name, api_key=None):
"""Load a Widevine device, either default or user-uploaded."""
try:
if device_name.lower() == config["default_wv_cdm"].lower():
path = os.path.join(
os.getcwd(), "configs", "CDMs", "WV", config["default_wv_cdm"] + ".wvd"
)
else:
if not api_key:
return None
username = fetch_username_by_api_key(api_key)
if not username or not user_allowed_to_use_device(
device=device_name, username=username
):
return None
safe_username = sanitize_username(username)
path = os.path.join(
os.getcwd(),
"configs",
"CDMs",
"users_uploaded",
safe_username,
"WV",
device_name + ".wvd",
)
return widevineDevice.load(path)
except (FileNotFoundError, ValueError):
return None
def get_cdm_or_error(device):
"""Get the CDM or return an error response."""
cdm = current_app.config.get("CDM")
if not cdm:
return make_response(
"Error",
f'No CDM session for "{device}" has been opened yet. No session to use',
http_status=400,
)
return cdm
@remotecdm_wv_bp.route("/remotecdm/widevine/<device>/open", methods=["GET"]) @remotecdm_wv_bp.route("/remotecdm/widevine/<device>/open", methods=["GET"])
def remote_cdm_widevine_open(device): def remote_cdm_widevine_open(device):
"""Handle the remote device Widevine open.""" """Handle the remote device Widevine open."""
api_key = request.headers.get("X-Secret-Key") if str(device).lower() == config["default_wv_cdm"].lower():
wv_device = load_widevine_device(device, api_key) wv_device = widevineDevice.load(
if not wv_device: os.path.join(
return make_response( os.getcwd(), "configs", "CDMs", "WV", config["default_wv_cdm"] + ".wvd"
"Error", )
f"Device '{device}' is not found or you are not authorized to use it.",
http_status=403,
) )
cdm = current_app.config["CDM"] = widevineCDM.from_device(wv_device) cdm = current_app.config["CDM"] = widevineCDM.from_device(wv_device)
session_id = cdm.open() session_id = cdm.open()
return make_response( return make_response(
"Success", "Success",
"Successfully opened the Widevine CDM session", "Successfully opened the Widevine Session",
{ {
"session_id": session_id.hex(), "session_id": session_id.hex(),
"device": { "device": {
@ -205,6 +167,43 @@ def remote_cdm_widevine_open(device):
}, },
http_status=200, http_status=200,
) )
if (
request.headers["X-Secret-Key"]
and str(device).lower() != config["default_wv_cdm"].lower()
):
api_key = request.headers["X-Secret-Key"]
user = fetch_username_by_api_key(api_key=api_key)
if user and user_allowed_to_use_device(device=device, username=user):
wv_device = widevineDevice.load(
os.path.join(
os.getcwd(), "configs", "CDMs", user, "WV", device + ".wvd"
)
)
cdm = current_app.config["CDM"] = widevineCDM.from_device(wv_device)
session_id = cdm.open()
return make_response(
"Success",
"Successfully opened the Widevine Session",
{
"session_id": session_id.hex(),
"device": {
"system_id": cdm.system_id,
"security_level": cdm.security_level,
},
},
http_status=200,
)
return make_response(
"Error",
f"Device '{device}' is not found or you are not authorized to use it.",
http_status=403,
)
return make_response(
"Error",
f"Device '{device}' is not found or you are not authorized to use it.",
http_status=403,
)
@remotecdm_wv_bp.route( @remotecdm_wv_bp.route(
@ -216,6 +215,7 @@ def remote_cdm_widevine_close(device, session_id):
cdm = get_cdm_or_error(device) cdm = get_cdm_or_error(device)
if isinstance(cdm, tuple): # error response if isinstance(cdm, tuple): # error response
return cdm return cdm
try: try:
cdm.close(session_id) cdm.close(session_id)
except InvalidSession: except InvalidSession:
@ -257,22 +257,27 @@ def remote_cdm_widevine_set_service_certificate(device):
f'Invalid session id: "{session_id.hex()}", it may have expired', f'Invalid session id: "{session_id.hex()}", it may have expired',
http_status=400, http_status=400,
) )
except DecodeError as error: except DecodeError as error:
return make_response( return make_response(
"Error", "Error",
f"Invalid Service Certificate, {error}", f"Invalid Service Certificate, {error}",
http_status=400, http_status=400,
) )
except SignatureMismatch: except SignatureMismatch:
return make_response( return make_response(
"Error", "Error",
"Signature Validation failed on the Service Certificate, rejecting", "Signature Validation failed on the Service Certificate, rejecting",
http_status=400, http_status=400,
) )
return make_response( return make_response(
"Success", "Success",
f"Successfully {['set', 'unset'][not certificate]} the Service Certificate.", f"Successfully {['set', 'unset'][not certificate]} the Service Certificate.",
{"provider_id": provider_id}, {
"provider_id": provider_id,
},
http_status=200, http_status=200,
) )
@ -301,6 +306,7 @@ def remote_cdm_widevine_get_service_certificate(device):
f'Invalid Session ID "{session_id.hex()}", it may have expired', f'Invalid Session ID "{session_id.hex()}", it may have expired',
http_status=400, http_status=400,
) )
if service_certificate: if service_certificate:
service_certificate_b64 = base64.b64encode( service_certificate_b64 = base64.b64encode(
service_certificate.SerializeToString() service_certificate.SerializeToString()
@ -325,18 +331,24 @@ def remote_cdm_widevine_get_license_challenge(device, license_type):
missing_field = check_required_fields(body, ("session_id", "init_data")) missing_field = check_required_fields(body, ("session_id", "init_data"))
if missing_field: if missing_field:
return missing_field return missing_field
session_id = bytes.fromhex(body["session_id"]) session_id = bytes.fromhex(body["session_id"])
privacy_mode = body.get("privacy_mode", True) privacy_mode = body.get("privacy_mode", True)
cdm = get_cdm_or_error(device) cdm = get_cdm_or_error(device)
if isinstance(cdm, tuple): # error response if isinstance(cdm, tuple): # error response
return cdm return cdm
if current_app.config.get("force_privacy_mode"): if current_app.config.get("force_privacy_mode"):
privacy_mode = True privacy_mode = True
if not cdm.get_service_certificate(session_id): if not cdm.get_service_certificate(session_id):
return make_response( return (
"Error", jsonify(
"No Service Certificate set but Privacy Mode is Enforced.", {
http_status=403, "status": 403,
"message": "No Service Certificate set but Privacy Mode is Enforced.",
}
),
403,
) )
current_app.config["pssh"] = body["init_data"] current_app.config["pssh"] = body["init_data"]
@ -355,18 +367,21 @@ def remote_cdm_widevine_get_license_challenge(device, license_type):
f'Invalid Session ID "{session_id.hex()}", it may have expired', f'Invalid Session ID "{session_id.hex()}", it may have expired',
http_status=400, http_status=400,
) )
except InvalidInitData as error: except InvalidInitData as error:
return make_response( return make_response(
"Error", "Error",
f"Invalid Init Data, {error}", f"Invalid Init Data, {error}",
http_status=400, http_status=400,
) )
except InvalidLicenseType: except InvalidLicenseType:
return make_response( return make_response(
"Error", "Error",
f"Invalid License Type {license_type}", f"Invalid License Type {license_type}",
http_status=400, http_status=400,
) )
return make_response( return make_response(
"Success", "Success",
"Successfully got the License Challenge", "Successfully got the License Challenge",
@ -382,10 +397,13 @@ def remote_cdm_widevine_parse_license(device):
missing_field = check_required_fields(body, ("session_id", "license_message")) missing_field = check_required_fields(body, ("session_id", "license_message"))
if missing_field: if missing_field:
return missing_field return missing_field
session_id = bytes.fromhex(body["session_id"]) session_id = bytes.fromhex(body["session_id"])
cdm = get_cdm_or_error(device) cdm = get_cdm_or_error(device)
if isinstance(cdm, tuple): # error response if isinstance(cdm, tuple): # error response
return cdm return cdm
try: try:
cdm.parse_license(session_id, body["license_message"]) cdm.parse_license(session_id, body["license_message"])
except InvalidLicenseMessage as error: except InvalidLicenseMessage as error:
@ -394,24 +412,23 @@ def remote_cdm_widevine_parse_license(device):
f"Invalid License Message, {error}", f"Invalid License Message, {error}",
http_status=400, http_status=400,
) )
except InvalidContext as error: except InvalidContext as error:
return make_response( return jsonify({"status": 400, "message": f"Invalid Context, {error}"}), 400
"Error",
f"Invalid Context, {error}",
http_status=400,
)
except InvalidSession: except InvalidSession:
return make_response( return make_response(
"Error", "Error",
f'Invalid Session ID "{session_id.hex()}", it may have expired', f'Invalid Session ID "{session_id.hex()}", it may have expired',
http_status=400, http_status=400,
) )
except SignatureMismatch: except SignatureMismatch:
return make_response( return make_response(
"Error", "Error",
"Signature Validation failed on the License Message, rejecting.", "Signature Validation failed on the License Message, rejecting.",
http_status=400, http_status=400,
) )
return make_response( return make_response(
"Success", "Success",
"Successfully parsed and loaded the Keys from the License message.", "Successfully parsed and loaded the Keys from the License message.",
@ -428,12 +445,14 @@ def remote_cdm_widevine_get_keys(device, key_type):
missing_field = check_required_fields(body, ("session_id",)) missing_field = check_required_fields(body, ("session_id",))
if missing_field: if missing_field:
return missing_field return missing_field
session_id = bytes.fromhex(body["session_id"]) session_id = bytes.fromhex(body["session_id"])
if key_type == "ALL": if key_type == "ALL":
key_type = None key_type = None
cdm = get_cdm_or_error(device) cdm = get_cdm_or_error(device)
if isinstance(cdm, tuple): # error response if isinstance(cdm, tuple): # error response
return cdm return cdm
try: try:
keys = cdm.get_keys(session_id, key_type) keys = cdm.get_keys(session_id, key_type)
except InvalidSession: except InvalidSession: