
[HUDI-7384][secondary-index][write] build secondary index on the keys defined through options

Initial commit. Supports the following features:
1. Modify schema to add secondary index to metadata
2. New partition type in the metadata table to store the secondary_keys-to-record_keys mapping
3. Various options to support secondary index enablement, column mappings (for secondary keys), etc.
4. Initialization of secondary keys
5. Update secondary keys on inserts/upserts

Supports only one secondary index at the moment. The PR is still a WIP and needs more work to handle deletions,
proper merging, compaction, and (re)clustering, among other things.
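For context, a minimal sketch of how the new options could fit together when writing through the Spark datasource. The secondary-key and enable-flag keys come from this diff (assuming METADATA_PREFIX resolves to "hoodie.metadata"); the table name, fields, and path are illustrative:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;

// df is an existing Dataset<Row>; all values below are illustrative.
void writeWithSecondaryIndex(Dataset<Row> df) {
  df.write().format("hudi")
      .option("hoodie.table.name", "trips")
      .option("hoodie.datasource.write.recordkey.field", "uuid")
      .option("hoodie.datasource.write.secondarykey.field", "rider") // new option in this PR
      .option("hoodie.metadata.record.index.enable", "true")         // required: the secondary index builds on the record index
      .option("hoodie.metadata.secondary.index.enable", "true")      // new option in this PR
      .mode(SaveMode.Append)
      .save("/tmp/hudi/trips");
}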
Vinaykumar Bhat committed Feb 16, 2024
1 parent fe488bc commit 804d739
Showing 17 changed files with 549 additions and 2 deletions.
@@ -2493,6 +2493,10 @@ public boolean isLogCompactionEnabledOnMetadata() {
return getBoolean(HoodieMetadataConfig.ENABLE_LOG_COMPACTION_ON_METADATA_TABLE);
}

public boolean isSecondaryIndexEnabled() {
return metadataConfig.enableSecondaryIndex();
}

public boolean isRecordIndexEnabled() {
return metadataConfig.enableRecordIndex();
}
@@ -29,11 +29,13 @@
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieDeltaWriteStat;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieFileGroup;
@@ -112,11 +114,14 @@
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getPartitionLatestFileSlicesIncludingInflight;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getProjectedSchemaForFunctionalIndex;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.readRecordKeysFromBaseFiles;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.readSecondaryKeysFromFileSlices;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.readSecondaryKeysFromFiles;
import static org.apache.hudi.metadata.MetadataPartitionType.BLOOM_FILTERS;
import static org.apache.hudi.metadata.MetadataPartitionType.COLUMN_STATS;
import static org.apache.hudi.metadata.MetadataPartitionType.FILES;
import static org.apache.hudi.metadata.MetadataPartitionType.FUNCTIONAL_INDEX;
import static org.apache.hudi.metadata.MetadataPartitionType.RECORD_INDEX;
import static org.apache.hudi.metadata.MetadataPartitionType.SECONDARY_INDEX;

/**
* Writer implementation backed by an internal hudi table. Partition and file listing are saved within an internal MOR table
@@ -223,6 +228,11 @@ private void enablePartitions() {
}
if (dataWriteConfig.isRecordIndexEnabled() || dataMetaClient.getTableConfig().isMetadataPartitionAvailable(RECORD_INDEX)) {
this.enabledPartitionTypes.add(RECORD_INDEX);

// Enable the secondary index only if the record index is enabled
if (dataWriteConfig.isSecondaryIndexEnabled() || dataMetaClient.getTableConfig().isMetadataPartitionAvailable(SECONDARY_INDEX)) {
this.enabledPartitionTypes.add(SECONDARY_INDEX);
}
}
if (dataMetaClient.getFunctionalIndexMetadata().isPresent()) {
this.enabledPartitionTypes.add(FUNCTIONAL_INDEX);
@@ -437,6 +447,9 @@ private boolean initializeFromFilesystem(String initializationTime, List<Metadat
case FUNCTIONAL_INDEX:
fileGroupCountAndRecordsPair = initializeFunctionalIndexPartition();
break;
case SECONDARY_INDEX:
fileGroupCountAndRecordsPair = initializeSecondaryIndexPartition();
break;
default:
throw new HoodieMetadataException("Unsupported MDT partition type: " + partitionType);
}
@@ -546,6 +559,40 @@ private HoodieFunctionalIndexDefinition getFunctionalIndexDefinition(String inde
}
}

private Pair<Integer, HoodieData<HoodieRecord>> initializeSecondaryIndexPartition() throws IOException {
HoodieMetadataFileSystemView fsView = new HoodieMetadataFileSystemView(dataMetaClient, dataMetaClient.getActiveTimeline(), metadata);
// Collect the list of latest file slices present in each partition
List<String> partitions = metadata.getAllPartitionPaths();
fsView.loadAllPartitions();

List<Pair<String, FileSlice>> partitionFileSlicePairs = new ArrayList<>();
partitions.forEach(partition -> {
fsView.getLatestFileSlices(partition).forEach(fs -> {
partitionFileSlicePairs.add(Pair.of(partition, fs));
});
});

// Reuse record index parallelism config to build secondary index
int parallelism = Math.min(partitionFileSlicePairs.size(), dataWriteConfig.getMetadataConfig().getRecordIndexMaxParallelism());

HoodieData<HoodieRecord> records = readSecondaryKeysFromFileSlices(
engineContext,
partitionFileSlicePairs,
false,
parallelism,
this.getClass().getSimpleName(),
dataMetaClient,
EngineType.SPARK);

// Initialize the file groups - using the same estimation logic as that of record index
final int fileGroupCount = HoodieTableMetadataUtil.estimateFileGroupCount(RECORD_INDEX, records.count(),
RECORD_INDEX_AVERAGE_RECORD_SIZE, dataWriteConfig.getRecordIndexMinFileGroupCount(),
dataWriteConfig.getRecordIndexMaxFileGroupCount(), dataWriteConfig.getRecordIndexGrowthFactor(),
dataWriteConfig.getRecordIndexMaxFileGroupSizeBytes());

return Pair.of(fileGroupCount, records);
}
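The reused record-index estimation roughly divides the projected index size by a per-file-group size cap and clamps the result between the configured min and max file group counts. A back-of-the-envelope sketch of that arithmetic (all constants here are assumed defaults, not taken from this diff):

long recordCount = 100_000_000L;                  // records.count()
int avgRecordSize = 48;                           // RECORD_INDEX_AVERAGE_RECORD_SIZE (assumed)
double growthFactor = 2.0;                        // assumed record index growth factor
long maxFileGroupSizeBytes = 1024L * 1024 * 1024; // assumed 1 GB cap per file group
long projectedBytes = (long) (recordCount * avgRecordSize * growthFactor);
int estimated = (int) Math.ceil(projectedBytes / (double) maxFileGroupSizeBytes); // ~9
int fileGroupCount = Math.max(10, Math.min(10_000, estimated));                   // clamped to [min, max] => 10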

private Pair<Integer, HoodieData<HoodieRecord>> initializeRecordIndexPartition() throws IOException {
final HoodieMetadataFileSystemView fsView = new HoodieMetadataFileSystemView(dataMetaClient,
dataMetaClient.getActiveTimeline(), metadata);
@@ -941,6 +988,8 @@ public void updateFromWriteStatuses(HoodieCommitMetadata commitMetadata, HoodieD
HoodieData<HoodieRecord> additionalUpdates = getRecordIndexAdditionalUpserts(updatesFromWriteStatuses, commitMetadata);
partitionToRecordMap.put(RECORD_INDEX, updatesFromWriteStatuses.union(additionalUpdates));
updateFunctionalIndexIfPresent(commitMetadata, instantTime, partitionToRecordMap);
updateSecondaryIndexIfPresent(commitMetadata, partitionToRecordMap);

return partitionToRecordMap;
});
closeInternal();
@@ -1005,6 +1054,61 @@ private HoodieData<HoodieRecord> getFunctionalIndexUpdates(HoodieCommitMetadata
return getFunctionalIndexRecords(partitionFileSlicePairs, indexDefinition, dataMetaClient, parallelism, readerSchema, hadoopConf);
}

private void updateSecondaryIndexIfPresent(HoodieCommitMetadata commitMetadata, Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionToRecordMap) {
dataMetaClient.getTableConfig().getMetadataPartitions()
.stream()
.filter(partition -> partition.startsWith(HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX_PREFIX))
.forEach(partition -> {
HoodieData<HoodieRecord> secondaryIndexRecords;
try {
secondaryIndexRecords = getSecondaryIndexUpdates(commitMetadata);
} catch (Exception e) {
throw new HoodieMetadataException("Failed to get secondary index updates for partition " + partition, e);
}
partitionToRecordMap.put(SECONDARY_INDEX, secondaryIndexRecords);
});
}

private HoodieData<HoodieRecord> getSecondaryIndexUpdates(HoodieCommitMetadata commitMetadata) throws Exception {
// Build a list of base files + delta log files for every partition that this commit touches, e.g.:
// {
// {
// "partition1",
// {
// {"baseFile11", {"logFile11", "logFile12"}}, {"baseFile12", {"logFile11"}}
// },
// },
// {
// "partition2",
// {
// {"baseFile21", {"logFile21", "logFile22"}}, {"baseFile22", {"logFile21"}}
// }
// }
// }
List<Pair<String, Pair<String, List<String>>>> partitionFilePairs = new ArrayList<>();
commitMetadata.getPartitionToWriteStats().forEach((dataPartition, writeStats) -> {
writeStats.forEach(writeStat -> {
if (writeStat instanceof HoodieDeltaWriteStat) {
partitionFilePairs.add(Pair.of(dataPartition, Pair.of(((HoodieDeltaWriteStat) writeStat).getBaseFile(), ((HoodieDeltaWriteStat) writeStat).getLogFiles())));
} else {
partitionFilePairs.add(Pair.of(dataPartition, Pair.of(writeStat.getPath(), new ArrayList<>())));
}
});
});

// Reuse record index parallelism config to build secondary index
int parallelism = Math.min(partitionFilePairs.size(), dataWriteConfig.getMetadataConfig().getRecordIndexMaxParallelism());

return readSecondaryKeysFromFiles(
engineContext,
partitionFilePairs,
false,
parallelism,
this.getClass().getSimpleName(),
dataMetaClient,
EngineType.SPARK);
}

/**
* Update from {@code HoodieCleanMetadata}.
*
28 changes: 28 additions & 0 deletions hudi-common/src/main/avro/HoodieMetadata.avsc
@@ -432,6 +432,34 @@
}
],
"default" : null
},
{
"name": "SecondaryIndexMetadata",
"doc": "Metadata Index that contains information about secondary keys and the corresponding record keys in the dataset",
"type": [
"null",
{
"type": "record",
"name": "HoodieSecondaryIndexInfo",
"fields": [
{
"name": "recordKey",
"type": [
"null",
"string"
],
"default": null,
"doc": "Refers to the record key that this secondary key maps to"
},
{
"name": "isDeleted",
"type": "boolean",
"doc": "True if this entry has been deleted"
}
]
}
],
"default" : null
}
]
}
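For illustration, a single entry in the new secondary-index partition could be materialized with Avro's generic API as follows (a sketch; the two field names come from the schema above, the key value is made up):

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

// Inline copy of the HoodieSecondaryIndexInfo record declared above.
Schema infoSchema = new Schema.Parser().parse(
    "{\"type\":\"record\",\"name\":\"HoodieSecondaryIndexInfo\",\"fields\":["
        + "{\"name\":\"recordKey\",\"type\":[\"null\",\"string\"],\"default\":null},"
        + "{\"name\":\"isDeleted\",\"type\":\"boolean\"}]}");

GenericRecord entry = new GenericData.Record(infoSchema);
entry.put("recordKey", "uuid-00042"); // the record key this secondary key maps to
entry.put("isDeleted", false);        // tombstone flag for deleted mappings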
@@ -235,6 +235,13 @@ public final class HoodieMetadataConfig extends HoodieConfig {
.withDocumentation("When there is a pending instant in data table, this config limits the allowed number of deltacommits in metadata table to "
+ "prevent the metadata table's timeline from growing unboundedly as compaction won't be triggered due to the pending data table instant.");

public static final ConfigProperty<Boolean> SECONDARY_INDEX_ENABLE_PROP = ConfigProperty
.key(METADATA_PREFIX + ".secondary.index.enable")
.defaultValue(false)
.markAdvanced()
.sinceVersion("1.0.0")
.withDocumentation("Create the HUDI Secondary (non-unique) Index within the Metadata Table");

public static final ConfigProperty<Boolean> RECORD_INDEX_ENABLE_PROP = ConfigProperty
.key(METADATA_PREFIX + ".record.index.enable")
.defaultValue(false)
@@ -418,6 +425,11 @@ public int getMaxNumDeltacommitsWhenPending() {
return getIntOrDefault(METADATA_MAX_NUM_DELTACOMMITS_WHEN_PENDING);
}

public boolean enableSecondaryIndex() {
// Secondary index is enabled only if the record index (primary key index) is also enabled
return enableRecordIndex() && getBoolean(SECONDARY_INDEX_ENABLE_PROP);
}
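A small sketch of the gating this method implies (the builder and fromProperties usage follow the usual Hudi config pattern and are assumed here rather than shown in this diff):

import java.util.Properties;

Properties props = new Properties();
props.setProperty("hoodie.metadata.enable", "true");
props.setProperty("hoodie.metadata.secondary.index.enable", "true");
// Record index is still off, so the secondary index stays disabled.
HoodieMetadataConfig cfg = HoodieMetadataConfig.newBuilder().fromProperties(props).build();
assert !cfg.enableSecondaryIndex();

props.setProperty("hoodie.metadata.record.index.enable", "true");
// With the record index on as well, the secondary index is effectively enabled.
cfg = HoodieMetadataConfig.newBuilder().fromProperties(props).build();
assert cfg.enableSecondaryIndex();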

public boolean enableRecordIndex() {
return enabled() && getBoolean(RECORD_INDEX_ENABLE_PROP);
}
@@ -131,6 +131,12 @@ public class HoodieTableConfig extends HoodieConfig {
.withDocumentation("Columns used to uniquely identify the table. Concatenated values of these fields are used as "
+ " the record key component of HoodieKey.");

public static final ConfigProperty<String> SECONDARYKEY_FIELDS = ConfigProperty
.key("hoodie.table.secondarykey.fields")
.noDefaultValue()
.withDocumentation("Columns used to uniquely identify the table. Concatenated values of these fields are used as "
+ " the record key component of HoodieKey.");

public static final ConfigProperty<Boolean> CDC_ENABLED = ConfigProperty
.key("hoodie.table.cdc.enabled")
.defaultValue(false)
@@ -540,6 +546,16 @@ public Option<String[]> getRecordKeyFields() {
}
}

public Option<String[]> getSecondaryKeyFields() {
String secondaryKeyFieldsValue = getStringOrDefault(SECONDARYKEY_FIELDS, null);
if (secondaryKeyFieldsValue == null) {
return Option.empty();
} else {
return Option.of(Arrays.stream(secondaryKeyFieldsValue.split(","))
.filter(p -> p.length() > 0).collect(Collectors.toList()).toArray(new String[] {}));
}
}
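A hypothetical round trip through the new accessor (names illustrative):

// Once hoodie.table.secondarykey.fields has been set, e.g. via
// tableConfig.setValue(HoodieTableConfig.SECONDARYKEY_FIELDS, "rider,driver"),
// the accessor parses the comma-separated value back into an array:
Option<String[]> secondaryKeys = tableConfig.getSecondaryKeyFields(); // ["rider", "driver"]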

public Option<String[]> getPartitionFields() {
if (contains(PARTITION_FIELDS)) {
return Option.of(Arrays.stream(getString(PARTITION_FIELDS).split(","))
@@ -921,6 +921,7 @@ public static class PropertyBuilder {
private String tableName;
private String tableCreateSchema;
private String recordKeyFields;
private String secondaryKeyFields;
private String archiveLogFolder;
private String payloadClassName;
private String payloadType;
@@ -988,6 +989,11 @@ public PropertyBuilder setRecordKeyFields(String recordKeyFields) {
return this;
}

public PropertyBuilder setSecondaryKeyFields(String secondaryKeyFields) {
this.secondaryKeyFields = secondaryKeyFields;
return this;
}

public PropertyBuilder setArchiveLogFolder(String archiveLogFolder) {
this.archiveLogFolder = archiveLogFolder;
return this;
@@ -1208,6 +1214,9 @@ public PropertyBuilder fromProperties(Properties properties) {
if (hoodieConfig.contains(HoodieTableConfig.RECORDKEY_FIELDS)) {
setRecordKeyFields(hoodieConfig.getString(HoodieTableConfig.RECORDKEY_FIELDS));
}
if (hoodieConfig.contains(HoodieTableConfig.SECONDARYKEY_FIELDS)) {
setSecondaryKeyFields(hoodieConfig.getString(HoodieTableConfig.SECONDARYKEY_FIELDS));
}
if (hoodieConfig.contains(HoodieTableConfig.CDC_ENABLED)) {
setCDCEnabled(hoodieConfig.getBoolean(HoodieTableConfig.CDC_ENABLED));
}
@@ -1322,6 +1331,11 @@ public Properties build() {
if (null != recordKeyFields) {
tableConfig.setValue(HoodieTableConfig.RECORDKEY_FIELDS, recordKeyFields);
}

if (null != secondaryKeyFields) {
tableConfig.setValue(HoodieTableConfig.SECONDARYKEY_FIELDS, secondaryKeyFields);
}

if (null != cdcEnabled) {
tableConfig.setValue(HoodieTableConfig.CDC_ENABLED, Boolean.toString(cdcEnabled));
if (cdcEnabled && null != cdcSupplementalLoggingMode) {
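Taken together, the new plumbing could be exercised when initializing a table, e.g. (a sketch; setSecondaryKeyFields comes from this diff, the surrounding builder calls are standard HoodieTableMetaClient APIs, and all values are illustrative):

HoodieTableMetaClient.withPropertyBuilder()
    .setTableType(HoodieTableType.MERGE_ON_READ)
    .setTableName("trips")
    .setRecordKeyFields("uuid")
    .setSecondaryKeyFields("rider") // new in this PR
    .initTable(hadoopConf, basePath);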
@@ -54,6 +54,13 @@ public class KeyGeneratorOptions extends HoodieConfig {
+ "Actual value will be obtained by invoking .toString() on the field value. Nested fields can be specified using\n"
+ "the dot notation eg: `a.b.c`");

public static final ConfigProperty<String> SECONDARYKEY_FIELD_NAME = ConfigProperty
.key("hoodie.datasource.write.secondarykey.field")
.noDefaultValue()
.withDocumentation("Secondary key field. Columns that consitute the secondary key component.\n"
+ "Actual value will be obtained by invoking .toString() on the field value. Nested fields can be specified using\n"
+ "the dot notation eg: `a.b.c`");

public static final ConfigProperty<String> PARTITIONPATH_FIELD_NAME = ConfigProperty
.key("hoodie.datasource.write.partitionpath.field")
.noDefaultValue()
