diff --git a/docs/Makefile.am b/docs/Makefile.am index 1a4a5e56f..478d64f61 100644 --- a/docs/Makefile.am +++ b/docs/Makefile.am @@ -11,4 +11,7 @@ if HAVE_RST2MAN %.man: %.rst rst2man $< $@ +EXTRA_DIST += ldms_stream.rst +man7_MANS += ldms_stream.man + endif diff --git a/docs/index.rst b/docs/index.rst index d71982be6..46a720096 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -3,3 +3,5 @@ .. toctree:: :hidden: :maxdepth: 2 + + ldms_stream diff --git a/docs/ldms_stream.rst b/docs/ldms_stream.rst new file mode 100644 index 000000000..021c02893 --- /dev/null +++ b/docs/ldms_stream.rst @@ -0,0 +1,473 @@ +=========== +ldms_stream +=========== + +--------------------------------------------- +publish/subscribe stream data in LDMS network +--------------------------------------------- + +:Version: LDMS 4.5 +:Date: 2024-12-03 +:Manual section: 7 +:Manual group: LDMS + +SYNOPSIS +======== + +ldmsd_controller commands +------------------------- + +.. parsed-literal:: + + ``prdcr_subscribe`` ``regex``\ =\ `PRDCR_REGEX` ``stream``\ =\ `NAME_REGEX` + + ``prdcr_unsubscribe`` ``regex``\ =\ `PRDCR_REGEX` ``stream``\ =\ `NAME_REGEX` + + +C APIs +------ + +.. code:: c + + #include "ldms.h" + + int ldms_stream_publish(ldms_t x, const char *stream_name, + ldms_stream_type_t stream_type, + ldms_cred_t cred, + uint32_t perm, + const char *data, size_t data_len); + + typedef int (*ldms_stream_event_cb_t)(ldms_stream_event_t ev, void *cb_arg); + + ldms_stream_client_t ldms_stream_subscribe(const char *stream, + int is_regex, ldms_stream_event_cb_t cb_fn, + void *cb_arg, const char *desc); + void ldms_stream_close(ldms_stream_client_t c); + + int ldms_stream_remote_subscribe(ldms_t x, const char *stream, int is_regex, + ldms_stream_event_cb_t cb_fn, void *cb_arg, + int64_t rate); + int ldms_stream_remote_unsubscribe(ldms_t x, const char *stream, int is_regex, + ldms_stream_event_cb_t cb_fn, void *cb_arg); + + /* See "ldms.h" for the detailed API documentation */ + + +Python APIs +----------- + +.. code:: python + + from ovis_ldms import ldms + + ldms.stream_publish(name=, stream_data=, + stream_type=, + perm=) + + xprt = ldms.Xprt() + xprt.connect(host="node0", port=411) + + xprt.stream_publish(name=, stream_data=, + stream_type=, + perm=) + + xprt.stream_subscribe(match=, is_regex=) + + xprt.stream_unsubscribe(match=, is_regex=) + + cli = ldms.StreamClient(match=, is_regex=, cb=, + cb_arg=) + # StreamClient callback signature + def cb(StreamClient client, StreamData data, object cb_arg) + + data = cli.get_data() + + cli.close() + + # for more detailed description and usage + help(ldms) + + +DESCRIPTION +=========== + +LDMS Stream is a service in LDMS for publishing variable-length data to LDMS +proecesses, and for receiving such data from LDMS processes via stream +subscription. When the published data arrive at an LDMS process the +`stream_client`'s in the process that are authorized to see data will receive +the data via the callback function `cb_fn()`. If there are remote subscribers on +the LDMS process, the data will be forwarded to them if they are allowed to see +the data. + +An LDMS Daemon (``ldmsd``) has to be configured with ``prdcr_subscribe`` +commands in order to receive stream data from its producers (``prdcr``). +``prdcr_subscribe`` can be issued many times, e.g. + +.. code:: sh + + # subscribe "s0" stream on all producers + prdcr_subscribe regex=.* stream=s0 + # subscribe "s1" stream on all producers + prdcr_subscribe regex=.* stream=s1 + +The ``stream`` can also be regular expression, e.g. + +.. code:: sh + + # subscribe streams matching "app.*" or "sys.*" + prdcr_subscribe regex=.* stream=app.* + prdcr_subscribe regex=.* stream=sys.* + +This is the setup for the following figure: + +- ``bob_app``: an application run by ``bob``. It LDMS-connects to ``samp``. + +- ``samp``: an LDMS daemon (sampler). + + - A plugin in ``samp`` has an LDMS Stream Client ``cli`` that subscribes to + all streams (regex ``.*``). + + - Another plugin ``plug0`` in ``samp`` publishes ``s1`` stream. + +- ``agg``: another LDMS daemon (aggregator). It has an LDMS connection to + ``samp``. + + - ``agg`` subscribes ``.*`` streams on ``samp`` with the following command: + + - ``prdcr_subscribe regex=samp stream=.*`` + +- ``alice_app``: an application run by alice that LDMS-conencts to ``agg``. + + - ``alice_app`` subscribe for ``s0`` + + - ``alice_app`` has an LDMS Stream Client ``cli`` that subscribes to ``"my"`` + stream. + +The ``-->`` arrows illustrate possible stream data paths. + +:: + + ┌──────────────┐ ┌────────┐ + ┌───────────┐ │ samp │ │ agg │ + │bob_app │ ├──────────────┤ ├────────┤ + ├───────────┤ │ .----. │ │ .----. │ + │ │ .----->|ldms|---------------->|ldms| │ + │publish(s0)│ | │ '-+-+'<---.│ │ '----' │ + │ | │ | │ | |│ └────|───┘ + │ v │ | │.----' |│ .-------' + │.----. │ | │| .------. |│ | ┌────────────┐ + │|ldms|--------' │| |cli:.*| |│ | │ alice_app │ + │'----' │ │| |------| |│ | ├────────────┤ + └───────────┘ │'>|cb_fn | |│ | │ .----. │ + │ '------' |│ '---->|ldms|--.│ + │ |│ │ '----' |│ + │ |│ │ |│ + │.-----------.|│ │ |│ + │| plug0 ||│ │ .------. |│ + │|-----------||│ │ |cli:s0| |│ + │|publish(s1)|'│ │ |------| |│ + │'-----------' │ │ |cb_fn |<'│ + └──────────────┘ │ '------' │ + └────────────┘ + + + +``bob_app`` publishes stream data by calling ``ldms_stream_publish()`` function. +Let's assume that ``bob_app`` publishes ``s0`` stream data over the LDMS +transport to ``samp`` with ``0400`` permission. + +When ``s0`` stream data from ``bob_app`` arrives ``samp`` daemon, the logic in +``ldms`` library does the following: + +1. **Credential check**: ``ldms`` library checks the credential in the stream + message against the credential in the transport. If they are not the same, + the message is dropped to prevent user impersonation. The exception + is that ``root`` can impersonate any user so that ``ldmsd``'s can propagate + user messages as user. + +2. **Client iteration**: ``ldms`` library Goes through all clients that + subscribe for ``s0`` stream (including the macthing clients that subscribe + streams with regular expression). + +3. **Authorization check**: Then, ``ldms`` library checks if the clients should + be seeing the data with the credential information in the client, the + credential and permission information in the stream message. + +4. **Callbak**: clients' ``cb_fn()`` is called for the authorized clients. + Examples of information availble in the stream callback event are stream + name, stream data, original publisher's ``uid``, ``gid`` and address. + Currently, a user can publish data to any stream. It is up to the receiver + side to decide what to do. + +In this particular case, we will have 2 clients on ``samp``: the ``cli`` that +subscribes for all streams (regex ``.*``), and a *hidden* client for remote +subscription (remote client for short) created when ``samp`` received a +subscription request message from ``agg`` (by ``prdcr_subscribe`` command in +``agg``). The ``cb_fn()`` of the remote client is an internal function in LDMS +library that forwards the stream message to the subscribing peer. Note that the +credential of the remote client is the credential from the LDMS transport +authentication. + +Now, ``s0`` stream data has reached ``agg``, which has only one remote client: +``alice_app`` that subscribed ``s0`` stream. The ``ldms`` logic in ``agg`` will +NOT forward this particular stream message to ``alice_app`` because ``bob_app`` +the original publisher set ``0400`` permission. + +If ``bob_app`` published another message on ``s0`` stream to ``samp`` with +``0444`` permission, when it reached ``agg``, it will be forwarded it to +``alice_app``. ``cb_fn()`` on ``alice_app`` will be called once the ``s0`` data +reached it. + +On another path, let's consider ``publish(s1)`` in ``plug0`` plugin in ``samp`` +process. When ``plug0`` publishes ``s1`` with ``NULL`` transport (publishing +locally), the ``ldms`` library in ``samp`` process does the same thing as if the +data were received from a remote peer. The ``cli`` client in another plugin that +subscribed for all streams will get the data (via ``cb_fn()``), and the remote +client to ``agg`` will also get the data if authorized. + + +CREDENTIALS AND PERMISSIONS +=========================== + +The ``ldms_stream_publish()`` function in C and the ``stream_publish()`` method +in Python both receive credential ``cred`` and permission ``perm``. If ``cred`` +is not set, the process' ``UID/GID`` are used. If a non-root user tries to +impersonate anotehr user, the ``ldms`` library on the receiver side will drop +the message. We allow ``root`` to impersonate other ``UID/GID`` so that users' +stream data can be preserved when propagated down the stream. Before forwarding +the stream data to the remote client, the remote client credential is checked if +it is allowed to see the data from ``cred`` with ``perm``. + + +CODE EXAMPLES +============= + +C publish example +----------------- + +.. code:: c + + #include "ldms.h" + + int main(int argc, char **argv) + { + ldms_t x; + int rc; + x = ldms_xprt_new_with_auth("sock", "munge", NULL); + /* synchronous connect for simplicity */ + rc = ldms_xprt_connect_by_name(x, "node1", "411", NULL, NULL); + if (rc) + return rc; + + /* publish to peer */ + rc = ldms_stream_publish(x, "s0", LDMS_STREAM_STRING, NULL, + 0400, "data", 5); + + /* publish to our process */ + rc = ldms_stream_publish(NULL, "json_stream", LDMS_STREAM_JSON, NULL, + 0400, "{\"attr\":\"value\"}", 17); + return rc; + } + + +C subscribe example +------------------- + +.. code:: c + + #include + #include + #include "ldms.h" + + int cb_fn0(ldms_stream_event_t ev, void *cb_arg); + int success_cb(ldms_stream_event_t ev, void *cb_arg); + + int main(int argc, char **argv) + { + int rc; + ldms_t x; + + /* connect to an ldmsd */ + x = ldms_xprt_new_with_auth("sock", "munge", NULL); + ldms_xprt_connect_by_name(x, "node1", "411", NULL, NULL); + + /* subscribe "s0" stream that reached us; cb_fn0 is the callback function */ + cli0 = ldms_stream_subscribe("s0", 0, cb_fn0, NULL, "s0 only"); + + + /* Ask ldmsd to forward "s0" stream to us; + * There will be NO success report callback since the function is `NULL`. */ + rc = ldms_stream_remote_subscribe(x, "s0", 0, NULL, NULL, LDMS_UNLIMITED); + if (rc) + return rc; + /* The non-zero `rc` is a synchronous error that can still be returned, + * e.g. EIO, ENOMEM, ENAMETOOLONG. */ + + /* ask ldmsd to forward streams matching "app.*" regex to us. + * `success_cb()` will be called once we know the result of the + * subscription. */ + rc = ldms_stream_remote_subscribe(x, "app.*", 1, success_cb, NULL, LDMS_UNLIMITED); + if (rc) + return rc; + + sleep(10); /* sleep 10 sec */ + + /* Request an unsubscription to "s0" stream. Note that the `stream` must + * match the subscription request. */ + rc = ldms_stream_remote_unsubscribe(x, "s0", 0, success_cb, NULL); + if (rc) + return rc; + + /* Request an unsubscription to "app.*" streams. Note that the `stream` must + * match the subscription request. */ + rc = ldms_stream_remote_unsubscribe(x, "app.*", 1, success_cb, NULL); + if (rc) + return rc; + + ldms_stream_close(cli0); + + sleep(5); /* wait a bit so that we can see the events */ + + return 0; + } + + int cb_fn0(ldms_stream_event_t ev, void *cb_arg) + { + if (ev->type == LDMS_STREAM_EVENT_CLOSE) { + /* + * The client is "closed". We can clean up resources + * associated with it here. No more event will occur + * on this client. + */ + struct ldms_stream_stats_s *stat; + stat = ldms_stream_client_get_stats(ev->close.client, 0); + printf("client closed:\n"); + printf(" - match: %s\n", stat->match); + printf(" - is_regex: %d\n", stat->is_regex); + printf(" - desc: %s\n", stat->desc); + ldms_stream_client_stats_free(stat); + return 0; + } + assert(ev->type == LDMS_STREAM_EVENT_RECV); + /* we expect RECV event or CLOSE event only */ + if (ev->recv.type == LDMS_STREAM_STRING) { + printf("stream name: %s\n", ev->recv.name); + printf("stream data: %s\n", ev->recv.data); + } + if (ev->recv.type == LDMS_STREAM_JSON) { + /* process `ev->recv.json` */ + } + } + + int success_cb(ldms_stream_event_t ev, void *cb_arg) + { + switch (ev->type) { + case LDMS_STREAM_EVENT_SUBSCRIBE_STATUS: + printf("stream '%s' subscription status: %d\n", ev->status.match, + ev->status.status); + break; + case LDMS_STREAM_EVENT_UNSUBSCRIBE_STATUS: + printf("stream '%s' unsubscription status: %d\n", ev->status.match, + ev->status.status); + break; + default: + printf("Unexpected event: %d\n", ev->type); + } + return 0; + } + + +Python publish examples +----------------------- + +.. code:: python + + from ovis_ldms import ldms + x = ldms.Xprt(name="sock", auth="munge") # LDMS socket transport /w munge + x.connect(host="node0", port=411) + + # Explicitly specify STRING type. + x.stream_publish(name="s0", "somedata", stream_type=ldms.LDMS_STREAM_STRING, + perm=0o400) + + # JSON; the `dict` data will be converted to JSON + x.stream_publish(name="s0", {"attr": "value"}, + stream_type=ldms.LDMS_STREAM_JSON, perm=0o400) + + # Assumed STRING type if data is `str` or `bytes` when `stream_type` is omitted + x.stream_publish(name="s0", "somedata", perm=0o400) + + # Assumed JSON type if data is `dict` when `stream_type` is omitted + x.stream_publish(name="app0", {"attr": "value"}, perm=0o400) + + # We can publish to our process too + ldms.stream_publish(name="s0", "data") + + +Python subscribe examples +------------------------- + +.. code:: python + + import time + from ovis_ldms import ldms + + x = ldms.Xprt(name="sock", auth="munge") # LDMS socket transport /w munge + x.connect(host="node0", port=411) + + def stream_recv_cb(cli, sd, cb_arg): + print(f"stream[{sd.name}]: {sd.data}") + + def stream_sub_status_cb(ev, cb_arg): + print(f"stream '{ev.name}' subscription status: {ev.status}") + + def stream_unsub_status_cb(ev, cb_arg): + print(f"stream '{ev.name}' unsubscription status: {ev.status}") + + # Subscribe "s0" stream that reaches our process. + # `stream_recv_cb()` will be called when "s0" stream reached our process. + cli0 = ldms.StreamClient(match="s0", cb=stream_recv_cb, cb_arg=None) + + # Subscribe "app.*" streams that reaches our process. + # Since no `cb` is given, "app.*" data that reaches our process will be + # stored in cli1. + cli1 = ldms.StreamClient(match="app.*", is_regex=True) + + # Request peer for "s0" stream data forwarding to us. + # The status result of the subscription will be notified via + # `stream_sub_status_cb`. + x.stream_subscribe("s0", cb=stream_sub_status_cb, cb_arg=None) + + # Request peer for "app.*" stream data forwarding to us. + # Since no `cb` is given, this call becomes blocking, waiting for the status + # event, and returns it. + ev = x.stream_subscribe("app.*", is_regex=True) + print(f"stream '{ev.name}' subscription status: {ev.status}") + + time.sleep(10) # wait a bit to get events + + # "s0" stream data were handled by `stream_recv_cb`. + + # Data of "app.*" streams are stored in `cli1` since no `cb` was given. + sd = cli1.get_data() + while sd is not None: + print(f"stream[{sd.name}]: {sd.data}") + sd = cli1.get_data() + + # Cancel our "s0" subscription from peer; notify result via `cb` + x.stream_unsubscribe("s0", cb=stream_unsub_status_cb, cb_arg=None) + + # Cancel our "app.*" subscription from peer; result via return object + ev = x.stream_unsubscribe("app.*", is_regex=True) + print(f"stream '{ev.name}' unsubscription status: {ev.status}") + + # Terminate stream clients and the connection + cli0.close() + cli1.close() + x.close() + + +SEE ALSO +======== + +ldmsd_controller(8)