Skip to content

Commit

Permalink
Bug fix for updater parsing
Browse files Browse the repository at this point in the history
Addresses bug when "updaters" is omitted from "peers" dictionary in top level "aggregators" dictionary.
Fixes bug that occurs when "producers" regular expression is not defined in the "updaters" dictionary.
Updaters now correctly only add the parent dictionary's producers when "produers" is not defined in the updaters dictionary.
Updated producer balancing when multiple aggregators share producers in the YAML configuration.
  • Loading branch information
nick-enoent authored and tom95858 committed Jan 17, 2025
1 parent d5ca1a5 commit 4f105db
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 45 deletions.
40 changes: 23 additions & 17 deletions ldms/man/ldmsd_yaml_parser.man
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
.\" Manpage for ldmsd_yaml_parser
.\" Contact ovis-help@ca.sandia to correct errors or typos.
.TH man 8 "20 Nov 2024" "ovis-4.4.5" "ldmsd_yaml_parser man page"
.TH YAML Configuration Quick Guide
.TH "YAML Configuration Quick Guide"

.SH NAME
ldmsd_yaml_parser \- a python program to parse a YAML configuration file into a v4 LDMS configuration.
Expand Down Expand Up @@ -67,7 +67,24 @@ https://readthedocs.org/projects/py-hostlist/downloads/pdf/latest/
NOTE: When using a configuration string, rather than a dictionary, to configure a plugin, the string is not parsed by the YAML parser, and the exact string will be passed to the LDMSD as is. As such, the only accepted permission format when using a configuration string, rather than a dictionary, is an octal number, e.g. "0777". If an octal number is entered into the configuration as an integer, the parser will interpret the number incorrectly, and the permissions will not be set as expected.

.SH daemons
List of dictionaries describing LDMS daemons that are part of the cluster and their endpoints that share transport configurations. The primary keys are "names", "hosts", "endpoints", and "environment". Any keys provided that do not match this string values are assumed to be either CLI commands, or overarching default configuration commands for a daemon.
List of dictionaries describing LDMS daemons that are part of the cluster and their endpoints that share transport configurations.
.br
The primary keys are "names", "hosts", "endpoints", and "environment"
.br
Any key:values provided that do not match one of the primary keys are assumed to be either CLI commands, or overarching default configuration commands for a daemon.
.br
CLI commands must have the long form CLI option as the key, and the arugment as the value.
.br
.e.g. set_memory : "1g"
.br
Configuration commands may be configured in a dictionary format or a string.
.br
e.g. metric_sets_default_authz : "perm=0700 uid=0 gid=0"
.br
e.g. metric_sets_default_authz:
perm : "0777"
uid : 0
gid : 0

.SS names
Regex of a group of LDMS daemon attributes and endpoints that share transport configuration. These strings are referenced in the samplers and aggregators sections. Hostlist format.
Expand Down Expand Up @@ -123,23 +140,12 @@ List of dictionaries defining aggregator configurations, their “peers” i.e.
.br
The daemons reference daemon configuration definitions defined in the "daemons" dictionary.
.br
The stores reference storage policy names defined in the "stores" top level dictionary.
.br
The "plugins" key reference plugin instance names defined in the "plugins" top level dictionary.
.br
The primary keys are "names", "hosts", "endpoints", and "environment"
.br
Any keys provided that do not match one of these string values are assumed to be either CLI commands, or overarching default configuration commands for a daemon.

.SS names
String regex in hostlist format of a group of LDMS daemon attributes and endpoints that share transport configuration in hostlist format. These strings are referenced in the sampler and aggregator configurations.

.SS hosts
String regex in hostlist format of hostnames on which the LDMS daemon will operate. Must expand to an equal length as the daemon names, or be evenly divisble. e.g. 2 hostnames for 4 daemons.

.SS environment
A dictionary of environment variables for a LDMSD and their values. Keys are the environment variable name.

.SS daemons
String of daemon names in hostlist format that references daemon names defined in the top level daemons section.
.SS [plugins]
List of plugin key references defined in the top level "plugins" dictionary.
.SS [subscribe]
List of dictionaries of streams to subscribe producers to.
.TP
Expand Down
71 changes: 43 additions & 28 deletions ldms/python/ldmsd/parser_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import itertools as it
import collections
import ldmsd.hostlist as hostlist
from ldmsd.ldmsd_communicator import LDMSD_CTRL_CMD_MAP

AUTH_ATTRS = [
'auth',
Expand Down Expand Up @@ -120,6 +121,16 @@ def check_required(attr_list, container, container_name):
if name not in container:
raise ValueError(f'The "{name}" attribute is required in {container_name}\n')

def dist_list(list_, n):
q, r = divmod(len(list_), n)
dist_list = []
idx = 0
for i in range(1, n + 1):
s = idx
idx += q + 1 if i <= r else q
dist_list.append(list_[s:idx])
return dist_list

def NUM_STR(obj):
return str(obj) if type(obj) in [ int, float ] else obj

Expand Down Expand Up @@ -301,6 +312,8 @@ def build_daemons(self, config):
cli_opt = {}
for key in spec:
if key not in ['hosts','names','endpoints','environment']:
if type(spec[key]) is list:
raise ValueError(f'Lists are not a valid value for configuration line options. metric_sets_default_authz may be configured as a dictionary')
cli_opt[key] = spec[key]
for dname, host in zip(dnames, hosts):
ep_dict[spec['names']][dname] = {}
Expand Down Expand Up @@ -469,7 +482,6 @@ def build_producers(self, config):
if group not in producers:
producers[group] = {}

upd_spec = check_opt('updaters', prod)
# Expand and generate all the producers
typ = prod['type']
reconnect = check_intrvl_str(prod['reconnect'])
Expand All @@ -496,8 +508,7 @@ def build_producers(self, config):
'group' : group,
'reconnect' : reconnect,
'perm' : perm,
'cache_ip' : cache_ip,
'updaters' : upd_spec
'cache_ip' : cache_ip
}
producers[group][endpoint] = prod
except:
Expand All @@ -521,7 +532,7 @@ def build_updaters(self, config):
if 'prdcr_listen' in agg:
peer_list += agg['prdcr_listen']
for prod in peer_list:
if prod['updaters'] is None:
if 'updaters' not in prod:
continue
if type(prod['updaters']) is not list:
raise TypeError(f'{LDMS_YAML_ERR}\n'
Expand All @@ -548,20 +559,18 @@ def build_updaters(self, config):
f'An updater name must be unique within the group\n')
perm = check_opt('perm', updtr_spec)
perm = perm_handler(perm)
prod_regex = check_opt('producers', prod)
prod_list = []
if prod_regex:
prod_list.append({'regex' : prod_regex})
else:
prod_regex = '.*'
prod_list.append({'regex' : '.*'})
prod_regex = check_opt('producers', updtr_spec)
if type(prod_regex) is not str and prod_regex is not None:
raise TypeError(f'Error: Configuration error in keyword "producers". Only regex string values are valid.')
if prod_regex is None:
prod_regex = expand_names(prod['endpoints'])
updtr = {
'name' : updtr_name,
'interval' : check_intrvl_str(updtr_spec['interval']),
'perm' : perm,
'group' : agg['daemons'],
'sets' : updtr_sets,
'producers' : prod_list
'producers' : prod_regex
}
if 'offset' in updtr_spec:
updtr['offset'] = check_intrvl_str(updtr_spec['offset'])
Expand Down Expand Up @@ -792,13 +801,10 @@ def write_opt_attr(self, dstr, attr, val, endline=True):
def write_producers(self, dstr, group_name, dmn, auth_list):
if group_name in self.producers:
''' Balance samplers across aggregators '''
ppd = -(len(self.producers[group_name]) // -len(self.aggregators[group_name].keys()))
rem = len(self.producers[group_name]) % len(self.aggregators[group_name].keys())
prdcrs = list(self.producers[group_name].keys())
aggs = list(self.daemons[group_name].keys())
agg_idx = int(aggs.index(dmn))
prdcr_idx = int(ppd * agg_idx)
prod_group = prdcrs[prdcr_idx:prdcr_idx+ppd]
prod_group = dist_list(prdcrs, len(aggs))[agg_idx]
i = 0
auth = None
for ep in prod_group:
Expand Down Expand Up @@ -847,15 +853,18 @@ def write_options(self, dstr, grp, dname):
return dstr
cli_opt = self.daemons[grp][dname]['cli_opt']
for opt in cli_opt:
if type(cli_opt[opt]) is dict:
dstr += f'{opt}'
for arg in cli_opt[opt]:
if arg == 'perm':
cli_opt[opt][arg] = perm_handler(cli_opt[opt][arg])
dstr += f' {arg}={cli_opt[opt][arg]}'
dstr += '\n'
else:
if opt not in LDMSD_CTRL_CMD_MAP:
dstr += f'option --{opt} {cli_opt[opt]}\n'
else:
if type(cli_opt[opt]) is dict:
dstr += f'{opt}'
for arg in cli_opt[opt]:
if arg == 'perm':
cli_opt[opt][arg] = perm_handler(cli_opt[opt][arg])
dstr += f' {arg}={cli_opt[opt][arg]}'
dstr += '\n'
else:
dstr += f'{opt} {cli_opt[opt]}\n'
return dstr

def write_env(self, dstr, grp, dname):
Expand Down Expand Up @@ -955,7 +964,7 @@ def write_aggregator(self, dstr, group_name, dmn):
dstr = self.write_prdcr_listeners(dstr, group_name)
dstr = self.write_stream_subscribe(dstr, group_name, dmn)
dstr = self.write_agg_plugins(dstr, group_name, dmn)
dstr = self.write_updaters(dstr, group_name)
dstr = self.write_updaters(dstr, group_name, dmn)
dstr = self.write_stores(dstr, group_name)
return dstr
except Exception as e:
Expand All @@ -977,7 +986,7 @@ def write_agg_plugins(self, dstr, group_name, agg):
dstr += f'config name={plugin["name"]} {cfg_str}\n'
return dstr

def write_updaters(self, dstr, group_name):
def write_updaters(self, dstr, group_name, dmn):
if group_name in self.updaters:
updtr_group = self.updaters[group_name]
for updtr in updtr_group:
Expand All @@ -1000,9 +1009,15 @@ def write_updaters(self, dstr, group_name):
offset = check_opt('offset', updtr_group[updtr])
dstr = self.write_opt_attr(dstr, 'perm', perm, endline=False)
dstr = self.write_opt_attr(dstr, 'offset', offset)
for prod in updtr_group[updtr]['producers']:
if type(updtr_group[updtr]['producers']) is str:
dstr += f'updtr_prdcr_add name={updtr} '\
f'regex={prod["regex"]}\n'
f'regex={updtr_group[updtr]["producers"]}\n'
else:
aggs = list(self.daemons[group_name].keys())
agg_idx = int(aggs.index(dmn))
prod_group = dist_list(updtr_group[updtr]['producers'], len(aggs))[agg_idx]
for prod in prod_group:
dstr += f'updtr_prdcr_add name={updtr} regex={prod}\n'
if updtr_group[updtr]['sets']:
for s in updtr_group[updtr]['sets']:
dstr += f'updtr_match_add name={updtr} regex={s["regex"]} match={s["field"]}\n'
Expand Down

0 comments on commit 4f105db

Please sign in to comment.