Skip to content

Commit

Permalink
Support map type in complex schema (#13906)
Browse files Browse the repository at this point in the history
  • Loading branch information
xiangfu0 authored Oct 2, 2024
1 parent 143ffd7 commit ea103c1
Show file tree
Hide file tree
Showing 78 changed files with 3,737 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.pinot.segment.spi.memory.PinotInputStream;
import org.apache.pinot.spi.utils.BigDecimalUtils;
import org.apache.pinot.spi.utils.ByteArray;
import org.apache.pinot.spi.utils.MapUtils;
import org.roaringbitmap.RoaringBitmap;

import static java.nio.charset.StandardCharsets.UTF_8;
Expand Down Expand Up @@ -302,6 +303,17 @@ public String[] getStringArray(int rowId, int colId) {
return strings;
}

@Override
public Map<String, Object> getMap(int rowId, int colId) {
  // The fixed-size region stores, per cell, the offset into the variable-size
  // region followed by the byte length of the serialized map payload.
  int fixedOffset = getOffsetInFixedBuffer(rowId, colId);
  int varOffset = _fixedSizeData.getInt(fixedOffset);
  int numBytes = _fixedSizeData.getInt(fixedOffset + 4);

  // Copy the serialized payload out and deserialize it into a map.
  byte[] serialized = new byte[numBytes];
  _variableSizeData.copyTo(varOffset, serialized, 0, numBytes);
  return MapUtils.deserializeMap(serialized);
}

@Nullable
@Override
public CustomObject getCustomObject(int rowId, int colId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ List<ByteBuffer> serialize()

String[] getStringArray(int rowId, int colId);

/**
 * Returns the map value at the given row and column.
 * NOTE(review): the sibling data-table interface declares its {@code getMap} as
 * {@code @Nullable} — confirm whether implementations of this interface may
 * return null as well and annotate accordingly.
 */
Map<String, Object> getMap(int rowId, int colId);

CustomObject getCustomObject(int rowId, int colId);

@Nullable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ byte[] toBytes()

String[] getStringArray(int rowId, int colId);

/**
 * Returns the map value at the given row and column, or {@code null} when no
 * value is stored for the cell (presumably an empty/absent payload — confirm
 * against implementations).
 */
@Nullable
Map<String, Object> getMap(int rowId, int colId);

@Nullable
CustomObject getCustomObject(int rowId, int colId);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.pinot.spi.trace.Tracing;
import org.apache.pinot.spi.utils.BigDecimalUtils;
import org.apache.pinot.spi.utils.ByteArray;
import org.apache.pinot.spi.utils.MapUtils;
import org.roaringbitmap.RoaringBitmap;

import static java.nio.charset.StandardCharsets.UTF_8;
Expand Down Expand Up @@ -317,6 +318,18 @@ public String[] getStringArray(int rowId, int colId) {
return strings;
}

@Nullable
@Override
public Map<String, Object> getMap(int rowId, int colId) {
  // Positions _variableSizeData at the start of this cell's payload and returns
  // the payload length; 0 means no map was stored for this cell.
  int size = positionOffsetInVariableBufferAndGetLength(rowId, colId);
  if (size == 0) {
    return null;
  }
  // Slice from the current position and cap the limit so deserialization cannot
  // read past this cell's payload into the next one.
  ByteBuffer buffer = _variableSizeData.slice();
  buffer.limit(size);
  return MapUtils.deserializeMap(buffer);
}

@Nullable
@Override
public CustomObject getCustomObject(int rowId, int colId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ private FunctionUtils() {
put(float[].class, PinotDataType.PRIMITIVE_FLOAT_ARRAY);
put(double[].class, PinotDataType.PRIMITIVE_DOUBLE_ARRAY);
put(String[].class, PinotDataType.STRING_ARRAY);
put(Map.class, PinotDataType.MAP);
put(Object.class, PinotDataType.OBJECT);
}};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,10 @@ public enum TransformFunctionType {
COSH("cosh"),
TANH("tanh"),
DEGREES("degrees"),
RADIANS("radians");
RADIANS("radians"),

// Complex type handling
ITEM("item");

private final String _name;
private final List<String> _names;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.sql.type.SqlTypeName;
Expand Down Expand Up @@ -288,6 +290,12 @@ public RelDataType toType(RelDataTypeFactory typeFactory) {
return typeFactory.createSqlType(SqlTypeName.VARCHAR);
}
},
// MAP columns use the shared map null placeholder and surface to Calcite as the
// SQL MAP type.
MAP(NullValuePlaceHolder.MAP) {
  @Override
  public RelDataType toType(RelDataTypeFactory typeFactory) {
    return typeFactory.createSqlType(SqlTypeName.MAP);
  }
},
BYTES(NullValuePlaceHolder.INTERNAL_BYTES) {
@Override
public RelDataType toType(RelDataTypeFactory typeFactory) {
Expand Down Expand Up @@ -634,6 +642,8 @@ public Serializable convertAndFormat(Object value) {
return value.toString();
case BYTES:
return ((ByteArray) value).toHexString();
case MAP:
return toMap(value);
case INT_ARRAY:
return (int[]) value;
case LONG_ARRAY:
Expand All @@ -655,6 +665,16 @@ public Serializable convertAndFormat(Object value) {
}
}

/**
 * Converts a MAP column value into a {@link Serializable} form for the response.
 *
 * <p>A value that is already {@link Serializable} (the common case, e.g.
 * {@link HashMap}) is returned as-is without copying; any other {@link Map}
 * implementation is defensively snapshotted into a serializable {@link HashMap}.
 *
 * @param value the raw MAP value, expected to be a {@link Map}
 * @return a serializable representation of the map
 * @throws IllegalStateException if the value is neither serializable nor a map
 */
private Serializable toMap(Object value) {
  if (value instanceof Serializable) {
    return (Serializable) value;
  }
  if (value instanceof Map) {
    // Bounded wildcard instead of a raw-type cast: avoids the unchecked warning
    // while preserving the exact runtime behavior of copying into a HashMap.
    return new HashMap<>((Map<?, ?>) value);
  }
  throw new IllegalStateException(String.format("Cannot convert: '%s' to Map", value));
}

private static int[] toIntArray(Object value) {
if (value instanceof int[]) {
return (int[]) value;
Expand Down Expand Up @@ -815,6 +835,8 @@ public static ColumnDataType fromDataTypeSV(DataType dataType) {
return JSON;
case BYTES:
return BYTES;
case MAP:
return MAP;
case UNKNOWN:
return UNKNOWN;
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.sql.Timestamp;
import java.util.Base64;
import java.util.Collection;
import java.util.Map;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.spi.data.FieldSpec;
Expand Down Expand Up @@ -816,6 +817,25 @@ public Object convert(Object value, PinotDataType sourceType) {
}
},

MAP {
  /**
   * Converts a value to a MAP. Only OBJECT/MAP sources carrying an actual
   * {@link Map} instance pass through unchanged; every other combination of
   * source type and value class is rejected.
   */
  @Override
  public Object convert(Object value, PinotDataType sourceType) {
    boolean convertibleSource = sourceType == OBJECT || sourceType == MAP;
    if (convertibleSource && value instanceof Map) {
      return value;
    }
    throw new UnsupportedOperationException(String.format("Cannot convert '%s' (Class of value: '%s') to MAP",
        sourceType, value.getClass()));
  }
},

BYTE_ARRAY {
@Override
public byte[] toBytes(Object value) {
Expand Down Expand Up @@ -1379,6 +1399,9 @@ public static PinotDataType getSingleValueType(Class<?> cls) {
if (cls == Short.class) {
return SHORT;
}
if (cls != null && Map.class.isAssignableFrom(cls)) {
return MAP;
}
return OBJECT;
}

Expand Down Expand Up @@ -1468,6 +1491,11 @@ public static PinotDataType getPinotDataTypeForIngestion(FieldSpec fieldSpec) {
return fieldSpec.isSingleValueField() ? STRING : STRING_ARRAY;
case BYTES:
return fieldSpec.isSingleValueField() ? BYTES : BYTES_ARRAY;
case MAP:
if (fieldSpec.isSingleValueField()) {
return MAP;
}
throw new IllegalStateException("There is no multi-value type for MAP");
default:
throw new UnsupportedOperationException(
"Unsupported data type: " + dataType + " in field: " + fieldSpec.getName());
Expand Down
1 change: 1 addition & 0 deletions pinot-common/src/main/proto/expressions.proto
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ enum ColumnDataType {
STRING_ARRAY = 17;
BYTES_ARRAY = 18;
UNKNOWN = 19;
MAP = 20;
}

message InputRef {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,7 @@ public void testGetSingleValueType() {
assertEquals(getSingleValueType(tc.getKey()), tc.getValue());
}
assertEquals(getSingleValueType(Object.class), OBJECT);
assertEquals(getSingleValueType(Map.class), MAP);
assertEquals(getSingleValueType(null), OBJECT);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.pinot.segment.spi.datasource.DataSource;
import org.apache.pinot.spi.data.FieldSpec;


Expand Down Expand Up @@ -401,4 +402,8 @@ private <T> T getValues(FieldSpec.DataType dataType, String column) {
private void putValues(FieldSpec.DataType dataType, String column, Object values) {
_valuesMap.get(dataType).put(column, values);
}

/**
 * Registers an additional data source with the underlying {@code DataFetcher} so
 * its values can be read through the standard fetch APIs.
 * NOTE(review): the parameter name suggests a map-key-qualified column name
 * (e.g. a key within a MAP column) — confirm the expected format against callers.
 */
public void addDataSource(String fullColumnKeyName, DataSource keyDataSource) {
  _dataFetcher.addDataSource(fullColumnKeyName, keyDataSource);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.apache.pinot.spi.trace.Tracing;
import org.apache.pinot.spi.utils.BytesUtils;
import org.apache.pinot.spi.utils.MapUtils;


/**
Expand Down Expand Up @@ -64,14 +65,9 @@ public DataFetcher(Map<String, DataSource> dataSourceMap) {
_columnValueReaderMap = new HashMap<>();
int maxNumValuesPerMVEntry = 0;
for (Map.Entry<String, DataSource> entry : dataSourceMap.entrySet()) {
String column = entry.getKey();
DataSource dataSource = entry.getValue();
addDataSource(entry.getKey(), dataSource);
DataSourceMetadata dataSourceMetadata = dataSource.getDataSourceMetadata();
ForwardIndexReader<?> forwardIndexReader = dataSource.getForwardIndex();
Preconditions.checkState(forwardIndexReader != null,
"Forward index disabled for column: %s, cannot create DataFetcher!", column);
ColumnValueReader columnValueReader = new ColumnValueReader(forwardIndexReader, dataSource.getDictionary());
_columnValueReaderMap.put(column, columnValueReader);
if (!dataSourceMetadata.isSingleValue()) {
maxNumValuesPerMVEntry = Math.max(maxNumValuesPerMVEntry, dataSourceMetadata.getMaxNumValuesPerMVEntry());
}
Expand All @@ -80,6 +76,14 @@ public DataFetcher(Map<String, DataSource> dataSourceMap) {
_maxNumValuesPerMVEntry = maxNumValuesPerMVEntry;
}

/**
 * Registers a column's data source so its values can be fetched. The column must
 * expose a forward index; the dictionary may be null for raw-encoded columns.
 */
public void addDataSource(String column, DataSource dataSource) {
  ForwardIndexReader<?> reader = dataSource.getForwardIndex();
  Preconditions.checkState(reader != null, "Forward index disabled for column: %s, cannot create DataFetcher!",
      column);
  _columnValueReaderMap.put(column, new ColumnValueReader(reader, dataSource.getDictionary()));
}

/**
* SINGLE-VALUED COLUMN API
*/
Expand Down Expand Up @@ -180,6 +184,22 @@ public void fetchBytesValues(String column, int[] inDocIds, int length, byte[][]
_columnValueReaderMap.get(column).readBytesValues(inDocIds, length, outValues);
}

/**
 * Fetch byte[] values for a single-valued column.
 *
 * <p>NOTE(review): this overload takes {@code String[]} as the column argument,
 * but {@code _columnValueReaderMap} is keyed by {@code String}. A
 * {@code Map.get(String[])} lookup can never match a {@code String} key, so this
 * method always gets {@code null} and throws a {@code NullPointerException} when
 * invoked. This looks like a copy/paste error from the {@code String} overload —
 * the parameter type should likely be {@code String}, or the method removed.
 *
 * @param column Column to read
 * @param inDocIds Input document id's buffer
 * @param length Number of input document ids
 * @param outValues Buffer for output
 */
public void fetchBytesValues(String[] column, int[] inDocIds, int length, byte[][] outValues) {
  _columnValueReaderMap.get(column).readBytesValues(inDocIds, length, outValues);
}

/**
 * Fetch deserialized map values for a single-valued MAP column.
 *
 * @param column Column to read
 * @param inDocIds Input document id's buffer
 * @param length Number of input document ids
 * @param outValues Buffer for output
 */
public void fetchMapValues(String column, int[] inDocIds, int length, Map[] outValues) {
  _columnValueReaderMap.get(column).readMapValues(inDocIds, length, outValues);
}

/**
* MULTI-VALUED COLUMN API
*/
Expand Down Expand Up @@ -421,6 +441,11 @@ void readStringValues(int[] docIds, int length, String[] valueBuffer) {
valueBuffer[i] = BytesUtils.toHexString(_reader.getBytes(docIds[i], readerContext));
}
break;
case MAP:
for (int i = 0; i < length; i++) {
valueBuffer[i] = MapUtils.toString(_reader.getMap(docIds[i], readerContext));
}
break;
default:
throw new IllegalStateException();
}
Expand All @@ -441,6 +466,20 @@ void readBytesValues(int[] docIds, int length, byte[][] valueBuffer) {
}
}

// Reads map values for the given docIds into valueBuffer. Dictionary-encoded
// columns resolve dict ids first and let the dictionary materialize the maps;
// raw columns deserialize the map bytes straight from the forward index.
void readMapValues(int[] docIds, int length, Map[] valueBuffer) {
  Tracing.activeRecording().setInputDataType(_storedType, _singleValue);
  ForwardIndexReaderContext readerContext = getReaderContext();
  if (_dictionary != null) {
    // Reuse the thread-local dict-id buffer to avoid per-call allocation.
    int[] dictIdBuffer = THREAD_LOCAL_DICT_IDS.get();
    _reader.readDictIds(docIds, length, dictIdBuffer, readerContext);
    _dictionary.readMapValues(dictIdBuffer, length, valueBuffer);
  } else {
    // Raw column: each cell is a serialized map payload.
    for (int i = 0; i < length; i++) {
      valueBuffer[i] = MapUtils.deserializeMap(_reader.getBytes(docIds[i], readerContext));
    }
  }
}

void readDictIdsMV(int[] docIds, int length, int[][] dictIdsBuffer) {
Tracing.activeRecording().setInputDataType(_storedType, _singleValue);
ForwardIndexReaderContext readerContext = getReaderContext();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@
package org.apache.pinot.core.common;

import java.math.BigDecimal;
import java.util.Map;
import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.apache.pinot.spi.utils.ByteArray;
import org.apache.pinot.spi.utils.MapUtils;


public class RowBasedBlockValueFetcher {
Expand Down Expand Up @@ -67,6 +69,8 @@ private ValueFetcher createFetcher(BlockValSet blockValSet) {
return new StringSingleValueFetcher(blockValSet.getStringValuesSV());
case BYTES:
return new BytesValueFetcher(blockValSet.getBytesValuesSV());
case MAP:
return new MapValueFetcher(blockValSet.getBytesValuesSV());
case UNKNOWN:
return new UnknownValueFetcher();
default:
Expand Down Expand Up @@ -154,6 +158,18 @@ public BigDecimal getValue(int docId) {
}
}

/**
 * Fetches MAP values stored as serialized bytes: each access deserializes the
 * row's payload into a {@code Map<String, Object>}.
 */
private static class MapValueFetcher implements ValueFetcher {
  private final byte[][] _values;

  MapValueFetcher(byte[][] values) {
    _values = values;
  }

  // Parameterized return type (was raw Map) — covariant with the interface and
  // matches what MapUtils.deserializeMap actually produces.
  public Map<String, Object> getValue(int docId) {
    return MapUtils.deserializeMap(_values[docId]);
  }
}

private static class StringSingleValueFetcher implements ValueFetcher {
private final String[] _values;

Expand Down
Loading

0 comments on commit ea103c1

Please sign in to comment.