Skip to content

Commit

Permalink
Merge branch 'devel' into adl_wf
Browse files Browse the repository at this point in the history
  • Loading branch information
grindsa committed Dec 20, 2024
2 parents a13ee44 + 4237837 commit 71c8082
Showing 1 changed file with 43 additions and 26 deletions.
69 changes: 43 additions & 26 deletions examples/ca_handler/nclm_ca_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from typing import List, Tuple, Dict
import requests
# pylint: disable=e0401, r0913
from acme_srv.helper import load_config, build_pem_file, b64_encode, b64_url_recode, convert_string_to_byte, cert_serial_get, uts_now, parse_url, proxy_check, error_dic_get, uts_to_date_utc, header_info_get, eab_profile_header_info_check, config_eab_profile_load, config_headerinfo_load, config_enroll_config_log_load, enrollment_config_log
from acme_srv.helper import load_config, build_pem_file, b64_encode, b64_url_recode, convert_string_to_byte, cert_serial_get, uts_now, parse_url, proxy_check, error_dic_get, uts_to_date_utc, header_info_get, eab_profile_header_info_check, config_eab_profile_load, config_headerinfo_load, config_enroll_config_log_load, enrollment_config_log, config_allowed_domainlist_load, allowed_domainlist_check_error


class CAhandler(object):
Expand All @@ -33,6 +33,7 @@ def __init__(self, _debug=None, logger=None):
self.eab_profiling = False
self.enrollment_config_log = False
self.enrollment_config_log_skip_list = []
self.allowed_domainlist = []

def __enter__(self):
""" Makes CAhandler a Context Manager """
Expand Down Expand Up @@ -412,6 +413,8 @@ def _config_load(self):
self._config_timer_load(config_dic)

self._config_proxy_load(config_dic)
# load allowed domainlist
self.allowed_domainlist = config_allowed_domainlist_load(self.logger, config_dic)
# load profiling
self.eab_profiling, self.eab_handler = config_eab_profile_load(self.logger, config_dic)
# load header info
Expand Down Expand Up @@ -442,6 +445,43 @@ def _container_id_lookup(self):
self.logger.error('CAhandler._container_id_lookup() no target-system-groups found for filter: %s.', self.container_info_dic['name'])
self.logger.debug('CAhandler._container_id_lookup() ended with: %s', str(self.container_info_dic['id']))

def _csr_check(self, csr: str) -> str:
""" check csr """
self.logger.debug('CAhandler._csr_check()')

# check for eab profiling and header_info
error = eab_profile_header_info_check(self.logger, self, csr, 'profile_id')

if not error:
# check for allowed domainlist
error = allowed_domainlist_check_error(self.logger, csr, self.allowed_domainlist)

self.logger.debug('CAhandler._csr_check() ended with: %s', error)
return error

def _enroll(self, csr: str, ca_id: int) -> Tuple[str, str, str, str]:
""" enroll certificate from NCLM """
self.logger.debug('CAhandler._enroll()')

error = None
cert_bundle = None
cert_raw = None
cert_id = None

if self.enrollment_config_log:
self.enrollment_config_log_skip_list.extend(['headers', 'credential_dic'])
enrollment_config_log(self.logger, self, self.enrollment_config_log_skip_list)

if ca_id and self.container_info_dic['id']:
# enroll operation
(error, cert_bundle, cert_raw, cert_id) = self._cert_enroll(csr, ca_id)
else:
error = f'Enrollment aborted. ca: {ca_id}, tsg_id: {self.container_info_dic["id"]}'
self.logger.error('CAhandler.eroll(): Enrollment aborted. ca_id: %s, container: %s', ca_id, self.container_info_dic['id'])

self.logger.debug('CAhandler._enroll() ended with: %s', error)
return (error, cert_bundle, cert_raw, cert_id)

def _login(self):
""" _login into NCLM API """
self.logger.debug('CAhandler._login()')
Expand Down Expand Up @@ -544,29 +584,6 @@ def _template_id_lookup(self, ca_id: int):

self.logger.debug('CAhandler._template_id_lookup() ended with: %s', str(self.template_info_dic['id']))

def _enroll(self, csr: str, ca_id: int) -> Tuple[str, str, str, str]:
""" enroll certificate from NCLM """
self.logger.debug('CAhandler._enroll()')

error = None
cert_bundle = None
cert_raw = None
cert_id = None

if self.enrollment_config_log:
self.enrollment_config_log_skip_list.extend(['headers', 'credential_dic'])
enrollment_config_log(self.logger, self, self.enrollment_config_log_skip_list)

if ca_id and self.container_info_dic['id']:
# enroll operation
(error, cert_bundle, cert_raw, cert_id) = self._cert_enroll(csr, ca_id)
else:
error = f'Enrollment aborted. ca: {ca_id}, tsg_id: {self.container_info_dic["id"]}'
self.logger.error('CAhandler.eroll(): Enrollment aborted. ca_id: %s, container: %s', ca_id, self.container_info_dic['id'])

self.logger.debug('CAhandler._enroll() ended with: %s', error)
return (error, cert_bundle, cert_raw, cert_id)

def enroll(self, csr: str) -> Tuple[str, str, str, str]:
""" enroll certificate from NCLM """
self.logger.debug('CAhandler.enroll()')
Expand All @@ -588,8 +605,8 @@ def enroll(self, csr: str) -> Tuple[str, str, str, str]:
if ca_id and self.template_info_dic['name'] and not self.template_info_dic['id']:
self._template_id_lookup(ca_id)

# check for eab profiling and header_info
error = eab_profile_header_info_check(self.logger, self, csr, 'profile_id')
error = self._csr_check(csr)

if not error:
(error, cert_bundle, cert_raw, cert_id) = self._enroll(csr, ca_id)
else:
Expand Down

0 comments on commit 71c8082

Please sign in to comment.