## Attribution

This Code of Conduct is adapted from the [Contributor Covenant][homepage], version 1.4, available at [][version]

[homepage]:
[version]: diff --git a/ b/ new file mode 100644 index 0000000..25fc566 --- /dev/null +++ b/ @@ -0,0 +1,8 @@ +include CHANGES.rst +include LICENSE +include tox.ini +graft docs +prune docs/_build +graft tests +global-exclude *.py[co] +recursive-exclude * __pycache__ diff --git a/ b/ index 6e78cde..96ce944 100644 --- a/ +++ b/ @@ -1,2 +1,116 @@ -# python-cwt -A Python implementation of CWT/COSE. +# Python CWT

A Python (>= 3.6) implementation of CBOR Web Token (CWT) and CBOR Object Signing and Encryption (COSE) compliant with:
- [RFC8392: CBOR Web Token (CWT)](
- [RFC8152: CBOR Object Signing and Encryption (COSE)]( + +## Installing + +Install with pip after cloning this repository. + +``` +pip install . +``` + +## Usase + +Python CWT is easy to use. +If you already know about [JSON Web Token (JWT)](, +little knowledge of [CBOR](, [COSE]( +and [CWT]( is required to use this library. + +Followings are basic examples which create CWT, verify and decode it: + +- [MACed CWT](#maced-cwt) +- [Signed CWT](#signed-cwt) +- [Encrypted CWT](#encrypted-cwt) +- [Nested CWT](#nested-cwt) + +### MACed CWT + +Create a MACed CWT, verify and decode it as follows: + +```py +import cwt +from cwt import cose_key, claims + +key = cose_key.from_symmetric_key("mysecretpassword") # "HMAC256/256" is the default algorithm. +encoded = cwt.encode_and_mac(claims.from_json({"iss":"https://as.example", "sub":"dajiaji", "cti":"123"}), key) +decoded = cwt.decode(encoded, key) +``` + +CBOR-like structure (Dict[int, Any]) can be used as follows: + +```py +import cwt + +key = cwt.cose_key.from_symmetric_key("mysecretpassword") +encoded = cwt.encode_and_mac({1:"https://as.example", 2:"dajiaji", 7:b"123"}, key) +decoded = cwt.decode(encoded, key) +``` + +### Signed CWT + +Create an ECDSA (with SHA-256) key pair: + +```sh +$ openssl ecparam -genkey -name prime256v1 -noout -out private_key.pem +$ openssl ec -in private_key.pem -pubout -out public_key.pem +``` + +Create a Signed CWT, verify and decode it with the key pair as follows: + +```py +import cwt +from cwt import cose_key, claims + +# Load PEM-formatted keys as COSE keys. +with open("./private_key.pem") as key_file: + private_key = cose_key.from_pem( +with open("./public_key.pem") as key_file: + public_key = cose_key.from_pem( + +# Encode with ES256 signing. +encoded = cwt.encode_and_sign( + claims.from_json({"iss":"https://as.example", "sub":"dajiaji", "cti":"123"}), private_key) + +# Verify and decode. +decoded = cwt.decode(encoded, public_key) +``` + +### Encrypted CWT + +Create an Ed25519 key pair: + +```sh +$ openssl genpkey -algorithm ed25519 -out private_key.pem +$ openssl pkey -in private_key.pem -pubout -out public_key.pem +``` + +Create an Encrypted CWT, verify and decode it with the key pair as follows: + +```py +import cwt +from cwt import cose_key, claims + +# Load PEM-formatted keys as COSE keys. +with open("./private_key.pem") as key_file: + private_key = cose_key.from_pem( +with open("./public_key.pem") as key_file: + public_key = cose_key.from_pem( + +# Encode with ES256 encryption. +encoded = cwt.encode_and_encrypt( + claims.from_json({"iss":"https://as.example", "sub":"dajiaji", "cti":"123"}), private_key) + +# Verify and decode. +decoded = cwt.decode(encoded, public_key) +``` + +## Tests + +You can run tests from the project root after cloning with: + +```sh +$ tox +``` diff --git a/cwt/ b/cwt/ new file mode 100644 index 0000000..48ca78a --- /dev/null +++ b/cwt/ @@ -0,0 +1,31 @@ +from .claims import Claims, claims +from .cose import COSE +from .cwt import CWT, decode, encode_and_encrypt, encode_and_mac, encode_and_sign +from .exceptions import PyCWTDecodeError, PyCWTEncodeError, PyCWTError +from .key_builder import KeyBuilder, cose_key + +__version__ = "0.1.0" +__title__ = "Python CWT" +__description__ = "A Python implementation of CWT/COSE" +__url__ = "" +__uri__ = __url__ +__doc__ = __description__ + " <" + __uri__ + ">" +__author__ = "AJITOMI Daisuke" +__email__ = "" +__license__ = "MIT" +__copyright__ = "Copyright 2021 AJITOMI Daisuke" +__all__ = [ + "CWT", + "encode_and_mac", + "encode_and_sign", + "encode_and_encrypt", + "decode", + "COSE", + "KeyBuilder", + "cose_key", + "Claims", + "claims", + "PyCWTError", + "PyCWTEncodeError", + "PyCWTDecodeError", +] diff --git a/cwt/ b/cwt/ new file mode 100644 index 0000000..6d5f615 --- /dev/null +++ b/cwt/ @@ -0,0 +1,49 @@ +import json +from typing import Any, Dict, Optional, Union + + +class Claims: + """""" + + REGISTERED_NAMES = { + "iss": 1, # text string + "sub": 2, # text string + "aud": 3, # text string + "exp": 4, # integer or floating-point number + "nbf": 5, # integer or floating-point number + "iat": 6, # integer or floating-point number + "cti": 7, # byte string + } + + def __init__(self, options: Optional[Dict[str, Any]] = None): + """""" + self._options = options + return + + def from_json(self, claims: Union[str, bytes, Dict[str, Any]]) -> Dict[int, Any]: + """""" + json_claims: Dict[str, Any] + if isinstance(claims, str) or isinstance(claims, bytes): + json_claims = json.loads(claims) + else: + json_claims = claims + + for k in json_claims: + if not isinstance(k, int): + break + ValueError("It is already CBOR-like format.") + + # Convert JSON to CBOR (Convert the type of key from str to int). + cbor_claims = {} + for k, v in json_claims.items(): + if k not in Claims.REGISTERED_NAMES: + # TODO Support additional arguments. + continue + cbor_claims[Claims.REGISTERED_NAMES[k]] = v + if 7 in cbor_claims and isinstance(cbor_claims[7], str): + cbor_claims[7] = cbor_claims[7].encode("utf-8") + return cbor_claims + + +# export +claims = Claims() diff --git a/cwt/ b/cwt/ new file mode 100644 index 0000000..23d4305 --- /dev/null +++ b/cwt/ @@ -0,0 +1,32 @@ +COSE_KEY_TYPES = { + "OKP": 1, # OCtet Key Pair + "EC2": 2, # Elliptic Curve Keys w/ x- and y-coordinate pair + "RSA": 3, # RSA Key + "Symmetric": 4, # Symmetric Keys + "HSS-LMS": 5, # Public key for HSS/LMS hash-based digital signature + "WalnutDSA": 6, # WalnutDSA public key +} + +# COSE Algorithms for Content Encryption Key (CEK). +COSE_ALGORITHMS_CEK = { + "A128GCM": 1, # AES-GCM mode w/ 128-bit key, 128-bit tag + "A192GCM": 2, # AES-GCM mode w/ 192-bit key, 128-bit tag + "A256GCM": 3, # AES-GCM mode w/ 256-bit key, 128-bit tag + # etc. +} + +# COSE Algorithms for MAC. +COSE_ALGORITHMS_MAC = { + "HMAC256/64": 4, # HMAC w/ SHA-256 truncated to 64 bits + "HMAC256/256": 5, # HMAC w/ SHA-256 + "HMAC384/384": 6, # HMAC w/ SHA-384 + "HMAC512/512": 7, # HMAC w/ SHA-512 + "AES-MAC128/64": 14, # AES-MAC 128-bit key, 64-bit tag + "AES-MAC256/64": 15, # AES-MAC 256-bit key, 64-bit tag + "AES-MAC128/128": 25, # AES-MAC 128-bit key, 128-bit tag + "AES-MAC256/128": 26, # AES-MAC 256-bit key, 128-bit tag + # etc. +} + +# COSE Algorithms for Symmetric Keys. +COSE_ALGORITHMS_SYMMETRIC = dict(COSE_ALGORITHMS_MAC, **COSE_ALGORITHMS_CEK) diff --git a/cwt/ b/cwt/ new file mode 100644 index 0000000..393dfb9 --- /dev/null +++ b/cwt/ @@ -0,0 +1,148 @@ +from typing import Any, Dict, List, Optional, Union + +from cbor2 import CBORTag, dumps, loads + +from .cose_key import COSEKey +from .exceptions import InvalidSignature + + +class COSE: + """ + A COSE (CBOR Object Signing and Encryption) Implementaion. + """ + + def __init__(self, options: Optional[Dict[str, Any]] = None): + self._options = options + + def encode_and_mac( + self, + protected: Dict[int, Any], + unprotected: Dict[int, Any], + payload: Union[Dict[int, Any], bytes], + key: COSEKey, + out: Optional[str] = None, + ) -> Union[bytes, CBORTag]: + + b_protected = dumps(protected) + b_payload = dumps(payload) + mac_structure = ["MAC0", b_protected, b"", b_payload] + tag = key.sign(dumps(mac_structure)) + res = CBORTag(17, [b_protected, unprotected, b_payload, tag]) + return res if out == "cbor2/CBORTag" else dumps(res) + + def encode_and_sign( + self, + protected: Dict[int, Any], + unprotected: Dict[int, Any], + payload: Union[Dict[int, Any], bytes], + key: Union[COSEKey, List[COSEKey]], + out: Optional[str] = None, + ) -> Union[bytes, CBORTag]: + + ctx = "Signature" if not isinstance(key, COSEKey) else "Signature1" + if isinstance(key, COSEKey): + protected[1] = key.alg + unprotected[4] = key.kid if key.kid else {} + + b_protected = dumps(protected) if protected else b"" + b_payload = dumps(payload) + + # Signature1 + if isinstance(key, COSEKey): + sig_structure = [ctx, b_protected, b"", b_payload] + sig = key.sign(dumps(sig_structure)) + res = CBORTag(18, [b_protected, unprotected, b_payload, sig]) + return res if out == "cbor2/CBORTag" else dumps(res) + + # Signature + sigs = [] + for k in key: + p_header = dumps({1: k.alg}) + u_header = dumps({4: k.kid} if k.kid else {}) + sig_structure = [ctx, b_protected, p_header, b"", b_payload] + sig = k.sign(dumps(sig_structure)) + sigs.append(dumps([p_header, u_header, sig])) + res = CBORTag(18, [b_protected, unprotected, b_payload, sigs]) + return res if out == "cbor2/CBORTag" else dumps(res) + + def encode_and_encrypt( + self, + protected: Dict[int, Any], + unprotected: Dict[int, Any], + payload: Union[Dict[int, Any], bytes], + key: COSEKey, + nonce: bytes = b"", + out: str = "", + ) -> bytes: + + b_protected = dumps(protected) + b_payload = dumps(payload) + aad = dumps(["Encrypt0", b_protected, b""]) + ciphertext = key.encrypt(b_payload, nonce, aad) + res = CBORTag(16, [b_protected, unprotected, ciphertext]) + return res if out == "cbor2/CBORTag" else dumps(res) + + def decode(self, data: Union[bytes, CBORTag], key: COSEKey) -> Dict[int, Any]: + + if isinstance(data, bytes): + data = loads(data) + if not isinstance(data, CBORTag): + raise ValueError("Invalid COSE format.") + + # Encrypt0 + if data.tag == 16: + if not isinstance(data.value, list) or len(data.value) != 3: + raise ValueError("Invalid Encrypt0 format.") + + aad = dumps(["Encrypt0", data.value[0], b""]) + unprotected = data.value[1] + if not isinstance(unprotected, dict): + raise ValueError("unprotected header should be dict.") + nonce = unprotected.get(5, None) + payload = key.decrypt(data.value[2], nonce, aad) + return loads(payload) + + # Encrypt + if data.tag == 96: + raise NotImplementedError() + + # MAC0 + if data.tag == 17: + if not isinstance(data.value, list) or len(data.value) != 4: + raise ValueError("Invalid MAP0 format.") + + msg = dumps(["MAC0", data.value[0], b"", data.value[2]]) + try: + key.verify(msg, data.value[3]) + except InvalidSignature: + raise + return loads(data.value[2]) + + # MAC + if data.tag == 97: + raise NotImplementedError() + + # Signature1 + if data.tag == 18: + if not isinstance(data.value, list) or len(data.value) != 4: + raise ValueError("Invalid Signature1 format.") + + msg = dumps(["Signature1", data.value[0], b"", data.value[2]]) + try: + key.verify(msg, data.value[3]) + except InvalidSignature: + raise + return loads(data.value[2]) + + # Signature + if data.tag == 98: + if not isinstance(data.value, list) or len(data.value) != 4: + raise ValueError("Invalid Signature format.") + sigs = data.value[3] + if not isinstance(sigs, list): + raise ValueError("Invalid Signature format.") + + msg = dumps(["Signature", data.value[0], b"", data.value[2]]) + raise NotImplementedError() + + raise ValueError("Unsupported or unknown tag: %d" % data.tag) diff --git a/cwt/ b/cwt/ new file mode 100644 index 0000000..8216f62 --- /dev/null +++ b/cwt/ @@ -0,0 +1,82 @@ +from typing import Any, Dict + +from .const import COSE_KEY_TYPES + + +class COSEKey: + """ + The interface class for a COSE Key used for mac, signing/verifying and encryption/decryption. + """ + + def __init__(self, cose_key: Dict[int, Any]): + # Validate COSE Key common parameters. + if 1 not in cose_key: + raise ValueError("kty(1) not found.") + if not isinstance(cose_key[1], int) and not isinstance(cose_key[1], str): + raise ValueError("kty(1) should be int or str(tstr).") + try: + self._kty: int = ( + cose_key[1] + if isinstance(cose_key[1], int) + else COSE_KEY_TYPES[cose_key[1]] + ) + except ValueError: + raise ValueError(f"Unknown kty: {cose_key[1]}") + if 2 in cose_key and not isinstance(cose_key[2], bytes): + raise ValueError("kid(2) should be bytes(bstr).") + if 3 in cose_key and ( + not isinstance(cose_key[3], int) and not isinstance(cose_key[3], str) + ): + raise ValueError("alg(3) should be int str(tstr).") + if 4 in cose_key and not isinstance(cose_key[4], list): + raise ValueError("key_ops(4) should be list.") + if 5 in cose_key and not isinstance(cose_key[5], bytes): + raise ValueError("Base IV(5) should be bytes(bstr).") + self._object = cose_key + return + + @property + def kty(self) -> int: + return self._kty + + @property + def kid(self) -> bytes: + return self._object.get(2, None) + + @property + def alg(self) -> int: + return self._object.get(3, None) + + @property + def key_ops(self) -> list: + return self._object.get(4, None) + + @property + def base_iv(self) -> bytes: + return self._object.get(5, None) + + def sign(self, msg: bytes) -> bytes: + """ + Returns a digital signature for the specified message + using the specified key value. + """ + raise NotImplementedError + + def verify(self, msg: bytes, sig: bytes): + """ + Verifies that the specified digital signature is valid + for the specified message. + """ + raise NotImplementedError + + def encrypt(self, msg: bytes, nonce: bytes, aad: bytes) -> bytes: + """ + Encrypts the specified message. + """ + raise NotImplementedError + + def decrypt(self, msg: bytes, nonce: bytes, aad: bytes) -> bytes: + """ + Decrypts the specified message. + """ + raise NotImplementedError diff --git a/cwt/ b/cwt/ new file mode 100644 index 0000000..2a6d9fd --- /dev/null +++ b/cwt/ @@ -0,0 +1,130 @@ +from typing import Any, Dict, List, Optional, Union + +from cbor2 import CBORTag, dumps, loads + +from .cose import COSE +from .cose_key import COSEKey + + +class CWT: + """ + A CWT (CBOR Web Token) Implementaion. + """ + + CBOR_TAG = 61 + + def __init__(self, options: Optional[Dict[str, Any]] = None): + self._cose = COSE(options) + + def encode_and_mac( + self, + claims: Union[Dict[int, Any], bytes], + key: COSEKey, + iv: Optional[str] = None, + partial_iv: Optional[str] = None, + tagged: Optional[bool] = False, + ) -> bytes: + """ + Encode CWT claims and add MAC to it. + """ + self._validate(claims) + protected: Dict[int, Any] = {1: key.alg} + unprotected: Dict[int, Any] = {4: key.kid} if key.kid else {} + res = self._cose.encode_and_mac( + protected, unprotected, claims, key, out="cbor2/CBORTag" + ) + if tagged: + return dumps(CBORTag(CWT.CBOR_TAG, res)) + return dumps(res) + + def encode_and_sign( + self, + claims: Union[Dict[int, Any], bytes], + key: Union[COSEKey, List[COSEKey]], + tagged: Optional[bool] = False, + ) -> bytes: + """ + Encode CWT claims and sign it. + """ + self._validate(claims) + protected: Dict[int, Any] = {} + unprotected: Dict[int, Any] = {} + res = self._cose.encode_and_sign( + protected, unprotected, claims, key, out="cbor2/CBORTag" + ) + if tagged: + return dumps(CBORTag(CWT.CBOR_TAG, res)) + return dumps(res) + + def encode_and_encrypt( + self, + claims: Union[Dict[int, Any], bytes], + key: COSEKey, + nonce: bytes, + tagged: Optional[bool] = False, + ) -> bytes: + """ + Encode CWT claims and encrypt it. + """ + self._validate(claims) + protected: Dict[int, Any] = {1: key.alg} + unprotected: Dict[int, Any] = {4: key.kid} if key.kid else {} + if nonce: + unprotected[5] = nonce + res = self._cose.encode_and_encrypt( + protected, unprotected, claims, key, nonce, out="cbor2/CBORTag" + ) + if tagged: + return dumps(CBORTag(CWT.CBOR_TAG, res)) + return dumps(res) + + def decode(self, data: bytes, key: Union[COSEKey, List[COSEKey]]) -> bytes: + """ + Verify and decode CWT. + """ + cwt = loads(data) + if isinstance(cwt, CBORTag) and cwt.tag == CWT.CBOR_TAG: + cwt = cwt.value + keys: List[COSEKey] = [key] if isinstance(key, COSEKey) else key + for k in keys: + cwt = self._cose.decode(cwt, k) + return cwt + + def _validate(self, claims: Union[Dict[int, Any], bytes]): + """""" + if isinstance(claims, bytes): + nested = loads(claims) + if not isinstance(nested, CBORTag): + raise ValueError("bytes-formatted claims need CBOR(COSE) Tag.") + if nested.tag not in [16, 96, 17, 97, 18, 98]: + raise ValueError("Unsupported or unknown CBOR tag.") + return + if 1 in claims and not isinstance(claims[1], str): + raise ValueError("iss(1) should be str.") + if 2 in claims and not isinstance(claims[2], str): + raise ValueError("sub(2) should be str.") + if 3 in claims and not isinstance(claims[3], str): + raise ValueError("aud(3) should be str.") + if 4 in claims and not ( + isinstance(claims[4], int) or isinstance(claims[4], float) + ): + raise ValueError("exp(4) should be int or float.") + if 5 in claims and not ( + isinstance(claims[5], int) or isinstance(claims[5], float) + ): + raise ValueError("nbf(5) should be int or float.") + if 6 in claims and not ( + isinstance(claims[6], int) or isinstance(claims[6], float) + ): + raise ValueError("iat(6) should be int or float.") + if 7 in claims and not isinstance(claims[7], bytes): + raise ValueError("cti(7) should be bytes.") + return + + +# export +_cwt = CWT() +encode_and_mac = _cwt.encode_and_mac +encode_and_sign = _cwt.encode_and_sign +encode_and_encrypt = _cwt.encode_and_encrypt +decode = _cwt.decode diff --git a/cwt/ b/cwt/ new file mode 100644 index 0000000..6987646 --- /dev/null +++ b/cwt/ @@ -0,0 +1,22 @@ +class PyCWTError(Exception): + """""" + + pass + + +class InvalidSignature(PyCWTError): + """""" + + pass + + +class PyCWTEncodeError(PyCWTError): + """""" + + pass + + +class PyCWTDecodeError(PyCWTError): + """""" + + pass diff --git a/cwt/ b/cwt/ new file mode 100644 index 0000000..506ae0f --- /dev/null +++ b/cwt/ @@ -0,0 +1,108 @@ +import json +from typing import Any, Dict, Optional, Union + +import cbor2 + +from .const import COSE_ALGORITHMS_SYMMETRIC +from .cose_key import COSEKey +from .key_types.ec2 import EC2Key +from .key_types.okp import OKPKey +from .key_types.symmetric import AESCCMKey, HMACKey + + +class KeyBuilder: + """""" + + COSE_KEY_COMMON_PARAMS = { + "kty": 1, # tstr / int + "kid": 2, # bstr + "alg": 3, # tstr / int + "key_ops": 4, # [+ (tstr / int)] + "base_iv": 5, # bstr + # * label => values + } + + COSE_KEY_OPERATION_VALUES = { + "sign": 1, + "verify": 2, + "encrypt": 3, + "decrypt": 4, + "wrap_key": 5, + "unwrap_key": 6, + "derive_key": 7, + "derive_bits": 8, + "MAC_create": 9, + "MAC_verify": 10, + } + + def __init__(self, options: Optional[Dict[str, Any]] = None): + """""" + self._options = options + return + + def from_symmetric_key( + self, key: Union[bytes, str], alg: str = "HMAC256/256" + ) -> COSEKey: + """""" + if isinstance(key, str): + key = key.encode("utf-8") + alg_id = COSE_ALGORITHMS_SYMMETRIC.get(alg, None) + if not alg_id: + raise ValueError("Unsupported or unknown alg: %s" % alg) + + cose_key = { + 1: 4, # kty: 'Symmetric' + 3: alg_id, # alg: int + -1: key, # k: bstr + } + if alg_id in [4, 5, 6, 7]: + return HMACKey(cose_key) + if alg_id in [10, 11, 12, 13, 30, 31, 32, 33]: + return AESCCMKey(cose_key) + raise ValueError("Unsupported or unknown alg(3): %d" % alg_id) + + def from_dict(self, cose_key: Dict[int, Any]) -> COSEKey: + """""" + + # Validate COSE Key common parameters. + if 1 not in cose_key: + raise ValueError("kty(1) not found.") + if not isinstance(cose_key[1], int) and not isinstance(cose_key[1], str): + raise ValueError("kty(1) should be int or str(tstr).") + if cose_key[1] == 1: + return OKPKey(cose_key) + if cose_key[1] == 2: + return EC2Key(cose_key) + if cose_key[1] == 4: + if 3 not in cose_key or ( + not isinstance(cose_key[3], int) and not isinstance(cose_key[3], str) + ): + raise ValueError("alg(3) should be int str(tstr).") + if cose_key[3] in [4, 5, 6, 7]: + return HMACKey(cose_key) + if cose_key[3] in [10, 11, 12, 13, 30, 31, 32, 33]: + return AESCCMKey(cose_key) + raise ValueError(f"Unsupported or unknown alg(3): {cose_key[3]}") + raise ValueError(f"Unsupported or unknown kty(1): {cose_key[1]}") + + def from_bytes(self, key_data: bytes) -> COSEKey: + """""" + cose_key = cbor2.loads(key_data) + return self.from_dict(cose_key) + + def from_jwk(self, jwk: Union[str, bytes, Dict[str, Any]]) -> COSEKey: + """""" + cose_key: Dict[int, Any] = {} + if not isinstance(jwk, dict): + jwk = json.loads(jwk) + # TODO: from JWT to COSE key. + return self.from_dict(cose_key) + + def from_pem(self, key_data: bytes) -> COSEKey: + """""" + cose_key = cbor2.loads(key_data) + return self.from_dict(cose_key) + + +# export +cose_key = KeyBuilder() diff --git a/cwt/key_types/ b/cwt/key_types/ new file mode 100644 index 0000000..e69de29 diff --git a/cwt/key_types/ b/cwt/key_types/ new file mode 100644 index 0000000..4b06926 --- /dev/null +++ b/cwt/key_types/ @@ -0,0 +1,140 @@ +from typing import Any, Dict + +import cryptography +from cryptography.hazmat.primitives import hashes +from cryptography.hazmat.primitives.asymmetric import ec +from cryptography.hazmat.primitives.asymmetric.utils import ( + decode_dss_signature, + encode_dss_signature, +) + +from ..cose_key import COSEKey +from ..exceptions import InvalidSignature +from ..utils import i2osp, os2ip + + +class EC2Key(COSEKey): + """""" + + def __init__(self, cose_key: Dict[int, Any]): + """""" + super().__init__(cose_key) + self._public_key: Any = None + self._private_key: Any = None + self._hash_alg: Any = None + + # Validate kty. + if 1 not in cose_key: + raise ValueError("kty(1) not found.") + if not isinstance(cose_key[1], int) and not isinstance(cose_key[1], str): + raise ValueError("kty(1) should be int or str(tstr).") + if cose_key[1] != 2: + raise ValueError("kty(1) should be EC2(2).") + + # Validate x and y. + if -2 not in cose_key: + raise ValueError("x(-2) not found.") + if not isinstance(cose_key[-2], bytes): + raise ValueError("x(-2) should be bytes(bstr).") + if -3 not in cose_key: + raise ValueError("y(-3) not found.") + if not isinstance(cose_key[-3], bytes): + raise ValueError("y(-3) should be bytes(bstr).") + x = cose_key[-2] + y = cose_key[-3] + + # Validate crv. + if -1 not in cose_key: + raise ValueError("crv(-1) not found.") + if not isinstance(cose_key[-1], int) and not isinstance(cose_key[-1], str): + raise ValueError("crv(-1) should be int or str(tstr).") + crv = cose_key[-1] + crv_obj: Any + if crv == 1: # P-256 + if len(x) == len(y) == 32: + crv_obj = ec.SECP256R1() + self._hash_alg = hashes.SHA256 + else: + raise ValueError("Coords should be 32 bytes for crv P-256") + elif crv == 2: # P-384 + if len(x) == len(y) == 48: + crv_obj = ec.SECP384R1() + self._hash_alg = hashes.SHA384 + else: + raise ValueError("Coords should be 48 bytes for crv P-384") + elif crv == 3: # P-521 + if len(x) == len(y) == 66: + crv_obj = ec.SECP521R1() + self._hash_alg = hashes.SHA512 + else: + raise ValueError("Coords should be 66 bytes for crv P-521") + elif crv == 8: # secp256k1 + if len(x) == len(y) == 32: + crv_obj = ec.SECP256K1() + self._hash_alg = hashes.SHA256 + else: + raise ValueError("Coords should be 32 bytes for crv secp256k1") + else: + raise ValueError(f"Unsupported or unknown crv: {crv}") + + public_numbers = ec.EllipticCurvePublicNumbers( + x=int.from_bytes(x, byteorder="big"), + y=int.from_bytes(y, byteorder="big"), + curve=crv_obj, + ) + + # Validate d. + if -4 not in cose_key: + self._public_key = public_numbers.public_key() + return + + if not isinstance(cose_key[-4], bytes): + raise ValueError("d(-4) should be bytes(bstr).") + d = cose_key[-4] + if len(d) != len(x): + raise ValueError("d(-4) should be {} bytes for curve {}", len(x), crv) + self._private_key = ec.EllipticCurvePrivateNumbers( + int.from_bytes(d, byteorder="big"), public_numbers + ).private_key() + return + + def sign(self, msg: bytes) -> bytes: + """""" + try: + if self._public_key: + raise ValueError("Public key cannot be used for signing.") + sig = self._private_key.sign(msg, ec.ECDSA(self._hash_alg())) + return self._der_to_os(self._private_key.curve.key_size, sig) + except ValueError as err: + raise InvalidSignature("Failed to sign.") from err + + def verify(self, msg: bytes, sig: bytes): + """""" + try: + if self._private_key: + der_sig = self._os_to_der(self._private_key.curve.key_size, sig) + self._private_key.public_key().verify( + der_sig, msg, ec.ECDSA(self._hash_alg()) + ) + else: + der_sig = self._os_to_der(self._public_key.curve.key_size, sig) + self._public_key.verify(der_sig, msg, ec.ECDSA(self._hash_alg())) + except cryptography.exceptions.InvalidSignature as err: + raise InvalidSignature("Failed to verify.") from err + except ValueError as err: + raise InvalidSignature("Invalid signature.") from err + + def _der_to_os(self, key_size: int, sig: bytes) -> bytes: + """""" + num_bytes = (key_size + 7) // 8 + r, s = decode_dss_signature(sig) + return i2osp(r, num_bytes) + i2osp(s, num_bytes) + + def _os_to_der(self, key_size: int, sig: bytes) -> bytes: + """""" + num_bytes = (key_size + 7) // 8 + if len(sig) != 2 * num_bytes: + raise ValueError("Invalid signature.") + r = os2ip(sig[:num_bytes]) + s = os2ip(sig[num_bytes:]) + return encode_dss_signature(r, s) diff --git a/cwt/key_types/ b/cwt/key_types/ new file mode 100644 index 0000000..94f3146 --- /dev/null +++ b/cwt/key_types/ @@ -0,0 +1,102 @@ +from typing import Any, Dict + +import cryptography +from cryptography.hazmat.primitives.asymmetric.ed448 import ( + Ed448PrivateKey, + Ed448PublicKey, +) +from cryptography.hazmat.primitives.asymmetric.ed25519 import ( + Ed25519PrivateKey, + Ed25519PublicKey, +) +from cryptography.hazmat.primitives.asymmetric.x448 import X448PrivateKey, X448PublicKey +from cryptography.hazmat.primitives.asymmetric.x25519 import ( + X25519PrivateKey, + X25519PublicKey, +) + +from ..cose_key import COSEKey +from ..exceptions import InvalidSignature + + +class OKPKey(COSEKey): + """""" + + def __init__(self, cose_key: Dict[int, Any]): + """""" + super().__init__(cose_key) + self._public_key: Any = None + self._private_key: Any = None + + # Validate kty. + if 1 not in cose_key: + raise ValueError("kty(1) not found.") + if not isinstance(cose_key[1], int) and not isinstance(cose_key[1], str): + raise ValueError("kty(1) should be int or str(tstr).") + if cose_key[1] != 1: + raise ValueError("kty(1) should be OKP(1).") + + # Validate x and y. + if -2 not in cose_key: + raise ValueError("x(-2) not found.") + if not isinstance(cose_key[-2], bytes): + raise ValueError("x(-2) should be bytes(bstr).") + x = cose_key[-2] + + # Validate crv. + if -1 not in cose_key: + raise ValueError("crv(-1) not found.") + if not isinstance(cose_key[-1], int) and not isinstance(cose_key[-1], str): + raise ValueError("crv(-1) should be int or str(tstr).") + crv = cose_key[-1] + + try: + if -4 not in cose_key: + if crv == 4: # X25519 + self._public_key = X25519PublicKey.from_public_bytes(x) + elif crv == 5: # X448 + self._public_key = X448PublicKey.from_public_bytes(x) + elif crv == 6: # Ed25519 + self._public_key = Ed25519PublicKey.from_public_bytes(x) + elif crv == 7: # Ed448 + self._public_key = Ed448PublicKey.from_public_bytes(x) + else: + raise ValueError(f"Unknown curve: {crv}") + return + if not isinstance(cose_key[-4], bytes): + raise ValueError("d(-4) should be bytes(bstr).") + d = cose_key[-4] + if crv == 4: # X25519 + self._private_key = X25519PrivateKey.from_private_bytes(d) + elif crv == 5: # X448 + self._private_key = X448PrivateKey.from_private_bytes(d) + elif crv == 6: # Ed25519 + self._private_key = Ed25519PrivateKey.from_private_bytes(d) + elif crv == 7: # Ed448 + self._private_key = Ed448PrivateKey.from_private_bytes(d) + else: + raise ValueError(f"Unknown curve: {crv}") + + except ValueError as err: + raise ValueError("Invalid key parameter") from err + return + + def sign(self, msg: bytes) -> bytes: + """""" + try: + if self._public_key: + raise ValueError("Public key cannot be used for signing.") + return self._private_key.sign(msg) + except cryptography.exceptions.InvalidSignature as err: + raise InvalidSignature("Failed to verify.") from err + + def verify(self, msg: bytes, sig: bytes) -> bool: + """""" + try: + if self._private_key: + self._private_key.public_key().verify(sig, msg) + else: + self._public_key.verify(sig, msg) + return True + except cryptography.exceptions.InvalidSignature: + return False diff --git a/cwt/key_types/ b/cwt/key_types/ new file mode 100644 index 0000000..e69de29 diff --git a/cwt/key_types/ b/cwt/key_types/ new file mode 100644 index 0000000..be1bbaa --- /dev/null +++ b/cwt/key_types/ @@ -0,0 +1,158 @@ +import hashlib +import hmac +from typing import Any, Dict + +from cryptography.hazmat.primitives.ciphers.aead import AESCCM + +from ..cose_key import COSEKey + + +class SymmetricKey(COSEKey): + """""" + + def __init__(self, cose_key: Dict[int, Any]): + """""" + super().__init__(cose_key) + + self._key: bytes = b"" + + # Validate kty. + if 1 not in cose_key: + raise ValueError("kty(1) not found.") + if not isinstance(cose_key[1], int) and not isinstance(cose_key[1], str): + raise ValueError("kty(1) should be int or str(tstr).") + if cose_key[1] != 4: + raise ValueError("kty(1) should be Symmetric(4).") + + # Validate k. + if -1 not in cose_key: + raise ValueError("k(-1) not found.") + if -1 in cose_key and not isinstance(cose_key[-1], bytes): + raise ValueError("k(-1) should be bytes(bstr).") + self._key = cose_key[-1] + + if 3 not in cose_key: + raise ValueError("alg(3) not found.") + self._alg = cose_key[3] + + +class HMACKey(SymmetricKey): + """""" + + def __init__(self, cose_key: Dict[int, Any]): + """""" + super().__init__(cose_key) + + self._hash_alg = None + self._trunc = 0 + + # Validate alg. + if self._alg == 4: # HMAC256/64 + self._hash_alg = hashlib.sha256 + self._trunc = 8 + elif self._alg == 5: # HMAC256/256 + self._hash_alg = hashlib.sha256 + self._trunc = 32 + elif self._alg == 6: # HMAC384/384 + self._hash_alg = hashlib.sha384 + self._trunc = 48 + elif self._alg == 7: # HMAC512/512 + self._hash_alg = hashlib.sha512 + self._trunc = 64 + else: + raise ValueError("Unsupported or unknown alg: %s" % self._alg) + + def sign(self, msg: bytes) -> bytes: + """""" + return, msg, self._hash_alg).digest()[0 : self._trunc] + + def verify(self, msg: bytes, sig: bytes) -> bool: + """""" + return hmac.compare_digest(sig, self.sign(msg)) + + +class AESCCMKey(SymmetricKey): + """""" + + def __init__(self, cose_key: Dict[int, Any]): + """""" + super().__init__(cose_key) + + self._cipher: AESCCM + self._nonce_len = 0 + + # Validate alg. + if self._alg == 10: # AES-CCM-16-64-128 + if len(self._key) != 16: + raise ValueError( + "The length of AES-CCM-16-64-128 key should be 16 bytes." + ) + self._cipher = AESCCM(self._key, tag_length=8) + self._nonce_len = 13 + elif self._alg == 11: # AES-CCM-16-64-256 + if len(self._key) != 32: + raise ValueError( + "The length of AES-CCM-16-64-256 key should be 32 bytes." + ) + self._cipher = AESCCM(self._key, tag_length=8) + self._nonce_len = 13 + elif self._alg == 12: # AES-CCM-64-64-128 + if len(self._key) != 16: + raise ValueError( + "The length of AES-CCM-64-64-128 key should be 16 bytes." + ) + self._cipher = AESCCM(self._key, tag_length=8) + self._nonce_len = 7 + elif self._alg == 13: # AES-CCM-64-64-256 + if len(self._key) != 32: + raise ValueError( + "The length of AES-CCM-64-64-256 key should be 32 bytes." + ) + self._cipher = AESCCM(self._key, tag_length=8) + self._nonce_len = 7 + elif self._alg == 30: # AES-CCM-16-128-128 + if len(self._key) != 16: + raise ValueError( + "The length of AES-CCM-16-128-128 key should be 16 bytes." + ) + self._cipher = AESCCM(self._key) + self._nonce_len = 13 + elif self._alg == 31: # AES-CCM-16-128-256 + if len(self._key) != 32: + raise ValueError( + "The length of AES-CCM-16-128-256 key should be 32 bytes." + ) + self._cipher = AESCCM(self._key) + self._nonce_len = 13 + elif self._alg == 32: # AES-CCM-64-128-128 + if len(self._key) != 16: + raise ValueError( + "The length of AES-CCM-64-128-128 key should be 16 bytes." + ) + self._cipher = AESCCM(self._key) + self._nonce_len = 7 + elif self._alg == 33: # AES-CCM-64-128-256 + if len(self._key) != 32: + raise ValueError( + "The length of AES-CCM-64-128-256 key should be 32 bytes." + ) + self._cipher = AESCCM(self._key) + self._nonce_len = 7 + else: + raise ValueError("Unsupported or unknown alg: %s" % self._alg) + + def encrypt(self, msg: bytes, nonce: bytes, aad: bytes) -> bytes: + """""" + if len(nonce) != self._nonce_len: + raise ValueError( + "The length of nonce should be %d bytes." % self._nonce_len + ) + return self._cipher.encrypt(nonce, msg, aad) + + def decrypt(self, msg: bytes, nonce: bytes, aad: bytes) -> bytes: + """""" + if len(nonce) != self._nonce_len: + raise ValueError( + "The length of nonce should be %d bytes." % self._nonce_len + ) + return self._cipher.decrypt(nonce, msg, aad) diff --git a/cwt/ b/cwt/ new file mode 100644 index 0000000..e30ba69 --- /dev/null +++ b/cwt/ @@ -0,0 +1,28 @@ +from typing import List + + +def i2osp(x: int, x_len: int) -> bytes: + """ + Integer-to-Octet-String primitive + """ + if x >= 256 ** x_len: + raise ValueError("integer too large") + digits = [] + while x: + digits.append(int(x % 256)) + x //= 256 + for i in range(x_len - len(digits)): + digits.append(0) + return bytes.fromhex("".join("%.2x" % x for x in digits[::-1])) + + +def os2ip(octet_string: List[int]) -> int: + """ + Octet-String-to-Integer primitive + """ + x_len = len(octet_string) + octet_string = octet_string[::-1] + x = 0 + for i in range(x_len): + x += octet_string[i] * 256 ** i + return x diff --git a/setup.cfg b/setup.cfg new file mode 100644 index 0000000..410d9ee --- /dev/null +++ b/setup.cfg @@ -0,0 +1,66 @@ +[metadata] +name = Python CWT +version = attr: cwt.__version__ +author = attr: cwt.__author__ +author_email = attr: cwt.__email__ +description = attr: cwt.__description__ +long_description = file: +long_description_content_type = text/markdown +license = attr: cwt.__license__ +keywords = cbor cwt cose security signature encryption token +url = +classifiers = + Development Status :: 3 - Alpha + Intended Audience :: Developers + Natural Language :: English + License :: OSI Approved :: MIT License + Topic :: Utilities + Programming Language :: Python + Programming Language :: Python :: 3 + Programming Language :: Python :: 3 :: Only + Programming Language :: Python :: 3.6 + Programming Language :: Python :: 3.7 + Programming Language :: Python :: 3.8 + Programming Language :: Python :: 3.9 + +[options] +zip_safe = false +include_package_data = true +python_requires = >=3.6 +packages = find: +install_requires = + cbor2 + cryptography>=3.3.1 + +[options.package_data] +* = py.typed + +[options.extras_require] +docs = + sphinx + sphinx-rtd-theme + sphinx-autodoc-typehints >= 1.2.0 +tests = + pytest>=6.0.0 + pytest-cov>=2.0 + coverage[toml]==5.0.4 +dev = + sphinx + sphinx-rtd-theme + cryptography>=3.3.1 + pytest>=6.0.0,<7.0.0 + coverage[toml]==5.0.4 + mypy + +[options.packages.find] +exclude = + tests + tests.* + +[flake8] +extend-ignore = E203, E501 + +[mypy] +python_version = 3.6 +ignore_missing_imports = true +warn_unused_ignores = true diff --git a/ b/ new file mode 100644 index 0000000..6068493 --- /dev/null +++ b/ @@ -0,0 +1,3 @@ +from setuptools import setup + +setup() diff --git a/tests/ b/tests/ new file mode 100644 index 0000000..eea69f2 --- /dev/null +++ b/tests/ @@ -0,0 +1,269 @@ +# pylint: disable=R0201, R0904, W0621 +# R0201: Method could be a function +# R0904: Too many public methods +# W0621: Redefined outer name + +""" +Tests for samples on README and RFCs related to CWT/COSE. +""" +# import cbor2 +# import pytest + +import cwt +from cwt import claims, cose_key + +# A sample of 128-Bit Symmetric Key referred from RFC8392 +SAMPLE_COSE_KEY_RFC8392_A2_1 = ( + "a42050231f4c4d4d3051fdc2ec0a3851d5b3830104024c53796d6d6574726963" "313238030a" +) + +# A sample of 256-Bit Symmetric Key referred from RFC8392 +SAMPLE_COSE_KEY_RFC8392_A2_2 = ( + "a4205820403697de87af64611c1d32a05dab0fe1fcb715a86ab435f1ec99192d" + "795693880104024c53796d6d6574726963323536030a" +) + +# A sample of ECDSA P-256 256-Bit COSE Key referred from RFC8392 +SAMPLE_COSE_KEY_RFC8392_A2_3 = ( + "a72358206c1382765aec5358f117733d281c1c7bdc39884d04a45a1e6c67c858" + "bc206c1922582060f7f1a780d8a783bfb7a2dd6b2796e8128dbbcef9d3d168db" + "9529971a36e7b9215820143329cce7868e416927599cf65a34f3ce2ffda55a7e" + "ca69ed8919a394d42f0f2001010202524173796d6d6574726963454344534132" + "35360326" +) + +# A sample of Signed CWT referred from RFC8392 +SAMPLE_CWT_RFC8392_A3 = ( + "d28443a10126a104524173796d6d657472696345434453413235365850a70175" + "636f61703a2f2f61732e6578616d706c652e636f6d02656572696b7703781863" + "6f61703a2f2f6c696768742e6578616d706c652e636f6d041a5612aeb0051a56" + "10d9f0061a5610d9f007420b7158405427c1ff28d23fbad1f29c4c7c6a555e60" + "1d6fa29f9179bc3d7438bacaca5acd08c8d4d4f96131680c429a01f85951ecee" + "743a52b9b63632c57209120e1c9e30" +) + +# A sample of MACed CWT referred from RFC8392 +SAMPLE_CWT_RFC8392_A4 = ( + "d83dd18443a10104a1044c53796d6d65747269633235365850a70175636f6170" + "3a2f2f61732e6578616d706c652e636f6d02656572696b77037818636f61703a" + "2f2f6c696768742e6578616d706c652e636f6d041a5612aeb0051a5610d9f006" + "1a5610d9f007420b7148093101ef6d789200" +) + +# A sample of Encrypted CWT referred from RFC8392 +SAMPLE_CWT_RFC8392_A5 = ( + "d08343a1010aa2044c53796d6d6574726963313238054d99a0d7846e762c49ff" + "e8a63e0b5858b918a11fd81e438b7f973d9e2e119bcb22424ba0f38a80f27562" + "f400ee1d0d6c0fdb559c02421fd384fc2ebe22d7071378b0ea7428fff157444d" + "45f7e6afcda1aae5f6495830c58627087fc5b4974f319a8707a635dd643b" +) + +# A Sample of Nested CWT referred from RFC8392 +SAMPLE_CWT_RFC8392_A6 = ( + "d08343a1010aa2044c53796d6d6574726963313238054d4a0694c0e69ee6b595" + "6655c7b258b7f6b0914f993de822cc47e5e57a188d7960b528a747446fe12f0e" + "7de05650dec74724366763f167a29c002dfd15b34d8993391cf49bc91127f545" + "dba8703d66f5b7f1ae91237503d371e6333df9708d78c4fb8a8386c8ff09dc49" + "af768b23179deab78d96490a66d5724fb33900c60799d9872fac6da3bdb89043" + "d67c2a05414ce331b5b8f1ed8ff7138f45905db2c4d5bc8045ab372bff142631" + "610a7e0f677b7e9b0bc73adefdcee16d9d5d284c616abeab5d8c291ce0" +) + + +class TestSample: + """ + Tests for samples in README or in RFCs related to CWT/COSE. + """ + + def test_sample_readme_maced_cwt_with_json_dict(self): + """""" + key = cose_key.from_symmetric_key("mysecretpassword") + encoded = cwt.encode_and_mac( + claims.from_json( + {"iss": "https://as.example", "sub": "dajiaji", "cti": "123"} + ), + key, + ) + decoded = cwt.decode(encoded, key) + assert 1 in decoded and decoded[1] == "https://as.example" + assert 2 in decoded and decoded[2] == "dajiaji" + assert 7 in decoded and decoded[7] == b"123" + + def test_sample_readme_maced_cwt_with_json_str(self): + """""" + key = cose_key.from_symmetric_key("mysecretpassword") + encoded = cwt.encode_and_mac( + claims.from_json( + '{"iss":"https://as.example","sub":"dajiaji","cti":"123"}' + ), + key, + ) + decoded = cwt.decode(encoded, key) + assert 1 in decoded and decoded[1] == "https://as.example" + assert 2 in decoded and decoded[2] == "dajiaji" + assert 7 in decoded and decoded[7] == b"123" + + def test_sample_readme_maced_cwt_with_json_bytes(self): + """""" + key = cose_key.from_symmetric_key("mysecretpassword") + encoded = cwt.encode_and_mac( + claims.from_json( + b'{"iss":"https://as.example","sub":"dajiaji","cti":"123"}' + ), + key, + ) + decoded = cwt.decode(encoded, key) + assert 1 in decoded and decoded[1] == "https://as.example" + assert 2 in decoded and decoded[2] == "dajiaji" + assert 7 in decoded and decoded[7] == b"123" + + def test_sample_readme_maced_cwt(self): + """""" + key = cose_key.from_symmetric_key("mysecretpassword") + encoded = cwt.encode_and_mac( + {1: "https://as.example", 2: "dajiaji", 7: b"123"}, key + ) + decoded = cwt.decode(encoded, key) + assert 1 in decoded and decoded[1] == "https://as.example" + assert 2 in decoded and decoded[2] == "dajiaji" + assert 7 in decoded and decoded[7] == b"123" + + def test_sample_rfc8392_a3(self): + """""" + key = cose_key.from_bytes(bytes.fromhex(SAMPLE_COSE_KEY_RFC8392_A2_3)) + encoded = bytes.fromhex(SAMPLE_CWT_RFC8392_A3) + decoded = cwt.decode(encoded, key=key) + assert 1 in decoded and decoded[1] == "coap://" + assert 2 in decoded and decoded[2] == "erikw" + assert 3 in decoded and decoded[3] == "coap://" + assert 4 in decoded and decoded[4] == 1444064944 + assert 5 in decoded and decoded[5] == 1443944944 + assert 6 in decoded and decoded[6] == 1443944944 + assert 7 in decoded and decoded[7] == bytes.fromhex("0b71") + + def test_sample_rfc8392_a3_with_encoding(self): + """""" + key = cose_key.from_bytes(bytes.fromhex(SAMPLE_COSE_KEY_RFC8392_A2_3)) + encoded = cwt.encode_and_sign( + { + 1: "coap://", + 2: "erikw", + 3: "coap://", + 4: 1444064944, + 5: 1443944944, + 6: 1443944944, + 7: bytes.fromhex("0b71"), + }, + key=key, + ) + decoded = cwt.decode(encoded, key=key) + assert 1 in decoded and decoded[1] == "coap://" + assert 2 in decoded and decoded[2] == "erikw" + assert 3 in decoded and decoded[3] == "coap://" + assert 4 in decoded and decoded[4] == 1444064944 + assert 5 in decoded and decoded[5] == 1443944944 + assert 6 in decoded and decoded[6] == 1443944944 + assert 7 in decoded and decoded[7] == bytes.fromhex("0b71") + + def test_sample_rfc8392_a4(self): + """""" + key = cose_key.from_dict( + { + -1: bytes.fromhex( + "403697de87af64611c1d32a05dab0fe1fcb715a86ab435f1ec99192d79569388" + ), + 1: 4, # Symmetric + 2: bytes.fromhex("53796d6d6574726963323536"), + 3: 4, # HMAC256/64 + } + ) + encoded = cwt.encode_and_mac( + { + 1: "coap://", + 2: "erikw", + 3: "coap://", + 4: 1444064944, + 5: 1443944944, + 6: 1443944944, + 7: bytes.fromhex("0b71"), + }, + key=key, + tagged=True, + ) + assert encoded == bytes.fromhex(SAMPLE_CWT_RFC8392_A4) + decoded = cwt.decode(encoded, key=key) + assert 1 in decoded and decoded[1] == "coap://" + assert 2 in decoded and decoded[2] == "erikw" + assert 3 in decoded and decoded[3] == "coap://" + assert 4 in decoded and decoded[4] == 1444064944 + assert 5 in decoded and decoded[5] == 1443944944 + assert 6 in decoded and decoded[6] == 1443944944 + assert 7 in decoded and decoded[7] == bytes.fromhex("0b71") + + def test_sample_rfc8392_a5(self): + """""" + key = cose_key.from_bytes(bytes.fromhex(SAMPLE_COSE_KEY_RFC8392_A2_1)) + nonce = bytes.fromhex("99a0d7846e762c49ffe8a63e0b") + encoded = cwt.encode_and_encrypt( + { + 1: "coap://", + 2: "erikw", + 3: "coap://", + 4: 1444064944, + 5: 1443944944, + 6: 1443944944, + 7: bytes.fromhex("0b71"), + }, + key=key, + nonce=nonce, + ) + assert encoded == bytes.fromhex(SAMPLE_CWT_RFC8392_A5) + decoded = cwt.decode(encoded, key=key) + assert 1 in decoded and decoded[1] == "coap://" + assert 2 in decoded and decoded[2] == "erikw" + assert 3 in decoded and decoded[3] == "coap://" + assert 4 in decoded and decoded[4] == 1444064944 + assert 5 in decoded and decoded[5] == 1443944944 + assert 6 in decoded and decoded[6] == 1443944944 + assert 7 in decoded and decoded[7] == bytes.fromhex("0b71") + + def test_sample_rfc8392_a6(self): + """""" + sig_key = cose_key.from_bytes(bytes.fromhex(SAMPLE_COSE_KEY_RFC8392_A2_3)) + enc_key = cose_key.from_bytes(bytes.fromhex(SAMPLE_COSE_KEY_RFC8392_A2_1)) + encrypted = bytes.fromhex(SAMPLE_CWT_RFC8392_A6) + decoded = cwt.decode(encrypted, key=[enc_key, sig_key]) + assert 1 in decoded and decoded[1] == "coap://" + assert 2 in decoded and decoded[2] == "erikw" + assert 3 in decoded and decoded[3] == "coap://" + assert 4 in decoded and decoded[4] == 1444064944 + assert 5 in decoded and decoded[5] == 1443944944 + assert 6 in decoded and decoded[6] == 1443944944 + assert 7 in decoded and decoded[7] == bytes.fromhex("0b71") + + def test_sample_rfc8392_a6_with_encoding(self): + """""" + sig_key = cose_key.from_bytes(bytes.fromhex(SAMPLE_COSE_KEY_RFC8392_A2_3)) + signed = cwt.encode_and_sign( + { + 1: "coap://", + 2: "erikw", + 3: "coap://", + 4: 1444064944, + 5: 1443944944, + 6: 1443944944, + 7: bytes.fromhex("0b71"), + }, + key=sig_key, + ) + enc_key = cose_key.from_bytes(bytes.fromhex(SAMPLE_COSE_KEY_RFC8392_A2_1)) + nonce = bytes.fromhex("4a0694c0e69ee6b5956655c7b2") + encrypted = cwt.encode_and_encrypt(signed, key=enc_key, nonce=nonce) + decoded = cwt.decode(encrypted, key=[enc_key, sig_key]) + assert 1 in decoded and decoded[1] == "coap://" + assert 2 in decoded and decoded[2] == "erikw" + assert 3 in decoded and decoded[3] == "coap://" + assert 4 in decoded and decoded[4] == 1444064944 + assert 5 in decoded and decoded[5] == 1443944944 + assert 6 in decoded and decoded[6] == 1443944944 + assert 7 in decoded and decoded[7] == bytes.fromhex("0b71") diff --git a/tox.ini b/tox.ini new file mode 100644 index 0000000..9f6cfd4 --- /dev/null +++ b/tox.ini @@ -0,0 +1,77 @@ +[pytest] +addopts = -ra +testpaths = tests +filterwarnings = + once::Warning + ignore:::pympler[.*] + + +[gh-actions] +python = + 3.6: py36 + 3.7: py37, docs + 3.8: py38, typing + 3.9: py39 + + +[tox] +envlist = + lint + typing + py{36,37,38,39} + docs + pypi-description + coverage-report +isolated_build = True + + +[testenv] +# Prevent random setuptools/pip breakages like +# from breaking our builds. +setenv = + VIRTUALENV_NO_DOWNLOAD=1 +extras = + tests +commands = {envpython} -b -m coverage run -m pytest {posargs} + + +[testenv:docs] +basepython = python3.7 +extras = docs +commands = + sphinx-build -n -T -W -b html -d {envtmpdir}/doctrees docs docs/_build/html + sphinx-build -n -T -W -b doctest -d {envtmpdir}/doctrees docs docs/_build/html + python -m doctest README.rst + + +[testenv:typing] +basepython = python3.8 +extras = dev +commands = mypy cwt + + +[testenv:lint] +basepython = python3.8 +extras = dev +passenv = HOMEPATH # needed on Windows +commands = pre-commit run --all-files + + +[testenv:pypi-description] +basepython = python3.8 +skip_install = true +deps = + twine + pip >= 18.0.0 +commands = + pip wheel -w {envtmpdir}/build --no-deps . + twine check {envtmpdir}/build/* + + +[testenv:coverage-report] +basepython = python3.8 +skip_install = true +deps = coverage[toml]==5.0.4 +commands = + coverage combine + coverage report