From 71d1f6e0fb8b85eeec7f25598603e0f0b6022112 Mon Sep 17 00:00:00 2001 From: Nichamon Naksinehaboon Date: Mon, 27 Nov 2023 22:22:17 -0600 Subject: [PATCH 1/9] Modify the releasing Zap endpoints from IO thread process The patch separates the process of reducing the number of Zap endpoints associated with an IO thread from the process of releasing the endpoint from the IO thread. Additionally, it defers the nullification of `zap->thread` until the endpoint reference reaches zero. `zap_sock` and `zap_ugni` need to remove their events from epoll before delivering the disconnected event to applications, preventing the reception of additional epoll events. However, without this patch, this results in resetting `zap->thread` to NULL. Thus, the applications cannot retrieve Zap thread information in the disconnected path. The patch addresses this limitation. --- lib/src/zap/fabric/zap_fabric.c | 2 ++ lib/src/zap/rdma/zap_rdma.c | 4 +++- lib/src/zap/sock/zap_sock.c | 2 ++ lib/src/zap/ugni/zap_ugni.c | 3 +++ lib/src/zap/zap.c | 10 ++++++++-- lib/src/zap/zap_priv.h | 25 +++++++++++++++++++++++++ 6 files changed, 43 insertions(+), 3 deletions(-) diff --git a/lib/src/zap/fabric/zap_fabric.c b/lib/src/zap/fabric/zap_fabric.c index 81347cdea..7552d8ed3 100644 --- a/lib/src/zap/fabric/zap_fabric.c +++ b/lib/src/zap/fabric/zap_fabric.c @@ -456,6 +456,8 @@ static void z_fi_destroy(zap_ep_t zep) DLOG("rep %p has %d ctxts\n", rep, rep->num_ctxts); + zap_io_thread_ep_remove(zep); + /* Do this first. 
*/ while (!LIST_EMPTY(&rep->ep.map_list)) { map = (zap_map_t)LIST_FIRST(&rep->ep.map_list); diff --git a/lib/src/zap/rdma/zap_rdma.c b/lib/src/zap/rdma/zap_rdma.c index e27a3f8de..06a86bb2d 100644 --- a/lib/src/zap/rdma/zap_rdma.c +++ b/lib/src/zap/rdma/zap_rdma.c @@ -499,8 +499,10 @@ static void __rdma_teardown_conn(struct z_rdma_ep *ep) static void z_rdma_destroy(zap_ep_t zep) { struct z_rdma_ep *rep = (void*)zep; - if (zep->thread) + if (zep->thread) { zap_io_thread_ep_release(zep); + zap_io_thread_ep_remove(zep); + } pthread_mutex_lock(&rep->ep.lock); __rdma_teardown_conn(rep); pthread_mutex_unlock(&rep->ep.lock); diff --git a/lib/src/zap/sock/zap_sock.c b/lib/src/zap/sock/zap_sock.c index a22be58cc..008a5eca9 100644 --- a/lib/src/zap/sock/zap_sock.c +++ b/lib/src/zap/sock/zap_sock.c @@ -1992,6 +1992,8 @@ static void z_sock_destroy(zap_ep_t ep) DEBUG_LOG(sep, "%ld z_sock_destroy(%p)\n", GETTID(), sep); + zap_io_thread_ep_remove(ep); + while (!TAILQ_EMPTY(&sep->sq)) { wr = TAILQ_FIRST(&sep->sq); TAILQ_REMOVE(&sep->sq, wr, link); diff --git a/lib/src/zap/ugni/zap_ugni.c b/lib/src/zap/ugni/zap_ugni.c index 4fd759be9..6e0b7419d 100644 --- a/lib/src/zap/ugni/zap_ugni.c +++ b/lib/src/zap/ugni/zap_ugni.c @@ -2235,6 +2235,9 @@ static void z_ugni_destroy(zap_ep_t ep) { struct z_ugni_ep *uep = (void*)ep; CONN_LOG("destroying endpoint %p\n", uep); + + zap_io_thread_ep_remove(ep); + pthread_mutex_lock(&z_ugni_list_mutex); ZUGNI_LIST_REMOVE(uep, link); pthread_mutex_unlock(&z_ugni_list_mutex); diff --git a/lib/src/zap/zap.c b/lib/src/zap/zap.c index f2f3ea938..f169a3124 100644 --- a/lib/src/zap/zap.c +++ b/lib/src/zap/zap.c @@ -910,6 +910,14 @@ zap_err_t zap_io_thread_ep_assign(zap_ep_t ep, int tpi) zap_err_t zap_io_thread_ep_release(zap_ep_t ep) { zap_err_t zerr; + zerr = ep->z->io_thread_ep_release(ep->thread, ep); + __atomic_fetch_sub(&ep->thread->stat->sq_sz, ep->sq_sz, __ATOMIC_SEQ_CST); + return zerr; +} + +zap_err_t zap_io_thread_ep_remove(zap_ep_t ep) +{ + zap_err_t 
zerr = 0; zap_io_thread_t t = ep->thread; pthread_mutex_lock(&t->mutex); @@ -917,8 +925,6 @@ zap_err_t zap_io_thread_ep_release(zap_ep_t ep) t->_n_ep--; t->stat->n_eps = t->_n_ep; pthread_mutex_unlock(&t->mutex); - zerr = ep->z->io_thread_ep_release(ep->thread, ep); - __atomic_fetch_sub(&ep->thread->stat->sq_sz, ep->sq_sz, __ATOMIC_SEQ_CST); ep->thread = NULL; return zerr; } diff --git a/lib/src/zap/zap_priv.h b/lib/src/zap/zap_priv.h index c699fd1a0..dce830d24 100644 --- a/lib/src/zap/zap_priv.h +++ b/lib/src/zap/zap_priv.h @@ -483,9 +483,34 @@ zap_err_t zap_io_thread_ep_assign(zap_ep_t ep, int tpi); * The transport shall call this function to release an endpoint from the * associated io thread. \c zap.io_thread_ep_release() will also be called as a * subsequence. + * + * Consequently, the endpoint will not be processed by the thread any further. + * However, the endpoint still keeps a reference to the thread for further statistics data access. + * The endpoint references to the thread is removed when \c zap_io_thread_ep_remove() is called. + * + * \param ep A Zap endpoint + * + * \return ZAP_ERR_OK on success. Otherwise, a Zap error code is returned. + * + * \see zap_io_thread_ep_remove */ zap_err_t zap_io_thread_ep_release(zap_ep_t ep); +/** + * Remove \c ep reference to the zap io thread. + * + * The function nullifies the endpoint reference to the thread and decrements + * the number of endpoints associated to the thread. This results in reducing + * the thread's load counter. + * + * \param ep A Zap endpoint + * + * \return ZAP_ERR_OK on success. Otherwise, a Zap error code is returned. + * + * \see zap_io_thread_ep_release + */ +zap_err_t zap_io_thread_ep_remove(zap_ep_t ep); + /* * The zap_thrstat structure maintains state for * the Zap thread utilization tracking functions. 
From ee86ef192c1110e8ab4bc7b029888c32f15568e7 Mon Sep 17 00:00:00 2001 From: Nichamon Naksinehaboon Date: Wed, 22 Nov 2023 17:05:42 -0600 Subject: [PATCH 2/9] Add APIs to set/get app context in Zap thread stats --- lib/src/zap/zap.c | 25 +++++++++++++++++++++++++ lib/src/zap/zap.h | 35 +++++++++++++++++++++++++++++++++++ lib/src/zap/zap_priv.h | 3 +++ 3 files changed, 63 insertions(+) diff --git a/lib/src/zap/zap.c b/lib/src/zap/zap.c index f169a3124..b2265db35 100644 --- a/lib/src/zap/zap.c +++ b/lib/src/zap/zap.c @@ -937,6 +937,7 @@ void zap_thrstat_reset(zap_thrstat_t stats) stats->proc_count = stats->wait_count = 0; memset(stats->wait_window, 0, sizeof(uint64_t) * stats->window_size); memset(stats->proc_window, 0, sizeof(uint64_t) * stats->window_size); + stats->app_reset_fn(stats->app_ctxt); } static pthread_mutex_t thrstat_list_lock = PTHREAD_MUTEX_INITIALIZER; @@ -971,6 +972,8 @@ void zap_thrstat_reset_all() memset(t->wait_window, 0, sizeof(uint64_t) * t->window_size); memset(t->proc_window, 0, sizeof(uint64_t) * t->window_size); i += 1; + if (t->app_reset_fn) + t->app_reset_fn(t->app_ctxt); } pthread_mutex_unlock(&thrstat_list_lock); } @@ -1152,6 +1155,28 @@ void zap_thrstat_free_result(struct zap_thrstat_result *res) free(res); } +int zap_thrstat_ctxt_set(zap_ep_t zep, void *ctxt, zap_thrstat_app_reset_fn reset_fn) +{ + zap_thrstat_t thrstat = (zep->thread)?(zep->thread->stat):NULL; + if (!thrstat) + return EBUSY; + if (thrstat->app_ctxt) + return EEXIST; + thrstat->app_ctxt = ctxt; + thrstat->app_reset_fn = reset_fn; + return 0; +} + +void *zap_thrstat_ctxt_get(zap_ep_t zep) +{ + zap_thrstat_t thrstat = (zep->thread)?(zep->thread->stat):NULL; + if (!thrstat) { + errno = EBUSY; + return NULL; + } + return thrstat->app_ctxt; +} + pthread_t zap_ep_thread(zap_ep_t ep) { return ep->thread?ep->thread->thread:0; diff --git a/lib/src/zap/zap.h b/lib/src/zap/zap.h index 3c1dde369..f99aa76d1 100644 --- a/lib/src/zap/zap.h +++ b/lib/src/zap/zap.h @@ -900,6 +900,41 
@@ void zap_thrstat_free_result(struct zap_thrstat_result *result); */ const char *zap_thrstat_get_name(zap_thrstat_t stats); +/** + * \brief Set application context of the statistics of a thread corresponding to \c zep + * + * Applications may track its usage of a Zap thread by creating a context and caching + * it in the Zap thread statistics objects. + * + * Zap will call \c reset_fn() when \c zap_thrstat_reset() or \c zap_thrstat_reset_all() + * is called. + * + * \param zep a Zap endpoint + * \param ctxt Application context + * \param reset_fn Handle of application's context reset function + * + * \return 0 on success. + * EBUSY is returned if the endpoint has not been assigned to a thread. + * EEXIST is returned if a context has been set already. + * + * \see zap_thrstat_ctxt_get + */ +typedef void (*zap_thrstat_app_reset_fn)(void *ctxt); +int zap_thrstat_ctxt_set(zap_ep_t zep, void *ctxt, zap_thrstat_app_reset_fn reset_fn); +/** + * \brief Get application context of the statistics of a thread corresponding to \c zep + * + * \param zep a Zap endpoint + * + * \return Application context in the Zap thread corresponding to \c zep. + * NULL returned if Zap cannot retrieve application's context, and + * errno is set. 0 means no application's context has been set. + * EBUSY means no Zap thread has been assigned to the Zap endpoint \c zep. 
+ * + * \see zap_thrstat_ctxt_set + */ +void *zap_thrstat_ctxt_get(zap_ep_t zep); + /** * \brief Return the time difference in microseconds * diff --git a/lib/src/zap/zap_priv.h b/lib/src/zap/zap_priv.h index dce830d24..a01a34cdc 100644 --- a/lib/src/zap/zap_priv.h +++ b/lib/src/zap/zap_priv.h @@ -535,6 +535,9 @@ struct zap_thrstat { int pool_idx; uint64_t thread_id; + + void *app_ctxt; /* Application statistics corresponding to the thread */ + zap_thrstat_app_reset_fn app_reset_fn; }; #define ZAP_THRSTAT_WINDOW 4096 /*< default window size */ From 51e4f6a33c0264ab0d5f0c7716205cbeb81624dc Mon Sep 17 00:00:00 2001 From: Nichamon Naksinehaboon Date: Tue, 28 Nov 2023 22:13:48 -0600 Subject: [PATCH 3/9] Add more information to Zap's thread stats --- lib/src/zap/zap.c | 19 ++++++++++++++++++- lib/src/zap/zap.h | 14 ++++++++++++++ lib/src/zap/zap_priv.h | 2 ++ 3 files changed, 34 insertions(+), 1 deletion(-) diff --git a/lib/src/zap/zap.c b/lib/src/zap/zap.c index b2265db35..fdb5de1fc 100644 --- a/lib/src/zap/zap.c +++ b/lib/src/zap/zap.c @@ -937,7 +937,8 @@ void zap_thrstat_reset(zap_thrstat_t stats) stats->proc_count = stats->wait_count = 0; memset(stats->wait_window, 0, sizeof(uint64_t) * stats->window_size); memset(stats->proc_window, 0, sizeof(uint64_t) * stats->window_size); - stats->app_reset_fn(stats->app_ctxt); + if (stats->app_reset_fn) + stats->app_reset_fn(stats->app_ctxt); } static pthread_mutex_t thrstat_list_lock = PTHREAD_MUTEX_INITIALIZER; @@ -1066,6 +1067,7 @@ void zap_thrstat_wait_start(zap_thrstat_t stats) stats->window_size, stats->proc_sum, stats->proc_window); + stats->proc_tot += proc_us; stats->proc_count += 1; } @@ -1081,6 +1083,7 @@ void zap_thrstat_wait_end(zap_thrstat_t stats) stats->window_size, stats->wait_sum, stats->wait_window); + stats->wait_tot += wait_us; stats->wait_count += 1; } @@ -1136,6 +1139,13 @@ struct zap_thrstat_result *zap_thrstat_get_result() res->entries[i].thread_id = t->thread_id; res->entries[i].tid = t->tid; 
res->entries[i].pool_idx = t->pool_idx; + res->entries[i].idle_time = t->wait_tot; + res->entries[i].active_time = t->proc_tot; + res->entries[i].app_ctxt = t->app_ctxt; + res->entries[i].wait_start = t->wait_start; + res->entries[i].wait_end = t->wait_end; + res->entries[i].waiting = t->waiting; + res->entries[i].start = t->start; i += 1; } out: @@ -1155,6 +1165,13 @@ void zap_thrstat_free_result(struct zap_thrstat_result *res) free(res); } +struct timespec *zap_ep_thrstat_wait_end(zap_ep_t zep) +{ + if (!zep->thread) + return NULL; + return &zep->thread->stat->wait_end; +} + int zap_thrstat_ctxt_set(zap_ep_t zep, void *ctxt, zap_thrstat_app_reset_fn reset_fn) { zap_thrstat_t thrstat = (zep->thread)?(zep->thread->stat):NULL; diff --git a/lib/src/zap/zap.h b/lib/src/zap/zap.h index f99aa76d1..f00e2ad7f 100644 --- a/lib/src/zap/zap.h +++ b/lib/src/zap/zap.h @@ -870,6 +870,13 @@ struct zap_thrstat_result_entry { int pool_idx; /*< Thread pool index */ uint64_t thread_id; /*< The thread ID (pthread_t) */ pid_t tid; /*< The Linux Thread ID (gettid()) */ + uint64_t idle_time; /*< Total idle time in micro-seconds */ + uint64_t active_time; /*< Total active time in micro-seconds */ + struct timespec start; /*< The reset timestamp */ + struct timespec wait_start; /*< The last timestamp the thread started waiting for events */ + struct timespec wait_end; /*< The last timestamp the thread woke up */ + int waiting; /*< A non-zero value means the thread is waiting for an event (idle). */ + void *app_ctxt; /*< Pointer to application's context */ }; struct zap_thrstat_result { @@ -900,6 +907,13 @@ void zap_thrstat_free_result(struct zap_thrstat_result *result); */ const char *zap_thrstat_get_name(zap_thrstat_t stats); +/** + * \brief Return the timestamp Zap endpoint's thread went to sleep. 
+ * + * \return The timestamp the thread went to sleep + */ +struct timespec *zap_ep_thrstat_wait_end(zap_ep_t zep); + /** * \brief Set application context of the statistics of a thread corresponding to \c zep * diff --git a/lib/src/zap/zap_priv.h b/lib/src/zap/zap_priv.h index a01a34cdc..047972b93 100644 --- a/lib/src/zap/zap_priv.h +++ b/lib/src/zap/zap_priv.h @@ -531,6 +531,8 @@ struct zap_thrstat { uint64_t *proc_window; uint64_t sq_sz; /* send queue size (in entries) */ uint64_t n_eps; /* number of endpoints */ + uint64_t wait_tot; /* Total idle time since reset in micro-seconds */ + uint64_t proc_tot; /* Total busy time since reset in micro-seconds */ LIST_ENTRY(zap_thrstat) entry; int pool_idx; From d92e18267103067230e6c23efe95206c247e6740 Mon Sep 17 00:00:00 2001 From: Nichamon Naksinehaboon Date: Tue, 21 Nov 2023 14:15:23 -0600 Subject: [PATCH 4/9] Collect Zap thread usages of ldms_xprt --- ldms/src/core/ldms.h | 81 +++++++++++++++++ ldms/src/core/ldms_stream.c | 2 + ldms/src/core/ldms_xprt.c | 171 ++++++++++++++++++++++++++++++++++++ 3 files changed, 254 insertions(+) diff --git a/ldms/src/core/ldms.h b/ldms/src/core/ldms.h index f5bfa3302..d4579f8e5 100644 --- a/ldms/src/core/ldms.h +++ b/ldms/src/core/ldms.h @@ -1730,6 +1730,87 @@ struct ldms_xprt_rate_data { double duration; }; +enum ldms_thrstat_op_e { + LDMS_THRSTAT_OP_OTHER, /* Ignore these operations, e.g., notify */ + LDMS_THRSTAT_OP_CONNECT_SETUP, + LDMS_THRSTAT_OP_DIR_REQ, + LDMS_THRSTAT_OP_DIR_REPLY, + LDMS_THRSTAT_OP_LOOKUP_REQ, + LDMS_THRSTAT_OP_LOOKUP_REPLY, + LDMS_THRSTAT_OP_UPDATE_REQ, + LDMS_THRSTAT_OP_UPDATE_REPLY, + LDMS_THRSTAT_OP_STREAM_MSG, + LDMS_THRSTAT_OP_STREAM_CLIENT, + LDMS_THRSTAT_OP_PUSH_REQ, + LDMS_THRSTAT_OP_PUSH_REPLY, + LDMS_THRSTAT_OP_SET_DELETE_REQ, + LDMS_THRSTAT_OP_SET_DELETE_REPLY, + LDMS_THRSTAT_OP_SEND_MSG, + LDMS_THRSTAT_OP_RECV_MSG, + LDMS_THRSTAT_OP_AUTH, + LDMS_THRSTAT_OP_DISCONNECTED, + LDMS_THRSTAT_OP_COUNT +}; + +/* + * TODO: Revise the comment to explain 
the intended use of the thr stats structure + * + * ldms_xprt ensures that the thread statistics reported account for + * the time from ldms_xprt receiving an event from Zap until it returns + * its Zap callback. The time duration includes the time in its + * application callback. Applications are responsible for keeping + * track of the time usages by its operations. It may cache the data + * in \c app_stats field. + */ + +struct ldms_thrstat_entry { + uint64_t total; /* Operation's Aggregated time in micro-seconds */ + int count; +}; + +struct ldms_thrstat { + struct timespec last_op_start; + struct timespec last_op_end; + enum ldms_thrstat_op_e last_op; + struct ldms_thrstat_entry ops[LDMS_THRSTAT_OP_COUNT]; +}; + +struct ldms_thrstat_result_entry { + struct zap_thrstat_result_entry *zap_res; + uint64_t idle; + uint64_t zap_time; + uint64_t ops[LDMS_THRSTAT_OP_COUNT]; + void *app_ctxt; +}; + +struct ldms_thrstat_result { + int count; + struct zap_thrstat_result *_zres; + struct ldms_thrstat_result_entry entries[0]; +}; + +/** + * \brief Convert \c enum ldms_thrstat_op_e to a string + * + * \return A string of the operation name + */ +char *ldms_thrstat_op_str(enum ldms_thrstat_op_e e); + +/** + * \brief Return thread usage information + * + * Return an ldms_thrstat_result structure or NULL on memory allocation failure. + * This result must be freed with the ldms_thrstat_free_result() function. 
+ * + * \return A pointer to an ldms_thrstat_result structure + */ +struct ldms_thrstat_result *ldms_thrstat_result_get(); + +/** + * \brief Free an ldms_thrstat_result returned by \c ldms_thrstat_result_get + */ +void ldms_thrstat_result_free(struct ldms_thrstat_result *res); + /** * Query daemon telemetry data across transports * diff --git a/ldms/src/core/ldms_stream.c b/ldms/src/core/ldms_stream.c index 899763bcb..f3114c8d1 100644 --- a/ldms/src/core/ldms_stream.c +++ b/ldms/src/core/ldms_stream.c @@ -525,7 +525,9 @@ __stream_deliver(struct ldms_addr *src, uint64_t msg_gn, ref_get(&c->ref, "callback"); pthread_rwlock_unlock(&s->rwlock); _ev.recv.client = c; + /* TODO: Start: Get timing for application's stream handling time. */ rc = c->cb_fn(&_ev, c->cb_arg); + /* TODO: End: Get timing for application's stream handling time. */ if (__stream_stats_level > 0) { pthread_rwlock_wrlock(&c->rwlock); if (rc) { diff --git a/ldms/src/core/ldms_xprt.c b/ldms/src/core/ldms_xprt.c index 395927113..c47453f17 100644 --- a/ldms/src/core/ldms_xprt.c +++ b/ldms/src/core/ldms_xprt.c @@ -1638,12 +1638,15 @@ void __rail_process_send_credit(ldms_t x, struct ldms_request *req); /* implementation is in ldms_stream.c */ void __stream_req_recv(ldms_t x, int cmd, struct ldms_request *req); +enum ldms_thrstat_op_e req2thrstat_op_tbl[]; static int ldms_xprt_recv_request(struct ldms_xprt *x, struct ldms_request *req) { int cmd = ntohl(req->hdr.cmd); + struct ldms_thrstat *thrstat = zap_thrstat_ctxt_get(x->zap_ep); int rc; + thrstat->last_op = req2thrstat_op_tbl[cmd]; switch (cmd) { case LDMS_CMD_LOOKUP: process_lookup_request(x, req); @@ -2092,7 +2095,10 @@ static int ldms_xprt_recv_reply(struct ldms_xprt *x, struct ldms_reply *reply) int cmd = ntohl(reply->hdr.cmd); uint64_t xid = reply->hdr.xid; struct ldms_context *ctxt; + struct ldms_thrstat *thrstat; ctxt = (struct ldms_context *)(unsigned long)xid; + thrstat = zap_thrstat_ctxt_get(x->zap_ep); + thrstat->last_op = 
req2thrstat_op_tbl[cmd]; switch (cmd) { case LDMS_CMD_PUSH_REPLY: process_push_reply(x, reply, ctxt); @@ -2469,15 +2475,19 @@ static void handle_zap_read_complete(zap_ep_t zep, zap_event_t ev) { struct ldms_context *ctxt = ev->context; struct ldms_xprt *x = zap_get_ucontext(zep); + struct ldms_thrstat *thrstat = zap_thrstat_ctxt_get(x->zap_ep); switch (ctxt->type) { case LDMS_CONTEXT_UPDATE: + thrstat->last_op = LDMS_THRSTAT_OP_UPDATE_REPLY; __handle_update_data(x, ctxt, ev); break; case LDMS_CONTEXT_UPDATE_META: + thrstat->last_op = LDMS_THRSTAT_OP_UPDATE_REPLY; __handle_update_meta(x, ctxt, ev); break; case LDMS_CONTEXT_LOOKUP_READ: + thrstat->last_op = LDMS_THRSTAT_OP_LOOKUP_REPLY; __handle_lookup(x, ctxt, ev); break; default: @@ -2770,16 +2780,20 @@ static void handle_rendezvous_push(zap_ep_t zep, zap_event_t ev, static void handle_zap_rendezvous(zap_ep_t zep, zap_event_t ev) { struct ldms_xprt *x = zap_get_ucontext(zep); + struct ldms_thrstat *thrstat; if (LDMS_XPRT_AUTH_GUARD(x)) return; + thrstat = zap_thrstat_ctxt_get(x->zap_ep); struct ldms_rendezvous_msg *lm = (typeof(lm))ev->data; switch (ntohl(lm->hdr.cmd)) { case LDMS_XPRT_RENDEZVOUS_LOOKUP: + thrstat->last_op = LDMS_THRSTAT_OP_LOOKUP_REPLY; handle_rendezvous_lookup(zep, ev, x, lm); break; case LDMS_XPRT_RENDEZVOUS_PUSH: + thrstat->last_op = LDMS_THRSTAT_OP_PUSH_REPLY; handle_rendezvous_push(zep, ev, x, lm); break; default: @@ -2856,6 +2870,15 @@ void __rail_zap_handle_conn_req(zap_ep_t zep, zap_event_t ev); void __rail_cb(ldms_t x, ldms_xprt_event_t e, void *cb_arg); void __rail_ep_limit(ldms_t x, void *msg, int msg_len); +void __thrstats_reset(void *ctxt) +{ + int i; + struct ldms_thrstat *thrstat = (struct ldms_thrstat *)ctxt; + + for (i = 0; i < LDMS_THRSTAT_OP_COUNT; i++) + thrstat->ops[i].count = thrstat->ops[i].total = 0; +} + /** * ldms-zap event handling function. 
*/ @@ -2867,12 +2890,35 @@ static void ldms_zap_cb(zap_ep_t zep, zap_event_t ev) char rej_msg[128]; struct rbt set_coll; struct ldms_xprt *x = zap_get_ucontext(zep); + struct ldms_thrstat *thrstat; + struct ldms_thrstat_entry *thrstat_e = NULL; + if (x == NULL) return; #ifdef DEBUG XPRT_LOG(x, OVIS_LDEBUG, "ldms_zap_cb: receive %s. %p: ref_count %d\n", zap_event_str(ev->type), x, x->ref_count); #endif /* DEBUG */ + + errno = 0; + thrstat = zap_thrstat_ctxt_get(zep); + if (!thrstat) { + if ((errno == 0) || (ev->type == ZAP_EVENT_CONNECT_REQUEST)) { + thrstat = calloc(1, sizeof(*thrstat)); + if (!thrstat) { + ovis_log(xlog, OVIS_LCRIT, + "Memory allocation failure.\n"); + return; + } + zap_thrstat_ctxt_set(zep, thrstat, __thrstats_reset); + } else { + ovis_log(xlog, OVIS_LCRIT, "Cannot retrieve thread stats " + "from Zap endpoint. Error %d\n", errno); + assert(0); + return; + } + } + (void)clock_gettime(CLOCK_REALTIME, &thrstat->last_op_start); switch(ev->type) { case ZAP_EVENT_RECV_COMPLETE: recv_cb(x, ev->data); @@ -2887,6 +2933,7 @@ static void ldms_zap_cb(zap_ep_t zep, zap_event_t ev) handle_zap_rendezvous(zep, ev); break; case ZAP_EVENT_CONNECT_REQUEST: + thrstat->last_op = LDMS_THRSTAT_OP_CONNECT_SETUP; __sync_fetch_and_add(&xprt_connect_request_count, 1); if (0 != __ldms_conn_msg_verify(x, ev->data, ev->data_len, rej_msg, sizeof(rej_msg))) { @@ -2903,6 +2950,7 @@ static void ldms_zap_cb(zap_ep_t zep, zap_event_t ev) } break; case ZAP_EVENT_REJECTED: + thrstat->last_op = LDMS_THRSTAT_OP_CONNECT_SETUP; (void)clock_gettime(CLOCK_REALTIME, &x->stats.disconnected); __sync_fetch_and_add(&xprt_reject_count, 1); event.type = LDMS_XPRT_EVENT_REJECTED; @@ -2913,6 +2961,7 @@ static void ldms_zap_cb(zap_ep_t zep, zap_event_t ev) ldms_xprt_put(x); break; case ZAP_EVENT_CONNECTED: + thrstat->last_op = LDMS_THRSTAT_OP_CONNECT_SETUP; (void)clock_gettime(CLOCK_REALTIME, &x->stats.connected); __sync_fetch_and_add(&xprt_connect_count, 1); /* actively connected -- expecting 
conn_msg */ @@ -2930,6 +2979,7 @@ static void ldms_zap_cb(zap_ep_t zep, zap_event_t ev) ldms_xprt_auth_begin(x); break; case ZAP_EVENT_CONNECT_ERROR: + thrstat->last_op = LDMS_THRSTAT_OP_DISCONNECTED; (void)clock_gettime(CLOCK_REALTIME, &x->stats.disconnected); event.type = LDMS_XPRT_EVENT_ERROR; if (x->event_cb) @@ -2939,6 +2989,7 @@ static void ldms_zap_cb(zap_ep_t zep, zap_event_t ev) ldms_xprt_put(x); break; case ZAP_EVENT_DISCONNECTED: + thrstat->last_op = LDMS_THRSTAT_OP_DISCONNECTED; (void)clock_gettime(CLOCK_REALTIME, &x->stats.disconnected); __sync_fetch_and_add(&xprt_disconnect_count, 1); /* deliver only if CONNECTED has been delivered. */ @@ -2995,6 +3046,7 @@ static void ldms_zap_cb(zap_ep_t zep, zap_event_t ev) ldms_xprt_put(x); break; case ZAP_EVENT_SEND_COMPLETE: + thrstat->last_op = LDMS_THRSTAT_OP_SEND_MSG; if (x->auth_flag != LDMS_XPRT_AUTH_APPROVED) { /* * Do not forward the send_complete to applications @@ -3013,6 +3065,10 @@ static void ldms_zap_cb(zap_ep_t zep, zap_event_t ev) "value %d from network\n", (int) ev->type); assert(0 == "network sent bad zap event value to ldms_zap_cb"); } + (void)clock_gettime(CLOCK_REALTIME, &thrstat->last_op_end); + thrstat_e = &thrstat->ops[thrstat->last_op]; + thrstat_e->total += ldms_timespec_diff_us(&thrstat->last_op_start, &thrstat->last_op_end); + thrstat_e->count += 1; } void ldms_zap_auto_cb(zap_ep_t zep, zap_event_t ev) @@ -4236,6 +4292,121 @@ int ldms_xprt_get_threads(ldms_t x, pthread_t *out, int n) return x->ops.get_threads(x, out, n); } +enum ldms_thrstat_op_e req2thrstat_op_tbl[] = { + /* + * TODO: Finish this table + */ + [LDMS_CMD_DIR] = LDMS_THRSTAT_OP_DIR_REQ , + [LDMS_CMD_DIR_CANCEL] = LDMS_THRSTAT_OP_DIR_REQ , + [LDMS_CMD_LOOKUP] = LDMS_THRSTAT_OP_LOOKUP_REQ , + [LDMS_CMD_REQ_NOTIFY] = LDMS_THRSTAT_OP_OTHER , + [LDMS_CMD_CANCEL_NOTIFY] = LDMS_THRSTAT_OP_OTHER , + [LDMS_CMD_SEND_MSG] = LDMS_THRSTAT_OP_SEND_MSG , + [LDMS_CMD_AUTH_MSG] = LDMS_THRSTAT_OP_AUTH , + [LDMS_CMD_CANCEL_PUSH] = 
LDMS_THRSTAT_OP_OTHER , + [LDMS_CMD_AUTH] = LDMS_THRSTAT_OP_AUTH , + [LDMS_CMD_SET_DELETE] = LDMS_THRSTAT_OP_SET_DELETE_REQ , + [LDMS_CMD_SEND_CREDIT] = LDMS_THRSTAT_OP_OTHER , + + /* stream requests */ +// [LDMS_CMD_STREAM_MSG] =, /* for stream messages */ +// [LDMS_CMD_STREAM_SUB] =, /* stream subscribe request */ +// [LDMS_CMD_STREAM_UNSUB] =, /* stream subscribe request */ +// +// [LDMS_CMD_REPLY = 0x100] =, + [LDMS_CMD_DIR_REPLY] = LDMS_THRSTAT_OP_DIR_REPLY , +// [LDMS_CMD_DIR_CANCEL_REPLY] =, + [LDMS_CMD_DIR_UPDATE_REPLY] = LDMS_THRSTAT_OP_UPDATE_REPLY , + [LDMS_CMD_LOOKUP_REPLY] = LDMS_THRSTAT_OP_LOOKUP_REPLY , +// [LDMS_CMD_REQ_NOTIFY_REPLY] =, + [LDMS_CMD_AUTH_CHALLENGE_REPLY] = LDMS_THRSTAT_OP_AUTH , + [LDMS_CMD_AUTH_APPROVAL_REPLY] = LDMS_THRSTAT_OP_AUTH , +// [LDMS_CMD_PUSH_REPLY] =, + [LDMS_CMD_AUTH_REPLY] = LDMS_THRSTAT_OP_AUTH , +// [LDMS_CMD_SET_DELETE_REPLY] =, +// +// /* stream replies */ +// [LDMS_CMD_STREAM_SUB_REPLY] =, /* stream subscribe reply (result) */ +// [LDMS_CMD_STREAM_UNSUB_REPLY] =, /* stream subscribe reply (result) */ +}; + +char *ldms_thrstat_op_str_tbl[] = { + [LDMS_THRSTAT_OP_OTHER] = "Other", + [LDMS_THRSTAT_OP_CONNECT_SETUP] = "Connecting", + [LDMS_THRSTAT_OP_DIR_REQ] = "Dir Requests", + [LDMS_THRSTAT_OP_DIR_REPLY] = "Dir Replies", + [LDMS_THRSTAT_OP_LOOKUP_REQ] = "Lookup Requests", + [LDMS_THRSTAT_OP_LOOKUP_REPLY] = "Lookup Replies", + [LDMS_THRSTAT_OP_UPDATE_REQ] = "Update Requests", + [LDMS_THRSTAT_OP_UPDATE_REPLY] = "Update Completes", + [LDMS_THRSTAT_OP_STREAM_MSG] = "Stream Data", + [LDMS_THRSTAT_OP_STREAM_CLIENT] = "Stream Client", + [LDMS_THRSTAT_OP_PUSH_REQ] = "Push Requests", + [LDMS_THRSTAT_OP_PUSH_REPLY] = "Push Replies", + [LDMS_THRSTAT_OP_SET_DELETE_REQ] = "Set Delete Requests", + [LDMS_THRSTAT_OP_SET_DELETE_REPLY] = "Set Delete Replies", + [LDMS_THRSTAT_OP_SEND_MSG] = "Send Messages", + [LDMS_THRSTAT_OP_RECV_MSG] = "Receive Messages", + [LDMS_THRSTAT_OP_AUTH] = "Authentication", + [LDMS_THRSTAT_OP_DISCONNECTED] 
= "Disconnecting", +}; +char *ldms_thrstat_op_str(enum ldms_thrstat_op_e e) +{ + return ldms_thrstat_op_str_tbl[e]; +} + +struct ldms_thrstat_result *ldms_thrstat_result_get() +{ + struct ldms_thrstat *lstats; + struct zap_thrstat_result *zres; + struct ldms_thrstat_result *res; + struct timespec now; + uint64_t ldms_xprt_time; + int i, j; + + (void)clock_gettime(CLOCK_REALTIME, &now); + + zres = zap_thrstat_get_result(); + + res = calloc(1, sizeof(*res) + + zres->count * sizeof(struct ldms_thrstat_result_entry)); + if (!res) + goto out; + res->_zres = zres; + res->count = zres->count; + for (i = 0; i < zres->count; i++) { + res->entries[i].zap_res = &zres->entries[i]; + res->entries[i].idle = zres->entries[i].idle_time; + + lstats = (struct ldms_thrstat *)zres->entries[i].app_ctxt; + if (!lstats) + continue; + ldms_xprt_time = 0; + for (j = 0; j < LDMS_THRSTAT_OP_COUNT; j++) { + res->entries[i].ops[j] = lstats->ops[j].total; + ldms_xprt_time += lstats->ops[j].total; + } + res->entries[i].zap_time = zres->entries[i].active_time; + if (!zres->entries[i].waiting) { + /* The thread is active. 
*/ + res->entries[i].zap_time += ldms_timespec_diff_us( + &zres->entries[i].wait_end, + &now); + } + res->entries[i].zap_time -= ldms_xprt_time; + } +out: + return res; +} + +void ldms_thrstat_result_free(struct ldms_thrstat_result *res) +{ + if (!res) + return; + zap_thrstat_free_result(res->_zres); + free(res); +} + static void __attribute__ ((constructor)) cs_init(void) { pthread_mutex_init(&xprt_list_lock, 0); From 74f6d4db826ad96be4911d1a9c1909304c1dfe8c Mon Sep 17 00:00:00 2001 From: Nichamon Naksinehaboon Date: Tue, 5 Dec 2023 09:32:23 -0600 Subject: [PATCH 5/9] Cache the timestamp LDMSD storing a set --- ldms/src/ldmsd/ldmsd_updtr.c | 3 +++ 1 file changed, 3 insertions(+) diff --git a/ldms/src/ldmsd/ldmsd_updtr.c b/ldms/src/ldmsd/ldmsd_updtr.c index 8b60a4cdb..f8542a07a 100644 --- a/ldms/src/ldmsd/ldmsd_updtr.c +++ b/ldms/src/ldmsd/ldmsd_updtr.c @@ -261,6 +261,9 @@ static void updtr_update_cb(ldms_t t, ldms_set_t set, int status, void *arg) clock_gettime(CLOCK_REALTIME, &start); strgp->update_fn(strgp, prd_set); clock_gettime(CLOCK_REALTIME, &end); + if (prd_set->store_stat.start.tv_sec == 0) + prd_set->store_stat.start = start; + prd_set->store_stat.end = end; ldmsd_stat_update(&prd_set->store_stat, &start, &end); ldmsd_strgp_unlock(strgp); } From 1d4ac1123d19ff469ba3d8aa84729b2bdae8c594 Mon Sep 17 00:00:00 2001 From: Nichamon Naksinehaboon Date: Tue, 5 Dec 2023 08:34:22 -0600 Subject: [PATCH 6/9] Update the thread_stats command in ldmsd_controller --- ldms/python/ldmsd/ldmsd_controller | 15 ++- ldms/src/ldmsd/ldmsd_request.c | 143 ++++++++++++++++++++++++++--- 2 files changed, 140 insertions(+), 18 deletions(-) diff --git a/ldms/python/ldmsd/ldmsd_controller b/ldms/python/ldmsd/ldmsd_controller index 19c086e5d..92c1165b5 100755 --- a/ldms/python/ldmsd/ldmsd_controller +++ b/ldms/python/ldmsd/ldmsd_controller @@ -2162,13 +2162,13 @@ class LdmsdCmdParser(cmd.Cmd): return self.__complete_attr_list('set_info', text) def display_thread_stats(self, stats): - 
print(f"{'Name':16} {'Samples':12} {'Sample Rate':12} " \ + print(f"{'Thread ID':15} {'Linux Thread ID':20} {'Name':16} {'Samples':12} {'Sample Rate':12} " \ f"{'Utilization':12} {'Send Queue Size':16} " \ f"{'Num of EPs':12}") - print("---------------- ------------ ------------ ------------ "\ + print(f"{'-'*15} {'-'*20} ---------------- ------------ ------------ ------------ "\ "---------------- ------------") for e in stats['entries']: - print(f"{e['name']:16} {e['sample_count']:12.0f} " \ + print(f"{e['tid']:^15} {e['thread_id']:20} {e['name']:16} {e['sample_count']:12.0f} " \ f"{e['sample_rate']:12.2f} {e['utilization'] * 100:12.2f} " \ f"{e['sq_sz']:16} {e['n_eps']:12}") @@ -2196,6 +2196,15 @@ class LdmsdCmdParser(cmd.Cmd): return msg = fmt_status(msg) self.display_thread_stats(msg) + print(f"{'='*60}") + for thr in msg['entries']: + thr['ldms_xprt'] = dict(sorted(thr['ldms_xprt'].items(), key = lambda item: item[1], reverse = True)) + total = sum(v for v in thr['ldms_xprt'].values()) + print(f"{thr['tid']} {thr['thread_id']} {thr['name']}") + print(" ", end="") + display = dict([(k, v) for k, v in thr['ldms_xprt'].items() if v != 0]) + print('\n '.join(f"{key:20} {(value/total*100):12.6f}% {value}" for key, value in display.items())) + print(f"{'-'*60}") def complete_thread_stats(self, text, line, begidx, endidx): return self.__complete_attr_list('thread_stats', text) diff --git a/ldms/src/ldmsd/ldmsd_request.c b/ldms/src/ldmsd/ldmsd_request.c index 1869d15d4..97ddab199 100644 --- a/ldms/src/ldmsd/ldmsd_request.c +++ b/ldms/src/ldmsd/ldmsd_request.c @@ -7051,15 +7051,74 @@ static int xprt_stats_handler(ldmsd_req_ctxt_t req) return ENOMEM; } +struct store_time_thread { + pid_t tid; + uint64_t store_time; + struct rbn rbn; +}; + +int __store_time_thread_cmp(void *tree_key, const void *key) +{ + const pid_t a = (pid_t)(uint64_t)tree_key; + pid_t b = (pid_t)(uint64_t)key; + return a - b; +} + +static int __store_time_thread_tree(struct rbt *tree) +{ + 
ldmsd_prdcr_t prdcr; + struct rbn *prdset_rbn, *rbn; + ldmsd_prdcr_set_t prdset; + struct store_time_thread *ent; + pid_t tid; + int rc = 0; + + for (prdcr = ldmsd_prdcr_first(); prdcr; prdcr = ldmsd_prdcr_next(prdcr)) { + RBT_FOREACH(prdset_rbn, &prdcr->set_tree) { + prdset = container_of(prdset_rbn, struct ldmsd_prdcr_set, rbn); + if (!prdset->set) + continue; + tid = ldms_set_thread_id_get(prdset->set); + rbn = rbt_find(tree, (void*)(uint64_t)tid); + if (!rbn) { + ent = calloc(1, sizeof(*ent)); + if (!ent) { + ovis_log(config_log, OVIS_LCRITICAL, + "Memory Allocation Failure."); + rc = ENOMEM; + goto out; + } + rbn_init(&ent->rbn, (void*)(uint64_t)tid); + rbt_ins(tree, &ent->rbn); + } else { + ent = container_of(rbn, struct store_time_thread, rbn); + } + ent->store_time += (uint64_t)(prdset->store_stat.avg * prdset->store_stat.count); + } + } +out: + return rc; +} + /* * Sends a JSON formatted summary of Zap thread statistics as follows: * * { "count" : , * "entries" : [ - * { "name" : , + * { "name" : , + * "tid" : , + * "thread_id" : , * "sample_count" : , * "sample_rate" : , - * "utilization" : + * "utilization" : , + * "sq_sz" : , + * "n_eps" : , + * "ldms_xprt" : + * { "Idle" : , + * "Zap" :