mirror of
				https://github.com/devine-dl/pywidevine.git
				synced 2025-11-04 03:44:50 +00:00 
			
		
		
		
	Cdm: Implement get_service_certificate()
This commit is contained in:
		
							parent
							
								
									987eee2b0f
								
							
						
					
					
						commit
						768c4e7851
					
				@ -232,6 +232,24 @@ class Cdm:
 | 
			
		||||
            drm_certificate.ParseFromString(signed_drm_certificate.drm_certificate)
 | 
			
		||||
            return drm_certificate.provider_id
 | 
			
		||||
 | 
			
		||||
    def get_service_certificate(self, session_id: bytes) -> Optional[SignedMessage]:
 | 
			
		||||
        """
 | 
			
		||||
        Get the currently set Service Privacy Certificate of the Session.
 | 
			
		||||
 | 
			
		||||
        Parameters:
 | 
			
		||||
            session_id: Session identifier.
 | 
			
		||||
 | 
			
		||||
        Raises:
 | 
			
		||||
            InvalidSession: If the Session identifier is invalid.
 | 
			
		||||
 | 
			
		||||
        Returns the Service Certificate if one is set, otherwise None.
 | 
			
		||||
        """
 | 
			
		||||
        session = self.__sessions.get(session_id)
 | 
			
		||||
        if not session:
 | 
			
		||||
            raise InvalidSession(f"Session identifier {session_id!r} is invalid.")
 | 
			
		||||
 | 
			
		||||
        return session.service_certificate
 | 
			
		||||
 | 
			
		||||
    def get_license_challenge(
 | 
			
		||||
        self,
 | 
			
		||||
        session_id: bytes,
 | 
			
		||||
 | 
			
		||||
@ -6,14 +6,18 @@ import re
 | 
			
		||||
from typing import Union, Optional
 | 
			
		||||
 | 
			
		||||
import requests
 | 
			
		||||
from Crypto.Hash import SHA1
 | 
			
		||||
from Crypto.PublicKey import RSA
 | 
			
		||||
from Crypto.Signature import pss
 | 
			
		||||
from google.protobuf.message import DecodeError
 | 
			
		||||
from pywidevine.cdm import Cdm
 | 
			
		||||
from pywidevine.device import Device
 | 
			
		||||
from pywidevine.exceptions import InvalidInitData, InvalidLicenseType, InvalidLicenseMessage, DeviceMismatch
 | 
			
		||||
from pywidevine.exceptions import InvalidInitData, InvalidLicenseType, InvalidLicenseMessage, DeviceMismatch, \
 | 
			
		||||
    SignatureMismatch
 | 
			
		||||
from pywidevine.key import Key
 | 
			
		||||
 | 
			
		||||
from pywidevine.license_protocol_pb2 import LicenseType, SignedMessage, License, ClientIdentification
 | 
			
		||||
from pywidevine.license_protocol_pb2 import LicenseType, SignedMessage, License, ClientIdentification, \
 | 
			
		||||
    SignedDrmCertificate
 | 
			
		||||
from pywidevine.pssh import PSSH
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -139,6 +143,53 @@ class RemoteCdm(Cdm):
 | 
			
		||||
 | 
			
		||||
        return r["provider_id"]
 | 
			
		||||
 | 
			
		||||
    def get_service_certificate(self, session_id: bytes) -> Optional[SignedMessage]:
 | 
			
		||||
        r = self.__session.post(
 | 
			
		||||
            url=f"{self.host}/{self.device_name}/get_service_certificate",
 | 
			
		||||
            json={
 | 
			
		||||
                "session_id": session_id.hex()
 | 
			
		||||
            }
 | 
			
		||||
        ).json()
 | 
			
		||||
        if r["status"] != 200:
 | 
			
		||||
            raise ValueError(f"Cannot Get CDMs Service Certificate, {r['message']} [{r['status']}]")
 | 
			
		||||
        r = r["data"]
 | 
			
		||||
 | 
			
		||||
        service_certificate = r["service_certificate"]
 | 
			
		||||
        if not service_certificate:
 | 
			
		||||
            return None
 | 
			
		||||
 | 
			
		||||
        service_certificate = base64.b64decode(service_certificate)
 | 
			
		||||
        signed_message = SignedMessage()
 | 
			
		||||
        signed_drm_certificate = SignedDrmCertificate()
 | 
			
		||||
 | 
			
		||||
        try:
 | 
			
		||||
            signed_message.ParseFromString(service_certificate)
 | 
			
		||||
            if signed_message.SerializeToString() == service_certificate:
 | 
			
		||||
                signed_drm_certificate.ParseFromString(signed_message.msg)
 | 
			
		||||
            else:
 | 
			
		||||
                signed_drm_certificate.ParseFromString(service_certificate)
 | 
			
		||||
                if signed_drm_certificate.SerializeToString() != service_certificate:
 | 
			
		||||
                    raise DecodeError("partial parse")
 | 
			
		||||
                # Craft a SignedMessage as it's stored as a SignedMessage
 | 
			
		||||
                signed_message.Clear()
 | 
			
		||||
                signed_message.msg = signed_drm_certificate.SerializeToString()
 | 
			
		||||
                # we don't need to sign this message, this is normal
 | 
			
		||||
        except DecodeError as e:
 | 
			
		||||
            # could be a direct unsigned DrmCertificate, but reject those anyway
 | 
			
		||||
            raise DecodeError(f"Could not parse certificate as a SignedDrmCertificate, {e}")
 | 
			
		||||
 | 
			
		||||
        try:
 | 
			
		||||
            pss. \
 | 
			
		||||
                new(RSA.import_key(self.root_cert.public_key)). \
 | 
			
		||||
                verify(
 | 
			
		||||
                    msg_hash=SHA1.new(signed_drm_certificate.drm_certificate),
 | 
			
		||||
                    signature=signed_drm_certificate.signature
 | 
			
		||||
                )
 | 
			
		||||
        except (ValueError, TypeError):
 | 
			
		||||
            raise SignatureMismatch("Signature Mismatch on SignedDrmCertificate, rejecting certificate")
 | 
			
		||||
        else:
 | 
			
		||||
            return signed_message
 | 
			
		||||
 | 
			
		||||
    def get_license_challenge(
 | 
			
		||||
        self,
 | 
			
		||||
        session_id: bytes,
 | 
			
		||||
 | 
			
		||||
@ -177,6 +177,51 @@ async def set_service_certificate(request: web.Request) -> web.Response:
 | 
			
		||||
    })
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@routes.post("/{device}/get_service_certificate")
 | 
			
		||||
async def get_service_certificate(request: web.Request) -> web.Response:
 | 
			
		||||
    secret_key = request.headers["X-Secret-Key"]
 | 
			
		||||
    device_name = request.match_info["device"]
 | 
			
		||||
 | 
			
		||||
    body = await request.json()
 | 
			
		||||
    for required_field in ("session_id",):
 | 
			
		||||
        if not body.get(required_field):
 | 
			
		||||
            return web.json_response({
 | 
			
		||||
                "status": 400,
 | 
			
		||||
                "message": f"Missing required field '{required_field}' in JSON body."
 | 
			
		||||
            }, status=400)
 | 
			
		||||
 | 
			
		||||
    # get session id
 | 
			
		||||
    session_id = bytes.fromhex(body["session_id"])
 | 
			
		||||
 | 
			
		||||
    # get cdm
 | 
			
		||||
    cdm: Optional[Cdm] = request.app["cdms"].get((secret_key, device_name))
 | 
			
		||||
    if not cdm:
 | 
			
		||||
        return web.json_response({
 | 
			
		||||
            "status": 400,
 | 
			
		||||
            "message": f"No Cdm session for {device_name} has been opened yet. No session to use."
 | 
			
		||||
        }, status=400)
 | 
			
		||||
 | 
			
		||||
    # get service certificate
 | 
			
		||||
    try:
 | 
			
		||||
        service_certificate = cdm.get_service_certificate(session_id)
 | 
			
		||||
    except InvalidSession:
 | 
			
		||||
        return web.json_response({
 | 
			
		||||
            "status": 400,
 | 
			
		||||
            "message": f"Invalid Session ID '{session_id.hex()}', it may have expired."
 | 
			
		||||
        }, status=400)
 | 
			
		||||
 | 
			
		||||
    if service_certificate:
 | 
			
		||||
        service_certificate = base64.b64encode(service_certificate.SerializeToString()).decode()
 | 
			
		||||
 | 
			
		||||
    return web.json_response({
 | 
			
		||||
        "status": 200,
 | 
			
		||||
        "message": "Successfully got the Service Certificate.",
 | 
			
		||||
        "data": {
 | 
			
		||||
            "service_certificate": service_certificate
 | 
			
		||||
        }
 | 
			
		||||
    })
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@routes.post("/{device}/get_license_challenge/{license_type}")
 | 
			
		||||
async def get_license_challenge(request: web.Request) -> web.Response:
 | 
			
		||||
    secret_key = request.headers["X-Secret-Key"]
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user