Skip to content

Commit

Permalink
Fixed some formatting errors.Some more left
Browse files Browse the repository at this point in the history
  • Loading branch information
machinex85 committed Oct 10, 2024
1 parent 5e33565 commit 98969e5
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 29 deletions.
6 changes: 4 additions & 2 deletions iso15118/evcc/states/iso15118_20_states.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
V2GMessage objects of the ISO 15118-20 protocol, from SessionSetupRes to
SessionStopRes.
"""

import logging
import time
from typing import Any, List, Union, cast
Expand Down Expand Up @@ -391,15 +392,16 @@ async def process_message(
elapsed_time = time.time() - self.comm_session.ongoing_timer
if elapsed_time > TimeoutsShared.V2G_EVCC_ONGOING_TIMEOUT:
debug_message = "Ongoing timer timed out for 'AuthorizationRes'"
self.comm_session.charging_session_stop_v20 = \
self.comm_session.charging_session_stop_v20 = (
ChargingSession.TERMINATE
)
session_stop_req = SessionStopReq(
header=MessageHeader(
session_id=self.comm_session.session_id,
timestamp=time.time(),
),
charging_session=self.comm_session.charging_session_stop_v20,
ev_termination_explanation=debug_message
ev_termination_explanation=debug_message,
)
self.create_next_message(
SessionStop,
Expand Down
10 changes: 6 additions & 4 deletions iso15118/secc/states/iso15118_20_states.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,13 +251,15 @@ async def process_message(

offered_auth_options: List[AuthEnum] = []
eim_as_res, pnc_as_res = None, None
supported_auth_options: List[AuthEnum] = self.comm_session.config.supported_auth_options # noqa: E501
supported_auth_options: List[AuthEnum] = (
self.comm_session.config.supported_auth_options
) # noqa: E501
is_eim_authorized: bool = self.comm_session.evse_controller.is_eim_authorized()

if (
AuthEnum.PNC in supported_auth_options and
self.comm_session.is_tls and
not is_eim_authorized
AuthEnum.PNC in supported_auth_options
and self.comm_session.is_tls
and not is_eim_authorized
):
offered_auth_options.append(AuthEnum.PNC)
self.comm_session.gen_challenge = get_random_bytes(16)
Expand Down
42 changes: 20 additions & 22 deletions iso15118/shared/security.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@
EllipticCurvePrivateKey,
EllipticCurvePublicKey,
)
from cryptography.hazmat.primitives.asymmetric.ed448 import (
Ed448PublicKey,
)
from cryptography.hazmat.primitives.asymmetric.ed448 import Ed448PublicKey
from cryptography.hazmat.primitives.asymmetric.utils import (
decode_dss_signature,
encode_dss_signature,
Expand Down Expand Up @@ -86,7 +84,6 @@
)
from iso15118.shared.settings import SettingKey, shared_settings


logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -248,7 +245,7 @@ def get_ssl_context(server_side: bool) -> Optional[SSLContext]:
logger.exception(exc)
return None

if hasattr(ssl_context, 'keylog_filename'):
if hasattr(ssl_context, "keylog_filename"):
# It is possible to decrypt the TLS frames, using wireshark
# if the keylogfile is generated with the pre-master secret
# The file is generated when DEBUG level mode is set and
Expand All @@ -259,10 +256,12 @@ def get_ssl_context(server_side: bool) -> Optional[SSLContext]:
# https://docs.python.org/3/library/ssl.html#ssl.create_default_context
# https://docs.python.org/3/library/ssl.html#ssl.SSLContext.keylog_filename
# https://github.com/python/cpython/blob/3.11/Lib/ssl.py#L777
keylogfile = os.path.join(shared_settings[SettingKey.PKI_PATH], "keylogfile.txt")
keylogfile = os.path.join(
shared_settings[SettingKey.PKI_PATH], "keylogfile.txt"
)
if logging.getLogger().level == logging.DEBUG:
if not os.path.exists(keylogfile):
with open(keylogfile, 'w'):
with open(keylogfile, "w"):
pass
logger.debug(f"TLS (Pre)-Master-Secret log filename path: {keylogfile}")
ssl_context.keylog_filename = keylogfile
Expand Down Expand Up @@ -518,9 +517,7 @@ def log_certs_details(certs: List[bytes]):


def _validate_signature(
cert_to_check,
parent_pub_key: Union[EllipticCurvePublicKey,
Ed448PublicKey]
cert_to_check, parent_pub_key: Union[EllipticCurvePublicKey, Ed448PublicKey]
) -> None:
if isinstance(parent_pub_key, EllipticCurvePublicKey):
ec_curve_name = parent_pub_key.curve.name
Expand All @@ -530,7 +527,8 @@ def _validate_signature(
hash_algorithm = SHA512()
else:
raise KeyTypeError(
f"Unexpected curve name " f"{ec_curve_name}."
f"Unexpected curve name "
f"{ec_curve_name}."
f"None of secp256r1, secp521r1"
)
parent_pub_key.verify(
Expand All @@ -544,9 +542,7 @@ def _validate_signature(
cert_to_check.tbs_certificate_bytes,
)
else:
raise KeyTypeError(
f"Unexpected public key type " f"{type(parent_pub_key)}"
)
raise KeyTypeError(f"Unexpected public key type " f"{type(parent_pub_key)}")


def verify_certs(
Expand Down Expand Up @@ -603,7 +599,7 @@ def verify_certs(
certs_to_check: List[Certificate] = [leaf_cert]
if len(sub_ca_der_certs) != 0:
certs_to_check.extend(sub_ca_der_certs)
check_validity(certs_to_check)
_check_validity(certs_to_check)
except (CertNotYetValidError, CertExpiredError) as exc:
raise exc

Expand Down Expand Up @@ -665,9 +661,11 @@ def verify_certs(
raise CertChainLengthError(allowed_num_sub_cas=2, num_sub_cas=0)

if (sub_ca2_cert or sub_ca1_cert) and private_environment:
logger.error("Sub-CA 1 and 2 certificate are included and "
"PE is set at the same time. "
"In a PE there are no Sub-CA certs")
logger.error(
"Sub-CA 1 and 2 certificate are included and "
"PE is set at the same time. "
"In a PE there are no Sub-CA certs"
)
raise CertChainLengthError(allowed_num_sub_cas=0, num_sub_cas=1)

# Step 2.b: Now that we have established the right order of sub-CA
Expand All @@ -681,7 +679,7 @@ def verify_certs(
parent_cert_pub_key = root_ca_cert.public_key()
_validate_signature(cert_to_check, parent_cert_pub_key)
else:

parent_cert_pub_key = sub_ca2_cert.public_key()
_validate_signature(cert_to_check, parent_cert_pub_key)

Expand All @@ -690,7 +688,7 @@ def verify_certs(
cert_to_check = sub_ca2_cert
parent_cert_pub_key = sub_ca1_cert.public_key()
_validate_signature(cert_to_check, parent_cert_pub_key)

# check subca1 signature
cert_to_check = sub_ca1_cert
parent_cert_pub_key = root_ca_cert.public_key()
Expand Down Expand Up @@ -730,7 +728,6 @@ def verify_certs(
# Step 2: Check that each certificate is valid, i.e. the current time is
# between the notBefore and notAfter timestamps of the certificate
try:
certs_to_check: List[Certificate] = [leaf_cert]
if sub_ca2_cert:
certs_to_check.append(sub_ca2_cert)
if sub_ca1_cert:
Expand Down Expand Up @@ -1000,7 +997,8 @@ def verify_signature(
hash_algorithm = SHA512()
else:
raise KeyTypeError(
f"Unexpected curve name " f"{ec_curve_name}."
f"Unexpected curve name "
f"{ec_curve_name}."
f"None of secp256r1, secp521r1"
)
pub_key.verify(
Expand Down
4 changes: 3 additions & 1 deletion iso15118/shared/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ def load_shared_settings(env_path: Optional[str] = None):
SettingKey.PKI_PATH: env.str("PKI_PATH", default=SHARED_CWD + "/pki/"),
SettingKey.MESSAGE_LOG_JSON: env.bool("MESSAGE_LOG_JSON", default=True),
SettingKey.MESSAGE_LOG_EXI: env.bool("MESSAGE_LOG_EXI", default=False),
SettingKey.FORCE_TLS_CLIENT_AUTH: env.bool("FORCE_TLS_CLIENT_AUTH", default=False),
SettingKey.FORCE_TLS_CLIENT_AUTH: env.bool(
"FORCE_TLS_CLIENT_AUTH", default=False
),
}
shared_settings.update(settings)
env.seal() # raise all errors at once, if any

0 comments on commit 98969e5

Please sign in to comment.