mirror of
				https://github.com/devine-dl/pywidevine.git
				synced 2025-11-04 03:44:50 +00:00 
			
		
		
		
	Cdm: Privatize the sessions map even harder
This is to further discourage direct access to the sessions directly
This commit is contained in:
		
							parent
							
								
									4f32b4b790
								
							
						
					
					
						commit
						576d7212d5
					
				@ -108,7 +108,7 @@ class Cdm:
 | 
				
			|||||||
        self.__signer = pss.new(rsa_key)
 | 
					        self.__signer = pss.new(rsa_key)
 | 
				
			||||||
        self.__decrypter = PKCS1_OAEP.new(rsa_key)
 | 
					        self.__decrypter = PKCS1_OAEP.new(rsa_key)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        self._sessions: dict[bytes, Session] = {}
 | 
					        self.__sessions: dict[bytes, Session] = {}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    @classmethod
 | 
					    @classmethod
 | 
				
			||||||
    def from_device(cls, device: Device) -> Cdm:
 | 
					    def from_device(cls, device: Device) -> Cdm:
 | 
				
			||||||
@ -128,11 +128,11 @@ class Cdm:
 | 
				
			|||||||
        Raises:
 | 
					        Raises:
 | 
				
			||||||
            TooManySessions: If the session cannot be opened as limit has been reached.
 | 
					            TooManySessions: If the session cannot be opened as limit has been reached.
 | 
				
			||||||
        """
 | 
					        """
 | 
				
			||||||
        if len(self._sessions) > self.MAX_NUM_OF_SESSIONS:
 | 
					        if len(self.__sessions) > self.MAX_NUM_OF_SESSIONS:
 | 
				
			||||||
            raise TooManySessions(f"Too many Sessions open ({self.MAX_NUM_OF_SESSIONS}).")
 | 
					            raise TooManySessions(f"Too many Sessions open ({self.MAX_NUM_OF_SESSIONS}).")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        session = Session()
 | 
					        session = Session()
 | 
				
			||||||
        self._sessions[session.id] = session
 | 
					        self.__sessions[session.id] = session
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        return session.id
 | 
					        return session.id
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@ -146,10 +146,10 @@ class Cdm:
 | 
				
			|||||||
        Raises:
 | 
					        Raises:
 | 
				
			||||||
            InvalidSession: If the Session identifier is invalid.
 | 
					            InvalidSession: If the Session identifier is invalid.
 | 
				
			||||||
        """
 | 
					        """
 | 
				
			||||||
        session = self._sessions.get(session_id)
 | 
					        session = self.__sessions.get(session_id)
 | 
				
			||||||
        if not session:
 | 
					        if not session:
 | 
				
			||||||
            raise InvalidSession(f"Session identifier {session_id!r} is invalid.")
 | 
					            raise InvalidSession(f"Session identifier {session_id!r} is invalid.")
 | 
				
			||||||
        del self._sessions[session_id]
 | 
					        del self.__sessions[session_id]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def set_service_certificate(self, session_id: bytes, certificate: Optional[Union[bytes, str]]) -> str:
 | 
					    def set_service_certificate(self, session_id: bytes, certificate: Optional[Union[bytes, str]]) -> str:
 | 
				
			||||||
        """
 | 
					        """
 | 
				
			||||||
@ -180,7 +180,7 @@ class Cdm:
 | 
				
			|||||||
        Returns the Service Provider ID of the verified DrmCertificate if successful.
 | 
					        Returns the Service Provider ID of the verified DrmCertificate if successful.
 | 
				
			||||||
        If certificate is None, it will return the now unset certificate's Provider ID.
 | 
					        If certificate is None, it will return the now unset certificate's Provider ID.
 | 
				
			||||||
        """
 | 
					        """
 | 
				
			||||||
        session = self._sessions.get(session_id)
 | 
					        session = self.__sessions.get(session_id)
 | 
				
			||||||
        if not session:
 | 
					        if not session:
 | 
				
			||||||
            raise InvalidSession(f"Session identifier {session_id!r} is invalid.")
 | 
					            raise InvalidSession(f"Session identifier {session_id!r} is invalid.")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@ -260,7 +260,7 @@ class Cdm:
 | 
				
			|||||||
        Returns a SignedMessage containing a LicenseRequest message. It's signed with
 | 
					        Returns a SignedMessage containing a LicenseRequest message. It's signed with
 | 
				
			||||||
        the Private Key of the device provision.
 | 
					        the Private Key of the device provision.
 | 
				
			||||||
        """
 | 
					        """
 | 
				
			||||||
        session = self._sessions.get(session_id)
 | 
					        session = self.__sessions.get(session_id)
 | 
				
			||||||
        if not session:
 | 
					        if not session:
 | 
				
			||||||
            raise InvalidSession(f"Session identifier {session_id!r} is invalid.")
 | 
					            raise InvalidSession(f"Session identifier {session_id!r} is invalid.")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@ -333,7 +333,7 @@ class Cdm:
 | 
				
			|||||||
            SignatureMismatch: If the Signature of the License SignedMessage does not
 | 
					            SignatureMismatch: If the Signature of the License SignedMessage does not
 | 
				
			||||||
                match the underlying License.
 | 
					                match the underlying License.
 | 
				
			||||||
        """
 | 
					        """
 | 
				
			||||||
        session = self._sessions.get(session_id)
 | 
					        session = self.__sessions.get(session_id)
 | 
				
			||||||
        if not session:
 | 
					        if not session:
 | 
				
			||||||
            raise InvalidSession(f"Session identifier {session_id!r} is invalid.")
 | 
					            raise InvalidSession(f"Session identifier {session_id!r} is invalid.")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@ -403,7 +403,7 @@ class Cdm:
 | 
				
			|||||||
            TypeError: If the provided type_ is an unexpected value type.
 | 
					            TypeError: If the provided type_ is an unexpected value type.
 | 
				
			||||||
            ValueError: If the provided type_ is not a valid Key Type.
 | 
					            ValueError: If the provided type_ is not a valid Key Type.
 | 
				
			||||||
        """
 | 
					        """
 | 
				
			||||||
        session = self._sessions.get(session_id)
 | 
					        session = self.__sessions.get(session_id)
 | 
				
			||||||
        if not session:
 | 
					        if not session:
 | 
				
			||||||
            raise InvalidSession(f"Session identifier {session_id!r} is invalid.")
 | 
					            raise InvalidSession(f"Session identifier {session_id!r} is invalid.")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@ -475,7 +475,7 @@ class Cdm:
 | 
				
			|||||||
        if output_file.is_file() and not exists_ok:
 | 
					        if output_file.is_file() and not exists_ok:
 | 
				
			||||||
            raise FileExistsError(f"Output file already exists, {output_file}")
 | 
					            raise FileExistsError(f"Output file already exists, {output_file}")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        session = self._sessions.get(session_id)
 | 
					        session = self.__sessions.get(session_id)
 | 
				
			||||||
        if not session:
 | 
					        if not session:
 | 
				
			||||||
            raise InvalidSession(f"Session identifier {session_id!r} is invalid.")
 | 
					            raise InvalidSession(f"Session identifier {session_id!r} is invalid.")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
				
			|||||||
@ -203,7 +203,8 @@ async def get_license_challenge(request: web.Request) -> web.Response:
 | 
				
			|||||||
        }, status=400)
 | 
					        }, status=400)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    # enforce service certificate (opt-in)
 | 
					    # enforce service certificate (opt-in)
 | 
				
			||||||
    if request.app["config"].get("force_privacy_mode") and not cdm._sessions[session_id].service_certificate:
 | 
					    # TODO: Add a way to check if there's a service certificate set properly
 | 
				
			||||||
 | 
					    if request.app["config"].get("force_privacy_mode") and not cdm._Cdm__sessions[session_id].service_certificate:
 | 
				
			||||||
        return web.json_response({
 | 
					        return web.json_response({
 | 
				
			||||||
            "status": 403,
 | 
					            "status": 403,
 | 
				
			||||||
            "message": "No Service Certificate set but Privacy Mode is Enforced."
 | 
					            "message": "No Service Certificate set but Privacy Mode is Enforced."
 | 
				
			||||||
 | 
				
			|||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user