From 75526a8ca7610e2e1c1a9281bea2d4c3fe109989 Mon Sep 17 00:00:00 2001 From: Nichamon Naksinehaboon Date: Wed, 1 May 2024 13:06:37 -0500 Subject: [PATCH] Make Sampler Advertisement support multiple advertisements from a single node --- ldms/src/ldmsd/ldmsd.h | 19 +++++- ldms/src/ldmsd/ldmsd_config.c | 2 + ldms/src/ldmsd/ldmsd_prdcr.c | 77 +++++++++++++++++------ ldms/src/ldmsd/ldmsd_request.c | 112 ++++++++++++++++++--------------- 4 files changed, 142 insertions(+), 68 deletions(-) diff --git a/ldms/src/ldmsd/ldmsd.h b/ldms/src/ldmsd/ldmsd.h index c956a8d072..14915cec70 100644 --- a/ldms/src/ldmsd/ldmsd.h +++ b/ldms/src/ldmsd/ldmsd.h @@ -226,6 +226,24 @@ typedef struct ldmsd_prdcr { LDMSD_PRDCR_STATE_CONNECTED, /** Waiting for task join and xprt cleanup */ LDMSD_PRDCR_STATE_STOPPING, + /** The STANDBY state is valid only for 'GENERATED' producers. + * + * Producer task has been stopped but there is an outstanding xprt. + * + * Once the aggregator receives an advertisement notification + * and verifies that the hostname or IP address matches + * a listen producer, it creates a generated producer, + * maps the producer to the request's transport, moves + * the producer state to STANDBY, and then starts the producer. + * + * The producer synchronously moves to 'CONNECTED' when it starts. + * + * prdcr_stop does not tear down the connection. + * The producer's transport is reset to NULL only when + * the aggregator receives a 'disconnected' event either initiated by + * the sampler daemon or the aggregator. + */ + LDMSD_PRDCR_STATE_STANDBY, } conn_state; enum ldmsd_prdcr_type { @@ -372,7 +390,6 @@ typedef struct ldmsd_prdcr_listen { LDMSD_PRDCR_LISTEN_STATE_RUNNING, } state; const char *hostname_regex_s; - uint64_t prdcr_conn_intvl; /* reconnect interval of generated producers */ regex_t regex; int rails; /* Rail size */ int recv_credits; /* bytes */ diff --git a/ldms/src/ldmsd/ldmsd_config.c b/ldms/src/ldmsd/ldmsd_config.c index 01b05d1fe6..dea3088b35 100644 --- a/ldms/src/ldmsd/ldmsd_config.c +++ b/ldms/src/ldmsd/ldmsd_config.c @@ -242,6 +242,8 @@ const char *prdcr_state_str(enum ldmsd_prdcr_state state) return "CONNECTED"; case LDMSD_PRDCR_STATE_STOPPING: return "STOPPING"; + case LDMSD_PRDCR_STATE_STANDBY: + return "STANDBY"; } return "BAD STATE"; } diff --git a/ldms/src/ldmsd/ldmsd_prdcr.c b/ldms/src/ldmsd/ldmsd_prdcr.c index 3c727fe0c1..62a45d3bc2 100644 --- a/ldms/src/ldmsd/ldmsd_prdcr.c +++ b/ldms/src/ldmsd/ldmsd_prdcr.c @@ -589,6 +589,15 @@ static int __advertise_resp_cb(ldmsd_req_cmd_t rcmd) if (prdcr->xprt) ldms_xprt_close(prdcr->xprt); ovis_log(config_log, OVIS_LINFO, "advertise: %s.\n", errmsg); + } else if (EAGAIN == resp->rsp_err) { + /* + * The aggregator isn't ready to receive the advertisement. + * Retry again at the next interval. + */ + ovis_log(config_log, OVIS_LINFO, "advertise: The aggregator " + "isn't ready to accept an advertisement. Retry again\n"); + if (prdcr->xprt) + ldms_xprt_close(prdcr->xprt); } else { /* * LDMSD doesn't automatically stop the advertisement to @@ -740,7 +749,7 @@ static int __agg_routine(ldms_t x, ldms_xprt_event_t e, ldmsd_prdcr_t prdcr) return is_reset_prdcr; } -static void prdcr_connect_cb(ldms_t x, ldms_xprt_event_t e, void *cb_arg) +void prdcr_connect_cb(ldms_t x, ldms_xprt_event_t e, void *cb_arg) { int is_reset_prdcr = 0; ldmsd_prdcr_t prdcr = cb_arg; @@ -787,6 +796,7 @@ static void prdcr_connect_cb(ldms_t x, ldms_xprt_event_t e, void *cb_arg) prdcr_reset_sets(prdcr); switch (prdcr->conn_state) { case LDMSD_PRDCR_STATE_STOPPING: + case LDMSD_PRDCR_STATE_STANDBY: prdcr->conn_state = LDMSD_PRDCR_STATE_STOPPED; break; case LDMSD_PRDCR_STATE_DISCONNECTED: @@ -803,7 +813,8 @@ static void prdcr_connect_cb(ldms_t x, ldms_xprt_event_t e, void *cb_arg) assert(0 == "BAD STATE"); } if (prdcr->xprt) { - if (prdcr->type == LDMSD_PRDCR_TYPE_PASSIVE) { + if ((prdcr->type == LDMSD_PRDCR_TYPE_PASSIVE) || + (prdcr->type == LDMSD_PRDCR_TYPE_ADVERTISED)) { /* Put back the ldms_xprt_by_remote_sin() reference. */ ldms_xprt_put(prdcr->xprt); } @@ -821,9 +832,6 @@ static void prdcr_connect_cb(ldms_t x, ldms_xprt_event_t e, void *cb_arg) ldmsd_prdcr_unlock(prdcr); } -extern const char *auth_name; -extern struct attr_value_list *auth_opt; - static void prdcr_connect(ldmsd_prdcr_t prdcr) { int ret; @@ -857,17 +865,19 @@ static void prdcr_connect(ldmsd_prdcr_t prdcr) } break; case LDMSD_PRDCR_TYPE_PASSIVE: - case LDMSD_PRDCR_TYPE_ADVERTISED: assert(prdcr->xprt == NULL); prdcr->xprt = ldms_xprt_by_remote_sin((struct sockaddr *)&prdcr->ss); - /* - * The transport endpoint has be assigned in the advertise_notification handler before - * the producer has been started. - * - * Call connect callback to advance state and update timers - */ + ldms_xprt_event_cb_set(prdcr->xprt, prdcr_connect_cb, prdcr); + /* let through */ + case LDMSD_PRDCR_TYPE_ADVERTISED: if (prdcr->xprt) { - ldms_xprt_event_cb_set(prdcr->xprt, prdcr_connect_cb, prdcr); + /* + * For 'ADVERTISED' producers, + * prdcr->xprt is assigned when the aggregator + * has received the advertisement notification. + * + * Call connect callback to advance state and update timers + */ ldmsd_prdcr_unlock(prdcr); struct ldms_xprt_event conn_ev = {.type = LDMS_XPRT_EVENT_CONNECTED}; prdcr_connect_cb(prdcr->xprt, &conn_ev, prdcr); @@ -889,6 +899,7 @@ static void prdcr_task_cb(ldmsd_task_t task, void *arg) case LDMSD_PRDCR_STATE_STOPPING: ldmsd_task_stop(&prdcr->task); break; + case LDMSD_PRDCR_STATE_STANDBY: case LDMSD_PRDCR_STATE_DISCONNECTED: prdcr_connect(prdcr); break; @@ -1108,12 +1119,31 @@ int __ldmsd_prdcr_start(ldmsd_prdcr_t prdcr, ldmsd_sec_ctxt_t ctxt) rc = ldmsd_cfgobj_access_check(&prdcr->obj, 0222, ctxt); if (rc) goto out; - if (prdcr->conn_state != LDMSD_PRDCR_STATE_STOPPED) { - rc = EBUSY; - goto out; - } - prdcr->conn_state = LDMSD_PRDCR_STATE_DISCONNECTED; + if (prdcr->type == LDMSD_PRDCR_TYPE_ADVERTISED) { + if (prdcr->conn_state == LDMSD_PRDCR_STATE_STOPPED) { + /* The connect was disconnected. */ + prdcr->conn_state = LDMSD_PRDCR_STATE_DISCONNECTED; + } else if (prdcr->conn_state == LDMSD_PRDCR_STATE_STANDBY) { + /* + * The connection is still connected. + * + * The state will be synchronously moved to CONNECTED + * in prdcr_connect(). + * + */ + } else { + rc = EBUSY; + goto out; + } + } else { + if (prdcr->conn_state != LDMSD_PRDCR_STATE_STOPPED) { + rc = EBUSY; + goto out; + } else { + prdcr->conn_state = LDMSD_PRDCR_STATE_DISCONNECTED; + } + } prdcr->obj.perm |= LDMSD_PERM_DSTART; ldmsd_task_start(&prdcr->task, prdcr_task_cb, prdcr, @@ -1163,6 +1193,17 @@ int __ldmsd_prdcr_stop(ldmsd_prdcr_t prdcr, ldmsd_sec_ctxt_t ctxt) rc = EBUSY; goto out; } + + if (prdcr->type == LDMSD_PRDCR_TYPE_ADVERTISED) { + if (prdcr->conn_state == LDMSD_PRDCR_STATE_STANDBY) { + /* + * Already stopped, return 0 so that caller knows stop succeeds. + */ + rc = 0; + goto out; + } + } + ovis_log(prdcr_log, OVIS_LINFO, "Stopping producer %s\n", prdcr->obj.name); if (prdcr->type == LDMSD_PRDCR_TYPE_LOCAL) prdcr_reset_sets(prdcr); diff --git a/ldms/src/ldmsd/ldmsd_request.c b/ldms/src/ldmsd/ldmsd_request.c index 03ea5acc98..40ec0d4589 100644 --- a/ldms/src/ldmsd/ldmsd_request.c +++ b/ldms/src/ldmsd/ldmsd_request.c @@ -7469,7 +7469,7 @@ static char * __prdcr_stats_as_json(size_t *json_sz) size_t sz = __APPEND_SZ; int prdcr_count = 0, stopped_count = 0, disconnected_count = 0, connecting_count = 0, connected_count = 0, stopping_count = 0, - set_count = 0; + set_count = 0, standby_count = 0; (void)clock_gettime(CLOCK_REALTIME, &start); ldmsd_cfg_lock(LDMSD_CFGOBJ_PRDCR); @@ -7492,6 +7492,9 @@ static char * __prdcr_stats_as_json(size_t *json_sz) case LDMSD_PRDCR_STATE_STOPPING: stopping_count++; break; + case LDMSD_PRDCR_STATE_STANDBY: + standby_count++; + break; } set_count += rbt_card(&prdcr->set_tree); } @@ -7509,6 +7512,7 @@ static char * __prdcr_stats_as_json(size_t *json_sz) __APPEND(" \"connecting_count\": %d,\n", connecting_count); __APPEND(" \"connected_count\": %d,\n", connected_count); __APPEND(" \"stopping_count\": %d,\n", stopping_count); + __APPEND(" \"standby_count\": %d,\n", standby_count); __APPEND(" \"set_count\": %d,\n", set_count); (void)clock_gettime(CLOCK_REALTIME, &end); uint64_t compute_time = ldms_timespec_diff_us(&start, &end); @@ -9496,7 +9500,6 @@ static int prdcr_listen_add_handler(ldmsd_req_ctxt_t reqc) char *disabled_start; char *attr_name; ldmsd_prdcr_listen_t pl; - long reconnect_us; int rail; uint64_t credits; uint64_t rx_rate; @@ -9513,25 +9516,6 @@ static int prdcr_listen_add_handler(ldmsd_req_ctxt_t reqc) cidr_str = ldmsd_req_attr_str_value_get_by_id(reqc, LDMSD_ATTR_IP); disabled_start = ldmsd_req_attr_str_value_get_by_id(reqc, LDMSD_ATTR_AUTO_INTERVAL); - attr_name = "reconnect"; - reconnect_str = ldmsd_req_attr_str_value_get_by_id(reqc, LDMSD_ATTR_INTERVAL); - if (!reconnect_str) { - goto einval; - } else { - reqc->errcode = ovis_time_str2us(reconnect_str, &reconnect_us); - if (reqc->errcode) { - (void) snprintf(reqc->line_buf, reqc->line_len, - "The given 'reconnect' is invalid."); - goto send_reply; - } - if (reconnect_us <= 0) { - reqc->errcode = EINVAL; - (void) snprintf(reqc->line_buf, reqc->line_len, - "The reconnect interval must be a positive number."); - goto send_reply; - } - } - rail_str = ldmsd_req_attr_str_value_get_by_id(reqc, LDMSD_ATTR_RAIL); if (rail_str) { rail = atoi(rail_str); @@ -9599,7 +9583,6 @@ static int prdcr_listen_add_handler(ldmsd_req_ctxt_t reqc) } } - pl->prdcr_conn_intvl = reconnect_us; if (cidr_str) { pl->cidr_str = strdup(cidr_str); if (!pl->cidr_str) { @@ -9834,9 +9817,6 @@ static int prdcr_listen_status_handler(ldmsd_req_ctxt_t reqc) struct rbn *rbn; struct ldmsd_req_attr_s attr; - /* - * TODO: It'll be helpfull to list all producers generated by each prdcr_listen. - */ ldmsd_cfg_lock(LDMSD_CFGOBJ_PRDCR_LISTEN); for (pl = (ldmsd_prdcr_listen_t)ldmsd_cfgobj_first(LDMSD_CFGOBJ_PRDCR_LISTEN); pl; pl = (ldmsd_prdcr_listen_t)ldmsd_cfgobj_next(&pl->obj)) { @@ -9849,13 +9829,11 @@ static int prdcr_listen_status_handler(ldmsd_req_ctxt_t reqc) "\"state\":\"%s\"," "\"regex\":\"%s\"," "\"IP range\":\"%s\"," - "\"reconnect\":\"%ld\"," "\"producers\":[", pl->obj.name, ((pl->state==LDMSD_PRDCR_LISTEN_STATE_RUNNING)?("running"):("stopped")), (pl->hostname_regex_s?pl->hostname_regex_s:"-"), - (pl->cidr_str?pl->cidr_str:"-"), - pl->prdcr_conn_intvl + (pl->cidr_str?pl->cidr_str:"-") ); if (rc) goto err; @@ -9920,9 +9898,10 @@ static int prdcr_listen_status_handler(ldmsd_req_ctxt_t reqc) extern int __ldmsd_updtr_prdcr_add(ldmsd_updtr_t updtr, ldmsd_prdcr_t prdcr); /* The implementations are in ldmsd_prdcr.c */ extern ldmsd_prdcr_ref_t prdcr_ref_new(ldmsd_prdcr_t prdcr); +extern void prdcr_connect_cb(ldms_t x, ldms_xprt_event_t e, void *cb_arg); static int __process_advertisement(ldmsd_req_ctxt_t reqc, ldmsd_prdcr_listen_t lp, struct ldms_addr *rem_addr) { - int rc; + int rc = 0; char *name; char *xprt_s; char *hostname; @@ -9933,7 +9912,8 @@ static int __process_advertisement(ldmsd_req_ctxt_t reqc, ldmsd_prdcr_listen_t l struct ldmsd_sec_ctxt sctxt; uid_t uid; gid_t gid; - int is_start; + int is_start = 0; + struct ldms_xprt_event conn_ev; name = xprt_s = hostname = NULL; attr_name = "name"; @@ -9948,7 +9928,6 @@ static int __process_advertisement(ldmsd_req_ctxt_t reqc, ldmsd_prdcr_listen_t l xprt_s = (char *)ldms_xprt_type_name(reqc->xprt->ldms.ldms); - /* TODO: make sure that it makes sense to use uid, gid from the request context. */ ldmsd_req_ctxt_sec_get(reqc, &sctxt); uid = sctxt.crd.uid; gid = sctxt.crd.gid; @@ -9968,14 +9947,6 @@ static int __process_advertisement(ldmsd_req_ctxt_t reqc, ldmsd_prdcr_listen_t l goto out; } is_start = 1; - } else { - if (prdcr->xprt) { - ovis_log(NULL, OVIS_LERROR, - "Received a duplicate advertise request of producer '%s'. " - "LDMSD ignores the subsequent request.\n", name); - rc = EBUSY; - goto out; - } } rbn = rbt_find(&lp->prdcr_tree, name); @@ -10017,18 +9988,61 @@ static int __process_advertisement(ldmsd_req_ctxt_t reqc, ldmsd_prdcr_listen_t l } ldmsd_cfg_unlock(LDMSD_CFGOBJ_UPDTR); - /* - * The producer state will be moved in the prdcr_task_cb() path. - */ - if (lp->auto_start && is_start) { - rc = ldmsd_prdcr_start(name, NULL, &sctxt); - if (rc) { - ovis_log(config_log, OVIS_LERROR, "Failed to start the " - "generated producer '%s'. Error %d.\n", - name, rc); - goto err; + ldmsd_prdcr_lock(prdcr); + switch (prdcr->conn_state) { + case LDMSD_PRDCR_STATE_CONNECTED: + case LDMSD_PRDCR_STATE_STANDBY: + /* prdcr->xprt is not NULL. */ + ovis_log(NULL, OVIS_LERROR, + "Received a duplicate advertise request of producer '%s'. " + "LDMSD ignores the subsequent request.\n", name); + rc = EBUSY; + break; + case LDMSD_PRDCR_STATE_STOPPING: + /* + * The producer was manually stopped but + * the aggregator hasn't received the 'DISCONNECTED' event yet. + * This is a race condition between a disconnected event and + * an advertisement notification. We reject any advertisements + * of this producer until the transport has been completely torn down. + * + * Let the sampler daemon retry again. + */ + rc = EAGAIN; + break; + case LDMSD_PRDCR_STATE_STOPPED: + prdcr->xprt = ldms_xprt_get(reqc->xprt->ldms.ldms); + ldms_xprt_event_cb_set(prdcr->xprt, prdcr_connect_cb, prdcr); + prdcr->conn_state = LDMSD_PRDCR_STATE_STANDBY; + if (lp->auto_start && is_start) { + ldmsd_prdcr_unlock(prdcr); + rc = ldmsd_prdcr_start(name, NULL, &sctxt); + if (rc) { + ovis_log(config_log, OVIS_LERROR, "Failed to start the " + "generated producer '%s'. Error %d.\n", + name, rc); + goto err; + } + ldmsd_prdcr_lock(prdcr); } + break; + case LDMSD_PRDCR_STATE_DISCONNECTED: + prdcr->xprt = ldms_xprt_get(reqc->xprt->ldms.ldms); + ldms_xprt_event_cb_set(prdcr->xprt, prdcr_connect_cb, prdcr); + /* Move the producer state to CONNECTED */ + conn_ev.type = LDMS_XPRT_EVENT_CONNECTED; + ldmsd_prdcr_unlock(prdcr); + prdcr_connect_cb(prdcr->xprt, &conn_ev, prdcr); + ldmsd_prdcr_lock(prdcr); + break; + default: + ovis_log(NULL, OVIS_LERROR, "Reach an unexpected state (%s) of " + "a generated producer %s.\n", + prdcr_state_str(prdcr->conn_state), + prdcr->obj.name); + break; } + ldmsd_prdcr_unlock(prdcr); out: return rc; einval: