Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add map type support #13906

Merged
merged 1 commit into from
Oct 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.pinot.segment.spi.memory.PinotInputStream;
import org.apache.pinot.spi.utils.BigDecimalUtils;
import org.apache.pinot.spi.utils.ByteArray;
import org.apache.pinot.spi.utils.MapUtils;
import org.roaringbitmap.RoaringBitmap;

import static java.nio.charset.StandardCharsets.UTF_8;
Expand Down Expand Up @@ -302,6 +303,17 @@ public String[] getStringArray(int rowId, int colId) {
return strings;
}

@Override
public Map<String, Object> getMap(int rowId, int colId) {
int offsetInFixed = getOffsetInFixedBuffer(rowId, colId);
int size = _fixedSizeData.getInt(offsetInFixed + 4);
int offsetInVar = _fixedSizeData.getInt(offsetInFixed);

byte[] buffer = new byte[size];
_variableSizeData.copyTo(offsetInVar, buffer, 0, size);
return MapUtils.deserializeMap(buffer);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How expensive will be deserializeMap?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It could be expensive for general map conversion and it's ok to be slow for select the whole map column.
The major purpose of introducing MAP type is to support more indexing for filtering and aggregation.

}

@Nullable
@Override
public CustomObject getCustomObject(int rowId, int colId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ List<ByteBuffer> serialize()

String[] getStringArray(int rowId, int colId);

/**
 * Returns the MAP value at the given row/column, deserialized into a
 * {@code Map<String, Object>}.
 */
Map<String, Object> getMap(int rowId, int colId);

CustomObject getCustomObject(int rowId, int colId);

@Nullable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ byte[] toBytes()

String[] getStringArray(int rowId, int colId);

/**
 * Returns the MAP value at the given row/column, or {@code null} when the cell
 * holds no map payload.
 */
@Nullable
Map<String, Object> getMap(int rowId, int colId);

@Nullable
CustomObject getCustomObject(int rowId, int colId);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.pinot.spi.trace.Tracing;
import org.apache.pinot.spi.utils.BigDecimalUtils;
import org.apache.pinot.spi.utils.ByteArray;
import org.apache.pinot.spi.utils.MapUtils;
import org.roaringbitmap.RoaringBitmap;

import static java.nio.charset.StandardCharsets.UTF_8;
Expand Down Expand Up @@ -317,6 +318,18 @@ public String[] getStringArray(int rowId, int colId) {
return strings;
}

@Nullable
@Override
public Map<String, Object> getMap(int rowId, int colId) {
  // Position _variableSizeData at this cell's payload and obtain its byte length.
  int numBytes = positionOffsetInVariableBufferAndGetLength(rowId, colId);
  if (numBytes == 0) {
    // A zero-length payload represents a null map value.
    return null;
  }
  // Work on a slice so the backing buffer's own position/limit stay untouched.
  ByteBuffer slice = _variableSizeData.slice();
  slice.limit(numBytes);
  return MapUtils.deserializeMap(slice);
}

@Nullable
@Override
public CustomObject getCustomObject(int rowId, int colId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ private FunctionUtils() {
put(float[].class, PinotDataType.PRIMITIVE_FLOAT_ARRAY);
put(double[].class, PinotDataType.PRIMITIVE_DOUBLE_ARRAY);
put(String[].class, PinotDataType.STRING_ARRAY);
put(Map.class, PinotDataType.MAP);
put(Object.class, PinotDataType.OBJECT);
}};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,10 @@ public enum TransformFunctionType {
COSH("cosh"),
TANH("tanh"),
DEGREES("degrees"),
RADIANS("radians");
RADIANS("radians"),

// Complex type handling
ITEM("item");

private final String _name;
private final List<String> _names;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.sql.type.SqlTypeName;
Expand Down Expand Up @@ -288,6 +290,12 @@ public RelDataType toType(RelDataTypeFactory typeFactory) {
return typeFactory.createSqlType(SqlTypeName.VARCHAR);
}
},
// MAP columns map onto Calcite's MAP SQL type.
MAP(NullValuePlaceHolder.MAP) {
  @Override
  public RelDataType toType(RelDataTypeFactory typeFactory) {
    return typeFactory.createSqlType(SqlTypeName.MAP);
  }
},
BYTES(NullValuePlaceHolder.INTERNAL_BYTES) {
@Override
public RelDataType toType(RelDataTypeFactory typeFactory) {
Expand Down Expand Up @@ -634,6 +642,8 @@ public Serializable convertAndFormat(Object value) {
return value.toString();
case BYTES:
return ((ByteArray) value).toHexString();
case MAP:
return toMap(value);
case INT_ARRAY:
return (int[]) value;
case LONG_ARRAY:
Expand All @@ -655,6 +665,16 @@ public Serializable convertAndFormat(Object value) {
}
}

/**
 * Converts a MAP column value into a {@link Serializable} form for the response.
 * A value that is already Serializable is returned as-is; a non-Serializable
 * {@link Map} implementation is copied into a {@link HashMap}.
 *
 * @param value the MAP column value (expected to be a Map)
 * @return a Serializable representation of the value
 * @throws IllegalStateException if the value is neither Serializable nor a Map
 */
private Serializable toMap(Object value) {
  if (value instanceof Serializable) {
    // NOTE(review): a Serializable non-Map would pass through here unchanged.
    // This is only reached from the MAP case, so value is presumed to be a Map.
    return (Serializable) value;
  }
  if (value instanceof Map) {
    // HashMap is Serializable; copy the entries of the non-Serializable Map.
    // Bounded wildcard cast avoids the raw-type unchecked warning.
    return new HashMap<>((Map<?, ?>) value);
  }
  throw new IllegalStateException(String.format("Cannot convert: '%s' to Map", value));
}

private static int[] toIntArray(Object value) {
if (value instanceof int[]) {
return (int[]) value;
Expand Down Expand Up @@ -815,6 +835,8 @@ public static ColumnDataType fromDataTypeSV(DataType dataType) {
return JSON;
case BYTES:
return BYTES;
case MAP:
return MAP;
case UNKNOWN:
return UNKNOWN;
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.sql.Timestamp;
import java.util.Base64;
import java.util.Collection;
import java.util.Map;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.spi.data.FieldSpec;
Expand Down Expand Up @@ -816,6 +817,25 @@ public Object convert(Object value, PinotDataType sourceType) {
}
},

MAP {
  /**
   * Converts the given value to a MAP. Only pass-through is supported: the value
   * must already be a {@link Map}. The source type may be detected as OBJECT
   * rather than MAP (per review discussion), so both source types are accepted.
   */
  @Override
  public Object convert(Object value, PinotDataType sourceType) {
    switch (sourceType) {
      // Source type may not be recognized as MAP; it can come through as OBJECT.
      case OBJECT:
      case MAP:
        if (value instanceof Map) {
          return value;
        }
        // Not a Map instance: fall through to the shared error below so the
        // message is not duplicated.
        // fallthrough
      default:
        throw new UnsupportedOperationException(String.format(
            "Cannot convert '%s' (Class of value: '%s') to MAP", sourceType, value.getClass()));
    }
  }
},

BYTE_ARRAY {
@Override
public byte[] toBytes(Object value) {
Expand Down Expand Up @@ -1379,6 +1399,9 @@ public static PinotDataType getSingleValueType(Class<?> cls) {
if (cls == Short.class) {
return SHORT;
}
if (cls != null && Map.class.isAssignableFrom(cls)) {
return MAP;
}
return OBJECT;
}

Expand Down Expand Up @@ -1468,6 +1491,11 @@ public static PinotDataType getPinotDataTypeForIngestion(FieldSpec fieldSpec) {
return fieldSpec.isSingleValueField() ? STRING : STRING_ARRAY;
case BYTES:
return fieldSpec.isSingleValueField() ? BYTES : BYTES_ARRAY;
case MAP:
if (fieldSpec.isSingleValueField()) {
return MAP;
}
throw new IllegalStateException("There is no multi-value type for MAP");
default:
throw new UnsupportedOperationException(
"Unsupported data type: " + dataType + " in field: " + fieldSpec.getName());
Expand Down
1 change: 1 addition & 0 deletions pinot-common/src/main/proto/expressions.proto
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ enum ColumnDataType {
STRING_ARRAY = 17;
BYTES_ARRAY = 18;
UNKNOWN = 19;
MAP = 20;
}

message InputRef {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,7 @@ public void testGetSingleValueType() {
assertEquals(getSingleValueType(tc.getKey()), tc.getValue());
}
assertEquals(getSingleValueType(Object.class), OBJECT);
assertEquals(getSingleValueType(Map.class), MAP);
assertEquals(getSingleValueType(null), OBJECT);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.pinot.segment.spi.datasource.DataSource;
import org.apache.pinot.spi.data.FieldSpec;


Expand Down Expand Up @@ -401,4 +402,8 @@ private <T> T getValues(FieldSpec.DataType dataType, String column) {
private void putValues(FieldSpec.DataType dataType, String column, Object values) {
_valuesMap.get(dataType).put(column, values);
}

/**
 * Registers an additional data source with the underlying data fetcher so its
 * values can be fetched like a regular column.
 * NOTE(review): presumably used to register a MAP key as a pseudo-column
 * (parameter names suggest so) — confirm against callers.
 *
 * @param fullColumnKeyName fully qualified column/key name used for lookups
 * @param keyDataSource data source backing that column/key
 */
public void addDataSource(String fullColumnKeyName, DataSource keyDataSource) {
  _dataFetcher.addDataSource(fullColumnKeyName, keyDataSource);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.apache.pinot.spi.trace.Tracing;
import org.apache.pinot.spi.utils.BytesUtils;
import org.apache.pinot.spi.utils.MapUtils;


/**
Expand Down Expand Up @@ -64,14 +65,9 @@ public DataFetcher(Map<String, DataSource> dataSourceMap) {
_columnValueReaderMap = new HashMap<>();
int maxNumValuesPerMVEntry = 0;
for (Map.Entry<String, DataSource> entry : dataSourceMap.entrySet()) {
String column = entry.getKey();
DataSource dataSource = entry.getValue();
addDataSource(entry.getKey(), dataSource);
DataSourceMetadata dataSourceMetadata = dataSource.getDataSourceMetadata();
ForwardIndexReader<?> forwardIndexReader = dataSource.getForwardIndex();
Preconditions.checkState(forwardIndexReader != null,
"Forward index disabled for column: %s, cannot create DataFetcher!", column);
ColumnValueReader columnValueReader = new ColumnValueReader(forwardIndexReader, dataSource.getDictionary());
_columnValueReaderMap.put(column, columnValueReader);
if (!dataSourceMetadata.isSingleValue()) {
maxNumValuesPerMVEntry = Math.max(maxNumValuesPerMVEntry, dataSourceMetadata.getMaxNumValuesPerMVEntry());
}
Expand All @@ -80,6 +76,14 @@ public DataFetcher(Map<String, DataSource> dataSourceMap) {
_maxNumValuesPerMVEntry = maxNumValuesPerMVEntry;
}

/**
 * Registers a column's data source with this fetcher, creating a value reader
 * over its forward index and (optional) dictionary.
 *
 * @param column column name used for subsequent fetch calls
 * @param dataSource data source for the column; must expose a forward index
 */
public void addDataSource(String column, DataSource dataSource) {
  ForwardIndexReader<?> reader = dataSource.getForwardIndex();
  // A column without a forward index cannot be read row-by-row.
  Preconditions.checkState(reader != null,
      "Forward index disabled for column: %s, cannot create DataFetcher!", column);
  _columnValueReaderMap.put(column, new ColumnValueReader(reader, dataSource.getDictionary()));
}

/**
* SINGLE-VALUED COLUMN API
*/
Expand Down Expand Up @@ -180,6 +184,22 @@ public void fetchBytesValues(String column, int[] inDocIds, int length, byte[][]
_columnValueReaderMap.get(column).readBytesValues(inDocIds, length, outValues);
}

/**
 * Fetch byte[] values for a single-valued column.
 * <p>
 * NOTE(review): this overload takes {@code String[]}, but {@code _columnValueReaderMap}
 * is keyed by {@code String}, so {@code get(column)} can never match and returns
 * {@code null}, making the call below throw an NPE. This looks like a copy-paste of
 * the single-column overload above — confirm intent; the parameter should likely be
 * a single {@code String}.
 *
 * @param column Column to read
 * @param inDocIds Input document id's buffer
 * @param length Number of input document ids
 * @param outValues Buffer for output
 */
public void fetchBytesValues(String[] column, int[] inDocIds, int length, byte[][] outValues) {
  _columnValueReaderMap.get(column).readBytesValues(inDocIds, length, outValues);
}

/**
 * Fetch Map values for a single-valued MAP column.
 *
 * @param column Column to read
 * @param inDocIds Input document id's buffer
 * @param length Number of input document ids
 * @param outValues Buffer for output
 */
public void fetchMapValues(String column, int[] inDocIds, int length, Map[] outValues) {
  _columnValueReaderMap.get(column).readMapValues(inDocIds, length, outValues);
}

/**
* MULTI-VALUED COLUMN API
*/
Expand Down Expand Up @@ -421,6 +441,11 @@ void readStringValues(int[] docIds, int length, String[] valueBuffer) {
valueBuffer[i] = BytesUtils.toHexString(_reader.getBytes(docIds[i], readerContext));
}
break;
case MAP:
for (int i = 0; i < length; i++) {
valueBuffer[i] = MapUtils.toString(_reader.getMap(docIds[i], readerContext));
}
break;
default:
throw new IllegalStateException();
}
Expand All @@ -441,6 +466,20 @@ void readBytesValues(int[] docIds, int length, byte[][] valueBuffer) {
}
}

/**
 * Reads MAP values for the given docIds into {@code valueBuffer}. Dictionary-encoded
 * columns resolve through the dictionary; raw columns deserialize the stored bytes.
 */
void readMapValues(int[] docIds, int length, Map[] valueBuffer) {
  Tracing.activeRecording().setInputDataType(_storedType, _singleValue);
  ForwardIndexReaderContext context = getReaderContext();
  if (_dictionary == null) {
    // Raw forward index: each row stores the serialized map bytes directly.
    for (int i = 0; i < length; i++) {
      valueBuffer[i] = MapUtils.deserializeMap(_reader.getBytes(docIds[i], context));
    }
  } else {
    // Dictionary-encoded: translate docIds -> dictIds, then resolve via the dictionary.
    int[] dictIds = THREAD_LOCAL_DICT_IDS.get();
    _reader.readDictIds(docIds, length, dictIds, context);
    _dictionary.readMapValues(dictIds, length, valueBuffer);
  }
}

void readDictIdsMV(int[] docIds, int length, int[][] dictIdsBuffer) {
Tracing.activeRecording().setInputDataType(_storedType, _singleValue);
ForwardIndexReaderContext readerContext = getReaderContext();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@
package org.apache.pinot.core.common;

import java.math.BigDecimal;
import java.util.Map;
import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.apache.pinot.spi.utils.ByteArray;
import org.apache.pinot.spi.utils.MapUtils;


public class RowBasedBlockValueFetcher {
Expand Down Expand Up @@ -67,6 +69,8 @@ private ValueFetcher createFetcher(BlockValSet blockValSet) {
return new StringSingleValueFetcher(blockValSet.getStringValuesSV());
case BYTES:
return new BytesValueFetcher(blockValSet.getBytesValuesSV());
case MAP:
return new MapValueFetcher(blockValSet.getBytesValuesSV());
case UNKNOWN:
return new UnknownValueFetcher();
default:
Expand Down Expand Up @@ -154,6 +158,18 @@ public BigDecimal getValue(int docId) {
}
}

private static class MapValueFetcher implements ValueFetcher {
private final byte[][] _values;

MapValueFetcher(byte[][] values) {
_values = values;
}

public Map getValue(int docId) {
return MapUtils.deserializeMap(_values[docId]);
}
}

private static class StringSingleValueFetcher implements ValueFetcher {
private final String[] _values;

Expand Down
Loading
Loading