diff --git a/iso15118/evcc/states/iso15118_20_states.py b/iso15118/evcc/states/iso15118_20_states.py
index c2e6cb09..4f9c958a 100644
--- a/iso15118/evcc/states/iso15118_20_states.py
+++ b/iso15118/evcc/states/iso15118_20_states.py
@@ -3,6 +3,7 @@
 V2GMessage objects of the ISO 15118-20 protocol, from SessionSetupRes to
 SessionStopRes.
 """
+
 import logging
 import time
 from typing import Any, List, Union, cast
@@ -391,15 +392,16 @@ async def process_message(
 elapsed_time = time.time() - self.comm_session.ongoing_timer
 if elapsed_time > TimeoutsShared.V2G_EVCC_ONGOING_TIMEOUT:
 debug_message = "Ongoing timer timed out for 'AuthorizationRes'"
- self.comm_session.charging_session_stop_v20 = \
+ self.comm_session.charging_session_stop_v20 = (
 ChargingSession.TERMINATE
+ )
 session_stop_req = SessionStopReq(
 header=MessageHeader(
 session_id=self.comm_session.session_id,
 timestamp=time.time(),
 ),
 charging_session=self.comm_session.charging_session_stop_v20,
- ev_termination_explanation=debug_message
+ ev_termination_explanation=debug_message,
 )
 self.create_next_message(
 SessionStop,
diff --git a/iso15118/secc/states/iso15118_20_states.py b/iso15118/secc/states/iso15118_20_states.py
index b0bfaa86..9c27b069 100644
--- a/iso15118/secc/states/iso15118_20_states.py
+++ b/iso15118/secc/states/iso15118_20_states.py
@@ -251,13 +251,15 @@ async def process_message( offered_auth_options: List[AuthEnum] = [] eim_as_res, pnc_as_res = None, None - supported_auth_options: List[AuthEnum] = self.comm_session.config.supported_auth_options # noqa: E501 + supported_auth_options: List[AuthEnum] = ( + self.comm_session.config.supported_auth_options + ) # noqa: E501 is_eim_authorized: bool = self.comm_session.evse_controller.is_eim_authorized() if ( - AuthEnum.PNC in supported_auth_options and - self.comm_session.is_tls and - not is_eim_authorized + AuthEnum.PNC in supported_auth_options + and self.comm_session.is_tls + and not is_eim_authorized ): offered_auth_options.append(AuthEnum.PNC) self.comm_session.gen_challenge = get_random_bytes(16) diff --git a/iso15118/shared/security.py b/iso15118/shared/security.py index 109d9b60..30173293 100644 --- a/iso15118/shared/security.py +++ b/iso15118/shared/security.py @@ -17,9 +17,7 @@ EllipticCurvePrivateKey, EllipticCurvePublicKey, ) -from cryptography.hazmat.primitives.asymmetric.ed448 import ( - Ed448PublicKey, -) +from cryptography.hazmat.primitives.asymmetric.ed448 import Ed448PublicKey from cryptography.hazmat.primitives.asymmetric.utils import ( decode_dss_signature, encode_dss_signature, @@ -86,7 +84,6 @@ ) from iso15118.shared.settings import SettingKey, shared_settings - logger = logging.getLogger(__name__) @@ -248,7 +245,7 @@ def get_ssl_context(server_side: bool) -> Optional[SSLContext]: logger.exception(exc) return None - if hasattr(ssl_context, 'keylog_filename'): + if hasattr(ssl_context, "keylog_filename"): # It is possible to decrypt the TLS frames, using wireshark # if the keylogfile is generated with the pre-master secret # The file is generated when DEBUG level mode is set and 
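Review bot: The `# noqa: E501` that moved onto the closing parenthesis of the new `supported_auth_options` assignment now suppresses a line-length warning on a line that is a few characters long, so it is dead; the wrapped form black produced is already within the limit and the tail should read `)` alone. Dropping it also avoids hiding a genuine E501 if someone later appends to that line. process_message's body starts and ends outside this hunk.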
@@ -259,10 +256,12 @@ def get_ssl_context(server_side: bool) -> Optional[SSLContext]: # https://docs.python.org/3/library/ssl.html#ssl.create_default_context # https://docs.python.org/3/library/ssl.html#ssl.SSLContext.keylog_filename # https://github.com/python/cpython/blob/3.11/Lib/ssl.py#L777 - keylogfile = os.path.join(shared_settings[SettingKey.PKI_PATH], "keylogfile.txt") + keylogfile = os.path.join( + shared_settings[SettingKey.PKI_PATH], "keylogfile.txt" + ) if logging.getLogger().level == logging.DEBUG: if not os.path.exists(keylogfile): - with open(keylogfile, 'w'): + with open(keylogfile, "w"): pass logger.debug(f"TLS (Pre)-Master-Secret log filename path: {keylogfile}") ssl_context.keylog_filename = keylogfile @@ -518,9 +517,7 @@ def log_certs_details(certs: List[bytes]): def _validate_signature( - cert_to_check, - parent_pub_key: Union[EllipticCurvePublicKey, - Ed448PublicKey] + cert_to_check, parent_pub_key: Union[EllipticCurvePublicKey, Ed448PublicKey] ) -> None: if isinstance(parent_pub_key, EllipticCurvePublicKey): ec_curve_name = parent_pub_key.curve.name @@ -530,7 +527,8 @@ def _validate_signature( hash_algorithm = SHA512() else: raise KeyTypeError( - f"Unexpected curve name " f"{ec_curve_name}." + f"Unexpected curve name " + f"{ec_curve_name}." f"None of secp256r1, secp521r1" ) parent_pub_key.verify( @@ -544,9 +542,7 @@ def _validate_signature( cert_to_check.tbs_certificate_bytes, ) else: - raise KeyTypeError( - f"Unexpected public key type " f"{type(parent_pub_key)}" - ) + raise KeyTypeError(f"Unexpected public key type " f"{type(parent_pub_key)}") def verify_certs( @@ -603,7 +599,7 @@ def verify_certs( certs_to_check: List[Certificate] = [leaf_cert] if len(sub_ca_der_certs) != 0: certs_to_check.extend(sub_ca_der_certs) - check_validity(certs_to_check) + _check_validity(certs_to_check) except (CertNotYetValidError, CertExpiredError) as exc: raise exc 
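Review bot: The keylog file pre-created in the `if not os.path.exists(keylogfile)` branch is opened with `open(keylogfile, "w")`, so its permission bits are whatever the process umask allows, while its contents (the TLS pre-master secrets the comment above describes) let any reader of the file decrypt every session this context handles. Creating it with an explicit owner-only mode, `fd = os.open(keylogfile, os.O_WRONLY | os.O_CREAT, 0o600)` followed by `os.close(fd)`, closes that gap; since the mode is only applied on creation, a file already present under PKI_PATH with loose permissions is reused as-is, so it may be worth checking its mode there too. get_ssl_context starts and ends outside this hunk.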
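Review bot: The `KeyTypeError` message in `_validate_signature`, now split across three f-string fragments, still concatenates without a separator: the fragment `f"{ec_curve_name}."` has no trailing space, so the rendered text is of the form `Unexpected curve name <name>.None of secp256r1, secp521r1` (constructed example). Changing that fragment to `f"{ec_curve_name}. "` fixes the wording, and the first fragment `f"Unexpected curve name "` has no placeholder and can be a plain string literal. The identical construction appears in `verify_signature` (the `@@ -1000` hunk further down) and needs the same fix.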
@@ -665,9 +661,11 @@ def verify_certs( raise CertChainLengthError(allowed_num_sub_cas=2, num_sub_cas=0) if (sub_ca2_cert or sub_ca1_cert) and private_environment: - logger.error("Sub-CA 1 and 2 certificate are included and " - "PE is set at the same time. " - "In a PE there are no Sub-CA certs") + logger.error( + "Sub-CA 1 and 2 certificate are included and " + "PE is set at the same time. " + "In a PE there are no Sub-CA certs" + ) raise CertChainLengthError(allowed_num_sub_cas=0, num_sub_cas=1) # Step 2.b: Now that we have established the right order of sub-CA @@ -681,7 +679,7 @@ def verify_certs( parent_cert_pub_key = root_ca_cert.public_key() _validate_signature(cert_to_check, parent_cert_pub_key) else: - + parent_cert_pub_key = sub_ca2_cert.public_key() _validate_signature(cert_to_check, parent_cert_pub_key) @@ -690,7 +688,7 @@ def verify_certs( cert_to_check = sub_ca2_cert parent_cert_pub_key = sub_ca1_cert.public_key() _validate_signature(cert_to_check, parent_cert_pub_key) - + # check subca1 signature cert_to_check = sub_ca1_cert parent_cert_pub_key = root_ca_cert.public_key() @@ -730,7 +728,6 @@ def verify_certs( # Step 2: Check that each certificate is valid, i.e. the current time is # between the notBefore and notAfter timestamps of the certificate try: - certs_to_check: List[Certificate] = [leaf_cert] if sub_ca2_cert: certs_to_check.append(sub_ca2_cert) if sub_ca1_cert: @@ -1000,7 +997,8 @@ def verify_signature( hash_algorithm = SHA512() else: raise KeyTypeError( - f"Unexpected curve name " f"{ec_curve_name}." + f"Unexpected curve name " + f"{ec_curve_name}." f"None of secp256r1, secp521r1" ) pub_key.verify( diff --git a/iso15118/shared/settings.py b/iso15118/shared/settings.py index 8fa6e367..95300df1 100644 --- a/iso15118/shared/settings.py +++ b/iso15118/shared/settings.py 
@@ -26,7 +26,9 @@ def load_shared_settings(env_path: Optional[str] = None):
 SettingKey.PKI_PATH: env.str("PKI_PATH", default=SHARED_CWD + "/pki/"),
 SettingKey.MESSAGE_LOG_JSON: env.bool("MESSAGE_LOG_JSON", default=True),
 SettingKey.MESSAGE_LOG_EXI: env.bool("MESSAGE_LOG_EXI", default=False),
- SettingKey.FORCE_TLS_CLIENT_AUTH: env.bool("FORCE_TLS_CLIENT_AUTH", default=False),
+ SettingKey.FORCE_TLS_CLIENT_AUTH: env.bool(
+ "FORCE_TLS_CLIENT_AUTH", default=False
+ ),
 }
 shared_settings.update(settings)
 env.seal() # raise all errors at once, if any