diff --git a/pandaharvester/harvestercore/db_proxy.py b/pandaharvester/harvestercore/db_proxy.py
index 342d3a5d..91e524b9 100644
--- a/pandaharvester/harvestercore/db_proxy.py
+++ b/pandaharvester/harvestercore/db_proxy.py
@@ -29,6 +29,7 @@
 from .seq_number_spec import SeqNumberSpec
 from .service_metrics_spec import ServiceMetricSpec
 from .work_spec import WorkSpec
+from .resource_type_constants import BASIC_RESOURCE_TYPE_SINGLE_CORE
 
 # logger
 _logger = core_utils.setup_logger("db_proxy")
@@ -3511,7 +3512,7 @@ def get_worker_stats_bulk(self, active_ups_queues):
         if active_ups_queues:
             for ups_queue in active_ups_queues:
                 if ups_queue not in retMap or not retMap[ups_queue] or retMap[ups_queue] == {"ANY": {}}:
-                    retMap[ups_queue] = {"managed": {"SCORE": {"running": 0, "submitted": 0, "finished": 0, "to_submit": 0}}}
+                    retMap[ups_queue] = {"managed": {BASIC_RESOURCE_TYPE_SINGLE_CORE: {"running": 0, "submitted": 0, "finished": 0, "to_submit": 0}}}
         # commit
         self.commit()
diff --git a/pandaharvester/harvestercore/resource_type_constants.py b/pandaharvester/harvestercore/resource_type_constants.py
new file mode 100644
index 00000000..d281f13e
--- /dev/null
+++ b/pandaharvester/harvestercore/resource_type_constants.py
@@ -0,0 +1,8 @@
+CRIC_RAM_TAG = "maxrss"
+CRIC_CORE_TAG = "corecount"
+UNIFIED_QUEUE_TAG = "ucore"
+CAPABILITY_TAG = "capability"
+
+# basic resource types
+BASIC_RESOURCE_TYPE_SINGLE_CORE = "SCORE"
+BASIC_RESOURCE_TYPE_MULTI_CORE = "MCORE"
diff --git a/pandaharvester/harvestercore/resource_type_mapper.py b/pandaharvester/harvestercore/resource_type_mapper.py
index 5d3799ee..8249ad67 100644
--- a/pandaharvester/harvestercore/resource_type_mapper.py
+++ b/pandaharvester/harvestercore/resource_type_mapper.py
@@ -4,7 +4,14 @@
 from pandaharvester.harvestercore import core_utils
 
+from .core_utils import SingletonWithID
 from .db_proxy_pool import DBProxyPool as DBProxy
+from .resource_type_constants import (
+    CAPABILITY_TAG,
+    
CRIC_CORE_TAG, + CRIC_RAM_TAG, + UNIFIED_QUEUE_TAG, +) class ResourceType(object): @@ -21,8 +28,34 @@ def __init__(self, resource_type_dict): self.min_ram_per_core = resource_type_dict["minrampercore"] self.max_ram_per_core = resource_type_dict["maxrampercore"] + def match(self, core_count, ram_count): + """ + Checks if the resource type matches the core count and ram count + :param core_count: number of cores + :param ram_count: amount of memory + :return: boolean + """ + + # basic validation that the values are not None or 0 + if not core_count or not ram_count: + return False + + # normalize ram count + ram_per_core = ram_count / core_count -class ResourceTypeMapper(object): + # check if the resource type matches the core count and ram count + if ( + (self.max_core and core_count > self.max_core) + or (self.min_core and core_count < self.min_core) + or (self.max_ram_per_core and ram_per_core > self.max_ram_per_core) + or (self.min_ram_per_core and ram_per_core < self.min_ram_per_core) + ): + return False + + return True + + +class ResourceTypeMapper(object, metaclass=SingletonWithID): def __init__(self): self.lock = threading.Lock() self.resource_types = {} @@ -56,14 +89,15 @@ def load_data(self): def is_valid_resource_type(self, resource_name): """ Checks if the resource type is valid (exists in the dictionary of resource types) - :param resource_name: string with the resource type name (e.g. SCORE, SCORE_HIMEM,...) 
+ :param resource_name: string with the resource type name :return: boolean """ + self.load_data() if resource_name in self.resource_types: return True return False - def calculate_worker_requirements(self, resource_name, queue_config): + def calculate_worker_requirements(self, resource_name, queue_dict): """ Calculates worker requirements (cores and memory) to request in pilot streaming mode/unified pull queue """ @@ -76,10 +110,10 @@ def calculate_worker_requirements(self, resource_name, queue_config): resource_type = self.resource_types[resource_name] # retrieve the queue configuration - site_max_rss = queue_config.get("maxrss", 0) or 0 - site_core_count = queue_config.get("corecount", 1) or 1 + site_max_rss = queue_dict.get(CRIC_RAM_TAG, 0) or 0 + site_core_count = queue_dict.get(CRIC_CORE_TAG, 1) or 1 - unified_queue = queue_config.get("capability", "") == "ucore" + unified_queue = queue_dict.get(CAPABILITY_TAG, "") == UNIFIED_QUEUE_TAG if not unified_queue: # site is not unified, just request whatever is configured in AGIS return site_max_rss, site_core_count @@ -99,3 +133,72 @@ def calculate_worker_requirements(self, resource_name, queue_config): pass return worker_cores, worker_memory + + def is_single_core_resource_type(self, resource_name): + """ + Validates whether the resource type is single core by looking at the min and max core definitions + :param resource_name: string with the resource type name + :return: boolean + """ + self.load_data() + if resource_name in self.resource_types: + min_core = self.resource_types[resource_name].min_core + max_core = self.resource_types[resource_name].max_core + + if min_core == max_core == 1: + return True + + return False + + def get_single_core_resource_types(self): + """ + Returns a list of resource types that are single core + :return: list of strings with the resource type names + """ + self.load_data() + # iterate over the resource types and find the ones that are single core + single_core_resource_types = 
[resource_name for resource_name in self.resource_types if self.is_single_core_resource_type(resource_name)]
+        return single_core_resource_types
+
+    def get_multi_core_resource_types(self):
+        """
+        Returns a list of resource types that are multi core
+        :return: list of strings with the resource type names
+        """
+        self.load_data()
+        # iterate over the resource types and find the ones that are multi core (not single core)
+        multi_core_resource_types = [resource_name for resource_name in self.resource_types if not self.is_single_core_resource_type(resource_name)]
+        return multi_core_resource_types
+
+    def get_all_resource_types(self):
+        """
+        Returns a list with all resource types
+        :return: list of strings with the resource type names
+        """
+        self.load_data()
+        return list(self.resource_types.keys())
+
+    def get_rtype_for_queue(self, queue_dict):
+        """
+        Returns the resource type name for a given queue configuration
+        :param queue_dict: queue configuration
+        :return: string with the resource type name
+        """
+        self.load_data()
+
+        # retrieve the queue configuration
+        site_max_rss = queue_dict.get(CRIC_RAM_TAG, 0) or 0
+        site_core_count = queue_dict.get(CRIC_CORE_TAG, 1) or 1
+        capability = queue_dict.get(CAPABILITY_TAG, "")
+
+        # unified queues are not mapped to any particular resource type
+        if capability == UNIFIED_QUEUE_TAG:
+            return ""
+
+        # loop over the resource types and find the one that matches the queue configuration
+        for resource_name, resource_type in self.resource_types.items():
+            if resource_type.match(site_core_count, site_max_rss):
+                return resource_name
+
+        # no match found
+        return ""
diff --git a/pandaharvester/harvestermisc/k8s_utils.py b/pandaharvester/harvestermisc/k8s_utils.py
index 8d98abab..4f40184b 100644
--- a/pandaharvester/harvestermisc/k8s_utils.py
+++ b/pandaharvester/harvestermisc/k8s_utils.py
@@ -12,6 +12,7 @@
 from pandaharvester.harvesterconfig import harvester_config
 from pandaharvester.harvestercore import core_utils
 from 
pandaharvester.harvestermisc.info_utils_k8s import PandaQueuesDictK8s +from pandaharvester.harvestercore.resource_type_mapper import ResourceTypeMapper base_logger = core_utils.setup_logger("k8s_utils") @@ -38,6 +39,8 @@ def __init__(self, namespace, config_file=None, queue_name=None): self.namespace = namespace self.queue_name = queue_name + self.rt_mapper = ResourceTypeMapper() + def read_yaml_file(self, yaml_file): with open(yaml_file) as f: yaml_content = yaml.load(f, Loader=yaml.FullLoader) @@ -432,18 +435,25 @@ def set_affinity(self, yaml_content, use_affinity, use_anti_affinity): yaml_content["spec"]["template"]["spec"]["affinity"] = {} yaml_affinity = yaml_content["spec"]["template"]["spec"]["affinity"] - scores = ["SCORE", "SCORE_HIMEM"] - mcores = ["MCORE", "MCORE_HIMEM"] + single_core_resource_types = self.rt_mapper.get_single_core_resource_types() + multi_core_resource_types = self.rt_mapper.get_multi_core_resource_types() + all_resource_types = self.rt_mapper.get_all_resource_types() - anti_affinity_matrix = {"SCORE": mcores, "SCORE_HIMEM": mcores, "MCORE": scores, "MCORE_HIMEM": scores} + # create the anti-affinity matrix for higher single and multi core separation + anti_affinity_matrix = {} + for tmp_type in single_core_resource_types: + anti_affinity_matrix[tmp_type] = multi_core_resource_types + for tmp_type in multi_core_resource_types: + anti_affinity_matrix[tmp_type] = single_core_resource_types + # create the affinity spec affinity_spec = { "preferredDuringSchedulingIgnoredDuringExecution": [ { "weight": 100, "podAffinityTerm": { "labelSelector": { - "matchExpressions": [{"key": "resourceType", "operator": "In", "values": ["SCORE", "SCORE_HIMEM", "MCORE", "MCORE_HIMEM"]}] + "matchExpressions": [{"key": "resourceType", "operator": "In", "values": all_resource_types}] }, "topologyKey": "kubernetes.io/hostname", }, @@ -453,7 +463,7 @@ def set_affinity(self, yaml_content, use_affinity, use_anti_affinity): resource_type = 
yaml_content["spec"]["template"]["metadata"]["labels"]["resourceType"] - if use_affinity and resource_type in scores: + if use_affinity and resource_type in single_core_resource_types: # resource type SCORE* should attract each other instead of spreading across the nodes yaml_affinity["podAffinity"] = copy.deepcopy(affinity_spec) diff --git a/pandaharvester/harvestersubmitter/htcondor_submitter.py b/pandaharvester/harvestersubmitter/htcondor_submitter.py index 2844d86c..38910121 100644 --- a/pandaharvester/harvestersubmitter/htcondor_submitter.py +++ b/pandaharvester/harvestersubmitter/htcondor_submitter.py @@ -14,6 +14,7 @@ from pandaharvester.harvestercore import core_utils from pandaharvester.harvestercore.plugin_base import PluginBase from pandaharvester.harvestercore.queue_config_mapper import QueueConfigMapper +from pandaharvester.harvestercore.resource_type_mapper import ResourceTypeMapper from pandaharvester.harvestermisc.htcondor_utils import ( CondorJobSubmit, get_job_id_tuple_from_batchid, @@ -240,6 +241,11 @@ def make_a_jdl( tmpLog.warning(f"token_path is None: site={panda_queue_name}, token_dir={token_dir} , token_filename={token_filename}") # open tmpfile as submit description file tmpFile = tempfile.NamedTemporaryFile(mode="w", delete=False, suffix="_submit.sdf", dir=workspec.get_access_point()) + + # instance of resource type mapper + rt_mapper = ResourceTypeMapper() + all_resource_types = rt_mapper.get_all_resource_types() + # placeholder map placeholder_map = { "sdfPath": tmpFile.name, @@ -274,8 +280,8 @@ def make_a_jdl( "gtag": batch_log_dict.get("gtag", "fake_GTAG_string"), "prodSourceLabel": prod_source_label, "jobType": workspec.jobType, - "resourceType": submitter_common.get_resource_type(workspec.resourceType, is_unified_queue), - "pilotResourceTypeOption": submitter_common.get_resource_type(workspec.resourceType, is_unified_queue, True), + "resourceType": submitter_common.get_resource_type(workspec.resourceType, is_unified_queue, 
all_resource_types), + "pilotResourceTypeOption": submitter_common.get_resource_type(workspec.resourceType, is_unified_queue, all_resource_types, is_pilot_option=True), "ioIntensity": io_intensity, "pilotType": pilot_type_opt, "pilotUrlOption": pilot_url_str, @@ -335,6 +341,7 @@ def __init__(self, **kwarg): else: self.hostname = socket.gethostname().split(".")[0] PluginBase.__init__(self, **kwarg) + # number of processes try: self.nProcesses diff --git a/pandaharvester/harvestersubmitter/k8s_submitter.py b/pandaharvester/harvestersubmitter/k8s_submitter.py index bf762be5..928d2333 100644 --- a/pandaharvester/harvestersubmitter/k8s_submitter.py +++ b/pandaharvester/harvestersubmitter/k8s_submitter.py @@ -98,9 +98,7 @@ def submit_k8s_worker(self, work_spec): cert = self._choose_proxy(work_spec, is_grandly_unified_queue) if not cert: err_str = "No proxy specified in proxySecretPath. Not submitted" - tmp_return_value = (False, err_str) - return tmp_return_value - + return False, err_str # get the walltime limit try: max_time = this_panda_queue_dict["maxtime"] @@ -153,7 +151,7 @@ def submit_workers(self, workspec_list): n_workers = len(workspec_list) tmp_log.debug(f"start, n_workers={n_workers}") - ret_list = list() + ret_list = [] if not workspec_list: tmp_log.debug("empty workspec_list") return ret_list diff --git a/pandaharvester/harvestersubmitter/submitter_common.py b/pandaharvester/harvestersubmitter/submitter_common.py index f1d28e50..f5e19b40 100644 --- a/pandaharvester/harvestersubmitter/submitter_common.py +++ b/pandaharvester/harvestersubmitter/submitter_common.py @@ -90,17 +90,15 @@ def get_pilot_job_type(job_type, is_unified_dispatch=False): # Parse resource type from string for Unified PanDA Queue - - -def get_resource_type(string, is_unified_queue, is_pilot_option=False): - string = str(string) +def get_resource_type(resource_type_name, is_unified_queue, all_resource_types, is_pilot_option=False): + resource_type_name = str(resource_type_name) if not 
is_unified_queue: ret = "" - elif string in set(["SCORE", "MCORE", "SCORE_HIMEM", "MCORE_HIMEM"]): + elif resource_type_name in set(all_resource_types): if is_pilot_option: - ret = f"--resource-type {string}" + ret = f"--resource-type {resource_type_name}" else: - ret = string + ret = resource_type_name else: ret = "" return ret diff --git a/pandaharvester/harvestertest/submitterTest.py b/pandaharvester/harvestertest/submitterTest.py index e19e8bef..1d2c1dd2 100644 --- a/pandaharvester/harvestertest/submitterTest.py +++ b/pandaharvester/harvestertest/submitterTest.py @@ -5,6 +5,10 @@ from pandaharvester.harvestercore.job_spec import JobSpec from pandaharvester.harvestercore.plugin_factory import PluginFactory from pandaharvester.harvestercore.queue_config_mapper import QueueConfigMapper +from pandaharvester.harvestercore.resource_type_constants import ( + BASIC_RESOURCE_TYPE_SINGLE_CORE, +) +from pandaharvester.harvestercore.resource_type_mapper import ResourceTypeMapper from pandaharvester.harvestercore.work_spec import WorkSpec from pandaharvester.harvestermisc import signal_utils @@ -16,19 +20,21 @@ if len(sys.argv) not in (2, 4): print("Wrong number of parameters. You can either:") print(" - specify the queue name") - print(" - specify the queue name, jobType (managed, user) and resourceType (SCORE, SCORE_HIMEM, MCORE, MCORE_HIMEM)") + print(" - specify the queue name, jobType (managed, user) and resourceType (e.g. 
SCORE, SCORE_HIMEM, MCORE, MCORE_HIMEM)") sys.exit(0) queueName = sys.argv[1] queueConfigMapper = QueueConfigMapper() queueConfig = queueConfigMapper.get_queue(queueName) + resource_type_mapper = ResourceTypeMapper() + if queueConfig.prodSourceLabel in ("user", "managed"): jobType = queueConfig.prodSourceLabel else: jobType = "managed" # default, can be overwritten by parameters - resourceType = "SCORE" # default, can be overwritten by parameters + resourceType = BASIC_RESOURCE_TYPE_SINGLE_CORE # default, can be overwritten by parameters if len(sys.argv) == 4: # jobType should be 'managed' or 'user'. If not specified will default to a production job @@ -37,8 +43,8 @@ else: print(f"value for jobType not valid, defaulted to {jobType}") - # resourceType should be 'SCORE', 'SCORE_HIMEM', 'MCORE', 'MCORE_HIMEM'. If not specified defaults to single core - if sys.argv[3] in ("SCORE", "SCORE_HIMEM", "MCORE", "MCORE_HIMEM"): + # resourceType should be a valid resource type, e.g. 'SCORE', 'SCORE_HIMEM', 'MCORE', 'MCORE_HIMEM'. 
If not specified defaults to single core + if resource_type_mapper.is_valid_resource_type(sys.argv[3]): resourceType = sys.argv[3] else: print(f"value for resourceType not valid, defaulted to {resourceType}") diff --git a/pandaharvester/harvesterworkermaker/simple_worker_maker.py b/pandaharvester/harvesterworkermaker/simple_worker_maker.py index d25668e5..767f4eda 100644 --- a/pandaharvester/harvesterworkermaker/simple_worker_maker.py +++ b/pandaharvester/harvesterworkermaker/simple_worker_maker.py @@ -1,9 +1,12 @@ -import datetime import math import random from pandaharvester.harvestercore import core_utils from pandaharvester.harvestercore.job_spec import JobSpec +from pandaharvester.harvestercore.resource_type_constants import ( + BASIC_RESOURCE_TYPE_MULTI_CORE, + BASIC_RESOURCE_TYPE_SINGLE_CORE, +) from pandaharvester.harvestercore.resource_type_mapper import ResourceTypeMapper from pandaharvester.harvestercore.work_spec import WorkSpec from pandaharvester.harvestermisc.info_utils import PandaQueuesDict @@ -24,7 +27,7 @@ def __init__(self, **kwarg): def get_job_core_and_memory(self, queue_dict, job_spec): job_memory = job_spec.jobParams.get("minRamCount", 0) or 0 - job_corecount = job_spec.jobParams.get("coreCount", 1) or 1 + job_core_count = job_spec.jobParams.get("coreCount", 1) or 1 is_ucore = queue_dict.get("capability", "") == "ucore" @@ -32,12 +35,12 @@ def get_job_core_and_memory(self, queue_dict, job_spec): site_maxrss = queue_dict.get("maxrss", 0) or 0 site_corecount = queue_dict.get("corecount", 1) or 1 - if job_corecount == 1: + if job_core_count == 1: job_memory = int(math.ceil(site_maxrss / site_corecount)) else: job_memory = site_maxrss - return job_corecount, job_memory + return job_core_count, job_memory def get_job_type(self, job_spec, job_type, queue_dict, tmp_prodsourcelabel=None): queue_type = queue_dict.get("type", None) @@ -67,26 +70,14 @@ def get_job_type(self, job_spec, job_type, queue_dict, tmp_prodsourcelabel=None) return 
job_type_final - def capability_to_rtype(self, capability): - if capability == "score": - return "SCORE" - elif capability == "himem": - return "SCORE_HIMEM" - elif capability == "mcore": - return "MCORE" - elif capability == "mcorehimem": - return "MCORE_HIMEM" - else: - return None - # make a worker from jobs def make_worker(self, jobspec_list, queue_config, job_type, resource_type): - tmpLog = self.make_logger(_logger, f"queue={queue_config.queueName}:{job_type}:{resource_type}", method_name="make_worker") + tmp_log = self.make_logger(_logger, f"queue={queue_config.queueName}:{job_type}:{resource_type}", method_name="make_worker") - tmpLog.debug(f"jobspec_list: {jobspec_list}") + tmp_log.debug(f"jobspec_list: {jobspec_list}") - workSpec = WorkSpec() - workSpec.creationTime = core_utils.naive_utcnow() + work_spec = WorkSpec() + work_spec.creationTime = core_utils.naive_utcnow() # get the queue configuration from CRIC panda_queues_dict = PandaQueuesDict() @@ -96,36 +87,35 @@ def make_worker(self, jobspec_list, queue_config, job_type, resource_type): is_ucore = queue_dict.get("capability", "") == "ucore" # case of traditional (non-ucore) queue: look at the queue configuration if not is_ucore: - workSpec.nCore = queue_dict.get("corecount", 1) or 1 - workSpec.minRamCount = queue_dict.get("maxrss", 1) or 1 + work_spec.nCore = queue_dict.get("corecount", 1) or 1 + work_spec.minRamCount = queue_dict.get("maxrss", 1) or 1 # case of unified queue: look at the job & resource type and queue configuration else: catchall = queue_dict.get("catchall", "") if "useMaxRam" in catchall: - # temporary workaround to debug killed workers + # some sites require to always set the maximum memory due to memory killing jobs site_corecount = queue_dict.get("corecount", 1) or 1 site_maxrss = queue_dict.get("maxrss", 1) or 1 # some cases need to overwrite those values - if "SCORE" in resource_type: - # the usual pilot streaming use case - workSpec.nCore = 1 - workSpec.minRamCount = 
int(math.ceil(site_maxrss / site_corecount)) + if self.rt_mapper.is_single_core_resource_type(resource_type): + work_spec.nCore = 1 + work_spec.minRamCount = int(math.ceil(site_maxrss / site_corecount)) else: # default values - workSpec.nCore = site_corecount - workSpec.minRamCount = site_maxrss + work_spec.nCore = site_corecount + work_spec.minRamCount = site_maxrss else: - if not len(jobspec_list) and resource_type not in ["SCORE", "SCORE_HIMEM", "MCORE", "MCORE_HIMEM"]: - # some testing PQs have ucore + pure pull, need to default to SCORE - tmpLog.warning(f'Invalid resource type "{resource_type}" (perhaps due to ucore with pure pull); default to SCORE') - resource_type = "SCORE" - workSpec.nCore, workSpec.minRamCount = self.rt_mapper.calculate_worker_requirements(resource_type, queue_dict) + if not len(jobspec_list) and not self.rt_mapper.is_valid_resource_type(resource_type): + # some testing PQs have ucore + pure pull, need to default to the basic 1-core resource type + tmp_log.warning(f'Invalid resource type "{resource_type}" (perhaps due to ucore with pure pull); default to the basic 1-core resource type') + resource_type = BASIC_RESOURCE_TYPE_SINGLE_CORE + work_spec.nCore, work_spec.minRamCount = self.rt_mapper.calculate_worker_requirements(resource_type, queue_dict) # parameters that are independent on traditional vs unified - workSpec.maxWalltime = queue_dict.get("maxtime", 1) - workSpec.maxDiskCount = queue_dict.get("maxwdir", 1) + work_spec.maxWalltime = queue_dict.get("maxtime", 1) + work_spec.maxDiskCount = queue_dict.get("maxwdir", 1) if len(jobspec_list) > 0: # get info from jobs @@ -135,8 +125,8 @@ def make_worker(self, jobspec_list, queue_config, job_type, resource_type): maxWalltime = 0 ioIntensity = 0 for jobSpec in jobspec_list: - job_corecount, job_memory = self.get_job_core_and_memory(queue_dict, jobSpec) - nCore += job_corecount + job_core_count, job_memory = self.get_job_core_and_memory(queue_dict, jobSpec) + nCore += job_core_count 
minRamCount += job_memory try: maxDiskCount += jobSpec.jobParams["maxDiskCount"] @@ -152,18 +142,18 @@ def make_worker(self, jobspec_list, queue_config, job_type, resource_type): pass # fill in worker attributes if is_ucore or (nCore > 0 and "nCore" in self.jobAttributesToUse): - workSpec.nCore = nCore + work_spec.nCore = nCore if is_ucore or (minRamCount > 0 and ("minRamCount" in self.jobAttributesToUse or associated_params_dict.get("job_minramcount") is True)): - workSpec.minRamCount = minRamCount + work_spec.minRamCount = minRamCount if maxDiskCount > 0 and ("maxDiskCount" in self.jobAttributesToUse or associated_params_dict.get("job_maxdiskcount") is True): - workSpec.maxDiskCount = maxDiskCount + work_spec.maxDiskCount = maxDiskCount if maxWalltime > 0 and ("maxWalltime" in self.jobAttributesToUse or associated_params_dict.get("job_maxwalltime") is True): - workSpec.maxWalltime = maxWalltime + work_spec.maxWalltime = maxWalltime if ioIntensity > 0 and ("ioIntensity" in self.jobAttributesToUse or associated_params_dict.get("job_iointensity") is True): - workSpec.ioIntensity = ioIntensity + work_spec.ioIntensity = ioIntensity - workSpec.pilotType = jobspec_list[0].get_pilot_type() - workSpec.jobType = self.get_job_type(jobspec_list[0], job_type, queue_dict) + work_spec.pilotType = jobspec_list[0].get_pilot_type() + work_spec.jobType = self.get_job_type(jobspec_list[0], job_type, queue_dict) else: # when no job @@ -172,31 +162,29 @@ def make_worker(self, jobspec_list, queue_config, job_type, resource_type): choice_list = core_utils.make_choice_list(pdpm=pdpm, default="managed") tmp_prodsourcelabel = random.choice(choice_list) fake_job = JobSpec() - fake_job.jobParams = {} - fake_job.jobParams["prodSourceLabel"] = tmp_prodsourcelabel - workSpec.pilotType = fake_job.get_pilot_type() + fake_job.jobParams = {"prodSourceLabel": tmp_prodsourcelabel} + work_spec.pilotType = fake_job.get_pilot_type() del fake_job - if workSpec.pilotType in ["RC", "ALRB", "PT"]: - 
tmpLog.info(f"a worker has pilotType={workSpec.pilotType}") + if work_spec.pilotType in ["RC", "ALRB", "PT"]: + tmp_log.info(f"a worker has pilotType={work_spec.pilotType}") - workSpec.jobType = self.get_job_type(None, job_type, queue_dict, tmp_prodsourcelabel) - tmpLog.debug( + work_spec.jobType = self.get_job_type(None, job_type, queue_dict, tmp_prodsourcelabel) + tmp_log.debug( "get_job_type decided for job_type: {0} (input job_type: {1}, queue_type: {2}, tmp_prodsourcelabel: {3})".format( - workSpec.jobType, job_type, queue_dict.get("type", None), tmp_prodsourcelabel + work_spec.jobType, job_type, queue_dict.get("type", None), tmp_prodsourcelabel ) ) - # retrieve queue resource type - capability = queue_dict.get("capability", "") - queue_rtype = self.capability_to_rtype(capability) + # retrieve queue resource types + queue_rtype = self.rt_mapper.get_rtype_for_queue(queue_dict) if resource_type and resource_type != "ANY": - workSpec.resourceType = resource_type + work_spec.resourceType = resource_type elif queue_rtype: - workSpec.resourceType = queue_rtype - elif workSpec.nCore == 1: - workSpec.resourceType = "SCORE" + work_spec.resourceType = queue_rtype + elif work_spec.nCore == 1: + work_spec.resourceType = BASIC_RESOURCE_TYPE_SINGLE_CORE else: - workSpec.resourceType = "MCORE" + work_spec.resourceType = BASIC_RESOURCE_TYPE_MULTI_CORE - return workSpec + return work_spec