diff --git a/ldms/src/ldmsd/ldmsd_decomp.c b/ldms/src/ldmsd/ldmsd_decomp.c
index 24c9c3c36..eef89ea2b 100644
--- a/ldms/src/ldmsd/ldmsd_decomp.c
+++ b/ldms/src/ldmsd/ldmsd_decomp.c
@@ -797,6 +797,41 @@ static const char *col_type_str(enum ldms_value_type type)
 	return type_str[type];
 }
 
+static const char *col_default(enum ldms_value_type type)
+{
+	static char *default_str[] = {
+	    [LDMS_V_CHAR] = "\" \"",
+	    [LDMS_V_U8] = "0",
+	    [LDMS_V_S8] = "0",
+	    [LDMS_V_U16] = "0",
+	    [LDMS_V_S16] = "0",
+	    [LDMS_V_U32] = "0",
+	    [LDMS_V_S32] = "0",
+	    [LDMS_V_U64] = "0",
+	    [LDMS_V_S64] = "0",
+	    [LDMS_V_F32] = "0.0",
+	    [LDMS_V_D64] = "0.0",
+	    [LDMS_V_CHAR_ARRAY] = "\"\"",
+	    [LDMS_V_U8_ARRAY] = "[]",
+	    [LDMS_V_S8_ARRAY] = "[]",
+	    [LDMS_V_U16_ARRAY] = "[]",
+	    [LDMS_V_S16_ARRAY] = "[]",
+	    [LDMS_V_U32_ARRAY] = "[]",
+	    [LDMS_V_S32_ARRAY] = "[]",
+	    [LDMS_V_U64_ARRAY] = "[]",
+	    [LDMS_V_S64_ARRAY] = "[]",
+	    [LDMS_V_F32_ARRAY] = "[]",
+	    [LDMS_V_D64_ARRAY] = "[]",
+	    [LDMS_V_LIST] = "nosup",
+	    [LDMS_V_LIST_ENTRY] = "nosup",
+	    [LDMS_V_RECORD_TYPE] = "nosup",
+	    [LDMS_V_RECORD_INST] = "nosup",
+	    [LDMS_V_RECORD_ARRAY] = "nosup",
+	    [LDMS_V_TIMESTAMP] = "0"
+	};
+	return default_str[type];
+}
+
 static char get_avro_char(char c)
 {
 	if (isalnum(c))
@@ -844,6 +879,12 @@ int ldmsd_row_to_json_avro_schema(ldmsd_row_t row, char **str, size_t *len)
 		}
 		switch (col->type) {
 		case LDMS_V_TIMESTAMP:
+			rc = strbuf_printf(&h,
+                                           "{\"name\":\"%s\",\"type\":%s}",
+					   avro_name, col_type_str(col->type));
+			if (rc)
+				goto err_0;
+			break;
 		case LDMS_V_CHAR:
 		case LDMS_V_U8:
 		case LDMS_V_S8:
@@ -856,8 +897,11 @@ int ldmsd_row_to_json_avro_schema(ldmsd_row_t row, char **str, size_t *len)
 		case LDMS_V_F32:
 		case LDMS_V_D64:
 		case LDMS_V_CHAR_ARRAY:
-			rc = strbuf_printf(&h, "{\"name\":\"%s\",\"type\":%s}",
-					   avro_name, col_type_str(col->type));
+			rc = strbuf_printf(&h,
+                                           "{\"name\":\"%s\",\"type\":%s,"
+                                           "\"default\":%s}",
+					   avro_name, col_type_str(col->type),
+                                           col_default(col->type));
 			if (rc)
 				goto err_0;
 			break;
@@ -873,8 +917,10 @@ int ldmsd_row_to_json_avro_schema(ldmsd_row_t row, char **str, size_t *len)
 		case LDMS_V_D64_ARRAY:
 			rc = strbuf_printf(&h,
 					   "{\"name\":\"%s\","
-					   "\"type\":{ \"type\" : \"array\", \"items\": %s }}",
-					   avro_name, col_type_str(col->type));
+					   "\"type\":{ \"type\" : \"array\", \"items\": %s },"
+                                           "\"default\":%s}",
+					   avro_name, col_type_str(col->type),
+                                           col_default(col->type));
 			if (rc)
 				goto err_0;
 			break;
diff --git a/ldms/src/store/avro_kafka/store_avro_kafka.c b/ldms/src/store/avro_kafka/store_avro_kafka.c
index e78c25d81..1edddb9cf 100644
--- a/ldms/src/store/avro_kafka/store_avro_kafka.c
+++ b/ldms/src/store/avro_kafka/store_avro_kafka.c
@@ -107,7 +107,8 @@ serdes_schema_find(aks_handle_t sh, char *schema_name,
 		  ldms_schema_t lschema, ldmsd_row_t row)
 {
 	struct rbn *rbn;
-	serdes_schema_t *sschema = NULL;
+	serdes_schema_t *previous_schema = NULL;
+	serdes_schema_t *current_schema = NULL;
 	struct schema_entry *entry;
 	char *json_buf = NULL;
 	size_t json_len;
@@ -115,40 +116,64 @@ serdes_schema_find(aks_handle_t sh, char *schema_name,
 	int rc;
 
 	pthread_mutex_lock(&schema_rbt_lock);
-	/* Check if the schema is already cached */
+	/* Check if the schema is already cached in this plugin */
 	rbn = rbt_find(&schema_tree, schema_name);
 	if (rbn) {
 		entry = container_of(rbn, struct schema_entry, rbn);
-		sschema = entry->serdes_schema;
+		current_schema = entry->serdes_schema;
 		goto out;
 	}
 	entry = calloc(1, sizeof(*entry));
 	if (!entry)
 		goto out;
 
-	/* Check if the schema is already present in the registry */
-	sschema = serdes_schema_get(sh->serdes, schema_name, -1,
-				    errstr, sizeof(errstr));
-	if (sschema)
-		/* Yes, cache it */
-		goto cache;
+	/* Look up the schema by name in the registry.
+           Name alone does not tell us if the schema matches this row, so we
+           will still need to continue on and try pushing an updated schema.
+           The registry should handle duplicates or schema evolution testing.
+        */
+	previous_schema = serdes_schema_get(sh->serdes,
+                                            schema_name, -1,
+                                            errstr, sizeof(errstr));
 
-	/* Create a new schema from the row specification and LDMS schema */
+	/* Generate a new schema from the row specification and LDMS schema */
 	rc = ldmsd_row_to_json_avro_schema(row, &json_buf, &json_len);
 	if (rc)
 		goto out;
-	sschema =
-	    serdes_schema_add(sh->serdes,
-			      schema_name, -1,
-			      json_buf, json_len,
-			      errstr, sizeof(errstr));
-	if (!sschema) {
+
+        /* Push the generated schema to the registry */
+        current_schema = serdes_schema_add(sh->serdes,
+                                             schema_name, -1,
+                                             json_buf, json_len,
+                                             errstr, sizeof(errstr));
+	if (!current_schema) {
 		LOG_ERROR("%s\n", json_buf);
 		LOG_ERROR("Error '%s' creating schema '%s'\n", errstr, schema_name);
 		goto out;
 	}
-cache:
-	entry->serdes_schema = sschema;
+
+        /* Log information about which schema was used */
+        if (previous_schema != NULL) {
+                if (serdes_schema_id(current_schema)
+                    == serdes_schema_id(previous_schema)) {
+                        LOG_INFO("Using existing id %d for schema name '%s'\n",
+                                 serdes_schema_id(current_schema),
+                                 schema_name);
+                } else {
+                        LOG_WARN("Using replacement id %d for schema name '%s' (previous id %d)\n",
+                                 serdes_schema_id(current_schema),
+                                 schema_name,
+                                 serdes_schema_id(previous_schema));
+                        serdes_schema_destroy(previous_schema);
+                }
+        } else {
+                LOG_INFO("Using brand new id %d for schema name '%s'\n",
+                         serdes_schema_id(current_schema),
+                         schema_name);
+        }
+
+        /* Cache the schema in this plugin */
+	entry->serdes_schema = current_schema;
 	entry->ldms_schema = lschema;
 	entry->schema_name = strdup(schema_name);
 	rbn_init(&entry->rbn, entry->schema_name);
@@ -157,7 +182,7 @@ serdes_schema_find(aks_handle_t sh, char *schema_name,
 	pthread_mutex_unlock(&schema_rbt_lock);
 	if (json_buf)
 		free(json_buf);
-	return sschema;
+	return current_schema;
 }
 
 pthread_mutex_t sk_lock = PTHREAD_MUTEX_INITIALIZER;