Skip to content

Commit

Permalink
Make Sampler Discovery support multiple advertisements from a single …
Browse files Browse the repository at this point in the history
…node
  • Loading branch information
nichamon committed May 15, 2024
1 parent a5a0858 commit 32e40fd
Show file tree
Hide file tree
Showing 4 changed files with 143 additions and 69 deletions.
19 changes: 18 additions & 1 deletion ldms/src/ldmsd/ldmsd.h
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,24 @@ typedef struct ldmsd_prdcr {
LDMSD_PRDCR_STATE_CONNECTED,
/** Waiting for task join and xprt cleanup */
LDMSD_PRDCR_STATE_STOPPING,
/** The STANDBY state is valid only for 'GENERATED' producers.
*
* Producer task has been stopped but there is an outstanding xprt.
*
* Once the aggregator receives an advertisement notification
* and verifies that the hostname or IP address matches
* a listen producer, it creates a generated producer,
* maps the producer to the request's transport, moves
* the producer state to STANDBY, and then starts the producer.
*
* The producer synchronously moves to 'CONNECTED' when it starts.
*
* prdcr_stop does not tear down the connection.
* The producer's transport is reset to NULL only when
* the aggregator receives a 'disconnected' event initiated by either
* the sampler daemon or the aggregator.
*/
LDMSD_PRDCR_STATE_STANDBY,
} conn_state;

enum ldmsd_prdcr_type {
Expand Down Expand Up @@ -372,7 +390,6 @@ typedef struct ldmsd_prdcr_listen {
LDMSD_PRDCR_LISTEN_STATE_RUNNING,
} state;
const char *hostname_regex_s;
uint64_t prdcr_conn_intvl; /* reconnect interval of generated producers */
regex_t regex;
int rails; /* Rail size */
int recv_credits; /* bytes */
Expand Down
2 changes: 2 additions & 0 deletions ldms/src/ldmsd/ldmsd_config.c
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,8 @@ const char *prdcr_state_str(enum ldmsd_prdcr_state state)
return "CONNECTED";
case LDMSD_PRDCR_STATE_STOPPING:
return "STOPPING";
case LDMSD_PRDCR_STATE_STANDBY:
return "STANDBY";
}
return "BAD STATE";
}
Expand Down
77 changes: 59 additions & 18 deletions ldms/src/ldmsd/ldmsd_prdcr.c
Original file line number Diff line number Diff line change
Expand Up @@ -589,6 +589,15 @@ static int __advertise_resp_cb(ldmsd_req_cmd_t rcmd)
if (prdcr->xprt)
ldms_xprt_close(prdcr->xprt);
ovis_log(config_log, OVIS_LINFO, "advertise: %s.\n", errmsg);
} else if (EAGAIN == resp->rsp_err) {
/*
* The aggregator isn't ready to receive the advertisement.
* Retry again at the next interval.
*/
ovis_log(config_log, OVIS_LINFO, "advertise: The aggregator "
"isn't ready to accept an advertisement. Retry again\n");
if (prdcr->xprt)
ldms_xprt_close(prdcr->xprt);
} else {
/*
* LDMSD doesn't automatically stop the advertisement to
Expand Down Expand Up @@ -740,7 +749,7 @@ static int __agg_routine(ldms_t x, ldms_xprt_event_t e, ldmsd_prdcr_t prdcr)
return is_reset_prdcr;
}

static void prdcr_connect_cb(ldms_t x, ldms_xprt_event_t e, void *cb_arg)
void prdcr_connect_cb(ldms_t x, ldms_xprt_event_t e, void *cb_arg)
{
int is_reset_prdcr = 0;
ldmsd_prdcr_t prdcr = cb_arg;
Expand Down Expand Up @@ -787,6 +796,7 @@ static void prdcr_connect_cb(ldms_t x, ldms_xprt_event_t e, void *cb_arg)
prdcr_reset_sets(prdcr);
switch (prdcr->conn_state) {
case LDMSD_PRDCR_STATE_STOPPING:
case LDMSD_PRDCR_STATE_STANDBY:
prdcr->conn_state = LDMSD_PRDCR_STATE_STOPPED;
break;
case LDMSD_PRDCR_STATE_DISCONNECTED:
Expand All @@ -803,7 +813,8 @@ static void prdcr_connect_cb(ldms_t x, ldms_xprt_event_t e, void *cb_arg)
assert(0 == "BAD STATE");
}
if (prdcr->xprt) {
if (prdcr->type == LDMSD_PRDCR_TYPE_PASSIVE) {
if ((prdcr->type == LDMSD_PRDCR_TYPE_PASSIVE) ||
(prdcr->type == LDMSD_PRDCR_TYPE_ADVERTISED)) {
/* Put back the ldms_xprt_by_remote_sin() reference. */
ldms_xprt_put(prdcr->xprt);
}
Expand All @@ -821,9 +832,6 @@ static void prdcr_connect_cb(ldms_t x, ldms_xprt_event_t e, void *cb_arg)
ldmsd_prdcr_unlock(prdcr);
}

extern const char *auth_name;
extern struct attr_value_list *auth_opt;

static void prdcr_connect(ldmsd_prdcr_t prdcr)
{
int ret;
Expand Down Expand Up @@ -857,17 +865,19 @@ static void prdcr_connect(ldmsd_prdcr_t prdcr)
}
break;
case LDMSD_PRDCR_TYPE_PASSIVE:
case LDMSD_PRDCR_TYPE_ADVERTISED:
assert(prdcr->xprt == NULL);
prdcr->xprt = ldms_xprt_by_remote_sin((struct sockaddr *)&prdcr->ss);
/*
* The transport endpoint has been assigned in the advertise_notification handler before
* the producer has been started.
*
* Call connect callback to advance state and update timers
*/
ldms_xprt_event_cb_set(prdcr->xprt, prdcr_connect_cb, prdcr);
/* fall through */
case LDMSD_PRDCR_TYPE_ADVERTISED:
if (prdcr->xprt) {
ldms_xprt_event_cb_set(prdcr->xprt, prdcr_connect_cb, prdcr);
/*
* For 'ADVERTISED' producers,
* prdcr->xprt is assigned when the aggregator
* has received the advertisement notification.
*
* Call connect callback to advance state and update timers
*/
ldmsd_prdcr_unlock(prdcr);
struct ldms_xprt_event conn_ev = {.type = LDMS_XPRT_EVENT_CONNECTED};
prdcr_connect_cb(prdcr->xprt, &conn_ev, prdcr);
Expand All @@ -889,6 +899,7 @@ static void prdcr_task_cb(ldmsd_task_t task, void *arg)
case LDMSD_PRDCR_STATE_STOPPING:
ldmsd_task_stop(&prdcr->task);
break;
case LDMSD_PRDCR_STATE_STANDBY:
case LDMSD_PRDCR_STATE_DISCONNECTED:
prdcr_connect(prdcr);
break;
Expand Down Expand Up @@ -1108,12 +1119,31 @@ int __ldmsd_prdcr_start(ldmsd_prdcr_t prdcr, ldmsd_sec_ctxt_t ctxt)
rc = ldmsd_cfgobj_access_check(&prdcr->obj, 0222, ctxt);
if (rc)
goto out;
if (prdcr->conn_state != LDMSD_PRDCR_STATE_STOPPED) {
rc = EBUSY;
goto out;
}

prdcr->conn_state = LDMSD_PRDCR_STATE_DISCONNECTED;
if (prdcr->type == LDMSD_PRDCR_TYPE_ADVERTISED) {
if (prdcr->conn_state == LDMSD_PRDCR_STATE_STOPPED) {
/* The connection was disconnected. */
prdcr->conn_state = LDMSD_PRDCR_STATE_DISCONNECTED;
} else if (prdcr->conn_state == LDMSD_PRDCR_STATE_STANDBY) {
/*
* The connection is still established.
*
* The state will be synchronously moved to CONNECTED
* in prdcr_connect().
*
*/
} else {
rc = EBUSY;
goto out;
}
} else {
if (prdcr->conn_state != LDMSD_PRDCR_STATE_STOPPED) {
rc = EBUSY;
goto out;
} else {
prdcr->conn_state = LDMSD_PRDCR_STATE_DISCONNECTED;
}
}

prdcr->obj.perm |= LDMSD_PERM_DSTART;
ldmsd_task_start(&prdcr->task, prdcr_task_cb, prdcr,
Expand Down Expand Up @@ -1163,6 +1193,17 @@ int __ldmsd_prdcr_stop(ldmsd_prdcr_t prdcr, ldmsd_sec_ctxt_t ctxt)
rc = EBUSY;
goto out;
}

if (prdcr->type == LDMSD_PRDCR_TYPE_ADVERTISED) {
if (prdcr->conn_state == LDMSD_PRDCR_STATE_STANDBY) {
/*
* Already stopped; return 0 so that the caller knows the stop succeeded.
*/
rc = 0;
goto out;
}
}

ovis_log(prdcr_log, OVIS_LINFO, "Stopping producer %s\n", prdcr->obj.name);
if (prdcr->type == LDMSD_PRDCR_TYPE_LOCAL)
prdcr_reset_sets(prdcr);
Expand Down
Loading

0 comments on commit 32e40fd

Please sign in to comment.