From 81f8ff850836808605c476715069fb0efb810488 Mon Sep 17 00:00:00 2001 From: fbarreir Date: Thu, 18 Jan 2024 14:39:54 +0100 Subject: [PATCH 01/16] Removing hardcoded resource types --- pandaharvester/harvesterworkermaker/simple_worker_maker.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/pandaharvester/harvesterworkermaker/simple_worker_maker.py b/pandaharvester/harvesterworkermaker/simple_worker_maker.py index 2b8d0303..7aef280a 100644 --- a/pandaharvester/harvesterworkermaker/simple_worker_maker.py +++ b/pandaharvester/harvesterworkermaker/simple_worker_maker.py @@ -119,7 +119,7 @@ def make_worker(self, jobspec_list, queue_config, job_type, resource_type): workSpec.nCore = site_corecount workSpec.minRamCount = site_maxrss else: - if not len(jobspec_list) and resource_type not in ["SCORE", "SCORE_HIMEM", "MCORE", "MCORE_HIMEM"]: + if not len(jobspec_list) and self.rt_mapper.is_valid_resource_type(resource_type): # some testing PQs have ucore + pure pull, need to default to SCORE tmpLog.warning(f'Invalid resource type "{resource_type}" (perhaps due to ucore with pure pull); default to SCORE') resource_type = "SCORE" @@ -174,8 +174,7 @@ def make_worker(self, jobspec_list, queue_config, job_type, resource_type): choice_list = core_utils.make_choice_list(pdpm=pdpm, default="managed") tmp_prodsourcelabel = random.choice(choice_list) fake_job = JobSpec() - fake_job.jobParams = {} - fake_job.jobParams["prodSourceLabel"] = tmp_prodsourcelabel + fake_job.jobParams = {"prodSourceLabel": tmp_prodsourcelabel} workSpec.pilotType = fake_job.get_pilot_type() del fake_job if workSpec.pilotType in ["RC", "ALRB", "PT"]: From 6f1e5658d4467db2038c9389df846ee4ad1a0544 Mon Sep 17 00:00:00 2001 From: fbarreir Date: Fri, 19 Jan 2024 13:51:53 +0100 Subject: [PATCH 02/16] Implemented generic functions to replace any hardcoded usage of resource types --- .../harvestercore/resource_type_mapper.py | 81 +++++++++++++- .../simple_worker_maker.py | 100 ++++++++---------- 2 files changed, 120 insertions(+), 61 deletions(-) diff --git a/pandaharvester/harvestercore/resource_type_mapper.py b/pandaharvester/harvestercore/resource_type_mapper.py index 54b5f2bb..b18930c1 100644 --- a/pandaharvester/harvestercore/resource_type_mapper.py +++ b/pandaharvester/harvestercore/resource_type_mapper.py @@ -6,6 +6,9 @@ from .db_proxy_pool import DBProxyPool as DBProxy +CRIC_RAM_TAG = 'maxrss' +CRIC_CORE_TAG = 'corecount' +UNIFIED_QUEUE_TAG = 'ucore' class ResourceType(object): def __init__(self, resource_type_dict): @@ -21,6 +24,34 @@ def __init__(self, resource_type_dict): self.min_ram_per_core = resource_type_dict["minrampercore"] self.max_ram_per_core = resource_type_dict["maxrampercore"] + # basic resource_type + self.basic_resource_type_single_core = "SCORE" + self.basic_resource_type_multi_core = "MCORE" + + def match(self, core_count, ram_count): + """ + Checks if the resource type matches the core count and ram count + :param core_count: number of cores + :param ram_count: amount of memory + :return: boolean + """ + + # basic validation that the values are not None or 0 + if not core_count or not ram_count: + return False + + # normalize ram count + ram_per_core = ram_count / core_count + + # check if the resource type matches the core count and ram count + if (self.max_core and core_count > self.max_core) or \ + (self.min_core and core_count < self.min_core) or \ + (self.max_ram_per_core and ram_per_core > self.max_ram_per_core) or \ + (self.min_ram_per_core and ram_per_core < self.min_ram_per_core): 
+ return False + + return True + class ResourceTypeMapper(object): def __init__(self): @@ -56,9 +87,10 @@ def load_data(self): def is_valid_resource_type(self, resource_name): """ Checks if the resource type is valid (exists in the dictionary of resource types) - :param resource_name: string with the resource type name (e.g. SCORE, SCORE_HIMEM,...) + :param resource_name: string with the resource type name :return: boolean """ + self.load_data() if resource_name in self.resource_types: return True return False @@ -76,10 +108,10 @@ def calculate_worker_requirements(self, resource_name, queue_config): resource_type = self.resource_types[resource_name] # retrieve the queue configuration - site_max_rss = queue_config.get("maxrss", 0) or 0 - site_core_count = queue_config.get("corecount", 1) or 1 + site_max_rss = queue_config.get(CRIC_RAM_TAG, 0) or 0 + site_core_count = queue_config.get(CRIC_CORE_TAG, 1) or 1 - unified_queue = queue_config.get("capability", "") == "ucore" + unified_queue = queue_config.get("capability", "") == UNIFIED_QUEUE_TAG if not unified_queue: # site is not unified, just request whatever is configured in AGIS return site_max_rss, site_core_count @@ -99,3 +131,44 @@ def calculate_worker_requirements(self, resource_name, queue_config): pass return worker_cores, worker_memory + + def is_single_core_resource_type(self, resource_name): + """ + Validates whether the resource type is single core by looking at the min and max core definitions + :param resource_name: string with the resource type name + :return: boolean + """ + self.load_data() + if resource_name in self.resource_types: + min_core = self.resource_types[resource_name].min_core + max_core = self.resource_types[resource_name].max_core + + if min_core == max_core == 1: + return True + + return False + + def get_rtype_for_queue(self, queue_config): + """ + Returns the resource type name for a given queue configuration + :param queue_config: queue configuration + :return: string with the resource type name + """ + self.load_data() + + # retrieve the queue configuration + site_max_rss = queue_config.get(CRIC_RAM_TAG, 0) or 0 + site_core_count = queue_config.get(CRIC_CORE_TAG, 1) or 1 + capability = queue_dict.get("capability", "") + + # unified queues are not mapped to any particular resource type + if capability == UNIFIED_QUEUE_TAG: + return '' + + # loop over the resource types and find the one that matches the queue configuration + for resource_name, resource_type in self.resource_types.items(): + if resource_type.match(site_core_count, site_max_rss): + return resource_name + + # no match found + return '' diff --git a/pandaharvester/harvesterworkermaker/simple_worker_maker.py b/pandaharvester/harvesterworkermaker/simple_worker_maker.py index 7aef280a..656f115a 100644 --- a/pandaharvester/harvesterworkermaker/simple_worker_maker.py +++ b/pandaharvester/harvesterworkermaker/simple_worker_maker.py @@ -26,7 +26,7 @@ def __init__(self, **kwarg): def get_job_core_and_memory(self, queue_dict, job_spec): job_memory = job_spec.jobParams.get("minRamCount", 0) or 0 - job_corecount = job_spec.jobParams.get("coreCount", 1) or 1 + job_core_count = job_spec.jobParams.get("coreCount", 1) or 1 is_ucore = queue_dict.get("capability", "") == "ucore" @@ -34,12 +34,12 @@ def get_job_core_and_memory(self, queue_dict, job_spec): site_maxrss = queue_dict.get("maxrss", 0) or 0 site_corecount = queue_dict.get("corecount", 1) or 1 - if job_corecount == 1: + if job_core_count == 1: job_memory = int(math.ceil(site_maxrss / site_corecount)) 
else: job_memory = site_maxrss - return job_corecount, job_memory + return job_core_count, job_memory def get_job_type(self, job_spec, job_type, queue_dict, tmp_prodsourcelabel=None): queue_type = queue_dict.get("type", None) @@ -69,26 +69,14 @@ def get_job_type(self, job_spec, job_type, queue_dict, tmp_prodsourcelabel=None) return job_type_final - def capability_to_rtype(self, capability): - if capability == "score": - return "SCORE" - elif capability == "himem": - return "SCORE_HIMEM" - elif capability == "mcore": - return "MCORE" - elif capability == "mcorehimem": - return "MCORE_HIMEM" - else: - return None - # make a worker from jobs def make_worker(self, jobspec_list, queue_config, job_type, resource_type): - tmpLog = self.make_logger(_logger, f"queue={queue_config.queueName}:{job_type}:{resource_type}", method_name="make_worker") + tmp_log = self.make_logger(_logger, f"queue={queue_config.queueName}:{job_type}:{resource_type}", method_name="make_worker") - tmpLog.debug(f"jobspec_list: {jobspec_list}") + tmp_log.debug(f"jobspec_list: {jobspec_list}") - workSpec = WorkSpec() - workSpec.creationTime = datetime.datetime.utcnow() + work_spec = WorkSpec() + work_spec.creationTime = datetime.datetime.utcnow() # get the queue configuration from CRIC panda_queues_dict = PandaQueuesDict() @@ -98,36 +86,35 @@ def make_worker(self, jobspec_list, queue_config, job_type, resource_type): is_ucore = queue_dict.get("capability", "") == "ucore" # case of traditional (non-ucore) queue: look at the queue configuration if not is_ucore: - workSpec.nCore = queue_dict.get("corecount", 1) or 1 - workSpec.minRamCount = queue_dict.get("maxrss", 1) or 1 + work_spec.nCore = queue_dict.get("corecount", 1) or 1 + work_spec.minRamCount = queue_dict.get("maxrss", 1) or 1 # case of unified queue: look at the job & resource type and queue configuration else: catchall = queue_dict.get("catchall", "") if "useMaxRam" in catchall: - # temporary workaround to debug killed workers + # some sites require to always set the maximum memory due to memory killing jobs site_corecount = queue_dict.get("corecount", 1) or 1 site_maxrss = queue_dict.get("maxrss", 1) or 1 # some cases need to overwrite those values - if "SCORE" in resource_type: - # the usual pilot streaming use case - workSpec.nCore = 1 - workSpec.minRamCount = int(math.ceil(site_maxrss / site_corecount)) + if self.rt_mapper.is_single_core_resource_type(resource_type): + work_spec.nCore = 1 + work_spec.minRamCount = int(math.ceil(site_maxrss / site_corecount)) else: # default values - workSpec.nCore = site_corecount - workSpec.minRamCount = site_maxrss + work_spec.nCore = site_corecount + work_spec.minRamCount = site_maxrss else: if not len(jobspec_list) and self.rt_mapper.is_valid_resource_type(resource_type): - # some testing PQs have ucore + pure pull, need to default to SCORE - tmpLog.warning(f'Invalid resource type "{resource_type}" (perhaps due to ucore with pure pull); default to SCORE') - resource_type = "SCORE" - workSpec.nCore, workSpec.minRamCount = self.rt_mapper.calculate_worker_requirements(resource_type, queue_dict) + # some testing PQs have ucore + pure pull, need to default to the basic 1-core resource type + tmp_log.warning(f'Invalid resource type "{resource_type}" (perhaps due to ucore with pure pull); default to the basic 1-core resource type') + resource_type = self.rt_mapper.basic_resource_type_single_core + work_spec.nCore, work_spec.minRamCount = self.rt_mapper.calculate_worker_requirements(resource_type, queue_dict) # parameters that are 
independent on traditional vs unified - workSpec.maxWalltime = queue_dict.get("maxtime", 1) - workSpec.maxDiskCount = queue_dict.get("maxwdir", 1) + work_spec.maxWalltime = queue_dict.get("maxtime", 1) + work_spec.maxDiskCount = queue_dict.get("maxwdir", 1) if len(jobspec_list) > 0: # get info from jobs @@ -137,8 +124,8 @@ def make_worker(self, jobspec_list, queue_config, job_type, resource_type): maxWalltime = 0 ioIntensity = 0 for jobSpec in jobspec_list: - job_corecount, job_memory = self.get_job_core_and_memory(queue_dict, jobSpec) - nCore += job_corecount + job_core_count, job_memory = self.get_job_core_and_memory(queue_dict, jobSpec) + nCore += job_core_count minRamCount += job_memory try: maxDiskCount += jobSpec.jobParams["maxDiskCount"] @@ -154,18 +141,18 @@ def make_worker(self, jobspec_list, queue_config, job_type, resource_type): pass # fill in worker attributes if (nCore > 0 and "nCore" in self.jobAttributesToUse) or is_ucore: - workSpec.nCore = nCore + work_spec.nCore = nCore if (minRamCount > 0 and "minRamCount" in self.jobAttributesToUse) or is_ucore: - workSpec.minRamCount = minRamCount + work_spec.minRamCount = minRamCount if maxDiskCount > 0 and ("maxDiskCount" in self.jobAttributesToUse or associated_params_dict.get("job_maxdiskcount") is True): - workSpec.maxDiskCount = maxDiskCount + work_spec.maxDiskCount = maxDiskCount if maxWalltime > 0 and ("maxWalltime" in self.jobAttributesToUse or associated_params_dict.get("job_maxwalltime") is True): - workSpec.maxWalltime = maxWalltime + work_spec.maxWalltime = maxWalltime if ioIntensity > 0 and ("ioIntensity" in self.jobAttributesToUse or associated_params_dict.get("job_iointensity") is True): - workSpec.ioIntensity = ioIntensity + work_spec.ioIntensity = ioIntensity - workSpec.pilotType = jobspec_list[0].get_pilot_type() - workSpec.jobType = self.get_job_type(jobspec_list[0], job_type, queue_dict) + work_spec.pilotType = jobspec_list[0].get_pilot_type() + work_spec.jobType = self.get_job_type(jobspec_list[0], job_type, queue_dict) else: # when no job @@ -175,29 +162,28 @@ def make_worker(self, jobspec_list, queue_config, job_type, resource_type): tmp_prodsourcelabel = random.choice(choice_list) fake_job = JobSpec() fake_job.jobParams = {"prodSourceLabel": tmp_prodsourcelabel} - workSpec.pilotType = fake_job.get_pilot_type() + work_spec.pilotType = fake_job.get_pilot_type() del fake_job - if workSpec.pilotType in ["RC", "ALRB", "PT"]: - tmpLog.info(f"a worker has pilotType={workSpec.pilotType}") + if work_spec.pilotType in ["RC", "ALRB", "PT"]: + tmp_log.info(f"a worker has pilotType={work_spec.pilotType}") - workSpec.jobType = self.get_job_type(None, job_type, queue_dict, tmp_prodsourcelabel) - tmpLog.debug( + work_spec.jobType = self.get_job_type(None, job_type, queue_dict, tmp_prodsourcelabel) + tmp_log.debug( "get_job_type decided for job_type: {0} (input job_type: {1}, queue_type: {2}, tmp_prodsourcelabel: {3})".format( - workSpec.jobType, job_type, queue_dict.get("type", None), tmp_prodsourcelabel + work_spec.jobType, job_type, queue_dict.get("type", None), tmp_prodsourcelabel ) ) # retrieve queue resource type - capability = queue_dict.get("capability", "") - queue_rtype = self.capability_to_rtype(capability) + queue_rtype = self.get_rtype_for_queue(queue_config) if resource_type and resource_type != "ANY": - workSpec.resourceType = resource_type + work_spec.resourceType = resource_type elif queue_rtype: - workSpec.resourceType = queue_rtype - elif workSpec.nCore == 1: - workSpec.resourceType = "SCORE" + 
work_spec.resourceType = queue_rtype + elif work_spec.nCore == 1: + work_spec.resourceType = self.rt_mapper.basic_resource_type_single_core else: - workSpec.resourceType = "MCORE" + work_spec.resourceType = self.rt_mapper.basic_resource_type_multi_core - return workSpec + return work_spec From 4fcbf2c1ebed6a81fdacd7387604cc699456d83f Mon Sep 17 00:00:00 2001 From: fbarreir Date: Fri, 19 Jan 2024 15:54:16 +0100 Subject: [PATCH 03/16] Removing more hardcoded resource types --- pandaharvester/harvestercore/db_proxy.py | 3 +- .../harvestercore/queue_config_mapper.py | 6 +-- .../harvestercore/resource_type_mapper.py | 42 ++++++++++++++++--- pandaharvester/harvestermisc/k8s_utils.py | 20 ++++++--- .../harvestersubmitter/htcondor_submitter.py | 11 ++++- .../harvestersubmitter/submitter_common.py | 12 +++--- .../simple_worker_maker.py | 8 ++-- 7 files changed, 74 insertions(+), 28 deletions(-) diff --git a/pandaharvester/harvestercore/db_proxy.py b/pandaharvester/harvestercore/db_proxy.py index a7468bf0..e752de44 100644 --- a/pandaharvester/harvestercore/db_proxy.py +++ b/pandaharvester/harvestercore/db_proxy.py @@ -30,6 +30,7 @@ from .seq_number_spec import SeqNumberSpec from .service_metrics_spec import ServiceMetricSpec from .work_spec import WorkSpec +from .resource_type_mapper import BASIC_RESOURCE_TYPE_SINGLE_CORE # logger _logger = core_utils.setup_logger("db_proxy") @@ -3510,7 +3511,7 @@ def get_worker_stats_bulk(self, active_ups_queues): if active_ups_queues: for ups_queue in active_ups_queues: if ups_queue not in retMap or not retMap[ups_queue] or retMap[ups_queue] == {"ANY": {}}: - retMap[ups_queue] = {"managed": {"SCORE": {"running": 0, "submitted": 0, "finished": 0, "to_submit": 0}}} + retMap[ups_queue] = {"managed": {BASIC_RESOURCE_TYPE_SINGLE_CORE: {"running": 0, "submitted": 0, "finished": 0, "to_submit": 0}}} # commit self.commit() diff --git a/pandaharvester/harvestercore/queue_config_mapper.py b/pandaharvester/harvestercore/queue_config_mapper.py index b12a0a9b..a79df1e5 100644 --- a/pandaharvester/harvestercore/queue_config_mapper.py +++ b/pandaharvester/harvestercore/queue_config_mapper.py @@ -294,9 +294,9 @@ def load_data(self, refill_table=False): return # start with self.lock: - # update timesatmp of last reload, lock with check interval - got_timesatmp_update_lock = self.dbProxy.get_process_lock("qconf_reload", "qconf_universal", self.updateInterval) - if got_timesatmp_update_lock: + # update timestamp of last reload, lock with check interval + got_timestamp_update_lock = self.dbProxy.get_process_lock("qconf_reload", "qconf_universal", self.updateInterval) + if got_timestamp_update_lock: now_ts = time.time() retVal = self._update_last_reload_time(now_ts) self.lastReload = datetime.datetime.utcfromtimestamp(now_ts) diff --git a/pandaharvester/harvestercore/resource_type_mapper.py b/pandaharvester/harvestercore/resource_type_mapper.py index b18930c1..65951d13 100644 --- a/pandaharvester/harvestercore/resource_type_mapper.py +++ b/pandaharvester/harvestercore/resource_type_mapper.py @@ -5,10 +5,16 @@ import threading from .db_proxy_pool import DBProxyPool as DBProxy +from .core_utils import SingletonWithID CRIC_RAM_TAG = 'maxrss' CRIC_CORE_TAG = 'corecount' UNIFIED_QUEUE_TAG = 'ucore' +CAPABILITY_TAG = 'capability' + +# basic resource types +BASIC_RESOURCE_TYPE_SINGLE_CORE = "SCORE" +BASIC_RESOURCE_TYPE_MULTI_CORE = "MCORE" class ResourceType(object): def __init__(self, resource_type_dict): @@ -24,10 +30,6 @@ def __init__(self, resource_type_dict): self.min_ram_per_core = 
resource_type_dict["minrampercore"] self.max_ram_per_core = resource_type_dict["maxrampercore"] - # basic resource_type - self.basic_resource_type_single_core = "SCORE" - self.basic_resource_type_multi_core = "MCORE" - def match(self, core_count, ram_count): """ Checks if the resource type matches the core count and ram count @@ -53,7 +55,7 @@ def match(self, core_count, ram_count): return True -class ResourceTypeMapper(object): +class ResourceTypeMapper(object, metaclass=SingletonWithID): def __init__(self): self.lock = threading.Lock() self.resource_types = {} @@ -148,6 +150,34 @@ def is_single_core_resource_type(self, resource_name): return False + def get_single_core_resource_types(self): + """ + Returns a list of resource types that are single core + :return: list of strings with the resource type names + """ + self.load_data() + # iterate over the resource types and find the ones that are single core + single_core_resource_types = [resource_name for resource_name in self.resource_types if self.is_single_core_resource_type(resource_name)] + return single_core_resource_types + + def get_multi_core_resource_types(self): + """ + Returns a list of resource types that are multi core + :return: list of strings with the resource type names + """ + self.load_data() + # iterate over the resource types and find the ones that are multi core (not single core) + single_core_resource_types = [resource_name for resource_name in self.resource_types if not self.is_single_core_resource_type(resource_name)] + return single_core_resource_types + + def get_all_resource_types(self): + """ + Returns a list with all resource types + :return: list of strings with the resource type names + """ + self.load_data() + return self.resource_types.keys() + def get_rtype_for_queue(self, queue_config): """ Returns the resource type name for a given queue configuration @@ -159,7 +189,7 @@ def get_rtype_for_queue(self, queue_config): # retrieve the queue configuration site_max_rss = queue_config.get(CRIC_RAM_TAG, 0) or 0 site_core_count = queue_config.get(CRIC_CORE_TAG, 1) or 1 - capability = queue_dict.get("capability", "") + capability = queue_config.get(CAPABILITY_TAG, "") # unified queues are not mapped to any particular resource type if capability == UNIFIED_QUEUE_TAG: diff --git a/pandaharvester/harvestermisc/k8s_utils.py b/pandaharvester/harvestermisc/k8s_utils.py index e91d1f31..5713d55c 100644 --- a/pandaharvester/harvestermisc/k8s_utils.py +++ b/pandaharvester/harvestermisc/k8s_utils.py @@ -12,6 +12,7 @@ from pandaharvester.harvesterconfig import harvester_config from pandaharvester.harvestercore import core_utils from pandaharvester.harvestermisc.info_utils_k8s import PandaQueuesDictK8s +from pandaharvester.harvestercore.resource_type_mapper import ResourceTypeMapper base_logger = core_utils.setup_logger("k8s_utils") @@ -38,6 +39,8 @@ def __init__(self, namespace, config_file=None, queue_name=None): self.namespace = namespace self.queue_name = queue_name + self.rt_mapper = ResourceTypeMapper() + def read_yaml_file(self, yaml_file): with open(yaml_file) as f: yaml_content = yaml.load(f, Loader=yaml.FullLoader) @@ -420,18 +423,25 @@ def set_affinity(self, yaml_content, use_affinity, use_anti_affinity): yaml_content["spec"]["template"]["spec"]["affinity"] = {} yaml_affinity = yaml_content["spec"]["template"]["spec"]["affinity"] - scores = ["SCORE", "SCORE_HIMEM"] - mcores = ["MCORE", "MCORE_HIMEM"] + single_core_resource_types = self.rt_mapper.get_single_core_resource_types() + multi_core_resource_types = 
self.rt_mapper.get_multi_core_resource_types() + all_resource_types = self.rt_mapper.get_all_resource_types() - anti_affinity_matrix = {"SCORE": mcores, "SCORE_HIMEM": mcores, "MCORE": scores, "MCORE_HIMEM": scores} + # create the anti-affinity matrix for higher single and multi core separation + anti_affinity_matrix = {} + for tmp_type in single_core_resource_types: + anti_affinity_matrix[tmp_type] = multi_core_resource_types + for tmp_type in multi_core_resource_types: + anti_affinity_matrix[tmp_type] = single_core_resource_types + # create the affinity spec affinity_spec = { "preferredDuringSchedulingIgnoredDuringExecution": [ { "weight": 100, "podAffinityTerm": { "labelSelector": { - "matchExpressions": [{"key": "resourceType", "operator": "In", "values": ["SCORE", "SCORE_HIMEM", "MCORE", "MCORE_HIMEM"]}] + "matchExpressions": [{"key": "resourceType", "operator": "In", "values": all_resource_types}] }, "topologyKey": "kubernetes.io/hostname", }, @@ -441,7 +451,7 @@ def set_affinity(self, yaml_content, use_affinity, use_anti_affinity): resource_type = yaml_content["spec"]["template"]["metadata"]["labels"]["resourceType"] - if use_affinity and resource_type in scores: + if use_affinity and resource_type in single_core_resource_types: # resource type SCORE* should attract each other instead of spreading across the nodes yaml_affinity["podAffinity"] = copy.deepcopy(affinity_spec) diff --git a/pandaharvester/harvestersubmitter/htcondor_submitter.py b/pandaharvester/harvestersubmitter/htcondor_submitter.py index aa129ab4..05ab67fe 100644 --- a/pandaharvester/harvestersubmitter/htcondor_submitter.py +++ b/pandaharvester/harvestersubmitter/htcondor_submitter.py @@ -14,6 +14,7 @@ from pandaharvester.harvestercore import core_utils from pandaharvester.harvestercore.plugin_base import PluginBase from pandaharvester.harvestercore.queue_config_mapper import QueueConfigMapper +from pandaharvester.harvestercore.resource_type_mapper import ResourceTypeMapper from pandaharvester.harvestermisc.htcondor_utils import ( CondorJobSubmit, get_job_id_tuple_from_batchid, @@ -240,6 +241,11 @@ def make_a_jdl( tmpLog.warning(f"token_path is None: site={panda_queue_name}, token_dir={token_dir} , token_filename={token_filename}") # open tmpfile as submit description file tmpFile = tempfile.NamedTemporaryFile(mode="w", delete=False, suffix="_submit.sdf", dir=workspec.get_access_point()) + + # instance of resource type mapper + rt_mapper = ResourceTypeMapper() + all_resource_types = rt_mapper.get_all_resource_types() + # placeholder map placeholder_map = { "sdfPath": tmpFile.name, @@ -274,8 +280,8 @@ def make_a_jdl( "gtag": batch_log_dict.get("gtag", "fake_GTAG_string"), "prodSourceLabel": prod_source_label, "jobType": workspec.jobType, - "resourceType": submitter_common.get_resource_type(workspec.resourceType, is_unified_queue), - "pilotResourceTypeOption": submitter_common.get_resource_type(workspec.resourceType, is_unified_queue, True), + "resourceType": submitter_common.get_resource_type(workspec.resourceType, is_unified_queue, all_resource_types), + "pilotResourceTypeOption": submitter_common.get_resource_type(workspec.resourceType, is_unified_queue, all_resource_types, is_pilot_option=True), "ioIntensity": io_intensity, "pilotType": pilot_type_opt, "pilotUrlOption": pilot_url_str, @@ -335,6 +341,7 @@ def __init__(self, **kwarg): else: self.hostname = socket.gethostname().split(".")[0] PluginBase.__init__(self, **kwarg) + # number of processes try: self.nProcesses diff --git 
a/pandaharvester/harvestersubmitter/submitter_common.py b/pandaharvester/harvestersubmitter/submitter_common.py index f1d28e50..f5e19b40 100644 --- a/pandaharvester/harvestersubmitter/submitter_common.py +++ b/pandaharvester/harvestersubmitter/submitter_common.py @@ -90,17 +90,15 @@ def get_pilot_job_type(job_type, is_unified_dispatch=False): # Parse resource type from string for Unified PanDA Queue - - -def get_resource_type(string, is_unified_queue, is_pilot_option=False): - string = str(string) +def get_resource_type(resource_type_name, is_unified_queue, all_resource_types, is_pilot_option=False): + resource_type_name = str(resource_type_name) if not is_unified_queue: ret = "" - elif string in set(["SCORE", "MCORE", "SCORE_HIMEM", "MCORE_HIMEM"]): + elif resource_type_name in set(all_resource_types): if is_pilot_option: - ret = f"--resource-type {string}" + ret = f"--resource-type {resource_type_name}" else: - ret = string + ret = resource_type_name else: ret = "" return ret diff --git a/pandaharvester/harvesterworkermaker/simple_worker_maker.py b/pandaharvester/harvesterworkermaker/simple_worker_maker.py index 656f115a..77a88b2d 100644 --- a/pandaharvester/harvesterworkermaker/simple_worker_maker.py +++ b/pandaharvester/harvesterworkermaker/simple_worker_maker.py @@ -6,7 +6,7 @@ from pandaharvester.harvestercore import core_utils from pandaharvester.harvestercore.job_spec import JobSpec -from pandaharvester.harvestercore.resource_type_mapper import ResourceTypeMapper +from pandaharvester.harvestercore.resource_type_mapper import ResourceTypeMapper, BASIC_RESOURCE_TYPE_SINGLE_CORE, BASIC_RESOURCE_TYPE_MULTI_CORE from pandaharvester.harvestercore.work_spec import WorkSpec from pandaharvester.harvestermisc.info_utils import PandaQueuesDict @@ -109,7 +109,7 @@ def make_worker(self, jobspec_list, queue_config, job_type, resource_type): if not len(jobspec_list) and self.rt_mapper.is_valid_resource_type(resource_type): # some testing PQs have ucore + pure pull, need to default to the basic 1-core resource type tmp_log.warning(f'Invalid resource type "{resource_type}" (perhaps due to ucore with pure pull); default to the basic 1-core resource type') - resource_type = self.rt_mapper.basic_resource_type_single_core + resource_type = BASIC_RESOURCE_TYPE_SINGLE_CORE work_spec.nCore, work_spec.minRamCount = self.rt_mapper.calculate_worker_requirements(resource_type, queue_dict) # parameters that are independent on traditional vs unified @@ -182,8 +182,8 @@ def make_worker(self, jobspec_list, queue_config, job_type, resource_type): elif queue_rtype: work_spec.resourceType = queue_rtype elif work_spec.nCore == 1: - work_spec.resourceType = self.rt_mapper.basic_resource_type_single_core + work_spec.resourceType = BASIC_RESOURCE_TYPE_SINGLE_CORE else: - work_spec.resourceType = self.rt_mapper.basic_resource_type_multi_core + work_spec.resourceType = BASIC_RESOURCE_TYPE_MULTI_CORE return work_spec From 226f8c0cfab3e4b461a51de616693f6883d2c5e3 Mon Sep 17 00:00:00 2001 From: fbarreir Date: Wed, 24 Jan 2024 14:24:28 +0100 Subject: [PATCH 04/16] Breaking down resource_type_mapper.py to avoid circular imports --- pandaharvester/harvestercore/db_proxy.py | 2 +- pandaharvester/harvestercore/resource_type_mapper.py | 9 +-------- .../harvesterworkermaker/simple_worker_maker.py | 3 ++- 3 files changed, 4 insertions(+), 10 deletions(-) diff --git a/pandaharvester/harvestercore/db_proxy.py b/pandaharvester/harvestercore/db_proxy.py index e752de44..a9767923 100644 --- a/pandaharvester/harvestercore/db_proxy.py +++ 
b/pandaharvester/harvestercore/db_proxy.py @@ -30,7 +30,7 @@ from .seq_number_spec import SeqNumberSpec from .service_metrics_spec import ServiceMetricSpec from .work_spec import WorkSpec -from .resource_type_mapper import BASIC_RESOURCE_TYPE_SINGLE_CORE +from .resource_type_constants import BASIC_RESOURCE_TYPE_SINGLE_CORE # logger _logger = core_utils.setup_logger("db_proxy") diff --git a/pandaharvester/harvestercore/resource_type_mapper.py b/pandaharvester/harvestercore/resource_type_mapper.py index 65951d13..e36a8e58 100644 --- a/pandaharvester/harvestercore/resource_type_mapper.py +++ b/pandaharvester/harvestercore/resource_type_mapper.py @@ -7,14 +7,7 @@ from .db_proxy_pool import DBProxyPool as DBProxy from .core_utils import SingletonWithID -CRIC_RAM_TAG = 'maxrss' -CRIC_CORE_TAG = 'corecount' -UNIFIED_QUEUE_TAG = 'ucore' -CAPABILITY_TAG = 'capability' - -# basic resource types -BASIC_RESOURCE_TYPE_SINGLE_CORE = "SCORE" -BASIC_RESOURCE_TYPE_MULTI_CORE = "MCORE" +from .resource_type_constants import CRIC_RAM_TAG, CRIC_CORE_TAG, UNIFIED_QUEUE_TAG, CAPABILITY_TAG class ResourceType(object): def __init__(self, resource_type_dict): diff --git a/pandaharvester/harvesterworkermaker/simple_worker_maker.py b/pandaharvester/harvesterworkermaker/simple_worker_maker.py index 77a88b2d..10f49683 100644 --- a/pandaharvester/harvesterworkermaker/simple_worker_maker.py +++ b/pandaharvester/harvesterworkermaker/simple_worker_maker.py @@ -6,7 +6,8 @@ from pandaharvester.harvestercore import core_utils from pandaharvester.harvestercore.job_spec import JobSpec -from pandaharvester.harvestercore.resource_type_mapper import ResourceTypeMapper, BASIC_RESOURCE_TYPE_SINGLE_CORE, BASIC_RESOURCE_TYPE_MULTI_CORE +from pandaharvester.harvestercore.resource_type_mapper import ResourceTypeMapper +from pandaharvester.harvestercore.resource_type_constants import BASIC_RESOURCE_TYPE_SINGLE_CORE, BASIC_RESOURCE_TYPE_MULTI_CORE from pandaharvester.harvestercore.work_spec import WorkSpec from pandaharvester.harvestermisc.info_utils import PandaQueuesDict From 49b979d4fa6a3ca36ecaf9b7baa5e063f90d71d0 Mon Sep 17 00:00:00 2001 From: fbarreir Date: Wed, 24 Jan 2024 14:25:52 +0100 Subject: [PATCH 05/16] Breaking down circular imports --- pandaharvester/harvestercore/resource_type_constants.py | 8 ++++++++ 1 file changed, 8 insertions(+) create mode 100644 pandaharvester/harvestercore/resource_type_constants.py diff --git a/pandaharvester/harvestercore/resource_type_constants.py b/pandaharvester/harvestercore/resource_type_constants.py new file mode 100644 index 00000000..d281f13e --- /dev/null +++ b/pandaharvester/harvestercore/resource_type_constants.py @@ -0,0 +1,8 @@ +CRIC_RAM_TAG = 'maxrss' +CRIC_CORE_TAG = 'corecount' +UNIFIED_QUEUE_TAG = 'ucore' +CAPABILITY_TAG = 'capability' + +# basic resource types +BASIC_RESOURCE_TYPE_SINGLE_CORE = "SCORE" +BASIC_RESOURCE_TYPE_MULTI_CORE = "MCORE" \ No newline at end of file From 1c2c50077f8888c475de6e9efdfd8b9d747b99e5 Mon Sep 17 00:00:00 2001 From: fbarreir Date: Wed, 24 Jan 2024 17:09:49 +0100 Subject: [PATCH 06/16] Typo --- pandaharvester/harvesterworkermaker/simple_worker_maker.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pandaharvester/harvesterworkermaker/simple_worker_maker.py b/pandaharvester/harvesterworkermaker/simple_worker_maker.py index 10f49683..2d5a99ba 100644 --- a/pandaharvester/harvesterworkermaker/simple_worker_maker.py +++ b/pandaharvester/harvesterworkermaker/simple_worker_maker.py @@ -175,8 +175,8 @@ def make_worker(self, 
jobspec_list, queue_config, job_type, resource_type): ) ) - # retrieve queue resource type - queue_rtype = self.get_rtype_for_queue(queue_config) + # retrieve queue resource types + queue_rtype = self.rt_mapper.get_rtype_for_queue(queue_config) if resource_type and resource_type != "ANY": work_spec.resourceType = resource_type From f993d10af3fceb68abdddb6d308a1d0af96b17f2 Mon Sep 17 00:00:00 2001 From: fbarreir Date: Wed, 24 Jan 2024 17:58:13 +0100 Subject: [PATCH 07/16] Typos --- pandaharvester/harvestercore/resource_type_mapper.py | 11 ++++------- .../harvesterworkermaker/simple_worker_maker.py | 7 +++++-- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/pandaharvester/harvestercore/resource_type_mapper.py b/pandaharvester/harvestercore/resource_type_mapper.py index e36a8e58..bd141707 100644 --- a/pandaharvester/harvestercore/resource_type_mapper.py +++ b/pandaharvester/harvestercore/resource_type_mapper.py @@ -171,19 +171,16 @@ def get_all_resource_types(self): self.load_data() return self.resource_types.keys() - def get_rtype_for_queue(self, queue_config): + def calculate_rtype(self, capability, site_core_count, site_max_rss): """ Returns the resource type name for a given queue configuration - :param queue_config: queue configuration + :param capability: string with the queue capability (e.g. "ucore") + :param site_core_count: number of cores + :param site_max_rss: amount of memory, NOT normalized by number of cores :return: string with the resource type name """ self.load_data() - # retrieve the queue configuration - site_max_rss = queue_config.get(CRIC_RAM_TAG, 0) or 0 - site_core_count = queue_config.get(CRIC_CORE_TAG, 1) or 1 - capability = queue_config.get(CAPABILITY_TAG, "") - # unified queues are not mapped to any particular resource type if capability == UNIFIED_QUEUE_TAG: return '' diff --git a/pandaharvester/harvesterworkermaker/simple_worker_maker.py b/pandaharvester/harvesterworkermaker/simple_worker_maker.py index 2d5a99ba..c35c203b 100644 --- a/pandaharvester/harvesterworkermaker/simple_worker_maker.py +++ b/pandaharvester/harvesterworkermaker/simple_worker_maker.py @@ -175,8 +175,11 @@ def make_worker(self, jobspec_list, queue_config, job_type, resource_type): ) ) - # retrieve queue resource types - queue_rtype = self.rt_mapper.get_rtype_for_queue(queue_config) + # retrieve resource types based on queue configuration + capability = queue_dict.get(CAPABILITY_TAG, "") + site_core_count = queue_dict.get(CRIC_CORE_TAG, 1) or 1 + site_max_rss = queue_dict.get(CRIC_RAM_TAG, 0) or 0 + queue_rtype = self.rt_mapper.calculate_rtype(capability, site_core_count, site_max_rss) if resource_type and resource_type != "ANY": work_spec.resourceType = resource_type From fbb33b8d74ecdd046e3b7c5dd461b3640675c338 Mon Sep 17 00:00:00 2001 From: fbarreir Date: Wed, 24 Jan 2024 18:05:07 +0100 Subject: [PATCH 08/16] More typos --- .../harvestercore/resource_type_mapper.py | 20 +++++++++++-------- .../simple_worker_maker.py | 7 ++----- 2 files changed, 14 insertions(+), 13 deletions(-) diff --git a/pandaharvester/harvestercore/resource_type_mapper.py b/pandaharvester/harvestercore/resource_type_mapper.py index bd141707..75b52740 100644 --- a/pandaharvester/harvestercore/resource_type_mapper.py +++ b/pandaharvester/harvestercore/resource_type_mapper.py @@ -9,6 +9,7 @@ from .resource_type_constants import CRIC_RAM_TAG, CRIC_CORE_TAG, UNIFIED_QUEUE_TAG, CAPABILITY_TAG + class ResourceType(object): def __init__(self, resource_type_dict): """ @@ -90,7 +91,7 @@ def 
is_valid_resource_type(self, resource_name): return True return False - def calculate_worker_requirements(self, resource_name, queue_config): + def calculate_worker_requirements(self, resource_name, queue_dict): """ Calculates worker requirements (cores and memory) to request in pilot streaming mode/unified pull queue """ @@ -103,10 +104,10 @@ def calculate_worker_requirements(self, resource_name, queue_config): resource_type = self.resource_types[resource_name] # retrieve the queue configuration - site_max_rss = queue_config.get(CRIC_RAM_TAG, 0) or 0 - site_core_count = queue_config.get(CRIC_CORE_TAG, 1) or 1 + site_max_rss = queue_dict.get(CRIC_RAM_TAG, 0) or 0 + site_core_count = queue_dict.get(CRIC_CORE_TAG, 1) or 1 - unified_queue = queue_config.get("capability", "") == UNIFIED_QUEUE_TAG + unified_queue = queue_dict.get(CAPABILITY_TAG, "") == UNIFIED_QUEUE_TAG if not unified_queue: # site is not unified, just request whatever is configured in AGIS return site_max_rss, site_core_count @@ -171,16 +172,19 @@ def get_all_resource_types(self): self.load_data() return self.resource_types.keys() - def calculate_rtype(self, capability, site_core_count, site_max_rss): + def get_rtype_for_queue(self, queue_dict): """ Returns the resource type name for a given queue configuration - :param capability: string with the queue capability (e.g. "ucore") - :param site_core_count: number of cores - :param site_max_rss: amount of memory, NOT normalized by number of cores + :param queue_dict: queue configuration :return: string with the resource type name """ self.load_data() + # retrieve the queue configuration + site_max_rss = queue_dict.get(CRIC_RAM_TAG, 0) or 0 + site_core_count = queue_dict.get(CRIC_CORE_TAG, 1) or 1 + capability = queue_dict.get(CAPABILITY_TAG, "") + # unified queues are not mapped to any particular resource type if capability == UNIFIED_QUEUE_TAG: return '' diff --git a/pandaharvester/harvesterworkermaker/simple_worker_maker.py b/pandaharvester/harvesterworkermaker/simple_worker_maker.py index c35c203b..d720058b 100644 --- a/pandaharvester/harvesterworkermaker/simple_worker_maker.py +++ b/pandaharvester/harvesterworkermaker/simple_worker_maker.py @@ -175,11 +175,8 @@ def make_worker(self, jobspec_list, queue_config, job_type, resource_type): ) ) - # retrieve resource types based on queue configuration - capability = queue_dict.get(CAPABILITY_TAG, "") - site_core_count = queue_dict.get(CRIC_CORE_TAG, 1) or 1 - site_max_rss = queue_dict.get(CRIC_RAM_TAG, 0) or 0 - queue_rtype = self.rt_mapper.calculate_rtype(capability, site_core_count, site_max_rss) + # retrieve queue resource types + queue_rtype = self.rt_mapper.get_rtype_for_queue(queue_dict) if resource_type and resource_type != "ANY": work_spec.resourceType = resource_type From 22b232a099aa5ff70b7b0e7238bfc7075a9b8937 Mon Sep 17 00:00:00 2001 From: Fernando Harald Barreiro Megino Date: Wed, 7 Feb 2024 15:04:24 +0100 Subject: [PATCH 09/16] Changing wrapper command --- pandaharvester/harvestermisc/k8s_utils.py | 1 + 1 file changed, 1 insertion(+) diff --git a/pandaharvester/harvestermisc/k8s_utils.py b/pandaharvester/harvestermisc/k8s_utils.py index 5713d55c..f8625412 100644 --- a/pandaharvester/harvestermisc/k8s_utils.py +++ b/pandaharvester/harvestermisc/k8s_utils.py @@ -23,6 +23,7 @@ # command and image defaults DEF_COMMAND = ["/usr/bin/bash"] DEF_ARGS = ["-c", "cd; python $EXEC_DIR/pilots_starter.py || true"] +DEF_ARGS = ["-c", "cd; command -v python3 >/dev/null && python3 $EXEC_DIR/pilots_starter.py || python 
$EXEC_DIR/pilots_starter.py || true"] DEF_IMAGE = "atlasadc/atlas-grid-centos7" From 608f7413ca3fac2b4196e9690e417f5a5f254b5b Mon Sep 17 00:00:00 2001 From: Fernando Harald Barreiro Megino Date: Wed, 20 Mar 2024 15:29:42 +0100 Subject: [PATCH 10/16] wrong variable names --- .../harvesterworkermaker/simple_worker_maker.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/pandaharvester/harvesterworkermaker/simple_worker_maker.py b/pandaharvester/harvesterworkermaker/simple_worker_maker.py index 3a1d4d57..08ee9404 100644 --- a/pandaharvester/harvesterworkermaker/simple_worker_maker.py +++ b/pandaharvester/harvesterworkermaker/simple_worker_maker.py @@ -4,8 +4,11 @@ from pandaharvester.harvestercore import core_utils from pandaharvester.harvestercore.job_spec import JobSpec +from pandaharvester.harvestercore.resource_type_constants import ( + BASIC_RESOURCE_TYPE_MULTI_CORE, + BASIC_RESOURCE_TYPE_SINGLE_CORE, +) from pandaharvester.harvestercore.resource_type_mapper import ResourceTypeMapper -from pandaharvester.harvestercore.resource_type_constants import BASIC_RESOURCE_TYPE_SINGLE_CORE, BASIC_RESOURCE_TYPE_MULTI_CORE from pandaharvester.harvestercore.work_spec import WorkSpec from pandaharvester.harvestermisc.info_utils import PandaQueuesDict @@ -74,8 +77,8 @@ def make_worker(self, jobspec_list, queue_config, job_type, resource_type): tmp_log.debug(f"jobspec_list: {jobspec_list}") - workSpec = WorkSpec() - workSpec.creationTime = core_utils.naive_utcnow() + work_spec = WorkSpec() + work_spec.creationTime = core_utils.naive_utcnow() # get the queue configuration from CRIC panda_queues_dict = PandaQueuesDict() @@ -140,9 +143,9 @@ def make_worker(self, jobspec_list, queue_config, job_type, resource_type): pass # fill in worker attributes if is_ucore or (nCore > 0 and "nCore" in self.jobAttributesToUse): - workSpec.nCore = nCore + work_spec.nCore = nCore if is_ucore or (minRamCount > 0 and ("minRamCount" in self.jobAttributesToUse or associated_params_dict.get("job_minramcount") is True)): - workSpec.minRamCount = minRamCount + work_spec.minRamCount = minRamCount if maxDiskCount > 0 and ("maxDiskCount" in self.jobAttributesToUse or associated_params_dict.get("job_maxdiskcount") is True): work_spec.maxDiskCount = maxDiskCount if maxWalltime > 0 and ("maxWalltime" in self.jobAttributesToUse or associated_params_dict.get("job_maxwalltime") is True): From 22bbe9f537f5d2b533f871463024ea77446e3dbc Mon Sep 17 00:00:00 2001 From: Fernando Harald Barreiro Megino Date: Wed, 20 Mar 2024 15:43:17 +0100 Subject: [PATCH 11/16] Unused import --- pandaharvester/harvesterworkermaker/simple_worker_maker.py | 1 - 1 file changed, 1 deletion(-) diff --git a/pandaharvester/harvesterworkermaker/simple_worker_maker.py b/pandaharvester/harvesterworkermaker/simple_worker_maker.py index 08ee9404..d3f31549 100644 --- a/pandaharvester/harvesterworkermaker/simple_worker_maker.py +++ b/pandaharvester/harvesterworkermaker/simple_worker_maker.py @@ -1,4 +1,3 @@ -import datetime import math import random From 66c8a00a49da13c95e48d9e4612a467165bf7965 Mon Sep 17 00:00:00 2001 From: Fernando Harald Barreiro Megino Date: Wed, 20 Mar 2024 16:47:39 +0100 Subject: [PATCH 12/16] Corrections --- .../harvestercore/resource_type_mapper.py | 26 ++++++++++++------- .../harvestersubmitter/k8s_submitter.py | 6 ++--- 2 files changed, 18 insertions(+), 14 deletions(-) diff --git a/pandaharvester/harvestercore/resource_type_mapper.py b/pandaharvester/harvestercore/resource_type_mapper.py index 7f6e9601..8249ad67 
100644 --- a/pandaharvester/harvestercore/resource_type_mapper.py +++ b/pandaharvester/harvestercore/resource_type_mapper.py @@ -4,10 +4,14 @@ from pandaharvester.harvestercore import core_utils -from .db_proxy_pool import DBProxyPool as DBProxy from .core_utils import SingletonWithID - -from .resource_type_constants import CRIC_RAM_TAG, CRIC_CORE_TAG, UNIFIED_QUEUE_TAG, CAPABILITY_TAG +from .db_proxy_pool import DBProxyPool as DBProxy +from .resource_type_constants import ( + CAPABILITY_TAG, + CRIC_CORE_TAG, + CRIC_RAM_TAG, + UNIFIED_QUEUE_TAG, +) class ResourceType(object): @@ -40,10 +44,12 @@ def match(self, core_count, ram_count): ram_per_core = ram_count / core_count # check if the resource type matches the core count and ram count - if (self.max_core and core_count > self.max_core) or \ - (self.min_core and core_count < self.min_core) or \ - (self.max_ram_per_core and ram_per_core > self.max_ram_per_core) or \ - (self.min_ram_per_core and ram_per_core < self.min_ram_per_core): + if ( + (self.max_core and core_count > self.max_core) + or (self.min_core and core_count < self.min_core) + or (self.max_ram_per_core and ram_per_core > self.max_ram_per_core) + or (self.min_ram_per_core and ram_per_core < self.min_ram_per_core) + ): return False return True @@ -170,7 +176,7 @@ def get_all_resource_types(self): :return: list of strings with the resource type names """ self.load_data() - return self.resource_types.keys() + return list(self.resource_types.keys()) def get_rtype_for_queue(self, queue_dict): """ @@ -187,7 +193,7 @@ def get_rtype_for_queue(self, queue_dict): # unified queues are not mapped to any particular resource type if capability == UNIFIED_QUEUE_TAG: - return '' + return "" # loop over the resource types and find the one that matches the queue configuration for resource_name, resource_type in self.resource_types.items(): @@ -195,4 +201,4 @@ def get_rtype_for_queue(self, queue_dict): return resource_name # no match found - return '' + return "" diff --git a/pandaharvester/harvestersubmitter/k8s_submitter.py b/pandaharvester/harvestersubmitter/k8s_submitter.py index bf762be5..928d2333 100644 --- a/pandaharvester/harvestersubmitter/k8s_submitter.py +++ b/pandaharvester/harvestersubmitter/k8s_submitter.py @@ -98,9 +98,7 @@ def submit_k8s_worker(self, work_spec): cert = self._choose_proxy(work_spec, is_grandly_unified_queue) if not cert: err_str = "No proxy specified in proxySecretPath. 
Not submitted" - tmp_return_value = (False, err_str) - return tmp_return_value - + return False, err_str # get the walltime limit try: max_time = this_panda_queue_dict["maxtime"] @@ -153,7 +151,7 @@ def submit_workers(self, workspec_list): n_workers = len(workspec_list) tmp_log.debug(f"start, n_workers={n_workers}") - ret_list = list() + ret_list = [] if not workspec_list: tmp_log.debug("empty workspec_list") return ret_list From c04bb3e971660aefed25b05c9cd576ecf73704ef Mon Sep 17 00:00:00 2001 From: Fernando Harald Barreiro Megino Date: Wed, 20 Mar 2024 16:58:30 +0100 Subject: [PATCH 13/16] Removed hardcoded resource types from submitterTest.py --- pandaharvester/harvestertest/submitterTest.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/pandaharvester/harvestertest/submitterTest.py b/pandaharvester/harvestertest/submitterTest.py index e19e8bef..1d2c1dd2 100644 --- a/pandaharvester/harvestertest/submitterTest.py +++ b/pandaharvester/harvestertest/submitterTest.py @@ -5,6 +5,10 @@ from pandaharvester.harvestercore.job_spec import JobSpec from pandaharvester.harvestercore.plugin_factory import PluginFactory from pandaharvester.harvestercore.queue_config_mapper import QueueConfigMapper +from pandaharvester.harvestercore.resource_type_constants import ( + BASIC_RESOURCE_TYPE_SINGLE_CORE, +) +from pandaharvester.harvestercore.resource_type_mapper import ResourceTypeMapper from pandaharvester.harvestercore.work_spec import WorkSpec from pandaharvester.harvestermisc import signal_utils @@ -16,19 +20,21 @@ if len(sys.argv) not in (2, 4): print("Wrong number of parameters. You can either:") print(" - specify the queue name") - print(" - specify the queue name, jobType (managed, user) and resourceType (SCORE, SCORE_HIMEM, MCORE, MCORE_HIMEM)") + print(" - specify the queue name, jobType (managed, user) and resourceType (e.g. SCORE, SCORE_HIMEM, MCORE, MCORE_HIMEM)") sys.exit(0) queueName = sys.argv[1] queueConfigMapper = QueueConfigMapper() queueConfig = queueConfigMapper.get_queue(queueName) + resource_type_mapper = ResourceTypeMapper() + if queueConfig.prodSourceLabel in ("user", "managed"): jobType = queueConfig.prodSourceLabel else: jobType = "managed" # default, can be overwritten by parameters - resourceType = "SCORE" # default, can be overwritten by parameters + resourceType = BASIC_RESOURCE_TYPE_SINGLE_CORE # default, can be overwritten by parameters if len(sys.argv) == 4: # jobType should be 'managed' or 'user'. If not specified will default to a production job @@ -37,8 +43,8 @@ else: print(f"value for jobType not valid, defaulted to {jobType}") - # resourceType should be 'SCORE', 'SCORE_HIMEM', 'MCORE', 'MCORE_HIMEM'. If not specified defaults to single core - if sys.argv[3] in ("SCORE", "SCORE_HIMEM", "MCORE", "MCORE_HIMEM"): + # resourceType should be a valid resource type, e.g. 'SCORE', 'SCORE_HIMEM', 'MCORE', 'MCORE_HIMEM'. 
If not specified defaults to single core + if resource_type_mapper.is_valid_resource_type(sys.argv[3]): resourceType = sys.argv[3] else: print(f"value for resourceType not valid, defaulted to {resourceType}") From e82d7a00a99311c6ae08ea581c189caf2ea0bcb9 Mon Sep 17 00:00:00 2001 From: Fernando Harald Barreiro Megino Date: Thu, 21 Mar 2024 14:42:43 +0100 Subject: [PATCH 14/16] Debug --- pandaharvester/harvesterworkermaker/simple_worker_maker.py | 1 + 1 file changed, 1 insertion(+) diff --git a/pandaharvester/harvesterworkermaker/simple_worker_maker.py b/pandaharvester/harvesterworkermaker/simple_worker_maker.py index d3f31549..58421a67 100644 --- a/pandaharvester/harvesterworkermaker/simple_worker_maker.py +++ b/pandaharvester/harvesterworkermaker/simple_worker_maker.py @@ -109,6 +109,7 @@ def make_worker(self, jobspec_list, queue_config, job_type, resource_type): else: if not len(jobspec_list) and self.rt_mapper.is_valid_resource_type(resource_type): # some testing PQs have ucore + pure pull, need to default to the basic 1-core resource type + tmp_log.warning(f"{resource_type} -> {type(resource_type)}, {self.rt_mapper.resource_types}") tmp_log.warning(f'Invalid resource type "{resource_type}" (perhaps due to ucore with pure pull); default to the basic 1-core resource type') resource_type = BASIC_RESOURCE_TYPE_SINGLE_CORE work_spec.nCore, work_spec.minRamCount = self.rt_mapper.calculate_worker_requirements(resource_type, queue_dict) From ee352eb8c83f620b5c8e99bb7217ec5279d8b235 Mon Sep 17 00:00:00 2001 From: Fernando Harald Barreiro Megino Date: Thu, 21 Mar 2024 14:44:45 +0100 Subject: [PATCH 15/16] Debug --- pandaharvester/harvesterworkermaker/simple_worker_maker.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pandaharvester/harvesterworkermaker/simple_worker_maker.py b/pandaharvester/harvesterworkermaker/simple_worker_maker.py index 58421a67..a5119a5c 100644 --- a/pandaharvester/harvesterworkermaker/simple_worker_maker.py +++ b/pandaharvester/harvesterworkermaker/simple_worker_maker.py @@ -109,7 +109,9 @@ def make_worker(self, jobspec_list, queue_config, job_type, resource_type): else: if not len(jobspec_list) and self.rt_mapper.is_valid_resource_type(resource_type): # some testing PQs have ucore + pure pull, need to default to the basic 1-core resource type - tmp_log.warning(f"{resource_type} -> {type(resource_type)}, {self.rt_mapper.resource_types}") + tmp_log.warning( + f"{resource_type} -> {type(resource_type)}, {self.rt_mapper.resource_types}, {resource_type in self.rt_mapper.resource_types}" + ) tmp_log.warning(f'Invalid resource type "{resource_type}" (perhaps due to ucore with pure pull); default to the basic 1-core resource type') resource_type = BASIC_RESOURCE_TYPE_SINGLE_CORE work_spec.nCore, work_spec.minRamCount = self.rt_mapper.calculate_worker_requirements(resource_type, queue_dict) From b488caf8eb2b0725ac17a718c724ccb943e98814 Mon Sep 17 00:00:00 2001 From: Fernando Harald Barreiro Megino Date: Thu, 21 Mar 2024 14:50:11 +0100 Subject: [PATCH 16/16] Fixed wrong condition --- pandaharvester/harvesterworkermaker/simple_worker_maker.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/pandaharvester/harvesterworkermaker/simple_worker_maker.py b/pandaharvester/harvesterworkermaker/simple_worker_maker.py index a5119a5c..767f4eda 100644 --- a/pandaharvester/harvesterworkermaker/simple_worker_maker.py +++ b/pandaharvester/harvesterworkermaker/simple_worker_maker.py @@ -107,11 +107,8 @@ def make_worker(self, jobspec_list, queue_config, 
job_type, resource_type): work_spec.nCore = site_corecount work_spec.minRamCount = site_maxrss else: - if not len(jobspec_list) and self.rt_mapper.is_valid_resource_type(resource_type): + if not len(jobspec_list) and not self.rt_mapper.is_valid_resource_type(resource_type): # some testing PQs have ucore + pure pull, need to default to the basic 1-core resource type - tmp_log.warning( - f"{resource_type} -> {type(resource_type)}, {self.rt_mapper.resource_types}, {resource_type in self.rt_mapper.resource_types}" - ) tmp_log.warning(f'Invalid resource type "{resource_type}" (perhaps due to ucore with pure pull); default to the basic 1-core resource type') resource_type = BASIC_RESOURCE_TYPE_SINGLE_CORE work_spec.nCore, work_spec.minRamCount = self.rt_mapper.calculate_worker_requirements(resource_type, queue_dict)
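
The net effect of this series is that every consumer now asks ResourceTypeMapper instead of comparing against a hardcoded SCORE/SCORE_HIMEM/MCORE/MCORE_HIMEM list. The sketch below is a minimal, self-contained illustration of the matching logic as it stands after PATCH 12/16: the resource-type rows and the per-core RAM thresholds are hypothetical stand-ins (the real mapper loads its rows from the harvester database through DBProxyPool), and only ResourceType.match() and the queue-resolution loop are reproduced.

```python
# Hypothetical illustration of ResourceType.match() and get_rtype_for_queue().
# The thresholds below are made-up examples; the real definitions are read from
# the harvester database by ResourceTypeMapper.load_data().

CRIC_RAM_TAG = "maxrss"
CRIC_CORE_TAG = "corecount"
UNIFIED_QUEUE_TAG = "ucore"
CAPABILITY_TAG = "capability"


class ResourceType:
    def __init__(self, name, min_core=None, max_core=None, min_ram_per_core=None, max_ram_per_core=None):
        self.name = name
        self.min_core = min_core
        self.max_core = max_core
        self.min_ram_per_core = min_ram_per_core
        self.max_ram_per_core = max_ram_per_core

    def match(self, core_count, ram_count):
        # reject missing or zero values, then compare RAM normalized per core
        if not core_count or not ram_count:
            return False
        ram_per_core = ram_count / core_count
        if (
            (self.max_core and core_count > self.max_core)
            or (self.min_core and core_count < self.min_core)
            or (self.max_ram_per_core and ram_per_core > self.max_ram_per_core)
            or (self.min_ram_per_core and ram_per_core < self.min_ram_per_core)
        ):
            return False
        return True


# example resource-type table (insertion order matters, as in the real loop)
RESOURCE_TYPES = {
    "SCORE": ResourceType("SCORE", min_core=1, max_core=1, max_ram_per_core=2000),
    "SCORE_HIMEM": ResourceType("SCORE_HIMEM", min_core=1, max_core=1, min_ram_per_core=2000),
    "MCORE": ResourceType("MCORE", min_core=2, max_ram_per_core=2000),
    "MCORE_HIMEM": ResourceType("MCORE_HIMEM", min_core=2, min_ram_per_core=2000),
}


def get_rtype_for_queue(queue_dict):
    # unified (ucore) queues are not pinned to a single resource type
    if queue_dict.get(CAPABILITY_TAG, "") == UNIFIED_QUEUE_TAG:
        return ""
    site_max_rss = queue_dict.get(CRIC_RAM_TAG, 0) or 0
    site_core_count = queue_dict.get(CRIC_CORE_TAG, 1) or 1
    for name, resource_type in RESOURCE_TYPES.items():
        if resource_type.match(site_core_count, site_max_rss):
            return name
    return ""


if __name__ == "__main__":
    # 8 cores, 14000 MB maxrss -> 1750 MB per core -> "MCORE" with these example thresholds
    print(get_rtype_for_queue({"corecount": 8, "maxrss": 14000, "capability": "mcore"}))
    # unified queues return "" and are handled later by the worker maker
    print(get_rtype_for_queue({"corecount": 8, "maxrss": 14000, "capability": "ucore"}))
```

With a table like this, simple_worker_maker.py can keep its final fallback purely generic: if the queue does not resolve to a named type, a one-core worker gets BASIC_RESOURCE_TYPE_SINGLE_CORE and anything larger gets BASIC_RESOURCE_TYPE_MULTI_CORE.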
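
The same mapper-driven lists feed the Kubernetes scheduling hints in PATCH 03/16. The brief sketch below shows what the anti-affinity matrix looks like for example inputs; the two lists are stand-ins for the values returned by ResourceTypeMapper.get_single_core_resource_types() and get_multi_core_resource_types().

```python
# Example inputs; in k8s_utils.py these lists come from the ResourceTypeMapper.
single_core_resource_types = ["SCORE", "SCORE_HIMEM"]
multi_core_resource_types = ["MCORE", "MCORE_HIMEM"]

# single-core pods repel multi-core pods and vice versa, as in set_affinity()
anti_affinity_matrix = {}
for tmp_type in single_core_resource_types:
    anti_affinity_matrix[tmp_type] = multi_core_resource_types
for tmp_type in multi_core_resource_types:
    anti_affinity_matrix[tmp_type] = single_core_resource_types

print(anti_affinity_matrix)
# {'SCORE': ['MCORE', 'MCORE_HIMEM'], 'SCORE_HIMEM': ['MCORE', 'MCORE_HIMEM'],
#  'MCORE': ['SCORE', 'SCORE_HIMEM'], 'MCORE_HIMEM': ['SCORE', 'SCORE_HIMEM']}
```

The affinity term itself advertises every known type via get_all_resource_types(), whose return value ends up in the matchExpressions values field; PATCH 12/16 accordingly changes that method to return a plain list instead of a dict_keys view.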