diff --git a/fastserde/avro-fastserde-tests-common/src/test/java/com/linkedin/avro/fastserde/FastDeserializerGeneratorTest.java b/fastserde/avro-fastserde-tests-common/src/test/java/com/linkedin/avro/fastserde/FastDeserializerGeneratorTest.java new file mode 100644 index 00000000..5920696a --- /dev/null +++ b/fastserde/avro-fastserde-tests-common/src/test/java/com/linkedin/avro/fastserde/FastDeserializerGeneratorTest.java @@ -0,0 +1,64 @@ +package com.linkedin.avro.fastserde; + +import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper; +import com.linkedin.avroutil1.compatibility.AvroSchemaUtil; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericFixed; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DatumReader; +import org.apache.avro.io.DatumWriter; +import org.apache.avro.io.Encoder; +import org.testng.Assert; +import org.testng.SkipException; +import org.testng.annotations.Test; + + +public class FastDeserializerGeneratorTest { + @Test + public void testDeserializationOfFixedField() throws IOException, InterruptedException { + if (Utils.isAvro14()) { + throw new SkipException("Avro 1.4 doesn't have schemas for GenericFixed type"); + } + + Schema writerSchema = AvroCompatibilityHelper.parse("{\"type\":\"record\",\"name\":\"topLevelRecord\",\"fields\":[{\"name\":\"fixedField\",\"type\":{\"type\":\"fixed\",\"name\":\"FixedType\",\"size\":5}}]}"); + Schema readerSchema = AvroCompatibilityHelper.parse("{\"type\":\"record\",\"name\":\"topLevelRecord\",\"fields\":[{\"name\":\"fixedField\",\"type\":{\"type\":\"fixed\",\"name\":\"FixedType\",\"size\":5,\"newField\": \"New field to change something\"}}]}"); + + GenericRecord writtenRecord = new GenericData.Record(writerSchema); + + byte[] writtenFixedFieldData = new byte[]{1,2,3,4,5}; + writtenRecord.put("fixedField", AvroCompatibilityHelper.newFixed(writerSchema.getField("fixedField").schema(), writtenFixedFieldData)); + + byte[] writeBytes = serialize(writtenRecord); + + FastGenericDatumReader datumReader = new FastGenericDatumReader(writerSchema, readerSchema, FastSerdeCache.getDefaultInstance()); + while (!datumReader.isFastDeserializerUsed()) { + deserialize(datumReader, writeBytes); + Thread.sleep(100); + } + + Object data = deserialize(datumReader, writeBytes); + GenericFixed fixedField = ((GenericFixed) ((GenericData.Record) data).get("fixedField")); + + Schema fixedFieldSchema = AvroSchemaUtil.getDeclaredSchema(fixedField); + Assert.assertNotNull(fixedFieldSchema, "Schema for field must always be set."); + Assert.assertEquals(fixedField.bytes(), writtenFixedFieldData); + } + + private byte[] serialize(GenericRecord record) throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + Encoder encoder = AvroCompatibilityHelper.newBinaryEncoder(baos, false, null); + + DatumWriter datumWriter = new FastGenericDatumWriter(record.getSchema(), FastSerdeCache.getDefaultInstance()); + datumWriter.write(record, encoder); + encoder.flush(); + + return baos.toByteArray(); + } + + private Object deserialize(DatumReader datumReader, byte[] bytes) throws IOException { + return datumReader.read(null, AvroCompatibilityHelper.newBinaryDecoder(bytes)); + } +} diff --git a/fastserde/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/FastDeserializerGenerator.java b/fastserde/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/FastDeserializerGenerator.java index 5b205979..5a50acd5 100644 --- a/fastserde/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/FastDeserializerGenerator.java +++ b/fastserde/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/FastDeserializerGenerator.java @@ -214,15 +214,24 @@ private void processSimpleType(Schema schema, Schema readerSchema, JBlock method processEnum(readerSchema, methodBody, action, putExpressionIntoParent); break; case FIXED: - processFixed(schema, methodBody, action, putExpressionIntoParent, reuseSupplier); + final Schema fixedFieldSchema; + if (action.getShouldRead() && readerSchema != null && Schema.Type.FIXED.equals(readerSchema.getType())) { + // to preserve reader-specific options use reader field schema + fixedFieldSchema = readerSchema; + } else { + fixedFieldSchema = schema; + } + processFixed(fixedFieldSchema, methodBody, action, putExpressionIntoParent, reuseSupplier); break; default: - // to preserve reader string specific options use reader field schema + final Schema primitiveFieldSchema; if (action.getShouldRead() && readerSchema != null && Schema.Type.STRING.equals(readerSchema.getType())) { - processPrimitive(readerSchema, methodBody, action, putExpressionIntoParent, reuseSupplier); + // to preserve reader-specific options use reader field schema + primitiveFieldSchema = readerSchema; } else { - processPrimitive(schema, methodBody, action, putExpressionIntoParent, reuseSupplier); + primitiveFieldSchema = schema; } + processPrimitive(primitiveFieldSchema, methodBody, action, putExpressionIntoParent, reuseSupplier); break; } }