From 1e187578ab3a16dd27b1ab2a6c618fd3551d3877 Mon Sep 17 00:00:00 2001 From: Narate Taerat Date: Tue, 17 Dec 2024 16:28:08 -0600 Subject: [PATCH] Add AVRO/Serdes support in LDMS Stream --- configure.ac | 2 + ldms/python/ldms.pxd | 7 + ldms/python/ldms.pyx | 252 ++++++++++++------ ldms/src/core/Makefile.am | 3 +- ldms/src/core/ldms.c | 28 ++ ldms/src/core/ldms.h | 146 +++++++++- ldms/src/core/ldms_stream.c | 73 ++++- ldms/src/core/ldms_stream.h | 12 + ldms/src/core/ldms_stream_avro_ser.c | 179 +++++++++++++ ldms/src/core/ldms_stream_avro_ser.h | 59 ++++ ldms/src/sampler/hello_stream/hello_sampler.c | 3 + 11 files changed, 674 insertions(+), 90 deletions(-) create mode 100644 ldms/src/core/ldms_stream_avro_ser.c create mode 100644 ldms/src/core/ldms_stream_avro_ser.h diff --git a/configure.ac b/configure.ac index 4eb068da0..482b96645 100644 --- a/configure.ac +++ b/configure.ac @@ -307,6 +307,8 @@ AS_IF([test "x$enable_store_avro_kafka" != xno],[ ]) ]) AM_CONDITIONAL([ENABLE_STORE_AVRO_KAFKA], [test "x$enable_store_avro_kafka" != xno]) +AM_CONDITIONAL([HAVE_LIBAVRO], [test "x$HAVE_LIBAVRO" != xno]) +AM_CONDITIONAL([HAVE_LIBSERDES], [test "x$HAVE_LIBSERDES" != xno]) dnl Options for sampler OPTION_DEFAULT_ENABLE([sampler], [ENABLE_SAMPLER]) diff --git a/ldms/python/ldms.pxd b/ldms/python/ldms.pxd index 1639ea761..e224fd957 100644 --- a/ldms/python/ldms.pxd +++ b/ldms/python/ldms.pxd @@ -820,6 +820,7 @@ cdef extern from "ldms.h" nogil: cpdef enum ldms_stream_type_e: LDMS_STREAM_STRING LDMS_STREAM_JSON + LDMS_STREAM_AVRO_SER enum ldms_stream_event_type: LDMS_STREAM_EVENT_RECV LDMS_STREAM_EVENT_SUBSCRIBE_STATUS @@ -860,6 +861,12 @@ cdef extern from "ldms.h" nogil: ldms_stream_client_t ldms_stream_subscribe(const char *stream, int is_regex, ldms_stream_event_cb_t cb_fn, void *cb_arg, const char *desc) + + + ldms_stream_client_t ldms_stream_python_subscribe(const char *stream, + int is_regex, ldms_stream_event_cb_t cb_fn, + void *cb_arg, const char *desc) + void 
ldms_stream_close(ldms_stream_client_t c) int ldms_stream_remote_subscribe(ldms_t x, const char *stream, int is_regex, ldms_stream_event_cb_t cb_fn, void *cb_arg, diff --git a/ldms/python/ldms.pyx b/ldms/python/ldms.pyx index ead8e25ca..d9a139aa7 100644 --- a/ldms/python/ldms.pyx +++ b/ldms/python/ldms.pyx @@ -434,31 +434,46 @@ def JSON_OBJ(o): # otherwise, the object is expected to have `.json_obj()` function return o.json_obj() -def stream_publish(name, stream_data, stream_type=None, perm=0o444, uid=None, gid=None): - """stream_publish(name, stream_data, stream_type=None, perm=0o444, uid=None, gid=None) - - Publish a stream locally. If the remote peer subscribe to the stream, it - will also receive the data. - - Arguments: - - name (str): The name of the stream being published. - - stream_data (bytes, str, dict): - The data being published. If it is `dict` and stream_type is - LDMS_STREAM_JSON or None, the stream_data is converted into JSON - string representation with `json.dumps(stream_data)`. - - stream_type (enum): - LDMS_STREAM_JSON or LDMS_STREAM_STRING or None. - If the type is `None`, it is inferred from the - `type(stream_data)`: LDMS_STREAM_STRING for `str` and `bytes` - types, and LDMS_STREAM_JSON for `dict` type. If - `type(stream_data)` is something else, TypeError is raised. - - perm (int): The file-system-style permission bits (e.g. 0o444). - - uid (int or str): Publish as the given uid; None for euid. - - gid (int or str): Publish as the given gid; None for egid. 
- - """ +def __avro_stream_data(sr_cli, sch_def, obj): + import avro.schema + import avro.io + import confluent_kafka.schema_registry as sr + + # Resolving Schema (Avro and SchemaRegistry) + sr_sch = None + if type(sch_def) == avro.schema.Schema: + av_sch = sch_def + elif type(sch_def) == sr.Schema: + av_sch = avro.schema.parse(sch_def.schema_str) + sr_sch = sch_def + elif type(sch_def) == dict: + av_sch = avro.schema.make_avsc_object(sch_def) + elif type(sch_def) == str: + av_sch = avro.schema.parse(sch_def) + else: + raise ValueError("Unsupported `sch_def` type") + if sr_sch is None: + sr_sch = sr.Schema(json.dumps(av_sch.to_json()), 'AVRO') + sch_id = sr_cli.register_schema(av_sch.name, sr_sch) + + # framing + framing = struct.pack(">bI", 0, sch_id) + buff = io.BytesIO() + buff.write(framing) + # avro payload + dw = avro.io.DatumWriter(av_sch) + dw.write(obj, avro.io.BinaryEncoder(buff)) + data = buff.getvalue() + return data + +def __stream_publish(Ptr x_ptr, name, stream_data, stream_type=None, + perm=0o444, uid=None, gid=None, + sr_client=None, schema_def=None): cdef int rc cdef ldms_cred cred + cdef ldms_t c_xprt + + c_xprt = NULL if x_ptr is None else x_ptr.c_ptr _t = type(stream_data) # stream type @@ -470,6 +485,14 @@ def stream_publish(name, stream_data, stream_type=None, perm=0o444, uid=None, gi stream_type = ldms.LDMS_STREAM_STRING else: raise TypeError(f"Cannot infer stream_type from the type of stream_data ({type(stream_data)})") + # Avro/Serdes + if stream_type == ldms.LDMS_STREAM_AVRO_SER: + if not sr_client: + raise ValueError(f"LDMS_STREAM_AVRO_SER requires `sr_client`") + if not schema_def: + raise ValueError(f"LDMS_STREAM_AVRO_SER requires `schema_def`") + stream_data = __avro_stream_data(sr_client, schema_def, stream_data) + # Json if stream_type == ldms.LDMS_STREAM_JSON and _t is dict: stream_data = json.dumps(stream_data) @@ -495,12 +518,56 @@ def stream_publish(name, stream_data, stream_type=None, perm=0o444, uid=None, gi else: raise 
TypeError(f"Type '{type(gid)}' is not supported for `gid`") - rc = ldms_stream_publish(NULL, BYTES(name), stream_type, &cred, perm, + rc = ldms_stream_publish(c_xprt, BYTES(name), stream_type, &cred, perm, BYTES(stream_data), len(stream_data)) if rc: raise RuntimeError(f"ldms_stream_publish() failed, rc: {rc}") +def stream_publish(name, stream_data, stream_type=None, perm=0o444, + uid=None, gid=None, sr_client=None, schema_def=None): + """stream_publish(name, stream_data, stream_type=None, perm=0o444, uid=None, + gid=None, sr_client=None, schema_def=None) + + Publish a stream locally. If the remote peer subscribe to the stream, it + will also receive the data. + + For LDMS_STREAM_AVRO_SER type, SchemaRegistryClient `sr_client` and schema + definition `schema_def` is required. The `stream_data` object will be + serialized by Avro using `schema_def` (see parameter description below). The + `schema_def` is also registered to the Schema Registry with `sr_client`. + + Arguments: + - name (str): The name of the stream being published. + - stream_data (bytes, str, dict): + The data being published. If it is `dict` and stream_type is + LDMS_STREAM_JSON or None, the stream_data is converted into JSON + string representation with `json.dumps(stream_data)`. + - stream_type (enum): + LDMS_STREAM_JSON or LDMS_STREAM_STRING or LDMS_STREAM_AVRO_SER or + None. If the type is `None`, it is inferred from the + `type(stream_data)`: LDMS_STREAM_STRING for `str` and `bytes` types, + and LDMS_STREAM_JSON for `dict` type. If `type(stream_data)` is + something else, TypeError is raised. + - perm (int): The file-system-style permission bits (e.g. 0o444). + - uid (int or str): Publish as the given uid; None for euid. + - gid (int or str): Publish as the given gid; None for egid. + - sr_client (SchemaRegistryClient): + required if `stream_type` is `LDMS_STREAM_AVRO_SER`. In this case, + the stream data is encoded in Avro format, and the schema is + registered to SchemaRegistry. 
+ - schema_def (object): The Schema definition, required for + LDMS_STREAM_AVRO_SER stream_type. This can be of type: + `dict`, `str` (JSON formatted), `avro.Schema`, or + `confluent_kafka.schema_registry.Schema`. The `dict` and `str` + (JSON) must follow Apache Avro Schema specification: + https://avro.apache.org/docs/1.11.1/specification/ + """ + + return __stream_publish(None, name, stream_data, stream_type, perm, uid, + gid, sr_client = sr_client, schema_def = schema_def) + + # ============================ # # == metric getter wrappers == # # ============================ # @@ -3895,11 +3962,20 @@ cdef class Xprt(object): free(tmp) return lst - def stream_publish(self, name, stream_data, stream_type=None, perm=0o444, uid=None, gid=None): - """r.stream_publish(name, stream_data, stream_type=None, perm=0o444, uid=None, gid=None) + def stream_publish(self, name, stream_data, stream_type=None, + perm=0o444, uid=None, gid=None, + sr_client=None, schema_def=None): + """x.stream_publish(name, stream_data, stream_type=None, perm=0o444, + uid=None, gid=None, sr_client=None, schema_def=None) Publish a stream directly to the remote peer. The local stream client - will not get the stream data. + will NOT get the stream data. + + For LDMS_STREAM_AVRO_SER type, SchemaRegistryClient `sr_client` and + schema definition `schema_def` is required. The `stream_data` object + will be serialized by Avro using `schema_def` (see parameter description + below). The `schema_def` is also registered to the Schema Registry with + `sr_client`. Arguments: - name (str): The name of the stream being published. @@ -3908,61 +3984,31 @@ cdef class Xprt(object): LDMS_STREAM_JSON or None, the stream_data is converted into JSON string representation with `json.dumps(stream_data)`. - stream_type (enum): - LDMS_STREAM_JSON or LDMS_STREAM_STRING or None. - If the type is `None`, it is inferred from the + LDMS_STREAM_JSON or LDMS_STREAM_STRING or LDMS_STREAM_AVRO_SER + or None. 
If the type is `None`, it is inferred from the `type(stream_data)`: LDMS_STREAM_STRING for `str` and `bytes` types, and LDMS_STREAM_JSON for `dict` type. If `type(stream_data)` is something else, TypeError is raised. - perm (int): The file-system-style permission bits (e.g. 0o444). - uid (int or str): Publish as the given uid; None for euid. - gid (int or str): Publish as the given gid; None for egid. + - sr_client (SchemaRegistryClient): + required if `stream_type` is `LDMS_STREAM_AVRO_SER`. In this + case, the stream data is encoded in Avro format, and the schema + is registered to SchemaRegistry. + - schema_def (object): The Schema definition, required for + LDMS_STREAM_AVRO_SER stream_type. This can be of type: + `dict`, `str` (JSON formatted), `avro.Schema`, or + `confluent_kafka.schema_registry.Schema`. The `dict` and `str` + (JSON) must follow Apache Avro Schema specification: + https://avro.apache.org/docs/1.11.1/specification/ """ - cdef int rc - cdef ldms_cred cred - - _t = type(stream_data) - # stream type - if stream_type is None: - if _t is dict: - # JSON - stream_type = ldms.LDMS_STREAM_JSON - elif _t in (str, bytes): - stream_type = ldms.LDMS_STREAM_STRING - else: - raise TypeError(f"Cannot infer stream_type from the type of stream_data ({type(stream_data)})") - if stream_type == ldms.LDMS_STREAM_JSON and _t is dict: - stream_data = json.dumps(stream_data) - - # uid - if type(uid) is str: - o = getpwnam(uid) - cred.uid = o.pw_uid - elif type(uid) is int: - cred.uid = uid - elif uid is None: - cred.uid = geteuid() - else: - raise TypeError(f"Type '{type(uid)}' is not supported for `uid`") - - # gid - if type(gid) is str: - o = getgrnam(gid) - cred.gid = o.gr_gid - elif type(gid) is int: - cred.gid = gid - elif gid is None: - cred.gid = getegid() - else: - raise TypeError(f"Type '{type(gid)}' is not supported for `gid`") - - rc = ldms_stream_publish(self.xprt, BYTES(name), stream_type, &cred, perm, - BYTES(stream_data), len(stream_data)) - if rc: - 
raise RuntimeError(f"ldms_stream_publish() failed, rc: {rc}") + return __stream_publish(PTR(self.xprt), name, stream_data, stream_type, + perm, uid, gid, sr_client = sr_client, schema_def = schema_def) def stream_subscribe(self, match, is_regex, cb=None, cb_arg=None, rx_rate=-1): - """r.stream_subscribe(match, is_regex, cb=None, cb_arg=None) + """x.stream_subscribe(match, is_regex, cb=None, cb_arg=None) `cb()` signature: `cb(StreamStatusEvent ev, object cb_arg)` @@ -4208,6 +4254,31 @@ cdef class LdmsAddr(object): return False return False +SD_FRAME_FMT = ">bI" + +cdef object __deserialize_avro_ser(Ptr ev_ptr, StreamClient c): + cdef ldms_stream_event_t ev = ev_ptr.c_ptr + + import avro.io + import avro.schema + + # unframe + cdef int magic, schema_id + raw_data = ev.recv.data[:ev.recv.data_len] + sd_frame = raw_data[:5] + av_data = raw_data[5:] + + # get schema + magic, schema_id = struct.unpack(SD_FRAME_FMT, sd_frame) + sr_sch = c.sr_client.get_schema(schema_id) + av_sch = avro.schema.parse(sr_sch.schema_str) + + # parse data + dr = avro.io.DatumReader(av_sch) + de = avro.io.BinaryDecoder(io.BytesIO(av_data)) + obj = dr.read(de) + return obj + cdef class StreamData(object): """Stream Data""" cdef public bytes raw_data # bytes raw data @@ -4219,8 +4290,11 @@ cdef class StreamData(object): cdef public int gid # gid of the original publisher cdef public int perm # the permission of the data cdef public uint64_t tid # the thread ID creating the StreamData + cdef public object type # stream type + def __init__(self, name=None, src=None, tid=None, uid=None, gid=None, - perm=None, is_json=None, data=None, raw_data=None): + perm=None, is_json=None, data=None, raw_data=None, + _type=None): self.name = name self.src = src self.tid = tid @@ -4230,6 +4304,7 @@ cdef class StreamData(object): self.is_json = is_json self.data = data self.raw_data = raw_data + self.type = _type def __str__(self): return str(self.data) @@ -4250,23 +4325,32 @@ cdef class StreamData(object): return 
True @classmethod - def from_ldms_stream_event(cls, Ptr ev_ptr): + def from_ldms_stream_event(cls, Ptr ev_ptr, StreamClient c = None): cdef ldms_stream_event_t ev = ev_ptr.c_ptr assert( ev.type == LDMS_STREAM_EVENT_RECV ) raw_data = ev.recv.data[:ev.recv.data_len] - if ev.recv.type == LDMS_STREAM_JSON: + if ev.recv.type == LDMS_STREAM_STRING: + is_json = False + data = raw_data.decode() + elif ev.recv.type == LDMS_STREAM_JSON: is_json = True data = json.loads(raw_data.strip(b'\x00').strip()) + elif ev.recv.type == LDMS_STREAM_AVRO_SER: + is_json = False + data = __deserialize_avro_ser(ev_ptr, c) else: + # no data decode is_json = False - data = raw_data.decode() + data = raw_data name = ev.recv.name.decode() src = LdmsAddr.from_ldms_addr(PTR(&ev.recv.src)) uid = ev.recv.cred.uid gid = ev.recv.cred.gid perm = ev.recv.perm tid = threading.get_native_id() - obj = StreamData(name, src, tid, uid, gid, perm, is_json, data, raw_data) + obj = StreamData(name, src, tid, uid, gid, perm, is_json, data, + raw_data, + ldms.ldms_stream_type_e(ev.recv.type)) return obj cdef int __stream_client_cb(ldms_stream_event_t ev, void *arg) with gil: @@ -4276,7 +4360,7 @@ cdef int __stream_client_cb(ldms_stream_event_t ev, void *arg) with gil: if ev.type != LDMS_STREAM_EVENT_RECV: return 0 - sdata = StreamData.from_ldms_stream_event(PTR(ev)) + sdata = StreamData.from_ldms_stream_event(PTR(ev), c) if c.cb: c.cb(c, sdata, c.cb_arg) else: @@ -4440,24 +4524,30 @@ cdef class StreamClient(object): `def cb(StreamClient client, StreamData data, object cb_arg)` - cb_arg (object): an optional application callback argument. - desc (str): a short description of the client. + - sr_client (SchemaRegistryClient): a Confluent Kafka Client object, + required for processing AVRO_SER stream + data. 
""" cdef ldms_stream_client_t c cdef object cb # optional application callback cdef object cb_arg # optional application callback argument cdef object data_q + cdef object sr_client - def __init__(self, match, is_regex, cb=None, cb_arg=None, desc=None): + def __init__(self, match, is_regex, cb=None, cb_arg=None, desc=None, + sr_client=None): self.data_q = Queue() self.cb = cb self.cb_arg = cb_arg + self.sr_client = sr_client if desc is None: desc = "" - self.c = ldms_stream_subscribe(BYTES(match), is_regex, + self.c = ldms_stream_python_subscribe(BYTES(match), is_regex, __stream_client_cb, self, CSTR(BYTES(desc))) if not self.c: - raise RuntimeError(f"ldms_stream_subscribe() error, errno: {errno}") + raise RuntimeError(f"ldms_stream_python_subscribe() error, errno: {errno}") def close(self): if not self.c: diff --git a/ldms/src/core/Makefile.am b/ldms/src/core/Makefile.am index 0fa019bd8..f0df3ca2a 100644 --- a/ldms/src/core/Makefile.am +++ b/ldms/src/core/Makefile.am @@ -18,6 +18,7 @@ libldms_la_SOURCES = ldms.c ldms_xprt.c ldms_private.h \ ldms_heap.c ldms_heap.h \ ldms_rail.c ldms_rail.h \ ldms_stream.c ldms_stream.h \ + ldms_stream_avro_ser.c ldms_stream_avro_ser.h \ ldms_qgroup.c ldms_qgroup.h libldms_la_LIBADD = -ldl -lpthread $(top_builddir)/lib/src/coll/libcoll.la \ $(top_builddir)/lib/src/ovis_json/libovis_json.la \ @@ -25,6 +26,6 @@ libldms_la_LIBADD = -ldl -lpthread $(top_builddir)/lib/src/coll/libcoll.la \ $(top_builddir)/lib/src/mmalloc/libmmalloc.la \ $(top_builddir)/lib/src/zap/libzap.la \ $(top_builddir)/lib/src/ovis_log/libovis_log.la \ - @OPENSSL_LIBS@ + @OPENSSL_LIBS@ @LIBAVRO@ @LIBSERDES@ lib_LTLIBRARIES += libldms.la diff --git a/ldms/src/core/ldms.c b/ldms/src/core/ldms.c index a4825167f..94cdf11a1 100644 --- a/ldms/src/core/ldms.c +++ b/ldms/src/core/ldms.c @@ -6074,3 +6074,31 @@ ldms_mval_as_timestamp(ldms_mval_t mv, enum ldms_value_type type, int idx) } return ts; } + +const char *ldms_stream_type_sym(ldms_stream_type_t t) +{ + static const 
char *tbl[] = { + [LDMS_STREAM_STRING] = "LDMS_STREAM_STRING", + [LDMS_STREAM_JSON] = "LDMS_STREAM_JSON", + [LDMS_STREAM_AVRO_SER] = "LDMS_STREAM_AVRO_SER", + }; + + if (t < LDMS_STREAM_LAST) + return tbl[t]; + return "UNKNOWN"; +} + +const char *ldms_stream_event_type_sym(enum ldms_stream_event_type t) +{ + static const char *tbl[] = { + [LDMS_STREAM_EVENT_RECV] = "LDMS_STREAM_EVENT_RECV", + [LDMS_STREAM_EVENT_CLOSE] = "LDMS_STREAM_EVENT_CLOSE", + [LDMS_STREAM_EVENT_SUBSCRIBE_STATUS] = + "LDMS_STREAM_EVENT_SUBSCRIBE_STATUS", + [LDMS_STREAM_EVENT_UNSUBSCRIBE_STATUS] = + "LDMS_STREAM_EVENT_UNSUBSCRIBE_STATUS", + }; + if (t < LDMS_STREAM_EVENT_LAST) + return tbl[t]; + return "UNKNOWN"; +} diff --git a/ldms/src/core/ldms.h b/ldms/src/core/ldms.h index c93c1baa2..cbbf82d91 100644 --- a/ldms/src/core/ldms.h +++ b/ldms/src/core/ldms.h @@ -1381,8 +1381,35 @@ void ldms_qgroup_info_free(ldms_qgroup_info_t qinfo); typedef enum ldms_stream_type_e { LDMS_STREAM_STRING, LDMS_STREAM_JSON, + LDMS_STREAM_AVRO_SER, + LDMS_STREAM_LAST, /* the last enumureation; not a real type */ } ldms_stream_type_t; +/** + * \brief Stream Type Symbol. + * + * This function returns a constant string symbol (e.g. "LDMS_STREAM_STRING") of + * the given Stream type \c t. If the given type is out of valid range, + * "UNKNOWN" is returned. + * + * \param t The Stream type. + * + * \retval s The symbol (e.g. "LDMS_STREAM_STRING"). + */ +const char *ldms_stream_type_sym(ldms_stream_type_t t); + +typedef struct ldms_stream_client_s *ldms_stream_client_t; +typedef struct json_entity_s *json_entity_t; + +/* from avro/schema.h */ +typedef struct avro_obj_t *avro_schema_t; +/* from avro/value.h */ +typedef struct avro_value avro_value_t; +void avro_value_decref(avro_value_t *value); +/* from serdes.h */ +typedef struct serdes_s serdes_t; +typedef struct serdes_schema_s serdes_schema_t; + /** * \brief Publish stream data. 
* @@ -1408,6 +1435,81 @@ int ldms_stream_publish(ldms_t x, const char *stream_name, uint32_t perm, const char *data, size_t data_len); +struct ldms_stream_pubobj_str_s { + int len; /* strlen(str) + 1 */ + const char *str; +}; + +struct ldms_stream_pubobj_json_s { + int len; /* strlen(str) + 1 */ + const char *str; /* string of a JSON object */ +}; + +struct ldms_stream_pubobj_avro_ser_s { + avro_schema_t schema; + avro_value_t *value; + serdes_t *serdes; +}; + +typedef struct ldms_stream_pubobj_s { + const char *stream_name; /**< stream name */ + const struct ldms_cred *cred; /**< publisher credential */ + uint32_t perm; /**< permission */ + ldms_stream_type_t stream_type; /**< type of the stream */ + union { + struct ldms_stream_pubobj_str_s str; + struct ldms_stream_pubobj_json_s json; + struct ldms_stream_pubobj_avro_ser_s avro_ser; + }; + +} *ldms_stream_pubobj_t; + +/** + * Generic publish interface. + * + * \note \c pub object can be discarded after the call returns. + */ +int ldms_stream_publish_obj(ldms_t x, const struct ldms_stream_pubobj_s *pub); + +/** + * \brief Publish Avro \c value to LDMS Stream. + * + * To publish an Avro \c value, a Serdes connection \c serdes is required to + * register the schema of the given \c value. The \c sch parameter is an in/out + * parameter to supply/obtain such schema. + * + * If \c sch is NOT \c NULL and the value of \c *sch is also NOT \c NULL, the + * \c *sch Serdes Schema will be used in the data serialization; skipping the + * schema extraction (saving time). If \c sch is NOT \c NULL but \c *sch IS + * \c NULL, a Serdes Schema extraction is performed and the \c *sch is set to + * the extracted Schema so that the application can save it for later use. + * + * If \c sch IS \c NULL, a Serdes Schema extraction is still performed but won't + * be returned to the application. + * + * If \c x is not \c NULL, LDMS publishes the \c value to the peer over stream + * named \c stream_name. 
Otherwise, this will be local publishing where all + * matching clients (local and remote) will receive the data. + * + * If \c cred is \c NULL (recommended), the process UID/GID is used. Otherwise, + * the given \c cred is used if the publisher is \c root. If the publisher is + * not \c root, this will result in an error. + * + * \param x The LDMS transport. + * \param value The Avro value object. + * \param serdes The Schema Registry handle (required). + * \param[in,out] sch The serdes schema. + * + * \retval 0 If success. + * \retval ENOPROTOOPT If \c serdes "serializer.framing" is disabled. + * \retval EIO For other \c serdes -related errors. + * \retval errno Other errors. + */ +int ldms_stream_publish_avro_ser(ldms_t x, const char *stream_name, + ldms_cred_t cred, uint32_t perm, + avro_value_t *value, serdes_t *serdes, + struct serdes_schema_s **sch); + /** * Like \c ldms_stream_publsh(), but publish the content of a file. * @@ -1429,9 +1531,6 @@ int ldms_stream_publish_file(ldms_t x, const char *stream_name, uint32_t perm, FILE *file); -typedef struct ldms_stream_client_s *ldms_stream_client_t; -typedef struct json_entity_s *json_entity_t; - enum ldms_stream_event_type { LDMS_STREAM_EVENT_RECV, /* stream data received */ LDMS_STREAM_EVENT_SUBSCRIBE_STATUS, /* reporting subscription status */ @@ -1439,8 +1538,18 @@ enum ldms_stream_event_type { LDMS_STREAM_EVENT_CLOSE, /* reporting stream client close event. * This is the last event to deliver from a * client. */ + + LDMS_STREAM_EVENT_LAST, /* The last enumeration; not a real event */ }; +/** + * \brief String symbol of event type \c t for printing. + * + * \retval s The string symbol of the given event type (e.g. "LDMS_STREAM_EVENT_RECV"). + * \retval "UNKNOWn" If the event type \c t is out of range. 
+ */ +const char *ldms_stream_event_type_sym(enum ldms_stream_event_type t); + /* For stream data delivery to the application */ struct ldms_stream_recv_data_s { ldms_stream_client_t client; @@ -1451,7 +1560,12 @@ struct ldms_stream_recv_data_s { uint32_t data_len; const char *name; /* stream name */ const char *data; /* stream data */ - json_entity_t json; /* json entity */ + + json_entity_t json; /* json entity (LDMS_STREAM_JSON type) */ + + avro_value_t *avro_value; /* avro value (for LDMS_STREAM_AVRO type) */ + serdes_schema_t *serdes_schema; /* serdes schema (for LDMS_STREAM_AVRO type)*/ + struct ldms_cred cred; /* credential */ uint32_t perm; /* permission */ uint32_t name_hash; /* stream name hash */ @@ -1506,6 +1620,30 @@ ldms_stream_subscribe(const char *stream, int is_regex, ldms_stream_event_cb_t cb_fn, void *cb_arg, const char *desc); +/** + * \brief Subscribe to a stream with Avro/Serdes decoding. + * + * This is essentially the same as \c ldms_stream_subscribe(), but with + * \c serdes handle to decode data encoded by Avro/Serdes (see also: + * \c ldms_stream_publish_avro_ser()). + * + * \param stream The stream name or regular expression. + * \param is_regex 1 if `stream` is a regular expression. Otherwise, 0. + * \param cb_fn The callback function for stream data delivery. + * \param cb_arg The application context to the `cb_fn`. + * \param desc An optional short description of the client of this subscription. + * This could be useful for client stats. + * \param serdes The \c serdes handle. + * + * \retval NULL If there is an error. In this case `errno` is set to describe + * the error. + * \retval ptr The stream client handle. + */ +ldms_stream_client_t +ldms_stream_subscribe_avro_ser(const char *stream, int is_regex, + ldms_stream_event_cb_t cb_fn, void *cb_arg, + const char *desc, serdes_t *serdes); + /** * \brief Terminate the stream client. 
* diff --git a/ldms/src/core/ldms_stream.c b/ldms/src/core/ldms_stream.c index 460216b0f..50393457f 100644 --- a/ldms/src/core/ldms_stream.c +++ b/ldms/src/core/ldms_stream.c @@ -80,6 +80,7 @@ #include "ldms_rail.h" #include "ldms_stream.h" #include "ldms_qgroup.h" +#include "ldms_stream_avro_ser.h" /* The definition is in ldms.c. */ extern int __enable_profiling[LDMS_XPRT_OP_COUNT]; @@ -628,6 +629,8 @@ __stream_deliver(struct __stream_buf_s *sbuf, uint64_t msg_gn, .sbuf = sbuf, }; json_entity_t json = NULL; + avro_value_t *av = NULL; + serdes_schema_t *ssch = NULL; if (sbuf) _ev.pub.recv.src = sbuf->msg->src; @@ -693,7 +696,18 @@ __stream_deliver(struct __stream_buf_s *sbuf, uint64_t msg_gn, gc = 1; continue; } - if (!json && stream_type == LDMS_STREAM_JSON && !c->x) { + + if (c->is_python) { + /* data parsing for Python client is done in ldms.pyx */ + goto skip_parse; + } + if (c->x) { + /* no need to parse data for remote client */ + goto skip_parse; + } + + /* JSON parsing */ + if (!json && stream_type == LDMS_STREAM_JSON) { /* json object is only required to parse once for * the local client */ struct json_parser_s *jp = json_parser_new(0); @@ -708,6 +722,24 @@ __stream_deliver(struct __stream_buf_s *sbuf, uint64_t msg_gn, goto cleanup; } } + + /* AVRO/Serdes parsing */ + if (!av && stream_type == LDMS_STREAM_AVRO_SER) { + /* avro value is only required to parse once for + * the local client. + * + * The remote client does not need translation; we can + * just forward it. 
*/ + rc = avro_value_from_stream_data(data, data_len, + c->serdes, + &av, &ssch); + if (rc) + continue; + _ev.pub.recv.avro_value = av; + _ev.pub.recv.serdes_schema = ssch; + } + + skip_parse: ref_get(&c->ref, "callback"); pthread_rwlock_unlock(&s->rwlock); _ev.pub.recv.client = c; @@ -730,6 +762,11 @@ __stream_deliver(struct __stream_buf_s *sbuf, uint64_t msg_gn, cleanup: if (json) json_entity_free(json); + if (av) { + /* av is a structrue allocated by avro_value_from_stream_data */ + avro_value_decref(av); + free(av); + } pthread_rwlock_unlock(&s->rwlock); if (gc) { /* remove unbound sce from s->client_tq */ @@ -1040,7 +1077,7 @@ static void __client_ref_free(void *arg) } /* subscribe the client to the streams */ -static int +int __client_subscribe(struct ldms_stream_client_s *c) { int rc = 0; @@ -1089,7 +1126,7 @@ __client_subscribe(struct ldms_stream_client_s *c) return rc; } -static ldms_stream_client_t +ldms_stream_client_t __client_alloc(const char *stream, int is_regex, ldms_stream_event_cb_t cb_fn, void *cb_arg, const char *desc) @@ -1135,7 +1172,7 @@ __client_alloc(const char *stream, int is_regex, return c; } -static void +void __client_free(ldms_stream_client_t c) { ref_put(&c->ref, "init"); @@ -1168,6 +1205,34 @@ ldms_stream_subscribe(const char *stream, int is_regex, return c; } +ldms_stream_client_t +ldms_stream_python_subscribe(const char *stream, int is_regex, + ldms_stream_event_cb_t cb_fn, void *cb_arg, + const char *desc) +{ + ldms_stream_client_t c = NULL; + int rc; + + if (!cb_fn) { + errno = EINVAL; + goto out; + } + + c = __client_alloc(stream, is_regex, cb_fn, cb_arg, desc); + if (!c) + goto out; + c->is_python = 1; + rc = __client_subscribe(c); + if (rc) { + __client_free(c); + c = NULL; + errno = rc; + } + + out: + return c; +} + void ldms_stream_close(ldms_stream_client_t c) { struct ldms_stream_client_entry_s *sce; diff --git a/ldms/src/core/ldms_stream.h b/ldms/src/core/ldms_stream.h index ba896a23f..51e77aaea 100644 --- 
a/ldms/src/core/ldms_stream.h +++ b/ldms/src/core/ldms_stream.h @@ -120,6 +120,10 @@ struct ldms_stream_client_s { struct ldms_rail_rate_quota_s rate_quota; + serdes_t *serdes; /* serdes handle */ + + int is_python; /* mark if this is a client from Python */ + int desc_len; char *desc; /* a short description at &match[match_len] */ int match_len; /* length of c->match[], including '\0' */ @@ -179,4 +183,12 @@ int __rep_publish(struct ldms_rail_ep_s *rep, const char *stream_name, struct ldms_stream_hop *hops, const char *data, size_t data_len, struct strm_publish_profile_s *pts); + +ldms_stream_client_t +__client_alloc(const char *stream, int is_regex, + ldms_stream_event_cb_t cb_fn, void *cb_arg, + const char *desc); +void __client_free(ldms_stream_client_t c); +int __client_subscribe(struct ldms_stream_client_s *c); + #endif /* __LDMS_STREAM_H__ */ diff --git a/ldms/src/core/ldms_stream_avro_ser.c b/ldms/src/core/ldms_stream_avro_ser.c new file mode 100644 index 000000000..3e48c34e2 --- /dev/null +++ b/ldms/src/core/ldms_stream_avro_ser.c @@ -0,0 +1,179 @@ +#include "config.h" +#include "ldms.h" +#include "ldms_stream.h" +#include "ldms_stream_avro_ser.h" + +#if HAVE_LIBAVRO && HAVE_LIBSERDES + +#include "avro.h" +#include +#include + +serdes_schema_t * serdes_schema_from_avro(serdes_t *sd, avro_schema_t asch) +{ + char buf[4096] = ""; /* should be sufficient? 
*/ + char ebuf[4096]; + serdes_schema_t *ssch = NULL; + const char *name; + int rc; + avro_writer_t aw = avro_writer_memory(buf, sizeof(buf)); + rc = avro_schema_to_json(asch, aw); + if (rc) { + errno = rc; + goto out; + } + name = avro_schema_name(asch); + ssch = serdes_schema_add(sd, name, -1, buf, strlen(buf), + ebuf, sizeof(ebuf)); + if (!ssch) { + errno = EIO; + } + /* serdes schema is cached */ + avro_writer_free(aw); + out: + return ssch; +} + +int ldms_stream_publish_avro_ser(ldms_t x, const char *stream_name, + ldms_cred_t cred, uint32_t perm, + avro_value_t *value, serdes_t *sd, + struct serdes_schema_s **sch) +{ + serdes_schema_t *ssch = NULL; + avro_schema_t asch; + char ebuf[4096]; + serdes_err_t serr; + int rc; + size_t sz; + void *payload; + + if (0 == serdes_serializer_framing_size(sd)) { + /* Need serdes "serializer.framing" enabled */ + return ENOPROTOOPT; + } + + if (sch) + ssch = *sch; + if (!ssch) { + /* need to build serdes schema */ + asch = avro_value_get_schema(value); + ssch = serdes_schema_from_avro(sd, asch); + if (!ssch) + return errno; + } + if (sch) + *sch = ssch; + payload = NULL; + serr = serdes_schema_serialize_avro(ssch, value, &payload, &sz, + ebuf, sizeof(ebuf)); + if (serr != SERDES_ERR_OK) { + return EIO; + } + + /* We can use existing stream_publish to publish the serialized data */ + rc = ldms_stream_publish(x, stream_name, LDMS_STREAM_AVRO_SER, + cred, perm, payload, sz); + return rc; +} + +int avro_value_from_stream_data(const char *data, size_t data_len, + serdes_t *sd, avro_value_t **aout, + serdes_schema_t **sout) +{ + int rc = 0; + avro_value_t *av; + char ebuf[4096]; + serdes_err_t serr; + + if (!sd) { + rc = EINVAL; + goto out; + } + + av = malloc(sizeof(*av)); + if (!av) { + rc = errno; + goto out; + } + + serr = serdes_deserialize_avro(sd, av, sout, + data, data_len, + ebuf, sizeof(ebuf)); + if (serr) { + free(av); + av = NULL; + rc = EIO; + goto out; + } + /* caller will free av later */ + *aout = av; + rc = 0; + 
out: + return rc; +} + +ldms_stream_client_t +ldms_stream_subscribe_avro_ser(const char *stream, int is_regex, + ldms_stream_event_cb_t cb_fn, void *cb_arg, + const char *desc, serdes_t *serdes) +{ + ldms_stream_client_t c = NULL; + int rc; + + if (!cb_fn) { + errno = EINVAL; + goto out; + } + + if (!serdes) { + errno = EINVAL; + goto out; + } + + c = __client_alloc(stream, is_regex, cb_fn, cb_arg, desc); + if (!c) + goto out; + c->serdes = serdes; + rc = __client_subscribe(c); + if (rc) { + __client_free(c); + c = NULL; + errno = rc; + } + + out: + return c; +} + +#else +/* HAVE_LIBAVRO == 0 or HAVE_LIBSERDES == 0 */ + +void avro_value_decref(avro_value_t *value) +{ + /* no-op */ +} + +int ldms_stream_publish_avro_ser(ldms_t x, const char *stream_name, + ldms_cred_t cred, uint32_t perm, + avro_value_t *value, serdes_t *sd, + struct serdes_schema_s **sch) +{ + return ENOSYS; +} + +int avro_value_from_stream_data(const char *data, size_t data_len, + serdes_t *sd, avro_value_t **aout, + serdes_schema_t **sout) +{ + return ENOSYS; +} + +ldms_stream_client_t +ldms_stream_subscribe_avro_ser(const char *stream, int is_regex, + ldms_stream_event_cb_t cb_fn, void *cb_arg, + const char *desc, serdes_t *serdes) +{ + errno = ENOSYS; + return NULL; +} +#endif diff --git a/ldms/src/core/ldms_stream_avro_ser.h b/ldms/src/core/ldms_stream_avro_ser.h new file mode 100644 index 000000000..5fb10d1ab --- /dev/null +++ b/ldms/src/core/ldms_stream_avro_ser.h @@ -0,0 +1,59 @@ +/* -*- c-basic-offset: 8 -*- + * Copyright (c) 2024 National Technology & Engineering Solutions + * of Sandia, LLC (NTESS). Under the terms of Contract DE-NA0003525 with + * NTESS, the U.S. Government retains certain rights in this software. + * Copyright (c) 2024 Open Grid Computing, Inc. All rights reserved. + * + * This software is available to you under a choice of one of two + * licenses. 
You may choose to be licensed under the terms of the GNU + * General Public License (GPL) Version 2, available from the file + * COPYING in the main directory of this source tree, or the BSD-type + * license below: + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * + * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials provided + * with the distribution. + * + * Neither the name of Sandia nor the names of any contributors may + * be used to endorse or promote products derived from this software + * without specific prior written permission. + * + * Neither the name of Open Grid Computing nor the names of any + * contributors may be used to endorse or promote products derived + * from this software without specific prior written permission. + * + * Modified source versions must be plainly marked as such, and + * must not be misrepresented as being the original software. + * + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef __LDMS_STREAM_AVRO_SER_H__
+#define __LDMS_STREAM_AVRO_SER_H__
+
+#include "ldms.h"
+/**
+ * Decode serdes-framed Avro stream data into a newly allocated Avro value.
+ *
+ * data / data_len: the serialized data and its length in bytes.
+ * sd:   the serdes handle used for decoding; EINVAL is returned if NULL.
+ * aout: [out] on success, receives a malloc()'ed avro_value_t that the
+ *       caller frees later.
+ * sout: [out] handed to serdes_deserialize_avro() as its schema output.
+ *
+ * Returns 0 on success, EINVAL if sd is NULL, errno if the allocation
+ * fails, EIO if deserialization fails, or ENOSYS when built without
+ * libavro / libserdes.
+ */
+int avro_value_from_stream_data(const char *data, size_t data_len,
+				serdes_t *sd, avro_value_t **aout,
+				serdes_schema_t **sout);
+
+#endif
diff --git a/ldms/src/sampler/hello_stream/hello_sampler.c b/ldms/src/sampler/hello_stream/hello_sampler.c
index bc67da294..166f1446e 100644
--- a/ldms/src/sampler/hello_stream/hello_sampler.c
+++ b/ldms/src/sampler/hello_stream/hello_sampler.c
@@ -103,6 +103,9 @@ static int hello_recv_cb(ldms_stream_event_t ev, void *arg)
 	case LDMS_STREAM_STRING:
 		type = "STRING";
 		break;
+	default:
+		/* unhandled type */
+		return 0;
 	}
 	ovis_log(mylog, OVIS_LCRITICAL, "stream_type: %s, msg: \"%s\", "
 		  "msg_len: %d, entity: %p\n",