From 0d667e87ffc7b965602c6945f713a9f1656805db Mon Sep 17 00:00:00 2001 From: Nichamon Naksinehaboon Date: Wed, 21 Feb 2024 13:03:41 -0600 Subject: [PATCH] Introduce Sampler Advertisement This commit introduces a new feature that simplifies aggregator configuration. * Previously, admins needed to manually specify hostnames for all samplers in the aggregator configuration using the `prdcr_add` command. * This change enables samplers to advertise themselves to an aggregator. Admins specifies the aggregator hostname and listening port in sampler configuration via the `advertise_add` command and start the advertisement with the `advertise_start` command. The samplers now advertise their hostname to the aggregator. * On the aggregator, admins may specify a regular expression to be matched with sampler hostname or an IP range in the CIDR format using the `prdcr_listen_add` command. The `prdcr_listen_start` command is used to tell the aggregator to automatically add producers corresponding to a sampler of which the hostname matches the regular expressions or the IP is in the subnet mask given at the `prdcr_listen_add` line. If neither of the regular expression or the IP range is given, LDMSD creates a producer when it receives an advertisement. This feature eliminates the need for manual configuration of sampler hostnames in the aggregator configuration file. The aggregator can now automatically discover samplers and add them as metric set producers. --- ldms/man/ldmsd_sampler_advertisement.man | 258 ++++++ ldms/python/ldmsd/ldmsd_communicator.py | 338 +++++++- ldms/python/ldmsd/ldmsd_controller | 251 +++++- ldms/src/core/ldms.h | 53 +- ldms/src/core/ldms_rail.c | 59 ++ ldms/src/core/ldms_xprt.c | 25 + ldms/src/ldmsd/ldmsd.h | 59 +- ldms/src/ldmsd/ldmsd_cfgobj.c | 6 + ldms/src/ldmsd/ldmsd_config.c | 131 +-- ldms/src/ldmsd/ldmsd_prdcr.c | 120 ++- ldms/src/ldmsd/ldmsd_request.c | 999 +++++++++++++++++++++-- ldms/src/ldmsd/ldmsd_request.h | 20 +- ldms/src/ldmsd/ldmsd_request_util.c | 81 ++ ldms/src/ldmsd/ldmsd_updtr.c | 62 +- 14 files changed, 2275 insertions(+), 187 deletions(-) create mode 100644 ldms/man/ldmsd_sampler_advertisement.man diff --git a/ldms/man/ldmsd_sampler_advertisement.man b/ldms/man/ldmsd_sampler_advertisement.man new file mode 100644 index 0000000000..60e37d1c82 --- /dev/null +++ b/ldms/man/ldmsd_sampler_advertisement.man @@ -0,0 +1,258 @@ +\" Manpage for ldmsd_sampler_advertisement +.TH man 7 "27 March 2024" "v5" "LDMSD Sampler Advertisement man page" + +.\""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""/. +.SH NAME +ldmsd_sampler_advertisement - Manual for LDMSD Sampler Advertisement + +.\""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""/. +.SH SYNOPSIS + +**Sampler side Commands** + +.IP \fBadvertiser_add +.RI "name=" NAME " xprt=" XPRT " host=" HOST " port=" PORT +.RI "[auth=" AUTH_DOMAIN "]" + +.IP \fBadvertiser_start +.RI "name=" NAME +.RI "[xprt=" XPRT " host=" HOST " port=" PORT " auth=" AUTH_DOMAIN "]" + +.IP \fBadvertiser_stop +.RI "name=" NAME + +.IP \fBadvertiser_del +.RI "name=" NAME + +.IP \fBadvertiser_status +.RI "[name=" NAME "]" + +.PP +**Aggregator Side Commands** + +.IP \fBprdcr_listen_add +.RI "name=" NAME " +.RI "[disabled_start=" TURE|FALSE "] [regex=" REGEX "] [ip=" CIDR "] [rail=" SIZE "] [credits=" BYTES "] [rx_rate=" RATE_LIMIT "]" + +.IP \fBprdcr_listen_start +.RI "name=" NAME + +.IP \fBprdcr_listen_stop +.RI "name=" NAME + +.IP \fBprdcr_listen_del +.RI "name=" NAME + +.IP \fBprdcr_listen_status + +.SH DESCRIPTION + +LDMSD Sampler Discovery is a capability that enables LDMSD automatically add +producers that its hostname matches a given regular expression. The feature +eliminates the need for manual configuration of sampler hostname in the +aggregator configuration file. + +Admins specify the aggregator hostname and the listening port in sampler +configuration via the \fBadvertiser_add\fR command and start the advertisement +with the \fBadvertiser_start\fR command. The samplers now advertise their +hostname to the aggregator. On the aggregator, admins specify a regular +expression to be matched with sampler hostname via the \fBprdcr_listen_add\fR +command. The \fBprdcr_listen_start\fR command is used to tell the aggregator to +automatically add producers corresponding to a sampler of which the hostname +matches the regular expression. + +The auto-generated producers is of the ‘advertised’ type. The producer name is +the same as the name given at the \fBadvertiser_add\fR line in the sampler +configuration file. LDMSD automatically starts them; however, admins need to +stop them manually by using the command \fBprdcr_stop\fR or +\fBprdcr_stop_regex\fR. They can be restarted by using the command +\fBprdcr_start\fR or \fBprdcr_start_regex\fR. + +The description for each command and its parameters are as follows. + +**Sampler Side Commands** + +\fBadvertiser_add\fR adds a new advertisement. The parameters are: +.RS +.IP \fBname\fR=\fINAME +String of the advertisement name. The aggregator uses the string as the producer name as well. +.IP \fBhost\fR=\fIHOST +Aggregator hostname +.IP \fBxprt\fR=\fIXPRT +Transport to connect to the aggregator +.IP \fBport\fR=\fIPORT +Listen port of the aggregator +.IP \fBreconnect\fR=\fIINTERVAL +Reconnect interval +d +.IP \fB[auth\fR=\fIAUTH_DOMAIN\fB] +The authentication domain to be used to connect to the aggregator +.RE + +\fBadvertiser_start\fR starts an advertisement. If the advertiser does not exist, LDMSD will create the advertiser. In this case, the mandatory attributes for \fBadvertiser_add\fB must be given. The parameters are: +.RS +.IP \fBname\fR=\fINAME +The advertisement name to be started +.IP \fB[host\fR=\fIHOST\fB] +Aggregator hostname +.IP \fB[xprt\fR=\fIXPRT\fB] +Transport to connect to the aggregator +.IP \fB[port\fR=\fIPORT\fB] +Listen port of the aggregator +.IP \fB[reconnect\fR=\fIINTERVAL\fB] +Reconnect interval +.IP \fB[auth\fR=\fIAUTH_DOMAIN\fB] +The authentication domain to be used to connect to the aggregator +.RE + +\fBadvertiser_stop\fR stops an advertisement. The parameters are: +.RS +.IP \fBname\fR=\fINAME +The advertisement name to be stopped +.RE + +\fBadvertiser_del\fR deletes an advertisement. The parameters are: +.RS +.IP \fBname\fR=\fINAME +The advertisement name to be deleted +.RE + +\fBadvertiser_status reports the status of each advertisement. An optional parameter is: +.RS +.IP \fB[name\fR=\fINAME\fB] +Advertisement name +.RE + +.PP +**Aggregator Side commands** + +\fBprdcr_listen_add\fR adds a regular expression to match sampler advertisements. The parameters are: +.RS +.IP \fBname\fR=\fINAME +String of the prdcr_listen name. +.IP \fB[disabled_start\fR=\fITRUE|FALSE\fB] +True to tell LDMSD not to start producers automatically +.IP \fB[regex\fR=\fIREGEX\fB] +Regular expression to match with hostnames in sampler advertisements +.IP \fBip\fR=\fICIDR\fB] +IP Range in the CIDR format either in IPV4 or IPV6 +.IP \fB[rail\fR=\fIRAIL\fB] +Number of rails +.IP \fB[credit\fR=\fICREDIT\fB] +Receive credits each producer connection accepts in bytes +.IP \fB[rx_rate\fR=\fIRATE\fB] +Receive rate limit each producer connection acceipts +.RE + +\fBprdcr_listen_start\fR starts accepting sampler advertisement with matches hostnames. The parameters are: +.RS +.IP \fBname\fR=\fINAME +Name of prdcr_listen to be started +.RE + +\fBprdcr_listen_stop\fR stops accepting sampler advertisement with matches hostnames. The parameters are: +.RS +.IP \fBname\fR=\fINAME +Name of prdcr_listen to be stopped +.RE + +\fBprdcr_listen_del\fR deletes a regular expression to match hostnames in sampler advertisements. The parameters are: +.RS +.IP \fBname\fR=\fINAME +Name of prdcr_listen to be deleted +.RE + +\fBprdcr_listen_status\fR report the status of each prdcr_listen object. There is no parameter. + +.SH EXAMPLE + +In this example, there are three LDMS daemons running on \fBnode-1\fR, +\fBnode-2\fR, and \fBnode03\fR. LDMSD running on \fBnode-1\fR and \fBnode-2\fR +are sampler daemons, namely \fBsamplerd-1\fR and \fBsamplerd-2\fR. The +aggregator (\fBagg\fR) runs on \fBnode-3\fR. All LDMSD listen on port 411. + +The sampler daemons collect the \fBmeminfo\fR set, and they are configured to +advertise themselves and connect to the aggregator using sock on host +\fBnode-3\fR at port 411. The following are the configuration files of the +\fBsamplerd-1\fR and \fBsamplerd-2\fR. + +.EX +.B +> cat samplerd-1.conf +.RS 4 +# Add and start an advertisement +advertiser_add name=samplerd-1 xprt=sock host=node-3 port=411 reconnect=10s +advertiser_start name=samplerd-1 +# Load, configure, and start the meminfo plugin +load name=meminfo +config name=meminfo producer=samplerd-1 instance=samplerd-1/meminfo +start name=meminfo interval=1s +.RE + +.B +> cat samplerd-2.conf +.RS 4 +# Add and start an advertisement +advertiser_add name=samplerd-2 host=node-3 port=411 reconnect=10s +advertiser_start name=samplerd-2 +# Load, configure, and start the meminfo plugin +load name=meminfo +config name=meminfo producer=samplerd-2 instance=samplerd-2/meminfo +start name=meminfo interval=1s +.RE +.EE + +The aggregator is configured to accept advertisements from the sampler daemons +that the hostnames match the regular expressions \fBnode0[1-2]\fR. The +auto-added producers will check for an establish connection with the samplers +every 10 seconds if the connection becomes disconnected. An updater is added to +update the sets of all producers on the aggregators every 10 seconds at the 100 +milliseconds offset. + +.EX +.B +> cat agg.conf +.RS 4 +# Accept advertisements sent from LDMSD running on hostnames matched node-[1-2] +prdcr_listen_add name=computes regex=node-[1-2] +prdcr_listen_start name=computes +# Add and start an updater +updtr_add name=all_sets interval=1s offset=100ms +updtr_prdcr_add name=all_sets regex=.* +updtr_start name=all +.RE +.EE + +LDMSD provides the command \fBadvertiser_status\fR to report the status of +advertisement of a sampler daemon. + +.EX +.B +> ldmsd_controller -x sock -p 10001 -h node-1 +Welcome to the LDMSD control processor +sock:node-1:10001> advertiser_status +Name Aggregator Host Aggregator Port Transport Reconnect (us) State +---------------- ---------------- --------------- ------------ --------------- ------------ +samplerd-1 node-3 10001 sock 10000000 CONNECTED +sock:node-1:10001> +.EE + +Similarly, LDMSD provides the command \fBprdcr_listen_status\fR to report the +status of all prdcr_listen objects on an aggregator. The command also reports +the list of auto-added producers corresponding to each prdcr_listen object. + +.EX +.B +> ldmsd_controller -x sock -p 10001 -h node-3 +Welcome to the LDMSD control processor +sock:node-3:10001> prdcr_listen_status +Name State Regex IP Range +-------------------- ---------- --------------- ------------------------------ +compute running node-[1-2] - +Producers: samplerd-1, samplerd-2 +sock:node-3:10001> +.EE + +.SH SEE ALSO +.BR ldmsd (8) +.BR ldmsd_controller (8) diff --git a/ldms/python/ldmsd/ldmsd_communicator.py b/ldms/python/ldmsd/ldmsd_communicator.py index 7b74fee459..049a9e1d8b 100644 --- a/ldms/python/ldmsd/ldmsd_communicator.py +++ b/ldms/python/ldmsd/ldmsd_communicator.py @@ -55,7 +55,6 @@ import time import json import errno -from pickle import NONE #:Dictionary contains the cmd_id, required attribute list #:and optional attribute list of each ldmsd commands. For example, @@ -207,7 +206,26 @@ 'opt_attr': ['instance'] }, ##### Authetication. ##### - 'auth_add': {'req_attr': ['name', 'plugin'], 'opt_attr': []}, + 'auth_add': {'req_attr': ['name', 'xprt', 'host', 'port', 'reconnect'], + 'opt_attr' : [ 'auth', 'perm', 'rail', 'credits', 'rx_rate' ] }, + ##### Sampler Discovery ##### + 'advertiser_add': {'req_attr': ['name', 'xprt', 'host', 'port'], + 'opt_attr' : [ 'auth', 'perm', 'interval', + 'reconnect', 'rail', + 'credits', 'rx_rate' ] }, + 'advertiser_del': {'req_attr': ['name'], 'opt_attr': []}, + 'advertiser_start': {'req_attr': ['name'], + 'opt_attr' : ['xprt', 'host', 'port', + 'auth', 'perm', + 'reconnect', 'rail', + 'credits', 'rx_rate' ] }, + 'advertiser_stop': {'req_attr': ['name'], 'opt_attr': []}, + 'prdcr_listen_add': {'req_attr': ['name'], + 'opt_attr': ['rail', 'ip', 'credits', 'rx_rate', 'regex', 'disabled_start']}, + 'prdcr_listen_del': {'req_attr': ['name'], 'opt_attr': []}, + 'prdcr_listen_start': {'req_attr': ['name'], 'opt_attr': []}, + 'prdcr_listen_stop': {'req_attr': ['name'], 'opt_attr': []}, + 'prdcr_listen_status': {'req_attr': [], 'opt_attr': []}, } def fmt_status(msg): @@ -276,7 +294,9 @@ class LDMSD_Req_Attr(object): CREDITS = 39 RX_RATE = 40 SUMMARY = 41 - LAST = 42 + SIZE = 42 + IP = 43 + LAST = 44 NAME_ID_MAP = {'name': NAME, 'interval': INTERVAL, @@ -323,6 +343,8 @@ class LDMSD_Req_Attr(object): 'rx_rate' : RX_RATE, 'reconnect' : INTERVAL, 'summary' : SUMMARY, + 'size' : SIZE, + 'IP' : IP, 'TERMINATING': LAST } @@ -367,6 +389,7 @@ class LDMSD_Req_Attr(object): CREDITS : 'credits', RX_RATE : 'rx_rate', SUMMARY : 'summary', + IP : 'ip', LAST : 'TERMINATING' } @@ -474,6 +497,17 @@ class LDMSD_Request(object): PRDCR_SUBSCRIBE = 0x100 + 9 PRDCR_UNSUBSCRIBE = 0x100 + 10 PRDCR_STREAM_STATUS = 0x100 + 11 + PRDCR_BRDIGE_ADD = 0x100 + 12 + ADVERTISER_ADD = 0x100 + 13 + ADVERTISER_START = 0x100 + 14 + ADVERTISER_STOP = 0x100 + 15 + ADVERTISER_DEL = 0x100 + 16 + PRDCR_LISTEN_ADD = 0x100 + 17 + PRDCR_LISTEN_DEL = 0x100 + 18 + PRDCR_LISTEN_START = 0x100 + 19 + PRDCR_LISTEN_STOP = 0x100 + 20 + PRDCR_LISTEN_STATUS = 0x100 + 21 + ADVERTISE = 0x100 + 22 STRGP_ADD = 0x200 STRGP_DEL = 0x200 + 1 @@ -581,6 +615,16 @@ class LDMSD_Request(object): 'prdcr_unsubscribe': {'id': PRDCR_UNSUBSCRIBE}, 'prdcr_stream_status' : {'id': PRDCR_STREAM_STATUS}, + 'advertiser_add': {'id': ADVERTISER_ADD}, + 'advertiser_start': {'id': ADVERTISER_START}, + 'advertiser_stop': {'id': ADVERTISER_STOP}, + 'advertiser_del': {'id': ADVERTISER_DEL}, + 'prdcr_listen_add': {'id': PRDCR_LISTEN_ADD}, + 'prdcr_listen_start': {'id': PRDCR_LISTEN_START}, + 'prdcr_listen_stop': {'id': PRDCR_LISTEN_STOP}, + 'prdcr_listen_del': {'id': PRDCR_LISTEN_DEL}, + 'prdcr_listen_status': {'id': PRDCR_LISTEN_STATUS}, + 'strgp_add': {'id': STRGP_ADD}, 'strgp_del': {'id': STRGP_DEL}, 'strgp_start': {'id': STRGP_START}, @@ -2065,6 +2109,30 @@ def plugn_start(self, name, interval_us, offset_us=None): self.close() return errno.ENOTCONN, str(e) + def _prdcr_add_attr_prep(self, **kwargs): + attrs = [ + LDMSD_Req_Attr(attr_id = LDMSD_Req_Attr.NAME, value=kwargs['name']), + LDMSD_Req_Attr(attr_id = LDMSD_Req_Attr.XPRT, value=kwargs['xprt']), + LDMSD_Req_Attr(attr_id = LDMSD_Req_Attr.HOST, value=kwargs['host']), + LDMSD_Req_Attr(attr_id = LDMSD_Req_Attr.PORT, value=str(kwargs['port'])) + ] + if 'reconnect' in kwargs.keys() and kwargs['reconnect']: + attrs.append(LDMSD_Req_Attr(attr_id = LDMSD_Req_Attr.INTERVAL, value=str(kwargs['reconnect']))) + if 'ptype' in kwargs.keys() and kwargs['ptype']: + attrs.append(LDMSD_Req_Attr(attr_id = LDMSD_Req_Attr.TYPE, value=kwargs['ptype'])) + if 'auth' in kwargs.keys() and kwargs['auth']: + attrs.append(LDMSD_Req_Attr(attr_id=LDMSD_Req_Attr.AUTH, value=kwargs['auth'])) + if 'perm' in kwargs.keys() and kwargs['perm']: + attrs.append(LDMSD_Req_Attr(attr_id=LDMSD_Req_Attr.PERM, value=str(kwargs['perm']))) + if 'rail' in kwargs.keys() and kwargs['rail']: + attrs.append(LDMSD_Req_Attr(attr_id=LDMSD_Req_Attr.RAIL, value=str(int(kwargs['rail'])))) + if 'credit' in kwargs.keys() and kwargs['credits']: + attrs.append(LDMSD_Req_Attr(attr_id=LDMSD_Req_Attr.CREDITS, value=str(int(kwargs['credits'])))) + if 'rx_rate' in kwargs.keys() and kwargs['rx_rate']: + attrs.append(LDMSD_Req_Attr(attr_id=LDMSD_Req_Attr.RX_RATE, value=str(int(kwargs['rx_rate'])))) + + return attrs + def prdcr_add(self, name, ptype, xprt, host, port, reconnect, auth=None, perm=None, rail=None, credits=None, rx_rate=None): """ @@ -2098,28 +2166,11 @@ def prdcr_add(self, name, ptype, xprt, host, port, reconnect, auth=None, perm=No - status is an errno from the errno module - data is an error message if status != 0 or None """ - attrs = [ - LDMSD_Req_Attr(attr_id = LDMSD_Req_Attr.NAME, value=name), - LDMSD_Req_Attr(attr_id = LDMSD_Req_Attr.TYPE, value=ptype), - LDMSD_Req_Attr(attr_id = LDMSD_Req_Attr.XPRT, value=xprt), - LDMSD_Req_Attr(attr_id = LDMSD_Req_Attr.HOST, value=host), - LDMSD_Req_Attr(attr_id = LDMSD_Req_Attr.PORT, value=str(port)), - LDMSD_Req_Attr(attr_id = LDMSD_Req_Attr.INTERVAL, value=str(reconnect)) - ] - if auth: - attrs.append(LDMSD_Req_Attr(attr_id=LDMSD_Req_Attr.AUTH, value=auth)) - if perm: - attrs.append(LDMSD_Req_Attr(attr_id=LDMSD_Req_Attr.PERM, value=str(perm))) - if rail: - attrs.append(LDMSD_Req_Attr(attr_id=LDMSD_Req_Attr.RAIL, value=str(int(rail)))) - if credits: - attrs.append(LDMSD_Req_Attr(attr_id=LDMSD_Req_Attr.CREDITS, value=str(int(credits)))) - if rx_rate: - attrs.append(LDMSD_Req_Attr(attr_id=LDMSD_Req_Attr.RX_RATE, value=str(int(rx_rate)))) - - req = LDMSD_Request( - command_id=LDMSD_Request.PRDCR_ADD, - attrs=attrs) + args_d = {'name': name, 'ptype': ptype, 'xprt': xprt, 'host': host, 'port': port, + 'reconnect': reconnect, 'auth': auth, 'perm': perm, + 'rail': rail, 'credits': credits, 'rx_rate': rx_rate} + attrs = self._prdcr_add_attr_prep(**args_d) + req = LDMSD_Request( command_id = LDMSD_Request.PRDCR_ADD, attrs = attrs) try: req.send(self) resp = req.receive(self) @@ -2153,7 +2204,7 @@ def prdcr_del(self, name): self.close() return errno.ENOTCONN, str(e) - def prdcr_start(self, name, regex=True, reconnect=None): + def prdcr_start(self, name, regex=True, reconnect=None, **kwargs): """ Start one or more STOPPED producers @@ -2167,6 +2218,9 @@ def prdcr_start(self, name, regex=True, reconnect=None): reconnect - The reconnect interval in microseconds. If not None, this will override the interval specified when the producer was created. Default is None. + kwargs - Additional keyword argument as in prdcr_add(). + It is to support producer creation if it doesn't exist at start. + Currently, only advertiser_start() uses this feature. Returns: A tuple of status, data @@ -2187,6 +2241,9 @@ def prdcr_start(self, name, regex=True, reconnect=None): attrs.append(LDMSD_Req_Attr(attr_id = LDMSD_Req_Attr.INTERVAL, value = str(reconnect))) + for key, value in kwargs.items(): + attrs.append(LDMSD_Req_Attr(attr_name = key, value = value)) + req = LDMSD_Request(command_id = cmd_id, attrs = attrs) try: req.send(self) @@ -2394,6 +2451,235 @@ def prdcr_hint_tree(self, name=None): except Exception as e: return errno.ENOTCONN, str(e) + def advertiser_add(self, name, xprt, host, port, reconnect, auth=None, perm=None, + rail=None, credits=None, rx_rate=None): + """ + Add an advertiser. An advertiser sends an advertisement to an aggregator + add it as a producer. Once started, the LDSMD will attempt to + periodically send a connection request until a connection is established. + + An advertiser starts in the STOPPED state. Use the advertiser_start() function + to start the advertiser. + + Parameters: + - The name to give the advertiser. This name must be unique among all advertisement sent to the aggregator. + - The transport type, one of 'sock', 'ugni', 'rdma', or 'fabric' + - The aggregator's hostname + - The aggregator's listening port number + - The reconnect interval in microseconds + + Keyword Parameters: + auth - The authentication demain + perm - The configuration client permission required to + modify the producer configuration. Default is None. + rail - The number of endpoints in a rail. The default is 1. + credits - The send credits of our side of the connection (the daemon we + are controlling). The default is the daemon's default + ('-C' ldmsd option). + rx_rate - The recv rate (bytes/second) limit for this connection. The + default is -1 (unlimited). + + Returns: + A tuple of status, data + - status is an errno from the errno module + - data is an error message if status != 0 or None + """ + args_d = {'name': name, 'xprt': xprt, 'host': host, 'port': port, + 'reconnect': reconnect, 'auth': auth, 'perm': perm, + 'rail': rail, 'credits': credits, 'rx_rate': rx_rate} + attrs = self._prdcr_add_attr_prep(**args_d) + attrs.append(LDMSD_Req_Attr(attr_id=LDMSD_Req_Attr.TYPE, value="advertise")) + req = LDMSD_Request( command_id = LDMSD_Request.ADVERTISER_ADD, attrs = attrs) + try: + req.send(self) + resp = req.receive(self) + return resp['errcode'], resp['msg'] + except Exception as e: + self.close() + return errno.ENOTCONN, str(e) + + def advertiser_start(self, name, xprt=None, host=None, port=None, + reconnect=None, auth=None, perm=None, + rail=None, credits=None, rx_rate=None): + """ + Start an advertiser. If the advertiser does not exist, LDMSD will create it. + In this case, the values of the required attributes in advertiser_add must be given. + + Parameters: + - The name to give the advertiser. This name must be unique among all advertisement sent to the aggregator. + + Keyword Parameters: + xprt - The transport type, one of 'sock', 'ugni', 'rdma', or 'fabric' + host - The aggregator's hostname + port - The aggregator's listening port number + reconnect - The reconnect interval in microseconds + auth - The authentication demain + perm - The configuration client permission required to + modify the producer configuration. Default is None. + rail - The number of endpoints in a rail. The default is 1. + credits - The send credits of our side of the connection (the daemon we + are controlling). The default is the daemon's default + ('-C' ldmsd option). + rx_rate - The recv rate (bytes/second) limit for this connection. The + default is -1 (unlimited). + + Returns: + A tuple of status, data + - status is an errno from the errno module + - data is an error message if status != 0 or None + """ + args_d = {'name': name, 'xprt': xprt, 'host': host, 'port': port, + 'reconnect': reconnect, 'auth': auth, 'perm': perm, + 'rail': rail, 'credits': credits, 'rx_rate': rx_rate} + attrs = self._prdcr_add_attr_prep(**args_d) + attrs.append(LDMSD_Req_Attr(attr_id=LDMSD_Req_Attr.TYPE, value="advertise")) + req = LDMSD_Request( command_id = LDMSD_Request.ADVERTISER_START, attrs = attrs) + try: + req.send(self) + resp = req.receive(self) + return resp['errcode'], resp['msg'] + except Exception as e: + self.close() + return errno.ENOTCONN, str(e) + + def advertiser_stop(self, name): + req = LDMSD_Request(command_id = LDMSD_Request.ADVERTISER_STOP, + attrs = [LDMSD_Req_Attr(attr_id=LDMSD_Req_Attr.NAME, value=name)]) + try: + req.send(self) + resp = req.receive(self) + return resp['errcode'], resp['msg'] + except Exception as e: + self.close() + return errno.ENOTCONN, str(e) + + def advertiser_del(self, name): + req = LDMSD_Request(command_id = LDMSD_Request.ADVERTISER_DEL, + attrs = [LDMSD_Req_Attr(attr_id=LDMSD_Req_Attr.NAME, value=name)]) + try: + req.send(self) + resp = req.receive(self) + return resp['errcode'], resp['msg'] + except Exception as e: + self.close() + return errno.ENOTCONN, str(e) + + def prdcr_listen_add(self, name, disabled_start=None, regex=None, ip=None, rail=None, credits=None, rx_rate=None): + """ + Tell an aggregator to wait for advertisements from samplers + + The ggregator automatically adds and starts a producer when it receives + an advertisement that the peer (sampler) hostname matches the regular expression. + + Parameters: + - Name of the producer listen + - Regular expression to match sampler hostnames + - The number of rail + - The credits in bytes + - The receive rate limit + + Return: + - status is an errno from the errno module + - data is an error message if status !=0 or None + """ + attr_list = [ LDMSD_Req_Attr(attr_id=LDMSD_Req_Attr.NAME, value=name) ] + if disabled_start is not None: + attr_list.append(LDMSD_Req_Attr(attr_id=LDMSD_Req_Attr.AUTO_INTERVAL, value=disabled_start)) + if regex is not None: + attr_list.append(LDMSD_Req_Attr(attr_id=LDMSD_Req_Attr.REGEX, value=regex)) + if ip is not None: + attr_list.append(LDMSD_Req_Attr(attr_id=LDMSD_Req_Attr.IP, value=ip)) + if rail is not None: + attr_list.append(LDMSD_Req_Attr(attr_id=LDMSD_Req_Attr.RAIL, value=rail)) + if credits is not None: + attr_list.append(LDMSD_Req_Attr(attr_id=LDMSD_Req_Attr.CREDITS, value=credits)) + if rx_rate is not None: + attr_list.append(LDMSD_Req_Attr(attr_id=LDMSD_Req_Attr.RX_RATE, value=rx_rate)) + + req = LDMSD_Request(command_id=LDMSD_Request.PRDCR_LISTEN_ADD, + attrs=attr_list) + try: + req.send(self) + resp = req.receive(self) + return resp['errcode'], resp['msg'] + except Exception as e: + return errno.ENOTCONN, str(e) + + def prdcr_listen_del(self, name): + """ + Delete a producer listen + + Parameter: + - Name of the producer listen to be deleted + + Return: + - Status is an errno from the errno module + - Data is an error message if status != 0 or None + """ + attr_list = [ LDMSD_Req_Attr(attr_id=LDMSD_Req_Attr.NAME, value=name)] + req = LDMSD_Request(command_id=LDMSD_Request.PRDCR_LISTEN_DEL, + attrs=attr_list) + try: + req.send(self) + resp = req.receive(self) + return resp['errcode'], resp['msg'] + except Exception as e: + return errno.ENOTCONN, str(e) + + def prdcr_listen_start(self, name): + """ + Start a producer listen + + Parameter: + - Name of the producer listen to be started + + Return: + - Status is an errno from the errno module + - Data is an error message if status != 0 or None + """ + attr_list = [ LDMSD_Req_Attr(attr_id=LDMSD_Req_Attr.NAME, value=name)] + req = LDMSD_Request(command_id=LDMSD_Request.PRDCR_LISTEN_START, + attrs=attr_list) + try: + req.send(self) + resp = req.receive(self) + return resp['errcode'], resp['msg'] + except Exception as e: + return errno.ENOTCONN, str(e) + + def prdcr_listen_stop(self, name): + """ + Stop a producer listen + + Parameter: + - Name of the producer listen to be stopped + + Return: + - Status is an errno from the errno module + - Data is an error message if status != 0 or None + """ + attr_list = [ LDMSD_Req_Attr(attr_id=LDMSD_Req_Attr.NAME, value=name)] + req = LDMSD_Request(command_id=LDMSD_Request.PRDCR_LISTEN_STOP, + attrs=attr_list) + try: + req.send(self) + resp = req.receive(self) + return resp['errcode'], resp['msg'] + except Exception as e: + return errno.ENOTCONN, str(e) + + def prdcr_listen_status(self): + """ + Get the status of all producer listen + """ + req = LDMSD_Request(command_id=LDMSD_Request.PRDCR_LISTEN_STATUS) + try: + req.send(self) + resp = req.receive(self) + return resp['errcode'], resp['msg'] + except Exception as e: + return errno.ENOTCONN, str(e) + def updtr_add(self, name, interval=1000000, offset=None, push=None, auto=None, perm=None): """ Add an Updater that will periodically update Producer metric sets either diff --git a/ldms/python/ldmsd/ldmsd_controller b/ldms/python/ldmsd/ldmsd_controller index 6cecbb54e8..7340828565 100755 --- a/ldms/python/ldmsd/ldmsd_controller +++ b/ldms/python/ldmsd/ldmsd_controller @@ -426,6 +426,7 @@ class LdmsdCmdParser(cmd.Cmd): [rx_rate=] The recv rate (bytes/sec) limit for this connection. The default is -1 (unlimited). """ + arg = f"{arg} type=bridge" # Automatically add the producer type arg = self.handle_args('prdcr_add', arg) if arg is None: return @@ -1023,10 +1024,19 @@ class LdmsdCmdParser(cmd.Cmd): print("Name Host Port Transport State Type") print("---------------- ---------------- ------------ ------------ ------------ ----------") for prdcr in producers: + port = prdcr['port'] if prdcr['type'] == 'bridge': continue - print(f"{prdcr['name']:16} {prdcr['host']:16} {prdcr['port']:12} {prdcr['transport']:12} " \ - f"{prdcr['state']:12} {prdcr['type']:10}") + pstate = prdcr['state'] + if prdcr['type'] == 'advertised': + if prdcr['state'] == 'STANDBY': + # We report STOPPED to tell + # the users that the producer is not running. + pstate = "STOPPED" + print(f"{prdcr['name']:16} {prdcr['host']:16} " \ + f"{port:12} " \ + f"{prdcr['transport']:12} " \ + f"{pstate:12} {prdcr['type']:10}") for pset in prdcr['sets']: print(" {0:16} {1:16} {2}".format(pset['inst_name'], pset['schema_name'], @@ -2800,6 +2810,243 @@ class LdmsdCmdParser(cmd.Cmd): if rc: print(f"Failed to reset the statistics") + def do_advertiser_add(self, arg): + """ + Add an advertisement of its hostname to an aggregator. This is a part of the sampler discovery feature. + Parameters: + name= A unique name to be used as producer name on the aggregator + xprt= The transport name [sock, rdma, ugni] + host= The aggregator hostname + reconnect= The connection retry interval + [auth=] The authentication method + [perm=] The permission to modify the producer in the future. + [rail=] The number of rail endpoints for the prdcr (default: 1). + [credits=] The send credits our ldmsd (the one we are controlling) + advertises to the prdcr (default: value from ldmsd --credits + option). This limits how much outstanding data our ldmsd + holds for the prdcr. The prdcr drops messages when it does + not have enough send credits. + [rx_rate=] The recv rate (bytes/sec) limit for this connection. The + default is -1 (unlimited). + """ + arg = self.handle_args('advertiser_add', arg) + if arg is None: + return + if arg['reconnect'] is None: + print(f"The attribute 'reconnect' is missing.") + else: + rc, msg = self.comm.advertiser_add(arg['name'], + arg['xprt'], + arg['host'], + arg['port'], + arg['reconnect'], + arg['auth'], + arg['perm'], + arg['rail'], + arg['credits'], + arg['rx_rate']) + if rc: + print(f'Error adding Bridge {arg["name"]}: {msg}') + + def complete_advertiser_add(self, text, line, begidx, endidx): + return self.__complete_attr_list('advertiser_add', text) + + def do_advertiser_del(self, arg): + """ + Delete an advertise from the sampler daemon. The advertise connot be in use. + Parameters: + name = The advertise name + """ + arg = self.handle_args('advertiser_del', arg) + if arg is None: + return + rc, msg = self.comm.advertiser_del(**arg) + if rc: + print(f"Error deleting advertiser {arg['name']}: {msg}") + + def complete_advertiser_del(self, text, line, begidx, endidx): + return self.__complete_attr_list('advertiser_del', text) + + def do_advertiser_start(self, arg): + """ + Start an advertisement + Parameters: + name = The advertise name + """ + arg = self.handle_args('advertiser_start', arg) + if arg is None: + return + rc, msg = self.comm.advertiser_start(**arg) + if rc: + print(f"Error starting advertiser {arg['name']}: {msg}") + + def complete_advertiser_start(self, text, line, begidx, endidx): + return self.__complete_attr_list('advertiser_start', text) + + def do_advertiser_stop(self, arg): + """ + Stop an advertisement + Parameters: + name = The advertise name + """ + arg = self.handle_args('advertiser_stop', arg) + if arg is None: + return + rc, msg = self.comm.advertiser_stop(**arg) + if rc: + print(f"Error stopping advertiser {arg['name']}: {msg}") + + def complete_advertiser_stop(self, text, line, begidx, endidx): + return self.__complete_attr_list('advertiser_stop', text) + + def do_advertiser_status(self, arg): + """ + Get the statuses of advertisers + Parameters: + [name=] Advertiser name + """ + arg = self.handle_args('prdcr_status', arg) + if arg: + rc, msg = self.comm.prdcr_status(arg['name']) + if rc == 0 and msg is not None: + producers = fmt_status(msg) + print("Name Aggregator Host Aggregator Port Transport Reconnect(us) State") + print("---------------- ---------------- --------------- ------------ --------------- ------------") + for prdcr in producers: + if prdcr['type'] != 'advertise': + continue + print(f"{prdcr['name']:16} {prdcr['host']:16} {prdcr['port']:<15} " \ + f"{prdcr['transport']:12} {prdcr['reconnect_us']:15} " \ + f"{prdcr['state']:12}") + else: + print(f'Error getting advertise status: {msg}') + + def complete_advertiser_status(self, text, line, begidx, endidx): + return self.__complete_attr_list('prdcr_status', text) + + def do_prdcr_listen_add(self, arg): + """ + Add a producer listen + + The producer listen must be started by using 'prdcr_listen_start'. + + After the producer listen starts, + the aggregator waits for advertisements from samplers and + automatically adds and starts a producer if the peer (sampler) hostname + matches the regular expression. + + The auto-generated producers can be stopped and restarted by using + prdcr_stop and prdcr_start, respectively, as manually added producers. + + Parameters: + name= A unique name of the producer listen + reconnect= The retry interval to check for connection establishment of producers matched the regular expression. + [disabled_start=] Tell LDMSD not to start the producers + [regex=] A regular expression to match sampler hostnames + [rail=] The number of rail endpoints for the prdcr (default: 1). + [credits=] The send credits our ldmsd (the one we are controlling) + advertises to the prdcr (default: value from ldmsd --credits + option). This limits how much outstanding data our ldmsd + holds for the prdcr. The prdcr drops messages when it does + not have enough send credits. + [rx_rate=] The recv rate (bytes/sec) limit for this connection. The + default is -1 (unlimited). + """ + arg = self.handle_args('prdcr_listen_add', arg) + if arg is None: + return + rc, msg = self.comm.prdcr_listen_add(**arg) + if rc: + print(f"Error adding producer listen {arg['name']}: {msg}") + + def complete_prdcr_listen_add(self, text, line, begidx, endidx): + return self.__complete_attr_list('prdcr_listen_add', text) + + def do_prdcr_listen_del(self, arg): + """ + Delete a producer_listen + + The producer listen must not be running. + + Parameters: + name= A unique name of the producer listen + """ + arg = self.handle_args('prdcr_listen_del', arg) + if arg is None: + return + rc, msg = self.comm.prdcr_listen_del(**arg) + if rc: + print(f"Error deleting producer listen {arg['name']}: {msg}") + + def complete_prdcr_listen_del(self, text, line, begidx, endidx): + return self.__complete_attr_list('prdcr_listen_del', text) + + def do_prdcr_listen_start(self, arg): + """ + Start a producer_listen + + The aggregator waits for advertisements from samplers. It matches the + hostnames in advertisements with the regular expression. If they match, + there are two scenarios; 1) no producer of the same name exists, and 2) + a producer of the same name exists. In the former case, the aggregator + will create a producer with the given name and start it. In the later + case, if the producer is stopped, the aggregator will _not_ start it + automatically. The producer can be started using `prdcr_start` or + `prdcr_start_regex`. + + Parameters: + name= A unique name of the producer listen + """ + arg = self.handle_args('prdcr_listen_start', arg) + if arg is None: + return + rc, msg = self.comm.prdcr_listen_start(**arg) + if rc: + print(f"Error starting producer listen {arg['name']}: {msg}") + + def complete_prdcr_listen_start(self, text, line, begidx, endidx): + return self.__complete_attr_list('prdcr_listen_start', text) + + def do_prdcr_listen_stop(self, arg): + """ + Stop a running producer_listen + + The aggregator stops matching the hostnames in advertisements with the + regular expression. That is, the aggregator will not automatically add + any producers that the hostnames matches the regular expression. + + Parameters: + name= A unique name of the producer listen + """ + arg = self.handle_args('prdcr_listen_stop', arg) + if arg is None: + return + rc, msg = self.comm.prdcr_listen_stop(**arg) + if rc: + print(f"Error stopping producer listen {arg['name']}: {msg}") + + def complete_prdcr_listen_stop(self, text, line, begidx, endidx): + return self.__complete_attr_list('prdcr_listen_stop', text) + + def do_prdcr_listen_status(self, arg): + """ + Display the status of all producer listen + """ + arg = self.handle_args('prdcr_listen_status', arg) + if arg is None: + return + rc, msg = self.comm.prdcr_listen_status(**arg) + if rc == 0 and msg is not None: + l = fmt_status(msg) + print(f"{'Name':20} {'State':10} {'Regex':15} {'IP Range':30}") + print(f"{'-'*20} {'-'*10} {'-'*15} {'-'*30}") + for pl in l: + print(f"{pl['name']:20} {pl['state']:10} {pl['regex']:15} {pl['IP range']:30}") + if len(pl['producers']): + print(f"Producers: {', '.join(p for p in pl['producers'])}") + else: + print(f'Error getting prdcr_listen status: {msg}') + def do_option(self, arg): """ ONLY SUPPORTED IN CONFIGURATION FILES diff --git a/ldms/src/core/ldms.h b/ldms/src/core/ldms.h index e2a2d132ea..1f0a0c5d93 100644 --- a/ldms/src/core/ldms.h +++ b/ldms/src/core/ldms.h @@ -902,6 +902,51 @@ int ldms_xprt_sockaddr(ldms_t x, struct sockaddr *local_sa, struct sockaddr *remote_sa, socklen_t *sa_len); +/* currently only support IPv4 and IPv6 */ +struct ldms_addr { + sa_family_t sa_family; /* host-endian */ + in_port_t sin_port; /* network-endian */ + uint8_t addr[16]; /* addr[0-3] for IPv4, + addr[0-15] for IPv6 */ +}; + +/** + * \brief Get local and remote address in \c ldms_addr struct from the xprt + * + * \param x LDMS Transport pointer + * \param local_addr Local address (re-entrant) + * \param remote_addr Remote address (re-entrant) + * + * \return 0 on success. + */ +int ldms_xprt_addr(ldms_t x, struct ldms_addr *local_addr, + struct ldms_addr *remote_addr); + +const char *ldms_sockaddr_ntop(struct sockaddr *sa, char *buff, size_t sz); + +/** + * \brief Convert a CIDR IP address string to \c ldms_addr + * + * The address is stored in \c addr, and the prefix length is stored in \c prefix_len. + * + * \param addr ldms_addr pointer + * \param prefix_len Integer pointer + * + * \retval 0 if success. Otherwise, an errno is returned. + */ +int ldms_cidr2addr(const char *cdir_str, struct ldms_addr *addr, int *prefix_len); + +/** + * \brief Verify if \c sa is in \net_addr with the prefix \c prefix_len + * + * \param ip_addr IP Address + * \param net_addr Network Address + * \param prefix_len Prefix length for masking + * + * \return 1 if the IP address is in the network address. Otherwise, 0 is returned. + */ +int ldms_addr_in_network_addr(struct ldms_addr *ip_addr, + struct ldms_addr *net_addr, int prefix_len); /** * \brief Close a connection to an LDMS host. * @@ -1104,14 +1149,6 @@ int ldms_stream_publish_file(ldms_t x, const char *stream_name, typedef struct ldms_stream_client_s *ldms_stream_client_t; typedef struct json_entity_s *json_entity_t; -/* currently only support IPv4 and IPv6 */ -struct ldms_addr { - sa_family_t sa_family; /* host-endian */ - in_port_t sin_port; /* network-endian */ - uint8_t addr[16]; /* addr[0-3] for IPv4, - addr[0-15] for IPv6 */ -}; - enum ldms_stream_event_type { LDMS_STREAM_EVENT_RECV, /* stream data received */ LDMS_STREAM_EVENT_SUBSCRIBE_STATUS, /* reporting subscription status */ diff --git a/ldms/src/core/ldms_rail.c b/ldms/src/core/ldms_rail.c index d35bbf074b..773577ab07 100644 --- a/ldms/src/core/ldms_rail.c +++ b/ldms/src/core/ldms_rail.c @@ -1411,6 +1411,65 @@ const char *ldms_addr_ntop(struct ldms_addr *addr, char *buff, size_t sz) return buff; } +/* *2 for the two hex digits needed for each 16-bit value, * 8 for the 8 groups of values, + 7 for the 7 colons */ +#define MAX_IPV6_STR_LEN (sizeof(uint16_t) * 2 * 8 + 7) +int ldms_cidr2addr(const char *cdir_str, struct ldms_addr *addr, int *prefix_len) +{ + int rc; + int is_ipv6 = 0; + char netaddr_str[MAX_IPV6_STR_LEN]; + int _prefix_len; + + if (strchr(cdir_str, ':') != NULL) + is_ipv6 = 1; + + rc = sscanf(cdir_str, "%[^/]/%d", netaddr_str, &_prefix_len); + if (rc != 2) { + return EINVAL; + } + + if (prefix_len) + *prefix_len = _prefix_len; + + if (addr) { + if (is_ipv6) { + addr->sa_family = AF_INET6; + rc = inet_pton(AF_INET6, netaddr_str, &addr->addr); + } else { + addr->sa_family = AF_INET; + rc = inet_pton(AF_INET, netaddr_str, &addr->addr); + } + } + + if (rc != 1) + return rc; + return 0; +} + +int ldms_addr_in_network_addr(struct ldms_addr *ip_addr, + struct ldms_addr *net_addr, int prefix_len) +{ + if (ip_addr->sa_family != net_addr->sa_family) + return 0; + + int i; + int masked_bytes = prefix_len/8; + int residue_bits = prefix_len % 8; + + for (i = 0; i < masked_bytes; i++) { + if (ip_addr->addr[i] != net_addr->addr[i]) + return 0; + } + + if (residue_bits) { + uint8_t mask_bits = 0xff << (8 - residue_bits); + if ( (ip_addr->addr[i] & mask_bits) != (net_addr->addr[i] & mask_bits)) + return 0; + } + + return 1; +} + ldms_t __ldms_xprt_to_rail(ldms_t x) { if (XTYPE_IS_RAIL(x->xtype)) { diff --git a/ldms/src/core/ldms_xprt.c b/ldms/src/core/ldms_xprt.c index e6c1673703..b0e710bfa9 100644 --- a/ldms/src/core/ldms_xprt.c +++ b/ldms/src/core/ldms_xprt.c @@ -4293,6 +4293,31 @@ int ldms_xprt_sockaddr(ldms_t x, struct sockaddr *local_sa, return x->ops.sockaddr(x, local_sa, remote_sa, sa_len); } +/* The implementation is in ldms_rail.c. */ +extern int sockaddr2ldms_addr(struct sockaddr *sa, struct ldms_addr *la); +int ldms_xprt_addr(ldms_t x, struct ldms_addr *local_addr, + struct ldms_addr *remote_addr) +{ + int rc; + struct sockaddr_storage local_so, remote_so; + socklen_t so_len = sizeof(local_so); + + rc = ldms_xprt_sockaddr(x, (void*)&local_so, (void*)&remote_so, &so_len); + if (rc) + return rc; + if (local_addr) { + rc = sockaddr2ldms_addr((void*)&local_so, local_addr); + if (rc) + return rc; + } + if (remote_addr) { + rc = sockaddr2ldms_addr((void*)&remote_so, remote_addr); + if (rc) + return rc; + } + return 0; +} + int __ldms_xprt_get_threads(ldms_t x, pthread_t *out, int n) { if (n < 1) diff --git a/ldms/src/ldmsd/ldmsd.h b/ldms/src/ldmsd/ldmsd.h index 6c6b60d0f6..c956a8d072 100644 --- a/ldms/src/ldmsd/ldmsd.h +++ b/ldms/src/ldmsd/ldmsd.h @@ -153,6 +153,7 @@ typedef enum ldmsd_cfgobj_type { LDMSD_CFGOBJ_STRGP, LDMSD_CFGOBJ_LISTEN, LDMSD_CFGOBJ_AUTH, + LDMSD_CFGOBJ_PRDCR_LISTEN, } ldmsd_cfgobj_type_t; struct ldmsd_cfgobj; @@ -221,7 +222,7 @@ typedef struct ldmsd_prdcr { LDMSD_PRDCR_STATE_DISCONNECTED, /** Connection request is outstanding */ LDMSD_PRDCR_STATE_CONNECTING, - /** Connect complete */ + /** Connect complete, and ready to send a dir request */ LDMSD_PRDCR_STATE_CONNECTED, /** Waiting for task join and xprt cleanup */ LDMSD_PRDCR_STATE_STOPPING, @@ -235,7 +236,28 @@ typedef struct ldmsd_prdcr { /** Producer is local to this daemon */ LDMSD_PRDCR_TYPE_LOCAL, /** Connection initiated at this side but the peer will initiate the dir request. */ + /** + * Connection initiated at this side and the peer is aware of its existence. + * The peer will initiate the dir request after the connection is established. + */ LDMSD_PRDCR_TYPE_BRIDGE, + /** + * Connection initiated at this side to advertise itself to the peer. + * The peer does not know about its existence until it sends + * an advertise_notification request. The peer will initiate the dir request + * after the peer verifies its hostname. + */ + LDMSD_PRDCR_TYPE_ADVERTISER, + /** + * The producer is generated by LDMSD upon receiving a + * advertise_notification request that the hostname matches + * a regular expression of a listening produce. LDMSD also starts + * the producer automatically after its creation. + * + * Similarly to passive producers, the connection is initiated + * by an advertise producer on the peer. This side initiates the dir request. + */ + LDMSD_PRDCR_TYPE_ADVERTISED, } type; struct ldmsd_task task; @@ -338,6 +360,38 @@ typedef struct ldmsd_prdcr_ref { struct rbn rbn; } *ldmsd_prdcr_ref_t; +/** + * Listening Producer: Named set of conditions of LDMS metric set providers + */ +typedef struct ldmsd_prdcr_listen { + struct ldmsd_cfgobj obj; + enum ldmsd_listen_prdcr_state_e { + /** Initial listen producer state */ + LDMSD_PRDCR_LISTEN_STATE_STOPPED = 0, + /** Ready for handling advertise_notification and generating producer */ + LDMSD_PRDCR_LISTEN_STATE_RUNNING, + } state; + const char *hostname_regex_s; + uint64_t prdcr_conn_intvl; /* reconnect interval of generated producers */ + regex_t regex; + int rails; /* Rail size */ + int recv_credits; /* bytes */ + int rate_limits; /* bytes/sec */ + int auto_start; /* default is 1, i.e., auto start producers */ + + /* Network Address & prefix_len from a given CIDR IP address string */ + const char *cidr_str; /* IP Range */ + struct ldms_addr net_addr; + int prefix_len; + + /* + * For query the prdcr_listen information, ldmsd could report which + * producers were added because their hostnames match the regex of + * this prdcr_listen. + */ + struct rbt prdcr_tree; +} *ldmsd_prdcr_listen_t; + /** * Updater: Named set of rules for updating remote metric sets * @@ -381,6 +435,9 @@ typedef struct ldmsd_updtr { LDMSD_UPDTR_STATE_STOPPING, } state; + /* The list of regular expressions to match producer names. */ + LIST_HEAD(updtr_prdcr_filter, ldmsd_name_match) prdcr_filter; + /* * flag to enable or disable the functionality * that automatically schedules set updates according to diff --git a/ldms/src/ldmsd/ldmsd_cfgobj.c b/ldms/src/ldmsd/ldmsd_cfgobj.c index 6a3d23eea0..2271c110dd 100644 --- a/ldms/src/ldmsd/ldmsd_cfgobj.c +++ b/ldms/src/ldmsd/ldmsd_cfgobj.c @@ -84,12 +84,16 @@ pthread_mutex_t listen_tree_lock = PTHREAD_MUTEX_INITIALIZER; struct rbt auth_tree = RBT_INITIALIZER(cfgobj_cmp); pthread_mutex_t auth_tree_lock = PTHREAD_MUTEX_INITIALIZER; +struct rbt listen_prdcr_tree = RBT_INITIALIZER(cfgobj_cmp); +pthread_mutex_t listen_prdcr_tree_lock = PTHREAD_MUTEX_INITIALIZER; + pthread_mutex_t *cfgobj_locks[] = { [LDMSD_CFGOBJ_PRDCR] = &prdcr_tree_lock, [LDMSD_CFGOBJ_UPDTR] = &updtr_tree_lock, [LDMSD_CFGOBJ_STRGP] = &strgp_tree_lock, [LDMSD_CFGOBJ_LISTEN] = &listen_tree_lock, [LDMSD_CFGOBJ_AUTH] = &auth_tree_lock, + [LDMSD_CFGOBJ_PRDCR_LISTEN] = &listen_prdcr_tree_lock, }; struct rbt *cfgobj_trees[] = { @@ -98,6 +102,7 @@ struct rbt *cfgobj_trees[] = { [LDMSD_CFGOBJ_STRGP] = &strgp_tree, [LDMSD_CFGOBJ_LISTEN] = &listen_tree, [LDMSD_CFGOBJ_AUTH] = &auth_tree, + [LDMSD_CFGOBJ_PRDCR_LISTEN] = &listen_prdcr_tree, }; void ldmsd_cfgobj_init(void) @@ -107,6 +112,7 @@ void ldmsd_cfgobj_init(void) rbt_init(&strgp_tree, cfgobj_cmp); rbt_init(&listen_tree, cfgobj_cmp); rbt_init(&auth_tree, cfgobj_cmp); + rbt_init(&listen_prdcr_tree, cfgobj_cmp); } void ldmsd_cfgobj___del(ldmsd_cfgobj_t obj) diff --git a/ldms/src/ldmsd/ldmsd_config.c b/ldms/src/ldmsd/ldmsd_config.c index ab705dc8b1..01b05d1fe6 100644 --- a/ldms/src/ldmsd/ldmsd_config.c +++ b/ldms/src/ldmsd/ldmsd_config.c @@ -584,16 +584,10 @@ static uint64_t __get_cfgfile_id() } extern int is_req_id_priority(enum ldmsd_request req_id); -/* - * \param req_filter is a function that returns zero if we want to process the - * request, and returns non-zero otherwise. - */ static int __process_config_file(const char *path, int *lno, int trust, - int (*req_filter)(ldmsd_cfg_xprt_t, ldmsd_req_hdr_t, void *), - void *ctxt) + req_filter_fn req_filter, void *ctxt) { - static uint32_t msg_no = 0; int rc = 0; int lineno = 0; FILE *fin = NULL; @@ -627,6 +621,7 @@ int __process_config_file(const char *path, int *lno, int trust, } xprt.type = LDMSD_CFG_TYPE_FILE; + xprt.file.path = path; xprt.file.cfgfile_id = __get_cfgfile_id(); xprt.send_fn = log_response_fn; xprt.max_msg = LDMSD_CFG_FILE_XPRT_MAX_REC; @@ -710,7 +705,10 @@ int __process_config_file(const char *path, int *lno, int trust, } } - req_array = ldmsd_parse_config_str(line, msg_no, xprt.max_msg); + /* + * The message number is the line number. + */ + req_array = ldmsd_parse_config_str(line, lineno, xprt.max_msg); if (!req_array) { rc = errno; ovis_log(config_log, OVIS_LERROR, "Process config file error at line %d " @@ -743,27 +741,7 @@ int __process_config_file(const char *path, int *lno, int trust, if (xprt.max_msg < ntohl(request->rec_len)) xprt.max_msg = ntohl(request->rec_len); - if (req_filter) { - rc = req_filter(&xprt, request, ctxt); - /* rc = 0, filter OK */ - if (rc == 0) { - __dlog(DLOG_CFGOK, "# deferring line %d (%s): %s\n", - lineno, path, line); - goto next_req; - } - /* rc == errno */ - if (rc > 0) { - ovis_log(config_log, OVIS_LERROR, - "Configuration error at " - "line %d (%s)\n", lineno, path); - goto cleanup; - } else { - /* rc < 0, filter not applied */ - rc = 0; - } - } - - rc = ldmsd_process_config_request(&xprt, request); + rc = ldmsd_process_config_request(&xprt, request, req_filter, ctxt); if (rc || xprt.rsp_err) { if (!rc) rc = xprt.rsp_err; @@ -774,7 +752,6 @@ int __process_config_file(const char *path, int *lno, int trust, next_req: free(request); request = NULL; - msg_no += 1; off = 0; goto next_line; @@ -793,23 +770,17 @@ int __process_config_file(const char *path, int *lno, int trust, return rc; } -int __req_deferred_start_regex(ldmsd_req_hdr_t req, ldmsd_cfgobj_type_t type) +int __req_deferred_start_regex(ldmsd_req_ctxt_t reqc, ldmsd_cfgobj_type_t type) { regex_t regex = {0}; - ldmsd_req_attr_t attr; ldmsd_cfgobj_t obj; int rc; char *val; - attr = ldmsd_req_attr_get_by_id((void*)req, LDMSD_ATTR_REGEX); - if (!attr) { + val = ldmsd_req_attr_str_value_get_by_id(reqc, LDMSD_ATTR_REGEX); + if (!val) { ovis_log(NULL, OVIS_LERROR, "`regex` attribute is required.\n"); return EINVAL; } - val = str_repl_env_vars((char *)attr->attr_value); - if (!val) { - ovis_log(NULL, OVIS_LERROR, "Not enough memory.\n"); - return ENOMEM; - } rc = regcomp(®ex, val, REG_NOSUB); if (rc) { ovis_log(NULL, OVIS_LERROR, "Bad regex: %s\n", val); @@ -828,21 +799,15 @@ int __req_deferred_start_regex(ldmsd_req_hdr_t req, ldmsd_cfgobj_type_t type) return 0; } -int __req_deferred_start(ldmsd_req_hdr_t req, ldmsd_cfgobj_type_t type) +int __req_deferred_start(ldmsd_req_ctxt_t reqc, ldmsd_cfgobj_type_t type) { - ldmsd_req_attr_t attr; ldmsd_cfgobj_t obj; char *name; - attr = ldmsd_req_attr_get_by_id((void*)req, LDMSD_ATTR_NAME); - if (!attr) { + name = ldmsd_req_attr_str_value_get_by_id(reqc, LDMSD_ATTR_NAME); + if (!name) { ovis_log(NULL, OVIS_LERROR, "`name` attribute is required.\n"); return EINVAL; } - name = str_repl_env_vars((char *)attr->attr_value); - if (!name) { - ovis_log(NULL, OVIS_LERROR, "Not enough memory.\n"); - return ENOMEM; - } obj = ldmsd_cfgobj_find(name, type); if (!obj) { ovis_log(NULL, OVIS_LERROR, "Config object not found: %s\n", name); @@ -855,41 +820,75 @@ int __req_deferred_start(ldmsd_req_hdr_t req, ldmsd_cfgobj_type_t type) return 0; } +/* The implementation is in ldmsd_request.c. */ +extern ldmsd_prdcr_t +__prdcr_add_handler(ldmsd_req_ctxt_t reqc, char *verb, char *obj_name); +int __req_deferred_advertiser_start(ldmsd_req_ctxt_t reqc) +{ + int rc = 0; + ldmsd_prdcr_t prdcr; + char *name; + + name = ldmsd_req_attr_str_value_get_by_id(reqc, LDMSD_ATTR_NAME); + if (!name) { + ovis_log(config_log, OVIS_LERROR, "`name` attribute is required.\n"); + return EINVAL; + } + + prdcr = ldmsd_prdcr_find(name); + if (!prdcr) { + prdcr = __prdcr_add_handler(reqc, "advertiser_start", "advertiser"); + if (!prdcr) { + ovis_log(config_log, OVIS_LERROR, "%s", reqc->line_buf); + rc = reqc->errcode; + goto out; + } + ldmsd_prdcr_get(prdcr); + } + + prdcr->obj.perm |= LDMSD_PERM_DSTART; + ldmsd_prdcr_put(prdcr); +out: + free(name); + return rc; +} + /* * rc = 0, filter applied OK * rc > 0, rc == -errno, error * rc = -1, filter not applied (but not an error) */ -int __req_filter_failover(ldmsd_cfg_xprt_t x, ldmsd_req_hdr_t req, void *ctxt) +int __req_filter_failover(ldmsd_req_ctxt_t reqc, void *ctxt) { int *use_failover = ctxt; int rc; - /* req is in network byte order */ - ldmsd_ntoh_req_msg(req); - - switch (req->req_id) { + switch (reqc->req_id) { case LDMSD_FAILOVER_START_REQ: *use_failover = 1; rc = 0; break; case LDMSD_PRDCR_START_REGEX_REQ: - rc = __req_deferred_start_regex(req, LDMSD_CFGOBJ_PRDCR); + rc = __req_deferred_start_regex(reqc, LDMSD_CFGOBJ_PRDCR); + break; + case LDMSD_ADVERTISER_START_REQ: + rc = __req_deferred_advertiser_start(reqc); break; case LDMSD_PRDCR_START_REQ: - rc = __req_deferred_start(req, LDMSD_CFGOBJ_PRDCR); + rc = __req_deferred_start(reqc, LDMSD_CFGOBJ_PRDCR); break; case LDMSD_UPDTR_START_REQ: - rc = __req_deferred_start(req, LDMSD_CFGOBJ_UPDTR); + rc = __req_deferred_start(reqc, LDMSD_CFGOBJ_UPDTR); break; case LDMSD_STRGP_START_REQ: - rc = __req_deferred_start(req, LDMSD_CFGOBJ_STRGP); + rc = __req_deferred_start(reqc, LDMSD_CFGOBJ_STRGP); + break; + case LDMSD_PRDCR_LISTEN_START_REQ: + rc = __req_deferred_start(reqc, LDMSD_CFGOBJ_PRDCR_LISTEN); break; default: rc = -1; } - /* convert req back to network byte order */ - ldmsd_hton_req_msg(req); return rc; } @@ -954,11 +953,19 @@ int ldmsd_cfgobjs_start(int (*filter)(ldmsd_cfgobj_t)) ldmsd_cfg_unlock(LDMSD_CFGOBJ_STRGP); goto out; } - __dlog(DLOG_CFGOK, "strgp_start name=%s # delayed \n", - obj->name); + __dlog(DLOG_CFGOK, "strgp_start name=%s # delayed \n", + obj->name); } ldmsd_cfg_unlock(LDMSD_CFGOBJ_STRGP); + ldmsd_cfg_lock(LDMSD_CFGOBJ_PRDCR_LISTEN); + LDMSD_CFGOBJ_FOREACH(obj, LDMSD_CFGOBJ_PRDCR_LISTEN) { + if (filter && filter(obj)) + continue; + ((ldmsd_prdcr_listen_t)obj)->state = LDMSD_PRDCR_LISTEN_STATE_RUNNING; + } + ldmsd_cfg_unlock(LDMSD_CFGOBJ_PRDCR_LISTEN); + out: return rc; } @@ -1048,7 +1055,7 @@ void ldmsd_recv_msg(ldms_t x, char *data, size_t data_len) switch (ntohl(request->type)) { case LDMSD_REQ_TYPE_CONFIG_CMD: - (void)ldmsd_process_config_request(&xprt, request); + (void)ldmsd_process_config_request(&xprt, request, NULL, NULL); break; case LDMSD_REQ_TYPE_CONFIG_RESP: (void)ldmsd_process_config_response(&xprt, request); diff --git a/ldms/src/ldmsd/ldmsd_prdcr.c b/ldms/src/ldmsd/ldmsd_prdcr.c index afcfc89d80..3c727fe0c1 100644 --- a/ldms/src/ldmsd/ldmsd_prdcr.c +++ b/ldms/src/ldmsd/ldmsd_prdcr.c @@ -566,6 +566,73 @@ static void __ldmsd_xprt_ctxt_free(void *_ctxt) free(ctxt); } +static ovis_log_t config_log; +static int __advertise_resp_cb(ldmsd_req_cmd_t rcmd) +{ + ldmsd_req_hdr_t resp = (ldmsd_req_hdr_t)(rcmd->reqc->req_buf); + ldmsd_prdcr_t prdcr = (ldmsd_prdcr_t)(rcmd->ctxt); + /* The rsp_err value is set in ldmsd_request.c:advertise_notification_handler() */ + if (resp->rsp_err) { + char *errmsg = ldmsd_req_attr_str_value_get_by_id(rcmd->reqc, + LDMSD_ATTR_STRING); + if (ENOENT== resp->rsp_err) { + /* + * The hostname doesn't match any prdcr_listen on the aggregator. + * Retry! + * + * To simplify producer's state management, I decided to + * disconnect the connection to reset the producer state. + * This avoids the need for an additional state to differentiate + * between 'connected and matching a prdcr_listen on the peer' + * and 'connected but not yet matching any prdcr_listen on the peer'. + */ + if (prdcr->xprt) + ldms_xprt_close(prdcr->xprt); + ovis_log(config_log, OVIS_LINFO, "advertise: %s.\n", errmsg); + } else { + /* + * LDMSD doesn't automatically stop the advertisement to + * keep the consistency that LDMSD does not automatically + * start or stop any configuration objects. + */ + ovis_log(config_log, OVIS_LERROR, + "'advertise': An error occurred on the aggregator. " + "Error: \"%s\" Please stop advertising and restart " + "with updated configuration.\n", errmsg); + } + free(errmsg); + } + return 0; +} + +static int __send_advertisement(ldmsd_prdcr_t prdcr) +{ + int rc; + ldmsd_req_cmd_t rcmd; + char my_hostname[HOST_NAME_MAX+1]; + + rcmd = ldmsd_req_cmd_new(prdcr->xprt, + LDMSD_ADVERTISE_REQ, NULL, + __advertise_resp_cb, prdcr); + if (!rcmd) { + ovis_log(NULL, OVIS_LCRIT, "Memory allocation failure.\n"); + return ENOMEM; + } + + rc = ldmsd_req_cmd_attr_append_str(rcmd, LDMSD_ATTR_NAME, prdcr->obj.name); + if (rc) + goto out; + rc = gethostname(my_hostname, HOST_NAME_MAX+1); + rc = ldmsd_req_cmd_attr_append_str(rcmd, LDMSD_ATTR_HOST, my_hostname); + if (rc) + goto out; + rc = ldmsd_req_cmd_attr_term(rcmd); + if (rc) + goto out; +out: + return rc; +} + static int __sampler_routine(ldms_t x, ldms_xprt_event_t e, ldmsd_prdcr_t prdcr) { int is_reset_prdcr = 0; @@ -573,6 +640,10 @@ static int __sampler_routine(ldms_t x, ldms_xprt_event_t e, ldmsd_prdcr_t prdcr) case LDMS_XPRT_EVENT_CONNECTED: /* Do nothing */ prdcr->conn_state = LDMSD_PRDCR_STATE_CONNECTED; + if (prdcr->type == LDMSD_PRDCR_TYPE_ADVERTISER) { + __send_advertisement(prdcr); + /* TODO: handle the error */ + } break; case LDMS_XPRT_EVENT_DISCONNECTED: case LDMS_XPRT_EVENT_ERROR: @@ -581,6 +652,9 @@ static int __sampler_routine(ldms_t x, ldms_xprt_event_t e, ldmsd_prdcr_t prdcr) is_reset_prdcr = 1; break; case LDMS_XPRT_EVENT_RECV: + /* Receive the response of an advertisement */ + ldmsd_recv_msg(x, e->data, e->data_len); + break; case LDMS_XPRT_EVENT_SEND_COMPLETE: /* Ignore */ break; @@ -692,9 +766,11 @@ static void prdcr_connect_cb(ldms_t x, ldms_xprt_event_t e, void *cb_arg) switch (prdcr->type) { case LDMSD_PRDCR_TYPE_ACTIVE: case LDMSD_PRDCR_TYPE_PASSIVE: + case LDMSD_PRDCR_TYPE_ADVERTISED: is_reset_prdcr = __agg_routine(x, e, prdcr); break; case LDMSD_PRDCR_TYPE_BRIDGE: + case LDMSD_PRDCR_TYPE_ADVERTISER: is_reset_prdcr = __sampler_routine(x, e, prdcr); break; default: @@ -752,10 +828,11 @@ static void prdcr_connect(ldmsd_prdcr_t prdcr) { int ret; - assert(prdcr->xprt == NULL); switch (prdcr->type) { case LDMSD_PRDCR_TYPE_ACTIVE: case LDMSD_PRDCR_TYPE_BRIDGE: + case LDMSD_PRDCR_TYPE_ADVERTISER: + assert(prdcr->xprt == NULL); prdcr->conn_state = LDMSD_PRDCR_STATE_CONNECTING; prdcr->xprt = ldms_xprt_rail_new(prdcr->xprt_name, prdcr->rail, @@ -780,8 +857,15 @@ static void prdcr_connect(ldmsd_prdcr_t prdcr) } break; case LDMSD_PRDCR_TYPE_PASSIVE: + case LDMSD_PRDCR_TYPE_ADVERTISED: + assert(prdcr->xprt == NULL); prdcr->xprt = ldms_xprt_by_remote_sin((struct sockaddr *)&prdcr->ss); - /* Call connect callback to advance state and update timers*/ + /* + * The transport endpoint has be assigned in the advertise_notification handler before + * the producer has been started. + * + * Call connect callback to advance state and update timers + */ if (prdcr->xprt) { ldms_xprt_event_cb_set(prdcr->xprt, prdcr_connect_cb, prdcr); ldmsd_prdcr_unlock(prdcr); @@ -833,6 +917,10 @@ int ldmsd_prdcr_str2type(const char *type) prdcr_type = LDMSD_PRDCR_TYPE_LOCAL; else if (0 == strcasecmp(type, "bridge")) prdcr_type = LDMSD_PRDCR_TYPE_BRIDGE; + else if (0 == strcasecmp(type, "advertiser")) + prdcr_type = LDMSD_PRDCR_TYPE_ADVERTISER; + else if (0 == strcasecmp(type, "advertised")) + prdcr_type = LDMSD_PRDCR_TYPE_ADVERTISED; else return -EINVAL; return prdcr_type; @@ -848,10 +936,29 @@ const char *ldmsd_prdcr_type2str(enum ldmsd_prdcr_type type) return "local"; else if (LDMSD_PRDCR_TYPE_BRIDGE == type) return "bridge"; + else if (LDMSD_PRDCR_TYPE_ADVERTISER == type) + return "advertiser"; + else if (LDMSD_PRDCR_TYPE_ADVERTISED == type) + return "advertised"; else return NULL; } +int prdcr_ref_cmp(void *a, const void *b) +{ + return strcmp(a, b); +} + +ldmsd_prdcr_ref_t prdcr_ref_new(ldmsd_prdcr_t prdcr) +{ + ldmsd_prdcr_ref_t ref = calloc(1, sizeof *ref); + if (ref) { + ref->prdcr = ldmsd_prdcr_get(prdcr); + rbn_init(&ref->rbn, prdcr->obj.name); + } + return ref; +} + ldmsd_prdcr_t ldmsd_prdcr_new_with_auth(const char *name, const char *xprt_name, const char *host_name, const unsigned short port_no, @@ -885,8 +992,13 @@ ldmsd_prdcr_new_with_auth(const char *name, const char *xprt_name, if (!prdcr->host_name) goto out; prdcr->xprt_name = strdup(xprt_name); - if ((type != LDMSD_PRDCR_TYPE_PASSIVE) && (!prdcr->port_no)) - goto out; + if ((type == LDMSD_PRDCR_TYPE_ACTIVE) || (type == LDMSD_PRDCR_TYPE_BRIDGE)) { + /* The producer needs the port information to send the connection request */ + /* Verify that the port_no exists. */ + if (!prdcr->port_no) { + goto out; + } + } prdcr->ss_len = sizeof(prdcr->ss); if (prdcr_resolve(host_name, port_no, &prdcr->ss, &prdcr->ss_len)) { diff --git a/ldms/src/ldmsd/ldmsd_request.c b/ldms/src/ldmsd/ldmsd_request.c index 5058ff65ff..03ea5acc98 100644 --- a/ldms/src/ldmsd/ldmsd_request.c +++ b/ldms/src/ldmsd/ldmsd_request.c @@ -119,6 +119,7 @@ extern const char *prdcr_state_str(enum ldmsd_prdcr_state state); extern int ldmsd_credits; /* defined in ldmsd.c */ +#define CONFIG_PLAYBACK_ENABLED(_match_) ((_match_) & ldmsd_req_debug) struct timeval ldmsd_req_last_time; __attribute__((format(printf, 2, 3))) void __dlog(int match, const char *fmt, ...) @@ -315,6 +316,18 @@ static int default_credits_set_handler(ldmsd_req_ctxt_t reqc); static int pid_file_handler(ldmsd_req_ctxt_t reqc); static int banner_mode_handler(ldmsd_req_ctxt_t reqc); +/* Sampler Advertisement */ +static int prdcr_listen_add_handler(ldmsd_req_ctxt_t reqc); +static int prdcr_listen_del_handler(ldmsd_req_ctxt_t reqc); +static int prdcr_listen_start_handler(ldmsd_req_ctxt_t reqc); +static int prdcr_listen_stop_handler(ldmsd_req_ctxt_t reqc); +static int prdcr_listen_status_handler(ldmsd_req_ctxt_t reqc); +static int advertiser_add_handler(ldmsd_req_ctxt_t reqc); +static int advertiser_start_handler(ldmsd_req_ctxt_t reqc); +static int advertiser_stop_handler(ldmsd_req_ctxt_t reqc); +static int advertiser_del_handler(ldmsd_req_ctxt_t reqc); +static int advertise_handler(ldmsd_req_ctxt_t reqc); + /* executable for all */ #define XALL 0111 /* executable for user, and group */ @@ -687,6 +700,38 @@ static struct request_handler_entry request_handler[] = { [LDMSD_BANNER_MODE_REQ] = { LDMSD_BANNER_MODE_REQ, banner_mode_handler, XUG }, + + /* Sampler Discovery */ + [LDMSD_ADVERTISER_ADD_REQ] = { + LDMSD_ADVERTISER_ADD_REQ, advertiser_add_handler, XUG + }, + [LDMSD_ADVERTISER_START_REQ] = { + LDMSD_ADVERTISER_START_REQ, advertiser_start_handler, XUG + }, + [LDMSD_ADVERTISER_STOP_REQ] = { + LDMSD_ADVERTISER_STOP_REQ, advertiser_stop_handler, XUG + }, + [LDMSD_ADVERTISER_DEL_REQ] = { + LDMSD_ADVERTISER_DEL_REQ, advertiser_del_handler, XUG + }, + [LDMSD_PRDCR_LISTEN_ADD_REQ] = { + LDMSD_PRDCR_LISTEN_ADD_REQ, prdcr_listen_add_handler, XUG + }, + [LDMSD_PRDCR_LISTEN_DEL_REQ] = { + LDMSD_PRDCR_LISTEN_DEL_REQ, prdcr_listen_del_handler, XUG + }, + [LDMSD_PRDCR_LISTEN_START_REQ] = { + LDMSD_PRDCR_LISTEN_START_REQ, prdcr_listen_start_handler, XUG | MOD + }, + [LDMSD_PRDCR_LISTEN_STOP_REQ] = { + LDMSD_PRDCR_LISTEN_STOP_REQ, prdcr_listen_stop_handler, XUG | MOD + }, + [LDMSD_PRDCR_LISTEN_STATUS_REQ] = { + LDMSD_PRDCR_LISTEN_STATUS_REQ, prdcr_listen_status_handler, XALL + }, + [LDMSD_ADVERTISE_REQ] = { + LDMSD_ADVERTISE_REQ, advertise_handler, XUG + }, }; int is_req_id_priority(enum ldmsd_request req_id) @@ -1238,7 +1283,12 @@ void ldmsd_send_cfg_rec_adv(ldmsd_cfg_xprt_t xprt, uint32_t msg_no, uint32_t rec } extern void cleanup(int x, char *reason); -int ldmsd_process_config_request(ldmsd_cfg_xprt_t xprt, ldmsd_req_hdr_t request) +/* + * \param req_filter is a function that returns zero if we want to process the + * request, and returns non-zero otherwise. + */ +int ldmsd_process_config_request(ldmsd_cfg_xprt_t xprt, ldmsd_req_hdr_t request, + req_filter_fn req_filter, void *filter_ctxt) { struct req_ctxt_key key; ldmsd_req_ctxt_t reqc = NULL; @@ -1334,6 +1384,23 @@ int ldmsd_process_config_request(ldmsd_cfg_xprt_t xprt, ldmsd_req_hdr_t request) ldmsd_ntoh_req_msg((ldmsd_req_hdr_t)reqc->req_buf); reqc->req_id = ((ldmsd_req_hdr_t)reqc->req_buf)->req_id; + if (req_filter) { + rc = req_filter(reqc, filter_ctxt); + /* rc = 0, filter OK */ + if (rc == 0) { + __dlog(DLOG_CFGOK, "# deferring line %d (%s)\n", + reqc->key.msg_no, reqc->xprt->file.path); + goto put_reqc; + } + /* rc == errno */ + if (rc > 0) { + goto put_reqc; + } else { + /* rc < 0, filter not applied */ + rc = 0; + } + } + rc = ldmsd_handle_request(reqc); if (!rc && !reqc->errcode) { @@ -1342,6 +1409,7 @@ int ldmsd_process_config_request(ldmsd_cfg_xprt_t xprt, ldmsd_req_hdr_t request) ldmsd_inc_cfg_cntr(); } +put_reqc: if (xprt != reqc->xprt) memcpy(xprt, reqc->xprt, sizeof(*xprt)); @@ -1535,9 +1603,9 @@ static int example_handler(ldmsd_req_ctxt_t reqc) return rc; } -static int prdcr_add_handler(ldmsd_req_ctxt_t reqc) +ldmsd_prdcr_t __prdcr_add_handler(ldmsd_req_ctxt_t reqc, char *verb, char *obj_name) { - ldmsd_prdcr_t prdcr; + ldmsd_prdcr_t prdcr = NULL; char *name, *host, *xprt, *attr_name, *type_s, *port_s, *interval_s, *rail_s, *credits_s, *rx_rate_s; char *auth; @@ -1553,7 +1621,6 @@ static int prdcr_add_handler(ldmsd_req_ctxt_t reqc) int rail = 1; char *perm_s = NULL; - reqc->errcode = 0; name = host = xprt = type_s = port_s = interval_s = auth = rail_s = credits_s = NULL; attr_name = "name"; @@ -1572,14 +1639,14 @@ static int prdcr_add_handler(ldmsd_req_ctxt_t reqc) cnt = Snprintf(&reqc->line_buf, &reqc->line_len, "The attribute type '%s' is invalid.", type_s); - goto send_reply; + goto out; } if (type == LDMSD_PRDCR_TYPE_LOCAL) { cnt = Snprintf(&reqc->line_buf, &reqc->line_len, - "Producer with type 'local' is " - "not supported."); + "%s with type 'local' is " + "not supported.", obj_name); reqc->errcode = EINVAL; - goto send_reply; + goto out; } } attr_name = "xprt"; @@ -1609,8 +1676,8 @@ static int prdcr_add_handler(ldmsd_req_ctxt_t reqc) if (port_s) { cnt = snprintf(reqc->line_buf, reqc->line_len, "Ignore the given port %s because " - "prdcr %s's type is passive.", - port_s, name); + "the type of %s %s is passive.", + port_s, obj_name, name); } port_no = -1; } @@ -1624,13 +1691,13 @@ static int prdcr_add_handler(ldmsd_req_ctxt_t reqc) if (reqc->errcode) { cnt = snprintf(reqc->line_buf, reqc->line_len, "The given 'reconnect' is invalid."); - goto send_reply; + goto out; } if (interval_us <= 0) { reqc->errcode = EINVAL; cnt = snprintf(reqc->line_buf, reqc->line_len, "The reconnect interval must be a positive number."); - goto send_reply; + goto out; } } @@ -1653,7 +1720,7 @@ static int prdcr_add_handler(ldmsd_req_ctxt_t reqc) reqc->errcode = EINVAL; cnt = Snprintf(&reqc->line_buf, &reqc->line_len, "'rail' attribute must be a positive integer, got '%s'", rail_s); - goto send_reply; + goto out; } } @@ -1664,7 +1731,7 @@ static int prdcr_add_handler(ldmsd_req_ctxt_t reqc) reqc->errcode = EINVAL; cnt = Snprintf(&reqc->line_buf, &reqc->line_len, "'credits' attribute must be greater than -2, got '%s'", credits_s); - goto send_reply; + goto out; } } @@ -1675,10 +1742,9 @@ static int prdcr_add_handler(ldmsd_req_ctxt_t reqc) reqc->errcode = EINVAL; cnt = Snprintf(&reqc->line_buf, &reqc->line_len, "'rx_rate' attribute must be greater than -2, got '%s'", rx_rate_s); - goto send_reply; + goto out; } } - prdcr = ldmsd_prdcr_new_with_auth(name, xprt, host, port_no, type, interval_us, auth, uid, gid, perm, rail, credits, rx_rate); @@ -1692,39 +1758,33 @@ static int prdcr_add_handler(ldmsd_req_ctxt_t reqc) else goto enomem; } - __dlog(DLOG_CFGOK, "prdcr_add name=%s xprt=%s host=%s port=%u type=%s " - "reconnect=%ld auth=%s uid=%d gid=%d perm=%o\n", - name, xprt, host, port_no, type_s, - interval_us, auth ? auth : "none", (int)uid, (int)gid, - (unsigned)perm); - goto send_reply; + goto out; ebadauth: reqc->errcode = ENOENT; cnt = Snprintf(&reqc->line_buf, &reqc->line_len, "Authentication name not found, check the auth_add configuration."); - goto send_reply; + goto out; enomem: reqc->errcode = ENOMEM; cnt = Snprintf(&reqc->line_buf, &reqc->line_len, "Memory allocation failed."); - goto send_reply; + goto out; eexist: reqc->errcode = EEXIST; cnt = Snprintf(&reqc->line_buf, &reqc->line_len, "The prdcr %s already exists.", name); - goto send_reply; + goto out; eafnosupport: reqc->errcode = EAFNOSUPPORT; cnt = Snprintf(&reqc->line_buf, &reqc->line_len, "Error resolving hostname '%s'\n", host); - goto send_reply; + goto out; einval: reqc->errcode = EINVAL; cnt = Snprintf(&reqc->line_buf, &reqc->line_len, "The attribute '%s' is required.", attr_name); -send_reply: - ldmsd_send_req_response(reqc, reqc->line_buf); +out: free(name); free(type_s); free(port_s); @@ -1735,10 +1795,28 @@ static int prdcr_add_handler(ldmsd_req_ctxt_t reqc) free(auth); free(rail_s); free(credits_s); + return prdcr; +} + +static int prdcr_add_handler(ldmsd_req_ctxt_t reqc) +{ + ldmsd_prdcr_t prdcr; + prdcr = __prdcr_add_handler(reqc, "prdcr_add", "producer"); + if (prdcr) { + __dlog(DLOG_CFGOK, "prdcr_add name=%s xprt=%s host=%s port=%u type=%s " + "reconnect=%ld auth=%s uid=%d gid=%d perm=%o\n", + prdcr->obj.name, prdcr->xprt_name, prdcr->host_name, + prdcr->port_no, ldmsd_prdcr_type2str(prdcr->type), + prdcr->conn_intrvl_us, prdcr->conn_auth_dom_name, + (int)prdcr->obj.uid, (int)prdcr->obj.gid, + (unsigned)prdcr->obj.perm); + } + + ldmsd_send_req_response(reqc, reqc->line_buf); return 0; } -static int prdcr_del_handler(ldmsd_req_ctxt_t reqc) +static int __prdcr_del_handler(ldmsd_req_ctxt_t reqc, const char *cmd, const char *obj_type) { char *name = NULL, *attr_name; size_t cnt = 0; @@ -1751,9 +1829,9 @@ static int prdcr_del_handler(ldmsd_req_ctxt_t reqc) if (!name) { reqc->errcode = EINVAL; cnt = Snprintf(&reqc->line_buf, &reqc->line_len, - "The attribute '%s' is required by prdcr_del.", - attr_name); - goto send_reply; + "The attribute '%s' is required by %s.", + attr_name, cmd); + goto out; } ldmsd_req_ctxt_sec_get(reqc, &sctxt); @@ -1761,15 +1839,15 @@ static int prdcr_del_handler(ldmsd_req_ctxt_t reqc) reqc->errcode = ldmsd_prdcr_del(name, &sctxt); switch (reqc->errcode) { case 0: - __dlog(DLOG_CFGOK, "prdcr_del name=%s\n", name); + __dlog(DLOG_CFGOK, "%s name=%s\n", cmd, name); break; case ENOENT: Snprintf(&reqc->line_buf, &reqc->line_len, - "The producer specified does not exist."); + "The %s specified does not exist.", obj_type); break; case EBUSY: Snprintf(&reqc->line_buf, &reqc->line_len, - "The producer is in use."); + "The %s is in use.", obj_type); break; case EACCES: Snprintf(&reqc->line_buf, &reqc->line_len, @@ -1780,14 +1858,19 @@ static int prdcr_del_handler(ldmsd_req_ctxt_t reqc) "Error: %d %s", reqc->errcode, ovis_errno_abbvr(reqc->errcode)); } - -send_reply: - ldmsd_send_req_response(reqc, reqc->line_buf); +out: free(name); return 0; } -static int prdcr_start_handler(ldmsd_req_ctxt_t reqc) +static int prdcr_del_handler(ldmsd_req_ctxt_t reqc) +{ + int rc = __prdcr_del_handler(reqc, "prdcr_del", "producer"); + ldmsd_send_req_response(reqc, reqc->line_buf); + return rc; +} + +static int __prdcr_start_handler(ldmsd_req_ctxt_t reqc, const char *cmd, const char *obj_type) { char *name, *interval_str; name = interval_str = NULL; @@ -1800,8 +1883,8 @@ static int prdcr_start_handler(ldmsd_req_ctxt_t reqc) if (!name) { reqc->errcode = EINVAL; cnt = Snprintf(&reqc->line_buf, &reqc->line_len, - "The attribute 'name' is required by prdcr_start."); - goto send_reply; + "The attribute 'name' is required by %s.", cmd); + goto out; } ldmsd_req_ctxt_sec_get(reqc, &sctxt); @@ -1810,16 +1893,15 @@ static int prdcr_start_handler(ldmsd_req_ctxt_t reqc) reqc->errcode = ldmsd_prdcr_start(name, interval_str, &sctxt); switch (reqc->errcode) { case 0: - __dlog(DLOG_CFGOK, "prdcr_start name=%s reconnect=%s\n", - name, interval_str); + /* do nothing */ break; case EBUSY: cnt = Snprintf(&reqc->line_buf, &reqc->line_len, - "The producer is already running."); + "The %s is already running.", obj_type); break; case ENOENT: cnt = Snprintf(&reqc->line_buf, &reqc->line_len, - "The producer specified does not exist."); + "The %s specified does not exist.", obj_type); break; case EACCES: Snprintf(&reqc->line_buf, &reqc->line_len, @@ -1839,14 +1921,34 @@ static int prdcr_start_handler(ldmsd_req_ctxt_t reqc) reqc->errcode, ovis_errno_abbvr(reqc->errcode)); } -send_reply: - ldmsd_send_req_response(reqc, reqc->line_buf); +out: free(name); free(interval_str); return 0; } -static int prdcr_stop_handler(ldmsd_req_ctxt_t reqc) +static int prdcr_start_handler(ldmsd_req_ctxt_t reqc) +{ + int rc = __prdcr_start_handler(reqc, "prdcr_start", "producer"); + if (CONFIG_PLAYBACK_ENABLED(DLOG_CFGOK)) { + if (!rc && !reqc->errcode) { + char *name = ldmsd_req_attr_str_value_get_by_id(reqc, LDMSD_ATTR_NAME); + char *interval_us = ldmsd_req_attr_str_value_get_by_id(reqc, + LDMSD_ATTR_INTERVAL); + if (interval_us) { + __dlog(DLOG_CFGOK, "prdcr_start name=%s reconnect=%s\n", + name, interval_us); + free(interval_us); + } else { + __dlog(DLOG_CFGOK, "prdcr_start name=%s\n", name); + } + } + } + ldmsd_send_req_response(reqc, reqc->line_buf); + return 0; +} + +static int __prdcr_stop_handler(ldmsd_req_ctxt_t reqc, const char *cmd, const char *obj_type) { char *name = NULL; size_t cnt = 0; @@ -1858,8 +1960,8 @@ static int prdcr_stop_handler(ldmsd_req_ctxt_t reqc) if (!name) { reqc->errcode = EINVAL; cnt = Snprintf(&reqc->line_buf, &reqc->line_len, - "The attribute 'name' is required by prdcr_stop."); - goto send_reply; + "The attribute 'name' is required by %s.", cmd); + goto out; } ldmsd_req_ctxt_sec_get(reqc, &sctxt); @@ -1867,15 +1969,15 @@ static int prdcr_stop_handler(ldmsd_req_ctxt_t reqc) reqc->errcode = ldmsd_prdcr_stop(name, &sctxt); switch (reqc->errcode) { case 0: - __dlog(DLOG_CFGOK, "prdcr_stop name=%s\n", name); + __dlog(DLOG_CFGOK, "%s name=%s\n", cmd, name); break; case EBUSY: cnt = Snprintf(&reqc->line_buf, &reqc->line_len, - "The producer is already stopped."); + "The %s is already stopped.", obj_type); break; case ENOENT: cnt = Snprintf(&reqc->line_buf, &reqc->line_len, - "The producer specified does not exist."); + "The %s specified does not exist.", obj_type); break; case EACCES: cnt = Snprintf(&reqc->line_buf, &reqc->line_len, @@ -1887,12 +1989,18 @@ static int prdcr_stop_handler(ldmsd_req_ctxt_t reqc) reqc->errcode, ovis_errno_abbvr(reqc->errcode)); } -send_reply: - ldmsd_send_req_response(reqc, reqc->line_buf); +out: free(name); return 0; } +static int prdcr_stop_handler(ldmsd_req_ctxt_t reqc) +{ + int rc = __prdcr_stop_handler(reqc, "prdcr_stop", "producer"); + ldmsd_send_req_response(reqc, reqc->line_buf); + return rc; +} + static int prdcr_start_regex_handler(ldmsd_req_ctxt_t reqc) { char *prdcr_regex, *interval_str; @@ -9326,3 +9434,784 @@ static int banner_mode_handler(ldmsd_req_ctxt_t reqc) free(mode_s); return rc; } + +/* Sampler Discovery */ + +/* *2 for the two hex digits needed for each 16-bit value, * 8 for the 8 groups of values, + 7 for the 7 colons */ +#define MAX_IPV6_STR_LEN (sizeof(uint16_t) * 2 * 8 + 7) +static int __cidr2addr6(const char *cdir_str, struct ldms_addr *addr, int *prefix_len) +{ + int rc; + int is_ipv6 = 0; + char netaddr_str[MAX_IPV6_STR_LEN]; + int _prefix_len; + struct ldms_addr s6 = { + .addr = {0,0,0,0,0,0,0,0,0,0,0xff,0xff,0,0,0,0} + }; + if (strchr(cdir_str, ':') != NULL) + is_ipv6 = 1; + + rc = sscanf(cdir_str, "%[^/]/%d", netaddr_str, &_prefix_len); + if (rc != 2) { + return EINVAL; + } + + if (prefix_len) + *prefix_len = _prefix_len; + + if (addr) { + if (is_ipv6) { + rc = inet_pton(AF_INET6, netaddr_str, &addr->addr); + } else { + rc = inet_pton(AF_INET, netaddr_str, &addr->addr); + } + } + + if (rc != 1) + return rc; + if (!is_ipv6) { + /* Make the ipv4-mapped ipv6 format */ + memcpy(&s6.addr[12], &addr->addr, 4); + memcpy(&addr->addr, &s6.addr, 16); + *prefix_len += 96; + } + addr->sa_family = AF_INET6; + return 0; +} + + +/* Aggregator */ +/* The implementation is in ldmsd_prdcr.c */ +extern int prdcr_ref_cmp(void *a, const void *b); +static int prdcr_listen_add_handler(ldmsd_req_ctxt_t reqc) +{ + int rc = 0; + char *name; + char *regex_str; + char *cidr_str; + char *reconnect_str; + char *rail_str; + char *credits_str; + char *rx_rate_str; + char *disabled_start; + char *attr_name; + ldmsd_prdcr_listen_t pl; + long reconnect_us; + int rail; + uint64_t credits; + uint64_t rx_rate; + + name = regex_str = reconnect_str = rail_str = credits_str = NULL; + rx_rate_str = cidr_str = disabled_start = NULL; + + attr_name = "name"; + name = ldmsd_req_attr_str_value_get_by_id(reqc, LDMSD_ATTR_NAME); + if (!name) + goto einval; + + regex_str = ldmsd_req_attr_str_value_get_by_id(reqc, LDMSD_ATTR_REGEX); + cidr_str = ldmsd_req_attr_str_value_get_by_id(reqc, LDMSD_ATTR_IP); + disabled_start = ldmsd_req_attr_str_value_get_by_id(reqc, LDMSD_ATTR_AUTO_INTERVAL); + + attr_name = "reconnect"; + reconnect_str = ldmsd_req_attr_str_value_get_by_id(reqc, LDMSD_ATTR_INTERVAL); + if (!reconnect_str) { + goto einval; + } else { + reqc->errcode = ovis_time_str2us(reconnect_str, &reconnect_us); + if (reqc->errcode) { + (void) snprintf(reqc->line_buf, reqc->line_len, + "The given 'reconnect' is invalid."); + goto send_reply; + } + if (reconnect_us <= 0) { + reqc->errcode = EINVAL; + (void) snprintf(reqc->line_buf, reqc->line_len, + "The reconnect interval must be a positive number."); + goto send_reply; + } + } + + rail_str = ldmsd_req_attr_str_value_get_by_id(reqc, LDMSD_ATTR_RAIL); + if (rail_str) { + rail = atoi(rail_str); + if (rail <= 0) { + reqc->errcode = EINVAL; + reqc->line_off = snprintf(reqc->line_buf, reqc->line_len, + "'rail' attribute must be a positive " + "integer, got '%s'", rail_str); + goto send_reply; + } + } + + credits_str = ldmsd_req_attr_str_value_get_by_id(reqc, LDMSD_ATTR_CREDITS); + if (credits_str) { + credits = atol(credits_str); + if (credits <= 2) { + reqc->errcode = EINVAL; + reqc->line_off = snprintf(reqc->line_buf, reqc->line_len, + "'credits' attribute must be greater " + "than -2, got '%s'", credits_str); + goto send_reply; + } + } + + rx_rate_str = ldmsd_req_attr_str_value_get_by_id(reqc, LDMSD_ATTR_RX_RATE); + if (rx_rate_str) { + rx_rate = atol(rx_rate_str); + if (credits <= -2) { + reqc->errcode = EINVAL; + reqc->line_off = snprintf(reqc->line_buf, reqc->line_len, + "'rx_rate' attribute must be greater " + "than -2, got '%s'", rx_rate_str); + goto send_reply; + } + } + + pl = (ldmsd_prdcr_listen_t) + ldmsd_cfgobj_new_with_auth(name, LDMSD_CFGOBJ_PRDCR_LISTEN, + sizeof(*pl), NULL, 0, 0, 0); + if (!pl) + goto enomem; + pl->auto_start = 1; + if (disabled_start) { + if ((0 == strcmp(disabled_start, "1")) || + (0 == strcasecmp(disabled_start, "true"))) { + pl->auto_start = 0; + } + } + + if (regex_str) { + pl->hostname_regex_s = strdup(regex_str); + if (!pl->hostname_regex_s) { + ldmsd_cfgobj_put(&pl->obj); + goto enomem; + } + + rc = ldmsd_compile_regex(&pl->regex, regex_str, reqc->line_buf, reqc->line_len); + if (rc) { + reqc->errcode = EINVAL; + reqc->line_off = snprintf(reqc->line_buf, reqc->line_len, + "The regular expression string " + "'%s' is invalid.", regex_str); + ldmsd_cfgobj_put(&pl->obj); + goto send_reply; + } + } + + pl->prdcr_conn_intvl = reconnect_us; + if (cidr_str) { + pl->cidr_str = strdup(cidr_str); + if (!pl->cidr_str) { + reqc->errcode = ENOMEM; + reqc->line_off = snprintf(reqc->line_buf, reqc->line_len, + "Memory allocation failure."); + ldmsd_cfgobj_put(&pl->obj); + goto send_reply; + } + rc = __cidr2addr6(cidr_str, &pl->net_addr, &pl->prefix_len); + if (rc) { + reqc->errcode = EINVAL; + reqc->line_off = snprintf(reqc->line_buf, reqc->line_len, + "The given CIDR string '%s' " + "is invalid.", cidr_str); + ldmsd_cfgobj_put(&pl->obj); + goto send_reply; + } + } + + pl->rails = rail; + pl->rate_limits = rx_rate; + rbt_init(&pl->prdcr_tree, prdcr_ref_cmp); + ldmsd_cfgobj_unlock(&pl->obj); + +send_reply: + free(name); + free(regex_str); + free(cidr_str); + free(reconnect_str); + free(rail_str); + free(credits_str); + free(rx_rate_str); + free(disabled_start); + ldmsd_send_req_response(reqc, reqc->line_buf); + return rc; +enomem: + reqc->errcode = ENOMEM; + (void)snprintf(reqc->line_buf, reqc->line_len, + "Memory allocation failed."); + goto send_reply; +einval: + reqc->errcode = EINVAL; + (void) snprintf(reqc->line_buf, reqc->line_len, + "The attribute '%s' is required.", attr_name); + goto send_reply; +} + +/* This is implemented in ldmsd_cfgobj.c */ +extern struct rbt *cfgobj_trees[]; +static int prdcr_listen_del_handler(ldmsd_req_ctxt_t reqc) +{ + int rc = 0; + char *name = NULL; + struct ldmsd_sec_ctxt sctxt; + ldmsd_prdcr_listen_t pl; + + name = ldmsd_req_attr_str_value_get_by_id(reqc, LDMSD_ATTR_NAME); + if (!name) { + reqc->errcode = EINVAL; + reqc->line_off = snprintf(reqc->line_buf, reqc->line_len, + "The attribute 'name' is required,"); + goto send_reply; + } + + ldmsd_req_ctxt_sec_get(reqc, &sctxt); + + ldmsd_cfg_lock(LDMSD_CFGOBJ_PRDCR_LISTEN); + for (pl = (ldmsd_prdcr_listen_t)ldmsd_cfgobj_first(LDMSD_CFGOBJ_PRDCR_LISTEN); pl; + pl = (ldmsd_prdcr_listen_t)ldmsd_cfgobj_next(&pl->obj)) { + if (0 != strcmp(name, pl->obj.name)) + continue; + + ldmsd_cfgobj_lock(&pl->obj); + rc = ldmsd_cfgobj_access_check(&pl->obj, 0222, &sctxt); + if (rc) { + ldmsd_cfgobj_unlock(&pl->obj); + ldmsd_cfg_unlock(LDMSD_CFGOBJ_PRDCR_LISTEN); + reqc->errcode = EACCES; + reqc->line_off = snprintf(reqc->line_buf, reqc->line_len, + "Permission denied"); + goto send_reply; + } + + if (pl->state != LDMSD_PRDCR_LISTEN_STATE_STOPPED) { + ldmsd_cfgobj_unlock(&pl->obj); + ldmsd_cfg_unlock(LDMSD_CFGOBJ_PRDCR_LISTEN); + reqc->errcode = EBUSY; + reqc->line_off = snprintf(reqc->line_buf, reqc->line_len, + "The producer listen '%s' is in use.\n", + name); + goto send_reply; + } + + if (ldmsd_cfgobj_refcount(&pl->obj) > 2) { + ldmsd_cfgobj_unlock(&pl->obj); + ldmsd_cfg_unlock(LDMSD_CFGOBJ_PRDCR_LISTEN); + reqc->errcode = EBUSY; + reqc->line_off = snprintf(reqc->line_buf, reqc->line_len, + "The producer listen '%s' is in use.\n", + name); + goto send_reply; + } + + rbt_del(cfgobj_trees[LDMSD_CFGOBJ_PRDCR_LISTEN], &pl->obj.rbn); + ldmsd_cfgobj_put(&pl->obj); /* Put back the reference from the tree */ + ldmsd_cfgobj_unlock(&pl->obj); + goto unlock_tree; + } + + if (!pl) { + ldmsd_cfg_unlock(LDMSD_CFGOBJ_PRDCR_LISTEN); + reqc->errcode = ENOENT; + reqc->line_off = snprintf(reqc->line_buf, reqc->line_len, + "The producer listen '%s' does not exist.\n", + name); + goto send_reply; + } + +unlock_tree: + ldmsd_cfg_unlock(LDMSD_CFGOBJ_PRDCR_LISTEN); + +send_reply: + if (pl) + ldmsd_cfgobj_put(&pl->obj); /* Put back the 'first' or 'next' reference */ + ldmsd_send_req_response(reqc, reqc->line_buf); + return rc; +} + +static int prdcr_listen_start_handler(ldmsd_req_ctxt_t reqc) +{ + int rc = 0; + char *name = NULL; + struct ldmsd_sec_ctxt sctxt; + ldmsd_prdcr_listen_t pl; + + name = ldmsd_req_attr_str_value_get_by_id(reqc, LDMSD_ATTR_NAME); + if (!name) { + reqc->errcode = EINVAL; + reqc->line_off = snprintf(reqc->line_buf, reqc->line_len, + "The attribute 'name' is required,"); + goto send_reply; + } + + ldmsd_req_ctxt_sec_get(reqc, &sctxt); + + pl = (ldmsd_prdcr_listen_t)ldmsd_cfgobj_find(name, LDMSD_CFGOBJ_PRDCR_LISTEN); + if (!pl) { + reqc->errcode = ENOENT; + reqc->line_off = snprintf(reqc->line_buf, reqc->line_len, + "The listen_producer '%s' does not exist.", + name); + goto send_reply; + } + + ldmsd_cfgobj_lock(&pl->obj); + rc = ldmsd_cfgobj_access_check(&pl->obj, 0222, &sctxt); + if (rc) { + ldmsd_cfgobj_unlock(&pl->obj); + reqc->errcode = EACCES; + reqc->line_off = snprintf(reqc->line_buf, reqc->line_len, + "Permission denied"); + goto send_reply; + } + + pl->obj.perm |= LDMSD_PERM_DSTART; + pl->state = LDMSD_PRDCR_LISTEN_STATE_RUNNING; + ldmsd_cfgobj_put(&pl->obj); /* Put back the 'find' reference */ + ldmsd_cfgobj_unlock(&pl->obj); + +send_reply: + ldmsd_send_req_response(reqc, reqc->line_buf); + return rc; +} + +static int prdcr_listen_stop_handler(ldmsd_req_ctxt_t reqc) +{ + int rc = 0; + char *name = NULL; + struct ldmsd_sec_ctxt sctxt; + ldmsd_prdcr_listen_t pl; + + name = ldmsd_req_attr_str_value_get_by_id(reqc, LDMSD_ATTR_NAME); + if (!name) { + reqc->errcode = EINVAL; + reqc->line_off = snprintf(reqc->line_buf, reqc->line_len, + "The attribute 'name' is required,"); + goto send_reply; + } + + ldmsd_req_ctxt_sec_get(reqc, &sctxt); + + pl = (ldmsd_prdcr_listen_t)ldmsd_cfgobj_find(name, LDMSD_CFGOBJ_PRDCR_LISTEN); + if (!pl) { + reqc->errcode = ENOENT; + reqc->line_off = snprintf(reqc->line_buf, reqc->line_len, + "The listen_producer '%s' does not exist.", + name); + goto send_reply; + } + + ldmsd_cfgobj_lock(&pl->obj); + rc = ldmsd_cfgobj_access_check(&pl->obj, 0222, &sctxt); + if (rc) { + reqc->errcode = EACCES; + reqc->line_off = snprintf(reqc->line_buf, reqc->line_len, + "Permission denied"); + goto out; + } + + if (pl->state == LDMSD_PRDCR_LISTEN_STATE_STOPPED) + goto out; /* already stopped, return as stop succeeds. */ + + pl->obj.perm &= ~LDMSD_PERM_DSTART; + pl->state = LDMSD_PRDCR_LISTEN_STATE_STOPPED; +out: + ldmsd_cfgobj_put(&pl->obj); /* Put back the 'find' reference */ + ldmsd_cfgobj_unlock(&pl->obj); + +send_reply: + ldmsd_send_req_response(reqc, reqc->line_buf); + return rc; +} + +static int prdcr_listen_status_handler(ldmsd_req_ctxt_t reqc) +{ + int rc = 0; + int cnt = 0; + int prdcr_cnt = 0; + ldmsd_prdcr_listen_t pl; + ldmsd_prdcr_ref_t pref; + struct rbn *rbn; + struct ldmsd_req_attr_s attr; + + /* + * TODO: It'll be helpfull to list all producers generated by each prdcr_listen. + */ + ldmsd_cfg_lock(LDMSD_CFGOBJ_PRDCR_LISTEN); + for (pl = (ldmsd_prdcr_listen_t)ldmsd_cfgobj_first(LDMSD_CFGOBJ_PRDCR_LISTEN); pl; + pl = (ldmsd_prdcr_listen_t)ldmsd_cfgobj_next(&pl->obj)) { + if (cnt) { + if ((rc = linebuf_printf(reqc, ","))) + goto err; + } + rc = linebuf_printf(reqc, + "{\"name\":\"%s\"," + "\"state\":\"%s\"," + "\"regex\":\"%s\"," + "\"IP range\":\"%s\"," + "\"reconnect\":\"%ld\"," + "\"producers\":[", + pl->obj.name, + ((pl->state==LDMSD_PRDCR_LISTEN_STATE_RUNNING)?("running"):("stopped")), + (pl->hostname_regex_s?pl->hostname_regex_s:"-"), + (pl->cidr_str?pl->cidr_str:"-"), + pl->prdcr_conn_intvl + ); + if (rc) + goto err; + RBT_FOREACH(rbn, &pl->prdcr_tree) { + pref = container_of(rbn, struct ldmsd_prdcr_ref, rbn); + if (prdcr_cnt) { + if ((rc = linebuf_printf(reqc, ","))) + goto err; + } + if ((rc = linebuf_printf(reqc, "\"%s\"", pref->prdcr->obj.name))) + goto err; + prdcr_cnt++; + } + if ((rc = linebuf_printf(reqc, "]}"))) + goto err; + cnt++; + } + ldmsd_cfg_unlock(LDMSD_CFGOBJ_PRDCR_LISTEN); + cnt = reqc->line_off + 2; /* +2 for '[' and ']' */ + + /* Send the json attribute header */ + attr.discrim = 1; + attr.attr_len = cnt; + attr.attr_id = LDMSD_ATTR_JSON; + ldmsd_hton_req_attr(&attr); + rc = ldmsd_append_reply(reqc, (char *)&attr, sizeof(attr), LDMSD_REQ_SOM_F); + if (rc) + goto out; + + /* Send the json object */ + rc = ldmsd_append_reply(reqc, "[", 1, 0); + if (rc) + goto out; + if (reqc->line_off) { + rc = ldmsd_append_reply(reqc, reqc->line_buf, reqc->line_off, 0); + if (rc) + goto out; + } + rc = ldmsd_append_reply(reqc, "]", 1, 0); + if (rc) { + goto out; + } + + /* Send the terminating attribute */ + attr.discrim = 0; + rc = ldmsd_append_reply(reqc, (char *)&attr.discrim, + sizeof(uint32_t), LDMSD_REQ_EOM_F); +out: + return rc; +err: + if (pl) + ldmsd_cfgobj_put(&pl->obj); + ldmsd_cfg_unlock(LDMSD_CFGOBJ_PRDCR_LISTEN); + reqc->line_off = snprintf(reqc->line_buf, reqc->line_len, + "Error getting the status: Error %d.", rc); + reqc->errcode = EIO; + ldmsd_send_req_response(reqc, reqc->line_buf); + return rc; +} + +/* The implementation is in ldmsd_updtr.c */ +extern int __ldmsd_updtr_prdcr_add(ldmsd_updtr_t updtr, ldmsd_prdcr_t prdcr); +/* The implementations are in ldmsd_prdcr.c */ +extern ldmsd_prdcr_ref_t prdcr_ref_new(ldmsd_prdcr_t prdcr); +static int __process_advertisement(ldmsd_req_ctxt_t reqc, ldmsd_prdcr_listen_t lp, struct ldms_addr *rem_addr) +{ + int rc; + char *name; + char *xprt_s; + char *hostname; + char *attr_name; + ldmsd_prdcr_t prdcr; + ldmsd_prdcr_ref_t pl_pref, updtr_pref; + struct rbn *rbn; + struct ldmsd_sec_ctxt sctxt; + uid_t uid; + gid_t gid; + int is_start; + name = xprt_s = hostname = NULL; + + attr_name = "name"; + name = ldmsd_req_attr_str_value_get_by_id(reqc, LDMSD_ATTR_NAME); + if (!name) + goto einval; + + attr_name = "hostname"; + hostname = ldmsd_req_attr_str_value_get_by_id(reqc, LDMSD_ATTR_HOST); + if (!hostname) + goto einval; + + xprt_s = (char *)ldms_xprt_type_name(reqc->xprt->ldms.ldms); + + /* TODO: make sure that it makes sense to use uid, gid from the request context. */ + ldmsd_req_ctxt_sec_get(reqc, &sctxt); + uid = sctxt.crd.uid; + gid = sctxt.crd.gid; + + prdcr = ldmsd_prdcr_find(name); + if (!prdcr) { + prdcr = ldmsd_prdcr_new_with_auth(name, xprt_s, hostname, rem_addr->sin_port, + LDMSD_PRDCR_TYPE_ADVERTISED, INT_MAX, + NULL, uid, gid, 0770, + lp->rails, lp->recv_credits, lp->rate_limits); + if (!prdcr) { + reqc->errcode = ENOMEM; + reqc->line_off = snprintf(reqc->line_buf, reqc->line_len, + "Aggregator failed to create " + "the producer '%s'", name); + rc = ENOMEM; + goto out; + } + is_start = 1; + } else { + if (prdcr->xprt) { + ovis_log(NULL, OVIS_LERROR, + "Received a duplicate advertise request of producer '%s'. " + "LDMSD ignores the subsequent request.\n", name); + rc = EBUSY; + goto out; + } + } + + rbn = rbt_find(&lp->prdcr_tree, name); + if (!rbn) { + pl_pref = prdcr_ref_new(prdcr); + if (!pl_pref) { + ovis_log(config_log, OVIS_LCRIT, "Memory allocation failure.\n"); + reqc->errcode = ENOMEM; + reqc->line_off = snprintf(reqc->line_buf, reqc->line_len, + "Aggregator has memory allocation failure."); + rc = ENOMEM; + goto out; + } + rbt_ins(&lp->prdcr_tree, &pl_pref->rbn); + } + + /* Add the producer to any updaters that the producer matches */ + ldmsd_updtr_t updtr; + ldmsd_name_match_t match; + ldmsd_cfg_lock(LDMSD_CFGOBJ_UPDTR); + for (updtr = ldmsd_updtr_first(); updtr; updtr = ldmsd_updtr_next(updtr)) { + updtr_pref = ldmsd_updtr_prdcr_find(updtr, prdcr->obj.name); + if (updtr_pref) + continue; + + LIST_FOREACH(match, &updtr->prdcr_filter, entry) { + if (0 == regexec(&match->regex, prdcr->obj.name, 0, NULL, 0)) { + rc = __ldmsd_updtr_prdcr_add(updtr, prdcr); + if (rc) { + ovis_log(config_log, OVIS_LERROR, + "Failed to add the generated producer " + "'%s' to updater '%s'. Error %d\n", + name, updtr->obj.name, rc); + goto err; + } + break; + } + } + } + ldmsd_cfg_unlock(LDMSD_CFGOBJ_UPDTR); + + /* + * The producer state will be moved in the prdcr_task_cb() path. + */ + if (lp->auto_start && is_start) { + rc = ldmsd_prdcr_start(name, NULL, &sctxt); + if (rc) { + ovis_log(config_log, OVIS_LERROR, "Failed to start the " + "generated producer '%s'. Error %d.\n", + name, rc); + goto err; + } + } +out: + return rc; +einval: + ovis_log(config_log, OVIS_LERROR, + "The '%s' attribute is missing from " + "an advertise request.\n", attr_name); + reqc->line_off = snprintf(reqc->line_buf, reqc->line_len, + "The attribute '%s' is missing from " + "an advertise request to an aggregator.", attr_name); + reqc->errcode = rc = EINVAL; + goto out; +err: + reqc->errcode = rc; + reqc->line_off = snprintf(reqc->line_buf, reqc->line_len, + "Aggregator failed to start the " + "generated producer '%s'.", name); + goto out; +} + +/* + * If the producer listen contains both hostname regex and CIDR IP address range, + * the advertiser matches only when its hostname and IP address are matched + * the prdcr_listen's hostname regex and IP range. + */ +int __is_advertiser_matched(ldmsd_prdcr_listen_t pl, struct ldms_addr *advts_addr, + const char *advts_hostname) +{ + int is_host_matched = 1; + int is_ip_matched = 1; + + if (pl->hostname_regex_s) { + if (0 != regexec(&pl->regex, advts_hostname, 0, NULL, 0)) + is_host_matched = 0; + } + + if (pl->prefix_len) { + if (advts_addr->sa_family == AF_INET) { + struct ldms_addr s6 = { + .addr ={0,0,0,0,0,0,0,0,0,0,0xff,0xff,0,0,0,0} + }; + memcpy(&s6.addr[12], &advts_addr->addr, 4); + memcpy(&advts_addr->addr, &s6.addr, 16); + advts_addr->sa_family = AF_INET6; + } + /* A CIDR IP address was given. */ + if (0 == ldms_addr_in_network_addr(advts_addr, &pl->net_addr, pl->prefix_len)) + is_ip_matched = 0; + } + + if (is_host_matched && is_ip_matched) + return 1; + else + return 0; +} + +static int advertise_handler(ldmsd_req_ctxt_t reqc) +{ + int rc = 0; + ldmsd_prdcr_listen_t pl; + char *hostname; + char *name; + hostname = name = NULL; + struct ldms_addr rem_addr = {0}; + + rc = ldms_xprt_addr(reqc->xprt->ldms.ldms, NULL, &rem_addr); + if (rc) { + reqc->errcode = rc; + reqc->line_off = snprintf(reqc->line_buf, reqc->line_len, + "An error %d occurred on the aggregator " + "while processing the advertisement.", rc); + goto send_reply; + } + + hostname = ldmsd_req_attr_str_value_get_by_id(reqc, LDMSD_ATTR_HOST); + if (!hostname) { + reqc->errcode = EINVAL; + reqc->line_off = snprintf(reqc->line_buf, reqc->line_len, + "The attribute 'hostname' is required."); + goto send_reply; + } + + for (pl = (ldmsd_prdcr_listen_t)ldmsd_cfgobj_first(LDMSD_CFGOBJ_PRDCR_LISTEN); + pl; pl = (ldmsd_prdcr_listen_t)ldmsd_cfgobj_next(&pl->obj)) + { + if (pl->state != LDMSD_PRDCR_LISTEN_STATE_RUNNING) + continue; + if (__is_advertiser_matched(pl, &rem_addr, hostname)) { + /* The hostname matches the regular expression. */ + reqc->errcode = __process_advertisement(reqc, pl, &rem_addr); + if (reqc->errcode) { + if (reqc->errcode == EBUSY) { + snprintf(reqc->line_buf, reqc->line_len, + "The client already has a running " + "producer with the given name."); + } else { + snprintf(reqc->line_buf, reqc->line_len, + "An error '%d' occurred on the peer.", reqc->errcode); + } + } + ldmsd_cfgobj_put(&pl->obj); /* Put back the 'first' or 'next' reference */ + goto send_reply; + } + } + /* + * The advertisement doesn't match any listening producers + */ + reqc->errcode = ENOENT; + snprintf(reqc->line_buf, reqc->line_len, + "The given hostname '%s' doesn't match " + "any `prdcr_listen`'s regex.", hostname); + ovis_log(NULL, OVIS_LERROR, "Received a producer advertisement " + "with hostname '%s', which isn't matched any listening producers. " + "Stop the advertisement, update its configuration, and then restart.\n", + hostname); +send_reply: + ldmsd_send_req_response(reqc, reqc->line_buf); + return rc; +} + +static int advertiser_add_handler(ldmsd_req_ctxt_t reqc) +{ + ldmsd_prdcr_t prdcr; + prdcr = __prdcr_add_handler(reqc, "advertiser_add", "advertiser"); + if (prdcr) { + __dlog(DLOG_CFGOK, "advertiser_add name=%s xprt=%s host=%s port=%u " + "auth=%s uid=%d gid=%d perm=%o\n", + prdcr->obj.name, prdcr->xprt_name, prdcr->host_name, + prdcr->port_no, prdcr->conn_auth_dom_name, + (int)prdcr->obj.uid, (int)prdcr->obj.gid, + (unsigned)prdcr->obj.perm); + } + + ldmsd_send_req_response(reqc, reqc->line_buf); + return 0; +} + +static int advertiser_start_handler(ldmsd_req_ctxt_t reqc) +{ + int rc = 0; + ldmsd_prdcr_t prdcr; + char *name = ldmsd_req_attr_str_value_get_by_id(reqc, LDMSD_ATTR_NAME); + + prdcr = ldmsd_prdcr_find(name); + if (!prdcr) { + prdcr = __prdcr_add_handler(reqc, "advertiser_start", "advertiser"); + if (!prdcr) { + /* + * Failed to create the producer. + * The error message was prepared in __prdcr_add_handler() + */ + goto send_reply; + } + ldmsd_prdcr_get(prdcr); /* Get a reference to match the find reference */ + } + + rc = __prdcr_start_handler(reqc, "advertiser_start", "advertiser"); + if (CONFIG_PLAYBACK_ENABLED(DLOG_CFGOK)) { + if (!rc && !reqc->errcode) { + __dlog(DLOG_CFGOK, "advertiser_start " + "name=%s xprt=%s host=%s port=%u " + "reconnect=%ld auth=%s uid=%d gid=%d perm=%o\n", + prdcr->obj.name, prdcr->xprt_name, prdcr->host_name, + prdcr->port_no, prdcr->conn_intrvl_us, prdcr->conn_auth_dom_name, + (int)prdcr->obj.uid, (int)prdcr->obj.gid, + (unsigned)prdcr->obj.perm); + } + } + +send_reply: + ldmsd_send_req_response(reqc, reqc->line_buf); + return rc; +} + +static int advertiser_stop_handler(ldmsd_req_ctxt_t reqc) +{ + int rc = __prdcr_stop_handler(reqc, "advertiser_stop", "advertiser"); + ldmsd_send_req_response(reqc, reqc->line_buf); + return rc; +} + +static int advertiser_del_handler(ldmsd_req_ctxt_t reqc) +{ + int rc = __prdcr_del_handler(reqc, "advertiser_del", "advertiser"); + ldmsd_send_req_response(reqc, reqc->line_buf); + return rc; +} diff --git a/ldms/src/ldmsd/ldmsd_request.h b/ldms/src/ldmsd/ldmsd_request.h index 4b96bc0960..be9e22a4d1 100644 --- a/ldms/src/ldmsd/ldmsd_request.h +++ b/ldms/src/ldmsd/ldmsd_request.h @@ -88,6 +88,16 @@ enum ldmsd_request { LDMSD_PRDCR_UNSUBSCRIBE_REQ, LDMSD_PRDCR_STREAM_STATUS_REQ, LDMSD_BRIDGE_ADD_REQ, + LDMSD_ADVERTISER_ADD_REQ, + LDMSD_ADVERTISER_START_REQ, + LDMSD_ADVERTISER_STOP_REQ, + LDMSD_ADVERTISER_DEL_REQ, + LDMSD_PRDCR_LISTEN_ADD_REQ, + LDMSD_PRDCR_LISTEN_DEL_REQ, + LDMSD_PRDCR_LISTEN_START_REQ, + LDMSD_PRDCR_LISTEN_STOP_REQ, + LDMSD_PRDCR_LISTEN_STATUS_REQ, + LDMSD_ADVERTISE_REQ, LDMSD_STRGP_ADD_REQ = 0x200, LDMSD_STRGP_DEL_REQ, LDMSD_STRGP_START_REQ, @@ -244,6 +254,7 @@ enum ldmsd_request_attr { LDMSD_ATTR_RX_RATE, LDMSD_ATTR_SUMMARY, LDMSD_ATTR_SIZE, + LDMSD_ATTR_IP, LDMSD_ATTR_LAST, }; @@ -304,6 +315,7 @@ typedef struct ldmsd_cfg_ldms_s { } *ldmsd_cfg_ldms_t; typedef struct ldmsd_cfg_file_s { + const char *path; /* Point to the path attribute value, don't free() */ uint64_t cfgfile_id; } *ldmsd_cfg_file_t; @@ -542,7 +554,13 @@ void ldmsd_ntoh_req_msg(ldmsd_req_hdr_t msg); * \param rec_len The record length */ void ldmsd_send_cfg_rec_adv(ldmsd_cfg_xprt_t xprt, uint32_t msg_no, uint32_t rec_len); -int ldmsd_process_config_request(ldmsd_cfg_xprt_t xprt, ldmsd_req_hdr_t request); +/* + * \param req_filter is a function that returns zero if we want to process the + * request, and returns non-zero otherwise. + */ +typedef int (*req_filter_fn)(ldmsd_req_ctxt_t, void *); +int ldmsd_process_config_request(ldmsd_cfg_xprt_t xprt, ldmsd_req_hdr_t request, + req_filter_fn req_filter, void *filter_ctxt); int ldmsd_process_config_response(ldmsd_cfg_xprt_t xprt, ldmsd_req_hdr_t response); int ldmsd_append_reply(struct ldmsd_req_ctxt *reqc, const char *data, size_t data_len, int msg_flags); void ldmsd_send_error_reply(ldmsd_cfg_xprt_t xprt, uint32_t msg_no, diff --git a/ldms/src/ldmsd/ldmsd_request_util.c b/ldms/src/ldmsd/ldmsd_request_util.c index 8e12e962f0..5a89fd35df 100644 --- a/ldms/src/ldmsd/ldmsd_request_util.c +++ b/ldms/src/ldmsd/ldmsd_request_util.c @@ -63,6 +63,10 @@ struct req_str_id { const struct req_str_id req_str_id_table[] = { /* This table need to be sorted by keyword for bsearch() */ + { "advertiser_add", LDMSD_ADVERTISER_ADD_REQ }, + { "advertiser_del", LDMSD_ADVERTISER_DEL_REQ }, + { "advertiser_start", LDMSD_ADVERTISER_START_REQ }, + { "advertiser_stop", LDMSD_ADVERTISER_STOP_REQ }, { "auth_add", LDMSD_AUTH_ADD_REQ }, { "auth_del", LDMSD_AUTH_DEL_REQ }, { "banner", LDMSD_BANNER_MODE_REQ }, @@ -105,6 +109,10 @@ const struct req_str_id req_str_id_table[] = { { "prdcr_add", LDMSD_PRDCR_ADD_REQ }, { "prdcr_del", LDMSD_PRDCR_DEL_REQ }, { "prdcr_hint_tree", LDMSD_PRDCR_HINT_TREE_REQ }, + { "prdcr_listen_add", LDMSD_PRDCR_LISTEN_ADD_REQ }, + { "prdcr_listen_del", LDMSD_PRDCR_LISTEN_DEL_REQ }, + { "prdcr_listen_start", LDMSD_PRDCR_LISTEN_START_REQ }, + { "prdcr_listen_stop", LDMSD_PRDCR_LISTEN_STOP_REQ }, { "prdcr_set_status", LDMSD_PRDCR_SET_REQ }, { "prdcr_start", LDMSD_PRDCR_START_REQ }, { "prdcr_start_regex", LDMSD_PRDCR_START_REGEX_REQ }, @@ -169,6 +177,7 @@ const struct req_str_id attr_str_id_table[] = { { "container", LDMSD_ATTR_CONTAINER }, { "credits", LDMSD_ATTR_CREDITS }, { "decomposition", LDMSD_ATTR_DECOMP }, + { "disabled_start", LDMSD_ATTR_AUTO_INTERVAL }, { "flush", LDMSD_ATTR_INTERVAL }, { "gid", LDMSD_ATTR_GID }, { "host", LDMSD_ATTR_HOST }, @@ -176,6 +185,7 @@ const struct req_str_id attr_str_id_table[] = { { "instance", LDMSD_ATTR_INSTANCE }, { "interval", LDMSD_ATTR_INTERVAL }, { "interval_us", LDMSD_ATTR_INTERVAL }, + { "ip", LDMSD_ATTR_IP }, { "level", LDMSD_ATTR_LEVEL }, { "match", LDMSD_ATTR_MATCH }, { "metric", LDMSD_ATTR_METRIC }, @@ -254,6 +264,7 @@ const char *ldmsd_req_id2str(enum ldmsd_request req_id) case LDMSD_PRDCR_HINT_TREE_REQ : return "PRDCR_HINT_TREE_REQ"; case LDMSD_PRDCR_SUBSCRIBE_REQ : return "PRDCR_SUBSCRIBE_REQ"; case LDMSD_PRDCR_UNSUBSCRIBE_REQ : return "PRDCR_UNSUBSCRIBE_REQ"; + case LDMSD_PRDCR_LISTEN_ADD_REQ : return "PRDCR_LISTEN_REQ"; case LDMSD_STRGP_ADD_REQ : return "STRGP_ADD_REQ"; case LDMSD_STRGP_DEL_REQ : return "STRGP_DEL_REQ"; @@ -869,6 +880,64 @@ int __ldmsd_parse_default_auth_req(struct ldmsd_parse_ctxt *ctxt) return rc; } +/* The function adds the attribute 'type' with the 'advertise' value to the request */ +int __ldmsd_parse_advertiser_add_req(struct ldmsd_parse_ctxt *ctxt) +{ + char *av = ctxt->av; + size_t len = strlen(av); + size_t cnt = 0; + char *tmp, *name, *value, *ptr, *dummy; + int rc = 0; + dummy = NULL; + tmp = malloc(len); + if (!tmp) { + rc = ENOMEM; + goto out; + } + av = strtok_r(av, __ldmsd_cfg_delim, &ptr); + while (av) { + ctxt->av = av; + dummy = strdup(av); + if (!dummy) { + rc = ENOMEM; + goto out; + } + __get_attr_name_value(dummy, &name, &value); + if (!name) { + /* av is neither attribute value nor keyword */ + rc = EINVAL; + goto out; + } + rc = add_attr_from_attr_str(name, value, + &ctxt->request, + &ctxt->request_sz); + if (rc) + goto out; + av = strtok_r(NULL, __ldmsd_cfg_delim, &ptr); + free(dummy); + dummy = NULL; + } + rc = add_attr_from_attr_str("type", "advertiser", + &ctxt->request, + &ctxt->request_sz); + if (rc) + goto out; + + if (cnt) { + tmp[cnt-1] = '\0'; /* Replace the last ' ' with '\0' */ + /* Add an attribute of type 'STRING' */ + rc = add_attr_from_attr_str(NULL, tmp, + &ctxt->request, + &ctxt->request_sz); + } + +out: + if (tmp) + free(tmp); + if (dummy) + free(dummy); + return rc; +} struct ldmsd_req_array * ldmsd_parse_config_str(const char *cfg, uint32_t msg_no, size_t xprt_max_msg) @@ -945,6 +1014,10 @@ ldmsd_parse_config_str(const char *cfg, uint32_t msg_no, size_t xprt_max_msg) case LDMSD_DEFAULT_AUTH_REQ: rc = __ldmsd_parse_default_auth_req(&ctxt); break; + case LDMSD_ADVERTISER_ADD_REQ: + case LDMSD_ADVERTISER_START_REQ: + rc = __ldmsd_parse_advertiser_add_req(&ctxt); + break; default: rc = __ldmsd_parse_generic(&ctxt); break; @@ -1053,6 +1126,14 @@ ldmsd_req_attr_t ldmsd_req_attr_get_by_id(char *request, uint32_t attr_id) return NULL; } +char *ldmsd_req_attr_value_by_id(char *request, uint32_t attr_id) +{ + ldmsd_req_attr_t attr = ldmsd_req_attr_get_by_id(request, attr_id); + if (!attr) + return NULL; + return str_repl_env_vars((char *)attr->attr_value); +} + ldmsd_req_attr_t ldmsd_req_attr_get_by_name(char *request, const char *name) { int32_t attr_id = ldmsd_req_attr_str2id(name); diff --git a/ldms/src/ldmsd/ldmsd_updtr.c b/ldms/src/ldmsd/ldmsd_updtr.c index f8542a07a6..58c91a2573 100644 --- a/ldms/src/ldmsd/ldmsd_updtr.c +++ b/ldms/src/ldmsd/ldmsd_updtr.c @@ -890,13 +890,10 @@ static int updtr_tasks_create(ldmsd_updtr_t updtr) return rc; } -int prdcr_ref_cmp(void *a, const void *b) -{ - return strcmp(a, b); -} - #define UPDTR_TREE_MGMT_TASK_INTRVL 3600000000 +/* The implementation is in ldmsd_prdcr.c */ +extern int prdcr_ref_cmp(void *a, const void *b); ldmsd_updtr_t ldmsd_updtr_new_with_auth(const char *name, char *interval_str, char *offset_str, int push_flags, int is_auto_task, @@ -1337,15 +1334,8 @@ int ldmsd_updtr_match_del(const char *updtr_name, const char *regex_str, return rc; } -ldmsd_prdcr_ref_t prdcr_ref_new(ldmsd_prdcr_t prdcr) -{ - ldmsd_prdcr_ref_t ref = calloc(1, sizeof *ref); - if (ref) { - ref->prdcr = ldmsd_prdcr_get(prdcr); - rbn_init(&ref->rbn, prdcr->obj.name); - } - return ref; -} +/* The implementations are in ldmsd_prdcr.c */ +extern ldmsd_prdcr_ref_t prdcr_ref_new(ldmsd_prdcr_t prdcr); ldmsd_prdcr_ref_t prdcr_ref_find(ldmsd_updtr_t updtr, const char *name) { @@ -1356,8 +1346,6 @@ ldmsd_prdcr_ref_t prdcr_ref_find(ldmsd_updtr_t updtr, const char *name) return container_of(rbn, struct ldmsd_prdcr_ref, rbn); } - - ldmsd_prdcr_ref_t prdcr_ref_find_regex(ldmsd_updtr_t updtr, regex_t *regex) { struct rbn *rbn; @@ -1380,10 +1368,10 @@ int __ldmsd_updtr_prdcr_add(ldmsd_updtr_t updtr, ldmsd_prdcr_t prdcr) ldmsd_prdcr_ref_t ref; ldmsd_updtr_lock(updtr); - if (updtr->state != LDMSD_UPDTR_STATE_STOPPED) { - rc = EBUSY; - goto out; - } +// if (updtr->state != LDMSD_UPDTR_STATE_STOPPED) { +// rc = EBUSY; +// goto out; +// } ref = prdcr_ref_find(updtr, prdcr->obj.name); if (ref) { rc = EEXIST; @@ -1403,20 +1391,15 @@ int __ldmsd_updtr_prdcr_add(ldmsd_updtr_t updtr, ldmsd_prdcr_t prdcr) int ldmsd_updtr_prdcr_add(const char *updtr_name, const char *prdcr_regex, char *rep_buf, size_t rep_len, ldmsd_sec_ctxt_t ctxt) { - regex_t regex; ldmsd_updtr_t updtr; ldmsd_prdcr_t prdcr; + ldmsd_name_match_t prd_match; int rc; - rc = ldmsd_compile_regex(®ex, prdcr_regex, rep_buf, rep_len); - if (rc) - return EINVAL; - updtr = ldmsd_updtr_find(updtr_name); if (!updtr) { sprintf(rep_buf, "%dThe updater specified does not " "exist\n", ENOENT); - regfree(®ex); return ENOENT; } @@ -1430,9 +1413,32 @@ int ldmsd_updtr_prdcr_add(const char *updtr_name, const char *prdcr_regex, rc = EBUSY; goto out_1; } + + prd_match = calloc(1, sizeof(*prd_match)); + if (!prd_match) { + ovis_log(NULL, OVIS_LCRIT, "Memory allocation failure.\n"); + rc = ENOMEM; + goto unlock; + } + prd_match->regex_str = strdup(prdcr_regex); + if (!prd_match->regex_str) { + ovis_log(NULL, OVIS_LCRIT, "Memory allocation failure.\n"); + rc = ENOMEM; + free(prd_match); + goto unlock; + } + + rc = ldmsd_compile_regex(&prd_match->regex, prdcr_regex, rep_buf, rep_len); + if (rc) { + rc = EINVAL; + free(prd_match); + goto unlock; + } + + LIST_INSERT_HEAD(&updtr->prdcr_filter, prd_match, entry); ldmsd_cfg_lock(LDMSD_CFGOBJ_PRDCR); for (prdcr = ldmsd_prdcr_first(); prdcr; prdcr = ldmsd_prdcr_next(prdcr)) { - if (regexec(®ex, prdcr->obj.name, 0, NULL, 0)) + if (regexec(&prd_match->regex, prdcr->obj.name, 0, NULL, 0)) continue; /* See if this match is already in the list */ ldmsd_prdcr_ref_t ref = prdcr_ref_find(updtr, prdcr->obj.name); @@ -1451,7 +1457,7 @@ int ldmsd_updtr_prdcr_add(const char *updtr_name, const char *prdcr_regex, ldmsd_cfg_unlock(LDMSD_CFGOBJ_PRDCR); sprintf(rep_buf, "0\n"); out_1: - regfree(®ex); +unlock: ldmsd_updtr_unlock(updtr); ldmsd_updtr_put(updtr); return rc;