store_avro_kafka: avro schema evolution not working #1262
Comments
@morrone, this looks like a bug. I'll look into it.
OK, I think I know what happened here. I had a configuration of ldms running without the "cluster" column, and then I restarted it with a configuration that added the "cluster" column.
The issue is in serdes_schema_find(). The first time that store_avro_kafka sees an ldms decomposed row, it looks up the ldms schema name in the kafka schema registry, looking for an avro schema of the same name. If it finds a matching schema name, it unfortunately just accepts that schema without looking at the schema contents. It doesn't generate an avro schema from the decomposed data, and therefore it never compares the two to see if they are different. If they are different, as they are in my example (the avro schema in the registry is missing the "cluster" field), it will error out later when it cannot find the field in the schema.
So what do we do about that? Ideally, we do want to support schema evolution. Schema evolution would require store_avro_kafka to upload a new, compatible version of a schema. My recommendation is that we:
We need to always craft the avro schema ourselves, not just rely on the version in the registry to be correct. There are probably two main ways to handle things after that: do intelligent schema comparison ourselves, or just push it to the registry and let that figure things out. I don't know if any of the libraries provide convenient avro schema comparison functions. If they don't, then it could be fairly complicated to write the code to do that correctly. I would suggest as a first pass that we just push it to the registry and let that figure things out for us. I suspect (but need to confirm) that it won't have a problem with pushing the same schema multiple times, and when the schema is different, the registry will perform schema compatibility tests for us.
What we don't want to happen, of course, is some sort of schema flapping. For instance, suppose we had some ldmsd aggregators that generate an older schema A, and some aggregators that generate a newer schema B. If those daemons were restarted with their differing configurations, we might wind up with a long series of schema versions in the registry which are really just alternating versions of A and B. If it worries us enough, we could make the "always push the schema" behavior a configuration option. But I think in practice it might not be so bad. At the moment we only match metric sets by hash, so it is easy enough to ensure in configurations that only one schema will be generated in a cluster if the configurations for the aggregators are all updated at the same time.
And to expand on 2), this page is helpful: https://docs.confluent.io/platform/current/schema-registry/fundamentals/schema-evolution.html The default schema registry evolution type is "backward", which means we can remove fields and add optional fields. An "optional" field is one that has a "default".
So technically we only need the default on new fields, but if we just add it to all of them, then we are set up to support "backward", "forward", and "full". We can use the defaults from the decomposition rule, if that information is passed through the layers (I suspect not?). Otherwise I'm fine with filling in zeros and empty strings.
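To make the "default on every field" idea concrete, here is a minimal Python sketch (not the store_avro_kafka C code). It builds an Avro record schema from a list of decomposed columns, gives every field a zero or empty-string default, and reports which generated fields a previously registered schema lacks. The type names in `AVRO_TYPES`, the helper names, and the "meminfo" schema with its columns are all hypothetical and chosen only for illustration.

```python
import json

# Hypothetical mapping from a decomposed column type to (Avro type, default).
# These type names are illustrative, not the actual LDMS type names.
AVRO_TYPES = {
    "int": ("long", 0),
    "float": ("double", 0.0),
    "string": ("string", ""),
}


def build_avro_schema(name, columns):
    """Build an Avro record schema in which every field carries a default."""
    fields = []
    for col_name, col_type in columns:
        avro_type, default = AVRO_TYPES[col_type]
        fields.append({"name": col_name, "type": avro_type, "default": default})
    return {"type": "record", "name": name, "fields": fields}


def missing_fields(generated, registered):
    """Return names of fields in `generated` that `registered` does not have."""
    known = {f["name"] for f in registered["fields"]}
    return [f["name"] for f in generated["fields"] if f["name"] not in known]


# Schema as it might have been registered before "cluster" was added.
old = build_avro_schema("meminfo", [("timestamp", "int"), ("MemFree", "int")])

# Schema generated from the new decomposition, which adds "cluster".
new = build_avro_schema(
    "meminfo",
    [("timestamp", "int"), ("MemFree", "int"), ("cluster", "string")],
)

print(missing_fields(new, old))
print(json.dumps(new, indent=2))
```

Here `missing_fields(new, old)` reports `cluster`, which is the mismatch that goes unnoticed when the registry copy is accepted by name alone. Because the new field has a default, a registry using the "backward" rule described on the page linked above should be able to accept the new schema as a compatible version.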
Fixed well enough in #1305.
I don't know if this is an issue in decomposition or store_avro_kafka. But I have a decomp column that looks like this:
I was hoping that this would add a "cluster" column with the name "kern" if it didn't appear in the schema. But when used with store_avro_kafka I see the error: