From 241b4cd8951f4a1ce46e8f1cd01fd05996831324 Mon Sep 17 00:00:00 2001 From: Ken Takayama Date: Fri, 25 Oct 2024 11:17:04 +0000 Subject: [PATCH 1/2] add: enable_non_aead=False to prevent unintended use of non-AEAD --- cwt/cose.py | 48 +++++++++++++++++++++++++++++++++++------ tests/test_recipient.py | 23 +++++++++++++++++--- 2 files changed, 61 insertions(+), 10 deletions(-) diff --git a/cwt/cose.py b/cwt/cose.py index 5b426a9..04f04cd 100644 --- a/cwt/cose.py +++ b/cwt/cose.py @@ -133,6 +133,7 @@ def encode( signers: List[Signer] = [], external_aad: bytes = b"", out: str = "", + enable_non_aead: bool = False, ) -> bytes: """ Encodes COSE message with MAC, signing and encryption. @@ -152,6 +153,13 @@ def encode( data as `cbor2 `_'s ``CBORTag`` object. If any other value is specified, it will return encoded data as bytes. + enable_non_aead (bool): Enable non-AEAD content ecnryption algorithms + (False = disabled by default). Before enable non-AEAD ciphers, + read and understand Security considerations of RFC 9459 carefully. + Since non-AEAD ciphers DO NOT provide neither authentication nor integrity + of decrypted message, make sure to deliver the encoded COSE message + in conjunction with an authentication and integrity mechanisms, + such as a digital signature. Returns: Union[bytes, CBORTag]: A byte string of the encoded COSE or a cbor2.CBORTag object. @@ -159,7 +167,7 @@ def encode( ValueError: Invalid arguments. EncodeError: Failed to encode data. """ - p, u = self._encode_headers(key, protected, unprotected) + p, u = self._encode_headers(key, protected, unprotected, enable_non_aead) typ = self._validate_cose_message(key, p, u, recipients, signers) if typ == 0: return self._encode_and_encrypt(payload, key, p, u, recipients, external_aad, out) @@ -177,6 +185,7 @@ def encode_and_encrypt( recipients: List[RecipientInterface] = [], external_aad: bytes = b"", out: str = "", + enable_non_aead: bool = False, ) -> bytes: """ Encodes data with encryption. @@ -194,6 +203,13 @@ def encode_and_encrypt( data as `cbor2 `_'s ``CBORTag`` object. If any other value is specified, it will return encoded data as bytes. + enable_non_aead (bool): Enable non-AEAD content ecnryption algorithms + (False = disabled by default). Before enable non-AEAD ciphers, + read and understand Security considerations of RFC 9459 carefully. + Since non-AEAD ciphers DO NOT provide neither authentication nor integrity + of decrypted message, make sure to deliver the encoded COSE message + in conjunction with an authentication and integrity mechanisms, + such as a digital signature. Returns: Union[bytes, CBORTag]: A byte string of the encoded COSE or a cbor2.CBORTag object. @@ -201,7 +217,7 @@ def encode_and_encrypt( ValueError: Invalid arguments. EncodeError: Failed to encode data. """ - p, u = self._encode_headers(key, protected, unprotected) + p, u = self._encode_headers(key, protected, unprotected, enable_non_aead) typ = self._validate_cose_message(key, p, u, recipients, []) if typ != 0: raise ValueError("The COSE message is not suitable for COSE Encrypt0/Encrypt.") @@ -237,7 +253,7 @@ def encode_and_mac( ValueError: Invalid arguments. EncodeError: Failed to encode data. """ - p, u = self._encode_headers(key, protected, unprotected) + p, u = self._encode_headers(key, protected, unprotected, False) typ = self._validate_cose_message(key, p, u, recipients, []) if typ != 1: raise ValueError("The COSE message is not suitable for COSE MAC0/MAC.") @@ -279,7 +295,7 @@ def encode_and_sign( ValueError: Invalid arguments. EncodeError: Failed to encode data. """ - p, u = self._encode_headers(key, protected, unprotected) + p, u = self._encode_headers(key, protected, unprotected, False) typ = self._validate_cose_message(key, p, u, [], signers) if typ != 2: raise ValueError("The COSE message is not suitable for COSE Sign0/Sign.") @@ -292,6 +308,7 @@ def decode( context: Optional[Union[Dict[str, Any], List[Any]]] = None, external_aad: bytes = b"", detached_payload: Optional[bytes] = None, + enable_non_aead: bool = False, ) -> bytes: """ Verifies and decodes COSE data, and returns only payload. @@ -306,6 +323,11 @@ def decode( external_aad(bytes): External additional authenticated data supplied by application. detached_payload (Optional[bytes]): The detached payload that should be verified with data. + enable_non_aead (bool): Enable non-AEAD content ecnryption algorithms + (False = disabled by default). Before enable non-AEAD ciphers, + read and understand Security considerations of RFC 9459 carefully. + Since non-AEAD ciphers DO NOT provide neither authentication nor integrity + of decrypted message, make sure to validate them outside of this library. Returns: bytes: A byte string of decoded payload. Raises: @@ -313,7 +335,7 @@ def decode( DecodeError: Failed to decode data. VerifyError: Failed to verify data. """ - _, _, res = self.decode_with_headers(data, keys, context, external_aad, detached_payload) + _, _, res = self.decode_with_headers(data, keys, context, external_aad, detached_payload, enable_non_aead) return res def decode_with_headers( @@ -323,6 +345,7 @@ def decode_with_headers( context: Optional[Union[Dict[str, Any], List[Any]]] = None, external_aad: bytes = b"", detached_payload: Optional[bytes] = None, + enable_non_aead: bool = False, ) -> Tuple[Dict[int, Any], Dict[int, Any], bytes]: """ Verifies and decodes COSE data, and returns protected headers, unprotected headers and payload. @@ -337,6 +360,11 @@ def decode_with_headers( external_aad(bytes): External additional authenticated data supplied by application. detached_payload (Optional[bytes]): The detached payload that should be verified with data. + enable_non_aead (bool): Enable non-AEAD content ecnryption algorithms + (False = disabled by default). Before enable non-AEAD ciphers, + read and understand Security considerations of RFC 9459 carefully. + Since non-AEAD ciphers DO NOT provide neither authentication nor integrity + of decrypted message, make sure to validate them outside of this library. Returns: Tuple[Dict[int, Any], Dict[int, Any], bytes]: A dictionary data of decoded protected headers, and a dictionary data of unprotected headers, and a byte string of decoded payload. Raises: @@ -395,6 +423,8 @@ def decode_with_headers( # raise ValueError("unprotected header should be dict.") p, u = self._decode_headers(data.value[0], data.value[1]) alg = p[1] if 1 in p else u.get(1, 0) + if enable_non_aead is False and alg in COSE_ALGORITHMS_CEK_NON_AEAD.values(): + raise ValueError(f"Deprecated non-AEAD algorithm: {alg}.") # Local variable `protected` is byte encoded protected header # Sender is allowed to encode empty protected header into a bstr-wrapped zero-length map << {} >> (0x40A0) @@ -549,6 +579,7 @@ def _encode_headers( key: Optional[COSEKeyInterface], protected: Optional[dict], unprotected: Optional[dict], + enable_non_aead: bool, ) -> Tuple[Dict[int, Any], Dict[int, Any]]: p = to_cose_header(protected) u = to_cose_header(unprotected) @@ -564,8 +595,11 @@ def _encode_headers( # Check the protected header is empty if the algorithm is non AEAD (AES-CBC or AES-CTR) # because section 4 of RFC9459 says "The 'protected' header MUST be a zero-length byte string." alg = p[1] if 1 in p else u.get(1, 0) - if alg in COSE_ALGORITHMS_CEK_NON_AEAD.values() and len(p) > 0: - raise ValueError("protected header MUST be zero-length") + if alg in COSE_ALGORITHMS_CEK_NON_AEAD.values(): + if enable_non_aead is False: + raise ValueError(f"Deprecated non-AEAD algorithm: {alg}.") + if len(p) > 0: + raise ValueError("protected header MUST be zero-length") return p, u def _decode_headers(self, protected: Any, unprotected: Any) -> Tuple[Dict[int, Any], Dict[int, Any]]: diff --git a/tests/test_recipient.py b/tests/test_recipient.py index 003ca45..5b98493 100644 --- a/tests/test_recipient.py +++ b/tests/test_recipient.py @@ -797,6 +797,19 @@ def test_recipients_aes(self, kw_alg, enc_alg): kw_key = COSEKey.from_symmetric_key(alg=kw_alg) enc_key = COSEKey.from_symmetric_key(alg=enc_alg) + # The sender side (must fail): + r = Recipient.new(unprotected={"alg": kw_alg}, sender_key=kw_key) + sender = COSE.new(alg_auto_inclusion=True) + with pytest.raises(ValueError) as err: + encoded = sender.encode_and_encrypt( + b"Hello world!", + enc_key, + recipients=[r], + enable_non_aead=False, + ) + pytest.fail("encode_and_encrypt() should fail.") + assert "Deprecated non-AEAD algorithm" in str(err.value) + # The sender side (must fail): with pytest.raises(ValueError) as err: r = Recipient.new(protected={"alg": kw_alg}, sender_key=kw_key) @@ -812,6 +825,7 @@ def test_recipients_aes(self, kw_alg, enc_alg): enc_key, protected={"kid": "actually-not-protected"}, recipients=[r], + enable_non_aead=True, ) pytest.fail("encode_and_encrypt() should fail.") assert "protected header MUST be zero-length" in str(err.value) @@ -823,11 +837,12 @@ def test_recipients_aes(self, kw_alg, enc_alg): b"Hello world!", enc_key, recipients=[r], + enable_non_aead=True, ) # The recipient side: recipient = COSE.new() - assert b"Hello world!" == recipient.decode(encoded, keys=[kw_key]) + assert b"Hello world!" == recipient.decode(encoded, keys=[kw_key], enable_non_aead=True) @pytest.mark.parametrize( "enc_alg", @@ -859,9 +874,10 @@ def test_recipients_hpke(self, rsk1, rsk2, enc_alg): enc_key, unprotected={"alg": enc_alg}, recipients=[r], + enable_non_aead=True, ) recipient = COSE.new() - assert b"This is the content." == recipient.decode(encoded, [rsk1, rsk2]) + assert b"This is the content." == recipient.decode(encoded, [rsk1, rsk2], enable_non_aead=True) @pytest.mark.parametrize( "key_agreement_alg, key_agreement_alg_id, kw_alg, enc_alg", @@ -915,6 +931,7 @@ def test_recipients_ecdh_es(self, key_agreement_alg, key_agreement_alg_id, kw_al protected={}, unprotected={"alg": enc_alg, "iv": nonce}, recipients=[r], + enable_non_aead=True, ) # The recipient side: @@ -930,4 +947,4 @@ def test_recipients_ecdh_es(self, key_agreement_alg, key_agreement_alg_id, kw_al } ) recipient = COSE.new() - assert b"Hello world!" == recipient.decode(encoded, rsk2, context) + assert b"Hello world!" == recipient.decode(encoded, rsk2, context, enable_non_aead=True) From 4c03145c184888ed041b3e35bc40db97b76ea9d2 Mon Sep 17 00:00:00 2001 From: Ken Takayama Date: Fri, 25 Oct 2024 11:33:24 +0000 Subject: [PATCH 2/2] add: should be failed test on decode non-AEAD message without enable_non_aead=True --- tests/test_recipient.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/tests/test_recipient.py b/tests/test_recipient.py index 5b98493..3d24466 100644 --- a/tests/test_recipient.py +++ b/tests/test_recipient.py @@ -840,6 +840,13 @@ def test_recipients_aes(self, kw_alg, enc_alg): enable_non_aead=True, ) + # The recipient side (must fail): + recipient = COSE.new() + with pytest.raises(ValueError) as err: + _ = recipient.decode(encoded, keys=[kw_key]) # the option enable_non_aead=False by default + pytest.fail("decode() should fail for non-AEAD without enable_non_aead=True.") + assert f"Deprecated non-AEAD algorithm: {enc_key._alg}." == str(err.value) + # The recipient side: recipient = COSE.new() assert b"Hello world!" == recipient.decode(encoded, keys=[kw_key], enable_non_aead=True)