forked from tpd94/CDRM-Project
		
	Compare commits
	
		
			2 Commits
		
	
	
		
			7f84542cfb
			...
			db7bea7951
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 
						 | 
					db7bea7951 | ||
| 
						 | 
					7f9f04d829 | 
@ -314,8 +314,8 @@ def api_decrypt(
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
    # Prepare proxies
 | 
			
		||||
    proxies = None
 | 
			
		||||
    if proxy is not None:
 | 
			
		||||
    proxies = ""
 | 
			
		||||
    if proxy != "":
 | 
			
		||||
        is_url, protocol, fqdn = is_url_and_split(proxy)
 | 
			
		||||
        if is_url:
 | 
			
		||||
            proxies = {"http": proxy, "https": proxy}
 | 
			
		||||
@ -380,7 +380,7 @@ def api_decrypt(
 | 
			
		||||
        json_data,
 | 
			
		||||
        is_widevine=(not is_pr),
 | 
			
		||||
    )
 | 
			
		||||
    if returned_keys is None:
 | 
			
		||||
    if returned_keys == "":
 | 
			
		||||
        return {"status": "error", "message": key_err}
 | 
			
		||||
 | 
			
		||||
    # Close session
 | 
			
		||||
 | 
			
		||||
@ -1,44 +1,75 @@
 | 
			
		||||
import base64
 | 
			
		||||
"""Module to handle the remote device PlayReady."""
 | 
			
		||||
 | 
			
		||||
from flask import Blueprint, jsonify, request, current_app, Response
 | 
			
		||||
import base64
 | 
			
		||||
import os
 | 
			
		||||
from pathlib import Path
 | 
			
		||||
import re
 | 
			
		||||
import yaml
 | 
			
		||||
from flask import Blueprint, jsonify, request, current_app, Response
 | 
			
		||||
 | 
			
		||||
from pyplayready.device import Device as PlayReadyDevice
 | 
			
		||||
from pyplayready.cdm import Cdm as PlayReadyCDM
 | 
			
		||||
from pyplayready import PSSH as PlayReadyPSSH
 | 
			
		||||
from pyplayready.exceptions import (
 | 
			
		||||
    InvalidSession,
 | 
			
		||||
    TooManySessions,
 | 
			
		||||
    InvalidLicense,
 | 
			
		||||
    InvalidPssh,
 | 
			
		||||
)
 | 
			
		||||
from custom_functions.database.user_db import fetch_username_by_api_key
 | 
			
		||||
from custom_functions.decrypt.api_decrypt import is_base64
 | 
			
		||||
from custom_functions.user_checks.device_allowed import user_allowed_to_use_device
 | 
			
		||||
from pathlib import Path
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
remotecdm_pr_bp = Blueprint("remotecdm_pr", __name__)
 | 
			
		||||
with open(f"{os.getcwd()}/configs/config.yaml", "r") as file:
 | 
			
		||||
with open(
 | 
			
		||||
    os.path.join(os.getcwd(), "configs", "config.yaml"), "r", encoding="utf-8"
 | 
			
		||||
) as file:
 | 
			
		||||
    config = yaml.safe_load(file)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def make_response(status, message, data=None, http_status=200):
 | 
			
		||||
    """Make a response."""
 | 
			
		||||
    resp = {"status": status, "message": message}
 | 
			
		||||
    if data is not None:
 | 
			
		||||
        resp["data"] = data
 | 
			
		||||
    return jsonify(resp), http_status
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def check_required_fields(body, required_fields):
 | 
			
		||||
    """Return a response tuple if a required field is missing, else None."""
 | 
			
		||||
    for field in required_fields:
 | 
			
		||||
        if not body.get(field):
 | 
			
		||||
            return make_response(
 | 
			
		||||
                "Error",
 | 
			
		||||
                f'Missing required field "{field}" in JSON body',
 | 
			
		||||
                http_status=400,
 | 
			
		||||
            )
 | 
			
		||||
    return None
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@remotecdm_pr_bp.route("/remotecdm/playready", methods=["GET", "HEAD"])
 | 
			
		||||
def remote_cdm_playready():
 | 
			
		||||
    """Handle the remote device PlayReady."""
 | 
			
		||||
    if request.method == "GET":
 | 
			
		||||
        return jsonify({"message": "OK"})
 | 
			
		||||
        return make_response(
 | 
			
		||||
            "Success",
 | 
			
		||||
            "OK",
 | 
			
		||||
            http_status=200,
 | 
			
		||||
        )
 | 
			
		||||
    if request.method == "HEAD":
 | 
			
		||||
        response = Response(status=200)
 | 
			
		||||
        response.headers["Server"] = "playready serve"
 | 
			
		||||
        return response
 | 
			
		||||
    return make_response("Failed", "Method not allowed", http_status=405)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@remotecdm_pr_bp.route("/remotecdm/playready/deviceinfo", methods=["GET"])
 | 
			
		||||
def remote_cdm_playready_deviceinfo():
 | 
			
		||||
    """Handle the remote device PlayReady device info."""
 | 
			
		||||
    base_name = config["default_pr_cdm"]
 | 
			
		||||
    if not base_name.endswith(".prd"):
 | 
			
		||||
        full_file_name = base_name + ".prd"
 | 
			
		||||
    device = PlayReadyDevice.load(f"{os.getcwd()}/configs/CDMs/PR/{full_file_name}")
 | 
			
		||||
    device = PlayReadyDevice.load(
 | 
			
		||||
        os.path.join(os.getcwd(), "configs", "CDMs", "PR", base_name + ".prd")
 | 
			
		||||
    )
 | 
			
		||||
    cdm = PlayReadyCDM.from_device(device)
 | 
			
		||||
    return jsonify(
 | 
			
		||||
        {
 | 
			
		||||
@ -50,133 +81,141 @@ def remote_cdm_playready_deviceinfo():
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def sanitize_username(username):
 | 
			
		||||
    """Sanitize the username."""
 | 
			
		||||
    return re.sub(r"[^a-zA-Z0-9_\-]", "_", username).lower()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@remotecdm_pr_bp.route("/remotecdm/playready/deviceinfo/<device>", methods=["GET"])
 | 
			
		||||
def remote_cdm_playready_deviceinfo_specific(device):
 | 
			
		||||
    if request.method == "GET":
 | 
			
		||||
        base_name = Path(device).with_suffix(".prd").name
 | 
			
		||||
        api_key = request.headers["X-Secret-Key"]
 | 
			
		||||
        username = fetch_username_by_api_key(api_key)
 | 
			
		||||
        device = PlayReadyDevice.load(
 | 
			
		||||
            f"{os.getcwd()}/configs/CDMs/{username}/PR/{base_name}"
 | 
			
		||||
        )
 | 
			
		||||
        cdm = PlayReadyCDM.from_device(device)
 | 
			
		||||
        return jsonify(
 | 
			
		||||
            {
 | 
			
		||||
                "security_level": cdm.security_level,
 | 
			
		||||
                "host": f'{config["fqdn"]}/remotecdm/widevine',
 | 
			
		||||
                "secret": f"{api_key}",
 | 
			
		||||
                "device_name": Path(base_name).stem,
 | 
			
		||||
            }
 | 
			
		||||
    """Handle the remote device PlayReady device info specific."""
 | 
			
		||||
    base_name = Path(device).with_suffix(".prd").name
 | 
			
		||||
    api_key = request.headers["X-Secret-Key"]
 | 
			
		||||
    username = fetch_username_by_api_key(api_key)
 | 
			
		||||
    if not username:
 | 
			
		||||
        return jsonify({"message": "Invalid or missing API key."}), 403
 | 
			
		||||
    safe_username = sanitize_username(username)
 | 
			
		||||
    device = PlayReadyDevice.load(
 | 
			
		||||
        os.path.join(
 | 
			
		||||
            os.getcwd(),
 | 
			
		||||
            "configs",
 | 
			
		||||
            "CDMs",
 | 
			
		||||
            "users_uploaded",
 | 
			
		||||
            safe_username,
 | 
			
		||||
            "PR",
 | 
			
		||||
            base_name,
 | 
			
		||||
        )
 | 
			
		||||
    )
 | 
			
		||||
    cdm = PlayReadyCDM.from_device(device)
 | 
			
		||||
    return jsonify(
 | 
			
		||||
        {
 | 
			
		||||
            "security_level": cdm.security_level,
 | 
			
		||||
            "host": f'{config["fqdn"]}/remotecdm/playready',
 | 
			
		||||
            "secret": f"{api_key}",
 | 
			
		||||
            "device_name": Path(base_name).stem,
 | 
			
		||||
        }
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@remotecdm_pr_bp.route("/remotecdm/playready/<device>/open", methods=["GET"])
 | 
			
		||||
def remote_cdm_playready_open(device):
 | 
			
		||||
    """Handle the remote device PlayReady open."""
 | 
			
		||||
    unauthorized_msg = {
 | 
			
		||||
        "message": f"Device '{device}' is not found or you are not authorized to use it."
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    # Default device logic
 | 
			
		||||
    if str(device).lower() == config["default_pr_cdm"].lower():
 | 
			
		||||
        pr_device = PlayReadyDevice.load(
 | 
			
		||||
            f'{os.getcwd()}/configs/CDMs/PR/{config["default_pr_cdm"]}.prd'
 | 
			
		||||
            os.path.join(
 | 
			
		||||
                os.getcwd(), "configs", "CDMs", "PR", config["default_pr_cdm"] + ".prd"
 | 
			
		||||
            )
 | 
			
		||||
        )
 | 
			
		||||
        cdm = current_app.config["CDM"] = PlayReadyCDM.from_device(pr_device)
 | 
			
		||||
        session_id = cdm.open()
 | 
			
		||||
        return jsonify(
 | 
			
		||||
        return make_response(
 | 
			
		||||
            "Success",
 | 
			
		||||
            "Successfully opened the PlayReady CDM session",
 | 
			
		||||
            {
 | 
			
		||||
                "message": "Success",
 | 
			
		||||
                "data": {
 | 
			
		||||
                "session_id": session_id.hex(),
 | 
			
		||||
                "device": {"security_level": cdm.security_level},
 | 
			
		||||
            },
 | 
			
		||||
            http_status=200,
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
    # User device logic
 | 
			
		||||
    api_key = request.headers.get("X-Secret-Key")
 | 
			
		||||
    if api_key and str(device).lower() != config["default_pr_cdm"].lower():
 | 
			
		||||
        user = fetch_username_by_api_key(api_key=api_key)
 | 
			
		||||
        safe_username = sanitize_username(user)
 | 
			
		||||
        if user and user_allowed_to_use_device(device=device, username=user):
 | 
			
		||||
            pr_device = PlayReadyDevice.load(
 | 
			
		||||
                os.path.join(
 | 
			
		||||
                    os.getcwd(),
 | 
			
		||||
                    "configs",
 | 
			
		||||
                    "CDMs",
 | 
			
		||||
                    "users_uploaded",
 | 
			
		||||
                    safe_username,
 | 
			
		||||
                    "PR",
 | 
			
		||||
                    device + ".prd",
 | 
			
		||||
                )
 | 
			
		||||
            )
 | 
			
		||||
            cdm = current_app.config["CDM"] = PlayReadyCDM.from_device(pr_device)
 | 
			
		||||
            session_id = cdm.open()
 | 
			
		||||
            return make_response(
 | 
			
		||||
                "Success",
 | 
			
		||||
                "Successfully opened the PlayReady CDM session",
 | 
			
		||||
                {
 | 
			
		||||
                    "session_id": session_id.hex(),
 | 
			
		||||
                    "device": {"security_level": cdm.security_level},
 | 
			
		||||
                },
 | 
			
		||||
            }
 | 
			
		||||
        )
 | 
			
		||||
    if (
 | 
			
		||||
        request.headers["X-Secret-Key"]
 | 
			
		||||
        and str(device).lower() != config["default_pr_cdm"].lower()
 | 
			
		||||
    ):
 | 
			
		||||
        api_key = request.headers["X-Secret-Key"]
 | 
			
		||||
        user = fetch_username_by_api_key(api_key=api_key)
 | 
			
		||||
        if user:
 | 
			
		||||
            if user_allowed_to_use_device(device=device, username=user):
 | 
			
		||||
                pr_device = PlayReadyDevice.load(
 | 
			
		||||
                    f"{os.getcwd()}/configs/CDMs/{user}/PR/{device}.prd"
 | 
			
		||||
                )
 | 
			
		||||
                cdm = current_app.config["CDM"] = PlayReadyCDM.from_device(pr_device)
 | 
			
		||||
                session_id = cdm.open()
 | 
			
		||||
                return jsonify(
 | 
			
		||||
                    {
 | 
			
		||||
                        "message": "Success",
 | 
			
		||||
                        "data": {
 | 
			
		||||
                            "session_id": session_id.hex(),
 | 
			
		||||
                            "device": {"security_level": cdm.security_level},
 | 
			
		||||
                        },
 | 
			
		||||
                    }
 | 
			
		||||
                )
 | 
			
		||||
            else:
 | 
			
		||||
                return (
 | 
			
		||||
                    jsonify(
 | 
			
		||||
                        {
 | 
			
		||||
                            "message": f"Device '{device}' is not found or you are not authorized to use it.",
 | 
			
		||||
                        }
 | 
			
		||||
                    ),
 | 
			
		||||
                    403,
 | 
			
		||||
                )
 | 
			
		||||
        else:
 | 
			
		||||
            return (
 | 
			
		||||
                jsonify(
 | 
			
		||||
                    {
 | 
			
		||||
                        "message": f"Device '{device}' is not found or you are not authorized to use it.",
 | 
			
		||||
                    }
 | 
			
		||||
                ),
 | 
			
		||||
                403,
 | 
			
		||||
                http_status=200,
 | 
			
		||||
            )
 | 
			
		||||
    else:
 | 
			
		||||
        return (
 | 
			
		||||
            jsonify(
 | 
			
		||||
                {
 | 
			
		||||
                    "message": f"Device '{device}' is not found or you are not authorized to use it.",
 | 
			
		||||
                }
 | 
			
		||||
            ),
 | 
			
		||||
            403,
 | 
			
		||||
        return make_response("Failed", unauthorized_msg, http_status=403)
 | 
			
		||||
 | 
			
		||||
    return make_response("Failed", unauthorized_msg, http_status=403)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def get_cdm_or_error(device):
 | 
			
		||||
    """Get the CDM or return an error response."""
 | 
			
		||||
    cdm = current_app.config.get("CDM")
 | 
			
		||||
    if not cdm:
 | 
			
		||||
        return make_response(
 | 
			
		||||
            "Error",
 | 
			
		||||
            f'No CDM session for "{device}" has been opened yet. No session to use',
 | 
			
		||||
            http_status=400,
 | 
			
		||||
        )
 | 
			
		||||
    return cdm
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@remotecdm_pr_bp.route(
 | 
			
		||||
    "/remotecdm/playready/<device>/close/<session_id>", methods=["GET"]
 | 
			
		||||
)
 | 
			
		||||
def remote_cdm_playready_close(device, session_id):
 | 
			
		||||
    """Handle the remote device PlayReady close."""
 | 
			
		||||
    try:
 | 
			
		||||
        session_id = bytes.fromhex(session_id)
 | 
			
		||||
        cdm = current_app.config["CDM"]
 | 
			
		||||
        if not cdm:
 | 
			
		||||
            return (
 | 
			
		||||
                jsonify(
 | 
			
		||||
                    {
 | 
			
		||||
                        "message": f'No CDM for "{device}" has been opened yet. No session to close'
 | 
			
		||||
                    }
 | 
			
		||||
                ),
 | 
			
		||||
                400,
 | 
			
		||||
            )
 | 
			
		||||
        cdm = get_cdm_or_error(device)
 | 
			
		||||
        if isinstance(cdm, tuple):  # error response
 | 
			
		||||
            return cdm
 | 
			
		||||
        try:
 | 
			
		||||
            cdm.close(session_id)
 | 
			
		||||
        except InvalidSession:
 | 
			
		||||
            return (
 | 
			
		||||
                jsonify(
 | 
			
		||||
                    {
 | 
			
		||||
                        "message": f'Invalid session ID "{session_id.hex()}", it may have expired'
 | 
			
		||||
                    }
 | 
			
		||||
                ),
 | 
			
		||||
                400,
 | 
			
		||||
            return make_response(
 | 
			
		||||
                "Error",
 | 
			
		||||
                f'Invalid session ID "{session_id.hex()}", it may have expired',
 | 
			
		||||
                http_status=400,
 | 
			
		||||
            )
 | 
			
		||||
        return (
 | 
			
		||||
            jsonify(
 | 
			
		||||
                {
 | 
			
		||||
                    "message": f'Successfully closed Session "{session_id.hex()}".',
 | 
			
		||||
                }
 | 
			
		||||
            ),
 | 
			
		||||
            200,
 | 
			
		||||
        return make_response(
 | 
			
		||||
            "Success",
 | 
			
		||||
            f'Successfully closed Session "{session_id.hex()}".',
 | 
			
		||||
            http_status=200,
 | 
			
		||||
        )
 | 
			
		||||
    except Exception as e:
 | 
			
		||||
        return (
 | 
			
		||||
            jsonify({"message": f'Failed to close Session "{session_id.hex()}".'}),
 | 
			
		||||
            400,
 | 
			
		||||
    except Exception as error:
 | 
			
		||||
        return make_response(
 | 
			
		||||
            "Error",
 | 
			
		||||
            f'Failed to close Session "{session_id.hex()}", {error}.',
 | 
			
		||||
            http_status=400,
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -184,18 +223,14 @@ def remote_cdm_playready_close(device, session_id):
 | 
			
		||||
    "/remotecdm/playready/<device>/get_license_challenge", methods=["POST"]
 | 
			
		||||
)
 | 
			
		||||
def remote_cdm_playready_get_license_challenge(device):
 | 
			
		||||
    """Handle the remote device PlayReady get license challenge."""
 | 
			
		||||
    body = request.get_json()
 | 
			
		||||
    for required_field in ("session_id", "init_data"):
 | 
			
		||||
        if not body.get(required_field):
 | 
			
		||||
            return (
 | 
			
		||||
                jsonify(
 | 
			
		||||
                    {
 | 
			
		||||
                        "message": f'Missing required field "{required_field}" in JSON body'
 | 
			
		||||
                    }
 | 
			
		||||
                ),
 | 
			
		||||
                400,
 | 
			
		||||
            )
 | 
			
		||||
    cdm = current_app.config["CDM"]
 | 
			
		||||
    missing_field = check_required_fields(body, ("session_id", "init_data"))
 | 
			
		||||
    if missing_field:
 | 
			
		||||
        return missing_field
 | 
			
		||||
    cdm = get_cdm_or_error(device)
 | 
			
		||||
    if isinstance(cdm, tuple):  # error response
 | 
			
		||||
        return cdm
 | 
			
		||||
    session_id = bytes.fromhex(body["session_id"])
 | 
			
		||||
    init_data = body["init_data"]
 | 
			
		||||
    if not init_data.startswith("<WRMHEADER"):
 | 
			
		||||
@ -203,38 +238,46 @@ def remote_cdm_playready_get_license_challenge(device):
 | 
			
		||||
            pssh = PlayReadyPSSH(init_data)
 | 
			
		||||
            if pssh.wrm_headers:
 | 
			
		||||
                init_data = pssh.wrm_headers[0]
 | 
			
		||||
        except InvalidPssh as e:
 | 
			
		||||
            return jsonify({"message": f"Unable to parse base64 PSSH, {e}"})
 | 
			
		||||
        except InvalidPssh as error:
 | 
			
		||||
            return make_response(
 | 
			
		||||
                "Error",
 | 
			
		||||
                f"Unable to parse base64 PSSH, {error}",
 | 
			
		||||
                http_status=400,
 | 
			
		||||
            )
 | 
			
		||||
    try:
 | 
			
		||||
        license_request = cdm.get_license_challenge(
 | 
			
		||||
            session_id=session_id, wrm_header=init_data
 | 
			
		||||
        )
 | 
			
		||||
    except InvalidSession:
 | 
			
		||||
        return jsonify(
 | 
			
		||||
            {
 | 
			
		||||
                "message": f"Invalid Session ID '{session_id.hex()}', it may have expired."
 | 
			
		||||
            }
 | 
			
		||||
        return make_response(
 | 
			
		||||
            "Error",
 | 
			
		||||
            f"Invalid Session ID '{session_id.hex()}', it may have expired.",
 | 
			
		||||
            http_status=400,
 | 
			
		||||
        )
 | 
			
		||||
    except Exception as e:
 | 
			
		||||
        return jsonify({"message": f"Error, {e}"})
 | 
			
		||||
    return jsonify({"message": "success", "data": {"challenge": license_request}})
 | 
			
		||||
    except ValueError as error:
 | 
			
		||||
        return make_response(
 | 
			
		||||
            "Error",
 | 
			
		||||
            f"Invalid License, {error}",
 | 
			
		||||
            http_status=400,
 | 
			
		||||
        )
 | 
			
		||||
    return make_response(
 | 
			
		||||
        "Success",
 | 
			
		||||
        "Successfully got the License Challenge",
 | 
			
		||||
        {"challenge_b64": base64.b64encode(license_request).decode()},
 | 
			
		||||
        http_status=200,
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@remotecdm_pr_bp.route("/remotecdm/playready/<device>/parse_license", methods=["POST"])
 | 
			
		||||
def remote_cdm_playready_parse_license(device):
 | 
			
		||||
    """Handle the remote device PlayReady parse license."""
 | 
			
		||||
    body = request.get_json()
 | 
			
		||||
    for required_field in ("license_message", "session_id"):
 | 
			
		||||
        if not body.get(required_field):
 | 
			
		||||
            return jsonify(
 | 
			
		||||
                {"message": f'Missing required field "{required_field}" in JSON body'}
 | 
			
		||||
            )
 | 
			
		||||
    cdm = current_app.config["CDM"]
 | 
			
		||||
    if not cdm:
 | 
			
		||||
        return jsonify(
 | 
			
		||||
            {
 | 
			
		||||
                "message": f"No Cdm session for {device} has been opened yet. No session to use."
 | 
			
		||||
            }
 | 
			
		||||
        )
 | 
			
		||||
    missing_field = check_required_fields(body, ("license_message", "session_id"))
 | 
			
		||||
    if missing_field:
 | 
			
		||||
        return missing_field
 | 
			
		||||
    cdm = get_cdm_or_error(device)
 | 
			
		||||
    if isinstance(cdm, tuple):  # error response
 | 
			
		||||
        return cdm
 | 
			
		||||
    session_id = bytes.fromhex(body["session_id"])
 | 
			
		||||
    license_message = body["license_message"]
 | 
			
		||||
    if is_base64(license_message):
 | 
			
		||||
@ -242,44 +285,56 @@ def remote_cdm_playready_parse_license(device):
 | 
			
		||||
    try:
 | 
			
		||||
        cdm.parse_license(session_id, license_message)
 | 
			
		||||
    except InvalidSession:
 | 
			
		||||
        return jsonify(
 | 
			
		||||
            {
 | 
			
		||||
                "message": f"Invalid Session ID '{session_id.hex()}', it may have expired."
 | 
			
		||||
            }
 | 
			
		||||
        return make_response(
 | 
			
		||||
            "Error",
 | 
			
		||||
            f"Invalid Session ID '{session_id.hex()}', it may have expired.",
 | 
			
		||||
            http_status=400,
 | 
			
		||||
        )
 | 
			
		||||
    except InvalidLicense as e:
 | 
			
		||||
        return jsonify({"message": f"Invalid License, {e}"})
 | 
			
		||||
        return make_response(
 | 
			
		||||
            "Error",
 | 
			
		||||
            f"Invalid License, {e}",
 | 
			
		||||
            http_status=400,
 | 
			
		||||
        )
 | 
			
		||||
    except Exception as e:
 | 
			
		||||
        return jsonify({"message": f"Error, {e}"})
 | 
			
		||||
    return jsonify(
 | 
			
		||||
        {"message": "Successfully parsed and loaded the Keys from the License message"}
 | 
			
		||||
        return make_response(
 | 
			
		||||
            "Error",
 | 
			
		||||
            f"Error, {e}",
 | 
			
		||||
            http_status=400,
 | 
			
		||||
        )
 | 
			
		||||
    return make_response(
 | 
			
		||||
        "Success",
 | 
			
		||||
        "Successfully parsed and loaded the Keys from the License message",
 | 
			
		||||
        http_status=200,
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@remotecdm_pr_bp.route("/remotecdm/playready/<device>/get_keys", methods=["POST"])
 | 
			
		||||
def remote_cdm_playready_get_keys(device):
 | 
			
		||||
    """Handle the remote device PlayReady get keys."""
 | 
			
		||||
    body = request.get_json()
 | 
			
		||||
    for required_field in ("session_id",):
 | 
			
		||||
        if not body.get(required_field):
 | 
			
		||||
            return jsonify(
 | 
			
		||||
                {"message": f'Missing required field "{required_field}" in JSON body'}
 | 
			
		||||
            )
 | 
			
		||||
    missing_field = check_required_fields(body, ("session_id",))
 | 
			
		||||
    if missing_field:
 | 
			
		||||
        return missing_field
 | 
			
		||||
    session_id = bytes.fromhex(body["session_id"])
 | 
			
		||||
    cdm = current_app.config["CDM"]
 | 
			
		||||
    if not cdm:
 | 
			
		||||
        return jsonify(
 | 
			
		||||
            {"message": f"Missing required field '{required_field}' in JSON body."}
 | 
			
		||||
        )
 | 
			
		||||
    key_type = body.get("key_type", None)
 | 
			
		||||
    cdm = get_cdm_or_error(device)
 | 
			
		||||
    if isinstance(cdm, tuple):  # error response
 | 
			
		||||
        return cdm
 | 
			
		||||
    try:
 | 
			
		||||
        keys = cdm.get_keys(session_id)
 | 
			
		||||
        keys = cdm.get_keys(session_id, key_type)
 | 
			
		||||
    except InvalidSession:
 | 
			
		||||
        return jsonify(
 | 
			
		||||
            {
 | 
			
		||||
                "message": f"Invalid Session ID '{session_id.hex()}', it may have expired."
 | 
			
		||||
            }
 | 
			
		||||
        return make_response(
 | 
			
		||||
            "Error",
 | 
			
		||||
            f"Invalid Session ID '{session_id.hex()}', it may have expired.",
 | 
			
		||||
            http_status=400,
 | 
			
		||||
        )
 | 
			
		||||
    except ValueError as error:
 | 
			
		||||
        return make_response(
 | 
			
		||||
            "Error",
 | 
			
		||||
            f"The Key Type value '{key_type}' is invalid, {error}",
 | 
			
		||||
            http_status=400,
 | 
			
		||||
        )
 | 
			
		||||
    except Exception as e:
 | 
			
		||||
        return jsonify({"message": f"Error, {e}"})
 | 
			
		||||
    keys_json = [
 | 
			
		||||
        {
 | 
			
		||||
            "key_id": key.key_id.hex,
 | 
			
		||||
@ -289,5 +344,11 @@ def remote_cdm_playready_get_keys(device):
 | 
			
		||||
            "key_length": key.key_length,
 | 
			
		||||
        }
 | 
			
		||||
        for key in keys
 | 
			
		||||
        if not key_type or key.type == key_type
 | 
			
		||||
    ]
 | 
			
		||||
    return jsonify({"message": "success", "data": {"keys": keys_json}})
 | 
			
		||||
    return make_response(
 | 
			
		||||
        "Success",
 | 
			
		||||
        "Successfully got the Keys",
 | 
			
		||||
        {"keys": keys_json},
 | 
			
		||||
        http_status=200,
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
@ -1,7 +1,12 @@
 | 
			
		||||
"""Module to handle the remote device Widevine."""
 | 
			
		||||
 | 
			
		||||
import os
 | 
			
		||||
from flask import Blueprint, jsonify, request, current_app, Response
 | 
			
		||||
import base64
 | 
			
		||||
from typing import Any, Optional, Union
 | 
			
		||||
import re
 | 
			
		||||
from pathlib import Path
 | 
			
		||||
import yaml
 | 
			
		||||
from flask import Blueprint, jsonify, request, current_app, Response
 | 
			
		||||
 | 
			
		||||
from google.protobuf.message import DecodeError
 | 
			
		||||
from pywidevine.pssh import PSSH as widevinePSSH
 | 
			
		||||
from pywidevine import __version__
 | 
			
		||||
@ -14,24 +19,47 @@ from pywidevine.exceptions import (
 | 
			
		||||
    InvalidLicenseType,
 | 
			
		||||
    InvalidSession,
 | 
			
		||||
    SignatureMismatch,
 | 
			
		||||
    TooManySessions,
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
import yaml
 | 
			
		||||
from custom_functions.database.user_db import fetch_api_key, fetch_username_by_api_key
 | 
			
		||||
from custom_functions.database.user_db import fetch_username_by_api_key
 | 
			
		||||
from custom_functions.database.unified_db_ops import cache_to_db
 | 
			
		||||
from custom_functions.user_checks.device_allowed import user_allowed_to_use_device
 | 
			
		||||
from pathlib import Path
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
remotecdm_wv_bp = Blueprint("remotecdm_wv", __name__)
 | 
			
		||||
with open(f"{os.getcwd()}/configs/config.yaml", "r") as file:
 | 
			
		||||
with open(
 | 
			
		||||
    os.path.join(os.getcwd(), "configs", "config.yaml"), "r", encoding="utf-8"
 | 
			
		||||
) as file:
 | 
			
		||||
    config = yaml.safe_load(file)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def make_response(status, message, data=None, http_status=200):
 | 
			
		||||
    """Make a response."""
 | 
			
		||||
    resp = {"status": status, "message": message}
 | 
			
		||||
    if data is not None:
 | 
			
		||||
        resp["data"] = data
 | 
			
		||||
    return jsonify(resp), http_status
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def check_required_fields(body, required_fields):
 | 
			
		||||
    """Return a response if a required field is missing, else None."""
 | 
			
		||||
    for field in required_fields:
 | 
			
		||||
        if not body.get(field):
 | 
			
		||||
            return make_response(
 | 
			
		||||
                "Error",
 | 
			
		||||
                f'Missing required field "{field}" in JSON body',
 | 
			
		||||
                http_status=400,
 | 
			
		||||
            )
 | 
			
		||||
    return None
 | 
			
		||||
 | 
			
		||||
@remotecdm_wv_bp.route("/remotecdm/widevine", methods=["GET", "HEAD"])
 | 
			
		||||
def remote_cdm_widevine():
 | 
			
		||||
    """Handle the remote device Widevine."""
 | 
			
		||||
    if request.method == "GET":
 | 
			
		||||
        return jsonify(
 | 
			
		||||
            {"status": 200, "message": f"{config['fqdn'].upper()} Remote Widevine CDM."}
 | 
			
		||||
        return make_response(
 | 
			
		||||
            "Success",
 | 
			
		||||
            f"{config['fqdn'].upper()} Remote Widevine CDM.",
 | 
			
		||||
            http_status=200,
 | 
			
		||||
        )
 | 
			
		||||
    if request.method == "HEAD":
 | 
			
		||||
        response = Response(status=200)
 | 
			
		||||
@ -39,171 +67,168 @@ def remote_cdm_widevine():
 | 
			
		||||
            f"https://github.com/devine-dl/pywidevine serve v{__version__}"
 | 
			
		||||
        )
 | 
			
		||||
        return response
 | 
			
		||||
    return make_response(
 | 
			
		||||
        "Error",
 | 
			
		||||
        "Invalid request method",
 | 
			
		||||
        http_status=405,
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@remotecdm_wv_bp.route("/remotecdm/widevine/deviceinfo", methods=["GET"])
 | 
			
		||||
def remote_cdm_widevine_deviceinfo():
 | 
			
		||||
    if request.method == "GET":
 | 
			
		||||
        base_name = config["default_wv_cdm"]
 | 
			
		||||
        if not base_name.endswith(".wvd"):
 | 
			
		||||
            base_name = base_name + ".wvd"
 | 
			
		||||
        device = widevineDevice.load(f"{os.getcwd()}/configs/CDMs/WV/{base_name}")
 | 
			
		||||
        cdm = widevineCDM.from_device(device)
 | 
			
		||||
        return jsonify(
 | 
			
		||||
            {
 | 
			
		||||
                "device_type": cdm.device_type.name,
 | 
			
		||||
                "system_id": cdm.system_id,
 | 
			
		||||
                "security_level": cdm.security_level,
 | 
			
		||||
                "host": f'{config["fqdn"]}/remotecdm/widevine',
 | 
			
		||||
                "secret": f'{config["remote_cdm_secret"]}',
 | 
			
		||||
                "device_name": Path(base_name).stem,
 | 
			
		||||
            }
 | 
			
		||||
        )
 | 
			
		||||
    """Handle the remote device Widevine device info."""
 | 
			
		||||
    base_name = config["default_wv_cdm"]
 | 
			
		||||
    if not base_name.endswith(".wvd"):
 | 
			
		||||
        base_name = base_name + ".wvd"
 | 
			
		||||
    device = widevineDevice.load(
 | 
			
		||||
        os.path.join(os.getcwd(), "configs", "CDMs", "WV", base_name)
 | 
			
		||||
    )
 | 
			
		||||
    cdm = widevineCDM.from_device(device)
 | 
			
		||||
    return make_response(
 | 
			
		||||
        "Success",
 | 
			
		||||
        "Successfully got the Widevine CDM device info",
 | 
			
		||||
        {
 | 
			
		||||
            "device_type": cdm.device_type.name,
 | 
			
		||||
            "system_id": cdm.system_id,
 | 
			
		||||
            "security_level": cdm.security_level,
 | 
			
		||||
            "host": f'{config["fqdn"]}/remotecdm/widevine',
 | 
			
		||||
            "secret": f'{config["remote_cdm_secret"]}',
 | 
			
		||||
            "device_name": Path(base_name).stem,
 | 
			
		||||
        },
 | 
			
		||||
        http_status=200,
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def sanitize_username(username):
 | 
			
		||||
    """Sanitize the username."""
 | 
			
		||||
    return re.sub(r"[^a-zA-Z0-9_\-]", "_", username).lower()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@remotecdm_wv_bp.route("/remotecdm/widevine/deviceinfo/<device>", methods=["GET"])
 | 
			
		||||
def remote_cdm_widevine_deviceinfo_specific(device):
 | 
			
		||||
    if request.method == "GET":
 | 
			
		||||
        base_name = Path(device).with_suffix(".wvd").name
 | 
			
		||||
        api_key = request.headers["X-Secret-Key"]
 | 
			
		||||
        username = fetch_username_by_api_key(api_key)
 | 
			
		||||
        device = widevineDevice.load(
 | 
			
		||||
            f"{os.getcwd()}/configs/CDMs/{username}/WV/{base_name}"
 | 
			
		||||
    """Handle the remote device Widevine device info specific."""
 | 
			
		||||
    base_name = Path(device).with_suffix(".wvd").name
 | 
			
		||||
    api_key = request.headers["X-Secret-Key"]
 | 
			
		||||
    username = fetch_username_by_api_key(api_key)
 | 
			
		||||
    safe_username = sanitize_username(username)
 | 
			
		||||
    device = widevineDevice.load(
 | 
			
		||||
        os.path.join(
 | 
			
		||||
            os.getcwd(),
 | 
			
		||||
            "configs",
 | 
			
		||||
            "CDMs",
 | 
			
		||||
            "users_uploaded",
 | 
			
		||||
            safe_username,
 | 
			
		||||
            "WV",
 | 
			
		||||
            base_name,
 | 
			
		||||
        )
 | 
			
		||||
        cdm = widevineCDM.from_device(device)
 | 
			
		||||
        return jsonify(
 | 
			
		||||
            {
 | 
			
		||||
                "device_type": cdm.device_type.name,
 | 
			
		||||
                "system_id": cdm.system_id,
 | 
			
		||||
                "security_level": cdm.security_level,
 | 
			
		||||
                "host": f'{config["fqdn"]}/remotecdm/widevine',
 | 
			
		||||
                "secret": f"{api_key}",
 | 
			
		||||
                "device_name": Path(base_name).stem,
 | 
			
		||||
            }
 | 
			
		||||
    )
 | 
			
		||||
    cdm = widevineCDM.from_device(device)
 | 
			
		||||
    return make_response(
 | 
			
		||||
        "Success",
 | 
			
		||||
        "Successfully got the Widevine CDM device info (by user)",
 | 
			
		||||
        {
 | 
			
		||||
            "device_type": cdm.device_type.name,
 | 
			
		||||
            "system_id": cdm.system_id,
 | 
			
		||||
            "security_level": cdm.security_level,
 | 
			
		||||
            "host": f'{config["fqdn"]}/remotecdm/widevine',
 | 
			
		||||
            "secret": f"{api_key}",
 | 
			
		||||
            "device_name": Path(base_name).stem,
 | 
			
		||||
        },
 | 
			
		||||
        http_status=200,
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def load_widevine_device(device_name, api_key=None):
 | 
			
		||||
    """Load a Widevine device, either default or user-uploaded."""
 | 
			
		||||
    try:
 | 
			
		||||
        if device_name.lower() == config["default_wv_cdm"].lower():
 | 
			
		||||
            path = os.path.join(
 | 
			
		||||
                os.getcwd(), "configs", "CDMs", "WV", config["default_wv_cdm"] + ".wvd"
 | 
			
		||||
            )
 | 
			
		||||
        else:
 | 
			
		||||
            if not api_key:
 | 
			
		||||
                return None
 | 
			
		||||
            username = fetch_username_by_api_key(api_key)
 | 
			
		||||
            if not username or not user_allowed_to_use_device(
 | 
			
		||||
                device=device_name, username=username
 | 
			
		||||
            ):
 | 
			
		||||
                return None
 | 
			
		||||
            safe_username = sanitize_username(username)
 | 
			
		||||
            path = os.path.join(
 | 
			
		||||
                os.getcwd(),
 | 
			
		||||
                "configs",
 | 
			
		||||
                "CDMs",
 | 
			
		||||
                "users_uploaded",
 | 
			
		||||
                safe_username,
 | 
			
		||||
                "WV",
 | 
			
		||||
                device_name + ".wvd",
 | 
			
		||||
            )
 | 
			
		||||
        return widevineDevice.load(path)
 | 
			
		||||
    except (FileNotFoundError, ValueError):
 | 
			
		||||
        return None
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def get_cdm_or_error(device):
 | 
			
		||||
    """Get the CDM or return an error response."""
 | 
			
		||||
    cdm = current_app.config.get("CDM")
 | 
			
		||||
    if not cdm:
 | 
			
		||||
        return make_response(
 | 
			
		||||
            "Error",
 | 
			
		||||
            f'No CDM session for "{device}" has been opened yet. No session to use',
 | 
			
		||||
            http_status=400,
 | 
			
		||||
        )
 | 
			
		||||
    return cdm
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@remotecdm_wv_bp.route("/remotecdm/widevine/<device>/open", methods=["GET"])
 | 
			
		||||
def remote_cdm_widevine_open(device):
 | 
			
		||||
    if str(device).lower() == config["default_wv_cdm"].lower():
 | 
			
		||||
        wv_device = widevineDevice.load(
 | 
			
		||||
            f'{os.getcwd()}/configs/CDMs/WV/{config["default_wv_cdm"]}.wvd'
 | 
			
		||||
        )
 | 
			
		||||
        cdm = current_app.config["CDM"] = widevineCDM.from_device(wv_device)
 | 
			
		||||
        session_id = cdm.open()
 | 
			
		||||
        return (
 | 
			
		||||
            jsonify(
 | 
			
		||||
                {
 | 
			
		||||
                    "status": 200,
 | 
			
		||||
                    "message": "Success",
 | 
			
		||||
                    "data": {
 | 
			
		||||
                        "session_id": session_id.hex(),
 | 
			
		||||
                        "device": {
 | 
			
		||||
                            "system_id": cdm.system_id,
 | 
			
		||||
                            "security_level": cdm.security_level,
 | 
			
		||||
                        },
 | 
			
		||||
                    },
 | 
			
		||||
                }
 | 
			
		||||
            ),
 | 
			
		||||
            200,
 | 
			
		||||
        )
 | 
			
		||||
    if (
 | 
			
		||||
        request.headers["X-Secret-Key"]
 | 
			
		||||
        and str(device).lower() != config["default_wv_cdm"].lower()
 | 
			
		||||
    ):
 | 
			
		||||
        api_key = request.headers["X-Secret-Key"]
 | 
			
		||||
        user = fetch_username_by_api_key(api_key=api_key)
 | 
			
		||||
        if user:
 | 
			
		||||
            if user_allowed_to_use_device(device=device, username=user):
 | 
			
		||||
                wv_device = widevineDevice.load(
 | 
			
		||||
                    f"{os.getcwd()}/configs/CDMs/{user}/WV/{device}.wvd"
 | 
			
		||||
                )
 | 
			
		||||
                cdm = current_app.config["CDM"] = widevineCDM.from_device(wv_device)
 | 
			
		||||
                session_id = cdm.open()
 | 
			
		||||
                return (
 | 
			
		||||
                    jsonify(
 | 
			
		||||
                        {
 | 
			
		||||
                            "status": 200,
 | 
			
		||||
                            "message": "Success",
 | 
			
		||||
                            "data": {
 | 
			
		||||
                                "session_id": session_id.hex(),
 | 
			
		||||
                                "device": {
 | 
			
		||||
                                    "system_id": cdm.system_id,
 | 
			
		||||
                                    "security_level": cdm.security_level,
 | 
			
		||||
                                },
 | 
			
		||||
                            },
 | 
			
		||||
                        }
 | 
			
		||||
                    ),
 | 
			
		||||
                    200,
 | 
			
		||||
                )
 | 
			
		||||
            else:
 | 
			
		||||
                return (
 | 
			
		||||
                    jsonify(
 | 
			
		||||
                        {
 | 
			
		||||
                            "message": f"Device '{device}' is not found or you are not authorized to use it.",
 | 
			
		||||
                            "status": 403,
 | 
			
		||||
                        }
 | 
			
		||||
                    ),
 | 
			
		||||
                    403,
 | 
			
		||||
                )
 | 
			
		||||
        else:
 | 
			
		||||
            return (
 | 
			
		||||
                jsonify(
 | 
			
		||||
                    {
 | 
			
		||||
                        "message": f"Device '{device}' is not found or you are not authorized to use it.",
 | 
			
		||||
                        "status": 403,
 | 
			
		||||
                    }
 | 
			
		||||
                ),
 | 
			
		||||
                403,
 | 
			
		||||
            )
 | 
			
		||||
    else:
 | 
			
		||||
        return (
 | 
			
		||||
            jsonify(
 | 
			
		||||
                {
 | 
			
		||||
                    "message": f"Device '{device}' is not found or you are not authorized to use it.",
 | 
			
		||||
                    "status": 403,
 | 
			
		||||
                }
 | 
			
		||||
            ),
 | 
			
		||||
            403,
 | 
			
		||||
    """Handle the remote device Widevine open."""
 | 
			
		||||
    api_key = request.headers.get("X-Secret-Key")
 | 
			
		||||
    wv_device = load_widevine_device(device, api_key)
 | 
			
		||||
    if not wv_device:
 | 
			
		||||
        return make_response(
 | 
			
		||||
            "Error",
 | 
			
		||||
            f"Device '{device}' is not found or you are not authorized to use it.",
 | 
			
		||||
            http_status=403,
 | 
			
		||||
        )
 | 
			
		||||
    cdm = current_app.config["CDM"] = widevineCDM.from_device(wv_device)
 | 
			
		||||
    session_id = cdm.open()
 | 
			
		||||
    return make_response(
 | 
			
		||||
        "Success",
 | 
			
		||||
        "Successfully opened the Widevine CDM session",
 | 
			
		||||
        {
 | 
			
		||||
            "session_id": session_id.hex(),
 | 
			
		||||
            "device": {
 | 
			
		||||
                "system_id": cdm.system_id,
 | 
			
		||||
                "security_level": cdm.security_level,
 | 
			
		||||
            },
 | 
			
		||||
        },
 | 
			
		||||
        http_status=200,
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@remotecdm_wv_bp.route(
 | 
			
		||||
    "/remotecdm/widevine/<device>/close/<session_id>", methods=["GET"]
 | 
			
		||||
)
 | 
			
		||||
def remote_cdm_widevine_close(device, session_id):
 | 
			
		||||
    """Handle the remote device Widevine close."""
 | 
			
		||||
    session_id = bytes.fromhex(session_id)
 | 
			
		||||
    cdm = current_app.config["CDM"]
 | 
			
		||||
    if not cdm:
 | 
			
		||||
        return (
 | 
			
		||||
            jsonify(
 | 
			
		||||
                {
 | 
			
		||||
                    "status": 400,
 | 
			
		||||
                    "message": f'No CDM for "{device}" has been opened yet. No session to close',
 | 
			
		||||
                }
 | 
			
		||||
            ),
 | 
			
		||||
            400,
 | 
			
		||||
        )
 | 
			
		||||
    cdm = get_cdm_or_error(device)
 | 
			
		||||
    if isinstance(cdm, tuple):  # error response
 | 
			
		||||
        return cdm
 | 
			
		||||
    try:
 | 
			
		||||
        cdm.close(session_id)
 | 
			
		||||
    except InvalidSession:
 | 
			
		||||
        return (
 | 
			
		||||
            jsonify(
 | 
			
		||||
                {
 | 
			
		||||
                    "status": 400,
 | 
			
		||||
                    "message": f'Invalid session ID "{session_id.hex()}", it may have expired',
 | 
			
		||||
                }
 | 
			
		||||
            ),
 | 
			
		||||
            400,
 | 
			
		||||
        return make_response(
 | 
			
		||||
            "Error",
 | 
			
		||||
            f'Invalid session ID "{session_id.hex()}", it may have expired',
 | 
			
		||||
            http_status=400,
 | 
			
		||||
        )
 | 
			
		||||
    return (
 | 
			
		||||
        jsonify(
 | 
			
		||||
            {
 | 
			
		||||
                "status": 200,
 | 
			
		||||
                "message": f'Successfully closed Session "{session_id.hex()}".',
 | 
			
		||||
            }
 | 
			
		||||
        ),
 | 
			
		||||
        200,
 | 
			
		||||
 | 
			
		||||
    return make_response(
 | 
			
		||||
        "Success",
 | 
			
		||||
        f'Successfully closed Session "{session_id.hex()}".',
 | 
			
		||||
        http_status=200,
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -211,80 +236,44 @@ def remote_cdm_widevine_close(device, session_id):
 | 
			
		||||
    "/remotecdm/widevine/<device>/set_service_certificate", methods=["POST"]
 | 
			
		||||
)
 | 
			
		||||
def remote_cdm_widevine_set_service_certificate(device):
 | 
			
		||||
    """Handle the remote device Widevine set service certificate."""
 | 
			
		||||
    body = request.get_json()
 | 
			
		||||
    for required_field in ("session_id", "certificate"):
 | 
			
		||||
        if required_field == "certificate":
 | 
			
		||||
            has_field = (
 | 
			
		||||
                required_field in body
 | 
			
		||||
            )  # it needs the key, but can be empty/null
 | 
			
		||||
        else:
 | 
			
		||||
            has_field = body.get(required_field)
 | 
			
		||||
        if not has_field:
 | 
			
		||||
            return (
 | 
			
		||||
                jsonify(
 | 
			
		||||
                    {
 | 
			
		||||
                        "status": 400,
 | 
			
		||||
                        "message": f'Missing required field "{required_field}" in JSON body',
 | 
			
		||||
                    }
 | 
			
		||||
                ),
 | 
			
		||||
                400,
 | 
			
		||||
            )
 | 
			
		||||
    missing_field = check_required_fields(body, ("session_id", "certificate"))
 | 
			
		||||
    if missing_field:
 | 
			
		||||
        return missing_field
 | 
			
		||||
 | 
			
		||||
    session_id = bytes.fromhex(body["session_id"])
 | 
			
		||||
 | 
			
		||||
    cdm = current_app.config["CDM"]
 | 
			
		||||
    if not cdm:
 | 
			
		||||
        return (
 | 
			
		||||
            jsonify(
 | 
			
		||||
                {
 | 
			
		||||
                    "status": 400,
 | 
			
		||||
                    "message": f'No CDM session for "{device}" has been opened yet. No session to use',
 | 
			
		||||
                }
 | 
			
		||||
            ),
 | 
			
		||||
            400,
 | 
			
		||||
        )
 | 
			
		||||
    cdm = get_cdm_or_error(device)
 | 
			
		||||
    if isinstance(cdm, tuple):  # error response
 | 
			
		||||
        return cdm
 | 
			
		||||
 | 
			
		||||
    certificate = body["certificate"]
 | 
			
		||||
    try:
 | 
			
		||||
        provider_id = cdm.set_service_certificate(session_id, certificate)
 | 
			
		||||
    except InvalidSession:
 | 
			
		||||
        return (
 | 
			
		||||
            jsonify(
 | 
			
		||||
                {
 | 
			
		||||
                    "status": 400,
 | 
			
		||||
                    "message": f'Invalid session id: "{session_id.hex()}", it may have expired',
 | 
			
		||||
                }
 | 
			
		||||
            ),
 | 
			
		||||
            400,
 | 
			
		||||
        return make_response(
 | 
			
		||||
            "Error",
 | 
			
		||||
            f'Invalid session id: "{session_id.hex()}", it may have expired',
 | 
			
		||||
            http_status=400,
 | 
			
		||||
        )
 | 
			
		||||
    except DecodeError as error:
 | 
			
		||||
        return (
 | 
			
		||||
            jsonify(
 | 
			
		||||
                {"status": 400, "message": f"Invalid Service Certificate, {error}"}
 | 
			
		||||
            ),
 | 
			
		||||
            400,
 | 
			
		||||
        return make_response(
 | 
			
		||||
            "Error",
 | 
			
		||||
            f"Invalid Service Certificate, {error}",
 | 
			
		||||
            http_status=400,
 | 
			
		||||
        )
 | 
			
		||||
    except SignatureMismatch:
 | 
			
		||||
        return (
 | 
			
		||||
            jsonify(
 | 
			
		||||
                {
 | 
			
		||||
                    "status": 400,
 | 
			
		||||
                    "message": "Signature Validation failed on the Service Certificate, rejecting",
 | 
			
		||||
                }
 | 
			
		||||
            ),
 | 
			
		||||
            400,
 | 
			
		||||
        return make_response(
 | 
			
		||||
            "Error",
 | 
			
		||||
            "Signature Validation failed on the Service Certificate, rejecting",
 | 
			
		||||
            http_status=400,
 | 
			
		||||
        )
 | 
			
		||||
    return (
 | 
			
		||||
        jsonify(
 | 
			
		||||
            {
 | 
			
		||||
                "status": 200,
 | 
			
		||||
                "message": f"Successfully {['set', 'unset'][not certificate]} the Service Certificate.",
 | 
			
		||||
                "data": {
 | 
			
		||||
                    "provider_id": provider_id,
 | 
			
		||||
                },
 | 
			
		||||
            }
 | 
			
		||||
        ),
 | 
			
		||||
        200,
 | 
			
		||||
    return make_response(
 | 
			
		||||
        "Success",
 | 
			
		||||
        f"Successfully {['set', 'unset'][not certificate]} the Service Certificate.",
 | 
			
		||||
        {"provider_id": provider_id},
 | 
			
		||||
        http_status=200,
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -292,45 +281,25 @@ def remote_cdm_widevine_set_service_certificate(device):
 | 
			
		||||
    "/remotecdm/widevine/<device>/get_service_certificate", methods=["POST"]
 | 
			
		||||
)
 | 
			
		||||
def remote_cdm_widevine_get_service_certificate(device):
 | 
			
		||||
    """Handle the remote device Widevine get service certificate."""
 | 
			
		||||
    body = request.get_json()
 | 
			
		||||
    for required_field in ("session_id",):
 | 
			
		||||
        if not body.get(required_field):
 | 
			
		||||
            return (
 | 
			
		||||
                jsonify(
 | 
			
		||||
                    {
 | 
			
		||||
                        "status": 400,
 | 
			
		||||
                        "message": f'Missing required field "{required_field}" in JSON body',
 | 
			
		||||
                    }
 | 
			
		||||
                ),
 | 
			
		||||
                400,
 | 
			
		||||
            )
 | 
			
		||||
    missing_field = check_required_fields(body, ("session_id",))
 | 
			
		||||
    if missing_field:
 | 
			
		||||
        return missing_field
 | 
			
		||||
 | 
			
		||||
    session_id = bytes.fromhex(body["session_id"])
 | 
			
		||||
 | 
			
		||||
    cdm = current_app.config["CDM"]
 | 
			
		||||
 | 
			
		||||
    if not cdm:
 | 
			
		||||
        return (
 | 
			
		||||
            jsonify(
 | 
			
		||||
                {
 | 
			
		||||
                    "status": 400,
 | 
			
		||||
                    "message": f'No CDM session for "{device}" has been opened yet. No session to use',
 | 
			
		||||
                }
 | 
			
		||||
            ),
 | 
			
		||||
            400,
 | 
			
		||||
        )
 | 
			
		||||
    cdm = get_cdm_or_error(device)
 | 
			
		||||
    if isinstance(cdm, tuple):  # error response
 | 
			
		||||
        return cdm
 | 
			
		||||
 | 
			
		||||
    try:
 | 
			
		||||
        service_certificate = cdm.get_service_certificate(session_id)
 | 
			
		||||
    except InvalidSession:
 | 
			
		||||
        return (
 | 
			
		||||
            jsonify(
 | 
			
		||||
                {
 | 
			
		||||
                    "status": 400,
 | 
			
		||||
                    "message": f'Invalid Session ID "{session_id.hex()}", it may have expired',
 | 
			
		||||
                }
 | 
			
		||||
            ),
 | 
			
		||||
            400,
 | 
			
		||||
        return make_response(
 | 
			
		||||
            "Error",
 | 
			
		||||
            f'Invalid Session ID "{session_id.hex()}", it may have expired',
 | 
			
		||||
            http_status=400,
 | 
			
		||||
        )
 | 
			
		||||
    if service_certificate:
 | 
			
		||||
        service_certificate_b64 = base64.b64encode(
 | 
			
		||||
@ -338,17 +307,11 @@ def remote_cdm_widevine_get_service_certificate(device):
 | 
			
		||||
        ).decode()
 | 
			
		||||
    else:
 | 
			
		||||
        service_certificate_b64 = None
 | 
			
		||||
    return (
 | 
			
		||||
        jsonify(
 | 
			
		||||
            {
 | 
			
		||||
                "status": 200,
 | 
			
		||||
                "message": "Successfully got the Service Certificate",
 | 
			
		||||
                "data": {
 | 
			
		||||
                    "service_certificate": service_certificate_b64,
 | 
			
		||||
                },
 | 
			
		||||
            }
 | 
			
		||||
        ),
 | 
			
		||||
        200,
 | 
			
		||||
    return make_response(
 | 
			
		||||
        "Success",
 | 
			
		||||
        "Successfully got the Service Certificate",
 | 
			
		||||
        {"service_certificate": service_certificate_b64},
 | 
			
		||||
        http_status=200,
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -357,42 +320,23 @@ def remote_cdm_widevine_get_service_certificate(device):
 | 
			
		||||
    methods=["POST"],
 | 
			
		||||
)
 | 
			
		||||
def remote_cdm_widevine_get_license_challenge(device, license_type):
 | 
			
		||||
    """Handle the remote device Widevine get license challenge."""
 | 
			
		||||
    body = request.get_json()
 | 
			
		||||
    for required_field in ("session_id", "init_data"):
 | 
			
		||||
        if not body.get(required_field):
 | 
			
		||||
            return (
 | 
			
		||||
                jsonify(
 | 
			
		||||
                    {
 | 
			
		||||
                        "status": 400,
 | 
			
		||||
                        "message": f'Missing required field "{required_field}" in JSON body',
 | 
			
		||||
                    }
 | 
			
		||||
                ),
 | 
			
		||||
                400,
 | 
			
		||||
            )
 | 
			
		||||
    missing_field = check_required_fields(body, ("session_id", "init_data"))
 | 
			
		||||
    if missing_field:
 | 
			
		||||
        return missing_field
 | 
			
		||||
    session_id = bytes.fromhex(body["session_id"])
 | 
			
		||||
    privacy_mode = body.get("privacy_mode", True)
 | 
			
		||||
    cdm = current_app.config["CDM"]
 | 
			
		||||
    if not cdm:
 | 
			
		||||
        return (
 | 
			
		||||
            jsonify(
 | 
			
		||||
                {
 | 
			
		||||
                    "status": 400,
 | 
			
		||||
                    "message": f'No CDM session for "{device}" has been opened yet. No session to use',
 | 
			
		||||
                }
 | 
			
		||||
            ),
 | 
			
		||||
            400,
 | 
			
		||||
        )
 | 
			
		||||
    cdm = get_cdm_or_error(device)
 | 
			
		||||
    if isinstance(cdm, tuple):  # error response
 | 
			
		||||
        return cdm
 | 
			
		||||
    if current_app.config.get("force_privacy_mode"):
 | 
			
		||||
        privacy_mode = True
 | 
			
		||||
        if not cdm.get_service_certificate(session_id):
 | 
			
		||||
            return (
 | 
			
		||||
                jsonify(
 | 
			
		||||
                    {
 | 
			
		||||
                        "status": 403,
 | 
			
		||||
                        "message": "No Service Certificate set but Privacy Mode is Enforced.",
 | 
			
		||||
                    }
 | 
			
		||||
                ),
 | 
			
		||||
                403,
 | 
			
		||||
            return make_response(
 | 
			
		||||
                "Error",
 | 
			
		||||
                "No Service Certificate set but Privacy Mode is Enforced.",
 | 
			
		||||
                http_status=403,
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
    current_app.config["pssh"] = body["init_data"]
 | 
			
		||||
@ -406,97 +350,72 @@ def remote_cdm_widevine_get_license_challenge(device, license_type):
 | 
			
		||||
            privacy_mode=privacy_mode,
 | 
			
		||||
        )
 | 
			
		||||
    except InvalidSession:
 | 
			
		||||
        return (
 | 
			
		||||
            jsonify(
 | 
			
		||||
                {
 | 
			
		||||
                    "status": 400,
 | 
			
		||||
                    "message": f'Invalid Session ID "{session_id.hex()}", it may have expired',
 | 
			
		||||
                }
 | 
			
		||||
            ),
 | 
			
		||||
            400,
 | 
			
		||||
        return make_response(
 | 
			
		||||
            "Error",
 | 
			
		||||
            f'Invalid Session ID "{session_id.hex()}", it may have expired',
 | 
			
		||||
            http_status=400,
 | 
			
		||||
        )
 | 
			
		||||
    except InvalidInitData as error:
 | 
			
		||||
        return jsonify({"status": 400, "message": f"Invalid Init Data, {error}"}), 400
 | 
			
		||||
    except InvalidLicenseType:
 | 
			
		||||
        return (
 | 
			
		||||
            jsonify({"status": 400, "message": f"Invalid License Type {license_type}"}),
 | 
			
		||||
            400,
 | 
			
		||||
        return make_response(
 | 
			
		||||
            "Error",
 | 
			
		||||
            f"Invalid Init Data, {error}",
 | 
			
		||||
            http_status=400,
 | 
			
		||||
        )
 | 
			
		||||
    return (
 | 
			
		||||
        jsonify(
 | 
			
		||||
            {
 | 
			
		||||
                "status": 200,
 | 
			
		||||
                "message": "Success",
 | 
			
		||||
                "data": {"challenge_b64": base64.b64encode(license_request).decode()},
 | 
			
		||||
            }
 | 
			
		||||
        ),
 | 
			
		||||
        200,
 | 
			
		||||
    except InvalidLicenseType:
 | 
			
		||||
        return make_response(
 | 
			
		||||
            "Error",
 | 
			
		||||
            f"Invalid License Type {license_type}",
 | 
			
		||||
            http_status=400,
 | 
			
		||||
        )
 | 
			
		||||
    return make_response(
 | 
			
		||||
        "Success",
 | 
			
		||||
        "Successfully got the License Challenge",
 | 
			
		||||
        {"challenge_b64": base64.b64encode(license_request).decode()},
 | 
			
		||||
        http_status=200,
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@remotecdm_wv_bp.route("/remotecdm/widevine/<device>/parse_license", methods=["POST"])
 | 
			
		||||
def remote_cdm_widevine_parse_license(device):
 | 
			
		||||
    """Handle the remote device Widevine parse license."""
 | 
			
		||||
    body = request.get_json()
 | 
			
		||||
    for required_field in ("session_id", "license_message"):
 | 
			
		||||
        if not body.get(required_field):
 | 
			
		||||
            return (
 | 
			
		||||
                jsonify(
 | 
			
		||||
                    {
 | 
			
		||||
                        "status": 400,
 | 
			
		||||
                        "message": f'Missing required field "{required_field}" in JSON body',
 | 
			
		||||
                    }
 | 
			
		||||
                ),
 | 
			
		||||
                400,
 | 
			
		||||
            )
 | 
			
		||||
    missing_field = check_required_fields(body, ("session_id", "license_message"))
 | 
			
		||||
    if missing_field:
 | 
			
		||||
        return missing_field
 | 
			
		||||
    session_id = bytes.fromhex(body["session_id"])
 | 
			
		||||
    cdm = current_app.config["CDM"]
 | 
			
		||||
    if not cdm:
 | 
			
		||||
        return (
 | 
			
		||||
            jsonify(
 | 
			
		||||
                {
 | 
			
		||||
                    "status": 400,
 | 
			
		||||
                    "message": f'No CDM session for "{device}" has been opened yet. No session to use',
 | 
			
		||||
                }
 | 
			
		||||
            ),
 | 
			
		||||
            400,
 | 
			
		||||
        )
 | 
			
		||||
    cdm = get_cdm_or_error(device)
 | 
			
		||||
    if isinstance(cdm, tuple):  # error response
 | 
			
		||||
        return cdm
 | 
			
		||||
    try:
 | 
			
		||||
        cdm.parse_license(session_id, body["license_message"])
 | 
			
		||||
    except InvalidLicenseMessage as error:
 | 
			
		||||
        return (
 | 
			
		||||
            jsonify({"status": 400, "message": f"Invalid License Message, {error}"}),
 | 
			
		||||
            400,
 | 
			
		||||
        return make_response(
 | 
			
		||||
            "Error",
 | 
			
		||||
            f"Invalid License Message, {error}",
 | 
			
		||||
            http_status=400,
 | 
			
		||||
        )
 | 
			
		||||
    except InvalidContext as error:
 | 
			
		||||
        return jsonify({"status": 400, "message": f"Invalid Context, {error}"}), 400
 | 
			
		||||
        return make_response(
 | 
			
		||||
            "Error",
 | 
			
		||||
            f"Invalid Context, {error}",
 | 
			
		||||
            http_status=400,
 | 
			
		||||
        )
 | 
			
		||||
    except InvalidSession:
 | 
			
		||||
        return (
 | 
			
		||||
            jsonify(
 | 
			
		||||
                {
 | 
			
		||||
                    "status": 400,
 | 
			
		||||
                    "message": f'Invalid Session ID "{session_id.hex()}", it may have expired',
 | 
			
		||||
                }
 | 
			
		||||
            ),
 | 
			
		||||
            400,
 | 
			
		||||
        return make_response(
 | 
			
		||||
            "Error",
 | 
			
		||||
            f'Invalid Session ID "{session_id.hex()}", it may have expired',
 | 
			
		||||
            http_status=400,
 | 
			
		||||
        )
 | 
			
		||||
    except SignatureMismatch:
 | 
			
		||||
        return (
 | 
			
		||||
            jsonify(
 | 
			
		||||
                {
 | 
			
		||||
                    "status": 400,
 | 
			
		||||
                    "message": f"Signature Validation failed on the License Message, rejecting.",
 | 
			
		||||
                }
 | 
			
		||||
            ),
 | 
			
		||||
            400,
 | 
			
		||||
        return make_response(
 | 
			
		||||
            "Error",
 | 
			
		||||
            "Signature Validation failed on the License Message, rejecting.",
 | 
			
		||||
            http_status=400,
 | 
			
		||||
        )
 | 
			
		||||
    return (
 | 
			
		||||
        jsonify(
 | 
			
		||||
            {
 | 
			
		||||
                "status": 200,
 | 
			
		||||
                "message": "Successfully parsed and loaded the Keys from the License message.",
 | 
			
		||||
            }
 | 
			
		||||
        ),
 | 
			
		||||
        200,
 | 
			
		||||
    return make_response(
 | 
			
		||||
        "Success",
 | 
			
		||||
        "Successfully parsed and loaded the Keys from the License message.",
 | 
			
		||||
        http_status=200,
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -504,54 +423,30 @@ def remote_cdm_widevine_parse_license(device):
 | 
			
		||||
    "/remotecdm/widevine/<device>/get_keys/<key_type>", methods=["POST"]
 | 
			
		||||
)
 | 
			
		||||
def remote_cdm_widevine_get_keys(device, key_type):
 | 
			
		||||
    """Handle the remote device Widevine get keys."""
 | 
			
		||||
    body = request.get_json()
 | 
			
		||||
    for required_field in ("session_id",):
 | 
			
		||||
        if not body.get(required_field):
 | 
			
		||||
            return (
 | 
			
		||||
                jsonify(
 | 
			
		||||
                    {
 | 
			
		||||
                        "status": 400,
 | 
			
		||||
                        "message": f'Missing required field "{required_field}" in JSON body',
 | 
			
		||||
                    }
 | 
			
		||||
                ),
 | 
			
		||||
                400,
 | 
			
		||||
            )
 | 
			
		||||
    missing_field = check_required_fields(body, ("session_id",))
 | 
			
		||||
    if missing_field:
 | 
			
		||||
        return missing_field
 | 
			
		||||
    session_id = bytes.fromhex(body["session_id"])
 | 
			
		||||
    key_type: Optional[str] = key_type
 | 
			
		||||
    if key_type == "ALL":
 | 
			
		||||
        key_type = None
 | 
			
		||||
    cdm = current_app.config["CDM"]
 | 
			
		||||
    if not cdm:
 | 
			
		||||
        return (
 | 
			
		||||
            jsonify(
 | 
			
		||||
                {
 | 
			
		||||
                    "status": 400,
 | 
			
		||||
                    "message": f'No CDM session for "{device}" has been opened yet. No session to use',
 | 
			
		||||
                }
 | 
			
		||||
            ),
 | 
			
		||||
            400,
 | 
			
		||||
        )
 | 
			
		||||
    cdm = get_cdm_or_error(device)
 | 
			
		||||
    if isinstance(cdm, tuple):  # error response
 | 
			
		||||
        return cdm
 | 
			
		||||
    try:
 | 
			
		||||
        keys = cdm.get_keys(session_id, key_type)
 | 
			
		||||
    except InvalidSession:
 | 
			
		||||
        return (
 | 
			
		||||
            jsonify(
 | 
			
		||||
                {
 | 
			
		||||
                    "status": 400,
 | 
			
		||||
                    "message": f'Invalid Session ID "{session_id.hex()}", it may have expired',
 | 
			
		||||
                }
 | 
			
		||||
            ),
 | 
			
		||||
            400,
 | 
			
		||||
        return make_response(
 | 
			
		||||
            "Error",
 | 
			
		||||
            f'Invalid Session ID "{session_id.hex()}", it may have expired',
 | 
			
		||||
            http_status=400,
 | 
			
		||||
        )
 | 
			
		||||
    except ValueError as error:
 | 
			
		||||
        return (
 | 
			
		||||
            jsonify(
 | 
			
		||||
                {
 | 
			
		||||
                    "status": 400,
 | 
			
		||||
                    "message": f'The Key Type value "{key_type}" is invalid, {error}',
 | 
			
		||||
                }
 | 
			
		||||
            ),
 | 
			
		||||
            400,
 | 
			
		||||
        return make_response(
 | 
			
		||||
            "Error",
 | 
			
		||||
            f'The Key Type value "{key_type}" is invalid, {error}',
 | 
			
		||||
            http_status=400,
 | 
			
		||||
        )
 | 
			
		||||
    keys_json = [
 | 
			
		||||
        {
 | 
			
		||||
@ -564,10 +459,6 @@ def remote_cdm_widevine_get_keys(device, key_type):
 | 
			
		||||
        if not key_type or key.type == key_type
 | 
			
		||||
    ]
 | 
			
		||||
    for entry in keys_json:
 | 
			
		||||
        if config["database_type"].lower() != "mariadb":
 | 
			
		||||
            from custom_functions.database.cache_to_db_sqlite import cache_to_db
 | 
			
		||||
        elif config["database_type"].lower() == "mariadb":
 | 
			
		||||
            from custom_functions.database.cache_to_db_mariadb import cache_to_db
 | 
			
		||||
        if entry["type"] != "SIGNING":
 | 
			
		||||
            cache_to_db(
 | 
			
		||||
                pssh=str(current_app.config["pssh"]),
 | 
			
		||||
@ -575,7 +466,9 @@ def remote_cdm_widevine_get_keys(device, key_type):
 | 
			
		||||
                key=entry["key"],
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
    return (
 | 
			
		||||
        jsonify({"status": 200, "message": "Success", "data": {"keys": keys_json}}),
 | 
			
		||||
        200,
 | 
			
		||||
    return make_response(
 | 
			
		||||
        "Success",
 | 
			
		||||
        "Successfully got the Keys",
 | 
			
		||||
        {"keys": keys_json},
 | 
			
		||||
        http_status=200,
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user