Skip to content

Commit

Permalink
Support specifying receive credt pits and rate limit per listening en…
Browse files Browse the repository at this point in the history
…dpoint
  • Loading branch information
nichamon authored and tom95858 committed Jul 12, 2024
1 parent 4c351e4 commit 2895050
Show file tree
Hide file tree
Showing 8 changed files with 82 additions and 14 deletions.
23 changes: 23 additions & 0 deletions ldms/man/ldmsd_controller.man
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,8 @@ plugin for more information.
\fBxprt\fR=\fIsock\fR|\fIrdma\fR|\fIugni\fR|\fIfabric\fR
[\fBhost\fR=\fIHOST\fR]
[\fBauth\fR=\fIAUTH_REF\fR]
[\fBcredits\fR=\fICREDITS\fR]
[\fBrx_rate\fR=\fIRX_RATE\fR]
.RS
.TP
\fBport\fR=\fIPORT\fR
Expand All @@ -248,6 +250,27 @@ Instruct \fBldmsd\fR to use \fIAUTH_REF\fR (a name reference to \fBauth\fR
object created by \fBauth_add\fR command) to authenticate connections on this
port. If not given, the port uses the default authentication method specified on
the CLI options (see \fBldmsd\fR(8) option \fB-a\fR).
.TP
.BI [credits " BYTES"]
.br
The LDMS daemon we are managing uses receive credits (measured in bytes) to
control the amount of data received on the connections established by accepting
requests to this listening endpoint. The \fRcredits\Fb value functions
similarly to the \fRcredits\fB attribute in the \fRprdcr_add\fB command,
influencing the amount of data producers created by Sampler Advertisement can
receive. The default value is determined by the command-line \fR--credit\fB
option used when starting the LDMS daemon (ldmsd). If neither the
\fR--credits\fB option nor the \fRcredits\fB attribute is specified, there is
no limit on receive credits.

.TP
.BI [rx_rate " BYTES_PER_SEC"]
.br
The receive rate limit (in bytes/second) controls the rate of data received on
the connections established by accepting requests to this listening endpoint.
Unlike \fRcredits\fB, which controls the total amount of received data, the receive
rate limit focuses on the data flow per second. If not specified, it is
unlimited.
.RE

.SH PRODUCER COMMAND SYNTAX
Expand Down
10 changes: 7 additions & 3 deletions ldms/python/ldmsd/ldmsd_communicator.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@
'prdcr_stats': {'req_attr':[], 'opt_attr': []},
'set_route' : {'req_attr':['instance'], 'opt_attr':[]},
'set_stats': {'req_attr':[], 'opt_attr': ['summary']},
'listen': {'req_attr':['xprt', 'port'], 'opt_attr': ['host', 'auth']},
'listen': {'req_attr':['xprt', 'port'], 'opt_attr': ['host', 'auth', 'credits', 'rx_rate']},
'metric_sets_default_authz': {'req_attr':[], 'opt_attr': ['uid', 'gid', 'perm']},
'set_sec_mod' : {'req_attr': ['regex'], 'opt_attr': ['uid', 'gid', 'perm']},
'log_status' : {'req_attr' : [], 'opt_attr' : ['name']},
Expand Down Expand Up @@ -1486,7 +1486,7 @@ def stream_client_stats(self, reset=None):
except Exception as e:
return errno.ENOTCONN, str(e)

def listen(self, xprt, port, host=None, auth=None):
def listen(self, xprt, port, host=None, auth=None, credits=None, rx_limit=None):
"""
Add a listening endpoint
Expand All @@ -1504,6 +1504,10 @@ def listen(self, xprt, port, host=None, auth=None):
]
if auth:
attr_list.append(LDMSD_Req_Attr(attr_name='auth', value=auth))
if credits is not None:
attr_list.append(LDMSD_Req_Attr(attr_id=LDMSD_Req_Attr.CREDITS, value=credits))
if rx_limit is not None:
attr_list.append(LDMSD_Req_Attr(attr_id=LDMSD_Req_Attr.RX_RATE, value=rx_limit))
req = LDMSD_Request(
command='listen',
attrs=attr_list
Expand All @@ -1520,7 +1524,7 @@ def metric_sets_default_authz(self, uid=None, gid=None, perm=None):
"""
attr_list = []
if uid:
attr_list.append(LDMSD_Req_Attr(attr_id=LDMSDS_Req_Attr.UID, value=uid))
attr_list.append(LDMSD_Req_Attr(attr_id=LDMSD_Req_Attr.UID, value=uid))
if gid:
attr_list.append(LDMSD_Req_Attr(attr_id=LDMSD_Req_Attr.GID, value=gid))
if perm:
Expand Down
8 changes: 6 additions & 2 deletions ldms/python/ldmsd/ldmsd_controller
Original file line number Diff line number Diff line change
Expand Up @@ -2685,15 +2685,19 @@ class LdmsdCmdParser(cmd.Cmd):
[auth=] Authenticantion domain.
If this is omitted or auth=auth_default is give,
the default authentication given the command line (-a and -A)
will be used..
will be used
[credits=] Receive credits of the connections established by accepted connection requests
[rx_rate=] Receive rate limit of the connections established by accepted connection requests
"""
arg = self.handle_args('listen', arg)
if not arg:
return
rc, msg = self.comm.listen(arg['xprt'],
arg['port'],
arg['host'],
arg['auth'])
arg['auth'],
arg['credits'],
arg['rx_rate'])
if rc:
print(f'Error adding listener {arg["xprt"]} on port {arg["port"]}: {msg}')

Expand Down
32 changes: 28 additions & 4 deletions ldms/src/ldmsd/ldmsd.c
Original file line number Diff line number Diff line change
Expand Up @@ -1435,7 +1435,7 @@ void ldmsd_listen___del(ldmsd_cfgobj_t obj)
ldmsd_cfgobj___del(obj);
}

ldmsd_listen_t ldmsd_listen_new(char *xprt, char *port, char *host, char *auth)
ldmsd_listen_t ldmsd_listen_new(char *xprt, char *port, char *host, char *auth, char *credits, char *rx_limit)
{
char *name;
int len;
Expand Down Expand Up @@ -1473,6 +1473,20 @@ ldmsd_listen_t ldmsd_listen_new(char *xprt, char *port, char *host, char *auth)
}
}

if (credits) {
listen->credits = atoi(credits);
} else {
/*
* listen->credits will be set to ldmsd_credits (global value) in ldmsd_listen_start().
*/
listen->credits = __RAIL_UNLIMITED;
}

if (rx_limit)
listen->rx_limit = atoi(rx_limit);
else
listen->rx_limit = __RAIL_UNLIMITED;

if (auth) {
auth_dom = ldmsd_auth_find(auth);
if (!auth_dom) {
Expand Down Expand Up @@ -1549,8 +1563,18 @@ int ldmsd_listen_start(ldmsd_listen_t listen)
{
int rc = 0;
assert(NULL == listen->x);
listen->x = ldms_xprt_rail_new(listen->xprt, 1, ldmsd_credits,
__RAIL_UNLIMITED,
if (listen->credits == __RAIL_UNLIMITED) {
/*
* Set listen->credits here to cover the case that
* the global value is set after ldmsd_listen_new() is called.
* This happens when the cli-option `-x` is used to
* add a listening endpoint.
*/
listen->credits = ldmsd_credits;
}
listen->x = ldms_xprt_rail_new(listen->xprt, 1,
((listen->credits>0)?listen->credits:ldmsd_credits),
((listen->rx_limit>0)?listen->rx_limit:__RAIL_UNLIMITED),
ldmsd_auth_name_get(listen),
ldmsd_auth_attr_get(listen));
if (!listen->x) {
Expand Down Expand Up @@ -1938,7 +1962,7 @@ int ldmsd_process_cmd_line_arg(char opt, char *value)
_host++;
}
/* Use the default auth domain */
ldmsd_listen_t listen = ldmsd_listen_new(_xprt, _port, _host, NULL);
ldmsd_listen_t listen = ldmsd_listen_new(_xprt, _port, _host, NULL, NULL, NULL);
free(dup_xtuple);
if (!listen) {
ovis_log(NULL, OVIS_LERROR, "Error %d: failed to add listening "
Expand Down
9 changes: 8 additions & 1 deletion ldms/src/ldmsd/ldmsd.h
Original file line number Diff line number Diff line change
Expand Up @@ -1491,6 +1491,8 @@ typedef struct ldmsd_listen {
char *auth_name;
char *auth_dom_name;
struct attr_value_list *auth_attrs;
int credits;
int rx_limit;
ldms_t x;
} *ldmsd_listen_t;

Expand All @@ -1506,10 +1508,15 @@ uint8_t ldmsd_is_initialized();
* \param port port
* \param host hostname
* \param auth authentication domain name
* \param credits receive credits
* \param rx_limit receive rate limit
*
* To use the default receive credits or receive rate limit, provide NULL.
*
* \return a listen cfgobj
*/
ldmsd_listen_t ldmsd_listen_new(char *xprt, char *port, char *host, char *auth);
ldmsd_listen_t ldmsd_listen_new(char *xprt, char *port, char *host, char *auth,
char *credits, char *rx_limit);

/**
* LDMSD Authentication Domain Configuration Object
Expand Down
2 changes: 2 additions & 0 deletions ldms/src/ldmsd/ldmsd_prdcr.c
Original file line number Diff line number Diff line change
Expand Up @@ -673,6 +673,8 @@ static int __sampler_routine(ldms_t x, ldms_xprt_event_t e, ldmsd_prdcr_t prdcr)
prdcr->xprt_name, prdcr->host_name,
(int)prdcr->port_no, prdcr->conn_auth);
break;
case LDMS_XPRT_EVENT_SEND_CREDIT_DEPOSITED:
break;
default:
ovis_log(prdcr_log, OVIS_LERROR,
"Received an unexpected transport event %d\n", e->type);
Expand Down
8 changes: 5 additions & 3 deletions ldms/src/ldmsd/ldmsd_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -8327,9 +8327,9 @@ extern int ldmsd_listen_start(ldmsd_listen_t listen);
static int listen_handler(ldmsd_req_ctxt_t reqc)
{
ldmsd_listen_t listen;
char *xprt, *port, *host, *auth, *attr_name;
char *xprt, *port, *host, *auth, *attr_name, *credits, *rx_limit;
unsigned short port_no = -1;
xprt = port = host = auth = NULL;
xprt = port = host = auth = credits = rx_limit = NULL;

attr_name = "xprt";
xprt = ldmsd_req_attr_str_value_get_by_id(reqc, LDMSD_ATTR_XPRT);
Expand All @@ -8348,8 +8348,10 @@ static int listen_handler(ldmsd_req_ctxt_t reqc)
}
host =ldmsd_req_attr_str_value_get_by_id(reqc, LDMSD_ATTR_HOST);
auth = ldmsd_req_attr_str_value_get_by_id(reqc, LDMSD_ATTR_AUTH);
credits = ldmsd_req_attr_str_value_get_by_id(reqc, LDMSD_ATTR_CREDITS);
rx_limit = ldmsd_req_attr_str_value_get_by_id(reqc, LDMSD_ATTR_RX_RATE);

listen = ldmsd_listen_new(xprt, port, host, auth);
listen = ldmsd_listen_new(xprt, port, host, auth, credits, rx_limit);
if (!listen) {
if (errno == EEXIST)
goto eexist;
Expand Down
4 changes: 3 additions & 1 deletion ldms/src/ldmsd/ldmsd_request_util.c
Original file line number Diff line number Diff line change
Expand Up @@ -622,7 +622,9 @@ int __parse_xprt_endpoint(struct ldmsd_parse_ctxt *ctxt,
if ((0 == strncmp(name, "xprt", 4)) ||
(0 == strncmp(name, "port", 4)) ||
(0 == strncmp(name, "host", 4)) ||
(0 == strncmp(name, "auth", 4))) {
(0 == strncmp(name, "auth", 4)) ||
(0 == strncmp(name, "credits", 7)) ||
(0 == strncmp(name, "rx_rate", 8))) {
/* xprt, port, host, auth */
rc = add_attr_from_attr_str(name, value,
&ctxt->request,
Expand Down

0 comments on commit 2895050

Please sign in to comment.