Skip to content

Commit

Permalink
Modify the releasing Zap endpoints from IO thread process
Browse files Browse the repository at this point in the history
The patch separates the process of reducing the number of Zap endpoints
associated with an IO thread from the process of releasing the endpoint
from the IO thread. Additionally, it defers the nullification of
`zap->thread` until the endpoint reference reaches zero.

`zap_sock` and `zap_ugni` need to remove their events from epoll before
delivering the disconnected event to applications, preventing the
reception of additional epoll events. However, without this patch, this
results in resetting `zap->thread` to NULL. Thus, the applications
cannot retrieve Zap thread information in the disconnected path. The
patch addresses this limitation.
  • Loading branch information
nichamon committed Dec 4, 2023
1 parent 05de014 commit 2daaf53
Show file tree
Hide file tree
Showing 6 changed files with 28 additions and 4 deletions.
2 changes: 2 additions & 0 deletions lib/src/zap/fabric/zap_fabric.c
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,8 @@ static void z_fi_destroy(zap_ep_t zep)

DLOG("rep %p has %d ctxts\n", rep, rep->num_ctxts);

zap_io_thread_ep_remove(zep);

/* Do this first. */
while (!LIST_EMPTY(&rep->ep.map_list)) {
map = (zap_map_t)LIST_FIRST(&rep->ep.map_list);
Expand Down
4 changes: 3 additions & 1 deletion lib/src/zap/rdma/zap_rdma.c
Original file line number Diff line number Diff line change
Expand Up @@ -499,8 +499,10 @@ static void __rdma_teardown_conn(struct z_rdma_ep *ep)
static void z_rdma_destroy(zap_ep_t zep)
{
struct z_rdma_ep *rep = (void*)zep;
if (zep->thread)
if (zep->thread) {
zap_io_thread_ep_release(zep);
zap_io_thread_ep_remove(zep);
}
pthread_mutex_lock(&rep->ep.lock);
__rdma_teardown_conn(rep);
pthread_mutex_unlock(&rep->ep.lock);
Expand Down
2 changes: 2 additions & 0 deletions lib/src/zap/sock/zap_sock.c
Original file line number Diff line number Diff line change
Expand Up @@ -1992,6 +1992,8 @@ static void z_sock_destroy(zap_ep_t ep)

DEBUG_LOG(sep, "%ld z_sock_destroy(%p)\n", GETTID(), sep);

zap_io_thread_ep_remove(ep);

while (!TAILQ_EMPTY(&sep->sq)) {
wr = TAILQ_FIRST(&sep->sq);
TAILQ_REMOVE(&sep->sq, wr, link);
Expand Down
3 changes: 3 additions & 0 deletions lib/src/zap/ugni/zap_ugni.c
Original file line number Diff line number Diff line change
Expand Up @@ -2235,6 +2235,9 @@ static void z_ugni_destroy(zap_ep_t ep)
{
struct z_ugni_ep *uep = (void*)ep;
CONN_LOG("destroying endpoint %p\n", uep);

zap_io_thread_ep_remove(ep);

pthread_mutex_lock(&z_ugni_list_mutex);
ZUGNI_LIST_REMOVE(uep, link);
pthread_mutex_unlock(&z_ugni_list_mutex);
Expand Down
10 changes: 8 additions & 2 deletions lib/src/zap/zap.c
Original file line number Diff line number Diff line change
Expand Up @@ -910,15 +910,21 @@ zap_err_t zap_io_thread_ep_assign(zap_ep_t ep, int tpi)
/*
 * Release \c ep from its io thread at the transport level.
 *
 * Invokes the transport's io_thread_ep_release() callback with the
 * endpoint's thread, then atomically subtracts the endpoint's \c sq_sz
 * from the thread's \c stat->sq_sz.
 *
 * Returns the value returned by the transport callback.
 */
zap_err_t zap_io_thread_ep_release(zap_ep_t ep)
{
	zap_err_t rc = ep->z->io_thread_ep_release(ep->thread, ep);

	/* ep->thread is read again here, after the callback has run. */
	__atomic_fetch_sub(&ep->thread->stat->sq_sz, ep->sq_sz,
			   __ATOMIC_SEQ_CST);
	return rc;
}

/*
 * Detach \c ep from the io thread it is assigned to (\c ep->thread).
 *
 * While holding the thread's mutex, unlinks \c ep from the list it is
 * linked on through \c _entry, decrements the thread's endpoint counter and
 * copies the new count into the thread's stat. Afterwards \c ep->thread is
 * set to NULL.
 *
 * Returns \c zerr, which starts at 0 and is overwritten by the transport
 * callback result in the statements shown below.
 *
 * NOTE(review): \c ep->thread is dereferenced unconditionally here, whereas
 * z_rdma_destroy tests it for NULL before calling -- confirm that the other
 * callers can never reach this with a NULL thread.
 */
zap_err_t zap_io_thread_ep_remove(zap_ep_t ep)
{
zap_err_t zerr = 0;
zap_io_thread_t t = ep->thread;

/* The list unlink and the counters are updated together under t->mutex. */
pthread_mutex_lock(&t->mutex);
LIST_REMOVE(ep, _entry);
t->_n_ep--;
/* Mirror the counter into the thread's stat structure. */
t->stat->n_eps = t->_n_ep;
pthread_mutex_unlock(&t->mutex);
/*
 * NOTE(review): the next two statements repeat what
 * zap_io_thread_ep_release() does. This listing is a rendered diff
 * ("8 additions & 2 deletions"), so they are presumably the two removed
 * lines -- confirm against the repository before relying on them.
 */
zerr = ep->z->io_thread_ep_release(ep->thread, ep);
__atomic_fetch_sub(&ep->thread->stat->sq_sz, ep->sq_sz, __ATOMIC_SEQ_CST);
/* From here on the endpoint no longer refers to an io thread. */
ep->thread = NULL;
return zerr;
}
Expand Down
11 changes: 10 additions & 1 deletion lib/src/zap/zap_priv.h
Original file line number Diff line number Diff line change
Expand Up @@ -478,14 +478,23 @@ double zap_env_dbl(char *name, double default_value);
zap_err_t zap_io_thread_ep_assign(zap_ep_t ep, int tpi);

/**
* Release \c ep from the zap io thread.
* Disable \c ep assigned to the zap io thread.
*
* The transport shall call this function to release an endpoint from the
* associated io thread. \c zap.io_thread_ep_release() will also be called as a
* consequence.
*/
zap_err_t zap_io_thread_ep_release(zap_ep_t ep);

/**
* Free \c ep from the zap io thread.
*
* The transport shall call this function to release an endpoint from the
* associated io thread. \c zap.io_thread_ep_release() will also be called as a
* consequence.
*/
zap_err_t zap_io_thread_ep_remove(zap_ep_t ep);

/*
* The zap_thrstat structure maintains state for
* the Zap thread utilization tracking functions.
Expand Down

0 comments on commit 2daaf53

Please sign in to comment.