Skip to content

Commit

Permalink
store_avro_kafka: always generate and push schema to registry
Browse files Browse the repository at this point in the history
The name alone of the avro schema in the kafka registry is insufficient to
match an ldms schema. A schema in the kafka registry is uniquely identified
by a combination of a name and an integer ID. The schema with the same name
can evolve over time, and each iteration if given new unique ID.

We want to _always_ generate an avro schema, and attempt to push it to the
registry. When the schema hasn't changed, the kafka registry handles the
attempt to add a duplicate gracefully. The existing ID is used. But when
the schema is different, the registry applies schema compatibility checks,
and if they pass it updates the schema with a new ID.

This patch first looks up the schema name in the registry, then
generates its own schema and attemps to push it. This allows us to generate
informational messages about whether the schema is new, existing, or
updated/evolved.
  • Loading branch information
morrone committed Dec 6, 2023
1 parent 962eab1 commit 3bf7e84
Showing 1 changed file with 44 additions and 19 deletions.
63 changes: 44 additions & 19 deletions ldms/src/store/avro_kafka/store_avro_kafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -107,48 +107,73 @@ serdes_schema_find(aks_handle_t sh, char *schema_name,
ldms_schema_t lschema, ldmsd_row_t row)
{
struct rbn *rbn;
serdes_schema_t *sschema = NULL;
serdes_schema_t *previous_schema = NULL;
serdes_schema_t *current_schema = NULL;
struct schema_entry *entry;
char *json_buf = NULL;
size_t json_len;
char errstr[512];
int rc;

pthread_mutex_lock(&schema_rbt_lock);
/* Check if the schema is already cached */
/* Check if the schema is already cached in this plugin */
rbn = rbt_find(&schema_tree, schema_name);
if (rbn) {
entry = container_of(rbn, struct schema_entry, rbn);
sschema = entry->serdes_schema;
current_schema = entry->serdes_schema;
goto out;
}
entry = calloc(1, sizeof(*entry));
if (!entry)
goto out;

/* Check if the schema is already present in the registry */
sschema = serdes_schema_get(sh->serdes, schema_name, -1,
errstr, sizeof(errstr));
if (sschema)
/* Yes, cache it */
goto cache;
/* Look up the schema by name in the registry.
Name alone does not tell us if the schema matches this row, so we
will still need to continue on and try pushing an updated schema.
The registry should handle duplicates or schema evolution testing.
*/
previous_schema = serdes_schema_get(sh->serdes,
schema_name, -1,
errstr, sizeof(errstr));

/* Create a new schema from the row specification and LDMS schema */
/* Generate a new schema from the row specification and LDMS schema */
rc = ldmsd_row_to_json_avro_schema(row, &json_buf, &json_len);
if (rc)
goto out;
sschema =
serdes_schema_add(sh->serdes,
schema_name, -1,
json_buf, json_len,
errstr, sizeof(errstr));
if (!sschema) {

/* Push the generated schema to the registry */
current_schema = serdes_schema_add(sh->serdes,
schema_name, -1,
json_buf, json_len,
errstr, sizeof(errstr));
if (!current_schema) {
LOG_ERROR("%s\n", json_buf);
LOG_ERROR("Error '%s' creating schema '%s'\n", errstr, schema_name);
goto out;
}
cache:
entry->serdes_schema = sschema;

/* Log information about which schema was used */
if (previous_schema != NULL) {
if (serdes_schema_id(current_schema)
== serdes_schema_id(previous_schema)) {
LOG_INFO("Using existing id %d for schema name '%s'\n",
serdes_schema_id(current_schema),
schema_name);
} else {
LOG_WARN("Using replacement id %d for schema name '%s' (previous id %d)\n",
serdes_schema_id(current_schema),
schema_name,
serdes_schema_id(previous_schema));
serdes_schema_destroy(previous_schema);
}
} else {
LOG_INFO("Using brand new id %d for schema name '%s'\n",
serdes_schema_id(current_schema),
schema_name);
}

/* Cache the schema in this plugin */
entry->serdes_schema = current_schema;
entry->ldms_schema = lschema;
entry->schema_name = strdup(schema_name);
rbn_init(&entry->rbn, entry->schema_name);
Expand All @@ -157,7 +182,7 @@ serdes_schema_find(aks_handle_t sh, char *schema_name,
pthread_mutex_unlock(&schema_rbt_lock);
if (json_buf)
free(json_buf);
return sschema;
return current_schema;
}

pthread_mutex_t sk_lock = PTHREAD_MUTEX_INITIALIZER;
Expand Down

0 comments on commit 3bf7e84

Please sign in to comment.