From 42378370a8a8ad07103cf38b54ca895f09d22d6f Mon Sep 17 00:00:00 2001
From: grindsa
Date: Fri, 20 Dec 2024 14:41:52 +0100
Subject: [PATCH] [fix] allowed_domainlist check in nclm_ca_handler.py

---
 examples/ca_handler/nclm_ca_handler.py | 69 ++++++++++++++++----------
 1 file changed, 43 insertions(+), 26 deletions(-)

diff --git a/examples/ca_handler/nclm_ca_handler.py b/examples/ca_handler/nclm_ca_handler.py
index 0b9d9230..8740b41d 100644
--- a/examples/ca_handler/nclm_ca_handler.py
+++ b/examples/ca_handler/nclm_ca_handler.py
@@ -7,7 +7,7 @@
 from typing import List, Tuple, Dict
 import requests
 # pylint: disable=e0401, r0913
-from acme_srv.helper import load_config, build_pem_file, b64_encode, b64_url_recode, convert_string_to_byte, cert_serial_get, uts_now, parse_url, proxy_check, error_dic_get, uts_to_date_utc, header_info_get, eab_profile_header_info_check, config_eab_profile_load, config_headerinfo_load, config_enroll_config_log_load, enrollment_config_log
+from acme_srv.helper import load_config, build_pem_file, b64_encode, b64_url_recode, convert_string_to_byte, cert_serial_get, uts_now, parse_url, proxy_check, error_dic_get, uts_to_date_utc, header_info_get, eab_profile_header_info_check, config_eab_profile_load, config_headerinfo_load, config_enroll_config_log_load, enrollment_config_log, config_allowed_domainlist_load, allowed_domainlist_check_error
 
 
 class CAhandler(object):
@@ -33,6 +33,7 @@ def __init__(self, _debug=None, logger=None):
         self.eab_profiling = False
         self.enrollment_config_log = False
         self.enrollment_config_log_skip_list = []
+        self.allowed_domainlist = []
 
     def __enter__(self):
         """ Makes CAhandler a Context Manager """
@@ -412,6 +413,8 @@ def _config_load(self):
         self._config_timer_load(config_dic)
         self._config_proxy_load(config_dic)
 
+        # load allowed domainlist
+        self.allowed_domainlist = config_allowed_domainlist_load(self.logger, config_dic)
         # load profiling
         self.eab_profiling, self.eab_handler = config_eab_profile_load(self.logger, config_dic)
         # load header info
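Review bot: The value returned by `config_allowed_domainlist_load` is assigned to `self.allowed_domainlist` without any check of what came back, so if the helper reports a malformed `allowed_domainlist` entry through its return value rather than by raising (its behaviour is not visible in this hunk), the handler would carry that value into the enrollment path without noticing; it is worth confirming that the failure mode is fail-closed rather than degrading to an unrestricted empty list. A one-line comment recording that expectation, e.g. `# NOTE(review): a parse failure in the helper must not degrade to an empty (unrestricted) list - confirm`, plus `self.logger.debug('CAhandler._config_load(): allowed_domainlist: %s', self.allowed_domainlist)` after the assignment would make the effective policy visible in the logs. `_config_load` starts before and continues after this hunk.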
@@ -442,6 +445,43 @@ def _container_id_lookup(self):
             self.logger.error('CAhandler._container_id_lookup() no target-system-groups found for filter: %s.', self.container_info_dic['name'])
         self.logger.debug('CAhandler._container_id_lookup() ended with: %s', str(self.container_info_dic['id']))
 
+    def _csr_check(self, csr: str) -> str:
+        """ check csr """
+        self.logger.debug('CAhandler._csr_check()')
+
+        # check for eab profiling and header_info
+        error = eab_profile_header_info_check(self.logger, self, csr, 'profile_id')
+
+        if not error:
+            # check for allowed domainlist
+            error = allowed_domainlist_check_error(self.logger, csr, self.allowed_domainlist)
+
+        self.logger.debug('CAhandler._csr_check() ended with: %s', error)
+        return error
+
+    def _enroll(self, csr: str, ca_id: int) -> Tuple[str, str, str, str]:
+        """ enroll certificate from NCLM """
+        self.logger.debug('CAhandler._enroll()')
+
+        error = None
+        cert_bundle = None
+        cert_raw = None
+        cert_id = None
+
+        if self.enrollment_config_log:
+            self.enrollment_config_log_skip_list.extend(['headers', 'credential_dic'])
+            enrollment_config_log(self.logger, self, self.enrollment_config_log_skip_list)
+
+        if ca_id and self.container_info_dic['id']:
+            # enroll operation
+            (error, cert_bundle, cert_raw, cert_id) = self._cert_enroll(csr, ca_id)
+        else:
+            error = f'Enrollment aborted. ca: {ca_id}, tsg_id: {self.container_info_dic["id"]}'
+            self.logger.error('CAhandler.eroll(): Enrollment aborted. ca_id: %s, container: %s', ca_id, self.container_info_dic['id'])
+
+        self.logger.debug('CAhandler._enroll() ended with: %s', error)
+        return (error, cert_bundle, cert_raw, cert_id)
+
     def _login(self):
         """ _login into NCLM API """
         self.logger.debug('CAhandler._login()')
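Review bot: `_csr_check` is annotated `-> str` but its success path returns None (`error` is whatever the two helpers hand back, and the second is only consulted when the first returned a falsy value), so the annotation should be `Optional[str]` (`Optional` would have to be added to the `from typing import List, Tuple, Dict` line) and the one-word docstring `""" check csr """` should spell out the contract: EAB-profile/header-info check first, allowed-domain check only if that passed, first error string returned, None meaning the CSR is accepted. A CSR rejected by the domain list surfaces in this method only through the trailing `self.logger.debug('CAhandler._csr_check() ended with: %s', error)`; unless `allowed_domainlist_check_error` logs the rejection itself (not visible in this hunk), a policy rejection deserves a warning-level entry, e.g. `if error: self.logger.warning('CAhandler._csr_check(): csr rejected: %s', error)` before the return. The `_enroll` body below it is moved unchanged, including the `CAhandler.eroll()` typo in its error log.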
@@ -544,29 +584,6 @@ def _template_id_lookup(self, ca_id: int):
 
         self.logger.debug('CAhandler._template_id_lookup() ended with: %s', str(self.template_info_dic['id']))
 
-    def _enroll(self, csr: str, ca_id: int) -> Tuple[str, str, str, str]:
-        """ enroll certificate from NCLM """
-        self.logger.debug('CAhandler._enroll()')
-
-        error = None
-        cert_bundle = None
-        cert_raw = None
-        cert_id = None
-
-        if self.enrollment_config_log:
-            self.enrollment_config_log_skip_list.extend(['headers', 'credential_dic'])
-            enrollment_config_log(self.logger, self, self.enrollment_config_log_skip_list)
-
-        if ca_id and self.container_info_dic['id']:
-            # enroll operation
-            (error, cert_bundle, cert_raw, cert_id) = self._cert_enroll(csr, ca_id)
-        else:
-            error = f'Enrollment aborted. ca: {ca_id}, tsg_id: {self.container_info_dic["id"]}'
-            self.logger.error('CAhandler.eroll(): Enrollment aborted. ca_id: %s, container: %s', ca_id, self.container_info_dic['id'])
-
-        self.logger.debug('CAhandler._enroll() ended with: %s', error)
-        return (error, cert_bundle, cert_raw, cert_id)
-
     def enroll(self, csr: str) -> Tuple[str, str, str, str]:
         """ enroll certificate from NCLM """
         self.logger.debug('CAhandler.enroll()')
@@ -588,8 +605,8 @@ def enroll(self, csr: str) -> Tuple[str, str, str, str]:
         if ca_id and self.template_info_dic['name'] and not self.template_info_dic['id']:
             self._template_id_lookup(ca_id)
 
-        # check for eab profiling and header_info
-        error = eab_profile_header_info_check(self.logger, self, csr, 'profile_id')
+        error = self._csr_check(csr)
+
         if not error:
             (error, cert_bundle, cert_raw, cert_id) = self._enroll(csr, ca_id)
         else:
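Review bot: `error = self._csr_check(csr)` runs after the `_template_id_lookup(ca_id)` call at the top of the second hunk (and, judging by the `ca_id` already in hand, after whatever CA lookup precedes it outside this view), so a CSR that the EAB profile or the allowed-domain list is going to reject still costs those NCLM API round-trips first. Since the check needs only the CSR and handler state, moving `error = self._csr_check(csr)` ahead of the lookups and guarding them with `if not error:` would fail fast; and if `eab_profile_header_info_check` can rewrite handler attributes such as the template name from the EAB profile (it is handed `self`, which suggests it may), doing the profile check after the template lookup would also mean the lookup used the pre-profile name - worth confirming. This is hedged because the start of `enroll`, including the login and lookup code, is outside the hunk, and the `else:` branch at its bottom continues beyond it.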