Skip to content

Commit

Permalink
Add LDMS_STREAM_EVENT_CLOSE
Browse files Browse the repository at this point in the history
Due to the asynchronous nature of `ldms_stream`, a stream event may
already in the process of being delivered to the application callback
function at the same time that the application closes the stream client
and releases application's resources associated with it. This results in
a use-after-free.

This patch adds `LDMS_STREAM_EVENT_CLOSE` callback event that is
delivered to the application callback function when the stream client is
closed. The `LDMS_STREAM_EVENT_CLOSE` is the last event to be delivered
to the client call back function, so that the application can then
safely release resources asscoiated with the stream client after
receiving it.
  • Loading branch information
narategithub committed Jun 3, 2024
1 parent 437ecf5 commit 5d0be59
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 7 deletions.
7 changes: 6 additions & 1 deletion ldms/python/ldms.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -3875,7 +3875,12 @@ cdef class StreamData(object):

cdef int __stream_client_cb(ldms_stream_event_t ev, void *arg) with gil:
cdef StreamClient c = <StreamClient>arg
cdef StreamData sdata = StreamData.from_ldms_stream_event(PTR(ev))
cdef StreamData sdata

if ev.type != LDMS_STREAM_EVENT_RECV:
return 0

sdata = StreamData.from_ldms_stream_event(PTR(ev))
if c.cb:
c.cb(c, sdata, c.cb_arg)
else:
Expand Down
9 changes: 9 additions & 0 deletions ldms/src/core/ldms.h
Original file line number Diff line number Diff line change
Expand Up @@ -1153,6 +1153,9 @@ enum ldms_stream_event_type {
LDMS_STREAM_EVENT_RECV, /* stream data received */
LDMS_STREAM_EVENT_SUBSCRIBE_STATUS, /* reporting subscription status */
LDMS_STREAM_EVENT_UNSUBSCRIBE_STATUS, /* reporting unsubscription status */
LDMS_STREAM_EVENT_CLOSE, /* reporting stream client close event.
* This is the last event to deliver from a
* client. */
};

/* For stream data delivery to the application */
Expand All @@ -1177,12 +1180,18 @@ struct ldms_stream_return_status_s {
int status;
};

/* For stream close event */
struct ldms_stream_close_event_s {
ldms_stream_client_t client;
};

typedef struct ldms_stream_event_s {
ldms_t r; /* rail */
enum ldms_stream_event_type type;
union {
struct ldms_stream_recv_data_s recv;
struct ldms_stream_return_status_s status;
struct ldms_stream_close_event_s close;
};
} *ldms_stream_event_t;

Expand Down
66 changes: 60 additions & 6 deletions ldms/src/core/ldms_stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,12 @@ TAILQ_HEAD(, ldms_stream_client_s) __regex_client_tq = TAILQ_HEAD_INITIALIZER(__

static uint64_t stream_gn = 0;

static pthread_mutex_t __stream_close_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t __stream_close_cond = PTHREAD_COND_INITIALIZER;
static pthread_t __stream_close_thread;
static TAILQ_HEAD(, ldms_stream_client_s)
__stream_close_tq = TAILQ_HEAD_INITIALIZER(__stream_close_tq);

int __rail_rep_send_raw(struct ldms_rail_ep_s *rep, void *data, int len);

/*
Expand Down Expand Up @@ -253,6 +259,8 @@ __remote_client_cb(ldms_stream_event_t ev, void *cb_arg)
ldms_rail_t r;
int ep_idx;
int rc;
if (ev->type == LDMS_STREAM_EVENT_CLOSE)
return 0;
assert( ev->type == LDMS_STREAM_EVENT_RECV );
if (!XTYPE_IS_RAIL(ev->recv.client->x->xtype))
return ENOTSUP;
Expand Down Expand Up @@ -966,7 +974,12 @@ void ldms_stream_close(ldms_stream_client_t c)
ref_put(&c->ref, "__regex_client_tq");
}
__STREAM_UNLOCK();
ref_put(&c->ref, "init");

/* reuse the c->entry for 'close' event queing */
pthread_mutex_lock(&__stream_close_mutex);
TAILQ_INSERT_TAIL(&__stream_close_tq, c, entry);
pthread_cond_signal(&__stream_close_cond);
pthread_mutex_unlock(&__stream_close_mutex);
}

struct __sub_req_ctxt_s {
Expand Down Expand Up @@ -2106,12 +2119,53 @@ int ldms_stream_publish_file(ldms_t x, const char *stream_name,
return rc;
}

static void __ldms_stream_init();

static void *__stream_close_proc(void *arg)
{
struct ldms_stream_client_s *c;
struct ldms_stream_event_s ev;

pthread_atfork(NULL, NULL, __ldms_stream_init); /* re-initialize at fork */

pthread_mutex_lock(&__stream_close_mutex);
loop:
c = TAILQ_FIRST(&__stream_close_tq);
if (!c) {
pthread_cond_wait(&__stream_close_cond, &__stream_close_mutex);
goto loop;
}
TAILQ_REMOVE(&__stream_close_tq, c, entry);
pthread_mutex_unlock(&__stream_close_mutex);

ev.r = c->x;
ev.type = LDMS_STREAM_EVENT_CLOSE;
ev.close.client = c;

ref_get(&c->ref, "cb");
c->cb_fn(&ev, c->cb_arg);
ref_put(&c->ref, "cb");

ref_put(&c->ref, "init");

pthread_mutex_lock(&__stream_close_mutex);
goto loop;

return NULL;
}

__attribute__((constructor))
static void __ldms_stream_init()
{
static int once = 0;
if (once)
return ;
__ldms_stream_log = ovis_log_register("ldms.stream", "LDMS Stream Library");
once = 1;
int rc;
pthread_mutex_init(&__stream_close_mutex, NULL);
pthread_cond_init(&__stream_close_cond, NULL);
if (!__ldms_stream_log)
__ldms_stream_log = ovis_log_register("ldms.stream", "LDMS Stream Library");
rc = pthread_create(&__stream_close_thread, NULL, __stream_close_proc, NULL);
if (rc) {
__ERROR("cannot create ldms_stream_close thread, rc: %d, errno: %d\n", rc, errno);
} else {
pthread_setname_np(__stream_close_thread, "ldms_strm_cls");
}
}

0 comments on commit 5d0be59

Please sign in to comment.