Skip to content

Commit

Permalink
Add AVRO/Serdes support in LDMS Stream
Browse files Browse the repository at this point in the history
  • Loading branch information
narategithub committed Jan 23, 2025
1 parent 0c19380 commit 15db962
Show file tree
Hide file tree
Showing 11 changed files with 653 additions and 90 deletions.
2 changes: 2 additions & 0 deletions configure.ac
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,8 @@ AS_IF([test "x$enable_store_avro_kafka" != xno],[
])
])
AM_CONDITIONAL([ENABLE_STORE_AVRO_KAFKA], [test "x$enable_store_avro_kafka" != xno])
AM_CONDITIONAL([HAVE_LIBAVRO], [test "x$HAVE_LIBAVRO" != xno])
AM_CONDITIONAL([HAVE_LIBSERDES], [test "x$HAVE_LIBSERDES" != xno])

dnl Options for sampler
OPTION_DEFAULT_ENABLE([sampler], [ENABLE_SAMPLER])
Expand Down
7 changes: 7 additions & 0 deletions ldms/python/ldms.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -820,6 +820,7 @@ cdef extern from "ldms.h" nogil:
cpdef enum ldms_stream_type_e:
LDMS_STREAM_STRING
LDMS_STREAM_JSON
LDMS_STREAM_AVRO_SER
enum ldms_stream_event_type:
LDMS_STREAM_EVENT_RECV
LDMS_STREAM_EVENT_SUBSCRIBE_STATUS
Expand Down Expand Up @@ -860,6 +861,12 @@ cdef extern from "ldms.h" nogil:
ldms_stream_client_t ldms_stream_subscribe(const char *stream, int is_regex,
ldms_stream_event_cb_t cb_fn, void *cb_arg,
const char *desc)


ldms_stream_client_t ldms_stream_python_subscribe(const char *stream,
int is_regex, ldms_stream_event_cb_t cb_fn,
void *cb_arg, const char *desc)

void ldms_stream_close(ldms_stream_client_t c)
int ldms_stream_remote_subscribe(ldms_t x, const char *stream, int is_regex,
ldms_stream_event_cb_t cb_fn, void *cb_arg,
Expand Down
252 changes: 171 additions & 81 deletions ldms/python/ldms.pyx

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion ldms/src/core/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@ libldms_la_SOURCES = ldms.c ldms_xprt.c ldms_private.h \
ldms_heap.c ldms_heap.h \
ldms_rail.c ldms_rail.h \
ldms_stream.c ldms_stream.h \
ldms_stream_avro_ser.c ldms_stream_avro_ser.h \
ldms_qgroup.c ldms_qgroup.h
libldms_la_LIBADD = -ldl -lpthread $(top_builddir)/lib/src/coll/libcoll.la \
$(top_builddir)/lib/src/ovis_json/libovis_json.la \
$(top_builddir)/lib/src/ovis_event/libovis_event.la \
$(top_builddir)/lib/src/mmalloc/libmmalloc.la \
$(top_builddir)/lib/src/zap/libzap.la \
$(top_builddir)/lib/src/ovis_log/libovis_log.la \
@OPENSSL_LIBS@
@OPENSSL_LIBS@ @LIBAVRO@ @LIBSERDES@

lib_LTLIBRARIES += libldms.la
28 changes: 28 additions & 0 deletions ldms/src/core/ldms.c
Original file line number Diff line number Diff line change
Expand Up @@ -6074,3 +6074,31 @@ ldms_mval_as_timestamp(ldms_mval_t mv, enum ldms_value_type type, int idx)
}
return ts;
}

const char *ldms_stream_type_sym(ldms_stream_type_t t)
{
static const char *tbl[] = {
[LDMS_STREAM_STRING] = "LDMS_STREAM_STRING",
[LDMS_STREAM_JSON] = "LDMS_STREAM_JSON",
[LDMS_STREAM_AVRO_SER] = "LDMS_STREAM_AVRO_SER",
};

if (t < LDMS_STREAM_LAST)
return tbl[t];
return "UNKNOWN";
}

const char *ldms_stream_event_type_sym(enum ldms_stream_event_type t)
{
static const char *tbl[] = {
[LDMS_STREAM_EVENT_RECV] = "LDMS_STREAM_EVENT_RECV",
[LDMS_STREAM_EVENT_CLOSE] = "LDMS_STREAM_EVENT_CLOSE",
[LDMS_STREAM_EVENT_SUBSCRIBE_STATUS] =
"LDMS_STREAM_EVENT_SUBSCRIBE_STATUS",
[LDMS_STREAM_EVENT_UNSUBSCRIBE_STATUS] =
"LDMS_STREAM_EVENT_UNSUBSCRIBE_STATUS",
};
if (t < LDMS_STREAM_EVENT_LAST)
return tbl[t];
return "UNKNOWN";
}
125 changes: 121 additions & 4 deletions ldms/src/core/ldms.h
Original file line number Diff line number Diff line change
Expand Up @@ -1381,8 +1381,35 @@ void ldms_qgroup_info_free(ldms_qgroup_info_t qinfo);
typedef enum ldms_stream_type_e {
LDMS_STREAM_STRING,
LDMS_STREAM_JSON,
LDMS_STREAM_AVRO_SER,
LDMS_STREAM_LAST, /* the last enumureation; not a real type */
} ldms_stream_type_t;

/**
* \brief Stream Type Symbol.
*
* This function returns a constant string symbol (e.g. "LDMS_STREAM_STRING") of
* the given Stream type \c t. If the given type is out of valid range,
* "UNKNOWN" is returned.
*
* \param t The Stream type.
*
* \retval s The symbol (e.g. "LDMS_STREAM_STRING").
*/
const char *ldms_stream_type_sym(ldms_stream_type_t t);

typedef struct ldms_stream_client_s *ldms_stream_client_t;
typedef struct json_entity_s *json_entity_t;

/* from avro/schema.h */
typedef struct avro_obj_t *avro_schema_t;
/* from avro/value.h */
typedef struct avro_value avro_value_t;
void avro_value_decref(avro_value_t *value);
/* from serdes.h */
typedef struct serdes_s serdes_t;
typedef struct serdes_schema_s serdes_schema_t;

/**
* \brief Publish stream data.
*
Expand All @@ -1408,6 +1435,81 @@ int ldms_stream_publish(ldms_t x, const char *stream_name,
uint32_t perm,
const char *data, size_t data_len);

struct ldms_stream_pubobj_str_s {
int len; /* strlen(str) + 1 */
const char *str;
};

struct ldms_stream_pubobj_json_s {
int len; /* strlen(str) + 1 */
const char *str; /* string of a JSON object */
};

struct ldms_stream_pubobj_avro_ser_s {
avro_schema_t schema;
avro_value_t *value;
serdes_t *serdes;
};

typedef struct ldms_stream_pubobj_s {
const char *stream_name; /**< stream name */
const struct ldms_cred *cred; /**< publisher credential */
uint32_t perm; /**< permission */
ldms_stream_type_t stream_type; /**< type of the stream */
union {
struct ldms_stream_pubobj_str_s str;
struct ldms_stream_pubobj_json_s json;
struct ldms_stream_pubobj_avro_ser_s avro_ser;
};

} *ldms_stream_pubobj_t;

/**
* Generic publish interface.
*
* \note \c pub object can be discarded after the call returns.
*/
int ldms_stream_publish_obj(ldms_t x, const struct ldms_stream_pubobj_s *pub);

/**
* \brief Publish Avro \c value to LDMS Stream.
*
* To publish an Avro \c value, a Serdes connection \c serdes is required to
* register the schema of the given \c value. The \c sch parameter is an in/out
* parameter to supply/obtain such schema.
*
* If \c sch is NOT \c NULL and the value of \c *sch is also NOT \c NULL, the
* \c *sch Serdes Schema will be used in the data serialization; skipping the
* schema extraction (saving time). If \c sch is NOT \c NULL but \c *sch IS
* \c NULL, a Serdes Schema extraction is performed and the \c *sch is set to
* the extracted Schema so that the application can save it for later use.
*
* If \c sch IS \c NULL, a Serdes Schema extraction is still performed but won't
* be returned to the application.
*
* If \c x is not \c NULL, LDMS publishes the \c value to the peer over stream
* named \c stream_name. Otherwise, this will be local publishing where all
* matching clients (local and remote) will receive the data.
*
* If \c cred is \c NULL (recommended), the process UID/GID is used. Otherwise,
* the given \c cred is used if the publisher is \c root. If the publisher is
* not \c root, this will result in an error.
*
* \param x The LDMS transport.
* \param value The Avro value object.
* \param serdes The Schema Registry handle (required).
* \param[in,out] sch The serdes schema.
*
* \retval 0 If success.
* \retval ENOPROTOOPT If \c serdes "serializer.framing" is disabled.
* \retval EIO For other \c serdes -related errors.
* \retval errno Other errors.
*/
int ldms_stream_publish_avro_ser(ldms_t x, const char *stream_name,
ldms_cred_t cred, uint32_t perm,
avro_value_t *value, serdes_t *serdes,
struct serdes_schema_s **sch);

/**
* Like \c ldms_stream_publsh(), but publish the content of a file.
*
Expand All @@ -1429,18 +1531,22 @@ int ldms_stream_publish_file(ldms_t x, const char *stream_name,
uint32_t perm,
FILE *file);

typedef struct ldms_stream_client_s *ldms_stream_client_t;
typedef struct json_entity_s *json_entity_t;

enum ldms_stream_event_type {
LDMS_STREAM_EVENT_RECV, /* stream data received */
LDMS_STREAM_EVENT_SUBSCRIBE_STATUS, /* reporting subscription status */
LDMS_STREAM_EVENT_UNSUBSCRIBE_STATUS, /* reporting unsubscription status */
LDMS_STREAM_EVENT_CLOSE, /* reporting stream client close event.
* This is the last event to deliver from a
* client. */

LDMS_STREAM_EVENT_LAST, /* The last enumeration; not a real event */
};

/**
* TODO DOC ME
*/
const char *ldms_stream_event_type_sym(enum ldms_stream_event_type t);

/* For stream data delivery to the application */
struct ldms_stream_recv_data_s {
ldms_stream_client_t client;
Expand All @@ -1451,7 +1557,12 @@ struct ldms_stream_recv_data_s {
uint32_t data_len;
const char *name; /* stream name */
const char *data; /* stream data */
json_entity_t json; /* json entity */

json_entity_t json; /* json entity (LDMS_STREAM_JSON type) */

avro_value_t *avro_value; /* avro value (for LDMS_STREAM_AVRO type) */
serdes_schema_t *serdes_schema; /* serdes schema (for LDMS_STREAM_AVRO type)*/

struct ldms_cred cred; /* credential */
uint32_t perm; /* permission */
uint32_t name_hash; /* stream name hash */
Expand Down Expand Up @@ -1506,6 +1617,12 @@ ldms_stream_subscribe(const char *stream, int is_regex,
ldms_stream_event_cb_t cb_fn, void *cb_arg,
const char *desc);

/* TODO Just try it out .. to be revised. */
ldms_stream_client_t
ldms_stream_subscribe_avro_ser(const char *stream, int is_regex,
ldms_stream_event_cb_t cb_fn, void *cb_arg,
const char *desc, serdes_t *serdes);

/**
* \brief Terminate the stream client.
*
Expand Down
73 changes: 69 additions & 4 deletions ldms/src/core/ldms_stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
#include "ldms_rail.h"
#include "ldms_stream.h"
#include "ldms_qgroup.h"
#include "ldms_stream_avro_ser.h"

/* The definition is in ldms.c. */
extern int __enable_profiling[LDMS_XPRT_OP_COUNT];
Expand Down Expand Up @@ -628,6 +629,8 @@ __stream_deliver(struct __stream_buf_s *sbuf, uint64_t msg_gn,
.sbuf = sbuf,
};
json_entity_t json = NULL;
avro_value_t *av = NULL;
serdes_schema_t *ssch = NULL;

if (sbuf)
_ev.pub.recv.src = sbuf->msg->src;
Expand Down Expand Up @@ -693,7 +696,18 @@ __stream_deliver(struct __stream_buf_s *sbuf, uint64_t msg_gn,
gc = 1;
continue;
}
if (!json && stream_type == LDMS_STREAM_JSON && !c->x) {

if (c->is_python) {
/* data parsing for Python client is done in ldms.pyx */
goto skip_parse;
}
if (c->x) {
/* no need to parse data for remote client */
goto skip_parse;
}

/* JSON parsing */
if (!json && stream_type == LDMS_STREAM_JSON) {
/* json object is only required to parse once for
* the local client */
struct json_parser_s *jp = json_parser_new(0);
Expand All @@ -708,6 +722,24 @@ __stream_deliver(struct __stream_buf_s *sbuf, uint64_t msg_gn,
goto cleanup;
}
}

/* AVRO/Serdes parsing */
if (!av && stream_type == LDMS_STREAM_AVRO_SER) {
/* avro value is only required to parse once for
* the local client.
*
* The remote client does not need translation; we can
* just forward it. */
rc = avro_value_from_stream_data(data, data_len,
c->serdes,
&av, &ssch);
if (rc)
continue;
_ev.pub.recv.avro_value = av;
_ev.pub.recv.serdes_schema = ssch;
}

skip_parse:
ref_get(&c->ref, "callback");
pthread_rwlock_unlock(&s->rwlock);
_ev.pub.recv.client = c;
Expand All @@ -730,6 +762,11 @@ __stream_deliver(struct __stream_buf_s *sbuf, uint64_t msg_gn,
cleanup:
if (json)
json_entity_free(json);
if (av) {
/* av is a structrue allocated by avro_value_from_stream_data */
avro_value_decref(av);
free(av);
}
pthread_rwlock_unlock(&s->rwlock);
if (gc) {
/* remove unbound sce from s->client_tq */
Expand Down Expand Up @@ -1040,7 +1077,7 @@ static void __client_ref_free(void *arg)
}

/* subscribe the client to the streams */
static int
int
__client_subscribe(struct ldms_stream_client_s *c)
{
int rc = 0;
Expand Down Expand Up @@ -1089,7 +1126,7 @@ __client_subscribe(struct ldms_stream_client_s *c)
return rc;
}

static ldms_stream_client_t
ldms_stream_client_t
__client_alloc(const char *stream, int is_regex,
ldms_stream_event_cb_t cb_fn, void *cb_arg,
const char *desc)
Expand Down Expand Up @@ -1135,7 +1172,7 @@ __client_alloc(const char *stream, int is_regex,
return c;
}

static void
void
__client_free(ldms_stream_client_t c)
{
ref_put(&c->ref, "init");
Expand Down Expand Up @@ -1168,6 +1205,34 @@ ldms_stream_subscribe(const char *stream, int is_regex,
return c;
}

ldms_stream_client_t
ldms_stream_python_subscribe(const char *stream, int is_regex,
ldms_stream_event_cb_t cb_fn, void *cb_arg,
const char *desc)
{
ldms_stream_client_t c = NULL;
int rc;

if (!cb_fn) {
errno = EINVAL;
goto out;
}

c = __client_alloc(stream, is_regex, cb_fn, cb_arg, desc);
if (!c)
goto out;
c->is_python = 1;
rc = __client_subscribe(c);
if (rc) {
__client_free(c);
c = NULL;
errno = rc;
}

out:
return c;
}

void ldms_stream_close(ldms_stream_client_t c)
{
struct ldms_stream_client_entry_s *sce;
Expand Down
Loading

0 comments on commit 15db962

Please sign in to comment.