diff --git a/checkdmarc/bimi.py b/checkdmarc/bimi.py index 97de0fc..868eb61 100644 --- a/checkdmarc/bimi.py +++ b/checkdmarc/bimi.py @@ -23,7 +23,14 @@ import xmltodict import pem from pyleri import Grammar, Regex, Sequence, List -from OpenSSL.crypto import load_certificate, FILETYPE_PEM, X509Store, X509StoreContext, X509, X509StoreContextError +from OpenSSL.crypto import ( + load_certificate, + FILETYPE_PEM, + X509Store, + X509StoreContext, + X509, + X509StoreContextError, +) import checkdmarc.resources from checkdmarc._constants import SYNTAX_ERROR_MARKER, USER_AGENT @@ -52,8 +59,7 @@ # Load the certificates included in MVACAS.pem into a certificate store X509STORE = X509Store() -with pkg_resources.path(checkdmarc.resources, - "MVACAs.pem") as path: +with pkg_resources.path(checkdmarc.resources, "MVACAs.pem") as path: CA_PEMS = pem.parse_file(path) for CA_PEM in CA_PEMS: @@ -174,6 +180,7 @@ class _BIMIGrammar(Grammar): List(tag_value, delimiter=Regex(f"{WSP_REGEX}*;{WSP_REGEX}*"), opt=True), ) + def get_svg_metadata(raw_xml: Union[str, bytes]) -> OrderedDict: if isinstance(raw_xml, bytes): raw_xml = raw_xml.decode(errors="ignore") @@ -208,17 +215,18 @@ def get_svg_metadata(raw_xml: Union[str, bytes]) -> OrderedDict: def check_svg_requirements(svg_metadata: OrderedDict) -> [str]: _warnings = [] if svg_metadata["svg_version"] != "1.2": - _warnings.append(f"The SVG version must be 1.2, not {svg_metadata['svg_version']}") + _warnings.append( + f"The SVG version must be 1.2, not {svg_metadata['svg_version']}" + ) if svg_metadata["base_profile"] != "tiny-ps": _warnings.append(f"The SVG base profile must be tiny-ps") if svg_metadata["width"] != svg_metadata["height"]: - _warnings.append( - "The SVG dimensions must be square, not {width}x{height}" - ) + _warnings.append("The SVG dimensions must be square, not {width}x{height}") if float(svg_metadata["filesize"].strip(" KB")) > 32: _warnings.append("The SVG file exceeds the maximum size of 32 kB") return _warnings + def _get_certificate_subjecttaltnames(cert: Union[X509, bytes]) -> [str]: """Get the subjectaltname from a PEM certificate""" if type(cert) is bytes: @@ -226,8 +234,10 @@ def _get_certificate_subjecttaltnames(cert: Union[X509, bytes]) -> [str]: for cert_ext_id in range(cert.get_extension_count()): cert_ext = cert.get_extension(cert_ext_id) if cert_ext.get_short_name() == b"subjectAltName": - ext_data = cert_ext.get_data().strip(b'0\x10\x82\x0e').strip(b'0\x81\x8e\x82\x13') - return ext_data.decode("utf-8", errors="ignore").lower().split('\x82\x15') + ext_data = ( + cert_ext.get_data().strip(b"0\x10\x82\x0e").strip(b"0\x81\x8e\x82\x13") + ) + return ext_data.decode("utf-8", errors="ignore").lower().split("\x82\x15") def extract_logo_from_certificate(cert: Union[bytes, X509]): @@ -241,12 +251,15 @@ def extract_logo_from_certificate(cert: Union[bytes, X509]): logo_base64 = base64.b64decode(logotype_data.split(",")[1]) logo = gzip.decompress(logo_base64) return logo + + def get_certificate_metadata(pem_crt: Union[str, bytes], domain=None) -> OrderedDict: """Get metadata about a Verified Mark Certificate""" metadata = OrderedDict() valid = False validation_errors = [] subjectaltnames = [] + def decode_components(components: dict): new_dict = OrderedDict() for component in components: @@ -254,6 +267,7 @@ def decode_components(components: dict): new_value = component[1].decode("utf-8", errors="ignore") new_dict[new_key] = new_value return new_dict + try: if type(pem_crt) is bytes: pem_crt = pem_crt.decode(errors="ignore") @@ -285,11 +299,14 @@ def decode_components(components: dict): validation_errors.append(str(e)) if domain is not None: if domain.lower() not in subjectaltnames: - validation_errors.append(f"{domain} does not match the certificate subjectaltnames, {subjectaltnames}") + validation_errors.append( + f"{domain} does not match the certificate subjectaltnames, {subjectaltnames}" + ) metadata["validation_errors"] = validation_errors metadata["valid"] = False return metadata + def _query_bimi_record( domain: str, selector: str = "default", @@ -548,14 +565,18 @@ def parse_bimi_record( response.raise_for_status() pem_bytes = response.content cert_metadata = get_certificate_metadata(pem_bytes, domain=domain) - if image_metadata["sha256_hash"] == cert_metadata["logodata_sha256_hash"]: + if ( + image_metadata["sha256_hash"] + == cert_metadata["logodata_sha256_hash"] + ): certificate_provided = True else: - warnings.append("SHA256 hash mismatch between the certificate and the image") + warnings.append( + "SHA256 hash mismatch between the certificate and the image" + ) except Exception as e: warnings.append(f"Unable to download mark certificate - {str(e)}") - if not certificate_provided: warnings.append( "Most providers will not display a BIMI image without a valid mark certificate" @@ -622,8 +643,9 @@ def check_bimi( bimi_results["selector"] = selector bimi_results["record"] = bimi_query["record"] parsed_bimi = parse_bimi_record( - bimi_results["record"], include_tag_descriptions=include_tag_descriptions, - domain=domain + bimi_results["record"], + include_tag_descriptions=include_tag_descriptions, + domain=domain, ) bimi_results["tags"] = parsed_bimi["tags"] if "image" in parsed_bimi.keys():