Skip to content

Commit

Permalink
Code cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
seanthegeek committed Oct 28, 2024
1 parent cd481cb commit 72d8eee
Showing 1 changed file with 37 additions and 15 deletions.
52 changes: 37 additions & 15 deletions checkdmarc/bimi.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,14 @@
import xmltodict
import pem
from pyleri import Grammar, Regex, Sequence, List
from OpenSSL.crypto import load_certificate, FILETYPE_PEM, X509Store, X509StoreContext, X509, X509StoreContextError
from OpenSSL.crypto import (
load_certificate,
FILETYPE_PEM,
X509Store,
X509StoreContext,
X509,
X509StoreContextError,
)

import checkdmarc.resources
from checkdmarc._constants import SYNTAX_ERROR_MARKER, USER_AGENT
Expand Down Expand Up @@ -52,8 +59,7 @@

# Load the certificates included in MVACAS.pem into a certificate store
X509STORE = X509Store()
with pkg_resources.path(checkdmarc.resources,
"MVACAs.pem") as path:
with pkg_resources.path(checkdmarc.resources, "MVACAs.pem") as path:

CA_PEMS = pem.parse_file(path)
for CA_PEM in CA_PEMS:
Expand Down Expand Up @@ -174,6 +180,7 @@ class _BIMIGrammar(Grammar):
List(tag_value, delimiter=Regex(f"{WSP_REGEX}*;{WSP_REGEX}*"), opt=True),
)


def get_svg_metadata(raw_xml: Union[str, bytes]) -> OrderedDict:
if isinstance(raw_xml, bytes):
raw_xml = raw_xml.decode(errors="ignore")
Expand Down Expand Up @@ -208,26 +215,29 @@ def get_svg_metadata(raw_xml: Union[str, bytes]) -> OrderedDict:
def check_svg_requirements(svg_metadata: OrderedDict) -> [str]:
_warnings = []
if svg_metadata["svg_version"] != "1.2":
_warnings.append(f"The SVG version must be 1.2, not {svg_metadata['svg_version']}")
_warnings.append(
f"The SVG version must be 1.2, not {svg_metadata['svg_version']}"
)
if svg_metadata["base_profile"] != "tiny-ps":
_warnings.append(f"The SVG base profile must be tiny-ps")
if svg_metadata["width"] != svg_metadata["height"]:
_warnings.append(
"The SVG dimensions must be square, not {width}x{height}"
)
_warnings.append("The SVG dimensions must be square, not {width}x{height}")
if float(svg_metadata["filesize"].strip(" KB")) > 32:
_warnings.append("The SVG file exceeds the maximum size of 32 kB")
return _warnings


def _get_certificate_subjecttaltnames(cert: Union[X509, bytes]) -> [str]:
"""Get the subjectaltname from a PEM certificate"""
if type(cert) is bytes:
cert = load_certificate(FILETYPE_PEM, cert)
for cert_ext_id in range(cert.get_extension_count()):
cert_ext = cert.get_extension(cert_ext_id)
if cert_ext.get_short_name() == b"subjectAltName":
ext_data = cert_ext.get_data().strip(b'0\x10\x82\x0e').strip(b'0\x81\x8e\x82\x13')
return ext_data.decode("utf-8", errors="ignore").lower().split('\x82\x15')
ext_data = (
cert_ext.get_data().strip(b"0\x10\x82\x0e").strip(b"0\x81\x8e\x82\x13")
)
return ext_data.decode("utf-8", errors="ignore").lower().split("\x82\x15")


def extract_logo_from_certificate(cert: Union[bytes, X509]):
Expand All @@ -241,19 +251,23 @@ def extract_logo_from_certificate(cert: Union[bytes, X509]):
logo_base64 = base64.b64decode(logotype_data.split(",")[1])
logo = gzip.decompress(logo_base64)
return logo


def get_certificate_metadata(pem_crt: Union[str, bytes], domain=None) -> OrderedDict:
"""Get metadata about a Verified Mark Certificate"""
metadata = OrderedDict()
valid = False
validation_errors = []
subjectaltnames = []

def decode_components(components: dict):
new_dict = OrderedDict()
for component in components:
new_key = component[0].decode("utf-8", errors="ignore")
new_value = component[1].decode("utf-8", errors="ignore")
new_dict[new_key] = new_value
return new_dict

try:
if type(pem_crt) is bytes:
pem_crt = pem_crt.decode(errors="ignore")
Expand Down Expand Up @@ -285,11 +299,14 @@ def decode_components(components: dict):
validation_errors.append(str(e))
if domain is not None:
if domain.lower() not in subjectaltnames:
validation_errors.append(f"{domain} does not match the certificate subjectaltnames, {subjectaltnames}")
validation_errors.append(
f"{domain} does not match the certificate subjectaltnames, {subjectaltnames}"
)
metadata["validation_errors"] = validation_errors
metadata["valid"] = False
return metadata


def _query_bimi_record(
domain: str,
selector: str = "default",
Expand Down Expand Up @@ -548,14 +565,18 @@ def parse_bimi_record(
response.raise_for_status()
pem_bytes = response.content
cert_metadata = get_certificate_metadata(pem_bytes, domain=domain)
if image_metadata["sha256_hash"] == cert_metadata["logodata_sha256_hash"]:
if (
image_metadata["sha256_hash"]
== cert_metadata["logodata_sha256_hash"]
):
certificate_provided = True
else:
warnings.append("SHA256 hash mismatch between the certificate and the image")
warnings.append(
"SHA256 hash mismatch between the certificate and the image"
)
except Exception as e:
warnings.append(f"Unable to download mark certificate - {str(e)}")


if not certificate_provided:
warnings.append(
"Most providers will not display a BIMI image without a valid mark certificate"
Expand Down Expand Up @@ -622,8 +643,9 @@ def check_bimi(
bimi_results["selector"] = selector
bimi_results["record"] = bimi_query["record"]
parsed_bimi = parse_bimi_record(
bimi_results["record"], include_tag_descriptions=include_tag_descriptions,
domain=domain
bimi_results["record"],
include_tag_descriptions=include_tag_descriptions,
domain=domain,
)
bimi_results["tags"] = parsed_bimi["tags"]
if "image" in parsed_bimi.keys():
Expand Down

0 comments on commit 72d8eee

Please sign in to comment.