diff --git a/configure.ac b/configure.ac index 0125665af..5969d2363 100644 --- a/configure.ac +++ b/configure.ac @@ -1116,6 +1116,7 @@ ldms/src/sampler/app_sampler/Makefile ldms/src/sampler/slingshot_metrics/Makefile ldms/src/sampler/slingshot_info/Makefile ldms/src/sampler/slingshot_switch/Makefile +ldms/src/sampler/json/Makefile ldms/src/sampler/zfs_topvdevs/Makefile ldms/src/sampler/zfs_leafvdevs/Makefile ldms/src/sampler/zfs_zpool/Makefile diff --git a/ldms/src/sampler/Makefile.am b/ldms/src/sampler/Makefile.am index 85126fa8c..7fe1bc454 100644 --- a/ldms/src/sampler/Makefile.am +++ b/ldms/src/sampler/Makefile.am @@ -30,6 +30,7 @@ ldmssamplerincludedir = $(includedir)/ldms/sampler ldmssamplerinclude_HEADERS = sampler_base.h SUBDIRS += netlink +SUBDIRS += json if ENABLE_LUSTRE SUBDIRS += lustre_client diff --git a/ldms/src/sampler/json/Makefile.am b/ldms/src/sampler/json/Makefile.am new file mode 100644 index 000000000..53968269b --- /dev/null +++ b/ldms/src/sampler/json/Makefile.am @@ -0,0 +1,19 @@ +pkglib_LTLIBRARIES = +dist_man7_MANS= + +AM_CPPFLAGS = @OVIS_INCLUDE_ABS@ +AM_LDFLAGS = @OVIS_LIB_ABS@ +COMMON_LIBADD = $(top_builddir)/ldms/src/sampler/libsampler_base.la \ + $(top_builddir)/ldms/src/core/libldms.la \ + $(top_builddir)/ldms/src/ldmsd/libldmsd_stream.la \ + @LDFLAGS_GETTIME@ \ + $(top_builddir)/lib/src/ovis_util/libovis_util.la \ + $(top_builddir)/lib/src/coll/libcoll.la \ + $(top_builddir)/lib/src/ovis_json/libovis_json.la \ + $(top_builddir)/lib/src/ovis_log/libovis_log.la \ + -lpthread + +dist_man7_MANS += Plugin_json_stream_sampler.man +libjson_stream_sampler_la_SOURCES = json_stream_sampler.c +libjson_stream_sampler_la_LIBADD = $(COMMON_LIBADD) +pkglib_LTLIBRARIES += libjson_stream_sampler.la diff --git a/ldms/src/sampler/json/Plugin_json_stream_sampler.man b/ldms/src/sampler/json/Plugin_json_stream_sampler.man new file mode 100644 index 000000000..631ed8c2f --- /dev/null +++ b/ldms/src/sampler/json/Plugin_json_stream_sampler.man @@ -0,0 +1,312 @@ +.\" Manpage for json_stream_sampler +.\" Contact ovis-help@ca.sandia.gov to correct errors or typos. +.TH man 7 "5 Aug 2023" "v4" "LDMSD Plugin JSON Stream Sampler man page" + +.SH NAME +Plugin_json_stream_sampler - man page for the LDMSD json_stream_sampler plugin + +.SH SYNOPSIS + +Within ldmsd_controller or a configuration file: + +.SY config +.BR name=\fBjson_stream_sampler\fR +.BI producer=\fIPRODUCER\fR +.BI instance=\fIINSTANCE\fR +.OP component_id=\fICOMP_ID\fR +.OP stream=\fINAME\fR +.OP uid=\fIUID\fR +.OP gid=\fIGID\fR +.OP perm=\fIPERM\fR +.OP heap_szperm=\fIBYTES\fR +.YS + +.SH DESCRIPTION +.P +The \fBjson_stream_store\fR monitors JSON object data presented on a configured +set of streams. JSON object data is encoded in LDMS Metric Sets; the +intention of which is to store these metric sets using decomposition +through a storage plugin. +.P +When publishing JSON dictionary data to \fBjson_stream_plugin\fR, +there are fields in the JSON dictionary that have special +meaning. These fields are shown in the table below: +.P +.TS +tab(@) box; +l l l . +\fBAttribute Name\fR @ \fBData Type\fR @ \fBDescription\fR +_ +schema @ string @ The name of a Metric Set schema for JSON dictionaries received on this stream. +\fINAME\fR_max_len @ integer @ For a list or array named \fINAME\fR, this is maximum length of the list or array. +.TE +.SS "Schema Management" +The value of the \fIschema\fR attribute in the top-level JSON +dictionary is maintained in a tree. The first time the schema name is +seen, an LDMS Schema is created based on the value of the JSON +dictionary. Once created, the schema is used to create the metric +set. Each time a stream message is received, the metric set is +updated. +.PP +The \fIschema\fR attribute is mandatory. If it not present in the +top-level JSON dictionary, an error is logged and the message is ignored. + +.SS "Encoding Types" +Primitive types are encoded as attributes in the LDMS metric set with +their associated LDMS type. The table below shows how the JSON +attributes are mapped to LDMS metric types. +.TS +tab(@) box; +l l l . +\fBJSON Type\fR @ \fBLDMS Type\fR @ \fBExample JSON Value\fR +_ +Integer @ LDMS_V_S64 @ 45 +Floating Point @ LDMS_V_D64 @ 3.1415 +String @ LDMS_V_BYTE_ARRAY @ "hello", 'world' +List @ LDMS_V_LIST @ [ 1, 2, 3 ] +Dictionary @ LDMS_V_RECORD @ { "attr1" : 1, "attr2" : 2, "attr3" : 3 } +.TE +.PP +The encoding of all JSON types except strings, dictionaries and lists is +straightfoward. The coding of Strings, Lists and Dictionaries have additional +limitations as described below. + +.SS "Stream Meta-data" +.PP +Stream events include the user-id, and group-id of the application +publishing the stream data. This data is encoded in the metric set +with the special names \fBS_uid\fR, and \fBS_gid\fR respectively. The +intention is that this data can stored in rows as configured by the +user with a decomposition configuration. + +.SS "Encoding Strings" +Strings are encoded as LDMS_V_BYTE_ARRAY. By default, the length of +the array is 255 unless an attribute with the name \fINAME\fR_max_len +is present in the dictionary along with the string value, its value is +used to size the string array. +.PP +For example: +.PP +.RS 4 +.nf +{ "my_string" : "this is a string", "my_string_max_len" : 4096 } +.fi +.RE +.PP +will result in an LDMS metric with the name "my_string", type +LDMS_V_BYTE_ARRAY, and length of 4096 being created in the metric set. + +.SS "Encoding Arrays" +Any list present in the top-level dictionary is encoded as a list, however, +lists present in a 2nd-level dictionary are encoded as arrays. This is because +LDMS_V_LIST inside an LDMS_V_RECORD is not supported. The length of the array +is determined by the initial value of the array in the record; but can be +overridden with the \fINAME\fR_max_len attribute as described above for +strings. Lists of strings in a 2nd-level dictionary are treated as a +JSON-formatted string of a list. That is, they are encoded as +LDMS_V_CHAR_ARRAY because LDMS does not support arrays of LDMS_V_CHAR_ARRAY. +The length of the array is determined by the length of the JSON-formatted +string of the initial list. + +.SS "Encoding Dictionaries" +The attributes in the top-level JSON dictionary are encoded in the metric +set directly. For example the JSON dictionary: +.PP +.RS 4 +.nf +{ + "schema" : "example", + "component_id", 10001, + "job_id" : 2048, + "seq" : [ 1, 2, 3 ] +} +.fi +.RE +.PP +results in a metric set as follows: +.fi +.RS 4 +.nf +$ ldms_ls -h localhost -p 10411 -a munge -E example -l +ovs-5416_example: consistent, last update: Sat Aug 05 11:38:26 2023 -0500 [281178us] +D s32 S_uid 1002 +D s32 S_gid 1002 +D s64 component_id 10001 +D s64 job_id 2048 +D list<> seq [1,2,3] +D char[] schema "example" +.fi +.RE +.PP +Dictionaries inside the top-level dictionary are encoded as +LDMS_V_RECORD inside a single element LDMS_V_RECORD_ARRAY. This +limitation is because an LDMS_V_RECORD is only allowed inside an +LDMS_V_LIST or LDMS_V_ARRAY. +.PP +The JSON below: +.RS 4 +.nf +{ + "schema" : "dictionary", + "a_dict" : { "attr_1" : 1, "attr_2" : 2 }, + "b_dict" : { "attr_3" : 3, "attr_4" : 4 } +} +.fi +.RE +.PP +results in the following LDMS metric set. +.PP +.RS 4 +.nf +ovs-5416_dict: consistent, last update: Sat Aug 05 21:14:38 2023 -0500 [839029us] +D s32 S_uid 1002 +D s32 S_gid 1002 +M record_type a_dict_record LDMS_V_RECORD_TYPE +D record[] a_dict + attr_2 attr_1 + 2 1 +M record_type b_dict_record LDMS_V_RECORD_TYPE +D record[] b_dict + attr_4 attr_3 + 4 3 +D char[] schema "dict" +.fi +.RE +.PP +Lists of JSON dictionaries results in each dictionary being encoded as +an element in an LDMS_V_LIST. Note that all elements in the list must +be the same type. +.PP +The JSON below: +.PP +.RS 4 +.nf +{ "schema" : "dict_list", + "a_dict_list" : [ + { "attr_1" : 1, "attr_2" : 2 }, + { "attr_1" : 3, "attr_2" : 4 } + ] +} +.fi +.RE +.PP +results in the following LDMS metric set. +.PP +.RS 4 +.nf +ovs-5416_dict_list: consistent, last update: Sat Aug 05 21:23:11 2023 -0500 [52659us] +D s32 S_uid 1002 +D s32 S_gid 1002 +M record_type a_dict_list_record LDMS_V_RECORD_TYPE +D list<> a_dict_list + attr_2 attr_1 + 2 1 + 4 3 +D char[] schema "dict_list" +.fi +.RE +.PP + +The JSON below: +.PP +.RS 4 +.nf +{ 'schema' : 'json_dict', + 'dict' : { 'int' : 10, + 'float' : 1.414, + 'char' : 'a', + 'str' : 'xyz', + 'array_int' : [5, 7, 9], + 'array_float' : [3.14, 1.414, 1.732], + 'array_str' : ['foo', 'bar'], + 'inner_dict' : { 'This': 'is', + 'a' : 'string' + } + } +} +.fi +.RE +.PP +results in the following LDMS metric sets. +.PP +.RS 4 +.nf +ovis-5416_lists_inside_a_dict: consistent, last update: Mon Sep 25 16:21:35 2023 -0500 [310003us] +D s32 S_uid 1000 +D s32 S_gid 1000 +M record_type dict_record LDMS_V_RECORD_TYPE +D record[] dict + int_array char str_array float inner_dict float_array str int + 5,7,9 "a" "["foo","bar"]" 1.414000 "{"This":"is","a":"string"}" 3.140000,1.414000,1.732000 "xyz" 10 +D char[] schema "json_dict" + +.SS "Set Security" +.PP +The metric sets' UID, GID, and permission can be configured using the +configuration attributes uid, gid, and perm consecutively. If one is not given, +the value of the received stream data will be used at set creation. Once a +metric set has been created, the UID, GID, and permission will not be changed +automatically when the stream data's security data gets changed. However, it +could be modified via an LDMSD configuration command, set_sec_mod. See +ldmsd_controller's Man Page. + +Note that the UID, GID, and permissions values given at the configuration line +do not affect the S_uid and S_gid metric values. The S_uid and S_gid metric +values are always the security embeded with the stream data. + +.SH "CONFIG OPTIONS" + +.TP +.BR name=json_stream_sampler +This must be json_stream_sampler (the name of the plugin). +.TP +.BI producer=\fINAME\fR +The \fINAME\fR of the data producer (e.g. hostname). +.TP +.BI instance=\fINAME\fR +The \fINAME\fR of the set produced by this plugin. This option is required. +.TP +.BI component_id=\fIINT\fR +An integer identifying the component (default: \fI0\fR). +.TP +.BI stream=\fINAME\fR +The name of the LDMSD stream to register for JSON object data. +.TP +.BI uid=\fIUID\fR +The user-id to assign to the metric set. +.TP +.BI gid=\fIGID\fR +The group-id to assign to the metric set. +.TP +.BI perm=\fIOCTAL\fR +An octal number specifying the read-write permissions for the metric +set. See open(3). +.TP +.BI heap_sz=\fIBYTES\fR +The number of bytes to reserve for the metric set heap. +.TP + +.SH BUGS +.PP +Not all JSON objects can be encoded as metric sets. Support for +records nested inside other records is accomplished by encoding the +nested records as strings. + +.SH EXAMPLES + +Plugin configuration example: + +.RS +.EX +load name=json_stream_sampler +config name=json_stream_sampler producer=${HOSTNAME} instance=${HOSTNAME}/slurm \\ + component_id=2 stream=darshan_data heap_sz=1024 +start name=json_stream_sampler interval=1000000 +.EE +.RE + +.SH SEE ALSO +.nh +.BR ldmsd (8), +.BR ldmsd_controller (8), +.BR Plugin_store_avro_kakfa (8) diff --git a/ldms/src/sampler/json/json_stream_sampler.c b/ldms/src/sampler/json/json_stream_sampler.c new file mode 100644 index 000000000..184511e9f --- /dev/null +++ b/ldms/src/sampler/json/json_stream_sampler.c @@ -0,0 +1,1147 @@ +/* -*- c-basic-offset: 8 -*- + * Copyright (c) 2023 National Technology & Engineering Solutions + * of Sandia, LLC (NTESS). Under the terms of Contract DE-NA0003525 with + * NTESS, the U.S. Government retains certain rights in this software. + * Copyright (c) 2023 Open Grid Computing, Inc. All rights reserved. + * + * This software is available to you under a choice of one of two + * licenses. You may choose to be licensed under the terms of the GNU + * General Public License (GPL) Version 2, available from the file + * COPYING in the main directory of this source tree, or the BSD-type + * license below: + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * + * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials provided + * with the distribution. + * + * Neither the name of Sandia nor the names of any contributors may + * be used to endorse or promote products derived from this software + * without specific prior written permission. + * + * Neither the name of Open Grid Computing nor the names of any + * contributors may be used to endorse or promote products derived + * from this software without specific prior written permission. + * + * Modified source versions must be plainly marked as such, and + * must not be misrepresented as being the original software. + * + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#define _GNU_SOURCE +#include +#include +#include +#include +#include +#include +#include "coll/rbt.h" +#include "ovis_json/ovis_json.h" +#include "ldms.h" +#include "ldmsd.h" +#include "ldmsd_stream.h" +#include "ovis_log/ovis_log.h" + +#define SAMP "json_stream" + +static ovis_log_t __log = NULL; +#define LOG(_level_, _fmt_, ...) ovis_log(__log, _level_, "[%d] " _fmt_, __LINE__, ##__VA_ARGS__) +#define LCRITICAL(_fmt_, ...) ovis_log(__log, OVIS_LCRIT, "[%d]" _fmt_, __LINE__, ##__VA_ARGS__) +#define LERROR(_fmt_, ...) ovis_log(__log, OVIS_LERROR, "[%d] " _fmt_, __LINE__, ##__VA_ARGS__) +#define LWARN(_fmt_, ...) ovis_log(__log, OVIS_LWARN, "[%d] " _fmt_, __LINE__, ##__VA_ARGS__) +#define LINFO(_fmt_, ...) ovis_log(__log, OVIS_LINFO, "[%d] " _fmt_, __LINE__, ##__VA_ARGS__) +#define LDEBUG(_fmt_, ...) ovis_log(__log, OVIS_LDEBUG, "[%d] " _fmt_, __LINE__, ##__VA_ARGS__) + +static int str_cmp(void *tree_key, const void *srch_key) +{ + return strcmp(tree_key, srch_key); +} + +#define DEFAULT_CHAR_ARRAY_LEN 255 + +/* + * Some attributes of the JSON object have special meaning and/or are + * required. These include: + * - "schema" : Defines the unique schema name to use for the + * constructed metric set. If two JSON objects advertise + * the same schema name, but have different contents, the + * resulting object conversion is undefined.. + */ + +/* + * [ { }, { }, ... ] encoded as LIST of LDMS_V_RECORD + * [ int, int, ... ] encoded as list of LDMS_V_S64 + * [ string, string, ... ] encoded as list of ?? + * [ float, float, ... ] encoded as list of LDMS_V_D64 + * { } encoded as LDMS_V_RECORD + */ + +#ifndef ARRAY_LEN +#define ARRAY_LEN(a) (sizeof(a)/sizeof(*a)) +#endif /* ARRAY_LEN */ + +struct attr_entry { + char *name; /* Attribute name */ + int midx; /* The metric index in the set */ + int ridx; /* The record type index */ + enum json_value_e type; /* THE LDMS metric value type */ + struct rbn rbn; /* schema->attr_tree entry */ +}; + +struct schema_entry { + ldms_schema_t schema; /* The LDMS schema */ + char *name; /* The schema name. This is the key + in the schema tree */ + struct rbt attr_tree; /* This tree maps JSON object + attributes to conversion functions */ + struct rbn rbn; +}; +static struct rbt schema_tree = RBT_INITIALIZER(str_cmp); +static pthread_mutex_t schema_tree_lock = PTHREAD_MUTEX_INITIALIZER; + +static const char *usage(struct ldmsd_plugin *self) +{ + return \ + "config name=json_stream_sampler producer= \n" + " heap_sz= stream=\n" + " [instance=] [component_id=] [perm=]\n" + " [uid=] [gid=]\n" + " producer A unique name for the host providing the data\n" + " stream A stream name to subscribe to.\n" + " heap_sz The number of bytes to reserve for the set heap.\n" + " instance A unique name for the metric set. If none is given," + " the set instance name will be _.\n" + " component_id A unique number for the component being monitored.\n" + " The default is 0\n" + " uid The user-id of the set's owner (defaults to geteuid())\n" + " gid The group id of the set's owner (defaults to getegid())\n" + " perm The set's access permissions (defaults to 0777)\n"; +} + +static int make_record_array(ldms_record_t record, json_entity_t list_attr) +{ + json_entity_t list; + json_entity_t item; + size_t list_len; + jbuf_t jbuf; + int rc; + + list = json_attr_value(list_attr); + item = json_item_first(list); + if (!item) { + LERROR("Can't encode an empty list in an LDMS schema.\n"); + return EINVAL; /* Can't parse empty list */ + } + list_len = json_list_len(list); + switch (json_entity_type(item)) { + case JSON_INT_VALUE: + rc = ldms_record_metric_add(record, + json_attr_name(list_attr)->str, NULL, + LDMS_V_S64_ARRAY, list_len); + break; + case JSON_BOOL_VALUE: + rc = ldms_record_metric_add(record, + json_attr_name(list_attr)->str, NULL, + LDMS_V_S8_ARRAY, list_len); + break; + case JSON_FLOAT_VALUE: + rc = ldms_record_metric_add(record, + json_attr_name(list_attr)->str, NULL, + LDMS_V_D64_ARRAY, list_len); + break; + case JSON_STRING_VALUE: + jbuf = json_entity_dump(NULL, list); + if (!jbuf) { + LCRITICAL("Memory allocation failure.\n"); + rc = ENOMEM; + break; + } + rc = ldms_record_metric_add(record, + json_attr_name(list_attr)->str, NULL, + LDMS_V_CHAR_ARRAY, jbuf->cursor+1); + jbuf_free(jbuf); + break; + default: + LERROR("Invalid list entry type (%d) for encoding as array in record\n", + json_entity_type(item)); + rc = EINVAL; + } + return rc; +} + +static int make_record(ldms_schema_t schema, char *name, json_entity_t dict, + ldms_record_t *rec) +{ + int rc = -ENOMEM; + ldms_record_t record; + json_entity_t json_attr; + + record = ldms_record_create(name); + if (!record) + goto err_0; + + for (json_attr = json_attr_first(dict); json_attr; + json_attr = json_attr_next(json_attr)) { + json_entity_t json_value = json_attr_value(json_attr); + switch (json_entity_type(json_value)) { + case JSON_INT_VALUE: + rc = ldms_record_metric_add(record, + json_attr_name(json_attr)->str, NULL, + LDMS_V_S64, 0); + break; + case JSON_BOOL_VALUE: + rc = ldms_record_metric_add(record, + json_attr_name(json_attr)->str, NULL, + LDMS_V_S8, 0); + break; + case JSON_FLOAT_VALUE: + rc = ldms_record_metric_add(record, + json_attr_name(json_attr)->str, NULL, + LDMS_V_D64, 0); + break; + case JSON_STRING_VALUE: + rc = ldms_record_metric_add(record, + json_attr_name(json_attr)->str, NULL, + LDMS_V_CHAR_ARRAY, 255); + break; + case JSON_LIST_VALUE: + rc = make_record_array(record, json_attr); + break; + case JSON_DICT_VALUE: + LINFO("Encoding unsupported nested dictionary '%s' as a string value.\n", + json_attr_name(json_attr)->str); + rc = ldms_record_metric_add(record, + json_attr_name(json_attr)->str, NULL, + LDMS_V_CHAR_ARRAY, 255); + break; + default: + LERROR("Ignoring unsupported entity '%s[%s]') " + "in JSON dictionary.\n", + json_attr_name(json_attr)->str, + json_type_name(json_entity_type(json_value))); + }; + } + *rec = record; + return ldms_schema_record_add(schema, record); + err_0: + return rc; +} + +static int make_list(ldms_schema_t schema, json_entity_t parent, json_entity_t list_attr) +{ + json_entity_t list = json_attr_value(list_attr); + json_entity_t item, len_attr; + size_t item_size; + ldms_record_t record; + char *record_name; + enum json_value_e type; + int rc; + + item = json_item_first(list); + if (!item) + return 0; /* empty list */ + + type = json_entity_type(item); + switch (type) { + case JSON_FLOAT_VALUE: + item_size = ldms_list_heap_size_get(LDMS_V_D64, 1, 1); + break; + case JSON_INT_VALUE: + item_size = ldms_list_heap_size_get(LDMS_V_S64, 1, 1); + break; + case JSON_BOOL_VALUE: + item_size = ldms_list_heap_size_get(LDMS_V_S8, 1, 1); + break; + case JSON_STRING_VALUE: + item_size = ldms_list_heap_size_get(LDMS_V_CHAR_ARRAY, + 1, json_value_str(item)->str_len); + break; + case JSON_DICT_VALUE: + /* + * Add a record definition for the dictionary list item + */ + rc = asprintf(&record_name, "%s_record", json_attr_name(list_attr)->str); + rc = make_record(schema, record_name, item, &record); + free(record_name); + if (rc < 0) + return rc; + item_size = ldms_record_heap_size_get(record); + break; + default: + LERROR("Invalid item type encountered in list\n"); + return -EINVAL; + } + /* Check if there is a max specified for the list to override + * the current length */ + rc = asprintf(&record_name, "%s_max_len", + json_attr_name(list_attr)->str); + len_attr = json_attr_find(parent, record_name); + free(record_name); + size_t list_len = json_list_len(list); + if (len_attr) { + if (json_entity_type(json_attr_value(len_attr)) + != JSON_INT_VALUE) { + LERROR("The list length override for '%s' must be " + "an integer.\n", json_attr_name(list_attr)->str); + } else { + list_len = json_value_int(json_attr_value(len_attr)); + } + } + LINFO("Adding list '%s' with %zd elements of size %zd\n", + json_attr_name(list_attr)->str, list_len, item_size); + return ldms_schema_metric_list_add(schema, + json_attr_name(list_attr)->str, NULL, + 2 * item_size * list_len); +} + +typedef int (*json_setter_t)(ldms_set_t set, ldms_mval_t mval, json_entity_t entity, void *); +static json_setter_t setter_table[]; + +typedef int (*dict_list_setter_t)(ldms_mval_t rec_inst, int mid, int idx, json_entity_t value, void *); + +int JSON_INT_VALUE_setter(ldms_set_t set, ldms_mval_t mval, json_entity_t entity, void *ctxt) +{ + ldms_mval_set_s64(mval, json_value_int(entity)); + return 0; +} + +int JSON_BOOL_VALUE_setter(ldms_set_t set, ldms_mval_t mval, json_entity_t entity, void *ctxt) +{ + ldms_mval_set_s8(mval, (int8_t)json_value_bool(entity)); + return 0; +} + +int JSON_FLOAT_VALUE_setter(ldms_set_t set, ldms_mval_t mval, json_entity_t entity, void *ctxt) +{ + ldms_mval_set_double(mval, json_value_float(entity)); + return 0; +} + +int JSON_STRING_VALUE_setter(ldms_set_t set, ldms_mval_t mval, json_entity_t entity, void *ctxt) +{ + json_str_t v = json_value_str(entity); + ldms_mval_array_set_str(mval, v->str, v->str_len); + return 0; +} + +int JSON_ATTR_VALUE_setter(ldms_set_t set, ldms_mval_t mval, json_entity_t entity, void *ctxt) +{ + assert(0 == "Invalid JSON type setter"); + return EINVAL; +} + +int JSON_LIST_VALUE_setter(ldms_set_t set, ldms_mval_t list_mval, + json_entity_t list, void *ctxt) +{ + json_entity_t item; + enum json_value_e type; + ldms_mval_t item_mval; + int rc, i = 0; + char *rec_type_name = NULL; + int rec_idx; + + rc = ldms_list_purge(set, list_mval); + for (item = json_item_first(list); item; item = json_item_next(item)) { + type = json_entity_type(item); + LDEBUG("Setting list item %d of type %d\n", i, json_entity_type(item)); + + switch (type) { + case JSON_INT_VALUE: + item_mval = ldms_list_append_item(set, list_mval, LDMS_V_S64, 1); + break; + case JSON_BOOL_VALUE: + item_mval = ldms_list_append_item(set, list_mval, LDMS_V_S8, 1); + break; + case JSON_FLOAT_VALUE: + item_mval = ldms_list_append_item(set, list_mval, LDMS_V_D64, 1); + break; + case JSON_STRING_VALUE: + item_mval = ldms_list_append_item(set, list_mval, + LDMS_V_CHAR_ARRAY, 255); + break; + case JSON_DICT_VALUE: + rec_idx = -1; + if (!rec_type_name) { + rc = asprintf(&rec_type_name, "%s_record", (char *)ctxt); + if (rc >= 0) { + rec_idx = ldms_metric_by_name(set, rec_type_name); + free(rec_type_name); + rec_type_name = NULL; + } else { + LERROR("out of memory"); + rc = ENOMEM; + goto err; + } + } + if (rec_idx < 0) { + LERROR("item_not_found"); + rc = EINVAL; + goto err; + } + item_mval = ldms_record_alloc(set, rec_idx); + if (!item_mval) { + rc = ENOMEM; + LERROR("out of memory"); + goto err; + } + rc = ldms_list_append_record(set, list_mval, item_mval); + break; + default: + LERROR("Invalid list entry %d type (%d)\n", i, type); + rc = EINVAL; + goto err; + } + if (item_mval) { + rc = setter_table[type](set, item_mval, item, ctxt); + if (rc) + LERROR("Error %d setting list item %d\n", rc, i); + } else { + LERROR("NULL list item %d mval\n", i); + } + i++; + } + return 0; + err: + return rc; +} + +static int dict_list_set(ldms_mval_t rec_inst, int mid, json_entity_t list, void *ctxt); +int JSON_DICT_VALUE_setter(ldms_set_t set, ldms_mval_t rec_inst, json_entity_t dict, void *ctxt) +{ + json_entity_t attr; + ldms_mval_t mval; + int rc, idx; + jbuf_t jbuf; + size_t array_len; + + for (attr = json_attr_first(dict); attr; attr = json_attr_next(attr)) { + char *name = json_attr_name(attr)->str; + json_entity_t value = json_attr_value(attr); + enum json_value_e type = json_entity_type(value); + + idx = ldms_record_metric_find(rec_inst, name); + /* Ignore skipped values from json dictionary */ + if (idx < 0) { + LINFO("Ignoring '%s' attribute in JSON dictionary.\n", name); + continue; + } + mval = ldms_record_metric_get(rec_inst, idx); + switch (type) { + case JSON_DICT_VALUE: + /* This is a dictionary in another + * dictionary. LDMS_V_RECORD does not support + * nested records, so set the dictionary + * value to a JSON string */ + (void) ldms_record_metric_type_get(rec_inst, idx, &array_len); + jbuf = json_entity_dump(NULL, value); + if (array_len <= jbuf->cursor) { /* jbuf->cursor doesn't include '\0'. */ + LWARN("Dictionary attribute '%s' (%d) is larger " + "than the allocated space (%ld). " + "The string is chunked.\n", + name, jbuf->cursor, array_len); + /* Chunk the string */ + jbuf->buf[array_len] = '\0'; + } else { + /* Ensure that the string is null-terminated. */ + jbuf->buf[jbuf->cursor] = '\0'; + } + ldms_record_array_set_str(rec_inst, idx, jbuf->buf); + jbuf_free(jbuf); + rc = 0; + break; + case JSON_LIST_VALUE: + /* + * Record cannot have lists, so all lists in a dictionary + * is mapped to an array. A list of strings is encoded + * as char[]. + */ + rc = dict_list_set(rec_inst, idx, value, NULL); + break; + default: + rc = setter_table[type](set, mval, value, NULL); + break; + } + if (rc) + LERROR("Error %d setting record attribute '%s'\n", rc, name); + } + return 0; +} + +int JSON_NULL_VALUE_setter(ldms_set_t set, ldms_mval_t mval, json_entity_t entity, void *ctxt) +{ + return 0; +} + + +static json_setter_t setter_table[] = { + [JSON_INT_VALUE] = JSON_INT_VALUE_setter, + [JSON_BOOL_VALUE] = JSON_BOOL_VALUE_setter, + [JSON_FLOAT_VALUE] = JSON_FLOAT_VALUE_setter, + [JSON_STRING_VALUE] = JSON_STRING_VALUE_setter, + [JSON_ATTR_VALUE] = JSON_ATTR_VALUE_setter, + [JSON_LIST_VALUE] = JSON_LIST_VALUE_setter, + [JSON_DICT_VALUE] = JSON_DICT_VALUE_setter, + [JSON_NULL_VALUE] = JSON_NULL_VALUE_setter +}; + +static int DICT_LIST_INT_setter(ldms_mval_t rec_inst, int mid, int idx, json_entity_t item, void *ctxt) +{ + enum json_value_e jtype; + jtype = json_entity_type(item); + if (jtype != JSON_INT_VALUE) { + LERROR("List '%s' in a dictionary contains '%s' but expected '%s'.\n", + ldms_record_metric_name_get(rec_inst, mid), + json_type_name(jtype), json_type_name(JSON_INT_VALUE)); + return EINVAL; + } + + ldms_record_array_set_s64(rec_inst, mid, idx, json_value_int(item)); + return 0; +} + +static int DICT_LIST_BOOL_setter(ldms_mval_t rec_inst, int mid, int idx, json_entity_t item, void *ctxt) +{ + enum json_value_e jtype; + jtype = json_entity_type(item); + if (jtype != JSON_BOOL_VALUE) { + LERROR("List '%s' in a dictionary contains '%s' but expected '%s'.\n", + ldms_record_metric_name_get(rec_inst, mid), + json_type_name(jtype), json_type_name(JSON_BOOL_VALUE)); + return EINVAL; + } + ldms_record_array_set_s8(rec_inst, mid, idx, json_value_int(item)); + return 0; +} + +static int DICT_LIST_FLOAT_setter(ldms_mval_t rec_inst, int mid, int idx, json_entity_t item, void *ctxt) +{ + enum json_value_e jtype; + jtype = json_entity_type(item); + if (jtype != JSON_FLOAT_VALUE) { + LERROR("List '%s' in a dictionary contains '%s' but expected '%s'.\n", + ldms_record_metric_name_get(rec_inst, mid), + json_type_name(jtype), json_type_name(JSON_FLOAT_VALUE)); + return EINVAL; + } + ldms_record_array_set_double(rec_inst, mid, idx, json_value_float(item)); + return 0; +} + +static int DICT_LIST_STRING_setter(ldms_mval_t rec_inst, int mid, int idx, json_entity_t list, void *ctxt) +{ + size_t array_len; + jbuf_t jbuf; + + (void)ldms_record_metric_type_get(rec_inst, mid, &array_len); + + jbuf = json_entity_dump(NULL, list); + if (!jbuf) { + LCRITICAL("Memory allocation failure.\n"); + return ENOMEM; + } + + if (array_len <= jbuf->cursor) { + LWARN("The JSON-formatted of a list of strings (%d) is larger " + "than the allocated space. (%ld) The received data is chunked.\n", + jbuf->cursor, array_len); + jbuf->buf[array_len] = '\0'; + } + + ldms_record_array_set_str(rec_inst, mid, jbuf->buf); + jbuf_free(jbuf); + return 0; +} + +static dict_list_setter_t dl_setter_table[] = { + [JSON_INT_VALUE] = DICT_LIST_INT_setter, + [JSON_BOOL_VALUE] = DICT_LIST_BOOL_setter, + [JSON_FLOAT_VALUE] = DICT_LIST_FLOAT_setter, + [JSON_STRING_VALUE] = DICT_LIST_STRING_setter +}; + +static int dict_list_set(ldms_mval_t rec_inst, int mid, json_entity_t list, void *ctxt) +{ + int idx; + int rc = 0; + json_entity_t item; + enum json_value_e jtype; + size_t array_len; + + (void)ldms_record_metric_type_get(rec_inst, mid, &array_len); + if (json_list_len(list) > array_len) { + LWARN("List '%s' in a dictionary length (%ld) is larger than " + "the encoded array length (%ld). The extra items will be ignored.\n", + ldms_record_metric_name_get(rec_inst, mid), + json_list_len(list), array_len); + } + + item = json_item_first(list); + jtype = json_entity_type(item); + for (idx = 0; idx < array_len && item; idx++ , item = json_item_next(item)) { + switch (jtype) { + case JSON_INT_VALUE: + rc = dl_setter_table[JSON_INT_VALUE](rec_inst, mid, idx, item, ctxt); + break; + case JSON_BOOL_VALUE: + rc = dl_setter_table[JSON_BOOL_VALUE](rec_inst, mid, idx, item, ctxt); + break; + case JSON_FLOAT_VALUE: + rc = dl_setter_table[JSON_FLOAT_VALUE](rec_inst, mid, idx, item, ctxt); + break; + case JSON_STRING_VALUE: + rc = dl_setter_table[JSON_STRING_VALUE](rec_inst, mid, idx, list, ctxt); + break; + default: + LERROR(); + break; + } + } + + return rc; +} + +static int get_schema_for_json(char *name, json_entity_t e, ldms_schema_t *sch) +{ + int i, rc = 0; + ldms_schema_t schema; + struct schema_entry *entry; + struct rbn *rbn; + struct attr_entry *ae; + json_entity_t json_attr; + json_entity_t json_value; + ldms_record_t record; + enum json_value_e type; + char *record_name; + int midx, ridx = -1; + const char *attr_name; + + pthread_mutex_lock(&schema_tree_lock); + rbn = rbt_find(&schema_tree, name); + pthread_mutex_unlock(&schema_tree_lock); + if (rbn) { + entry = container_of(rbn, struct schema_entry, rbn); + *sch = entry->schema; + return 0; + } + schema = ldms_schema_new(name); + if (!schema) { + rc = errno; + goto err_0; + } + entry = calloc(1, sizeof(*entry)); + if (!entry) { + rc = errno; + goto err_1; + } + entry->schema = schema; + entry->name = strdup(name); + if (!entry->name) { + rc = errno; + goto err_2; + } + rbt_init(&entry->attr_tree, str_cmp); + rbn_init(&entry->rbn, entry->name); + + /* Add the special JSON stream attributes. These special + * attributes will have metric indices of 0 (S_uid), + * 1 (S_gid), and 2 (S_perm) + */ + const char *stream_meta_attr[] = { "S_uid", "S_gid", "S_perm" }; + for (i = 0; i < sizeof(stream_meta_attr) / sizeof(stream_meta_attr[0]); i++) { + midx = ldms_schema_metric_add(schema, stream_meta_attr[i], LDMS_V_S32); + if (midx < 0) + goto err_3; + ae = calloc(1, sizeof(*ae)); + if (!ae) { + rc = errno; + goto err_3; + } + ae->name = strdup(stream_meta_attr[i]); + if (!ae->name) { + rc = ENOMEM; + free(ae); + goto err_3; + } + ae->type = JSON_INT_VALUE; + ae->ridx = -1; + ae->midx = midx; + rbn_init(&ae->rbn, ae->name); + rbt_ins(&entry->attr_tree, &ae->rbn); + } + + for (json_attr = json_attr_first(e); json_attr; + json_attr = json_attr_next(json_attr)) { + + attr_name = json_attr_name(json_attr)->str; + if ( 0 == strcmp(attr_name, "S_uid") || + 0 == strcmp(attr_name, "S_gid") || + 0 == strcmp(attr_name, "S_perm")) { + /* S_uid, S_gid, S_perm already added above */ + continue; + } + json_value = json_attr_value(json_attr); + type = json_entity_type(json_value); + switch (type) { + case JSON_INT_VALUE: + midx = ldms_schema_metric_add(schema, + json_attr_name(json_attr)->str, + LDMS_V_S64); + break; + case JSON_BOOL_VALUE: + midx = ldms_schema_metric_add(schema, + json_attr_name(json_attr)->str, + LDMS_V_S8); + break; + case JSON_FLOAT_VALUE: + midx = ldms_schema_metric_add(schema, + json_attr_name(json_attr)->str, + LDMS_V_D64); + break; + case JSON_STRING_VALUE: + midx = ldms_schema_metric_array_add(schema, + json_attr_name(json_attr)->str, + LDMS_V_CHAR_ARRAY, DEFAULT_CHAR_ARRAY_LEN); + break; + case JSON_LIST_VALUE: + midx = make_list(schema, e, json_attr); + break; + case JSON_DICT_VALUE: + /* Add the record definition to the schema */ + rc = asprintf(&record_name, "%s_record", json_attr_name(json_attr)->str); + ridx = make_record(schema, record_name, + json_attr_value(json_attr), &record); + free(record_name); + /* A record must be a member of an array or list. + * Create an array to contain the record */ + midx = ldms_schema_record_array_add(schema, + json_attr_name(json_attr)->str, + record, 1); + break; + default: + LERROR("Unsupported type, '%s', in JSON dictionary.\n", + json_type_name(type)); + // rc = EINVAL; + // goto err_3; + continue; + }; + if (midx < 0) { + rc = EINVAL; + goto err_3; + } + ae = calloc(1, sizeof(*ae)); + if (!ae) { + rc = errno; + goto err_3; + } + ae->name = strdup(json_attr_name(json_attr)->str); + if (!ae->name) { + rc = ENOMEM; + free(ae); + goto err_3; + } + ae->type = type; + ae->ridx = ridx; + ae->midx = midx; + rbn_init(&ae->rbn, ae->name); + rbt_ins(&entry->attr_tree, &ae->rbn); + ridx = -1; + } + pthread_mutex_lock(&schema_tree_lock); + /* Make certain we didn't lose a race with another stream + * thread */ + rbn = rbt_find(&schema_tree, name); + if (rbn) { + rc = EBUSY; + pthread_mutex_unlock(&schema_tree_lock); + goto err_3; + } + rbn_init(&entry->rbn, entry->name); + rbt_ins(&schema_tree, &entry->rbn); + pthread_mutex_unlock(&schema_tree_lock); + *sch = entry->schema; + return 0; + err_3: + while (!rbt_empty(&entry->attr_tree)) { + rbn = rbt_min(&entry->attr_tree); + ae = container_of(rbn, struct attr_entry, rbn); + free(ae->name); + rbt_del(&entry->attr_tree, rbn); + } + free(entry->name); + err_2: + free(entry); + err_1: + ldms_schema_delete(schema); + err_0: + return rc; +} + +static int json_recv_cb(ldmsd_stream_client_t c, void *cb_arg, + ldmsd_stream_type_t stream_type, + const char *data, size_t data_len, + json_entity_t entity); + +pthread_mutex_t cfg_tree_lock = PTHREAD_MUTEX_INITIALIZER; +static struct rbt cfg_tree = RBT_INITIALIZER(str_cmp); +struct json_cfg_inst { + struct ldmsd_plugin *plugin; + char *stream_name; + size_t heap_sz; + char *producer_name; + char *instance_name; + uint64_t comp_id; + uid_t uid; + gid_t gid; + uint32_t perm; + pthread_mutex_t lock; + struct rbn rbn; /* Key is stream_name */ +}; + +#define DEFAULT_HEAP_SZ 512 +/* + * instance=FMT The FMT string is a format specifier for generating + * the instance name from features from the JSON object + * and the transport on which the object was received. + */ +static int config(struct ldmsd_plugin *self, struct attr_value_list *kwl, + struct attr_value_list *avl) +{ + struct json_cfg_inst *inst; + char *value; + int rc; + + inst = calloc(1, sizeof(*inst)); + if (!inst) + return ENOMEM; + + pthread_mutex_init(&inst->lock, NULL); + + /* stream name */ + value = av_value(avl, "stream"); + if (!value) { + rc = EINVAL; + LERROR("The 'stream' configuration parameter is required.\n"); + goto err_0; + } + inst->stream_name = strdup(value); + if (!inst->stream_name) { + rc = ENOMEM; + goto err_0; + } + + /* producer */ + value = av_value(avl, "producer"); + if (!value) { + LERROR("The 'producer' configuration parameter is required.\n"); + rc = EINVAL; + goto err_0; + } + inst->producer_name = strdup(value); + if (!inst->producer_name) { + rc = ENOMEM; + goto err_0; + } + + /* instance name */ + value = av_value(avl, "instance"); + if (value) { + inst->instance_name = strdup(value); + if (!inst->instance_name) { + rc = ENOMEM; + goto err_0; + } + } + + /* component_id */ + value = av_value(avl, "component_id"); + inst->comp_id = 0; + if (value) { + /* Skip non isdigit prefix */ + while (*value != '\0' && !isdigit(*value)) value++; + if (*value != '\0') + inst->comp_id = (uint64_t)(atoi(value)); + } + /* heap_sz */ + inst->heap_sz = DEFAULT_HEAP_SZ; + value = av_value(avl, "heap_sz"); + if (value) + inst->heap_sz = strtol(value, NULL, 0); + + /* Set uid, gid, perm to the default values */ + ldms_set_default_authz(&inst->uid, &inst->gid, &inst->perm, DEFAULT_AUTHZ_READONLY); + /* uid */ + value = av_value(avl, "uid"); + if (value) { + if (isalpha(value[0])) { + /* Lookup the user name */ + struct passwd *pwd = getpwnam(value); + if (!pwd) { + LERROR("The specified user '%s' does not exist\n", value); + rc = EINVAL; + goto err_0; + } + inst->uid = pwd->pw_uid; + } else { + inst->uid = strtol(value, NULL, 0); + } + } + + /* gid */ + value = av_value(avl, "gid"); + if (value) { + if (isalpha(value[0])) { + /* Try to lookup the group name */ + struct group *grp = getgrnam(value); + if (!grp) { + LERROR("The specified group '%s' does not exist\n", value); + rc = EINVAL; + goto err_0; + } + inst->gid = grp->gr_gid; + } else { + inst->gid = strtol(value, NULL, 0); + } + } + + /* permission */ + value = av_value(avl, "perm"); + if (value) { + if (value[0] != '0') { + LINFO("Warning, the permission bits '%s' are not specified " + "as an Octal number.\n", + value); + } + inst->perm = strtol(value, NULL, 0); + } + + rbn_init(&inst->rbn, inst->stream_name); + pthread_mutex_lock(&cfg_tree_lock); + struct rbn *rbn = rbt_find(&cfg_tree, inst->stream_name); + if (rbn) { + rc = EBUSY; + LERROR("The stream name '%s' has already been registered for " + "this sampler.\n", inst->stream_name); + goto err_1; + } + rbt_ins(&cfg_tree, &inst->rbn); + pthread_mutex_unlock(&cfg_tree_lock); + ldmsd_stream_subscribe(inst->stream_name, json_recv_cb, inst); + return 0; + err_1: + pthread_mutex_unlock(&cfg_tree_lock); + err_0: + if (inst) { + free(inst->stream_name); + free(inst->producer_name); + free(inst->instance_name); + free(inst); + } + return rc; +} + +static void update_set_data(struct json_cfg_inst *inst, + ldms_set_t set, + json_entity_t entity) +{ + const char *schema_name = ldms_set_schema_name_get(set); + struct schema_entry *entry; + json_entity_t json_attr; + ldms_mval_t mval; + struct rbn *rbn; + int rc; + + /* Find the schema instance for this set */ + rbn = rbt_find(&schema_tree, schema_name); + if (!rbn) { + LERROR("There is no parsing entity for schema '%s'\n", + schema_name); + return; + } else { + entry = container_of(rbn, struct schema_entry, rbn); + } + + for (json_attr = json_attr_first(entity); json_attr; + json_attr = json_attr_next(json_attr)) { + + char *name = json_attr_name(json_attr)->str; + json_entity_t value = json_attr_value(json_attr); + enum json_value_e type = json_entity_type(json_attr_value(json_attr)); + rbn = rbt_find(&entry->attr_tree, name); + if (!rbn) { + LERROR("Could not find attribute entry for '%s'\n", name); + continue; + } + struct attr_entry *ae = container_of(rbn, struct attr_entry, rbn); + LDEBUG("Updating midx %d with json attribute '%s' of type %d\n", + ae->midx, name, type); + + mval = ldms_metric_get(set, ae->midx); + assert(mval); + + switch (type) { + case JSON_DICT_VALUE: + /* The associated record the 1st and only + * element of the containing array at mval */ + rc = setter_table[JSON_DICT_VALUE](set, + ldms_record_array_get_inst(mval, 0), + value, name); + break; + default: + rc = setter_table[ae->type](set, mval, value, name); + break; + } + if (rc) + LERROR("Error %d setting the metric value '%s'\n", rc, ae->name); + } +} + +static int json_recv_cb(ldmsd_stream_client_t c, void *cb_arg, + ldmsd_stream_type_t stream_type, + const char *data, size_t data_len, + json_entity_t entity) +{ + const char *msg; + int rc = EINVAL; + ldms_schema_t schema = NULL; + struct json_cfg_inst *inst = cb_arg; + json_entity_t schema_name; + + LDEBUG("thread: %lu, stream: '%s', msg: '%s'\n", pthread_self(), inst->stream_name, data); + if (stream_type != LDMSD_STREAM_JSON) { + LERROR("Unexpected stream type data...ignoring\n"); + return 0; + } + + pthread_mutex_lock(&inst->lock); + + msg = data; + + /* Find/create the schema for this JSON object */ + if (JSON_DICT_VALUE != json_entity_type(entity)) { + rc = EINVAL; + LERROR("%s: Ignoring message that is not a JSON dictionary.\n", + inst->stream_name); + goto err_0; + } + + schema_name = json_value_find(entity, "schema"); + if (!schema_name || (JSON_STRING_VALUE != json_entity_type(schema_name))) { + rc = EINVAL; + LERROR("%s: Ignoring message with 'schema' attribute that is " + "missing or not a string.\n", inst->stream_name); + goto err_0; + } + rc = get_schema_for_json(json_value_str(schema_name)->str, entity, &schema); + if (rc) { + LERROR("%s: Error %d creating an LDMS schema for the JSON object '%s'\n", + inst->stream_name, rc, msg); + goto err_0; + } + char *set_name; + if (inst->instance_name) { + set_name = strdup(inst->instance_name); + } else { + rc = asprintf(&set_name, "%s_%s", inst->producer_name, + json_value_str(schema_name)->str); + if (rc < 0) + set_name = NULL; + } + if (!set_name) { + LERROR("Memory allocation failure.\n"); + goto err_0; + } + ldms_set_t set = ldms_set_by_name(set_name); + if (!set) { + set = ldms_set_create(set_name, schema, inst->uid, inst->gid, + inst->perm, inst->heap_sz); + if (set) { + LINFO("Created the set '%s' with schema '%s'\n", + set_name, ldms_schema_name_get(schema)); + ldms_set_publish(set); + } else { + LERROR("Error %d creating the set '%s' with schema '%s'\n", + errno, set_name, ldms_schema_name_get(schema)); + rc = errno; + goto err_1; + } + } + free(set_name); + + /* Update the stream meta-data in the set */ + ldms_transaction_begin(set); + ldms_metric_set_s32(set, 0, inst->uid); + ldms_metric_set_s32(set, 1, inst->gid); + ldms_metric_set_s32(set, 2, inst->perm); + update_set_data(inst, set, entity); + ldms_transaction_end(set); + + pthread_mutex_unlock(&inst->lock); + return 0; + err_1: + free(set_name); + err_0: + pthread_mutex_unlock(&inst->lock); + return rc; +} + +static void term(struct ldmsd_plugin *self) +{ + /* TODO: Cleanup ... maybe ... */ +} + +static int sample(struct ldmsd_sampler *self) +{ + /* no op */ + return 0; +} + +static ldms_set_t get_set(struct ldmsd_sampler *self) +{ + /* no op */ + return NULL; +} + +static struct ldmsd_sampler json_plugin = { + .base = { + .name = SAMP, + .type = LDMSD_PLUGIN_SAMPLER, + .term = term, + .config = config, + .usage = usage, + }, + .get_set = get_set, + .sample = sample, +}; + +struct ldmsd_plugin *get_plugin() +{ + if (!__log) { + /* Log initialization errors are quiet and will result in + * messages going to the application log instead of our subsystem + * specific log */ + __log = ovis_log_register + ("sampler.json_stream", + "JSON sampler that creates LDMS metric sets from JSON messages." + ); + } + return &json_plugin.base; +}