Skip to content

Commit

Permalink
Add support to differentiate null and empty lists for multi-value columns in Avro decoder (#13572)
Browse files Browse the repository at this point in the history
  • Loading branch information
deemoliu authored Sep 13, 2024
1 parent f546add commit ec3d6ed
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public void init(@Nullable Set<String> fields, @Nullable RecordExtractorConfig r
AvroRecordExtractorConfig config = (AvroRecordExtractorConfig) recordExtractorConfig;
if (config != null) {
_applyLogicalTypes = config.isEnableLogicalTypes();
_differentiateNullAndEmptyForMV = config.isDifferentiateNullAndEmptyForMV();
}
if (fields == null || fields.isEmpty()) {
_extractAll = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,12 @@
*/
public class AvroRecordExtractorConfig implements RecordExtractorConfig {
// Populated from the "enableLogicalTypes" property in init(); off unless explicitly enabled.
private boolean _enableLogicalTypes = false;
// Populated from the "differentiateNullAndEmptyForMV" property in init(); off unless explicitly enabled.
// NOTE(review): presumably controls whether an empty multi-value field is kept as an empty array instead of
// being collapsed to null by the extractor - confirm against the extractor implementation.
private boolean _differentiateNullAndEmptyForMV = false;

/**
 * Populates this config from string properties. Each flag is parsed with {@link Boolean#parseBoolean(String)}, so a
 * missing key or any value other than "true" (ignoring case) results in {@code false}.
 *
 * @param props properties to read the "enableLogicalTypes" and "differentiateNullAndEmptyForMV" keys from
 */
@Override
public void init(Map<String, String> props) {
  String logicalTypesProp = props.get("enableLogicalTypes");
  this._enableLogicalTypes = Boolean.parseBoolean(logicalTypesProp);

  String differentiateProp = props.get("differentiateNullAndEmptyForMV");
  this._differentiateNullAndEmptyForMV = Boolean.parseBoolean(differentiateProp);
}

public boolean isEnableLogicalTypes() {
Expand All @@ -40,4 +42,12 @@ public boolean isEnableLogicalTypes() {
/**
 * Overrides the logical-types flag held by this config.
 *
 * @param enableLogicalTypes new value of the flag
 */
public void setEnableLogicalTypes(boolean enableLogicalTypes) {
  this._enableLogicalTypes = enableLogicalTypes;
}

/**
 * @return the current value of the differentiate-null-and-empty flag for multi-value fields
 */
public boolean isDifferentiateNullAndEmptyForMV() {
  return this._differentiateNullAndEmptyForMV;
}

/**
 * Overrides the differentiate-null-and-empty flag for multi-value fields.
 *
 * @param differentiateNullAndEmptyForMV new value of the flag
 */
public void setDifferentiateNullAndEmptyForMV(boolean differentiateNullAndEmptyForMV) {
  this._differentiateNullAndEmptyForMV = differentiateNullAndEmptyForMV;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,15 @@
package org.apache.pinot.plugin.inputformat.avro;

import com.google.common.collect.Sets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.testng.Assert;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -52,4 +55,63 @@ public void testIncomingTimeColumn()
genericRow.getFieldToValueMap().keySet().containsAll(Arrays.asList("incomingTime", "outgoingTime")));
Assert.assertEquals(genericRow.getValue("incomingTime"), 12345L);
}

/**
 * With a default config (differentiation flag off), a null array field and an empty array field are both
 * extracted as {@code null}, while a populated array field is extracted as an array of its values.
 */
@Test
public void testNoDifferentiateNullAndEmptyForMultiValueFields() {
  AvroRecordExtractor extractor = new AvroRecordExtractor();
  extractor.init(null, new AvroRecordExtractorConfig());

  // Three string-array fields: one populated, one null, one empty.
  Schema schema = SchemaBuilder.record("GenericRow").fields()
      .name("arrField1").type().array().items().stringType().noDefault()
      .name("arrField2").type().array().items().stringType().noDefault()
      .name("arrField3").type().array().items().stringType().noDefault()
      .endRecord();

  List<String> populatedValues = Arrays.asList("value1", "value2", "value3");
  GenericRecord avroRecord = new GenericData.Record(schema);
  avroRecord.put("arrField1", populatedValues);
  avroRecord.put("arrField2", null);
  avroRecord.put("arrField3", new ArrayList<String>());

  GenericRow row = new GenericRow();
  extractor.extract(avroRecord, row);

  // All three fields must be present in the row, even when their value is null.
  Assert.assertTrue(
      row.getFieldToValueMap().keySet().containsAll(Arrays.asList("arrField1", "arrField2", "arrField3")));
  Assert.assertEquals(row.getValue("arrField1"), populatedValues.toArray());
  Assert.assertNull(row.getValue("arrField2"));
  Assert.assertNull(row.getValue("arrField3"));
}

/**
 * With the differentiation flag switched on, a null array field is still extracted as {@code null}, but an empty
 * array field is extracted as an empty array instead of {@code null}.
 */
@Test
public void testDifferentiateNullAndEmptyForMultiValueFields() {
  AvroRecordExtractorConfig config = new AvroRecordExtractorConfig();
  config.setDifferentiateNullAndEmptyForMV(true);
  AvroRecordExtractor extractor = new AvroRecordExtractor();
  extractor.init(null, config);

  // Three string-array fields: one populated, one null, one empty.
  Schema schema = SchemaBuilder.record("GenericRow").fields()
      .name("arrField1").type().array().items().stringType().noDefault()
      .name("arrField2").type().array().items().stringType().noDefault()
      .name("arrField3").type().array().items().stringType().noDefault()
      .endRecord();

  List<String> populatedValues = Arrays.asList("value1", "value2", "value3");
  GenericRecord avroRecord = new GenericData.Record(schema);
  avroRecord.put("arrField1", populatedValues);
  avroRecord.put("arrField2", null);
  avroRecord.put("arrField3", new ArrayList<String>());

  GenericRow row = new GenericRow();
  extractor.extract(avroRecord, row);

  // All three fields must be present in the row, even when their value is null.
  Assert.assertTrue(
      row.getFieldToValueMap().keySet().containsAll(Arrays.asList("arrField1", "arrField2", "arrField3")));
  Assert.assertEquals(row.getValue("arrField1"), populatedValues.toArray());
  Assert.assertNull(row.getValue("arrField2"));
  // The empty list must survive as an empty array rather than being collapsed to null.
  Assert.assertEquals(row.getValue("arrField3"), new String[0]);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
*/
public abstract class BaseRecordExtractor<T> implements RecordExtractor<T> {

// When true, convertMultiValue returns an empty Object[] for an empty collection instead of null.
// Defaults to false; protected so that subclasses can set it.
protected boolean _differentiateNullAndEmptyForMV = false;

/**
* Converts the field value to either a single value (string, number, byte[]), multi value (Object[]) or a Map.
* Returns {@code null} if the value is an empty array/collection/map.
Expand Down Expand Up @@ -107,7 +109,7 @@ protected Object convertRecord(Object value) {
protected Object convertMultiValue(Object value) {
Collection collection = (Collection) value;
if (collection.isEmpty()) {
return null;
return _differentiateNullAndEmptyForMV ? new Object[0] : null;
}

int numValues = collection.size();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,6 @@
* Interface for configs of {@link RecordExtractor}
*/
public interface RecordExtractorConfig {

  /**
   * Initializes the config from string properties. The default implementation does nothing, so implementations
   * that have no properties to read do not need to override it.
   *
   * @param props properties to initialize the config from
   */
  default void init(Map<String, String> props) {
  }
}

0 comments on commit ec3d6ed

Please sign in to comment.