Skip to content

Commit

Permalink
Make Sampler Advertisement support multiple advertisements from a sin…
Browse files Browse the repository at this point in the history
…gle node
  • Loading branch information
nichamon authored and tom95858 committed May 21, 2024
1 parent c360d32 commit af3c3ef
Show file tree
Hide file tree
Showing 4 changed files with 142 additions and 68 deletions.
19 changes: 18 additions & 1 deletion ldms/src/ldmsd/ldmsd.h
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,24 @@ typedef struct ldmsd_prdcr {
LDMSD_PRDCR_STATE_CONNECTED,
/** Waiting for task join and xprt cleanup */
LDMSD_PRDCR_STATE_STOPPING,
/** The STANDBY state is valid only for 'GENERATED' producers.
*
* Producer task has been stopped but there is an outstanding xprt.
*
* Once the aggregator receives an advertisement notification
* and verifies that the hostname or IP address matches
* a listen producer, it creates a generated producer,
* maps the producer to the request's transport, moves
* the producer state to STANDBY, and then starts the producer.
*
* The producer synchronously moves to 'CONNECTED' when it starts.
*
* prdcr_stop does not tear down the connection.
* The producer's transport is reset to NULL only when
* the aggregator receives a 'disconnected' event initiated by either
* the sampler daemon or the aggregator.
*/
LDMSD_PRDCR_STATE_STANDBY,
} conn_state;

enum ldmsd_prdcr_type {
Expand Down Expand Up @@ -372,7 +390,6 @@ typedef struct ldmsd_prdcr_listen {
LDMSD_PRDCR_LISTEN_STATE_RUNNING,
} state;
const char *hostname_regex_s;
uint64_t prdcr_conn_intvl; /* reconnect interval of generated producers */
regex_t regex;
int rails; /* Rail size */
int recv_credits; /* bytes */
Expand Down
2 changes: 2 additions & 0 deletions ldms/src/ldmsd/ldmsd_config.c
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,8 @@ const char *prdcr_state_str(enum ldmsd_prdcr_state state)
return "CONNECTED";
case LDMSD_PRDCR_STATE_STOPPING:
return "STOPPING";
case LDMSD_PRDCR_STATE_STANDBY:
return "STANDBY";
}
return "BAD STATE";
}
Expand Down
77 changes: 59 additions & 18 deletions ldms/src/ldmsd/ldmsd_prdcr.c
Original file line number Diff line number Diff line change
Expand Up @@ -589,6 +589,15 @@ static int __advertise_resp_cb(ldmsd_req_cmd_t rcmd)
if (prdcr->xprt)
ldms_xprt_close(prdcr->xprt);
ovis_log(config_log, OVIS_LINFO, "advertise: %s.\n", errmsg);
} else if (EAGAIN == resp->rsp_err) {
/*
* The aggregator isn't ready to receive the advertisement.
* Retry again at the next interval.
*/
ovis_log(config_log, OVIS_LINFO, "advertise: The aggregator "
"isn't ready to accept an advertisement. Retry again\n");
if (prdcr->xprt)
ldms_xprt_close(prdcr->xprt);
} else {
/*
* LDMSD doesn't automatically stop the advertisement to
Expand Down Expand Up @@ -740,7 +749,7 @@ static int __agg_routine(ldms_t x, ldms_xprt_event_t e, ldmsd_prdcr_t prdcr)
return is_reset_prdcr;
}

static void prdcr_connect_cb(ldms_t x, ldms_xprt_event_t e, void *cb_arg)
void prdcr_connect_cb(ldms_t x, ldms_xprt_event_t e, void *cb_arg)
{
int is_reset_prdcr = 0;
ldmsd_prdcr_t prdcr = cb_arg;
Expand Down Expand Up @@ -787,6 +796,7 @@ static void prdcr_connect_cb(ldms_t x, ldms_xprt_event_t e, void *cb_arg)
prdcr_reset_sets(prdcr);
switch (prdcr->conn_state) {
case LDMSD_PRDCR_STATE_STOPPING:
case LDMSD_PRDCR_STATE_STANDBY:
prdcr->conn_state = LDMSD_PRDCR_STATE_STOPPED;
break;
case LDMSD_PRDCR_STATE_DISCONNECTED:
Expand All @@ -803,7 +813,8 @@ static void prdcr_connect_cb(ldms_t x, ldms_xprt_event_t e, void *cb_arg)
assert(0 == "BAD STATE");
}
if (prdcr->xprt) {
if (prdcr->type == LDMSD_PRDCR_TYPE_PASSIVE) {
if ((prdcr->type == LDMSD_PRDCR_TYPE_PASSIVE) ||
(prdcr->type == LDMSD_PRDCR_TYPE_ADVERTISED)) {
/* Put back the ldms_xprt_by_remote_sin() reference. */
ldms_xprt_put(prdcr->xprt);
}
Expand All @@ -821,9 +832,6 @@ static void prdcr_connect_cb(ldms_t x, ldms_xprt_event_t e, void *cb_arg)
ldmsd_prdcr_unlock(prdcr);
}

extern const char *auth_name;
extern struct attr_value_list *auth_opt;

static void prdcr_connect(ldmsd_prdcr_t prdcr)
{
int ret;
Expand Down Expand Up @@ -857,17 +865,19 @@ static void prdcr_connect(ldmsd_prdcr_t prdcr)
}
break;
case LDMSD_PRDCR_TYPE_PASSIVE:
case LDMSD_PRDCR_TYPE_ADVERTISED:
assert(prdcr->xprt == NULL);
prdcr->xprt = ldms_xprt_by_remote_sin((struct sockaddr *)&prdcr->ss);
/*
* The transport endpoint has been assigned in the advertise_notification handler before
* the producer has been started.
*
* Call connect callback to advance state and update timers
*/
ldms_xprt_event_cb_set(prdcr->xprt, prdcr_connect_cb, prdcr);
/* fall through */
case LDMSD_PRDCR_TYPE_ADVERTISED:
if (prdcr->xprt) {
ldms_xprt_event_cb_set(prdcr->xprt, prdcr_connect_cb, prdcr);
/*
* For 'ADVERTISED' producers,
* prdcr->xprt is assigned when the aggregator
* has received the advertisement notification.
*
* Call connect callback to advance state and update timers
*/
ldmsd_prdcr_unlock(prdcr);
struct ldms_xprt_event conn_ev = {.type = LDMS_XPRT_EVENT_CONNECTED};
prdcr_connect_cb(prdcr->xprt, &conn_ev, prdcr);
Expand All @@ -889,6 +899,7 @@ static void prdcr_task_cb(ldmsd_task_t task, void *arg)
case LDMSD_PRDCR_STATE_STOPPING:
ldmsd_task_stop(&prdcr->task);
break;
case LDMSD_PRDCR_STATE_STANDBY:
case LDMSD_PRDCR_STATE_DISCONNECTED:
prdcr_connect(prdcr);
break;
Expand Down Expand Up @@ -1108,12 +1119,31 @@ int __ldmsd_prdcr_start(ldmsd_prdcr_t prdcr, ldmsd_sec_ctxt_t ctxt)
rc = ldmsd_cfgobj_access_check(&prdcr->obj, 0222, ctxt);
if (rc)
goto out;
if (prdcr->conn_state != LDMSD_PRDCR_STATE_STOPPED) {
rc = EBUSY;
goto out;
}

prdcr->conn_state = LDMSD_PRDCR_STATE_DISCONNECTED;
if (prdcr->type == LDMSD_PRDCR_TYPE_ADVERTISED) {
if (prdcr->conn_state == LDMSD_PRDCR_STATE_STOPPED) {
/* The connection was disconnected. */
prdcr->conn_state = LDMSD_PRDCR_STATE_DISCONNECTED;
} else if (prdcr->conn_state == LDMSD_PRDCR_STATE_STANDBY) {
/*
* The connection is still connected.
*
* The state will be synchronously moved to CONNECTED
* in prdcr_connect().
*
*/
} else {
rc = EBUSY;
goto out;
}
} else {
if (prdcr->conn_state != LDMSD_PRDCR_STATE_STOPPED) {
rc = EBUSY;
goto out;
} else {
prdcr->conn_state = LDMSD_PRDCR_STATE_DISCONNECTED;
}
}

prdcr->obj.perm |= LDMSD_PERM_DSTART;
ldmsd_task_start(&prdcr->task, prdcr_task_cb, prdcr,
Expand Down Expand Up @@ -1163,6 +1193,17 @@ int __ldmsd_prdcr_stop(ldmsd_prdcr_t prdcr, ldmsd_sec_ctxt_t ctxt)
rc = EBUSY;
goto out;
}

if (prdcr->type == LDMSD_PRDCR_TYPE_ADVERTISED) {
if (prdcr->conn_state == LDMSD_PRDCR_STATE_STANDBY) {
/*
* Already stopped, return 0 so that caller knows stop succeeds.
*/
rc = 0;
goto out;
}
}

ovis_log(prdcr_log, OVIS_LINFO, "Stopping producer %s\n", prdcr->obj.name);
if (prdcr->type == LDMSD_PRDCR_TYPE_LOCAL)
prdcr_reset_sets(prdcr);
Expand Down
112 changes: 63 additions & 49 deletions ldms/src/ldmsd/ldmsd_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -7469,7 +7469,7 @@ static char * __prdcr_stats_as_json(size_t *json_sz)
size_t sz = __APPEND_SZ;
int prdcr_count = 0, stopped_count = 0, disconnected_count = 0,
connecting_count = 0, connected_count = 0, stopping_count = 0,
set_count = 0;
set_count = 0, standby_count = 0;

(void)clock_gettime(CLOCK_REALTIME, &start);
ldmsd_cfg_lock(LDMSD_CFGOBJ_PRDCR);
Expand All @@ -7492,6 +7492,9 @@ static char * __prdcr_stats_as_json(size_t *json_sz)
case LDMSD_PRDCR_STATE_STOPPING:
stopping_count++;
break;
case LDMSD_PRDCR_STATE_STANDBY:
standby_count++;
break;
}
set_count += rbt_card(&prdcr->set_tree);
}
Expand All @@ -7509,6 +7512,7 @@ static char * __prdcr_stats_as_json(size_t *json_sz)
__APPEND(" \"connecting_count\": %d,\n", connecting_count);
__APPEND(" \"connected_count\": %d,\n", connected_count);
__APPEND(" \"stopping_count\": %d,\n", stopping_count);
__APPEND(" \"standby_count\": %d,\n", standby_count);
__APPEND(" \"set_count\": %d,\n", set_count);
(void)clock_gettime(CLOCK_REALTIME, &end);
uint64_t compute_time = ldms_timespec_diff_us(&start, &end);
Expand Down Expand Up @@ -9496,7 +9500,6 @@ static int prdcr_listen_add_handler(ldmsd_req_ctxt_t reqc)
char *disabled_start;
char *attr_name;
ldmsd_prdcr_listen_t pl;
long reconnect_us;
int rail;
uint64_t credits;
uint64_t rx_rate;
Expand All @@ -9513,25 +9516,6 @@ static int prdcr_listen_add_handler(ldmsd_req_ctxt_t reqc)
cidr_str = ldmsd_req_attr_str_value_get_by_id(reqc, LDMSD_ATTR_IP);
disabled_start = ldmsd_req_attr_str_value_get_by_id(reqc, LDMSD_ATTR_AUTO_INTERVAL);

attr_name = "reconnect";
reconnect_str = ldmsd_req_attr_str_value_get_by_id(reqc, LDMSD_ATTR_INTERVAL);
if (!reconnect_str) {
goto einval;
} else {
reqc->errcode = ovis_time_str2us(reconnect_str, &reconnect_us);
if (reqc->errcode) {
(void) snprintf(reqc->line_buf, reqc->line_len,
"The given 'reconnect' is invalid.");
goto send_reply;
}
if (reconnect_us <= 0) {
reqc->errcode = EINVAL;
(void) snprintf(reqc->line_buf, reqc->line_len,
"The reconnect interval must be a positive number.");
goto send_reply;
}
}

rail_str = ldmsd_req_attr_str_value_get_by_id(reqc, LDMSD_ATTR_RAIL);
if (rail_str) {
rail = atoi(rail_str);
Expand Down Expand Up @@ -9599,7 +9583,6 @@ static int prdcr_listen_add_handler(ldmsd_req_ctxt_t reqc)
}
}

pl->prdcr_conn_intvl = reconnect_us;
if (cidr_str) {
pl->cidr_str = strdup(cidr_str);
if (!pl->cidr_str) {
Expand Down Expand Up @@ -9834,9 +9817,6 @@ static int prdcr_listen_status_handler(ldmsd_req_ctxt_t reqc)
struct rbn *rbn;
struct ldmsd_req_attr_s attr;

/*
* TODO: It'll be helpful to list all producers generated by each prdcr_listen.
*/
ldmsd_cfg_lock(LDMSD_CFGOBJ_PRDCR_LISTEN);
for (pl = (ldmsd_prdcr_listen_t)ldmsd_cfgobj_first(LDMSD_CFGOBJ_PRDCR_LISTEN); pl;
pl = (ldmsd_prdcr_listen_t)ldmsd_cfgobj_next(&pl->obj)) {
Expand All @@ -9849,13 +9829,11 @@ static int prdcr_listen_status_handler(ldmsd_req_ctxt_t reqc)
"\"state\":\"%s\","
"\"regex\":\"%s\","
"\"IP range\":\"%s\","
"\"reconnect\":\"%ld\","
"\"producers\":[",
pl->obj.name,
((pl->state==LDMSD_PRDCR_LISTEN_STATE_RUNNING)?("running"):("stopped")),
(pl->hostname_regex_s?pl->hostname_regex_s:"-"),
(pl->cidr_str?pl->cidr_str:"-"),
pl->prdcr_conn_intvl
(pl->cidr_str?pl->cidr_str:"-")
);
if (rc)
goto err;
Expand Down Expand Up @@ -9920,9 +9898,10 @@ static int prdcr_listen_status_handler(ldmsd_req_ctxt_t reqc)
extern int __ldmsd_updtr_prdcr_add(ldmsd_updtr_t updtr, ldmsd_prdcr_t prdcr);
/* The implementations are in ldmsd_prdcr.c */
extern ldmsd_prdcr_ref_t prdcr_ref_new(ldmsd_prdcr_t prdcr);
extern void prdcr_connect_cb(ldms_t x, ldms_xprt_event_t e, void *cb_arg);
static int __process_advertisement(ldmsd_req_ctxt_t reqc, ldmsd_prdcr_listen_t lp, struct ldms_addr *rem_addr)
{
int rc;
int rc = 0;
char *name;
char *xprt_s;
char *hostname;
Expand All @@ -9933,7 +9912,8 @@ static int __process_advertisement(ldmsd_req_ctxt_t reqc, ldmsd_prdcr_listen_t l
struct ldmsd_sec_ctxt sctxt;
uid_t uid;
gid_t gid;
int is_start;
int is_start = 0;
struct ldms_xprt_event conn_ev;
name = xprt_s = hostname = NULL;

attr_name = "name";
Expand All @@ -9948,7 +9928,6 @@ static int __process_advertisement(ldmsd_req_ctxt_t reqc, ldmsd_prdcr_listen_t l

xprt_s = (char *)ldms_xprt_type_name(reqc->xprt->ldms.ldms);

/* TODO: make sure that it makes sense to use uid, gid from the request context. */
ldmsd_req_ctxt_sec_get(reqc, &sctxt);
uid = sctxt.crd.uid;
gid = sctxt.crd.gid;
Expand All @@ -9968,14 +9947,6 @@ static int __process_advertisement(ldmsd_req_ctxt_t reqc, ldmsd_prdcr_listen_t l
goto out;
}
is_start = 1;
} else {
if (prdcr->xprt) {
ovis_log(NULL, OVIS_LERROR,
"Received a duplicate advertise request of producer '%s'. "
"LDMSD ignores the subsequent request.\n", name);
rc = EBUSY;
goto out;
}
}

rbn = rbt_find(&lp->prdcr_tree, name);
Expand Down Expand Up @@ -10017,18 +9988,61 @@ static int __process_advertisement(ldmsd_req_ctxt_t reqc, ldmsd_prdcr_listen_t l
}
ldmsd_cfg_unlock(LDMSD_CFGOBJ_UPDTR);

/*
* The producer state will be moved in the prdcr_task_cb() path.
*/
if (lp->auto_start && is_start) {
rc = ldmsd_prdcr_start(name, NULL, &sctxt);
if (rc) {
ovis_log(config_log, OVIS_LERROR, "Failed to start the "
"generated producer '%s'. Error %d.\n",
name, rc);
goto err;
ldmsd_prdcr_lock(prdcr);
switch (prdcr->conn_state) {
case LDMSD_PRDCR_STATE_CONNECTED:
case LDMSD_PRDCR_STATE_STANDBY:
/* prdcr->xprt is not NULL. */
ovis_log(NULL, OVIS_LERROR,
"Received a duplicate advertise request of producer '%s'. "
"LDMSD ignores the subsequent request.\n", name);
rc = EBUSY;
break;
case LDMSD_PRDCR_STATE_STOPPING:
/*
* The producer was manually stopped but
* the aggregator hasn't received the 'DISCONNECTED' event yet.
* This is a race condition between a disconnected event and
* an advertisement notification. We reject any advertisements
* of this producer until the transport has been completely torn down.
*
* Let the sampler daemon retry again.
*/
rc = EAGAIN;
break;
case LDMSD_PRDCR_STATE_STOPPED:
prdcr->xprt = ldms_xprt_get(reqc->xprt->ldms.ldms);
ldms_xprt_event_cb_set(prdcr->xprt, prdcr_connect_cb, prdcr);
prdcr->conn_state = LDMSD_PRDCR_STATE_STANDBY;
if (lp->auto_start && is_start) {
ldmsd_prdcr_unlock(prdcr);
rc = ldmsd_prdcr_start(name, NULL, &sctxt);
if (rc) {
ovis_log(config_log, OVIS_LERROR, "Failed to start the "
"generated producer '%s'. Error %d.\n",
name, rc);
goto err;
}
ldmsd_prdcr_lock(prdcr);
}
break;
case LDMSD_PRDCR_STATE_DISCONNECTED:
prdcr->xprt = ldms_xprt_get(reqc->xprt->ldms.ldms);
ldms_xprt_event_cb_set(prdcr->xprt, prdcr_connect_cb, prdcr);
/* Move the producer state to CONNECTED */
conn_ev.type = LDMS_XPRT_EVENT_CONNECTED;
ldmsd_prdcr_unlock(prdcr);
prdcr_connect_cb(prdcr->xprt, &conn_ev, prdcr);
ldmsd_prdcr_lock(prdcr);
break;
default:
ovis_log(NULL, OVIS_LERROR, "Reach an unexpected state (%s) of "
"a generated producer %s.\n",
prdcr_state_str(prdcr->conn_state),
prdcr->obj.name);
break;
}
ldmsd_prdcr_unlock(prdcr);
out:
return rc;
einval:
Expand Down

0 comments on commit af3c3ef

Please sign in to comment.