From bf8ed8eabf8d289836a110c9fdc89730e0a6750a Mon Sep 17 00:00:00 2001 From: Nisarg Thakkar Date: Thu, 5 Dec 2024 19:51:07 -0800 Subject: [PATCH 1/2] Use valid schema for Fixed field when deserializing data --- .../FastDeserializerGeneratorTest.java | 61 +++++++++++++++++++ .../fastserde/FastDeserializerGenerator.java | 7 ++- 2 files changed, 67 insertions(+), 1 deletion(-) create mode 100644 fastserde/avro-fastserde-tests-common/src/test/java/com/linkedin/avro/fastserde/FastDeserializerGeneratorTest.java diff --git a/fastserde/avro-fastserde-tests-common/src/test/java/com/linkedin/avro/fastserde/FastDeserializerGeneratorTest.java b/fastserde/avro-fastserde-tests-common/src/test/java/com/linkedin/avro/fastserde/FastDeserializerGeneratorTest.java new file mode 100644 index 00000000..885f56c3 --- /dev/null +++ b/fastserde/avro-fastserde-tests-common/src/test/java/com/linkedin/avro/fastserde/FastDeserializerGeneratorTest.java @@ -0,0 +1,61 @@ +package com.linkedin.avro.fastserde; + +import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper; +import com.linkedin.avroutil1.compatibility.AvroSchemaUtil; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericFixed; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DatumReader; +import org.apache.avro.io.DatumWriter; +import org.apache.avro.io.Encoder; +import org.testng.Assert; +import org.testng.SkipException; +import org.testng.annotations.Test; + + +public class FastDeserializerGeneratorTest { + @Test + public void testSchemaForFixedField() throws IOException, InterruptedException { + if (Utils.isAvro14()) { + throw new SkipException("Avro 1.4 doesn't have schemas for GenericFixed type"); + } + + Schema writerSchema = AvroCompatibilityHelper.parse("{\"type\":\"record\",\"name\":\"topLevelRecord\",\"fields\":[{\"name\":\"fixedField\",\"type\":{\"type\":\"fixed\",\"name\":\"FixedType\",\"size\":5}}]}"); + Schema readerSchema = AvroCompatibilityHelper.parse("{\"type\":\"record\",\"name\":\"topLevelRecord\",\"fields\":[{\"name\":\"fixedField\",\"type\":{\"type\":\"fixed\",\"name\":\"FixedType\",\"size\":5,\"newField\": \"New field to change something\"}}]}"); + + GenericRecord writtenRecord = new GenericData.Record(writerSchema); + writtenRecord.put("fixedField", AvroCompatibilityHelper.newFixed(writerSchema.getField("fixedField").schema(), new byte[]{1,2,3,4,5})); + + byte[] writeBytes = serialize(writtenRecord); + + FastGenericDatumReader datumReader = new FastGenericDatumReader(writerSchema, readerSchema, FastSerdeCache.getDefaultInstance()); + while (!datumReader.isFastDeserializerUsed()) { + deserialize(datumReader, writeBytes); + Thread.sleep(100); + } + + Object data = deserialize(datumReader, writeBytes); + GenericFixed fixedField = ((GenericFixed) ((GenericData.Record) data).get("fixedField")); + + Schema fixedFieldSchema = AvroSchemaUtil.getDeclaredSchema(fixedField); + Assert.assertNotNull(fixedFieldSchema, "Schema for field must always be set."); + } + + private byte[] serialize(GenericRecord record) throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + Encoder encoder = AvroCompatibilityHelper.newBinaryEncoder(baos, false, null); + + DatumWriter datumWriter = new FastGenericDatumWriter(record.getSchema(), FastSerdeCache.getDefaultInstance()); + datumWriter.write(record, encoder); + encoder.flush(); + + return baos.toByteArray(); + } + + private Object deserialize(DatumReader datumReader, byte[] bytes) throws IOException { + return datumReader.read(null, AvroCompatibilityHelper.newBinaryDecoder(bytes)); + } +} diff --git a/fastserde/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/FastDeserializerGenerator.java b/fastserde/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/FastDeserializerGenerator.java index 5b205979..92915009 100644 --- a/fastserde/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/FastDeserializerGenerator.java +++ b/fastserde/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/FastDeserializerGenerator.java @@ -214,7 +214,12 @@ private void processSimpleType(Schema schema, Schema readerSchema, JBlock method processEnum(readerSchema, methodBody, action, putExpressionIntoParent); break; case FIXED: - processFixed(schema, methodBody, action, putExpressionIntoParent, reuseSupplier); + // to preserve reader fixed specific options use reader field schema + if (action.getShouldRead() && readerSchema != null && Schema.Type.FIXED.equals(readerSchema.getType())) { + processFixed(readerSchema, methodBody, action, putExpressionIntoParent, reuseSupplier); + } else { + processFixed(schema, methodBody, action, putExpressionIntoParent, reuseSupplier); + } break; default: // to preserve reader string specific options use reader field schema From 0a88955b4de44b714a235186cae9b2662b05ca8b Mon Sep 17 00:00:00 2001 From: Nisarg Thakkar Date: Fri, 6 Dec 2024 08:11:59 -0800 Subject: [PATCH 2/2] Address review comments --- .../fastserde/FastDeserializerGeneratorTest.java | 7 +++++-- .../fastserde/FastDeserializerGenerator.java | 16 ++++++++++------ 2 files changed, 15 insertions(+), 8 deletions(-) diff --git a/fastserde/avro-fastserde-tests-common/src/test/java/com/linkedin/avro/fastserde/FastDeserializerGeneratorTest.java b/fastserde/avro-fastserde-tests-common/src/test/java/com/linkedin/avro/fastserde/FastDeserializerGeneratorTest.java index 885f56c3..5920696a 100644 --- a/fastserde/avro-fastserde-tests-common/src/test/java/com/linkedin/avro/fastserde/FastDeserializerGeneratorTest.java +++ b/fastserde/avro-fastserde-tests-common/src/test/java/com/linkedin/avro/fastserde/FastDeserializerGeneratorTest.java @@ -18,7 +18,7 @@ public class FastDeserializerGeneratorTest { @Test - public void testSchemaForFixedField() throws IOException, InterruptedException { + public void testDeserializationOfFixedField() throws IOException, InterruptedException { if (Utils.isAvro14()) { throw new SkipException("Avro 1.4 doesn't have schemas for GenericFixed type"); } @@ -27,7 +27,9 @@ public void testSchemaForFixedField() throws IOException, InterruptedException { Schema readerSchema = AvroCompatibilityHelper.parse("{\"type\":\"record\",\"name\":\"topLevelRecord\",\"fields\":[{\"name\":\"fixedField\",\"type\":{\"type\":\"fixed\",\"name\":\"FixedType\",\"size\":5,\"newField\": \"New field to change something\"}}]}"); GenericRecord writtenRecord = new GenericData.Record(writerSchema); - writtenRecord.put("fixedField", AvroCompatibilityHelper.newFixed(writerSchema.getField("fixedField").schema(), new byte[]{1,2,3,4,5})); + + byte[] writtenFixedFieldData = new byte[]{1,2,3,4,5}; + writtenRecord.put("fixedField", AvroCompatibilityHelper.newFixed(writerSchema.getField("fixedField").schema(), writtenFixedFieldData)); byte[] writeBytes = serialize(writtenRecord); @@ -42,6 +44,7 @@ public void testSchemaForFixedField() throws IOException, InterruptedException { Schema fixedFieldSchema = AvroSchemaUtil.getDeclaredSchema(fixedField); Assert.assertNotNull(fixedFieldSchema, "Schema for field must always be set."); + Assert.assertEquals(fixedField.bytes(), writtenFixedFieldData); } private byte[] serialize(GenericRecord record) throws IOException { diff --git a/fastserde/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/FastDeserializerGenerator.java b/fastserde/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/FastDeserializerGenerator.java index 92915009..5a50acd5 100644 --- a/fastserde/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/FastDeserializerGenerator.java +++ b/fastserde/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/FastDeserializerGenerator.java @@ -214,20 +214,24 @@ private void processSimpleType(Schema schema, Schema readerSchema, JBlock method processEnum(readerSchema, methodBody, action, putExpressionIntoParent); break; case FIXED: - // to preserve reader fixed specific options use reader field schema + final Schema fixedFieldSchema; if (action.getShouldRead() && readerSchema != null && Schema.Type.FIXED.equals(readerSchema.getType())) { - processFixed(readerSchema, methodBody, action, putExpressionIntoParent, reuseSupplier); + // to preserve reader-specific options use reader field schema + fixedFieldSchema = readerSchema; } else { - processFixed(schema, methodBody, action, putExpressionIntoParent, reuseSupplier); + fixedFieldSchema = schema; } + processFixed(fixedFieldSchema, methodBody, action, putExpressionIntoParent, reuseSupplier); break; default: - // to preserve reader string specific options use reader field schema + final Schema primitiveFieldSchema; if (action.getShouldRead() && readerSchema != null && Schema.Type.STRING.equals(readerSchema.getType())) { - processPrimitive(readerSchema, methodBody, action, putExpressionIntoParent, reuseSupplier); + // to preserve reader-specific options use reader field schema + primitiveFieldSchema = readerSchema; } else { - processPrimitive(schema, methodBody, action, putExpressionIntoParent, reuseSupplier); + primitiveFieldSchema = schema; } + processPrimitive(primitiveFieldSchema, methodBody, action, putExpressionIntoParent, reuseSupplier); break; } }