Skip to content

Commit

Permalink
Add AVRO/Serdes support in LDMS Stream
Browse files Browse the repository at this point in the history
  • Loading branch information
narategithub authored and Narate Taerat committed Jan 24, 2025
1 parent 0c19380 commit 1e18757
Show file tree
Hide file tree
Showing 11 changed files with 674 additions and 90 deletions.
2 changes: 2 additions & 0 deletions configure.ac
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,8 @@ AS_IF([test "x$enable_store_avro_kafka" != xno],[
])
])
AM_CONDITIONAL([ENABLE_STORE_AVRO_KAFKA], [test "x$enable_store_avro_kafka" != xno])
AM_CONDITIONAL([HAVE_LIBAVRO], [test "x$HAVE_LIBAVRO" != xno])
AM_CONDITIONAL([HAVE_LIBSERDES], [test "x$HAVE_LIBSERDES" != xno])

dnl Options for sampler
OPTION_DEFAULT_ENABLE([sampler], [ENABLE_SAMPLER])
Expand Down
7 changes: 7 additions & 0 deletions ldms/python/ldms.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -820,6 +820,7 @@ cdef extern from "ldms.h" nogil:
cpdef enum ldms_stream_type_e:
LDMS_STREAM_STRING
LDMS_STREAM_JSON
LDMS_STREAM_AVRO_SER
enum ldms_stream_event_type:
LDMS_STREAM_EVENT_RECV
LDMS_STREAM_EVENT_SUBSCRIBE_STATUS
Expand Down Expand Up @@ -860,6 +861,12 @@ cdef extern from "ldms.h" nogil:
ldms_stream_client_t ldms_stream_subscribe(const char *stream, int is_regex,
ldms_stream_event_cb_t cb_fn, void *cb_arg,
const char *desc)


ldms_stream_client_t ldms_stream_python_subscribe(const char *stream,
int is_regex, ldms_stream_event_cb_t cb_fn,
void *cb_arg, const char *desc)

void ldms_stream_close(ldms_stream_client_t c)
int ldms_stream_remote_subscribe(ldms_t x, const char *stream, int is_regex,
ldms_stream_event_cb_t cb_fn, void *cb_arg,
Expand Down
252 changes: 171 additions & 81 deletions ldms/python/ldms.pyx

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion ldms/src/core/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@ libldms_la_SOURCES = ldms.c ldms_xprt.c ldms_private.h \
ldms_heap.c ldms_heap.h \
ldms_rail.c ldms_rail.h \
ldms_stream.c ldms_stream.h \
ldms_stream_avro_ser.c ldms_stream_avro_ser.h \
ldms_qgroup.c ldms_qgroup.h
libldms_la_LIBADD = -ldl -lpthread $(top_builddir)/lib/src/coll/libcoll.la \
$(top_builddir)/lib/src/ovis_json/libovis_json.la \
$(top_builddir)/lib/src/ovis_event/libovis_event.la \
$(top_builddir)/lib/src/mmalloc/libmmalloc.la \
$(top_builddir)/lib/src/zap/libzap.la \
$(top_builddir)/lib/src/ovis_log/libovis_log.la \
@OPENSSL_LIBS@
@OPENSSL_LIBS@ @LIBAVRO@ @LIBSERDES@

lib_LTLIBRARIES += libldms.la
28 changes: 28 additions & 0 deletions ldms/src/core/ldms.c
Original file line number Diff line number Diff line change
Expand Up @@ -6074,3 +6074,31 @@ ldms_mval_as_timestamp(ldms_mval_t mv, enum ldms_value_type type, int idx)
}
return ts;
}

const char *ldms_stream_type_sym(ldms_stream_type_t t)
{
static const char *tbl[] = {
[LDMS_STREAM_STRING] = "LDMS_STREAM_STRING",
[LDMS_STREAM_JSON] = "LDMS_STREAM_JSON",
[LDMS_STREAM_AVRO_SER] = "LDMS_STREAM_AVRO_SER",
};

if (t < LDMS_STREAM_LAST)
return tbl[t];
return "UNKNOWN";
}

const char *ldms_stream_event_type_sym(enum ldms_stream_event_type t)
{
static const char *tbl[] = {
[LDMS_STREAM_EVENT_RECV] = "LDMS_STREAM_EVENT_RECV",
[LDMS_STREAM_EVENT_CLOSE] = "LDMS_STREAM_EVENT_CLOSE",
[LDMS_STREAM_EVENT_SUBSCRIBE_STATUS] =
"LDMS_STREAM_EVENT_SUBSCRIBE_STATUS",
[LDMS_STREAM_EVENT_UNSUBSCRIBE_STATUS] =
"LDMS_STREAM_EVENT_UNSUBSCRIBE_STATUS",
};
if (t < LDMS_STREAM_EVENT_LAST)
return tbl[t];
return "UNKNOWN";
}
146 changes: 142 additions & 4 deletions ldms/src/core/ldms.h
Original file line number Diff line number Diff line change
Expand Up @@ -1381,8 +1381,35 @@ void ldms_qgroup_info_free(ldms_qgroup_info_t qinfo);
typedef enum ldms_stream_type_e {
LDMS_STREAM_STRING,
LDMS_STREAM_JSON,
LDMS_STREAM_AVRO_SER,
LDMS_STREAM_LAST, /* the last enumureation; not a real type */
} ldms_stream_type_t;

/**
* \brief Stream Type Symbol.
*
* This function returns a constant string symbol (e.g. "LDMS_STREAM_STRING") of
* the given Stream type \c t. If the given type is out of valid range,
* "UNKNOWN" is returned.
*
* \param t The Stream type.
*
* \retval s The symbol (e.g. "LDMS_STREAM_STRING").
*/
const char *ldms_stream_type_sym(ldms_stream_type_t t);

typedef struct ldms_stream_client_s *ldms_stream_client_t;
typedef struct json_entity_s *json_entity_t;

/* from avro/schema.h */
typedef struct avro_obj_t *avro_schema_t;
/* from avro/value.h */
typedef struct avro_value avro_value_t;
void avro_value_decref(avro_value_t *value);
/* from serdes.h */
typedef struct serdes_s serdes_t;
typedef struct serdes_schema_s serdes_schema_t;

/**
* \brief Publish stream data.
*
Expand All @@ -1408,6 +1435,81 @@ int ldms_stream_publish(ldms_t x, const char *stream_name,
uint32_t perm,
const char *data, size_t data_len);

struct ldms_stream_pubobj_str_s {
int len; /* strlen(str) + 1 */
const char *str;
};

struct ldms_stream_pubobj_json_s {
int len; /* strlen(str) + 1 */
const char *str; /* string of a JSON object */
};

struct ldms_stream_pubobj_avro_ser_s {
avro_schema_t schema;
avro_value_t *value;
serdes_t *serdes;
};

typedef struct ldms_stream_pubobj_s {
const char *stream_name; /**< stream name */
const struct ldms_cred *cred; /**< publisher credential */
uint32_t perm; /**< permission */
ldms_stream_type_t stream_type; /**< type of the stream */
union {
struct ldms_stream_pubobj_str_s str;
struct ldms_stream_pubobj_json_s json;
struct ldms_stream_pubobj_avro_ser_s avro_ser;
};

} *ldms_stream_pubobj_t;

/**
* Generic publish interface.
*
* \note \c pub object can be discarded after the call returns.
*/
int ldms_stream_publish_obj(ldms_t x, const struct ldms_stream_pubobj_s *pub);

/**
* \brief Publish Avro \c value to LDMS Stream.
*
* To publish an Avro \c value, a Serdes connection \c serdes is required to
* register the schema of the given \c value. The \c sch parameter is an in/out
* parameter to supply/obtain such schema.
*
* If \c sch is NOT \c NULL and the value of \c *sch is also NOT \c NULL, the
* \c *sch Serdes Schema will be used in the data serialization; skipping the
* schema extraction (saving time). If \c sch is NOT \c NULL but \c *sch IS
* \c NULL, a Serdes Schema extraction is performed and the \c *sch is set to
* the extracted Schema so that the application can save it for later use.
*
* If \c sch IS \c NULL, a Serdes Schema extraction is still performed but won't
* be returned to the application.
*
* If \c x is not \c NULL, LDMS publishes the \c value to the peer over stream
* named \c stream_name. Otherwise, this will be local publishing where all
* matching clients (local and remote) will receive the data.
*
* If \c cred is \c NULL (recommended), the process UID/GID is used. Otherwise,
* the given \c cred is used if the publisher is \c root. If the publisher is
* not \c root, this will result in an error.
*
* \param x The LDMS transport.
* \param value The Avro value object.
* \param serdes The Schema Registry handle (required).
* \param[in,out] sch The serdes schema.
*
* \retval 0 If success.
* \retval ENOPROTOOPT If \c serdes "serializer.framing" is disabled.
* \retval EIO For other \c serdes -related errors.
* \retval errno Other errors.
*/
int ldms_stream_publish_avro_ser(ldms_t x, const char *stream_name,
ldms_cred_t cred, uint32_t perm,
avro_value_t *value, serdes_t *serdes,
struct serdes_schema_s **sch);

/**
* Like \c ldms_stream_publsh(), but publish the content of a file.
*
Expand All @@ -1429,18 +1531,25 @@ int ldms_stream_publish_file(ldms_t x, const char *stream_name,
uint32_t perm,
FILE *file);

typedef struct ldms_stream_client_s *ldms_stream_client_t;
typedef struct json_entity_s *json_entity_t;

enum ldms_stream_event_type {
LDMS_STREAM_EVENT_RECV, /* stream data received */
LDMS_STREAM_EVENT_SUBSCRIBE_STATUS, /* reporting subscription status */
LDMS_STREAM_EVENT_UNSUBSCRIBE_STATUS, /* reporting unsubscription status */
LDMS_STREAM_EVENT_CLOSE, /* reporting stream client close event.
* This is the last event to deliver from a
* client. */

LDMS_STREAM_EVENT_LAST, /* The last enumeration; not a real event */
};

/**
* \brief String symbol of event type \c t for printing.
*
* \retval s The string symbol of the given event type (e.g. "LDMS_STREAM_EVENT_RECV").
* \retval "UNKNOWn" If the event type \c t is out of range.
*/
const char *ldms_stream_event_type_sym(enum ldms_stream_event_type t);

/* For stream data delivery to the application */
struct ldms_stream_recv_data_s {
ldms_stream_client_t client;
Expand All @@ -1451,7 +1560,12 @@ struct ldms_stream_recv_data_s {
uint32_t data_len;
const char *name; /* stream name */
const char *data; /* stream data */
json_entity_t json; /* json entity */

json_entity_t json; /* json entity (LDMS_STREAM_JSON type) */

avro_value_t *avro_value; /* avro value (for LDMS_STREAM_AVRO type) */
serdes_schema_t *serdes_schema; /* serdes schema (for LDMS_STREAM_AVRO type)*/

struct ldms_cred cred; /* credential */
uint32_t perm; /* permission */
uint32_t name_hash; /* stream name hash */
Expand Down Expand Up @@ -1506,6 +1620,30 @@ ldms_stream_subscribe(const char *stream, int is_regex,
ldms_stream_event_cb_t cb_fn, void *cb_arg,
const char *desc);

/**
* \brief Subscribe to a stream with Avro/Serdes decoding.
*
* This is essentially the same as \c ldms_stream_subscribe(), but with
* \c serdes handle to decode data encoded by Avro/Serdes (see also:
* \c ldms_stream_publish_avro_ser()).
*
* \param stream The stream name or regular expression.
* \param is_regex 1 if `stream` is a regular expression. Otherwise, 0.
* \param cb_fn The callback function for stream data delivery.
* \param cb_arg The application context to the `cb_fn`.
* \param desc An optional short description of the client of this subscription.
* This could be useful for client stats.
* \param serdes The \c serdes handle.
*
* \retval NULL If there is an error. In this case `errno` is set to describe
* the error.
* \retval ptr The stream client handle.
*/
ldms_stream_client_t
ldms_stream_subscribe_avro_ser(const char *stream, int is_regex,
ldms_stream_event_cb_t cb_fn, void *cb_arg,
const char *desc, serdes_t *serdes);

/**
* \brief Terminate the stream client.
*
Expand Down
Loading

0 comments on commit 1e18757

Please sign in to comment.