[HUDI-7384] [HUDI-7405] [secondary-index] Secondary index support
Initial commit. Supports the following features:
1. Modify the metadata schema to add secondary index support
2. New partition type in the metadata table to store the secondary_keys-to-record_keys mapping
3. Various options to support secondary index enablement, column mappings (for secondary keys), etc. (see the configuration sketch after these lists)
4. Initialization of secondary keys
5. Update secondary keys on inserts/upserts/deletes
6. Add hooks in HoodieFileIndex to prune candidate files (to scan) based on secondary key column filters.
7. Add ability in HoodieMetadataLogRecordReader to use unmerged log record scanner (HoodieUnMergedLogRecordScanner)
8. Use the unmerged log record scanner to buffer records from the secondary index partition (of the metadata table)
9. Add support for merging secondary index records (from delta log files and base files)

Limitations:
1. Supports only one secondary index at the moment.
2. HoodieUnMergedLogRecordScanner uses an in-memory buffer to store records and subsequently merge them.
3. Scanning of the secondary index partition is done sequentially (both on the query side and the index-maintenance side).

Pending items:
1. Integrate with compaction
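For context, an editor's sketch (not part of this commit) of the write-side properties involved. The "secondary.index.enable" and "hoodie.table.secondarykey.fields" keys come from the config classes changed below; the fully-qualified "hoodie.metadata.*" prefixes, the metadata/record-index enable keys, and the column name are assumptions for illustration.

import java.util.Properties;

public class EnableSecondaryIndexExample {
  public static void main(String[] args) {
    Properties props = new Properties();
    // The metadata table and the record index must be on; the secondary index is gated on the record index.
    props.setProperty("hoodie.metadata.enable", "true");
    props.setProperty("hoodie.metadata.record.index.enable", "true");
    // New flag introduced by this commit (METADATA_PREFIX + ".secondary.index.enable").
    props.setProperty("hoodie.metadata.secondary.index.enable", "true");
    // New table config introduced by this commit; "rider" is a placeholder column name.
    props.setProperty("hoodie.table.secondarykey.fields", "rider");
    // These properties would then be passed to the Hudi write config / datasource options as usual.
  }
}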
Vinaykumar Bhat committed Feb 20, 2024
1 parent fe488bc commit 1a69c6a
Showing 29 changed files with 1,212 additions and 72 deletions.
@@ -2493,6 +2493,10 @@ public boolean isLogCompactionEnabledOnMetadata() {
return getBoolean(HoodieMetadataConfig.ENABLE_LOG_COMPACTION_ON_METADATA_TABLE);
}

public boolean isSecondaryIndexEnabled() {
return metadataConfig.enableSecondaryIndex();
}

public boolean isRecordIndexEnabled() {
return metadataConfig.enableRecordIndex();
}
@@ -29,11 +29,13 @@
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieDeltaWriteStat;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieFileGroup;
@@ -112,11 +114,14 @@
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getPartitionLatestFileSlicesIncludingInflight;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getProjectedSchemaForFunctionalIndex;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.readRecordKeysFromBaseFiles;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.readSecondaryKeysFromFileSlices;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.readSecondaryKeysFromFiles;
import static org.apache.hudi.metadata.MetadataPartitionType.BLOOM_FILTERS;
import static org.apache.hudi.metadata.MetadataPartitionType.COLUMN_STATS;
import static org.apache.hudi.metadata.MetadataPartitionType.FILES;
import static org.apache.hudi.metadata.MetadataPartitionType.FUNCTIONAL_INDEX;
import static org.apache.hudi.metadata.MetadataPartitionType.RECORD_INDEX;
import static org.apache.hudi.metadata.MetadataPartitionType.SECONDARY_INDEX;

/**
* Writer implementation backed by an internal hudi table. Partition and file listing are saved within an internal MOR table
@@ -223,6 +228,11 @@ private void enablePartitions() {
}
if (dataWriteConfig.isRecordIndexEnabled() || dataMetaClient.getTableConfig().isMetadataPartitionAvailable(RECORD_INDEX)) {
this.enabledPartitionTypes.add(RECORD_INDEX);

// Enable secondary index only if the record index is also enabled
if (dataWriteConfig.isSecondaryIndexEnabled() || dataMetaClient.getTableConfig().isMetadataPartitionAvailable(SECONDARY_INDEX)) {
this.enabledPartitionTypes.add(SECONDARY_INDEX);
}
}
if (dataMetaClient.getFunctionalIndexMetadata().isPresent()) {
this.enabledPartitionTypes.add(FUNCTIONAL_INDEX);
@@ -437,6 +447,9 @@ private boolean initializeFromFilesystem(String initializationTime, List<Metadat
case FUNCTIONAL_INDEX:
fileGroupCountAndRecordsPair = initializeFunctionalIndexPartition();
break;
case SECONDARY_INDEX:
fileGroupCountAndRecordsPair = initializeSecondaryIndexPartition();
break;
default:
throw new HoodieMetadataException("Unsupported MDT partition type: " + partitionType);
}
@@ -519,6 +532,9 @@ protected abstract HoodieData<HoodieRecord> getFunctionalIndexRecords(List<Pair<
int parallelism, Schema readerSchema,
SerializableConfiguration hadoopConf);

public abstract HoodieData<HoodieRecord> getDeletedSecondaryRecordMapping(HoodieEngineContext engineContext,
Map<String, String> recordKeySecondaryKeyMap);

private Pair<Integer, HoodieData<HoodieRecord>> initializeFunctionalIndexPartition() throws Exception {
HoodieMetadataFileSystemView fsView = new HoodieMetadataFileSystemView(dataMetaClient, dataMetaClient.getActiveTimeline(), metadata);
String indexName = dataWriteConfig.getFunctionalIndexConfig().getIndexName();
@@ -546,6 +562,39 @@ private HoodieFunctionalIndexDefinition getFunctionalIndexDefinition(String inde
}
}

private Pair<Integer, HoodieData<HoodieRecord>> initializeSecondaryIndexPartition() throws IOException {
HoodieMetadataFileSystemView fsView = new HoodieMetadataFileSystemView(dataMetaClient, dataMetaClient.getActiveTimeline(), metadata);
// Collect the list of latest file slices present in each partition
List<String> partitions = metadata.getAllPartitionPaths();
fsView.loadAllPartitions();

List<Pair<String, FileSlice>> partitionFileSlicePairs = new ArrayList<>();
partitions.forEach(partition -> {
fsView.getLatestFileSlices(partition).forEach(fs -> {
partitionFileSlicePairs.add(Pair.of(partition, fs));
});
});

// Reuse record index parallelism config to build secondary index
int parallelism = Math.min(partitionFileSlicePairs.size(), dataWriteConfig.getMetadataConfig().getRecordIndexMaxParallelism());

HoodieData<HoodieRecord> records = readSecondaryKeysFromFileSlices(
engineContext,
partitionFileSlicePairs,
parallelism,
this.getClass().getSimpleName(),
dataMetaClient,
EngineType.SPARK);

// Initialize the file groups - using the same estimation logic as that of record index
final int fileGroupCount = HoodieTableMetadataUtil.estimateFileGroupCount(RECORD_INDEX, records.count(),
RECORD_INDEX_AVERAGE_RECORD_SIZE, dataWriteConfig.getRecordIndexMinFileGroupCount(),
dataWriteConfig.getRecordIndexMaxFileGroupCount(), dataWriteConfig.getRecordIndexGrowthFactor(),
dataWriteConfig.getRecordIndexMaxFileGroupSizeBytes());

return Pair.of(fileGroupCount, records);
}

private Pair<Integer, HoodieData<HoodieRecord>> initializeRecordIndexPartition() throws IOException {
final HoodieMetadataFileSystemView fsView = new HoodieMetadataFileSystemView(dataMetaClient,
dataMetaClient.getActiveTimeline(), metadata);
@@ -941,6 +990,8 @@ public void updateFromWriteStatuses(HoodieCommitMetadata commitMetadata, HoodieD
HoodieData<HoodieRecord> additionalUpdates = getRecordIndexAdditionalUpserts(updatesFromWriteStatuses, commitMetadata);
partitionToRecordMap.put(RECORD_INDEX, updatesFromWriteStatuses.union(additionalUpdates));
updateFunctionalIndexIfPresent(commitMetadata, instantTime, partitionToRecordMap);
updateSecondaryIndexIfPresent(commitMetadata, partitionToRecordMap, writeStatus);

return partitionToRecordMap;
});
closeInternal();
@@ -1005,6 +1056,73 @@ private HoodieData<HoodieRecord> getFunctionalIndexUpdates(HoodieCommitMetadata
return getFunctionalIndexRecords(partitionFileSlicePairs, indexDefinition, dataMetaClient, parallelism, readerSchema, hadoopConf);
}

private void updateSecondaryIndexIfPresent(HoodieCommitMetadata commitMetadata, Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionToRecordMap, HoodieData<WriteStatus> writeStatus) {
dataMetaClient.getTableConfig().getMetadataPartitions()
.stream()
.filter(partition -> partition.startsWith(HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX_PREFIX))
.forEach(partition -> {
HoodieData<HoodieRecord> secondaryIndexRecords;
try {
secondaryIndexRecords = getSecondaryIndexUpdates(commitMetadata, writeStatus);
} catch (Exception e) {
throw new HoodieMetadataException("Failed to get secondary index updates for partition " + partition, e);
}
partitionToRecordMap.put(SECONDARY_INDEX, secondaryIndexRecords);
});
}

private HoodieData<HoodieRecord> getSecondaryIndexUpdates(HoodieCommitMetadata commitMetadata, HoodieData<WriteStatus> writeStatus) throws Exception {
// Build a list of basefiles+delta-log-files for every partition that this commit touches
// {
// {
// "partition1", { {"baseFile11", {"logFile11", "logFile12"}}, {"baseFile12", {"logFile11"} } },
// },
// {
// "partition2", { {"baseFile21", {"logFile21", "logFile22"}}, {"baseFile22", {"logFile21"} } }
// }
// }
List<Pair<String, Pair<String, List<String>>>> partitionFilePairs = new ArrayList<>();
commitMetadata.getPartitionToWriteStats().forEach((dataPartition, writeStats) -> {
writeStats.forEach(writeStat -> {
if (writeStat instanceof HoodieDeltaWriteStat) {
partitionFilePairs.add(Pair.of(dataPartition, Pair.of(((HoodieDeltaWriteStat) writeStat).getBaseFile(), ((HoodieDeltaWriteStat) writeStat).getLogFiles())));
} else {
partitionFilePairs.add(Pair.of(dataPartition, Pair.of(writeStat.getPath(), new ArrayList<>())));
}
});
});

// Build a list of keys that need to be removed. A 'delete' record will be emitted into the respective FileGroup of
// the secondary index partition for each of these keys. For a commit which is deleting/updating a lot of records, this
// operation is going to be expensive (in CPU, memory and IO)
List<String> keysToRemove = new ArrayList<>();
writeStatus.collectAsList().forEach(status -> {
status.getWrittenRecordDelegates().forEach(recordDelegate -> {
// Consider those keys which were either updated or deleted in this commit
if (!recordDelegate.getNewLocation().isPresent() || (recordDelegate.getCurrentLocation().isPresent() && recordDelegate.getNewLocation().isPresent())) {
keysToRemove.add(recordDelegate.getRecordKey());
}
});
});

// Fetch the secondary keys that each of the record keys ('keysToRemove') maps to
// This is obtained by scanning the entire secondary index partition in the metadata table
// This could be an expensive operation for a large commit (updating/deleting millions of rows)
Map<String, String> recordKeySecondaryKeyMap = metadata.getSecondaryKeys(keysToRemove);
HoodieData<HoodieRecord> deletedRecords = getDeletedSecondaryRecordMapping(engineContext, recordKeySecondaryKeyMap);

// Reuse record index parallelism config to build secondary index
int parallelism = Math.min(partitionFilePairs.size(), dataWriteConfig.getMetadataConfig().getRecordIndexMaxParallelism());

return deletedRecords.union(readSecondaryKeysFromFiles(
engineContext,
partitionFilePairs,
parallelism,
this.getClass().getSimpleName(),
dataMetaClient,
EngineType.SPARK));
}

/**
* Update from {@code HoodieCleanMetadata}.
*
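Editor's note: a minimal sketch (hypothetical helper, not part of this commit) of how the delete path in getSecondaryIndexUpdates above turns old secondary-key mappings into tombstone records, assuming HoodieMetadataPayload.createSecondaryIndex(recordKey, secondaryKey, isDeleted) with the argument order used by the Spark writer further below.

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.metadata.HoodieMetadataPayload;

public class SecondaryIndexTombstoneSketch {
  // For every record key whose row was updated or deleted in the commit, emit a delete
  // record for its previous secondary-key mapping; fresh mappings for updated rows are
  // produced separately by re-reading the written base/log files.
  public static List<HoodieRecord> tombstonesFor(Map<String, String> recordKeyToOldSecondaryKey) {
    List<HoodieRecord> deletes = new ArrayList<>(recordKeyToOldSecondaryKey.size());
    recordKeyToOldSecondaryKey.forEach((recordKey, oldSecondaryKey) ->
        deletes.add(HoodieMetadataPayload.createSecondaryIndex(recordKey, oldSecondaryKey, true)));
    return deletes;
  }
}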
@@ -210,4 +210,9 @@ protected HoodieData<HoodieRecord> getFunctionalIndexRecords(List<Pair<String, F
int parallelism, Schema readerSchema, SerializableConfiguration hadoopConf) {
throw new HoodieNotSupportedException("Flink metadata table does not support functional index yet.");
}

@Override
public HoodieData<HoodieRecord> getDeletedSecondaryRecordMapping(HoodieEngineContext engineContext, Map<String, String> recordKeySecondaryKeyMap) {
throw new HoodieNotSupportedException("Flink metadata table does not support secondary index yet.");
}
}
@@ -122,4 +122,9 @@ protected HoodieData<HoodieRecord> getFunctionalIndexRecords(List<Pair<String, F
int parallelism, Schema readerSchema, SerializableConfiguration hadoopConf) {
throw new HoodieNotSupportedException("Functional index not supported for Java metadata table writer yet.");
}

@Override
public HoodieData<HoodieRecord> getDeletedSecondaryRecordMapping(HoodieEngineContext engineContext, Map<String, String> recordKeySecondaryKeyMap) {
throw new HoodieNotSupportedException("Secondary index not supported for Java metadata table writer yet.");
}
}
@@ -50,6 +50,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -216,4 +217,19 @@ protected HoodieData<HoodieRecord> getFunctionalIndexRecords(List<Pair<String, F
public BaseHoodieWriteClient<?, JavaRDD<HoodieRecord>, ?, ?> initializeWriteClient() {
return new SparkRDDWriteClient(engineContext, metadataWriteConfig, true);
}

@Override
public HoodieData<HoodieRecord> getDeletedSecondaryRecordMapping(HoodieEngineContext engineContext,
Map<String, String> recordKeySecondaryKeyMap) {
HoodieSparkEngineContext sparkEngineContext = (HoodieSparkEngineContext) engineContext;

if (recordKeySecondaryKeyMap.isEmpty()) {
return sparkEngineContext.emptyHoodieData();
}

List<HoodieRecord> deletedRecords = new ArrayList<>();
recordKeySecondaryKeyMap.forEach((key, value) -> deletedRecords.add(HoodieMetadataPayload.createSecondaryIndex(key, value, true)));

return HoodieJavaRDD.of(deletedRecords, sparkEngineContext, 1);
}
}
28 changes: 28 additions & 0 deletions hudi-common/src/main/avro/HoodieMetadata.avsc
@@ -432,6 +432,34 @@
}
],
"default" : null
},
{
"name": "SecondaryIndexMetadata",
"doc": "Metadata Index that contains information about secondary keys and the corresponding record keys in the dataset",
"type": [
"null",
{
"type": "record",
"name": "HoodieSecondaryIndexInfo",
"fields": [
{
"name": "recordKey",
"type": [
"null",
"string"
],
"default": null,
"doc": "Refers to the record key that this secondary key maps to"
},
{
"name": "isDeleted",
"type": "boolean",
"doc": "True if this entry has been deleted"
}
]
}
],
"default" : null
}
]
}
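Editor's note: for illustration only, a plain-Java view of the association one entry in the new secondary-index partition encodes under the schema above: a secondary key, the data-table record key it maps to (the recordKey field), and a deletion marker. The POJO is hypothetical; the real HoodieSecondaryIndexInfo class is generated from the Avro schema, and the exact keying of the metadata record is left to the writer.

// Hypothetical mirror of HoodieSecondaryIndexInfo, for illustration only.
public class SecondaryIndexEntrySketch {
  public final String secondaryKey; // value of the indexed (secondary key) column
  public final String recordKey;    // data-table record key this secondary key maps to
  public final boolean isDeleted;   // true if this mapping has been removed

  public SecondaryIndexEntrySketch(String secondaryKey, String recordKey, boolean isDeleted) {
    this.secondaryKey = secondaryKey;
    this.recordKey = recordKey;
    this.isDeleted = isDeleted;
  }
}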
@@ -235,6 +235,13 @@ public final class HoodieMetadataConfig extends HoodieConfig {
.withDocumentation("When there is a pending instant in data table, this config limits the allowed number of deltacommits in metadata table to "
+ "prevent the metadata table's timeline from growing unboundedly as compaction won't be triggered due to the pending data table instant.");

public static final ConfigProperty<Boolean> SECONDARY_INDEX_ENABLE_PROP = ConfigProperty
.key(METADATA_PREFIX + ".secondary.index.enable")
.defaultValue(false)
.markAdvanced()
.sinceVersion("1.0.0")
.withDocumentation("Create the HUDI Secondary (non-unique) Index within the Metadata Table");

public static final ConfigProperty<Boolean> RECORD_INDEX_ENABLE_PROP = ConfigProperty
.key(METADATA_PREFIX + ".record.index.enable")
.defaultValue(false)
@@ -418,6 +425,11 @@ public int getMaxNumDeltacommitsWhenPending() {
return getIntOrDefault(METADATA_MAX_NUM_DELTACOMMITS_WHEN_PENDING);
}

public boolean enableSecondaryIndex() {
// Secondary index can be enabled only if the record index (primary key index) is also enabled
return enableRecordIndex() && getBoolean(SECONDARY_INDEX_ENABLE_PROP);
}

public boolean enableRecordIndex() {
return enabled() && getBoolean(RECORD_INDEX_ENABLE_PROP);
}
@@ -131,6 +131,12 @@ public class HoodieTableConfig extends HoodieConfig {
.withDocumentation("Columns used to uniquely identify the table. Concatenated values of these fields are used as "
+ " the record key component of HoodieKey.");

public static final ConfigProperty<String> SECONDARYKEY_FIELDS = ConfigProperty
.key("hoodie.table.secondarykey.fields")
.noDefaultValue()
.withDocumentation("Columns used to uniquely identify the table. Concatenated values of these fields are used as "
+ " the record key component of HoodieKey.");

public static final ConfigProperty<Boolean> CDC_ENABLED = ConfigProperty
.key("hoodie.table.cdc.enabled")
.defaultValue(false)
@@ -540,6 +546,16 @@ public Option<String[]> getRecordKeyFields() {
}
}

public Option<String[]> getSecondaryKeyFields() {
String secondaryKeyFieldsValue = getStringOrDefault(SECONDARYKEY_FIELDS, null);
if (secondaryKeyFieldsValue == null) {
return Option.empty();
} else {
return Option.of(Arrays.stream(secondaryKeyFieldsValue.split(","))
.filter(p -> p.length() > 0).collect(Collectors.toList()).toArray(new String[] {}));
}
}

public Option<String[]> getPartitionFields() {
if (contains(PARTITION_FIELDS)) {
return Option.of(Arrays.stream(getString(PARTITION_FIELDS).split(","))
@@ -921,6 +921,7 @@ public static class PropertyBuilder {
private String tableName;
private String tableCreateSchema;
private String recordKeyFields;
private String secondaryKeyFields;
private String archiveLogFolder;
private String payloadClassName;
private String payloadType;
@@ -988,6 +989,11 @@ public PropertyBuilder setRecordKeyFields(String recordKeyFields) {
return this;
}

public PropertyBuilder setSecondaryKeyFields(String secondaryKeyFields) {
this.secondaryKeyFields = secondaryKeyFields;
return this;
}

public PropertyBuilder setArchiveLogFolder(String archiveLogFolder) {
this.archiveLogFolder = archiveLogFolder;
return this;
@@ -1208,6 +1214,9 @@ public PropertyBuilder fromProperties(Properties properties) {
if (hoodieConfig.contains(HoodieTableConfig.RECORDKEY_FIELDS)) {
setRecordKeyFields(hoodieConfig.getString(HoodieTableConfig.RECORDKEY_FIELDS));
}
if (hoodieConfig.contains(HoodieTableConfig.SECONDARYKEY_FIELDS)) {
setSecondaryKeyFields(hoodieConfig.getString(HoodieTableConfig.SECONDARYKEY_FIELDS));
}
if (hoodieConfig.contains(HoodieTableConfig.CDC_ENABLED)) {
setCDCEnabled(hoodieConfig.getBoolean(HoodieTableConfig.CDC_ENABLED));
}
@@ -1322,6 +1331,11 @@ public Properties build() {
if (null != recordKeyFields) {
tableConfig.setValue(HoodieTableConfig.RECORDKEY_FIELDS, recordKeyFields);
}

if (null != secondaryKeyFields) {
tableConfig.setValue(HoodieTableConfig.SECONDARYKEY_FIELDS, secondaryKeyFields);
}

if (null != cdcEnabled) {
tableConfig.setValue(HoodieTableConfig.CDC_ENABLED, Boolean.toString(cdcEnabled));
if (cdcEnabled && null != cdcSupplementalLoggingMode) {
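Editor's sketch of how a table could be created with the new secondary key property via the builder above. setSecondaryKeyFields is added in this commit; the other builder methods and initTable are assumed from the existing HoodieTableMetaClient API, and the table name, columns and base path are placeholders.

import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;

public class InitTableWithSecondaryKeySketch {
  public static void main(String[] args) throws Exception {
    HoodieTableMetaClient.withPropertyBuilder()
        .setTableType(HoodieTableType.MERGE_ON_READ)
        .setTableName("trips")                                // placeholder table name
        .setRecordKeyFields("uuid")                           // placeholder record key column
        .setSecondaryKeyFields("rider")                       // placeholder secondary key column (new in this commit)
        .initTable(new Configuration(), "/tmp/hudi/trips");   // placeholder base path
  }
}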
