diff --git a/ldms/src/ldmsd/ldmsd_decomp.c b/ldms/src/ldmsd/ldmsd_decomp.c index 24c9c3c36..eef89ea2b 100644 --- a/ldms/src/ldmsd/ldmsd_decomp.c +++ b/ldms/src/ldmsd/ldmsd_decomp.c @@ -797,6 +797,41 @@ static const char *col_type_str(enum ldms_value_type type) return type_str[type]; } +static const char *col_default(enum ldms_value_type type) +{ + static char *default_str[] = { + [LDMS_V_CHAR] = "\" \"", + [LDMS_V_U8] = "0", + [LDMS_V_S8] = "0", + [LDMS_V_U16] = "0", + [LDMS_V_S16] = "0", + [LDMS_V_U32] = "0", + [LDMS_V_S32] = "0", + [LDMS_V_U64] = "0", + [LDMS_V_S64] = "0", + [LDMS_V_F32] = "0.0", + [LDMS_V_D64] = "0.0", + [LDMS_V_CHAR_ARRAY] = "\"\"", + [LDMS_V_U8_ARRAY] = "[]", + [LDMS_V_S8_ARRAY] = "[]", + [LDMS_V_U16_ARRAY] = "[]", + [LDMS_V_S16_ARRAY] = "[]", + [LDMS_V_U32_ARRAY] = "[]", + [LDMS_V_S32_ARRAY] = "[]", + [LDMS_V_U64_ARRAY] = "[]", + [LDMS_V_S64_ARRAY] = "[]", + [LDMS_V_F32_ARRAY] = "[]", + [LDMS_V_D64_ARRAY] = "[]", + [LDMS_V_LIST] = "nosup", + [LDMS_V_LIST_ENTRY] = "nosup", + [LDMS_V_RECORD_TYPE] = "nosup", + [LDMS_V_RECORD_INST] = "nosup", + [LDMS_V_RECORD_ARRAY] = "nosup", + [LDMS_V_TIMESTAMP] = "0" + }; + return default_str[type]; +} + static char get_avro_char(char c) { if (isalnum(c)) @@ -844,6 +879,12 @@ int ldmsd_row_to_json_avro_schema(ldmsd_row_t row, char **str, size_t *len) } switch (col->type) { case LDMS_V_TIMESTAMP: + rc = strbuf_printf(&h, + "{\"name\":\"%s\",\"type\":%s}", + avro_name, col_type_str(col->type)); + if (rc) + goto err_0; + break; case LDMS_V_CHAR: case LDMS_V_U8: case LDMS_V_S8: @@ -856,8 +897,11 @@ int ldmsd_row_to_json_avro_schema(ldmsd_row_t row, char **str, size_t *len) case LDMS_V_F32: case LDMS_V_D64: case LDMS_V_CHAR_ARRAY: - rc = strbuf_printf(&h, "{\"name\":\"%s\",\"type\":%s}", - avro_name, col_type_str(col->type)); + rc = strbuf_printf(&h, + "{\"name\":\"%s\",\"type\":%s," + "\"default\":%s}", + avro_name, col_type_str(col->type), + col_default(col->type)); if (rc) goto err_0; break; @@ -873,8 +917,10 @@ int ldmsd_row_to_json_avro_schema(ldmsd_row_t row, char **str, size_t *len) case LDMS_V_D64_ARRAY: rc = strbuf_printf(&h, "{\"name\":\"%s\"," - "\"type\":{ \"type\" : \"array\", \"items\": %s }}", - avro_name, col_type_str(col->type)); + "\"type\":{ \"type\" : \"array\", \"items\": %s }," + "\"default\":%s}", + avro_name, col_type_str(col->type), + col_default(col->type)); if (rc) goto err_0; break; diff --git a/ldms/src/store/avro_kafka/store_avro_kafka.c b/ldms/src/store/avro_kafka/store_avro_kafka.c index e78c25d81..1edddb9cf 100644 --- a/ldms/src/store/avro_kafka/store_avro_kafka.c +++ b/ldms/src/store/avro_kafka/store_avro_kafka.c @@ -107,7 +107,8 @@ serdes_schema_find(aks_handle_t sh, char *schema_name, ldms_schema_t lschema, ldmsd_row_t row) { struct rbn *rbn; - serdes_schema_t *sschema = NULL; + serdes_schema_t *previous_schema = NULL; + serdes_schema_t *current_schema = NULL; struct schema_entry *entry; char *json_buf = NULL; size_t json_len; @@ -115,40 +116,64 @@ serdes_schema_find(aks_handle_t sh, char *schema_name, int rc; pthread_mutex_lock(&schema_rbt_lock); - /* Check if the schema is already cached */ + /* Check if the schema is already cached in this plugin */ rbn = rbt_find(&schema_tree, schema_name); if (rbn) { entry = container_of(rbn, struct schema_entry, rbn); - sschema = entry->serdes_schema; + current_schema = entry->serdes_schema; goto out; } entry = calloc(1, sizeof(*entry)); if (!entry) goto out; - /* Check if the schema is already present in the registry */ - sschema = serdes_schema_get(sh->serdes, schema_name, -1, - errstr, sizeof(errstr)); - if (sschema) - /* Yes, cache it */ - goto cache; + /* Look up the schema by name in the registry. + Name alone does not tell us if the schema matches this row, so we + will still need to continue on and try pushing an updated schema. + The registry should handle duplicates or schema evolution testing. + */ + previous_schema = serdes_schema_get(sh->serdes, + schema_name, -1, + errstr, sizeof(errstr)); - /* Create a new schema from the row specification and LDMS schema */ + /* Generate a new schema from the row specification and LDMS schema */ rc = ldmsd_row_to_json_avro_schema(row, &json_buf, &json_len); if (rc) goto out; - sschema = - serdes_schema_add(sh->serdes, - schema_name, -1, - json_buf, json_len, - errstr, sizeof(errstr)); - if (!sschema) { + + /* Push the generated schema to the registry */ + current_schema = serdes_schema_add(sh->serdes, + schema_name, -1, + json_buf, json_len, + errstr, sizeof(errstr)); + if (!current_schema) { LOG_ERROR("%s\n", json_buf); LOG_ERROR("Error '%s' creating schema '%s'\n", errstr, schema_name); goto out; } -cache: - entry->serdes_schema = sschema; + + /* Log information about which schema was used */ + if (previous_schema != NULL) { + if (serdes_schema_id(current_schema) + == serdes_schema_id(previous_schema)) { + LOG_INFO("Using existing id %d for schema name '%s'\n", + serdes_schema_id(current_schema), + schema_name); + } else { + LOG_WARN("Using replacement id %d for schema name '%s' (previous id %d)\n", + serdes_schema_id(current_schema), + schema_name, + serdes_schema_id(previous_schema)); + serdes_schema_destroy(previous_schema); + } + } else { + LOG_INFO("Using brand new id %d for schema name '%s'\n", + serdes_schema_id(current_schema), + schema_name); + } + + /* Cache the schema in this plugin */ + entry->serdes_schema = current_schema; entry->ldms_schema = lschema; entry->schema_name = strdup(schema_name); rbn_init(&entry->rbn, entry->schema_name); @@ -157,7 +182,7 @@ serdes_schema_find(aks_handle_t sh, char *schema_name, pthread_mutex_unlock(&schema_rbt_lock); if (json_buf) free(json_buf); - return sschema; + return current_schema; } pthread_mutex_t sk_lock = PTHREAD_MUTEX_INITIALIZER;