From 1a69c6a5dec0d8ba1e0d465eda0cf4e488a22b30 Mon Sep 17 00:00:00 2001
From: Vinaykumar Bhat
Date: Wed, 31 Jan 2024 10:22:21 +0530
Subject: [PATCH] [HUDI-7384] [HUDI-7405] [secondary-index] Secondary index support

Initial commit. Supports the following features:

1. Modify schema to add secondary index to metadata
2. New partition type in the metadata table to store secondary_keys-to-record_keys mapping
3. Various options to support secondary index enablement, column mappings (for secondary keys), etc.
4. Initialization of secondary keys
5. Update secondary keys on inserts/upserts/deletes
6. Add hooks in HoodieFileIndex to prune candidate files (to scan) based on secondary key column filters.
7. Add ability in HoodieMetadataLogRecordReader to use unmerged log record scanner (HoodieUnMergedLogRecordScanner)
8. Use the unmerged log record scanner to buffer records from the secondary index partition (of metadata table)
9. Add support for merging secondary index records (from delta log files and base files)

Limitations:

1. Supports only one secondary index at the moment.
2. HoodieUnMergedLogRecordScanner uses an in-memory buffer to store records and subsequently merge the records
3. Scanning of the secondary index partition is done sequentially (both on the query side and the index-maintenance side)

Pending items:

1. Integrate with compaction
---
 .../apache/hudi/config/HoodieWriteConfig.java | 4 +
 .../HoodieBackedTableMetadataWriter.java | 118 +++++++
 .../FlinkHoodieBackedTableMetadataWriter.java | 5 +
 .../JavaHoodieBackedTableMetadataWriter.java | 5 +
 .../SparkHoodieBackedTableMetadataWriter.java | 16 +
 hudi-common/src/main/avro/HoodieMetadata.avsc | 28 ++
 .../common/config/HoodieMetadataConfig.java | 12 +
 .../hudi/common/table/HoodieTableConfig.java | 16 +
 .../common/table/HoodieTableMetaClient.java | 14 +
 .../table/log/HoodieFileSliceReader.java | 0
 .../log/HoodieUnMergedLogRecordScanner.java | 13 +-
 .../common/table/log/LogFileIterator.java | 0
 .../exception/HoodieClusteringException.java | 0
 .../keygen/constant/KeyGeneratorOptions.java | 7 +
 .../hudi/metadata/BaseTableMetadata.java | 51 +++
 .../FileSystemBackedTableMetadata.java | 5 +
 .../metadata/HoodieBackedTableMetadata.java | 295 +++++++++++++++++-
 .../HoodieMetadataLogRecordReader.java | 77 ++++-
 .../hudi/metadata/HoodieMetadataPayload.java | 74 ++++-
 .../hudi/metadata/HoodieTableMetadata.java | 6 +
 .../metadata/HoodieTableMetadataUtil.java | 156 ++++++++-
 .../hudi/metadata/MetadataPartitionType.java | 3 +-
 .../org/apache/hudi/DataSourceOptions.scala | 7 +
 .../org/apache/hudi/HoodieFileIndex.scala | 34 +-
 .../apache/hudi/HoodieSparkSqlWriter.scala | 1 +
 .../scala/org/apache/hudi/Iterators.scala | 2 +-
 .../apache/hudi/RecordLevelIndexSupport.scala | 88 +++++-
 .../functional/RecordLevelIndexTestBase.scala | 25 ++
 .../hudi/functional/TestSecondaryIndex.scala | 222 +++++++++++++
 29 files changed, 1212 insertions(+), 72 deletions(-)
 rename {hudi-client/hudi-client-common => hudi-common}/src/main/java/org/apache/hudi/common/table/log/HoodieFileSliceReader.java (100%)
 rename {hudi-client/hudi-client-common => hudi-common}/src/main/java/org/apache/hudi/common/table/log/LogFileIterator.java (100%)
 rename {hudi-client/hudi-client-common => hudi-common}/src/main/java/org/apache/hudi/exception/HoodieClusteringException.java (100%)
 create mode 100644 hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndex.scala

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index 8fd3546671e5a..965db88f643bb 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -2493,6 +2493,10 @@ public boolean isLogCompactionEnabledOnMetadata() { return getBoolean(HoodieMetadataConfig.ENABLE_LOG_COMPACTION_ON_METADATA_TABLE); } + public boolean isSecondaryIndexEnabled() { + return metadataConfig.enableSecondaryIndex(); + } + public boolean isRecordIndexEnabled() { return metadataConfig.enableRecordIndex(); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java index 8d40fc240952e..18c6f05d66dd5 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java @@ -29,11 +29,13 @@ import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.config.SerializableConfiguration; import org.apache.hudi.common.data.HoodieData; +import org.apache.hudi.common.engine.EngineType; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.FileSlice; import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.model.HoodieDeltaWriteStat; import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy; import org.apache.hudi.common.model.HoodieFileFormat; import org.apache.hudi.common.model.HoodieFileGroup; @@ -112,11 +114,14 @@ import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getPartitionLatestFileSlicesIncludingInflight; import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getProjectedSchemaForFunctionalIndex; import static org.apache.hudi.metadata.HoodieTableMetadataUtil.readRecordKeysFromBaseFiles; +import static org.apache.hudi.metadata.HoodieTableMetadataUtil.readSecondaryKeysFromFileSlices; +import static org.apache.hudi.metadata.HoodieTableMetadataUtil.readSecondaryKeysFromFiles; import static org.apache.hudi.metadata.MetadataPartitionType.BLOOM_FILTERS; import static org.apache.hudi.metadata.MetadataPartitionType.COLUMN_STATS; import static org.apache.hudi.metadata.MetadataPartitionType.FILES; import static org.apache.hudi.metadata.MetadataPartitionType.FUNCTIONAL_INDEX; import static org.apache.hudi.metadata.MetadataPartitionType.RECORD_INDEX; +import static org.apache.hudi.metadata.MetadataPartitionType.SECONDARY_INDEX; /** * Writer implementation backed by an internal hudi table. 
Partition and file listing are saved within an internal MOR table @@ -223,6 +228,11 @@ private void enablePartitions() { } if (dataWriteConfig.isRecordIndexEnabled() || dataMetaClient.getTableConfig().isMetadataPartitionAvailable(RECORD_INDEX)) { this.enabledPartitionTypes.add(RECORD_INDEX); + + // Enable secondary index only iff record index is enabled + if (dataWriteConfig.isSecondaryIndexEnabled() || dataMetaClient.getTableConfig().isMetadataPartitionAvailable(SECONDARY_INDEX)) { + this.enabledPartitionTypes.add(SECONDARY_INDEX); + } } if (dataMetaClient.getFunctionalIndexMetadata().isPresent()) { this.enabledPartitionTypes.add(FUNCTIONAL_INDEX); @@ -437,6 +447,9 @@ private boolean initializeFromFilesystem(String initializationTime, List getFunctionalIndexRecords(List getDeletedSecondaryRecordMapping(HoodieEngineContext engineContext, + Map recordKeySecondaryKeyMap); + private Pair> initializeFunctionalIndexPartition() throws Exception { HoodieMetadataFileSystemView fsView = new HoodieMetadataFileSystemView(dataMetaClient, dataMetaClient.getActiveTimeline(), metadata); String indexName = dataWriteConfig.getFunctionalIndexConfig().getIndexName(); @@ -546,6 +562,39 @@ private HoodieFunctionalIndexDefinition getFunctionalIndexDefinition(String inde } } + private Pair> initializeSecondaryIndexPartition() throws IOException { + HoodieMetadataFileSystemView fsView = new HoodieMetadataFileSystemView(dataMetaClient, dataMetaClient.getActiveTimeline(), metadata); + // Collect the list of latest file slices present in each partition + List partitions = metadata.getAllPartitionPaths(); + fsView.loadAllPartitions(); + + List> partitionFileSlicePairs = new ArrayList<>(); + partitions.forEach(partition -> { + fsView.getLatestFileSlices(partition).forEach(fs -> { + partitionFileSlicePairs.add(Pair.of(partition, fs)); + }); + }); + + // Reuse record index parallelism config to build secondary index + int parallelism = Math.min(partitionFileSlicePairs.size(), dataWriteConfig.getMetadataConfig().getRecordIndexMaxParallelism()); + + HoodieData records = readSecondaryKeysFromFileSlices( + engineContext, + partitionFileSlicePairs, + parallelism, + this.getClass().getSimpleName(), + dataMetaClient, + EngineType.SPARK); + + // Initialize the file groups - using the same estimation logic as that of record index + final int fileGroupCount = HoodieTableMetadataUtil.estimateFileGroupCount(RECORD_INDEX, records.count(), + RECORD_INDEX_AVERAGE_RECORD_SIZE, dataWriteConfig.getRecordIndexMinFileGroupCount(), + dataWriteConfig.getRecordIndexMaxFileGroupCount(), dataWriteConfig.getRecordIndexGrowthFactor(), + dataWriteConfig.getRecordIndexMaxFileGroupSizeBytes()); + + return Pair.of(fileGroupCount, records); + } + private Pair> initializeRecordIndexPartition() throws IOException { final HoodieMetadataFileSystemView fsView = new HoodieMetadataFileSystemView(dataMetaClient, dataMetaClient.getActiveTimeline(), metadata); @@ -941,6 +990,8 @@ public void updateFromWriteStatuses(HoodieCommitMetadata commitMetadata, HoodieD HoodieData additionalUpdates = getRecordIndexAdditionalUpserts(updatesFromWriteStatuses, commitMetadata); partitionToRecordMap.put(RECORD_INDEX, updatesFromWriteStatuses.union(additionalUpdates)); updateFunctionalIndexIfPresent(commitMetadata, instantTime, partitionToRecordMap); + updateSecondaryIndexIfPresent(commitMetadata, partitionToRecordMap, writeStatus); + return partitionToRecordMap; }); closeInternal(); @@ -1005,6 +1056,73 @@ private HoodieData getFunctionalIndexUpdates(HoodieCommitMetadata return 
getFunctionalIndexRecords(partitionFileSlicePairs, indexDefinition, dataMetaClient, parallelism, readerSchema, hadoopConf); } + private void updateSecondaryIndexIfPresent(HoodieCommitMetadata commitMetadata, Map> partitionToRecordMap, HoodieData writeStatus) { + dataMetaClient.getTableConfig().getMetadataPartitions() + .stream() + .filter(partition -> partition.startsWith(HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX_PREFIX)) + .forEach(partition -> { + HoodieData secondaryIndexRecords; + try { + secondaryIndexRecords = getSecondaryIndexUpdates(commitMetadata, writeStatus); + } catch (Exception e) { + throw new HoodieMetadataException("Failed to get secondary index updates for partition " + partition, e); + } + partitionToRecordMap.put(SECONDARY_INDEX, secondaryIndexRecords); + }); + } + + private HoodieData getSecondaryIndexUpdates(HoodieCommitMetadata commitMetadata, HoodieData writeStatus) throws Exception { + // Build a list of basefiles+delta-log-files for every partition that this commit touches + // { + // { + // "partition1", { {"baseFile11", {"logFile11", "logFile12"}}, {"baseFile12", {"logFile11"} } }, + // }, + // { + // "partition2", { {"baseFile21", {"logFile21", "logFile22"}}, {"baseFile22", {"logFile21"} } } + // } + // } + List>>> partitionFilePairs = new ArrayList<>(); + commitMetadata.getPartitionToWriteStats().forEach((dataPartition, writeStats) -> { + writeStats.forEach(writeStat -> { + if (writeStat instanceof HoodieDeltaWriteStat) { + partitionFilePairs.add(Pair.of(dataPartition, Pair.of(((HoodieDeltaWriteStat) writeStat).getBaseFile(), ((HoodieDeltaWriteStat) writeStat).getLogFiles()))); + } else { + partitionFilePairs.add(Pair.of(dataPartition, Pair.of(writeStat.getPath(), new ArrayList<>()))); + } + }); + }); + + // Build a list of keys that need to be removed. A 'delete' record will be emitted into the respective FileGroup of + // the secondary index partition for each of these keys. For a commit which is deleting/updating a lot of records, this + // operation is going to be expensive (in CPU, memory and IO) + List keysToRemove = new ArrayList<>(); + writeStatus.collectAsList().forEach(status -> { + status.getWrittenRecordDelegates().forEach(recordDelegate -> { + // Consider those keys which were either updated or deleted in this commit + if (!recordDelegate.getNewLocation().isPresent() || (recordDelegate.getCurrentLocation().isPresent() && recordDelegate.getNewLocation().isPresent())) { + keysToRemove.add(recordDelegate.getRecordKey()); + } + }); + }); + + // Fetch the secondary keys that each of the record keys ('keysToRemove') maps to + // This is obtained by scanning the entire secondary index partition in the metadata table + // This could be an expensive operation for a large commit (updating/deleting millions of rows) + Map recordKeySecondaryKeyMap = metadata.getSecondaryKeys(keysToRemove); + HoodieData deletedRecords = getDeletedSecondaryRecordMapping(engineContext, recordKeySecondaryKeyMap); + + // Reuse record index parallelism config to build secondary index + int parallelism = Math.min(partitionFilePairs.size(), dataWriteConfig.getMetadataConfig().getRecordIndexMaxParallelism()); + + return deletedRecords.union(readSecondaryKeysFromFiles( + engineContext, + partitionFilePairs, + parallelism, + this.getClass().getSimpleName(), + dataMetaClient, + EngineType.SPARK)); + } + /** * Update from {@code HoodieCleanMetadata}. 
* diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java index 9ce17cbb8c688..6824c373b80b1 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java @@ -210,4 +210,9 @@ protected HoodieData getFunctionalIndexRecords(List getDeletedSecondaryRecordMapping(HoodieEngineContext engineContext, Map recordKeySecondaryKeyMap) { + throw new HoodieNotSupportedException("Flink metadata table does not support secondary index yet."); + } } diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/metadata/JavaHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/metadata/JavaHoodieBackedTableMetadataWriter.java index c9c2730a7ea61..b757ad5b92e1e 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/metadata/JavaHoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/metadata/JavaHoodieBackedTableMetadataWriter.java @@ -122,4 +122,9 @@ protected HoodieData getFunctionalIndexRecords(List getDeletedSecondaryRecordMapping(HoodieEngineContext engineContext, Map recordKeySecondaryKeyMap) { + throw new HoodieNotSupportedException("Secondary index not supported for Java metadata table writer yet."); + } } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java index 3045a9b3762e6..a74e417f2ec34 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java @@ -50,6 +50,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; @@ -216,4 +217,19 @@ protected HoodieData getFunctionalIndexRecords(List, ?, ?> initializeWriteClient() { return new SparkRDDWriteClient(engineContext, metadataWriteConfig, true); } + + @Override + public HoodieData getDeletedSecondaryRecordMapping(HoodieEngineContext engineContext, + Map recordKeySecondaryKeyMap) { + HoodieSparkEngineContext sparkEngineContext = (HoodieSparkEngineContext) engineContext; + + if (recordKeySecondaryKeyMap.isEmpty()) { + return sparkEngineContext.emptyHoodieData(); + } + + List deletedRecords = new ArrayList<>(); + recordKeySecondaryKeyMap.forEach((key, value) -> deletedRecords.add(HoodieMetadataPayload.createSecondaryIndex(key, value, true))); + + return HoodieJavaRDD.of(deletedRecords, sparkEngineContext, 1); + } } diff --git a/hudi-common/src/main/avro/HoodieMetadata.avsc b/hudi-common/src/main/avro/HoodieMetadata.avsc index dc11095e3c7f4..4eefc3c37c815 100644 --- a/hudi-common/src/main/avro/HoodieMetadata.avsc +++ b/hudi-common/src/main/avro/HoodieMetadata.avsc @@ -432,6 +432,34 @@ } ], "default" : null + }, + { + "name": "SecondaryIndexMetadata", + "doc": "Metadata Index that contains information about secondary keys and the corresponding record keys in the dataset", + "type": [ + "null", + { + "type": "record", + 
"name": "HoodieSecondaryIndexInfo", + "fields": [ + { + "name": "recordKey", + "type": [ + "null", + "string" + ], + "default": null, + "doc": "Refers to the record key that this secondary key maps to" + }, + { + "name": "isDeleted", + "type": "boolean", + "doc": "True if this entry has been deleted" + } + ] + } + ], + "default" : null } ] } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java index 28043c857d472..fb855d4b2ed85 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java @@ -235,6 +235,13 @@ public final class HoodieMetadataConfig extends HoodieConfig { .withDocumentation("When there is a pending instant in data table, this config limits the allowed number of deltacommits in metadata table to " + "prevent the metadata table's timeline from growing unboundedly as compaction won't be triggered due to the pending data table instant."); + public static final ConfigProperty SECONDARY_INDEX_ENABLE_PROP = ConfigProperty + .key(METADATA_PREFIX + ".secondary.index.enable") + .defaultValue(false) + .markAdvanced() + .sinceVersion("1.0.0") + .withDocumentation("Create the HUDI Secondary (non-unique) Index within the Metadata Table"); + public static final ConfigProperty RECORD_INDEX_ENABLE_PROP = ConfigProperty .key(METADATA_PREFIX + ".record.index.enable") .defaultValue(false) @@ -418,6 +425,11 @@ public int getMaxNumDeltacommitsWhenPending() { return getIntOrDefault(METADATA_MAX_NUM_DELTACOMMITS_WHEN_PENDING); } + public boolean enableSecondaryIndex() { + // Secondary index is enabled only iff record index (primary key index) is also enabled + return enableRecordIndex() && getBoolean(SECONDARY_INDEX_ENABLE_PROP); + } + public boolean enableRecordIndex() { return enabled() && getBoolean(RECORD_INDEX_ENABLE_PROP); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java index f70f9456a8689..451f458244ba5 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java @@ -131,6 +131,12 @@ public class HoodieTableConfig extends HoodieConfig { .withDocumentation("Columns used to uniquely identify the table. Concatenated values of these fields are used as " + " the record key component of HoodieKey."); + public static final ConfigProperty SECONDARYKEY_FIELDS = ConfigProperty + .key("hoodie.table.secondarykey.fields") + .noDefaultValue() + .withDocumentation("Columns used to uniquely identify the table. 
Concatenated values of these fields are used as " + + " the record key component of HoodieKey."); + public static final ConfigProperty CDC_ENABLED = ConfigProperty .key("hoodie.table.cdc.enabled") .defaultValue(false) @@ -540,6 +546,16 @@ public Option getRecordKeyFields() { } } + public Option getSecondaryKeyFields() { + String secondaryKeyFieldsValue = getStringOrDefault(SECONDARYKEY_FIELDS, null); + if (secondaryKeyFieldsValue == null) { + return Option.empty(); + } else { + return Option.of(Arrays.stream(secondaryKeyFieldsValue.split(",")) + .filter(p -> p.length() > 0).collect(Collectors.toList()).toArray(new String[] {})); + } + } + public Option getPartitionFields() { if (contains(PARTITION_FIELDS)) { return Option.of(Arrays.stream(getString(PARTITION_FIELDS).split(",")) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java index 9d451893a630e..e2337b04613a8 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java @@ -921,6 +921,7 @@ public static class PropertyBuilder { private String tableName; private String tableCreateSchema; private String recordKeyFields; + private String secondaryKeyFields; private String archiveLogFolder; private String payloadClassName; private String payloadType; @@ -988,6 +989,11 @@ public PropertyBuilder setRecordKeyFields(String recordKeyFields) { return this; } + public PropertyBuilder setSecondaryKeyFields(String secondaryKeyFields) { + this.secondaryKeyFields = secondaryKeyFields; + return this; + } + public PropertyBuilder setArchiveLogFolder(String archiveLogFolder) { this.archiveLogFolder = archiveLogFolder; return this; @@ -1208,6 +1214,9 @@ public PropertyBuilder fromProperties(Properties properties) { if (hoodieConfig.contains(HoodieTableConfig.RECORDKEY_FIELDS)) { setRecordKeyFields(hoodieConfig.getString(HoodieTableConfig.RECORDKEY_FIELDS)); } + if (hoodieConfig.contains(HoodieTableConfig.SECONDARYKEY_FIELDS)) { + setSecondaryKeyFields(hoodieConfig.getString(HoodieTableConfig.SECONDARYKEY_FIELDS)); + } if (hoodieConfig.contains(HoodieTableConfig.CDC_ENABLED)) { setCDCEnabled(hoodieConfig.getBoolean(HoodieTableConfig.CDC_ENABLED)); } @@ -1322,6 +1331,11 @@ public Properties build() { if (null != recordKeyFields) { tableConfig.setValue(HoodieTableConfig.RECORDKEY_FIELDS, recordKeyFields); } + + if (null != secondaryKeyFields) { + tableConfig.setValue(HoodieTableConfig.SECONDARYKEY_FIELDS, secondaryKeyFields); + } + if (null != cdcEnabled) { tableConfig.setValue(HoodieTableConfig.CDC_ENABLED, Boolean.toString(cdcEnabled)); if (cdcEnabled && null != cdcSupplementalLoggingMode) { diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/common/table/log/HoodieFileSliceReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieFileSliceReader.java similarity index 100% rename from hudi-client/hudi-client-common/src/main/java/org/apache/hudi/common/table/log/HoodieFileSliceReader.java rename to hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieFileSliceReader.java diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java index 4d870618e7b68..bce007af73dac 100644 --- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java @@ -33,6 +33,7 @@ import org.apache.hadoop.fs.FileSystem; import java.util.List; +import java.util.Objects; import java.util.stream.Collectors; /** @@ -45,10 +46,10 @@ public class HoodieUnMergedLogRecordScanner extends AbstractHoodieLogRecordReade private HoodieUnMergedLogRecordScanner(FileSystem fs, String basePath, List logFilePaths, Schema readerSchema, String latestInstantTime, boolean readBlocksLazily, boolean reverseReader, int bufferSize, LogRecordScannerCallback callback, Option instantRange, InternalSchema internalSchema, - boolean enableOptimizedLogBlocksScan, HoodieRecordMerger recordMerger, + Option keyFieldOverride, boolean enableOptimizedLogBlocksScan, HoodieRecordMerger recordMerger, Option hoodieTableMetaClientOption) { super(fs, basePath, logFilePaths, readerSchema, latestInstantTime, readBlocksLazily, reverseReader, bufferSize, instantRange, - false, true, Option.empty(), internalSchema, Option.empty(), enableOptimizedLogBlocksScan, recordMerger, + false, true, keyFieldOverride, internalSchema, Option.empty(), enableOptimizedLogBlocksScan, recordMerger, hoodieTableMetaClientOption); this.callback = callback; } @@ -113,6 +114,7 @@ public static class Builder extends AbstractHoodieLogRecordReader.Builder { private boolean enableOptimizedLogBlocksScan; private HoodieRecordMerger recordMerger = HoodiePreCombineAvroRecordMerger.INSTANCE; private HoodieTableMetaClient hoodieTableMetaClient; + private String keyFieldOverride; public Builder withFileSystem(FileSystem fs) { this.fs = fs; @@ -191,13 +193,18 @@ public HoodieUnMergedLogRecordScanner.Builder withTableMetaClient( return this; } + public HoodieUnMergedLogRecordScanner.Builder withKeyFieldOverride(String keyFieldOverride) { + this.keyFieldOverride = Objects.requireNonNull(keyFieldOverride); + return this; + } + @Override public HoodieUnMergedLogRecordScanner build() { ValidationUtils.checkArgument(recordMerger != null); return new HoodieUnMergedLogRecordScanner(fs, basePath, logFilePaths, readerSchema, latestInstantTime, readBlocksLazily, reverseReader, bufferSize, callback, instantRange, - internalSchema, enableOptimizedLogBlocksScan, recordMerger, Option.ofNullable(hoodieTableMetaClient)); + internalSchema, Option.ofNullable(keyFieldOverride), enableOptimizedLogBlocksScan, recordMerger, Option.ofNullable(hoodieTableMetaClient)); } } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/common/table/log/LogFileIterator.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/LogFileIterator.java similarity index 100% rename from hudi-client/hudi-client-common/src/main/java/org/apache/hudi/common/table/log/LogFileIterator.java rename to hudi-common/src/main/java/org/apache/hudi/common/table/log/LogFileIterator.java diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/exception/HoodieClusteringException.java b/hudi-common/src/main/java/org/apache/hudi/exception/HoodieClusteringException.java similarity index 100% rename from hudi-client/hudi-client-common/src/main/java/org/apache/hudi/exception/HoodieClusteringException.java rename to hudi-common/src/main/java/org/apache/hudi/exception/HoodieClusteringException.java diff --git a/hudi-common/src/main/java/org/apache/hudi/keygen/constant/KeyGeneratorOptions.java 
b/hudi-common/src/main/java/org/apache/hudi/keygen/constant/KeyGeneratorOptions.java index db4a9162129fa..d7703d25313ef 100644 --- a/hudi-common/src/main/java/org/apache/hudi/keygen/constant/KeyGeneratorOptions.java +++ b/hudi-common/src/main/java/org/apache/hudi/keygen/constant/KeyGeneratorOptions.java @@ -54,6 +54,13 @@ public class KeyGeneratorOptions extends HoodieConfig { + "Actual value will be obtained by invoking .toString() on the field value. Nested fields can be specified using\n" + "the dot notation eg: `a.b.c`"); + public static final ConfigProperty SECONDARYKEY_FIELD_NAME = ConfigProperty + .key("hoodie.datasource.write.secondarykey.field") + .noDefaultValue() + .withDocumentation("Secondary key field. Columns that constitute the secondary key component.\n" + + "Actual value will be obtained by invoking .toString() on the field value. Nested fields can be specified using\n" + + "the dot notation eg: `a.b.c`"); + public static final ConfigProperty PARTITIONPATH_FIELD_NAME = ConfigProperty .key("hoodie.datasource.write.partitionpath.field") .noDefaultValue() diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java index 0a1435ed12e85..07d46e225d83d 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java @@ -305,6 +305,52 @@ public Map> readRecordIndex(List + * If the Metadata Table is not enabled, an exception is thrown to distinguish this from the absence of the key. + * + * @param secondaryKeys The list of secondary keys to read + */ + @Override + public Map> readSecondaryIndex(List secondaryKeys) { + ValidationUtils.checkState(dataMetaClient.getTableConfig().isMetadataPartitionAvailable(MetadataPartitionType.RECORD_INDEX), + "Record index is not initialized in MDT"); + ValidationUtils.checkState(dataMetaClient.getTableConfig().isMetadataPartitionAvailable(MetadataPartitionType.SECONDARY_INDEX), + "Secondary index is not initialized in MDT"); + + HoodieTimer timer = HoodieTimer.start(); + + // Fetch secondary-index records + Map>> secondaryKeyRecords = getSecondaryIndexRecords(secondaryKeys, MetadataPartitionType.SECONDARY_INDEX.getPartitionPath()); + + // Now collect the record-keys and fetch the RLI records + List recordKeys = new ArrayList<>(); + secondaryKeyRecords.forEach((key, records) -> { + records.forEach(record -> { + if (!record.getData().isDeleted()) { + recordKeys.add(record.getData().getRecordKeyFromSecondaryIndex()); + } + }); + }); + + return readRecordIndex(recordKeys); + } + + // Returns a map of (record-key -> secondary-key) for the provided record-keys + public Map getSecondaryKeys(List recordKeys) { + ValidationUtils.checkState(dataMetaClient.getTableConfig().isMetadataPartitionAvailable(MetadataPartitionType.RECORD_INDEX), + "Record index is not initialized in MDT"); + ValidationUtils.checkState(dataMetaClient.getTableConfig().isMetadataPartitionAvailable(MetadataPartitionType.SECONDARY_INDEX), + "Secondary index is not initialized in MDT"); + + HoodieTimer timer = HoodieTimer.start(); + Map recordKeyToSecondaryKeyMap = getSecondaryKeysForRecordKeys(recordKeys, MetadataPartitionType.SECONDARY_INDEX.getPartitionPath()); + + + return recordKeyToSecondaryKeyMap; + } + /** * Returns a list of all partitions.
*/ @@ -415,6 +461,11 @@ private void checkForSpuriousDeletes(HoodieMetadataPayload metadataPayload, Stri protected abstract Map> getRecordsByKeys(List keys, String partitionName); + // returns a map of (secondary-key, list-of-secondary-index-records) + protected abstract Map>> getSecondaryIndexRecords(List keys, String partitionName); + + protected abstract Map getSecondaryKeysForRecordKeys(List recordKeys, String partitionName); + public HoodieMetadataConfig getMetadataConfig() { return metadataConfig; } diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java index 0f8bf68550c0d..afea8af4eb6a8 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java @@ -296,6 +296,11 @@ public Map> readRecordIndex(List> readSecondaryIndex(List secondaryKeys) { + throw new HoodieMetadataException("Unsupported operation: readSecondaryIndex!"); + } + @Override public int getNumFileGroupsForPartition(MetadataPartitionType partition) { throw new HoodieMetadataException("Unsupported operation: getNumFileGroupsForPartition"); diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java index cd16294da7293..f17f4ac608157 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java @@ -35,6 +35,7 @@ import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType; import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.log.HoodieUnMergedLogRecordScanner; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.view.HoodieTableFileSystemView; import org.apache.hudi.common.util.HoodieTimer; @@ -65,6 +66,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -208,7 +210,7 @@ public HoodieData> getRecordsByKeyPrefixes(L // NOTE: Since this will be executed by executors, we can't access previously cached // readers, and therefore have to always open new ones Pair, HoodieMetadataLogRecordReader> readers = - openReaders(partitionName, fileSlice); + openReaders(partitionName, fileSlice, Option.empty()); try { List timings = new ArrayList<>(); @@ -327,6 +329,60 @@ public Map>> getAllRecordsByKey return result; } + // returns a map of (secondary-key, list-of-secondary-index-records) + @Override + protected Map>> getSecondaryIndexRecords(List keys, String partitionName) { + if (keys.isEmpty()) { + return Collections.emptyMap(); + } + + Map>> result = new HashMap<>(); + + // Load the file slices for the partition. Each file slice is a shard which saves a portion of the keys. 
+ List partitionFileSlices = partitionFileSliceMap.computeIfAbsent(partitionName, + k -> HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(metadataMetaClient, metadataFileSystemView, partitionName)); + final int numFileSlices = partitionFileSlices.size(); + ValidationUtils.checkState(numFileSlices > 0, "Number of file slices for partition " + partitionName + " should be > 0"); + + // Lookup keys from each file slice + // TODO: parallelize this loop + for (FileSlice partition : partitionFileSlices) { + Map>> currentFileSliceResult = lookupSecondaryKeysFromFileSlice(partitionName, keys, partition); + + currentFileSliceResult.forEach((secondaryKey, secondaryRecords) -> { + result.merge(secondaryKey, secondaryRecords, (oldRecords, newRecords) -> { + newRecords.addAll(oldRecords); + return newRecords; + }); + }); + } + + return result; + } + + @Override + protected Map getSecondaryKeysForRecordKeys(List recordKeys, String partitionName) { + if (recordKeys.isEmpty()) { + return Collections.emptyMap(); + } + + // Load the file slices for the partition. Each file slice is a shard which saves a portion of the keys. + List partitionFileSlices = partitionFileSliceMap.computeIfAbsent(partitionName, + k -> HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(metadataMetaClient, metadataFileSystemView, partitionName)); + final int numFileSlices = partitionFileSlices.size(); + ValidationUtils.checkState(numFileSlices > 0, "Number of file slices for partition " + partitionName + " should be > 0"); + + // Lookup keys from each file slice + // TODO: parallelize this loop + Map reverseSecondaryKeyMap = new HashMap<>(); + for (FileSlice partition : partitionFileSlices) { + reverseLookupSecondaryKeys(partitionName, recordKeys, partition, reverseSecondaryKeyMap); + } + + return reverseSecondaryKeyMap; + } + + /** * Lookup list of keys from a single file slice. * @@ -336,7 +392,7 @@ public Map>> getAllRecordsByKey * @return A {@code Map} of key name to {@code HoodieRecord} for the keys which were found in the file slice */ private Map> lookupKeysFromFileSlice(String partitionName, List keys, FileSlice fileSlice) { - Pair, HoodieMetadataLogRecordReader> readers = getOrCreateReaders(partitionName, fileSlice); + Pair, HoodieMetadataLogRecordReader> readers = getOrCreateReaders(partitionName, fileSlice, Option.empty()); try { List timings = new ArrayList<>(1); HoodieSeekingFileReader baseFileReader = readers.getKey(); @@ -360,6 +416,179 @@ private Map> lookupKeysFromFileSlice } } + /** + * Lookup list of keys from a single file slice. + * + * @param partitionName Name of the partition + * @param keys The list of secondary keys to lookup + * @param fileSlice The file slice to read + * @return A {@code Map} of secondary-key to list of {@code HoodieRecord} for the secondary-keys which were found in the file slice + */ + private Map>> lookupSecondaryKeysFromFileSlice(String partitionName, List keys, FileSlice fileSlice) { + Map>> logRecordsMap = new HashMap<>(); + Set keySet = new HashSet<>(keys.size()); + + // Using HoodieUnMergedLogRecordScanner with a in-memory record buffer for custom merging logic. + // The default HoodieMergedLogRecordScanner uses a Map (DiskSpillableMap) structure for buffering records (which is + // efficient) but does not serve the purpose of scanning secondary index's log record. The log records will contain + // duplicates and having a simple Map based buffer will immediately overwrite keys. 
+ // TODO: Revisit the merge logic and make it more efficient (in terms of memory and CPU) + HoodieUnMergedLogRecordScanner.LogRecordScannerCallback callback = (newRecord) -> { + if (((HoodieMetadataPayload)newRecord.getData()).isDeleted() || !keySet.contains(newRecord.getRecordKey())) { + return; + } + + String key = newRecord.getRecordKey(); + HoodieMetadataPayload newPayload = (HoodieMetadataPayload) newRecord.getData(); + + if (logRecordsMap.containsKey(key)) { + List> records = logRecordsMap.get(key); + List> mergedRecords = new ArrayList<>(records.size()); + + boolean merged = false; + Option mergedPayload = Option.empty(); + Option mergedKey = Option.empty(); + for (HoodieRecord hoodieRecord : records) { + if (merged) { + // Current record is already merged, copy the remaining records + mergedRecords.add(hoodieRecord); + continue; + } + + HoodieMetadataPayload oldPayload = hoodieRecord.getData(); + if (oldPayload.getRecordKeyFromSecondaryIndex().equals(newPayload.getRecordKeyFromSecondaryIndex())) { + mergedPayload = HoodieMetadataPayload.combineSecondaryIndexPayload(oldPayload, newPayload); + mergedKey = Option.of(hoodieRecord.getKey()); + + // Already merged into a new record. Discard current record and stop merging future records + merged = true; + } else { + mergedRecords.add(hoodieRecord); + } + } + + // If there exists a merged record, add it at the end + if (mergedPayload.isPresent()) { + mergedRecords.add(new HoodieAvroRecord<>(mergedKey.get(), mergedPayload.get())); + } + + if (!merged) { + // Current record was not merged and needs to be added to the merged record list as a new record + mergedRecords.add((HoodieRecord)newRecord); + } + + if (mergedRecords.isEmpty()) { + // The newRecord indicated that it was deleted. Hence remove the oldrecord from the map + logRecordsMap.remove(key); + } else { + // Replace the oldRecord with the newly merged record + logRecordsMap.put(key, mergedRecords); + } + + } else { + logRecordsMap.put(key, Collections.singletonList((HoodieRecord) newRecord)); + } + }; + + Pair, HoodieMetadataLogRecordReader> readers = getOrCreateReaders(partitionName, fileSlice, Option.of(callback)); + try { + List timings = new ArrayList<>(1); + HoodieSeekingFileReader baseFileReader = readers.getKey(); + HoodieMetadataLogRecordReader logRecordScanner = readers.getRight(); + if (baseFileReader == null && logRecordScanner == null) { + return Collections.emptyMap(); + } + + // Sort it here once so that we don't need to sort individually for base file and for each individual log files. + List sortedKeys = new ArrayList<>(keys); + Collections.sort(sortedKeys); + keySet.addAll(sortedKeys); + + readAllLogRecords(logRecordScanner, sortedKeys, timings); + + return readFromBaseAndMergeWithAllLogRecords(baseFileReader, sortedKeys, true, logRecordsMap, timings, partitionName); + } catch (IOException ioe) { + throw new HoodieIOException("Error merging records from metadata table for " + keys.size() + " key : ", ioe); + } finally { + if (!reuse) { + closeReader(readers); + } + } + } + + private void reverseLookupSecondaryKeys(String partitionName, List recordKeys, FileSlice fileSlice, Map recordKeyMap) { + Set keySet = new HashSet<>(recordKeys.size()); + Map> logRecords = new HashMap<>(); + + // Using HoodieUnMergedLogRecordScanner with a in-memory record buffer for custom merging logic. 
+ // The default HoodieMergedLogRecordScanner uses a Map (DiskSpillableMap) structure for buffering records (which is + // efficient) but does not serve the purpose of scanning secondary index's log record. The log records will contain + // duplicates and having a simple Map based buffer will immediately overwrite keys. + HoodieUnMergedLogRecordScanner.LogRecordScannerCallback callback = (record) -> { + HoodieMetadataPayload payload = (HoodieMetadataPayload)record.getData(); + if (payload.isDeleted() || !keySet.contains(payload.getRecordKeyFromSecondaryIndex())) { + return; + } + + String recordKey = payload.getRecordKeyFromSecondaryIndex(); + if (logRecords.containsKey(recordKey)) { + HoodieRecord prevRecord = logRecords.get(recordKey); + Option> mergedRecord = HoodieMetadataPayload.combineSecondaryIndexRecord(prevRecord, (HoodieRecord) record); + if (mergedRecord.isPresent()) { + logRecords.put(recordKey, mergedRecord.get()); + } else { + logRecords.remove(recordKey); + } + } else { + logRecords.put(recordKey, (HoodieRecord) record); + } + }; + + Pair, HoodieMetadataLogRecordReader> readers = getOrCreateReaders(partitionName, fileSlice, Option.of(callback)); + try { + List timings = new ArrayList<>(1); + HoodieSeekingFileReader baseFileReader = readers.getKey(); + HoodieMetadataLogRecordReader logRecordScanner = readers.getRight(); + if (baseFileReader == null && logRecordScanner == null) { + return; + } + + // Sort it here once so that we don't need to sort individually for base file and for each individual log files. + List sortedKeys = new ArrayList<>(recordKeys); + Collections.sort(sortedKeys); + keySet.addAll(sortedKeys); + + logRecordScanner.scan(); + + // Map of (record-key, secondary-index-record) + Map> baseFileRecords = fetchBaseFileAllRecordsByPayload(baseFileReader, keySet, partitionName); + + // Iterate over all provided log-records, merging them into existing records + logRecords.entrySet().forEach(kv -> { + baseFileRecords.merge( + kv.getKey(), + kv.getValue(), + (oldRecord, newRecord) -> { + Option> mergedRecord = + HoodieMetadataPayload.combineSecondaryIndexRecord(oldRecord, newRecord); + return mergedRecord.orElseGet(null); + } + ); + }); + + + baseFileRecords.forEach((key, value) -> { + recordKeyMap.put(key, value.getRecordKey()); + }); + } catch (IOException ioe) { + throw new HoodieIOException("Error merging records from metadata table for " + recordKeys.size() + " key : ", ioe); + } finally { + if (!reuse) { + closeReader(readers); + } + } + } + private Map> readLogRecords(HoodieMetadataLogRecordReader logRecordReader, List sortedKeys, boolean fullKey, @@ -436,7 +665,7 @@ private Map> fetchBaseFileRecordsByK } private Map>> lookupAllKeysFromFileSlice(String partitionName, List keys, FileSlice fileSlice) { - Pair, HoodieMetadataLogRecordReader> readers = getOrCreateReaders(partitionName, fileSlice); + Pair, HoodieMetadataLogRecordReader> readers = getOrCreateReaders(partitionName, fileSlice, Option.empty()); try { List timings = new ArrayList<>(); HoodieSeekingFileReader baseFileReader = readers.getKey(); @@ -466,7 +695,8 @@ private Map>> readAllLogRecords } try { - return logRecordReader.getAllRecordsByKeys(sortedKeys); + Map>> records = logRecordReader.getAllRecordsByKeys(sortedKeys); + return records; } finally { timings.add(timer.endTimer()); } @@ -494,7 +724,6 @@ private Map>> readFromBaseAndMe metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.BASEFILE_READ_STR, readTimer.endTimer())); // Iterate over all provided log-records, merging them into 
existing records - logRecords.entrySet().forEach(kv -> { records.merge( kv.getKey(), @@ -538,6 +767,7 @@ private Map>> fetchBaseFileAllR ? reader.getRecordsByKeysIterator(sortedKeys) : reader.getRecordsByKeyPrefixIterator(sortedKeys); + return toStream(records) .map(record -> { GenericRecord data = (GenericRecord) record.getData(); @@ -548,6 +778,31 @@ private Map>> fetchBaseFileAllR .collect(Collectors.groupingBy(Pair::getKey, Collectors.mapping(Pair::getValue, Collectors.toList()))); } + private Map> fetchBaseFileAllRecordsByPayload(HoodieSeekingFileReader reader, + Set keySet, + String partitionName) throws IOException { + if (reader == null) { + // No base file at all + return new HashMap<>(); + } + + ClosableIterator> records = reader.getRecordIterator(); + + return toStream(records) + .map(record -> { + GenericRecord data = (GenericRecord) record.getData(); + return composeRecord(data, partitionName); + }) + .filter(record -> { + HoodieMetadataPayload payload = (HoodieMetadataPayload)record.getData(); + return keySet.contains(payload.getRecordKeyFromSecondaryIndex()); + }) + .collect(Collectors.toMap(record -> { + HoodieMetadataPayload payload = (HoodieMetadataPayload)record.getData(); + return payload.getRecordKeyFromSecondaryIndex(); + }, record -> record)); + } + private HoodieRecord composeRecord(GenericRecord avroRecord, String partitionName) { if (metadataTableConfig.populateMetaFields()) { return SpillableMapUtils.convertToHoodieRecordPayload(avroRecord, @@ -567,16 +822,19 @@ private HoodieRecord composeRecord(GenericRecord avroReco * @param slice - The file slice to open readers for * @return File reader and the record scanner pair for the requested file slice */ - private Pair, HoodieMetadataLogRecordReader> getOrCreateReaders(String partitionName, FileSlice slice) { + private Pair, HoodieMetadataLogRecordReader> getOrCreateReaders(String partitionName, FileSlice slice, + Option callback) { if (reuse) { Pair key = Pair.of(partitionName, slice.getFileId()); - return partitionReaders.get().computeIfAbsent(key, ignored -> openReaders(partitionName, slice)); + return partitionReaders.get().computeIfAbsent(key, ignored -> openReaders(partitionName, slice, callback)); } else { - return openReaders(partitionName, slice); + return openReaders(partitionName, slice, callback); } } - private Pair, HoodieMetadataLogRecordReader> openReaders(String partitionName, FileSlice slice) { + private Pair, HoodieMetadataLogRecordReader> openReaders(String partitionName, + FileSlice slice, + Option callback) { try { HoodieTimer timer = HoodieTimer.start(); // Open base file reader @@ -587,7 +845,7 @@ private Pair, HoodieMetadataLogRecordReader> openRead // Open the log record scanner using the log files from the latest file slice List logFiles = slice.getLogFiles().collect(Collectors.toList()); Pair logRecordScannerOpenTimePair = - getLogRecordScanner(logFiles, partitionName, Option.empty()); + getLogRecordScanner(logFiles, partitionName, Option.empty(), callback); HoodieMetadataLogRecordReader logRecordScanner = logRecordScannerOpenTimePair.getKey(); final long logScannerOpenMs = logRecordScannerOpenTimePair.getValue(); @@ -621,7 +879,8 @@ private Pair, Long> getBaseFileReader(FileSlice slice public Pair getLogRecordScanner(List logFiles, String partitionName, - Option allowFullScanOverride) { + Option allowFullScanOverride, + Option callback) { HoodieTimer timer = HoodieTimer.start(); List sortedLogFilePaths = logFiles.stream() .sorted(HoodieLogFile.getLogFileComparator()) @@ -641,7 +900,8 @@ 
public Pair getLogRecordScanner(List getLogRecordScanner(List getLogRecordScanner(List> getRecords() { // NOTE: Locking is necessary since we're accessing [[HoodieMetadataLogRecordReader]] // materialized state, to make sure there's no concurrent access synchronized (this) { - logRecordScanner.scan(); - return logRecordScanner.getRecords().values() + assert logRecordScanner instanceof HoodieMergedLogRecordScanner; + return ((HoodieMergedLogRecordScanner)logRecordScanner).getRecords().values() .stream() .map(record -> (HoodieRecord) record) .collect(Collectors.toList()); } } + public void scan() { + assert logRecordScanner instanceof HoodieUnMergedLogRecordScanner; + ((HoodieUnMergedLogRecordScanner) logRecordScanner).scan(); + } + @SuppressWarnings("unchecked") public Map> getRecordsByKeyPrefixes(List sortedKeyPrefixes) { if (sortedKeyPrefixes.isEmpty()) { @@ -83,9 +92,10 @@ public Map> getRecordsByKeyPrefixes( // NOTE: Locking is necessary since we're accessing [[HoodieMetadataLogRecordReader]] // materialized state, to make sure there's no concurrent access synchronized (this) { - logRecordScanner.scanByKeyPrefixes(sortedKeyPrefixes); + assert logRecordScanner instanceof HoodieMergedLogRecordScanner; + ((HoodieMergedLogRecordScanner)logRecordScanner).scanByKeyPrefixes(sortedKeyPrefixes); Predicate p = createPrefixMatchingPredicate(sortedKeyPrefixes); - return logRecordScanner.getRecords().entrySet() + return ((HoodieMergedLogRecordScanner)logRecordScanner).getRecords().entrySet() .stream() .filter(r -> r != null && p.test(r.getKey())) .map(r -> (HoodieRecord) r.getValue()) @@ -106,8 +116,9 @@ public Map> getRecordsByKeys(List allRecords = logRecordScanner.getRecords(); + assert logRecordScanner instanceof HoodieMergedLogRecordScanner; + ((HoodieMergedLogRecordScanner)logRecordScanner).scanByFullKeys(sortedKeys); + Map allRecords = ((HoodieMergedLogRecordScanner)logRecordScanner).getRecords(); return sortedKeys.stream() .map(key -> (HoodieRecord) allRecords.get(key)) .filter(Objects::nonNull) @@ -123,18 +134,35 @@ public Map>> getAllRecordsByKey // NOTE: Locking is necessary since we're accessing [[HoodieMetadataLogRecordReader]] // materialized state, to make sure there's no concurrent access synchronized (this) { - logRecordScanner.scanByFullKeys(sortedKeys); - Map allRecords = logRecordScanner.getRecords(); - return sortedKeys.stream() - .map(key -> (HoodieRecord) allRecords.get(key)) - .filter(Objects::nonNull) - .collect(Collectors.groupingBy(HoodieRecord::getRecordKey)); + if (logRecordScanner instanceof HoodieMergedLogRecordScanner) { + ((HoodieMergedLogRecordScanner)logRecordScanner).scanByFullKeys(sortedKeys); + Map allRecords = ((HoodieMergedLogRecordScanner)logRecordScanner).getRecords(); + + Map>> result = new HashMap<>(); + sortedKeys.stream() + .map(key -> (HoodieRecord) allRecords.get(key)) + .filter(Objects::nonNull) + .forEach(record -> { + List> records = result.getOrDefault(record.getRecordKey(), new ArrayList<>()); + records.add(record); + result.put(record.getRecordKey(), records); + }); + return result; + } + + assert logRecordScanner instanceof HoodieUnMergedLogRecordScanner; + ((HoodieUnMergedLogRecordScanner)logRecordScanner).scan(false); + + // TODO: fix this return of dummy hashmap. 
+ return new HashMap<>(); } } @Override public void close() throws IOException { - logRecordScanner.close(); + if (logRecordScanner instanceof HoodieMergedLogRecordScanner) { + ((HoodieMergedLogRecordScanner)logRecordScanner).close(); + } } private static Predicate createPrefixMatchingPredicate(List keyPrefixes) { @@ -160,33 +188,45 @@ public static class Builder { .withReverseReader(false) .withOperationField(false); + private HoodieUnMergedLogRecordScanner.Builder unmergedScannerBuilder = + new HoodieUnMergedLogRecordScanner.Builder() + .withKeyFieldOverride(HoodieMetadataPayload.KEY_FIELD_NAME) + .withReadBlocksLazily(true) + .withReverseReader(false); + public Builder withFileSystem(FileSystem fs) { scannerBuilder.withFileSystem(fs); + unmergedScannerBuilder.withFileSystem(fs); return this; } public Builder withBasePath(String basePath) { scannerBuilder.withBasePath(basePath); + unmergedScannerBuilder.withBasePath(basePath); return this; } public Builder withLogFilePaths(List logFilePaths) { scannerBuilder.withLogFilePaths(logFilePaths); + unmergedScannerBuilder.withLogFilePaths(logFilePaths); return this; } public Builder withReaderSchema(Schema schema) { scannerBuilder.withReaderSchema(schema); + unmergedScannerBuilder.withReaderSchema(schema); return this; } public Builder withLatestInstantTime(String latestInstantTime) { scannerBuilder.withLatestInstantTime(latestInstantTime); + unmergedScannerBuilder.withLatestInstantTime(latestInstantTime); return this; } public Builder withBufferSize(int bufferSize) { scannerBuilder.withBufferSize(bufferSize); + unmergedScannerBuilder.withBufferSize(bufferSize); return this; } @@ -230,16 +270,23 @@ public Builder enableFullScan(boolean enableFullScan) { public Builder withEnableOptimizedLogBlocksScan(boolean enableOptimizedLogBlocksScan) { scannerBuilder.withOptimizedLogBlocksScan(enableOptimizedLogBlocksScan); + unmergedScannerBuilder.withOptimizedLogBlocksScan(enableOptimizedLogBlocksScan); return this; } public Builder withTableMetaClient(HoodieTableMetaClient hoodieTableMetaClient) { scannerBuilder.withTableMetaClient(hoodieTableMetaClient); + unmergedScannerBuilder.withTableMetaClient(hoodieTableMetaClient); return this; } public HoodieMetadataLogRecordReader build() { return new HoodieMetadataLogRecordReader(scannerBuilder.build()); } + + public HoodieMetadataLogRecordReader buildWithUnmergedLogRecordScanner(HoodieUnMergedLogRecordScanner.LogRecordScannerCallback callback) { + unmergedScannerBuilder.withLogRecordScannerCallback(callback); + return new HoodieMetadataLogRecordReader(unmergedScannerBuilder.build()); + } } } diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java index 40c0debf28e95..e6b263da45de5 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java @@ -23,6 +23,7 @@ import org.apache.hudi.avro.model.HoodieMetadataFileInfo; import org.apache.hudi.avro.model.HoodieMetadataRecord; import org.apache.hudi.avro.model.HoodieRecordIndexInfo; +import org.apache.hudi.avro.model.HoodieSecondaryIndexInfo; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.EmptyHoodieRecordPayload; import org.apache.hudi.common.model.HoodieAvroRecord; @@ -109,6 +110,7 @@ public class HoodieMetadataPayload implements HoodieRecordPayload @@ -183,6 +192,7 @@ public class HoodieMetadataPayload 
implements HoodieRecordPayload orderingVal) { @@ -259,6 +269,11 @@ public HoodieMetadataPayload(Option recordOpt) { recordIndexRecord.get(RECORD_INDEX_FIELD_FILEID).toString(), Long.parseLong(recordIndexRecord.get(RECORD_INDEX_FIELD_INSTANT_TIME).toString()), Integer.parseInt(recordIndexRecord.get(RECORD_INDEX_FIELD_FILEID_ENCODING).toString())); + } else if (type == METADATA_TYPE_SECONDARY_INDEX) { + GenericRecord secondaryIndexRecord = getNestedFieldValue(record, SCHEMA_FIELD_ID_SECONDARY_INDEX); + secondaryIndexMetadata = new HoodieSecondaryIndexInfo( + secondaryIndexRecord.get(SECONDARY_INDEX_FIELD_RECORD_KEY).toString(), + (Boolean) secondaryIndexRecord.get(SECONDARY_INDEX_FIELD_IS_DELETED)); } } else { this.isDeletedRecord = true; @@ -294,6 +309,11 @@ protected HoodieMetadataPayload(String key, int type, this.recordIndexMetadata = recordIndexMetadata; } + public HoodieMetadataPayload(String key, HoodieSecondaryIndexInfo secondaryIndexMetadata) { + this(key, METADATA_TYPE_SECONDARY_INDEX, null, null, null, null); + this.secondaryIndexMetadata = secondaryIndexMetadata; + } + /** * Create and return a {@code HoodieMetadataPayload} to save list of partitions. * @@ -403,6 +423,9 @@ public HoodieMetadataPayload preCombine(HoodieMetadataPayload previousRecord) { // No need to merge with previous record index, always pick the latest payload. return this; + case METADATA_TYPE_SECONDARY_INDEX: + // TODO (vinay): Fix with the correct merge logic + return this; default: throw new HoodieMetadataException("Unknown type of HoodieMetadataPayload: " + type); } @@ -447,7 +470,7 @@ public Option getInsertValue(Schema schemaIgnored, Properties pro } HoodieMetadataRecord record = new HoodieMetadataRecord(key, type, filesystemMetadata, bloomFilterMetadata, - columnStatMetadata, recordIndexMetadata); + columnStatMetadata, recordIndexMetadata, secondaryIndexMetadata); return Option.of(record); } @@ -456,6 +479,34 @@ public Option getInsertValue(Schema schema) throws IOException { return getInsertValue(schema, new Properties()); } + public static Option> combineSecondaryIndexRecord( + HoodieRecord oldRecord, + HoodieRecord newRecord) { + String oldSecondaryKey = oldRecord.getRecordKey(); + String newSecondaryKey = newRecord.getRecordKey(); + + if (newRecord.getData().secondaryIndexMetadata.getIsDeleted()) { + return Option.empty(); + } else if (oldRecord.getData().secondaryIndexMetadata.getIsDeleted()) { + return Option.of(newRecord); + } + + return Option.of(newRecord); + } + + public static Option combineSecondaryIndexPayload( + HoodieMetadataPayload oldPayload, + HoodieMetadataPayload newPayload) { + + if (newPayload.secondaryIndexMetadata.getIsDeleted()) { + return Option.empty(); + } else if (oldPayload.secondaryIndexMetadata.getIsDeleted()) { + return Option.of(newPayload); + } + + return Option.of(newPayload); + } + /** * Returns the list of filenames added as part of this record. */ @@ -685,6 +736,22 @@ private static HoodieMetadataColumnStats mergeColumnStatsRecords(HoodieMetadataC .build(); } + /** + * Create and return a {@code HoodieMetadataPayload} to insert or update an entry for the secondary index. + *
<p>
+ * Each entry maps the secondary key of a single record in HUDI to its record (or primary) key + * + * @param recordKey Primary key of the record + * @param secondaryKey Secondary key of the record + * @param isDeleted true if this record is deleted + */ + public static HoodieRecord createSecondaryIndex(String recordKey, String secondaryKey, Boolean isDeleted) { + + HoodieKey key = new HoodieKey(secondaryKey, MetadataPartitionType.SECONDARY_INDEX.getPartitionPath()); + HoodieMetadataPayload payload = new HoodieMetadataPayload(secondaryKey, new HoodieSecondaryIndexInfo(recordKey, isDeleted)); + return new HoodieAvroRecord<>(key, payload); + } + /** * Create and return a {@code HoodieMetadataPayload} to insert or update an entry for the record index. *
<p>
@@ -765,6 +832,11 @@ public HoodieRecordGlobalLocation getRecordGlobalLocation() { return HoodieTableMetadataUtil.getLocationFromRecordIndexInfo(recordIndexMetadata); } + public String getRecordKeyFromSecondaryIndex() { + // TODO (vinay): Handle deletes and any other necessary checks etc. Check getRecordGlobalLocation() + return secondaryIndexMetadata.getRecordKey(); + } + public boolean isDeleted() { return isDeletedRecord; } diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java index c19b2b62cd7c5..7238f64c0052a 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java @@ -215,6 +215,12 @@ Map, HoodieMetadataColumnStats> getColumnStats(final List

> readRecordIndex(List recordKeys); + /** + * Returns the location of records which the provided secondary keys maps to. + * Records that are not found are ignored and won't be part of map object that is returned. + */ + Map> readSecondaryIndex(List secondaryKeys); + /** * Fetch records by key prefixes. Key prefix passed is expected to match the same prefix as stored in Metadata table partitions. For eg, in case of col stats partition, * actual keys in metadata partition is encoded values of column name, partition name and file name. So, key prefixes passed to this method is expected to be encoded already. diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java index e9eba01bf83b8..eabb89d9187ce 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java @@ -56,10 +56,12 @@ import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType; import org.apache.hudi.common.model.HoodieRecordGlobalLocation; +import org.apache.hudi.common.model.HoodieRecordMerger; import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.TableSchemaResolver; +import org.apache.hudi.common.table.log.HoodieFileSliceReader; import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline; @@ -67,6 +69,7 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.TimelineMetadataUtils; import org.apache.hudi.common.table.view.HoodieTableFileSystemView; +import org.apache.hudi.common.util.BaseFileUtils; import org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.common.util.FileIOUtils; import org.apache.hudi.common.util.HoodieRecordUtils; @@ -88,6 +91,7 @@ import org.apache.avro.AvroTypeException; import org.apache.avro.LogicalTypes; import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -107,11 +111,13 @@ import java.util.Collection; import java.util.Collections; import java.util.Comparator; +import java.util.Date; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Properties; import java.util.Set; import java.util.UUID; import java.util.function.BiFunction; @@ -152,6 +158,8 @@ public class HoodieTableMetadataUtil { public static final String PARTITION_NAME_RECORD_INDEX = "record_index"; public static final String PARTITION_NAME_FUNCTIONAL_INDEX_PREFIX = "func_index_"; + public static final String PARTITION_NAME_SECONDARY_INDEX_PREFIX = "secondary_index_"; + public static final Set> COLUMN_STATS_RECORD_SUPPORTED_TYPES = new HashSet<>(Arrays.asList( IntWrapper.class, BooleanWrapper.class, DateWrapper.class, DoubleWrapper.class, FloatWrapper.class, LongWrapper.class, @@ -438,7 +446,7 @@ public static List convertMetadataToFilesPartitionRecords(HoodieCo return HoodieMetadataPayload.createPartitionFilesRecord(partitionStatName, 
updatedFilesToSizesMapping, Collections.emptyList()); }) - .collect(Collectors.toList()); + .collect(toList()); records.addAll(updatedPartitionFilesRecords); @@ -452,7 +460,7 @@ private static List getPartitionsAdded(HoodieCommitMetadata commitMetada return commitMetadata.getPartitionToWriteStats().keySet().stream() // We need to make sure we properly handle case of non-partitioned tables .map(HoodieTableMetadataUtil::getPartitionIdentifierForFilesPartition) - .collect(Collectors.toList()); + .collect(toList()); } /** @@ -468,7 +476,7 @@ public static HoodieData convertMetadataToBloomFilterRecords( HoodieEngineContext context, HoodieConfig hoodieConfig, HoodieCommitMetadata commitMetadata, String instantTime, MetadataRecordsGenerationParams recordsGenerationParams) { final List allWriteStats = commitMetadata.getPartitionToWriteStats().values().stream() - .flatMap(entry -> entry.stream()).collect(Collectors.toList()); + .flatMap(entry -> entry.stream()).collect(toList()); if (allWriteStats.isEmpty()) { return context.emptyHoodieData(); } @@ -1052,7 +1060,7 @@ private static List getPartitionFileSlices(HoodieTableMetaClient meta } else { fileSliceStream = fsView.getLatestFileSlices(partition); } - return fileSliceStream.sorted(Comparator.comparing(FileSlice::getFileId)).collect(Collectors.toList()); + return fileSliceStream.sorted(Comparator.comparing(FileSlice::getFileId)).collect(toList()); } /** @@ -1070,14 +1078,14 @@ public static List getPartitionLatestFileSlicesIncludingInflight(Hood Stream fileSliceStream = fsView.fetchLatestFileSlicesIncludingInflight(partition); return fileSliceStream .sorted(Comparator.comparing(FileSlice::getFileId)) - .collect(Collectors.toList()); + .collect(toList()); } public static HoodieData convertMetadataToColumnStatsRecords(HoodieCommitMetadata commitMetadata, HoodieEngineContext engineContext, MetadataRecordsGenerationParams recordsGenerationParams) { List allWriteStats = commitMetadata.getPartitionToWriteStats().values().stream() - .flatMap(Collection::stream).collect(Collectors.toList()); + .flatMap(Collection::stream).collect(toList()); if (allWriteStats.isEmpty()) { return engineContext.emptyHoodieData(); @@ -1132,7 +1140,7 @@ private static List getColumnsToIndex(MetadataRecordsGenerationParams re .map(writerSchema -> writerSchema.getFields().stream() .map(Schema.Field::name) - .collect(Collectors.toList())) + .collect(toList())) .orElse(Collections.emptyList()); } @@ -1160,7 +1168,7 @@ private static Stream getColumnStatsRecords(String partitionPath, // TODO we should delete records instead of stubbing them List> columnRangeMetadataList = columnsToIndex.stream() .map(entry -> HoodieColumnRangeMetadata.stub(fileName, entry)) - .collect(Collectors.toList()); + .collect(toList()); return HoodieMetadataPayload.createColumnStatsRecords(partitionPath, columnRangeMetadataList, true); } @@ -1195,7 +1203,7 @@ private static List> readColumnRangeMetada /** * Does an upcast for {@link BigDecimal} instance to align it with scale/precision expected by - * the {@link org.apache.avro.LogicalTypes.Decimal} Avro logical type + * the {@link LogicalTypes.Decimal} Avro logical type */ public static BigDecimal tryUpcastDecimal(BigDecimal value, final LogicalTypes.Decimal decimal) { final int scale = decimal.getScale(); @@ -1335,7 +1343,7 @@ public static Set getValidInstantTimestamps(HoodieTableMetaClient dataMe .filter(instant -> instant.isCompleted() && isValidInstant(instant)) .getInstantsAsStream() .map(HoodieInstant::getTimestamp) - 
.collect(Collectors.toList())); + .collect(toList())); // For any rollbacks and restores, we cannot neglect the instants that they are rolling back. // The rollback instant should be more recent than the start of the timeline for it to have rolled back any @@ -1750,7 +1758,7 @@ public static HoodieRecordGlobalLocation getLocationFromRecordIndexInfo( fileId = originalFileId; } - final java.util.Date instantDate = new java.util.Date(instantTime); + final Date instantDate = new Date(instantTime); return new HoodieRecordGlobalLocation(partition, HoodieActiveTimeline.formatDate(instantDate), fileId); } @@ -1781,7 +1789,7 @@ public static HoodieData readRecordKeysFromBaseFiles(HoodieEngineC final String fileId = baseFile.getFileId(); final String instantTime = baseFile.getCommitTime(); - HoodieFileReader reader = HoodieFileReaderFactory.getReaderFactory(HoodieRecord.HoodieRecordType.AVRO) + HoodieFileReader reader = HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO) .getFileReader(config, configuration.get(), dataFilePath); ClosableIterator recordKeyIterator = reader.getRecordKeyIterator(); @@ -1877,7 +1885,7 @@ public HoodieRecord next() { final String fileId = baseFile.getFileId(); final String instantTime = baseFile.getCommitTime(); HoodieConfig hoodieConfig = getReaderConfigs(configuration.get()); - HoodieFileReader reader = HoodieFileReaderFactory.getReaderFactory(HoodieRecord.HoodieRecordType.AVRO) + HoodieFileReader reader = HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO) .getFileReader(hoodieConfig, configuration.get(), dataFilePath); ClosableIterator recordKeyIterator = reader.getRecordKeyIterator(); @@ -1902,6 +1910,128 @@ public HoodieRecord next() { }); } + public static HoodieData readSecondaryKeysFromFiles(HoodieEngineContext engineContext, + List>>> partitionFiles, + int recordIndexMaxParallelism, + String activeModule, HoodieTableMetaClient metaClient, EngineType engineType) { + if (partitionFiles.isEmpty()) { + return engineContext.emptyHoodieData(); + } + + engineContext.setJobStatus(activeModule, "Secondary Index: reading secondary keys from " + partitionFiles.size() + " partitions"); + final int parallelism = Math.min(partitionFiles.size(), recordIndexMaxParallelism); + final String basePath = metaClient.getBasePathV2().toString(); + final SerializableConfiguration configuration = new SerializableConfiguration(metaClient.getHadoopConf()); + + return engineContext.parallelize(partitionFiles, parallelism).flatMap(partitionAndBaseFile -> { + final String partition = partitionAndBaseFile.getKey(); + final Pair> baseAndLogFiles = partitionAndBaseFile.getValue(); + List logFilePaths = baseAndLogFiles.getValue(); + String filePath = baseAndLogFiles.getKey(); + + Path dataFilePath = filePath(basePath, "", filePath); + Schema tableSchema = BaseFileUtils.getInstance(HoodieFileFormat.PARQUET).readAvroSchema(metaClient.getHadoopConf(), dataFilePath); + + return createSecondaryIndexGenerator(metaClient, engineType, logFilePaths, tableSchema, partition, dataFilePath, !filePath.isEmpty()); + }); + } + + private static ClosableIterator createSecondaryIndexGenerator(HoodieTableMetaClient metaClient, + EngineType engineType, List logFilePaths, + Schema tableSchema, String partition, + Path dataFilePath, boolean createBaseFileReader) throws Exception { + final String basePath = metaClient.getBasePathV2().toString(); + final SerializableConfiguration configuration = new SerializableConfiguration(metaClient.getHadoopConf()); + + HoodieRecordMerger recordMerger = 
HoodieRecordUtils.createRecordMerger( + basePath, + engineType, + Collections.emptyList(), + metaClient.getTableConfig().getRecordMergerStrategy()); + + HoodieMergedLogRecordScanner mergedLogRecordScanner = HoodieMergedLogRecordScanner.newBuilder() + .withFileSystem(metaClient.getFs()) + .withBasePath(basePath) + .withLogFilePaths(logFilePaths) + .withReaderSchema(tableSchema) + .withLatestInstantTime(metaClient.getActiveTimeline().filterCompletedInstants().lastInstant().map(HoodieInstant::getTimestamp).orElse("")) + .withReadBlocksLazily(configuration.get().getBoolean("", true)) + .withReverseReader(false) + .withMaxMemorySizeInBytes(configuration.get().getLongBytes(MAX_MEMORY_FOR_COMPACTION.key(), DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES)) + .withSpillableMapBasePath(FileIOUtils.getDefaultSpillableMapBasePath()) + .withPartition(partition) + .withOptimizedLogBlocksScan(configuration.get().getBoolean("hoodie" + HoodieMetadataConfig.OPTIMIZED_LOG_BLOCKS_SCAN, false)) + .withDiskMapType(configuration.get().getEnum(SPILLABLE_DISK_MAP_TYPE.key(), SPILLABLE_DISK_MAP_TYPE.defaultValue())) + .withBitCaskDiskMapCompressionEnabled(configuration.get().getBoolean(DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(), DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue())) + .withRecordMerger(recordMerger) + .build(); + + Option baseFileReader = Option.empty(); + if (createBaseFileReader) { + baseFileReader = Option.of(HoodieFileReaderFactory.getReaderFactory(recordMerger.getRecordType()).getFileReader(metaClient.getTableConfig(), configuration.get(), dataFilePath)); + } + HoodieFileSliceReader fileSliceReader = new HoodieFileSliceReader(baseFileReader, mergedLogRecordScanner, tableSchema, metaClient.getTableConfig().getPreCombineField(), recordMerger, + metaClient.getTableConfig().getProps(), + Option.empty()); + ClosableIterator fileSliceIterator = ClosableIterator.wrap(fileSliceReader); + return new ClosableIterator() { + @Override + public void close() { + fileSliceIterator.close(); + } + + @Override + public boolean hasNext() { + return fileSliceIterator.hasNext(); + } + + @Override + public HoodieRecord next() { + HoodieRecord record = fileSliceIterator.next(); + String recordKey = record.getRecordKey(tableSchema, HoodieRecord.RECORD_KEY_METADATA_FIELD); + String secondaryKeyFields = String.join(".", metaClient.getTableConfig().getSecondaryKeyFields().get()); + String secondaryKey; + try { + GenericRecord genericRecord = (GenericRecord) (record.toIndexedRecord(tableSchema, new Properties()).get()).getData(); + secondaryKey = HoodieAvroUtils.getNestedFieldValAsString(genericRecord, secondaryKeyFields, true, false); + } catch (IOException e) { + throw new RuntimeException("Failed to fetch records." 
+ e); + } + + return HoodieMetadataPayload.createSecondaryIndex(recordKey, secondaryKey, false); + } + }; + } + + public static HoodieData readSecondaryKeysFromFileSlices(HoodieEngineContext engineContext, + List> partitionFileSlicePairs, + int recordIndexMaxParallelism, + String activeModule, HoodieTableMetaClient metaClient, EngineType engineType) throws IOException { + if (partitionFileSlicePairs.isEmpty()) { + return engineContext.emptyHoodieData(); + } + + engineContext.setJobStatus(activeModule, "Secondary Index: reading secondary keys from " + partitionFileSlicePairs.size() + " file slices"); + final int parallelism = Math.min(partitionFileSlicePairs.size(), recordIndexMaxParallelism); + final String basePath = metaClient.getBasePathV2().toString(); + final SerializableConfiguration configuration = new SerializableConfiguration(metaClient.getHadoopConf()); + return engineContext.parallelize(partitionFileSlicePairs, parallelism).flatMap(partitionAndBaseFile -> { + final String partition = partitionAndBaseFile.getKey(); + final FileSlice fileSlice = partitionAndBaseFile.getValue(); + + final HoodieBaseFile baseFile = fileSlice.getBaseFile().get(); + final String filename = baseFile.getFileName(); + Path dataFilePath = filePath(basePath, partition, filename); + TableSchemaResolver schemaResolver = new TableSchemaResolver(metaClient); + Schema tableSchema = schemaResolver.getTableAvroSchema(); + + List logFilePaths = fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()) + .map(l -> l.getPath().toString()).collect(toList()); + + return createSecondaryIndexGenerator(metaClient, engineType, logFilePaths, tableSchema, partition, dataFilePath, fileSlice.getBaseFile().isPresent()); + }); + } + public static Schema getProjectedSchemaForFunctionalIndex(HoodieFunctionalIndexDefinition indexDefinition, HoodieTableMetaClient metaClient) throws Exception { TableSchemaResolver schemaResolver = new TableSchemaResolver(metaClient); Schema tableSchema = schemaResolver.getTableAvroSchema(); diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java b/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java index 32ce529fa3ba6..0be8751ff5274 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java @@ -29,7 +29,8 @@ public enum MetadataPartitionType { COLUMN_STATS(HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS, "col-stats-"), BLOOM_FILTERS(HoodieTableMetadataUtil.PARTITION_NAME_BLOOM_FILTERS, "bloom-filters-"), RECORD_INDEX(HoodieTableMetadataUtil.PARTITION_NAME_RECORD_INDEX, "record-index-"), - FUNCTIONAL_INDEX(HoodieTableMetadataUtil.PARTITION_NAME_FUNCTIONAL_INDEX_PREFIX, "func-index-"); + FUNCTIONAL_INDEX(HoodieTableMetadataUtil.PARTITION_NAME_FUNCTIONAL_INDEX_PREFIX, "func-index-"), + SECONDARY_INDEX(HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX_PREFIX, "secondary-index-"); // Partition path in metadata table. 
private final String partitionPath; diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala index 99080629e1709..1755906abd2c2 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala @@ -427,6 +427,13 @@ object DataSourceWriteOptions { */ val RECORDKEY_FIELD = KeyGeneratorOptions.RECORDKEY_FIELD_NAME + /** + * Secondary key field. Columns to be used as the secondary index columns. Actual value + * will be obtained by invoking .toString() on the field value. Nested fields can be specified using + * the dot notation eg: `a.b.c` + */ + val SECONDARYKEY_FIELD = KeyGeneratorOptions.SECONDARYKEY_FIELD_NAME + /** * Partition path field. Value to be used at the `partitionPath` component of `HoodieKey`. Actual * value obtained by invoking .toString() diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala index 3444feaecff61..3b92f64499a7e 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala @@ -95,7 +95,7 @@ case class HoodieFileIndex(spark: SparkSession, /** * NOTE: [[ColumnStatsIndexSupport]] is a transient state, since it's only relevant while logical plan - * is handled by the Spark's driver + * is handled by the Spark's driver */ @transient private lazy val columnStatsIndex = new ColumnStatsIndexSupport(spark, schema, metadataConfig, metaClient) @@ -171,7 +171,7 @@ case class HoodieFileIndex(spark: SparkSession, } }).filter(slice => slice != null) val c = fileSlices.filter(f => (includeLogFiles && f.getLogFiles.findAny().isPresent) - || (f.getBaseFile.isPresent && f.getBaseFile.get().getBootstrapBaseFile.isPresent)). + || (f.getBaseFile.isPresent && f.getBaseFile.get().getBootstrapBaseFile.isPresent)). foldLeft(Map[String, FileSlice]()) { (m, f) => m + (f.getFileId -> f) } if (c.nonEmpty) { sparkAdapter.getSparkPartitionedFileUtils.newPartitionDirectory( @@ -205,7 +205,7 @@ case class HoodieFileIndex(spark: SparkSession, } else if (shouldEmbedFileSlices) { assert(partitionSchema.isEmpty) prunedPartitionsAndFilteredFileSlices - }else { + } else { Seq(PartitionDirectory(InternalRow.empty, prunedPartitionsAndFilteredFileSlices.flatMap(_.files))) } } @@ -214,7 +214,7 @@ case class HoodieFileIndex(spark: SparkSession, * The functions prunes the partition paths based on the input partition filters. For every partition path, the file * slices are further filtered after querying metadata table based on the data filters. 
* - * @param dataFilters data columns filters + * @param dataFilters data columns filters * @param partitionFilters partition column filters * @return A sequence of pruned partitions and corresponding filtered file slices */ @@ -235,16 +235,16 @@ case class HoodieFileIndex(spark: SparkSession, // - List of predicates (filters) is present val shouldPushDownFilesFilter = !partitionFilters.isEmpty val candidateFilesNamesOpt: Option[Set[String]] = - lookupCandidateFilesInMetadataTable(dataFilters, prunedPartitionsAndFileSlices, shouldPushDownFilesFilter) match { - case Success(opt) => opt - case Failure(e) => - logError("Failed to lookup candidate files in File Index", e) - - spark.sqlContext.getConf(DataSkippingFailureMode.configName, DataSkippingFailureMode.Fallback.value) match { - case DataSkippingFailureMode.Fallback.value => Option.empty - case DataSkippingFailureMode.Strict.value => throw new HoodieException(e); - } - } + lookupCandidateFilesInMetadataTable(dataFilters, prunedPartitionsAndFileSlices, shouldPushDownFilesFilter) match { + case Success(opt) => opt + case Failure(e) => + logError("Failed to lookup candidate files in File Index", e) + + spark.sqlContext.getConf(DataSkippingFailureMode.configName, DataSkippingFailureMode.Fallback.value) match { + case DataSkippingFailureMode.Fallback.value => Option.empty + case DataSkippingFailureMode.Strict.value => throw new HoodieException(e); + } + } logDebug(s"Overlapping candidate files from Column Stats or Record Level Index: ${candidateFilesNamesOpt.getOrElse(Set.empty)}") @@ -286,7 +286,7 @@ case class HoodieFileIndex(spark: SparkSession, } } - def getFileSlicesForPrunedPartitions(partitionFilters: Seq[Expression]) : Seq[(Option[BaseHoodieTableFileIndex.PartitionPath], Seq[FileSlice])] = { + def getFileSlicesForPrunedPartitions(partitionFilters: Seq[Expression]): Seq[(Option[BaseHoodieTableFileIndex.PartitionPath], Seq[FileSlice])] = { // Prune the partition path by the partition filters // NOTE: Non-partitioned tables are assumed to consist from a single partition // encompassing the whole table @@ -344,11 +344,15 @@ case class HoodieFileIndex(spark: SparkSession, lazy val queryReferencedColumns = collectReferencedColumns(spark, queryFilters, schema) lazy val (_, recordKeys) = recordLevelIndex.filterQueriesWithRecordKey(queryFilters) + lazy val (_, secondaryKeys) = recordLevelIndex.filterQueriesWithSecondaryKey(queryFilters) if (!isMetadataTableEnabled || !isDataSkippingEnabled) { validateConfig() Option.empty } else if (recordKeys.nonEmpty) { Option.apply(recordLevelIndex.getCandidateFiles(getAllFiles(), recordKeys)) + } else if (secondaryKeys.nonEmpty) { + // If available, secondary keys are used to prune candidate files (before other indexes below) + Option.apply(recordLevelIndex.getCandidateFilesFromSecondaryIndex(getAllFiles(), secondaryKeys)) } else if (functionalIndex.isIndexAvailable && !queryFilters.isEmpty) { val prunedFileNames = getPrunedFileNames(prunedPartitionsAndFileSlices) val shouldReadInMemory = functionalIndex.shouldReadInMemory(this, queryReferencedColumns) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index 00ec59c5b8fd7..639afb758374d 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala 
@@ -314,6 +314,7 @@ class HoodieSparkSqlWriterInternal { .setPartitionFields(partitionColumns) .setPopulateMetaFields(populateMetaFields) .setRecordKeyFields(hoodieConfig.getString(RECORDKEY_FIELD)) + .setSecondaryKeyFields(hoodieConfig.getString(SECONDARYKEY_FIELD)) .setCDCEnabled(hoodieConfig.getBooleanOrDefault(HoodieTableConfig.CDC_ENABLED)) .setCDCSupplementalLoggingMode(hoodieConfig.getStringOrDefault(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE)) .setKeyGeneratorClassProp(keyGenProp) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala index 4519334d4d421..626c989280eb7 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala @@ -367,7 +367,7 @@ object LogFileIterator extends SparkAdapterSupport { val relativePartitionPath = getRelativePartitionPath(new Path(tablePath), partitionPath) val logRecordReader = - metadataTable.getLogRecordScanner(logFiles.asJava, relativePartitionPath, toJavaOption(Some(forceFullScan))) + metadataTable.getLogRecordScanner(logFiles.asJava, relativePartitionPath, toJavaOption(Some(forceFullScan)), toJavaOption(None)) .getLeft val recordList = closing(logRecordReader) { diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala index 764ce69795d99..9d76433f78a0f 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala @@ -64,6 +64,32 @@ class RecordLevelIndexSupport(spark: SparkSession, candidateFiles.toSet } + /** + * Returns the list of candidate files which store the provided record keys based on Metadata Table Secondary Index + * and Metadata Table Record Index. + * @param secondaryKeys - List of secondary keys. + * @return Sequence of file names which need to be queried + */ + def getCandidateFilesFromSecondaryIndex(allFiles: Seq[FileStatus], secondaryKeys: List[String]): Set[String] = { + val recordKeyLocationsMap = metadataTable.readSecondaryIndex(JavaConverters.seqAsJavaListConverter(secondaryKeys).asJava) + val fileIdToPartitionMap: mutable.Map[String, String] = mutable.Map.empty + val candidateFiles: mutable.Set[String] = mutable.Set.empty + for (locations <- JavaConverters.collectionAsScalaIterableConverter(recordKeyLocationsMap.values()).asScala) { + for (location <- JavaConverters.collectionAsScalaIterableConverter(locations).asScala) { + fileIdToPartitionMap.put(location.getFileId, location.getPartitionPath) + } + } + + for (file <- allFiles) { + val fileId = FSUtils.getFileIdFromFilePath(file.getPath) + val partitionOpt = fileIdToPartitionMap.get(fileId) + if (partitionOpt.isDefined) { + candidateFiles += file.getPath.getName + } + } + candidateFiles.toSet + } + /** * Returns the configured record key for the table if it is a simple record key else returns empty option. */ @@ -79,16 +105,33 @@ class RecordLevelIndexSupport(spark: SparkSession, } /** - * Matches the configured simple record key with the input attribute name. 
+ * Returns the configured secondary key for the table + * TODO: Handle multiple secondary indexes (similar to functional index) + */ + private def getSecondaryKeyConfig: Option[String] = { + val secondaryKeysOpt: org.apache.hudi.common.util.Option[Array[String]] = metaClient.getTableConfig.getSecondaryKeyFields + val secondaryKeyOpt = secondaryKeysOpt.map[String](JFunction.toJavaFunction[Array[String], String](arr => + if (arr.length == 1) { + arr(0) + } else { + null + })) + Option.apply(secondaryKeyOpt.orElse(null)) + } + + /** + * Matches the configured key (record key or secondary key) with the input attribute name. * @param attributeName The attribute name provided in the query - * @return true if input attribute name matches the configured simple record key + * @param isPrimaryKey Should match primary key (record key). If false, it matches secondary key. + * @return true if input attribute name matches the configured key + * TODO: Handle multiple secondary indexes (similar to functional index) */ - private def attributeMatchesRecordKey(attributeName: String): Boolean = { - val recordKeyOpt = getRecordKeyConfig - if (recordKeyOpt.isDefined && recordKeyOpt.get == attributeName) { + private def attributeMatchesKey(attributeName: String, isPrimaryKey: Boolean): Boolean = { + val keyOpt = if (isPrimaryKey) getRecordKeyConfig else getSecondaryKeyConfig + if (keyOpt.isDefined && keyOpt.get == attributeName) { true } else { - HoodieMetadataField.RECORD_KEY_METADATA_FIELD.getFieldName == recordKeyOpt.get + false } } @@ -132,7 +175,7 @@ class RecordLevelIndexSupport(spark: SparkSession, var recordKeyQueries: List[Expression] = List.empty var recordKeys: List[String] = List.empty for (query <- queryFilters) { - filterQueryWithRecordKey(query).foreach({ + filterQueryWithKey(query, true).foreach({ case (exp: Expression, recKeys: List[String]) => recordKeys = recordKeys ++ recKeys recordKeyQueries = recordKeyQueries :+ exp @@ -143,18 +186,37 @@ class RecordLevelIndexSupport(spark: SparkSession, } } + def filterQueriesWithSecondaryKey(queryFilters: Seq[Expression]): (List[Expression], List[String]) = { + if (!isSecondaryIndexAvailable) { + (List.empty, List.empty) + } else { + var secondaryKeyQueries: List[Expression] = List.empty + var secondaryKeys: List[String] = List.empty + for (query <- queryFilters) { + filterQueryWithKey(query, false).foreach({ + case (exp: Expression, recKeys: List[String]) => + secondaryKeys = secondaryKeys ++ recKeys + secondaryKeyQueries = secondaryKeyQueries :+ exp + }) + } + + Tuple2.apply(secondaryKeyQueries, secondaryKeys) + } + } + /** * If the input query is an EqualTo or IN query on simple record key columns, the function returns a tuple of * list of the query and list of record key literals present in the query otherwise returns an empty option. * * @param queryFilter The query that need to be filtered. + * @param isPrimaryKey Should use primary key (record key) to filter. If false, it uses secondary key. 
* @return Tuple of filtered query and list of record key literals that need to be matched */ - private def filterQueryWithRecordKey(queryFilter: Expression): Option[(Expression, List[String])] = { + private def filterQueryWithKey(queryFilter: Expression, isPrimaryKey: Boolean): Option[(Expression, List[String])] = { queryFilter match { case equalToQuery: EqualTo => val (attribute, literal) = getAttributeLiteralTuple(equalToQuery.left, equalToQuery.right).orNull - if (attribute != null && attribute.name != null && attributeMatchesRecordKey(attribute.name)) { + if (attribute != null && attribute.name != null && attributeMatchesKey(attribute.name, isPrimaryKey)) { Option.apply(equalToQuery, List.apply(literal.value.toString)) } else { Option.empty @@ -165,6 +227,7 @@ class RecordLevelIndexSupport(spark: SparkSession, case _: AttributeReference => case _ => validINQuery = false } + var literals: List[String] = List.empty inQuery.list.foreach { case literal: Literal => literals = literals :+ literal.value.toString @@ -185,4 +248,11 @@ class RecordLevelIndexSupport(spark: SparkSession, def isIndexAvailable: Boolean = { metadataConfig.enabled && metaClient.getTableConfig.getMetadataPartitions.contains(HoodieTableMetadataUtil.PARTITION_NAME_RECORD_INDEX) } + + /** + * Return true if metadata table is enabled and secondary index metadata partition is available. + */ + def isSecondaryIndexAvailable: Boolean = { + isIndexAvailable && metaClient.getTableConfig.getMetadataPartitions.contains(HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX_PREFIX) + } } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala index 95c07b987a0bd..47bc5a94a1105 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala @@ -35,6 +35,7 @@ import org.apache.spark.sql._ import org.apache.spark.sql.functions.{col, not} import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue} import org.junit.jupiter.api._ +import org.slf4j.LoggerFactory import java.util.concurrent.atomic.AtomicInteger import java.util.stream.Collectors @@ -57,7 +58,29 @@ class RecordLevelIndexTestBase extends HoodieSparkClientTestBase { PRECOMBINE_FIELD.key -> "timestamp", HoodieTableConfig.POPULATE_META_FIELDS.key -> "true" ) ++ metadataOpts + + val secondaryIndexOpts = Map( + HoodieMetadataConfig.SECONDARY_INDEX_ENABLE_PROP.key -> "true" + ) + + val commonOptsWithSecondaryIndex = commonOpts ++ secondaryIndexOpts ++ metadataOpts + + val commonOptsNewTableSITest = Map( + "hoodie.insert.shuffle.parallelism" -> "4", + "hoodie.upsert.shuffle.parallelism" -> "4", + HoodieWriteConfig.TBL_NAME.key -> "trips_table", + RECORDKEY_FIELD.key -> "uuid", + SECONDARYKEY_FIELD.key -> "city", + PARTITIONPATH_FIELD.key -> "state", + PRECOMBINE_FIELD.key -> "ts", + HoodieTableConfig.POPULATE_META_FIELDS.key -> "true" + ) ++ metadataOpts + + val commonOptsWithSecondaryIndexSITest = commonOptsNewTableSITest ++ secondaryIndexOpts + + var mergedDfList: List[DataFrame] = List.empty + private val log = LoggerFactory.getLogger(classOf[RecordLevelIndexTestBase]) @BeforeEach override def setUp() { @@ -172,7 +195,9 @@ class RecordLevelIndexTestBase extends HoodieSparkClientTestBase { latestBatch = 
recordsToStrings(dataGen.generateInserts(getInstantTime(), 5)).asScala } val latestBatchDf = spark.read.json(spark.sparkContext.parallelize(latestBatch, 2)) + latestBatchDf.cache() + log.error("VINAY: latestBatchDf schema is " + latestBatchDf.schema.simpleString) latestBatchDf.write.format("org.apache.hudi") .options(hudiOpts) .option(DataSourceWriteOptions.OPERATION.key, operation) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndex.scala new file mode 100644 index 0000000000000..f0a6a0248d546 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndex.scala @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.functional + +import org.apache.hudi.DataSourceWriteOptions +import org.apache.hudi.DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL +import org.apache.hudi.common.model.HoodieTableType +import org.apache.hudi.DataSourceReadOptions +import org.apache.hudi.config.HoodieWriteConfig +import org.apache.hudi.metadata.HoodieTableMetadata +import org.apache.spark.sql.SaveMode +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Test +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.EnumSource +import org.scalatest.Assertions.assertResult +import org.slf4j.LoggerFactory + +class TestSecondaryIndex extends RecordLevelIndexTestBase { + private val log = LoggerFactory.getLogger(getClass) + @ParameterizedTest + @EnumSource(classOf[HoodieTableType]) + def testSIInitialization(tableType: HoodieTableType): Unit = { + //create a new table + val tableName = "trips_table1" + val basePath = "file:///tmp/trips_table1" + val columns = Seq("ts", "uuid", "rider", "driver", "fare", "city", "state") + val data = + Seq((1695159649087L, "334e26e9-8355-45cc-97c6-c31daf0df330", "rider-A", "driver-K", 19.10, "san_francisco", "california"), + (1695091554787L, "e96c4396-3fad-413a-a942-4cb36106d720", "rider-B", "driver-M", 27.70, "sao_paulo", "texas"), + (1695091554788L, "e96c4396-3fad-413a-a942-4cb36106d721", "rider-C", "driver-K", 27.70, "san_francisco", "california"), + (1695046462179L, "9909a8b1-2d15-4d3d-8ec9-efc48c536a00", "rider-D", "driver-L", 33.90, "san_francisco", "california"), + (1695516137016L, "e3cf430c-889d-4015-bc98-59bdce1e530c", "rider-E", "driver-P", 34.15, "sao_paulo", "texas"), + (1695046462179L, "9909a8b1-2d15-4d3d-8ec9-efc48c536a01", "rider-D", "driver-L", 33.90, "los-angeles", "california"), + (1695516137016L, "e3cf430c-889d-4015-bc98-59bdce1e530b", "rider-E", "driver-P", 34.15, "bengaluru", "karnataka"), + 
(1695115999911L, "c8abbe79-8d89-47ea-b4ce-4d224bae5bfa", "rider-F", "driver-T", 17.85, "chennai", "tamil-nadu")); + + val inserts = spark.createDataFrame(data).toDF(columns: _*) + inserts.write.format("hudi"). + option(DataSourceWriteOptions.TABLE_TYPE.key, tableType.name()). + option("hoodie.datasource.write.operation", "insert"). + options(commonOptsWithSecondaryIndexSITest). + mode(SaveMode.Overwrite). + save(basePath) + + val tripsDF = spark.read.format("hudi"). + load(basePath). + select("ts", "uuid", "rider", "driver", "fare", "city", "state"). + orderBy("uuid") + tripsDF.show(false) + + assertEquals(tripsDF.count(), data.length) + val rowCount = tripsDF.count().toInt; + assertResult(inserts.orderBy("uuid").take(rowCount))(tripsDF.take(rowCount)) + + // Get the metadata reader and read secondary index + val schemaStr = tripsDF.schema.simpleString + val writeConfig: HoodieWriteConfig = HoodieWriteConfig.newBuilder.withPath(basePath).withSchema(schemaStr).build() + val metadataReader = HoodieTableMetadata.create(context, writeConfig.getMetadataConfig, basePath) + val recordLocationFromSecondaryIndex = metadataReader.readSecondaryIndex(scala.collection.JavaConverters.seqAsJavaList(Seq("san_francisco"))) + assertEquals(recordLocationFromSecondaryIndex.size(), tripsDF.select("city").filter("city='san_francisco'").count()) + } + + @ParameterizedTest + @EnumSource(classOf[HoodieTableType]) + def testSIUpsert(tableType: HoodieTableType): Unit = { + val tableName = "trips_table2" + val basePath1 = "file:///tmp/trips_table2" + val columns = Seq("ts", "uuid", "rider", "driver", "fare", "city", "state") + val data1 = + Seq((1695159649087L, "334e26e9-8355-45cc-97c6-c31daf0df330", "rider-A", "driver-K", 19.10, "san_francisco", "california"), + (1695091554787L, "e96c4396-3fad-413a-a942-4cb36106d720", "rider-B", "driver-M", 27.70, "sao_paulo", "texas"), + (1695091554788L, "e96c4396-3fad-413a-a942-4cb36106d721", "rider-C", "driver-K", 27.70, "delhi", "delhi"), + (1695115999911L, "c8abbe79-8d89-47ea-b4ce-4d224bae5bfa", "rider-F", "driver-T", 17.85, "chennai", "tamil-nadu")); + + val inserts = spark.createDataFrame(data1).toDF(columns: _*) + inserts.write.format("hudi"). + option(DataSourceWriteOptions.TABLE_TYPE.key, tableType.name()). + option("hoodie.datasource.write.operation", "insert"). + options(commonOptsNewTableSITest). + mode(SaveMode.Overwrite). + save(basePath1) + + val data2 = + Seq((1695046462179L, "9909a8b1-2d15-4d3d-8ec9-efc48c536a00", "rider-D", "driver-L", 33.90, "london", "greater-london"), + (1695516137016L, "e96c4396-3fad-413a-a942-4cb36106d722", "rider-E", "driver-P", 34.15, "austin", "texas")) + + val inserts2 = spark.createDataFrame(data2).toDF(columns: _*) + inserts2.write.format("hudi"). + option(DataSourceWriteOptions.TABLE_TYPE.key, tableType.name()). + option("hoodie.datasource.write.operation", "upsert"). + options(commonOptsWithSecondaryIndexSITest). + mode(SaveMode.Append). 
+ save(basePath1) + + val tripsDF = spark.read.format("hudi").load(basePath1) + tripsDF.show(false) + assertEquals(tripsDF.count(), data1.length + data2.length) + + // Get the metadata reader and read secondary index + val schemaStr = tripsDF.schema.simpleString + val writeConfig: HoodieWriteConfig = HoodieWriteConfig.newBuilder.withPath(basePath1).withSchema(schemaStr).build() + val metadataReader = HoodieTableMetadata.create(context, writeConfig.getMetadataConfig, basePath1) + val recordLocationFromSecondaryIndex = metadataReader.readSecondaryIndex(scala.collection.JavaConverters.seqAsJavaList(Seq("san_francisco"))) + assertEquals(recordLocationFromSecondaryIndex.size(), tripsDF.select("city").filter("city='san_francisco'").count()) + } + + @Test + def testSIWithDelete(): Unit = { + val tableName = "trips_table3" + val basePath1 = "file:///tmp/trips_table3" + val columns = Seq("ts", "uuid", "rider", "driver", "fare", "city", "state") + val data1 = + Seq((1695159649087L, "334e26e9-8355-45cc-97c6-c31daf0df330", "rider-A", "driver-K", 19.10, "san_francisco", "california"), + (1695091554787L, "334e26e9-8355-45cc-97c6-c31daf0df331", "rider-B", "driver-M", 27.70, "san_francisco", "california"), + (1695091554787L, "334e26e9-8355-45cc-97c6-c31daf0df332", "rider-B", "driver-M", 27.70, "san_francisco", "california"), + (1695091554788L, "e96c4396-3fad-413a-a942-4cb36106d721", "rider-C", "driver-K", 27.70, "delhi", "delhi"), + (1695115999911L, "c8abbe79-8d89-47ea-b4ce-4d224bae5bfa", "rider-F", "driver-T", 17.85, "chennai", "tamil-nadu")); + + val inserts = spark.createDataFrame(data1).toDF(columns: _*) + inserts.write.format("hudi"). + option(DataSourceWriteOptions.TABLE_TYPE.key, HoodieTableType.COPY_ON_WRITE.name()). + option("hoodie.datasource.write.operation", "insert"). + options(commonOptsWithSecondaryIndexSITest). + mode(SaveMode.Overwrite). + save(basePath1) + + val deleteDF = inserts.filter("uuid='334e26e9-8355-45cc-97c6-c31daf0df330'") + deleteDF.write.format("org.apache.hudi") + .option(DataSourceWriteOptions.TABLE_TYPE.key, HoodieTableType.COPY_ON_WRITE.name()) + .option(DataSourceWriteOptions.OPERATION.key, DELETE_OPERATION_OPT_VAL) + .options(commonOptsWithSecondaryIndexSITest) + .mode(SaveMode.Append) + .save(basePath1) + + // Read entry using secondary key + val tripsDF = spark.read.format("hudi"). + option(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key, "true"). + options(commonOptsWithSecondaryIndexSITest). + load(basePath1). + select("ts", "uuid", "rider", "driver", "fare", "city", "state"). + where("city='san_francisco'"). + orderBy("uuid") + val expectedDF = inserts. + filter("city='san_francisco'"). + filter("uuid != '334e26e9-8355-45cc-97c6-c31daf0df330'"). 
+ orderBy("uuid") + assertEquals(tripsDF.count(), expectedDF.count()) + + val rowCount = tripsDF.count().toInt + assertResult(expectedDF.take(rowCount))(tripsDF.take(rowCount)) + + val schemaStr = tripsDF.schema.simpleString + val writeConfig: HoodieWriteConfig = HoodieWriteConfig.newBuilder.withPath(basePath1).withSchema(schemaStr).build() + val metadataReader = HoodieTableMetadata.create(context, writeConfig.getMetadataConfig, basePath1) + val recordLocationFromSecondaryIndex = metadataReader.readSecondaryIndex(scala.collection.JavaConverters.seqAsJavaList(Seq("san_francisco"))) + assertEquals(recordLocationFromSecondaryIndex.size(), tripsDF.filter("city='san_francisco'").count()) + } + + @ParameterizedTest + @EnumSource(classOf[HoodieTableType]) + def testSIRead(tableType: HoodieTableType): Unit = { + //create a new table + val tableName = "trips_table3" + val basePath1 = "file:///tmp/trips_table3" + val columns = Seq("ts", "uuid", "rider", "driver", "fare", "city", "state") + val data = + Seq((1695159649087L, "334e26e9-8355-45cc-97c6-c31daf0df330", "rider-A", "driver-K", 19.10, "san_francisco", "california"), + (1695091554787L, "e96c4396-3fad-413a-a942-4cb36106d720", "rider-B", "driver-M", 27.70, "sao_paulo", "texas"), + (1695091554788L, "e96c4396-3fad-413a-a942-4cb36106d721", "rider-C", "driver-K", 27.70, "ithaca", "new-york"), + (1695046462179L, "9909a8b1-2d15-4d3d-8ec9-efc48c536a00", "rider-D", "driver-L", 33.90, "seattle", "washington"), + (1695516137016L, "e3cf430c-889d-4015-bc98-59bdce1e530c", "rider-E", "driver-P", 34.15, "chennai", "karnataka"), + (1695115999911L, "c8abbe79-8d89-47ea-b4ce-4d224bae5bfa", "rider-F", "driver-T", 17.85, "chennai", "tamil-nadu")); + + val inserts = spark.createDataFrame(data).toDF(columns: _*) + inserts.write.format("hudi"). + option(DataSourceWriteOptions.TABLE_TYPE.key, tableType.name()). + option("hoodie.datasource.write.operation", "insert"). + options(commonOptsWithSecondaryIndexSITest). + mode(SaveMode.Overwrite). + save(basePath1) + + // Predicate on secondary key + val tripsDF = spark.read.format("hudi"). + option(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key, "true"). + options(commonOptsWithSecondaryIndexSITest). + load(basePath1). + select("ts", "uuid", "rider", "driver", "fare", "city", "state"). + where("city='chennai'"). + orderBy("uuid") + // TODO: Assert the usage of secondary index through query plan + log.info("VINAY: Executed Plan - \n" + tripsDF.queryExecution.executedPlan.toString()) + + val expectedDF = inserts.filter("city='chennai'").orderBy("uuid") + assertEquals(tripsDF.count(), expectedDF.count()) + + val rowCount = tripsDF.count().toInt + assertResult(expectedDF.take(rowCount))(tripsDF.take(rowCount)) + + val schemaStr = tripsDF.schema.simpleString + val writeConfig: HoodieWriteConfig = HoodieWriteConfig.newBuilder.withPath(basePath1).withSchema(schemaStr).build() + val metadataReader = HoodieTableMetadata.create(context, writeConfig.getMetadataConfig, basePath1) + val recordLocationFromSecondaryIndex = metadataReader.readSecondaryIndex(scala.collection.JavaConverters.seqAsJavaList(Seq("chennai"))) + assertEquals(recordLocationFromSecondaryIndex.size(), tripsDF.filter("city='chennai'").count()) + } +}