Skip to content

Commit

Permalink
Merge pull request #575 from dajiaji/disable-non-aead-by-default
Browse files Browse the repository at this point in the history
Add enable_non_aead=False option for encode & decode
  • Loading branch information
dajiaji authored Oct 26, 2024
2 parents 6ced08f + 4c03145 commit 0452202
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 10 deletions.
48 changes: 41 additions & 7 deletions cwt/cose.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ def encode(
signers: List[Signer] = [],
external_aad: bytes = b"",
out: str = "",
enable_non_aead: bool = False,
) -> bytes:
"""
Encodes COSE message with MAC, signing and encryption.
Expand All @@ -160,14 +161,21 @@ def encode(
data as `cbor2 <https://cbor2.readthedocs.io/en/stable/>`_'s
``CBORTag`` object. If any other value is specified, it will return
encoded data as bytes.
enable_non_aead (bool): Enable non-AEAD content ecnryption algorithms
(False = disabled by default). Before enable non-AEAD ciphers,
read and understand Security considerations of RFC 9459 carefully.
Since non-AEAD ciphers DO NOT provide neither authentication nor integrity
of decrypted message, make sure to deliver the encoded COSE message
in conjunction with an authentication and integrity mechanisms,
such as a digital signature.
Returns:
Union[bytes, CBORTag]: A byte string of the encoded COSE or a
cbor2.CBORTag object.
Raises:
ValueError: Invalid arguments.
EncodeError: Failed to encode data.
"""
p, u = self._encode_headers(key, protected, unprotected)
p, u = self._encode_headers(key, protected, unprotected, enable_non_aead)
typ = self._validate_cose_message(key, p, u, recipients, signers)
if typ == 0:
return self._encode_and_encrypt(payload, key, p, u, recipients, external_aad, out)
Expand All @@ -185,6 +193,7 @@ def encode_and_encrypt(
recipients: List[RecipientInterface] = [],
external_aad: bytes = b"",
out: str = "",
enable_non_aead: bool = False,
) -> bytes:
"""
Encodes data with encryption.
Expand All @@ -202,14 +211,21 @@ def encode_and_encrypt(
data as `cbor2 <https://cbor2.readthedocs.io/en/stable/>`_'s
``CBORTag`` object. If any other value is specified, it will return
encoded data as bytes.
enable_non_aead (bool): Enable non-AEAD content ecnryption algorithms
(False = disabled by default). Before enable non-AEAD ciphers,
read and understand Security considerations of RFC 9459 carefully.
Since non-AEAD ciphers DO NOT provide neither authentication nor integrity
of decrypted message, make sure to deliver the encoded COSE message
in conjunction with an authentication and integrity mechanisms,
such as a digital signature.
Returns:
Union[bytes, CBORTag]: A byte string of the encoded COSE or a
cbor2.CBORTag object.
Raises:
ValueError: Invalid arguments.
EncodeError: Failed to encode data.
"""
p, u = self._encode_headers(key, protected, unprotected)
p, u = self._encode_headers(key, protected, unprotected, enable_non_aead)
typ = self._validate_cose_message(key, p, u, recipients, [])
if typ != 0:
raise ValueError("The COSE message is not suitable for COSE Encrypt0/Encrypt.")
Expand Down Expand Up @@ -245,7 +261,7 @@ def encode_and_mac(
ValueError: Invalid arguments.
EncodeError: Failed to encode data.
"""
p, u = self._encode_headers(key, protected, unprotected)
p, u = self._encode_headers(key, protected, unprotected, False)
typ = self._validate_cose_message(key, p, u, recipients, [])
if typ != 1:
raise ValueError("The COSE message is not suitable for COSE MAC0/MAC.")
Expand Down Expand Up @@ -287,7 +303,7 @@ def encode_and_sign(
ValueError: Invalid arguments.
EncodeError: Failed to encode data.
"""
p, u = self._encode_headers(key, protected, unprotected)
p, u = self._encode_headers(key, protected, unprotected, False)
typ = self._validate_cose_message(key, p, u, [], signers)
if typ != 2:
raise ValueError("The COSE message is not suitable for COSE Sign0/Sign.")
Expand All @@ -300,6 +316,7 @@ def decode(
context: Optional[Union[Dict[str, Any], List[Any]]] = None,
external_aad: bytes = b"",
detached_payload: Optional[bytes] = None,
enable_non_aead: bool = False,
) -> bytes:
"""
Verifies and decodes COSE data, and returns only payload.
Expand All @@ -314,14 +331,19 @@ def decode(
external_aad(bytes): External additional authenticated data supplied by
application.
detached_payload (Optional[bytes]): The detached payload that should be verified with data.
enable_non_aead (bool): Enable non-AEAD content ecnryption algorithms
(False = disabled by default). Before enable non-AEAD ciphers,
read and understand Security considerations of RFC 9459 carefully.
Since non-AEAD ciphers DO NOT provide neither authentication nor integrity
of decrypted message, make sure to validate them outside of this library.
Returns:
bytes: A byte string of decoded payload.
Raises:
ValueError: Invalid arguments.
DecodeError: Failed to decode data.
VerifyError: Failed to verify data.
"""
_, _, res = self.decode_with_headers(data, keys, context, external_aad, detached_payload)
_, _, res = self.decode_with_headers(data, keys, context, external_aad, detached_payload, enable_non_aead)
return res

def decode_with_headers(
Expand All @@ -331,6 +353,7 @@ def decode_with_headers(
context: Optional[Union[Dict[str, Any], List[Any]]] = None,
external_aad: bytes = b"",
detached_payload: Optional[bytes] = None,
enable_non_aead: bool = False,
) -> Tuple[Dict[int, Any], Dict[int, Any], bytes]:
"""
Verifies and decodes COSE data, and returns protected headers, unprotected headers and payload.
Expand All @@ -345,6 +368,11 @@ def decode_with_headers(
external_aad(bytes): External additional authenticated data supplied by
application.
detached_payload (Optional[bytes]): The detached payload that should be verified with data.
enable_non_aead (bool): Enable non-AEAD content ecnryption algorithms
(False = disabled by default). Before enable non-AEAD ciphers,
read and understand Security considerations of RFC 9459 carefully.
Since non-AEAD ciphers DO NOT provide neither authentication nor integrity
of decrypted message, make sure to validate them outside of this library.
Returns:
Tuple[Dict[int, Any], Dict[int, Any], bytes]: A dictionary data of decoded protected headers, and a dictionary data of unprotected headers, and a byte string of decoded payload.
Raises:
Expand Down Expand Up @@ -403,6 +431,8 @@ def decode_with_headers(
# raise ValueError("unprotected header should be dict.")
p, u = self._decode_headers(data.value[0], data.value[1])
alg = p[1] if 1 in p else u.get(1, 0)
if enable_non_aead is False and alg in COSE_ALGORITHMS_CEK_NON_AEAD.values():
raise ValueError(f"Deprecated non-AEAD algorithm: {alg}.")

# Local variable `protected` is byte encoded protected header
# Sender is allowed to encode empty protected header into a bstr-wrapped zero-length map << {} >> (0x40A0)
Expand Down Expand Up @@ -557,6 +587,7 @@ def _encode_headers(
key: Optional[COSEKeyInterface],
protected: Optional[dict],
unprotected: Optional[dict],
enable_non_aead: bool,
) -> Tuple[Dict[int, Any], Dict[int, Any]]:
p = to_cose_header(protected)
u = to_cose_header(unprotected)
Expand All @@ -577,8 +608,11 @@ def _encode_headers(
# Check the protected header is empty if the algorithm is non AEAD (AES-CBC or AES-CTR)
# because section 4 of RFC9459 says "The 'protected' header MUST be a zero-length byte string."
alg = p[1] if 1 in p else u.get(1, 0)
if alg in COSE_ALGORITHMS_CEK_NON_AEAD.values() and len(p) > 0:
raise ValueError("protected header MUST be zero-length")
if alg in COSE_ALGORITHMS_CEK_NON_AEAD.values():
if enable_non_aead is False:
raise ValueError(f"Deprecated non-AEAD algorithm: {alg}.")
if len(p) > 0:
raise ValueError("protected header MUST be zero-length")
return p, u

def _decode_headers(self, protected: Any, unprotected: Any) -> Tuple[Dict[int, Any], Dict[int, Any]]:
Expand Down
30 changes: 27 additions & 3 deletions tests/test_recipient.py
Original file line number Diff line number Diff line change
Expand Up @@ -797,6 +797,19 @@ def test_recipients_aes(self, kw_alg, enc_alg):
kw_key = COSEKey.from_symmetric_key(alg=kw_alg)
enc_key = COSEKey.from_symmetric_key(alg=enc_alg)

# The sender side (must fail):
r = Recipient.new(unprotected={"alg": kw_alg}, sender_key=kw_key)
sender = COSE.new(alg_auto_inclusion=True)
with pytest.raises(ValueError) as err:
encoded = sender.encode_and_encrypt(
b"Hello world!",
enc_key,
recipients=[r],
enable_non_aead=False,
)
pytest.fail("encode_and_encrypt() should fail.")
assert "Deprecated non-AEAD algorithm" in str(err.value)

# The sender side (must fail):
with pytest.raises(ValueError) as err:
r = Recipient.new(protected={"alg": kw_alg}, sender_key=kw_key)
Expand All @@ -812,6 +825,7 @@ def test_recipients_aes(self, kw_alg, enc_alg):
enc_key,
protected={"kid": "actually-not-protected"},
recipients=[r],
enable_non_aead=True,
)
pytest.fail("encode_and_encrypt() should fail.")
assert "protected header MUST be zero-length" in str(err.value)
Expand All @@ -823,11 +837,19 @@ def test_recipients_aes(self, kw_alg, enc_alg):
b"Hello world!",
enc_key,
recipients=[r],
enable_non_aead=True,
)

# The recipient side (must fail):
recipient = COSE.new()
with pytest.raises(ValueError) as err:
_ = recipient.decode(encoded, keys=[kw_key]) # the option enable_non_aead=False by default
pytest.fail("decode() should fail for non-AEAD without enable_non_aead=True.")
assert f"Deprecated non-AEAD algorithm: {enc_key._alg}." == str(err.value)

# The recipient side:
recipient = COSE.new()
assert b"Hello world!" == recipient.decode(encoded, keys=[kw_key])
assert b"Hello world!" == recipient.decode(encoded, keys=[kw_key], enable_non_aead=True)

@pytest.mark.parametrize(
"enc_alg",
Expand Down Expand Up @@ -859,9 +881,10 @@ def test_recipients_hpke(self, rsk1, rsk2, enc_alg):
enc_key,
unprotected={"alg": enc_alg},
recipients=[r],
enable_non_aead=True,
)
recipient = COSE.new()
assert b"This is the content." == recipient.decode(encoded, [rsk1, rsk2])
assert b"This is the content." == recipient.decode(encoded, [rsk1, rsk2], enable_non_aead=True)

@pytest.mark.parametrize(
"key_agreement_alg, key_agreement_alg_id, kw_alg, enc_alg",
Expand Down Expand Up @@ -915,6 +938,7 @@ def test_recipients_ecdh_es(self, key_agreement_alg, key_agreement_alg_id, kw_al
protected={},
unprotected={"alg": enc_alg, "iv": nonce},
recipients=[r],
enable_non_aead=True,
)

# The recipient side:
Expand All @@ -930,4 +954,4 @@ def test_recipients_ecdh_es(self, key_agreement_alg, key_agreement_alg_id, kw_al
}
)
recipient = COSE.new()
assert b"Hello world!" == recipient.decode(encoded, rsk2, context)
assert b"Hello world!" == recipient.decode(encoded, rsk2, context, enable_non_aead=True)

0 comments on commit 0452202

Please sign in to comment.