Skip to content

Commit

Permalink
Merge pull request #220 from HSF/resource_types
Browse files Browse the repository at this point in the history
Cleaning up hardcoded resource types
  • Loading branch information
fbarreir authored Mar 25, 2024
2 parents 9a8c913 + b488caf commit 6594390
Show file tree
Hide file tree
Showing 9 changed files with 210 additions and 91 deletions.
3 changes: 2 additions & 1 deletion pandaharvester/harvestercore/db_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from .seq_number_spec import SeqNumberSpec
from .service_metrics_spec import ServiceMetricSpec
from .work_spec import WorkSpec
from .resource_type_constants import BASIC_RESOURCE_TYPE_SINGLE_CORE

# logger
_logger = core_utils.setup_logger("db_proxy")
Expand Down Expand Up @@ -3511,7 +3512,7 @@ def get_worker_stats_bulk(self, active_ups_queues):
if active_ups_queues:
for ups_queue in active_ups_queues:
if ups_queue not in retMap or not retMap[ups_queue] or retMap[ups_queue] == {"ANY": {}}:
retMap[ups_queue] = {"managed": {"SCORE": {"running": 0, "submitted": 0, "finished": 0, "to_submit": 0}}}
retMap[ups_queue] = {"managed": {BASIC_RESOURCE_TYPE_SINGLE_CORE: {"running": 0, "submitted": 0, "finished": 0, "to_submit": 0}}}

# commit
self.commit()
Expand Down
8 changes: 8 additions & 0 deletions pandaharvester/harvestercore/resource_type_constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Tags (field names) used in the CRIC/AGIS queue configuration dictionaries.
# Quote style normalized to double quotes for consistency within this module.
CRIC_RAM_TAG = "maxrss"
CRIC_CORE_TAG = "corecount"
UNIFIED_QUEUE_TAG = "ucore"
CAPABILITY_TAG = "capability"

# basic resource types
BASIC_RESOURCE_TYPE_SINGLE_CORE = "SCORE"
BASIC_RESOURCE_TYPE_MULTI_CORE = "MCORE"
115 changes: 109 additions & 6 deletions pandaharvester/harvestercore/resource_type_mapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,14 @@

from pandaharvester.harvestercore import core_utils

from .core_utils import SingletonWithID
from .db_proxy_pool import DBProxyPool as DBProxy
from .resource_type_constants import (
CAPABILITY_TAG,
CRIC_CORE_TAG,
CRIC_RAM_TAG,
UNIFIED_QUEUE_TAG,
)


class ResourceType(object):
Expand All @@ -21,8 +28,34 @@ def __init__(self, resource_type_dict):
self.min_ram_per_core = resource_type_dict["minrampercore"]
self.max_ram_per_core = resource_type_dict["maxrampercore"]

def match(self, core_count, ram_count):
"""
Checks if the resource type matches the core count and ram count
:param core_count: number of cores
:param ram_count: amount of memory
:return: boolean
"""

# basic validation that the values are not None or 0
if not core_count or not ram_count:
return False

# normalize ram count
ram_per_core = ram_count / core_count

class ResourceTypeMapper(object):
# check if the resource type matches the core count and ram count
if (
(self.max_core and core_count > self.max_core)
or (self.min_core and core_count < self.min_core)
or (self.max_ram_per_core and ram_per_core > self.max_ram_per_core)
or (self.min_ram_per_core and ram_per_core < self.min_ram_per_core)
):
return False

return True


class ResourceTypeMapper(object, metaclass=SingletonWithID):
def __init__(self):
self.lock = threading.Lock()
self.resource_types = {}
Expand Down Expand Up @@ -56,14 +89,15 @@ def load_data(self):
def is_valid_resource_type(self, resource_name):
"""
Checks if the resource type is valid (exists in the dictionary of resource types)
:param resource_name: string with the resource type name (e.g. SCORE, SCORE_HIMEM,...)
:param resource_name: string with the resource type name
:return: boolean
"""
self.load_data()
if resource_name in self.resource_types:
return True
return False

def calculate_worker_requirements(self, resource_name, queue_config):
def calculate_worker_requirements(self, resource_name, queue_dict):
"""
Calculates worker requirements (cores and memory) to request in pilot streaming mode/unified pull queue
"""
Expand All @@ -76,10 +110,10 @@ def calculate_worker_requirements(self, resource_name, queue_config):
resource_type = self.resource_types[resource_name]

# retrieve the queue configuration
site_max_rss = queue_config.get("maxrss", 0) or 0
site_core_count = queue_config.get("corecount", 1) or 1
site_max_rss = queue_dict.get(CRIC_RAM_TAG, 0) or 0
site_core_count = queue_dict.get(CRIC_CORE_TAG, 1) or 1

unified_queue = queue_config.get("capability", "") == "ucore"
unified_queue = queue_dict.get(CAPABILITY_TAG, "") == UNIFIED_QUEUE_TAG
if not unified_queue:
# site is not unified, just request whatever is configured in AGIS
return site_max_rss, site_core_count
Expand All @@ -99,3 +133,72 @@ def calculate_worker_requirements(self, resource_name, queue_config):
pass

return worker_cores, worker_memory

def is_single_core_resource_type(self, resource_name):
"""
Validates whether the resource type is single core by looking at the min and max core definitions
:param resource_name: string with the resource type name
:return: boolean
"""
self.load_data()
if resource_name in self.resource_types:
min_core = self.resource_types[resource_name].min_core
max_core = self.resource_types[resource_name].max_core

if min_core == max_core == 1:
return True

return False

def get_single_core_resource_types(self):
"""
Returns a list of resource types that are single core
:return: list of strings with the resource type names
"""
self.load_data()
# iterate over the resource types and find the ones that are single core
single_core_resource_types = [resource_name for resource_name in self.resource_types if self.is_single_core_resource_type(resource_name)]
return single_core_resource_types

def get_multi_core_resource_types(self):
"""
Returns a list of resource types that are multi core
:return: list of strings with the resource type names
"""
self.load_data()
# iterate over the resource types and find the ones that are multi core (not single core)
single_core_resource_types = [resource_name for resource_name in self.resource_types if not self.is_single_core_resource_type(resource_name)]
return single_core_resource_types

def get_all_resource_types(self):
"""
Returns a list with all resource types
:return: list of strings with the resource type names
"""
self.load_data()
return list(self.resource_types.keys())

def get_rtype_for_queue(self, queue_dict):
    """
    Returns the resource type name for a given queue configuration
    :param queue_dict: queue configuration
    :return: string with the resource type name ("" when none applies)
    """
    self.load_data()

    # pull the relevant fields out of the queue configuration
    site_max_rss = queue_dict.get(CRIC_RAM_TAG, 0) or 0
    site_core_count = queue_dict.get(CRIC_CORE_TAG, 1) or 1
    capability = queue_dict.get(CAPABILITY_TAG, "")

    # unified queues are not mapped to any particular resource type
    if capability == UNIFIED_QUEUE_TAG:
        return ""

    # first resource type whose limits fit the queue configuration; "" when no match
    return next(
        (tmp_name for tmp_name, tmp_rtype in self.resource_types.items() if tmp_rtype.match(site_core_count, site_max_rss)),
        "",
    )
20 changes: 15 additions & 5 deletions pandaharvester/harvestermisc/k8s_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from pandaharvester.harvesterconfig import harvester_config
from pandaharvester.harvestercore import core_utils
from pandaharvester.harvestermisc.info_utils_k8s import PandaQueuesDictK8s
from pandaharvester.harvestercore.resource_type_mapper import ResourceTypeMapper

base_logger = core_utils.setup_logger("k8s_utils")

Expand All @@ -38,6 +39,8 @@ def __init__(self, namespace, config_file=None, queue_name=None):
self.namespace = namespace
self.queue_name = queue_name

self.rt_mapper = ResourceTypeMapper()

def read_yaml_file(self, yaml_file):
with open(yaml_file) as f:
yaml_content = yaml.load(f, Loader=yaml.FullLoader)
Expand Down Expand Up @@ -432,18 +435,25 @@ def set_affinity(self, yaml_content, use_affinity, use_anti_affinity):
yaml_content["spec"]["template"]["spec"]["affinity"] = {}
yaml_affinity = yaml_content["spec"]["template"]["spec"]["affinity"]

scores = ["SCORE", "SCORE_HIMEM"]
mcores = ["MCORE", "MCORE_HIMEM"]
single_core_resource_types = self.rt_mapper.get_single_core_resource_types()
multi_core_resource_types = self.rt_mapper.get_multi_core_resource_types()
all_resource_types = self.rt_mapper.get_all_resource_types()

anti_affinity_matrix = {"SCORE": mcores, "SCORE_HIMEM": mcores, "MCORE": scores, "MCORE_HIMEM": scores}
# create the anti-affinity matrix for higher single and multi core separation
anti_affinity_matrix = {}
for tmp_type in single_core_resource_types:
anti_affinity_matrix[tmp_type] = multi_core_resource_types
for tmp_type in multi_core_resource_types:
anti_affinity_matrix[tmp_type] = single_core_resource_types

# create the affinity spec
affinity_spec = {
"preferredDuringSchedulingIgnoredDuringExecution": [
{
"weight": 100,
"podAffinityTerm": {
"labelSelector": {
"matchExpressions": [{"key": "resourceType", "operator": "In", "values": ["SCORE", "SCORE_HIMEM", "MCORE", "MCORE_HIMEM"]}]
"matchExpressions": [{"key": "resourceType", "operator": "In", "values": all_resource_types}]
},
"topologyKey": "kubernetes.io/hostname",
},
Expand All @@ -453,7 +463,7 @@ def set_affinity(self, yaml_content, use_affinity, use_anti_affinity):

resource_type = yaml_content["spec"]["template"]["metadata"]["labels"]["resourceType"]

if use_affinity and resource_type in scores:
if use_affinity and resource_type in single_core_resource_types:
# resource type SCORE* should attract each other instead of spreading across the nodes
yaml_affinity["podAffinity"] = copy.deepcopy(affinity_spec)

Expand Down
11 changes: 9 additions & 2 deletions pandaharvester/harvestersubmitter/htcondor_submitter.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from pandaharvester.harvestercore import core_utils
from pandaharvester.harvestercore.plugin_base import PluginBase
from pandaharvester.harvestercore.queue_config_mapper import QueueConfigMapper
from pandaharvester.harvestercore.resource_type_mapper import ResourceTypeMapper
from pandaharvester.harvestermisc.htcondor_utils import (
CondorJobSubmit,
get_job_id_tuple_from_batchid,
Expand Down Expand Up @@ -240,6 +241,11 @@ def make_a_jdl(
tmpLog.warning(f"token_path is None: site={panda_queue_name}, token_dir={token_dir} , token_filename={token_filename}")
# open tmpfile as submit description file
tmpFile = tempfile.NamedTemporaryFile(mode="w", delete=False, suffix="_submit.sdf", dir=workspec.get_access_point())

# instance of resource type mapper
rt_mapper = ResourceTypeMapper()
all_resource_types = rt_mapper.get_all_resource_types()

# placeholder map
placeholder_map = {
"sdfPath": tmpFile.name,
Expand Down Expand Up @@ -274,8 +280,8 @@ def make_a_jdl(
"gtag": batch_log_dict.get("gtag", "fake_GTAG_string"),
"prodSourceLabel": prod_source_label,
"jobType": workspec.jobType,
"resourceType": submitter_common.get_resource_type(workspec.resourceType, is_unified_queue),
"pilotResourceTypeOption": submitter_common.get_resource_type(workspec.resourceType, is_unified_queue, True),
"resourceType": submitter_common.get_resource_type(workspec.resourceType, is_unified_queue, all_resource_types),
"pilotResourceTypeOption": submitter_common.get_resource_type(workspec.resourceType, is_unified_queue, all_resource_types, is_pilot_option=True),
"ioIntensity": io_intensity,
"pilotType": pilot_type_opt,
"pilotUrlOption": pilot_url_str,
Expand Down Expand Up @@ -335,6 +341,7 @@ def __init__(self, **kwarg):
else:
self.hostname = socket.gethostname().split(".")[0]
PluginBase.__init__(self, **kwarg)

# number of processes
try:
self.nProcesses
Expand Down
6 changes: 2 additions & 4 deletions pandaharvester/harvestersubmitter/k8s_submitter.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,7 @@ def submit_k8s_worker(self, work_spec):
cert = self._choose_proxy(work_spec, is_grandly_unified_queue)
if not cert:
err_str = "No proxy specified in proxySecretPath. Not submitted"
tmp_return_value = (False, err_str)
return tmp_return_value

return False, err_str
# get the walltime limit
try:
max_time = this_panda_queue_dict["maxtime"]
Expand Down Expand Up @@ -153,7 +151,7 @@ def submit_workers(self, workspec_list):
n_workers = len(workspec_list)
tmp_log.debug(f"start, n_workers={n_workers}")

ret_list = list()
ret_list = []
if not workspec_list:
tmp_log.debug("empty workspec_list")
return ret_list
Expand Down
12 changes: 5 additions & 7 deletions pandaharvester/harvestersubmitter/submitter_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,17 +90,15 @@ def get_pilot_job_type(job_type, is_unified_dispatch=False):


# Parse resource type from string for Unified PanDA Queue


def get_resource_type(resource_type_name, is_unified_queue, all_resource_types, is_pilot_option=False):
    """
    Resolve the resource type string to hand to the pilot for a Unified PanDA Queue.

    :param resource_type_name: resource type name (e.g. SCORE, MCORE, ...); coerced to str
    :param is_unified_queue: whether the queue is a unified queue
    :param all_resource_types: collection with all known resource type names
    :param is_pilot_option: when True, return the pilot command line option instead of the bare name
    :return: the resource type name (or pilot option string), or "" when not applicable/unknown
    """
    resource_type_name = str(resource_type_name)
    # resource types are only meaningful for unified queues
    if not is_unified_queue:
        return ""
    # unknown resource types are ignored; plain membership test, no need to build a set per call
    if resource_type_name not in all_resource_types:
        return ""
    if is_pilot_option:
        return f"--resource-type {resource_type_name}"
    return resource_type_name
Expand Down
14 changes: 10 additions & 4 deletions pandaharvester/harvestertest/submitterTest.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@
from pandaharvester.harvestercore.job_spec import JobSpec
from pandaharvester.harvestercore.plugin_factory import PluginFactory
from pandaharvester.harvestercore.queue_config_mapper import QueueConfigMapper
from pandaharvester.harvestercore.resource_type_constants import (
BASIC_RESOURCE_TYPE_SINGLE_CORE,
)
from pandaharvester.harvestercore.resource_type_mapper import ResourceTypeMapper
from pandaharvester.harvestercore.work_spec import WorkSpec
from pandaharvester.harvestermisc import signal_utils

Expand All @@ -16,19 +20,21 @@
if len(sys.argv) not in (2, 4):
print("Wrong number of parameters. You can either:")
print(" - specify the queue name")
print(" - specify the queue name, jobType (managed, user) and resourceType (SCORE, SCORE_HIMEM, MCORE, MCORE_HIMEM)")
print(" - specify the queue name, jobType (managed, user) and resourceType (e.g. SCORE, SCORE_HIMEM, MCORE, MCORE_HIMEM)")
sys.exit(0)

queueName = sys.argv[1]
queueConfigMapper = QueueConfigMapper()
queueConfig = queueConfigMapper.get_queue(queueName)

resource_type_mapper = ResourceTypeMapper()

if queueConfig.prodSourceLabel in ("user", "managed"):
jobType = queueConfig.prodSourceLabel
else:
jobType = "managed" # default, can be overwritten by parameters

resourceType = "SCORE" # default, can be overwritten by parameters
resourceType = BASIC_RESOURCE_TYPE_SINGLE_CORE # default, can be overwritten by parameters

if len(sys.argv) == 4:
# jobType should be 'managed' or 'user'. If not specified will default to a production job
Expand All @@ -37,8 +43,8 @@
else:
print(f"value for jobType not valid, defaulted to {jobType}")

# resourceType should be 'SCORE', 'SCORE_HIMEM', 'MCORE', 'MCORE_HIMEM'. If not specified defaults to single core
if sys.argv[3] in ("SCORE", "SCORE_HIMEM", "MCORE", "MCORE_HIMEM"):
# resourceType should be a valid resource type, e.g. 'SCORE', 'SCORE_HIMEM', 'MCORE', 'MCORE_HIMEM'. If not specified defaults to single core
if resource_type_mapper.is_valid_resource_type(sys.argv[3]):
resourceType = sys.argv[3]
else:
print(f"value for resourceType not valid, defaulted to {resourceType}")
Expand Down
Loading

0 comments on commit 6594390

Please sign in to comment.