diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index 8fd3546671e5a..965db88f643bb 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -2493,6 +2493,10 @@ public boolean isLogCompactionEnabledOnMetadata() { return getBoolean(HoodieMetadataConfig.ENABLE_LOG_COMPACTION_ON_METADATA_TABLE); } + public boolean isSecondaryIndexEnabled() { + return metadataConfig.enableSecondaryIndex(); + } + public boolean isRecordIndexEnabled() { return metadataConfig.enableRecordIndex(); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java index 8d40fc240952e..18c6f05d66dd5 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java @@ -29,11 +29,13 @@ import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.config.SerializableConfiguration; import org.apache.hudi.common.data.HoodieData; +import org.apache.hudi.common.engine.EngineType; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.FileSlice; import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.model.HoodieDeltaWriteStat; import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy; import org.apache.hudi.common.model.HoodieFileFormat; import org.apache.hudi.common.model.HoodieFileGroup; @@ -112,11 +114,14 @@ import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getPartitionLatestFileSlicesIncludingInflight; import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getProjectedSchemaForFunctionalIndex; import static org.apache.hudi.metadata.HoodieTableMetadataUtil.readRecordKeysFromBaseFiles; +import static org.apache.hudi.metadata.HoodieTableMetadataUtil.readSecondaryKeysFromFileSlices; +import static org.apache.hudi.metadata.HoodieTableMetadataUtil.readSecondaryKeysFromFiles; import static org.apache.hudi.metadata.MetadataPartitionType.BLOOM_FILTERS; import static org.apache.hudi.metadata.MetadataPartitionType.COLUMN_STATS; import static org.apache.hudi.metadata.MetadataPartitionType.FILES; import static org.apache.hudi.metadata.MetadataPartitionType.FUNCTIONAL_INDEX; import static org.apache.hudi.metadata.MetadataPartitionType.RECORD_INDEX; +import static org.apache.hudi.metadata.MetadataPartitionType.SECONDARY_INDEX; /** * Writer implementation backed by an internal hudi table. 
Partition and file listing are saved within an internal MOR table @@ -223,6 +228,11 @@ private void enablePartitions() { } if (dataWriteConfig.isRecordIndexEnabled() || dataMetaClient.getTableConfig().isMetadataPartitionAvailable(RECORD_INDEX)) { this.enabledPartitionTypes.add(RECORD_INDEX); + + // Enable secondary index only iff record index is enabled + if (dataWriteConfig.isSecondaryIndexEnabled() || dataMetaClient.getTableConfig().isMetadataPartitionAvailable(SECONDARY_INDEX)) { + this.enabledPartitionTypes.add(SECONDARY_INDEX); + } } if (dataMetaClient.getFunctionalIndexMetadata().isPresent()) { this.enabledPartitionTypes.add(FUNCTIONAL_INDEX); @@ -437,6 +447,9 @@ private boolean initializeFromFilesystem(String initializationTime, List getFunctionalIndexRecords(List getDeletedSecondaryRecordMapping(HoodieEngineContext engineContext, + Map recordKeySecondaryKeyMap); + private Pair> initializeFunctionalIndexPartition() throws Exception { HoodieMetadataFileSystemView fsView = new HoodieMetadataFileSystemView(dataMetaClient, dataMetaClient.getActiveTimeline(), metadata); String indexName = dataWriteConfig.getFunctionalIndexConfig().getIndexName(); @@ -546,6 +562,39 @@ private HoodieFunctionalIndexDefinition getFunctionalIndexDefinition(String inde } } + private Pair> initializeSecondaryIndexPartition() throws IOException { + HoodieMetadataFileSystemView fsView = new HoodieMetadataFileSystemView(dataMetaClient, dataMetaClient.getActiveTimeline(), metadata); + // Collect the list of latest file slices present in each partition + List partitions = metadata.getAllPartitionPaths(); + fsView.loadAllPartitions(); + + List> partitionFileSlicePairs = new ArrayList<>(); + partitions.forEach(partition -> { + fsView.getLatestFileSlices(partition).forEach(fs -> { + partitionFileSlicePairs.add(Pair.of(partition, fs)); + }); + }); + + // Reuse record index parallelism config to build secondary index + int parallelism = Math.min(partitionFileSlicePairs.size(), dataWriteConfig.getMetadataConfig().getRecordIndexMaxParallelism()); + + HoodieData records = readSecondaryKeysFromFileSlices( + engineContext, + partitionFileSlicePairs, + parallelism, + this.getClass().getSimpleName(), + dataMetaClient, + EngineType.SPARK); + + // Initialize the file groups - using the same estimation logic as that of record index + final int fileGroupCount = HoodieTableMetadataUtil.estimateFileGroupCount(RECORD_INDEX, records.count(), + RECORD_INDEX_AVERAGE_RECORD_SIZE, dataWriteConfig.getRecordIndexMinFileGroupCount(), + dataWriteConfig.getRecordIndexMaxFileGroupCount(), dataWriteConfig.getRecordIndexGrowthFactor(), + dataWriteConfig.getRecordIndexMaxFileGroupSizeBytes()); + + return Pair.of(fileGroupCount, records); + } + private Pair> initializeRecordIndexPartition() throws IOException { final HoodieMetadataFileSystemView fsView = new HoodieMetadataFileSystemView(dataMetaClient, dataMetaClient.getActiveTimeline(), metadata); @@ -941,6 +990,8 @@ public void updateFromWriteStatuses(HoodieCommitMetadata commitMetadata, HoodieD HoodieData additionalUpdates = getRecordIndexAdditionalUpserts(updatesFromWriteStatuses, commitMetadata); partitionToRecordMap.put(RECORD_INDEX, updatesFromWriteStatuses.union(additionalUpdates)); updateFunctionalIndexIfPresent(commitMetadata, instantTime, partitionToRecordMap); + updateSecondaryIndexIfPresent(commitMetadata, partitionToRecordMap, writeStatus); + return partitionToRecordMap; }); closeInternal(); @@ -1005,6 +1056,73 @@ private HoodieData getFunctionalIndexUpdates(HoodieCommitMetadata return 
getFunctionalIndexRecords(partitionFileSlicePairs, indexDefinition, dataMetaClient, parallelism, readerSchema, hadoopConf); } + private void updateSecondaryIndexIfPresent(HoodieCommitMetadata commitMetadata, Map> partitionToRecordMap, HoodieData writeStatus) { + dataMetaClient.getTableConfig().getMetadataPartitions() + .stream() + .filter(partition -> partition.startsWith(HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX_PREFIX)) + .forEach(partition -> { + HoodieData secondaryIndexRecords; + try { + secondaryIndexRecords = getSecondaryIndexUpdates(commitMetadata, writeStatus); + } catch (Exception e) { + throw new HoodieMetadataException("Failed to get secondary index updates for partition " + partition, e); + } + partitionToRecordMap.put(SECONDARY_INDEX, secondaryIndexRecords); + }); + } + + private HoodieData getSecondaryIndexUpdates(HoodieCommitMetadata commitMetadata, HoodieData writeStatus) throws Exception { + // Build a list of basefiles+delta-log-files for every partition that this commit touches + // { + // { + // "partition1", { {"baseFile11", {"logFile11", "logFile12"}}, {"baseFile12", {"logFile11"} } }, + // }, + // { + // "partition2", { {"baseFile21", {"logFile21", "logFile22"}}, {"baseFile22", {"logFile21"} } } + // } + // } + List>>> partitionFilePairs = new ArrayList<>(); + commitMetadata.getPartitionToWriteStats().forEach((dataPartition, writeStats) -> { + writeStats.forEach(writeStat -> { + if (writeStat instanceof HoodieDeltaWriteStat) { + partitionFilePairs.add(Pair.of(dataPartition, Pair.of(((HoodieDeltaWriteStat) writeStat).getBaseFile(), ((HoodieDeltaWriteStat) writeStat).getLogFiles()))); + } else { + partitionFilePairs.add(Pair.of(dataPartition, Pair.of(writeStat.getPath(), new ArrayList<>()))); + } + }); + }); + + // Build a list of keys that need to be removed. A 'delete' record will be emitted into the respective FileGroup of + // the secondary index partition for each of these keys. For a commit which is deleting/updating a lot of records, this + // operation is going to be expensive (in CPU, memory and IO) + List keysToRemove = new ArrayList<>(); + writeStatus.collectAsList().forEach(status -> { + status.getWrittenRecordDelegates().forEach(recordDelegate -> { + // Consider those keys which were either updated or deleted in this commit + if (!recordDelegate.getNewLocation().isPresent() || (recordDelegate.getCurrentLocation().isPresent() && recordDelegate.getNewLocation().isPresent())) { + keysToRemove.add(recordDelegate.getRecordKey()); + } + }); + }); + + // Fetch the secondary keys that each of the record keys ('keysToRemove') maps to + // This is obtained by scanning the entire secondary index partition in the metadata table + // This could be an expensive operation for a large commit (updating/deleting millions of rows) + Map recordKeySecondaryKeyMap = metadata.getSecondaryKeys(keysToRemove); + HoodieData deletedRecords = getDeletedSecondaryRecordMapping(engineContext, recordKeySecondaryKeyMap); + + // Reuse record index parallelism config to build secondary index + int parallelism = Math.min(partitionFilePairs.size(), dataWriteConfig.getMetadataConfig().getRecordIndexMaxParallelism()); + + return deletedRecords.union(readSecondaryKeysFromFiles( + engineContext, + partitionFilePairs, + parallelism, + this.getClass().getSimpleName(), + dataMetaClient, + EngineType.SPARK)); + } + /** * Update from {@code HoodieCleanMetadata}. 
* diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java index 9ce17cbb8c688..6824c373b80b1 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java @@ -210,4 +210,9 @@ protected HoodieData getFunctionalIndexRecords(List getDeletedSecondaryRecordMapping(HoodieEngineContext engineContext, Map recordKeySecondaryKeyMap) { + throw new HoodieNotSupportedException("Flink metadata table does not support secondary index yet."); + } } diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/metadata/JavaHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/metadata/JavaHoodieBackedTableMetadataWriter.java index c9c2730a7ea61..b757ad5b92e1e 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/metadata/JavaHoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/metadata/JavaHoodieBackedTableMetadataWriter.java @@ -122,4 +122,9 @@ protected HoodieData getFunctionalIndexRecords(List getDeletedSecondaryRecordMapping(HoodieEngineContext engineContext, Map recordKeySecondaryKeyMap) { + throw new HoodieNotSupportedException("Secondary index not supported for Java metadata table writer yet."); + } } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java index 3045a9b3762e6..a74e417f2ec34 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java @@ -50,6 +50,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; @@ -216,4 +217,19 @@ protected HoodieData getFunctionalIndexRecords(List, ?, ?> initializeWriteClient() { return new SparkRDDWriteClient(engineContext, metadataWriteConfig, true); } + + @Override + public HoodieData getDeletedSecondaryRecordMapping(HoodieEngineContext engineContext, + Map recordKeySecondaryKeyMap) { + HoodieSparkEngineContext sparkEngineContext = (HoodieSparkEngineContext) engineContext; + + if (recordKeySecondaryKeyMap.isEmpty()) { + return sparkEngineContext.emptyHoodieData(); + } + + List deletedRecords = new ArrayList<>(); + recordKeySecondaryKeyMap.forEach((key, value) -> deletedRecords.add(HoodieMetadataPayload.createSecondaryIndex(key, value, true))); + + return HoodieJavaRDD.of(deletedRecords, sparkEngineContext, 1); + } } diff --git a/hudi-common/src/main/avro/HoodieMetadata.avsc b/hudi-common/src/main/avro/HoodieMetadata.avsc index dc11095e3c7f4..4eefc3c37c815 100644 --- a/hudi-common/src/main/avro/HoodieMetadata.avsc +++ b/hudi-common/src/main/avro/HoodieMetadata.avsc @@ -432,6 +432,34 @@ } ], "default" : null + }, + { + "name": "SecondaryIndexMetadata", + "doc": "Metadata Index that contains information about secondary keys and the corresponding record keys in the dataset", + "type": [ + "null", + { + "type": "record", + 
"name": "HoodieSecondaryIndexInfo", + "fields": [ + { + "name": "recordKey", + "type": [ + "null", + "string" + ], + "default": null, + "doc": "Refers to the record key that this secondary key maps to" + }, + { + "name": "isDeleted", + "type": "boolean", + "doc": "True if this entry has been deleted" + } + ] + } + ], + "default" : null } ] } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java index 28043c857d472..fb855d4b2ed85 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java @@ -235,6 +235,13 @@ public final class HoodieMetadataConfig extends HoodieConfig { .withDocumentation("When there is a pending instant in data table, this config limits the allowed number of deltacommits in metadata table to " + "prevent the metadata table's timeline from growing unboundedly as compaction won't be triggered due to the pending data table instant."); + public static final ConfigProperty SECONDARY_INDEX_ENABLE_PROP = ConfigProperty + .key(METADATA_PREFIX + ".secondary.index.enable") + .defaultValue(false) + .markAdvanced() + .sinceVersion("1.0.0") + .withDocumentation("Create the HUDI Secondary (non-unique) Index within the Metadata Table"); + public static final ConfigProperty RECORD_INDEX_ENABLE_PROP = ConfigProperty .key(METADATA_PREFIX + ".record.index.enable") .defaultValue(false) @@ -418,6 +425,11 @@ public int getMaxNumDeltacommitsWhenPending() { return getIntOrDefault(METADATA_MAX_NUM_DELTACOMMITS_WHEN_PENDING); } + public boolean enableSecondaryIndex() { + // Secondary index is enabled only iff record index (primary key index) is also enabled + return enableRecordIndex() && getBoolean(SECONDARY_INDEX_ENABLE_PROP); + } + public boolean enableRecordIndex() { return enabled() && getBoolean(RECORD_INDEX_ENABLE_PROP); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java index f70f9456a8689..451f458244ba5 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java @@ -131,6 +131,12 @@ public class HoodieTableConfig extends HoodieConfig { .withDocumentation("Columns used to uniquely identify the table. Concatenated values of these fields are used as " + " the record key component of HoodieKey."); + public static final ConfigProperty SECONDARYKEY_FIELDS = ConfigProperty + .key("hoodie.table.secondarykey.fields") + .noDefaultValue() + .withDocumentation("Columns used to uniquely identify the table. 
Concatenated values of these fields are used as " + + " the record key component of HoodieKey."); + public static final ConfigProperty CDC_ENABLED = ConfigProperty .key("hoodie.table.cdc.enabled") .defaultValue(false) @@ -540,6 +546,16 @@ public Option getRecordKeyFields() { } } + public Option getSecondaryKeyFields() { + String secondaryKeyFieldsValue = getStringOrDefault(SECONDARYKEY_FIELDS, null); + if (secondaryKeyFieldsValue == null) { + return Option.empty(); + } else { + return Option.of(Arrays.stream(secondaryKeyFieldsValue.split(",")) + .filter(p -> p.length() > 0).collect(Collectors.toList()).toArray(new String[] {})); + } + } + public Option getPartitionFields() { if (contains(PARTITION_FIELDS)) { return Option.of(Arrays.stream(getString(PARTITION_FIELDS).split(",")) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java index 9d451893a630e..e2337b04613a8 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java @@ -921,6 +921,7 @@ public static class PropertyBuilder { private String tableName; private String tableCreateSchema; private String recordKeyFields; + private String secondaryKeyFields; private String archiveLogFolder; private String payloadClassName; private String payloadType; @@ -988,6 +989,11 @@ public PropertyBuilder setRecordKeyFields(String recordKeyFields) { return this; } + public PropertyBuilder setSecondaryKeyFields(String secondaryKeyFields) { + this.secondaryKeyFields = secondaryKeyFields; + return this; + } + public PropertyBuilder setArchiveLogFolder(String archiveLogFolder) { this.archiveLogFolder = archiveLogFolder; return this; @@ -1208,6 +1214,9 @@ public PropertyBuilder fromProperties(Properties properties) { if (hoodieConfig.contains(HoodieTableConfig.RECORDKEY_FIELDS)) { setRecordKeyFields(hoodieConfig.getString(HoodieTableConfig.RECORDKEY_FIELDS)); } + if (hoodieConfig.contains(HoodieTableConfig.SECONDARYKEY_FIELDS)) { + setSecondaryKeyFields(hoodieConfig.getString(HoodieTableConfig.SECONDARYKEY_FIELDS)); + } if (hoodieConfig.contains(HoodieTableConfig.CDC_ENABLED)) { setCDCEnabled(hoodieConfig.getBoolean(HoodieTableConfig.CDC_ENABLED)); } @@ -1322,6 +1331,11 @@ public Properties build() { if (null != recordKeyFields) { tableConfig.setValue(HoodieTableConfig.RECORDKEY_FIELDS, recordKeyFields); } + + if (null != secondaryKeyFields) { + tableConfig.setValue(HoodieTableConfig.SECONDARYKEY_FIELDS, secondaryKeyFields); + } + if (null != cdcEnabled) { tableConfig.setValue(HoodieTableConfig.CDC_ENABLED, Boolean.toString(cdcEnabled)); if (cdcEnabled && null != cdcSupplementalLoggingMode) { diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/common/table/log/HoodieFileSliceReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieFileSliceReader.java similarity index 100% rename from hudi-client/hudi-client-common/src/main/java/org/apache/hudi/common/table/log/HoodieFileSliceReader.java rename to hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieFileSliceReader.java diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java index 4d870618e7b68..bce007af73dac 100644 --- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java @@ -33,6 +33,7 @@ import org.apache.hadoop.fs.FileSystem; import java.util.List; +import java.util.Objects; import java.util.stream.Collectors; /** @@ -45,10 +46,10 @@ public class HoodieUnMergedLogRecordScanner extends AbstractHoodieLogRecordReade private HoodieUnMergedLogRecordScanner(FileSystem fs, String basePath, List logFilePaths, Schema readerSchema, String latestInstantTime, boolean readBlocksLazily, boolean reverseReader, int bufferSize, LogRecordScannerCallback callback, Option instantRange, InternalSchema internalSchema, - boolean enableOptimizedLogBlocksScan, HoodieRecordMerger recordMerger, + Option keyFieldOverride, boolean enableOptimizedLogBlocksScan, HoodieRecordMerger recordMerger, Option hoodieTableMetaClientOption) { super(fs, basePath, logFilePaths, readerSchema, latestInstantTime, readBlocksLazily, reverseReader, bufferSize, instantRange, - false, true, Option.empty(), internalSchema, Option.empty(), enableOptimizedLogBlocksScan, recordMerger, + false, true, keyFieldOverride, internalSchema, Option.empty(), enableOptimizedLogBlocksScan, recordMerger, hoodieTableMetaClientOption); this.callback = callback; } @@ -113,6 +114,7 @@ public static class Builder extends AbstractHoodieLogRecordReader.Builder { private boolean enableOptimizedLogBlocksScan; private HoodieRecordMerger recordMerger = HoodiePreCombineAvroRecordMerger.INSTANCE; private HoodieTableMetaClient hoodieTableMetaClient; + private String keyFieldOverride; public Builder withFileSystem(FileSystem fs) { this.fs = fs; @@ -191,13 +193,18 @@ public HoodieUnMergedLogRecordScanner.Builder withTableMetaClient( return this; } + public HoodieUnMergedLogRecordScanner.Builder withKeyFieldOverride(String keyFieldOverride) { + this.keyFieldOverride = Objects.requireNonNull(keyFieldOverride); + return this; + } + @Override public HoodieUnMergedLogRecordScanner build() { ValidationUtils.checkArgument(recordMerger != null); return new HoodieUnMergedLogRecordScanner(fs, basePath, logFilePaths, readerSchema, latestInstantTime, readBlocksLazily, reverseReader, bufferSize, callback, instantRange, - internalSchema, enableOptimizedLogBlocksScan, recordMerger, Option.ofNullable(hoodieTableMetaClient)); + internalSchema, Option.ofNullable(keyFieldOverride), enableOptimizedLogBlocksScan, recordMerger, Option.ofNullable(hoodieTableMetaClient)); } } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/common/table/log/LogFileIterator.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/LogFileIterator.java similarity index 100% rename from hudi-client/hudi-client-common/src/main/java/org/apache/hudi/common/table/log/LogFileIterator.java rename to hudi-common/src/main/java/org/apache/hudi/common/table/log/LogFileIterator.java diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/exception/HoodieClusteringException.java b/hudi-common/src/main/java/org/apache/hudi/exception/HoodieClusteringException.java similarity index 100% rename from hudi-client/hudi-client-common/src/main/java/org/apache/hudi/exception/HoodieClusteringException.java rename to hudi-common/src/main/java/org/apache/hudi/exception/HoodieClusteringException.java diff --git a/hudi-common/src/main/java/org/apache/hudi/keygen/constant/KeyGeneratorOptions.java 
b/hudi-common/src/main/java/org/apache/hudi/keygen/constant/KeyGeneratorOptions.java index db4a9162129fa..d7703d25313ef 100644 --- a/hudi-common/src/main/java/org/apache/hudi/keygen/constant/KeyGeneratorOptions.java +++ b/hudi-common/src/main/java/org/apache/hudi/keygen/constant/KeyGeneratorOptions.java @@ -54,6 +54,13 @@ public class KeyGeneratorOptions extends HoodieConfig { + "Actual value will be obtained by invoking .toString() on the field value. Nested fields can be specified using\n" + "the dot notation eg: `a.b.c`"); + public static final ConfigProperty SECONDARYKEY_FIELD_NAME = ConfigProperty + .key("hoodie.datasource.write.secondarykey.field") + .noDefaultValue() + .withDocumentation("Secondary key field. Columns that consitute the secondary key component.\n" + + "Actual value will be obtained by invoking .toString() on the field value. Nested fields can be specified using\n" + + "the dot notation eg: `a.b.c`"); + public static final ConfigProperty PARTITIONPATH_FIELD_NAME = ConfigProperty .key("hoodie.datasource.write.partitionpath.field") .noDefaultValue() diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java index 0a1435ed12e85..07d46e225d83d 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java @@ -305,6 +305,52 @@ public Map> readRecordIndex(List + * If the Metadata Table is not enabled, an exception is thrown to distinguish this from the absence of the key. + * + * @param secondaryKeys The list of secondary keys to read + */ + @Override + public Map> readSecondaryIndex(List secondaryKeys) { + ValidationUtils.checkState(dataMetaClient.getTableConfig().isMetadataPartitionAvailable(MetadataPartitionType.RECORD_INDEX), + "Record index is not initialized in MDT"); + ValidationUtils.checkState(dataMetaClient.getTableConfig().isMetadataPartitionAvailable(MetadataPartitionType.SECONDARY_INDEX), + "Secondary index is not initialized in MDT"); + + HoodieTimer timer = HoodieTimer.start(); + + // Fetch secondary-index records + Map>> secondaryKeyRecords = getSecondaryIndexRecords(secondaryKeys, MetadataPartitionType.SECONDARY_INDEX.getPartitionPath()); + + // Now collect the record-keys and fetch the RLI records + List recordKeys = new ArrayList<>(); + secondaryKeyRecords.forEach((key, records) -> { + records.forEach(record -> { + if (!record.getData().isDeleted()) { + recordKeys.add(record.getData().getRecordKeyFromSecondaryIndex()); + } + }); + }); + + return readRecordIndex(recordKeys); + } + + // Returns a map of (record-key -> secondary-key) for the provided record-keys + public Map getSecondaryKeys(List recordKeys) { + ValidationUtils.checkState(dataMetaClient.getTableConfig().isMetadataPartitionAvailable(MetadataPartitionType.RECORD_INDEX), + "Record index is not initialized in MDT"); + ValidationUtils.checkState(dataMetaClient.getTableConfig().isMetadataPartitionAvailable(MetadataPartitionType.SECONDARY_INDEX), + "Secondary index is not initialized in MDT"); + + HoodieTimer timer = HoodieTimer.start(); + Map recordKeyToSecondaryKeyMap = getSecondaryKeysForRecordKeys(recordKeys, MetadataPartitionType.SECONDARY_INDEX.getPartitionPath()); + + + return recordKeyToSecondaryKeyMap; + } + /** * Returns a list of all partitions. 
*/ @@ -415,6 +461,11 @@ private void checkForSpuriousDeletes(HoodieMetadataPayload metadataPayload, Stri protected abstract Map> getRecordsByKeys(List keys, String partitionName); + // returns a map of (secondary-key, list-of-secondary-index-records) + protected abstract Map>> getSecondaryIndexRecords(List keys, String partitionName); + + protected abstract Map getSecondaryKeysForRecordKeys(List recordKeys, String partitionName); + public HoodieMetadataConfig getMetadataConfig() { return metadataConfig; } diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java index 0f8bf68550c0d..afea8af4eb6a8 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java @@ -296,6 +296,11 @@ public Map> readRecordIndex(List> readSecondaryIndex(List secondaryKeys) { + throw new HoodieMetadataException("Unsupported operation: readSecondaryIndex!"); + } + @Override public int getNumFileGroupsForPartition(MetadataPartitionType partition) { throw new HoodieMetadataException("Unsupported operation: getNumFileGroupsForPartition"); diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java index cd16294da7293..f17f4ac608157 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java @@ -35,6 +35,7 @@ import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType; import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.log.HoodieUnMergedLogRecordScanner; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.view.HoodieTableFileSystemView; import org.apache.hudi.common.util.HoodieTimer; @@ -65,6 +66,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -208,7 +210,7 @@ public HoodieData> getRecordsByKeyPrefixes(L // NOTE: Since this will be executed by executors, we can't access previously cached // readers, and therefore have to always open new ones Pair, HoodieMetadataLogRecordReader> readers = - openReaders(partitionName, fileSlice); + openReaders(partitionName, fileSlice, Option.empty()); try { List timings = new ArrayList<>(); @@ -327,6 +329,60 @@ public Map>> getAllRecordsByKey return result; } + // returns a map of (secondary-key, list-of-secondary-index-records) + @Override + protected Map>> getSecondaryIndexRecords(List keys, String partitionName) { + if (keys.isEmpty()) { + return Collections.emptyMap(); + } + + Map>> result = new HashMap<>(); + + // Load the file slices for the partition. Each file slice is a shard which saves a portion of the keys. 
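+ // Every latest merged file slice of the partition is probed for the requested keys and the
+ // per-slice results are merged by secondary key into a single map below.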
+ List partitionFileSlices = partitionFileSliceMap.computeIfAbsent(partitionName, + k -> HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(metadataMetaClient, metadataFileSystemView, partitionName)); + final int numFileSlices = partitionFileSlices.size(); + ValidationUtils.checkState(numFileSlices > 0, "Number of file slices for partition " + partitionName + " should be > 0"); + + // Lookup keys from each file slice + // TODO: parallelize this loop + for (FileSlice partition : partitionFileSlices) { + Map>> currentFileSliceResult = lookupSecondaryKeysFromFileSlice(partitionName, keys, partition); + + currentFileSliceResult.forEach((secondaryKey, secondaryRecords) -> { + result.merge(secondaryKey, secondaryRecords, (oldRecords, newRecords) -> { + newRecords.addAll(oldRecords); + return newRecords; + }); + }); + } + + return result; + } + + @Override + protected Map getSecondaryKeysForRecordKeys(List recordKeys, String partitionName) { + if (recordKeys.isEmpty()) { + return Collections.emptyMap(); + } + + // Load the file slices for the partition. Each file slice is a shard which saves a portion of the keys. + List partitionFileSlices = partitionFileSliceMap.computeIfAbsent(partitionName, + k -> HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(metadataMetaClient, metadataFileSystemView, partitionName)); + final int numFileSlices = partitionFileSlices.size(); + ValidationUtils.checkState(numFileSlices > 0, "Number of file slices for partition " + partitionName + " should be > 0"); + + // Lookup keys from each file slice + // TODO: parallelize this loop + Map reverseSecondaryKeyMap = new HashMap<>(); + for (FileSlice partition : partitionFileSlices) { + reverseLookupSecondaryKeys(partitionName, recordKeys, partition, reverseSecondaryKeyMap); + } + + return reverseSecondaryKeyMap; + } + + /** * Lookup list of keys from a single file slice. * @@ -336,7 +392,7 @@ public Map>> getAllRecordsByKey * @return A {@code Map} of key name to {@code HoodieRecord} for the keys which were found in the file slice */ private Map> lookupKeysFromFileSlice(String partitionName, List keys, FileSlice fileSlice) { - Pair, HoodieMetadataLogRecordReader> readers = getOrCreateReaders(partitionName, fileSlice); + Pair, HoodieMetadataLogRecordReader> readers = getOrCreateReaders(partitionName, fileSlice, Option.empty()); try { List timings = new ArrayList<>(1); HoodieSeekingFileReader baseFileReader = readers.getKey(); @@ -360,6 +416,179 @@ private Map> lookupKeysFromFileSlice } } + /** + * Lookup list of keys from a single file slice. + * + * @param partitionName Name of the partition + * @param keys The list of secondary keys to lookup + * @param fileSlice The file slice to read + * @return A {@code Map} of secondary-key to list of {@code HoodieRecord} for the secondary-keys which were found in the file slice + */ + private Map>> lookupSecondaryKeysFromFileSlice(String partitionName, List keys, FileSlice fileSlice) { + Map>> logRecordsMap = new HashMap<>(); + Set keySet = new HashSet<>(keys.size()); + + // Using HoodieUnMergedLogRecordScanner with a in-memory record buffer for custom merging logic. + // The default HoodieMergedLogRecordScanner uses a Map (DiskSpillableMap) structure for buffering records (which is + // efficient) but does not serve the purpose of scanning secondary index's log record. The log records will contain + // duplicates and having a simple Map based buffer will immediately overwrite keys. 
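+ // The callback below buffers the matching log records per secondary key: an incoming record is merged into an
+ // existing entry only when it refers to the same record key, otherwise it is appended as a separate mapping.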
+ // TODO: Revisit the merge logic and make it more efficient (in terms of memory and CPU) + HoodieUnMergedLogRecordScanner.LogRecordScannerCallback callback = (newRecord) -> { + if (((HoodieMetadataPayload)newRecord.getData()).isDeleted() || !keySet.contains(newRecord.getRecordKey())) { + return; + } + + String key = newRecord.getRecordKey(); + HoodieMetadataPayload newPayload = (HoodieMetadataPayload) newRecord.getData(); + + if (logRecordsMap.containsKey(key)) { + List> records = logRecordsMap.get(key); + List> mergedRecords = new ArrayList<>(records.size()); + + boolean merged = false; + Option mergedPayload = Option.empty(); + Option mergedKey = Option.empty(); + for (HoodieRecord hoodieRecord : records) { + if (merged) { + // Current record is already merged, copy the remaining records + mergedRecords.add(hoodieRecord); + continue; + } + + HoodieMetadataPayload oldPayload = hoodieRecord.getData(); + if (oldPayload.getRecordKeyFromSecondaryIndex().equals(newPayload.getRecordKeyFromSecondaryIndex())) { + mergedPayload = HoodieMetadataPayload.combineSecondaryIndexPayload(oldPayload, newPayload); + mergedKey = Option.of(hoodieRecord.getKey()); + + // Already merged into a new record. Discard current record and stop merging future records + merged = true; + } else { + mergedRecords.add(hoodieRecord); + } + } + + // If there exists a merged record, add it at the end + if (mergedPayload.isPresent()) { + mergedRecords.add(new HoodieAvroRecord<>(mergedKey.get(), mergedPayload.get())); + } + + if (!merged) { + // Current record was not merged and needs to be added to the merged record list as a new record + mergedRecords.add((HoodieRecord)newRecord); + } + + if (mergedRecords.isEmpty()) { + // The newRecord indicated that it was deleted. Hence remove the oldrecord from the map + logRecordsMap.remove(key); + } else { + // Replace the oldRecord with the newly merged record + logRecordsMap.put(key, mergedRecords); + } + + } else { + logRecordsMap.put(key, Collections.singletonList((HoodieRecord) newRecord)); + } + }; + + Pair, HoodieMetadataLogRecordReader> readers = getOrCreateReaders(partitionName, fileSlice, Option.of(callback)); + try { + List timings = new ArrayList<>(1); + HoodieSeekingFileReader baseFileReader = readers.getKey(); + HoodieMetadataLogRecordReader logRecordScanner = readers.getRight(); + if (baseFileReader == null && logRecordScanner == null) { + return Collections.emptyMap(); + } + + // Sort it here once so that we don't need to sort individually for base file and for each individual log files. + List sortedKeys = new ArrayList<>(keys); + Collections.sort(sortedKeys); + keySet.addAll(sortedKeys); + + readAllLogRecords(logRecordScanner, sortedKeys, timings); + + return readFromBaseAndMergeWithAllLogRecords(baseFileReader, sortedKeys, true, logRecordsMap, timings, partitionName); + } catch (IOException ioe) { + throw new HoodieIOException("Error merging records from metadata table for " + keys.size() + " key : ", ioe); + } finally { + if (!reuse) { + closeReader(readers); + } + } + } + + private void reverseLookupSecondaryKeys(String partitionName, List recordKeys, FileSlice fileSlice, Map recordKeyMap) { + Set keySet = new HashSet<>(recordKeys.size()); + Map> logRecords = new HashMap<>(); + + // Using HoodieUnMergedLogRecordScanner with a in-memory record buffer for custom merging logic. 
+ // The default HoodieMergedLogRecordScanner uses a Map (DiskSpillableMap) structure for buffering records (which is + // efficient) but does not serve the purpose of scanning secondary index's log record. The log records will contain + // duplicates and having a simple Map based buffer will immediately overwrite keys. + HoodieUnMergedLogRecordScanner.LogRecordScannerCallback callback = (record) -> { + HoodieMetadataPayload payload = (HoodieMetadataPayload)record.getData(); + if (payload.isDeleted() || !keySet.contains(payload.getRecordKeyFromSecondaryIndex())) { + return; + } + + String recordKey = payload.getRecordKeyFromSecondaryIndex(); + if (logRecords.containsKey(recordKey)) { + HoodieRecord prevRecord = logRecords.get(recordKey); + Option> mergedRecord = HoodieMetadataPayload.combineSecondaryIndexRecord(prevRecord, (HoodieRecord) record); + if (mergedRecord.isPresent()) { + logRecords.put(recordKey, mergedRecord.get()); + } else { + logRecords.remove(recordKey); + } + } else { + logRecords.put(recordKey, (HoodieRecord) record); + } + }; + + Pair, HoodieMetadataLogRecordReader> readers = getOrCreateReaders(partitionName, fileSlice, Option.of(callback)); + try { + List timings = new ArrayList<>(1); + HoodieSeekingFileReader baseFileReader = readers.getKey(); + HoodieMetadataLogRecordReader logRecordScanner = readers.getRight(); + if (baseFileReader == null && logRecordScanner == null) { + return; + } + + // Sort it here once so that we don't need to sort individually for base file and for each individual log files. + List sortedKeys = new ArrayList<>(recordKeys); + Collections.sort(sortedKeys); + keySet.addAll(sortedKeys); + + logRecordScanner.scan(); + + // Map of (record-key, secondary-index-record) + Map> baseFileRecords = fetchBaseFileAllRecordsByPayload(baseFileReader, keySet, partitionName); + + // Iterate over all provided log-records, merging them into existing records + logRecords.entrySet().forEach(kv -> { + baseFileRecords.merge( + kv.getKey(), + kv.getValue(), + (oldRecord, newRecord) -> { + Option> mergedRecord = + HoodieMetadataPayload.combineSecondaryIndexRecord(oldRecord, newRecord); + return mergedRecord.orElseGet(null); + } + ); + }); + + + baseFileRecords.forEach((key, value) -> { + recordKeyMap.put(key, value.getRecordKey()); + }); + } catch (IOException ioe) { + throw new HoodieIOException("Error merging records from metadata table for " + recordKeys.size() + " key : ", ioe); + } finally { + if (!reuse) { + closeReader(readers); + } + } + } + private Map> readLogRecords(HoodieMetadataLogRecordReader logRecordReader, List sortedKeys, boolean fullKey, @@ -436,7 +665,7 @@ private Map> fetchBaseFileRecordsByK } private Map>> lookupAllKeysFromFileSlice(String partitionName, List keys, FileSlice fileSlice) { - Pair, HoodieMetadataLogRecordReader> readers = getOrCreateReaders(partitionName, fileSlice); + Pair, HoodieMetadataLogRecordReader> readers = getOrCreateReaders(partitionName, fileSlice, Option.empty()); try { List timings = new ArrayList<>(); HoodieSeekingFileReader baseFileReader = readers.getKey(); @@ -466,7 +695,8 @@ private Map>> readAllLogRecords } try { - return logRecordReader.getAllRecordsByKeys(sortedKeys); + Map>> records = logRecordReader.getAllRecordsByKeys(sortedKeys); + return records; } finally { timings.add(timer.endTimer()); } @@ -494,7 +724,6 @@ private Map>> readFromBaseAndMe metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.BASEFILE_READ_STR, readTimer.endTimer())); // Iterate over all provided log-records, merging them into 
existing records - logRecords.entrySet().forEach(kv -> { records.merge( kv.getKey(), @@ -538,6 +767,7 @@ private Map>> fetchBaseFileAllR ? reader.getRecordsByKeysIterator(sortedKeys) : reader.getRecordsByKeyPrefixIterator(sortedKeys); + return toStream(records) .map(record -> { GenericRecord data = (GenericRecord) record.getData(); @@ -548,6 +778,31 @@ private Map>> fetchBaseFileAllR .collect(Collectors.groupingBy(Pair::getKey, Collectors.mapping(Pair::getValue, Collectors.toList()))); } + private Map> fetchBaseFileAllRecordsByPayload(HoodieSeekingFileReader reader, + Set keySet, + String partitionName) throws IOException { + if (reader == null) { + // No base file at all + return new HashMap<>(); + } + + ClosableIterator> records = reader.getRecordIterator(); + + return toStream(records) + .map(record -> { + GenericRecord data = (GenericRecord) record.getData(); + return composeRecord(data, partitionName); + }) + .filter(record -> { + HoodieMetadataPayload payload = (HoodieMetadataPayload)record.getData(); + return keySet.contains(payload.getRecordKeyFromSecondaryIndex()); + }) + .collect(Collectors.toMap(record -> { + HoodieMetadataPayload payload = (HoodieMetadataPayload)record.getData(); + return payload.getRecordKeyFromSecondaryIndex(); + }, record -> record)); + } + private HoodieRecord composeRecord(GenericRecord avroRecord, String partitionName) { if (metadataTableConfig.populateMetaFields()) { return SpillableMapUtils.convertToHoodieRecordPayload(avroRecord, @@ -567,16 +822,19 @@ private HoodieRecord composeRecord(GenericRecord avroReco * @param slice - The file slice to open readers for * @return File reader and the record scanner pair for the requested file slice */ - private Pair, HoodieMetadataLogRecordReader> getOrCreateReaders(String partitionName, FileSlice slice) { + private Pair, HoodieMetadataLogRecordReader> getOrCreateReaders(String partitionName, FileSlice slice, + Option callback) { if (reuse) { Pair key = Pair.of(partitionName, slice.getFileId()); - return partitionReaders.get().computeIfAbsent(key, ignored -> openReaders(partitionName, slice)); + return partitionReaders.get().computeIfAbsent(key, ignored -> openReaders(partitionName, slice, callback)); } else { - return openReaders(partitionName, slice); + return openReaders(partitionName, slice, callback); } } - private Pair, HoodieMetadataLogRecordReader> openReaders(String partitionName, FileSlice slice) { + private Pair, HoodieMetadataLogRecordReader> openReaders(String partitionName, + FileSlice slice, + Option callback) { try { HoodieTimer timer = HoodieTimer.start(); // Open base file reader @@ -587,7 +845,7 @@ private Pair, HoodieMetadataLogRecordReader> openRead // Open the log record scanner using the log files from the latest file slice List logFiles = slice.getLogFiles().collect(Collectors.toList()); Pair logRecordScannerOpenTimePair = - getLogRecordScanner(logFiles, partitionName, Option.empty()); + getLogRecordScanner(logFiles, partitionName, Option.empty(), callback); HoodieMetadataLogRecordReader logRecordScanner = logRecordScannerOpenTimePair.getKey(); final long logScannerOpenMs = logRecordScannerOpenTimePair.getValue(); @@ -621,7 +879,8 @@ private Pair, Long> getBaseFileReader(FileSlice slice public Pair getLogRecordScanner(List logFiles, String partitionName, - Option allowFullScanOverride) { + Option allowFullScanOverride, + Option callback) { HoodieTimer timer = HoodieTimer.start(); List sortedLogFilePaths = logFiles.stream() .sorted(HoodieLogFile.getLogFileComparator()) @@ -641,7 +900,8 @@ 
public Pair getLogRecordScanner(List getLogRecordScanner(List getLogRecordScanner(List> getRecords() { // NOTE: Locking is necessary since we're accessing [[HoodieMetadataLogRecordReader]] // materialized state, to make sure there's no concurrent access synchronized (this) { - logRecordScanner.scan(); - return logRecordScanner.getRecords().values() + assert logRecordScanner instanceof HoodieMergedLogRecordScanner; + return ((HoodieMergedLogRecordScanner)logRecordScanner).getRecords().values() .stream() .map(record -> (HoodieRecord) record) .collect(Collectors.toList()); } } + public void scan() { + assert logRecordScanner instanceof HoodieUnMergedLogRecordScanner; + ((HoodieUnMergedLogRecordScanner) logRecordScanner).scan(); + } + @SuppressWarnings("unchecked") public Map> getRecordsByKeyPrefixes(List sortedKeyPrefixes) { if (sortedKeyPrefixes.isEmpty()) { @@ -83,9 +92,10 @@ public Map> getRecordsByKeyPrefixes( // NOTE: Locking is necessary since we're accessing [[HoodieMetadataLogRecordReader]] // materialized state, to make sure there's no concurrent access synchronized (this) { - logRecordScanner.scanByKeyPrefixes(sortedKeyPrefixes); + assert logRecordScanner instanceof HoodieMergedLogRecordScanner; + ((HoodieMergedLogRecordScanner)logRecordScanner).scanByKeyPrefixes(sortedKeyPrefixes); Predicate p = createPrefixMatchingPredicate(sortedKeyPrefixes); - return logRecordScanner.getRecords().entrySet() + return ((HoodieMergedLogRecordScanner)logRecordScanner).getRecords().entrySet() .stream() .filter(r -> r != null && p.test(r.getKey())) .map(r -> (HoodieRecord) r.getValue()) @@ -106,8 +116,9 @@ public Map> getRecordsByKeys(List allRecords = logRecordScanner.getRecords(); + assert logRecordScanner instanceof HoodieMergedLogRecordScanner; + ((HoodieMergedLogRecordScanner)logRecordScanner).scanByFullKeys(sortedKeys); + Map allRecords = ((HoodieMergedLogRecordScanner)logRecordScanner).getRecords(); return sortedKeys.stream() .map(key -> (HoodieRecord) allRecords.get(key)) .filter(Objects::nonNull) @@ -123,18 +134,35 @@ public Map>> getAllRecordsByKey // NOTE: Locking is necessary since we're accessing [[HoodieMetadataLogRecordReader]] // materialized state, to make sure there's no concurrent access synchronized (this) { - logRecordScanner.scanByFullKeys(sortedKeys); - Map allRecords = logRecordScanner.getRecords(); - return sortedKeys.stream() - .map(key -> (HoodieRecord) allRecords.get(key)) - .filter(Objects::nonNull) - .collect(Collectors.groupingBy(HoodieRecord::getRecordKey)); + if (logRecordScanner instanceof HoodieMergedLogRecordScanner) { + ((HoodieMergedLogRecordScanner)logRecordScanner).scanByFullKeys(sortedKeys); + Map allRecords = ((HoodieMergedLogRecordScanner)logRecordScanner).getRecords(); + + Map>> result = new HashMap<>(); + sortedKeys.stream() + .map(key -> (HoodieRecord) allRecords.get(key)) + .filter(Objects::nonNull) + .forEach(record -> { + List> records = result.getOrDefault(record.getRecordKey(), new ArrayList<>()); + records.add(record); + result.put(record.getRecordKey(), records); + }); + return result; + } + + assert logRecordScanner instanceof HoodieUnMergedLogRecordScanner; + ((HoodieUnMergedLogRecordScanner)logRecordScanner).scan(false); + + // TODO: fix this return of dummy hashmap. 
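+ // With the unmerged scanner, matching records are surfaced through the LogRecordScannerCallback supplied when
+ // this reader was built, so this code path currently hands back an empty map to the caller.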
+ return new HashMap<>(); } } @Override public void close() throws IOException { - logRecordScanner.close(); + if (logRecordScanner instanceof HoodieMergedLogRecordScanner) { + ((HoodieMergedLogRecordScanner)logRecordScanner).close(); + } } private static Predicate createPrefixMatchingPredicate(List keyPrefixes) { @@ -160,33 +188,45 @@ public static class Builder { .withReverseReader(false) .withOperationField(false); + private HoodieUnMergedLogRecordScanner.Builder unmergedScannerBuilder = + new HoodieUnMergedLogRecordScanner.Builder() + .withKeyFieldOverride(HoodieMetadataPayload.KEY_FIELD_NAME) + .withReadBlocksLazily(true) + .withReverseReader(false); + public Builder withFileSystem(FileSystem fs) { scannerBuilder.withFileSystem(fs); + unmergedScannerBuilder.withFileSystem(fs); return this; } public Builder withBasePath(String basePath) { scannerBuilder.withBasePath(basePath); + unmergedScannerBuilder.withBasePath(basePath); return this; } public Builder withLogFilePaths(List logFilePaths) { scannerBuilder.withLogFilePaths(logFilePaths); + unmergedScannerBuilder.withLogFilePaths(logFilePaths); return this; } public Builder withReaderSchema(Schema schema) { scannerBuilder.withReaderSchema(schema); + unmergedScannerBuilder.withReaderSchema(schema); return this; } public Builder withLatestInstantTime(String latestInstantTime) { scannerBuilder.withLatestInstantTime(latestInstantTime); + unmergedScannerBuilder.withLatestInstantTime(latestInstantTime); return this; } public Builder withBufferSize(int bufferSize) { scannerBuilder.withBufferSize(bufferSize); + unmergedScannerBuilder.withBufferSize(bufferSize); return this; } @@ -230,16 +270,23 @@ public Builder enableFullScan(boolean enableFullScan) { public Builder withEnableOptimizedLogBlocksScan(boolean enableOptimizedLogBlocksScan) { scannerBuilder.withOptimizedLogBlocksScan(enableOptimizedLogBlocksScan); + unmergedScannerBuilder.withOptimizedLogBlocksScan(enableOptimizedLogBlocksScan); return this; } public Builder withTableMetaClient(HoodieTableMetaClient hoodieTableMetaClient) { scannerBuilder.withTableMetaClient(hoodieTableMetaClient); + unmergedScannerBuilder.withTableMetaClient(hoodieTableMetaClient); return this; } public HoodieMetadataLogRecordReader build() { return new HoodieMetadataLogRecordReader(scannerBuilder.build()); } + + public HoodieMetadataLogRecordReader buildWithUnmergedLogRecordScanner(HoodieUnMergedLogRecordScanner.LogRecordScannerCallback callback) { + unmergedScannerBuilder.withLogRecordScannerCallback(callback); + return new HoodieMetadataLogRecordReader(unmergedScannerBuilder.build()); + } } } diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java index 40c0debf28e95..e6b263da45de5 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java @@ -23,6 +23,7 @@ import org.apache.hudi.avro.model.HoodieMetadataFileInfo; import org.apache.hudi.avro.model.HoodieMetadataRecord; import org.apache.hudi.avro.model.HoodieRecordIndexInfo; +import org.apache.hudi.avro.model.HoodieSecondaryIndexInfo; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.EmptyHoodieRecordPayload; import org.apache.hudi.common.model.HoodieAvroRecord; @@ -109,6 +110,7 @@ public class HoodieMetadataPayload implements HoodieRecordPayload @@ -183,6 +192,7 @@ public class HoodieMetadataPayload 
implements HoodieRecordPayload orderingVal) { @@ -259,6 +269,11 @@ public HoodieMetadataPayload(Option recordOpt) { recordIndexRecord.get(RECORD_INDEX_FIELD_FILEID).toString(), Long.parseLong(recordIndexRecord.get(RECORD_INDEX_FIELD_INSTANT_TIME).toString()), Integer.parseInt(recordIndexRecord.get(RECORD_INDEX_FIELD_FILEID_ENCODING).toString())); + } else if (type == METADATA_TYPE_SECONDARY_INDEX) { + GenericRecord secondaryIndexRecord = getNestedFieldValue(record, SCHEMA_FIELD_ID_SECONDARY_INDEX); + secondaryIndexMetadata = new HoodieSecondaryIndexInfo( + secondaryIndexRecord.get(SECONDARY_INDEX_FIELD_RECORD_KEY).toString(), + (Boolean) secondaryIndexRecord.get(SECONDARY_INDEX_FIELD_IS_DELETED)); } } else { this.isDeletedRecord = true; @@ -294,6 +309,11 @@ protected HoodieMetadataPayload(String key, int type, this.recordIndexMetadata = recordIndexMetadata; } + public HoodieMetadataPayload(String key, HoodieSecondaryIndexInfo secondaryIndexMetadata) { + this(key, METADATA_TYPE_SECONDARY_INDEX, null, null, null, null); + this.secondaryIndexMetadata = secondaryIndexMetadata; + } + /** * Create and return a {@code HoodieMetadataPayload} to save list of partitions. * @@ -403,6 +423,9 @@ public HoodieMetadataPayload preCombine(HoodieMetadataPayload previousRecord) { // No need to merge with previous record index, always pick the latest payload. return this; + case METADATA_TYPE_SECONDARY_INDEX: + // TODO (vinay): Fix with the correct merge logic + return this; default: throw new HoodieMetadataException("Unknown type of HoodieMetadataPayload: " + type); } @@ -447,7 +470,7 @@ public Option getInsertValue(Schema schemaIgnored, Properties pro } HoodieMetadataRecord record = new HoodieMetadataRecord(key, type, filesystemMetadata, bloomFilterMetadata, - columnStatMetadata, recordIndexMetadata); + columnStatMetadata, recordIndexMetadata, secondaryIndexMetadata); return Option.of(record); } @@ -456,6 +479,34 @@ public Option getInsertValue(Schema schema) throws IOException { return getInsertValue(schema, new Properties()); } + public static Option> combineSecondaryIndexRecord( + HoodieRecord oldRecord, + HoodieRecord newRecord) { + String oldSecondaryKey = oldRecord.getRecordKey(); + String newSecondaryKey = newRecord.getRecordKey(); + + if (newRecord.getData().secondaryIndexMetadata.getIsDeleted()) { + return Option.empty(); + } else if (oldRecord.getData().secondaryIndexMetadata.getIsDeleted()) { + return Option.of(newRecord); + } + + return Option.of(newRecord); + } + + public static Option combineSecondaryIndexPayload( + HoodieMetadataPayload oldPayload, + HoodieMetadataPayload newPayload) { + + if (newPayload.secondaryIndexMetadata.getIsDeleted()) { + return Option.empty(); + } else if (oldPayload.secondaryIndexMetadata.getIsDeleted()) { + return Option.of(newPayload); + } + + return Option.of(newPayload); + } + /** * Returns the list of filenames added as part of this record. */ @@ -685,6 +736,22 @@ private static HoodieMetadataColumnStats mergeColumnStatsRecords(HoodieMetadataC .build(); } + /** + * Create and return a {@code HoodieMetadataPayload} to insert or update an entry for the secondary index. + *

+ * Each entry maps the secondary key of a single record in HUDI to its record (or primary) key + * + * @param recordKey Primary key of the record + * @param secondaryKey Secondary key of the record + * @param isDeleted true if this record is deleted + */ + public static HoodieRecord createSecondaryIndex(String recordKey, String secondaryKey, Boolean isDeleted) { + + HoodieKey key = new HoodieKey(secondaryKey, MetadataPartitionType.SECONDARY_INDEX.getPartitionPath()); + HoodieMetadataPayload payload = new HoodieMetadataPayload(secondaryKey, new HoodieSecondaryIndexInfo(recordKey, isDeleted)); + return new HoodieAvroRecord<>(key, payload); + } + /** * Create and return a {@code HoodieMetadataPayload} to insert or update an entry for the record index. *

@@ -765,6 +832,11 @@ public HoodieRecordGlobalLocation getRecordGlobalLocation() { return HoodieTableMetadataUtil.getLocationFromRecordIndexInfo(recordIndexMetadata); } + public String getRecordKeyFromSecondaryIndex() { + // TODO (vinay): Handle deletes and any other necessary checks etc. Check getRecordGlobalLocation() + return secondaryIndexMetadata.getRecordKey(); + } + public boolean isDeleted() { return isDeletedRecord; } diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java index c19b2b62cd7c5..7238f64c0052a 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java @@ -215,6 +215,12 @@ Map, HoodieMetadataColumnStats> getColumnStats(final List

> readRecordIndex(List recordKeys); + /** + * Returns the location of records which the provided secondary keys maps to. + * Records that are not found are ignored and won't be part of map object that is returned. + */ + Map> readSecondaryIndex(List secondaryKeys); + /** * Fetch records by key prefixes. Key prefix passed is expected to match the same prefix as stored in Metadata table partitions. For eg, in case of col stats partition, * actual keys in metadata partition is encoded values of column name, partition name and file name. So, key prefixes passed to this method is expected to be encoded already. diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java index e9eba01bf83b8..eabb89d9187ce 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java @@ -56,10 +56,12 @@ import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType; import org.apache.hudi.common.model.HoodieRecordGlobalLocation; +import org.apache.hudi.common.model.HoodieRecordMerger; import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.TableSchemaResolver; +import org.apache.hudi.common.table.log.HoodieFileSliceReader; import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline; @@ -67,6 +69,7 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.TimelineMetadataUtils; import org.apache.hudi.common.table.view.HoodieTableFileSystemView; +import org.apache.hudi.common.util.BaseFileUtils; import org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.common.util.FileIOUtils; import org.apache.hudi.common.util.HoodieRecordUtils; @@ -88,6 +91,7 @@ import org.apache.avro.AvroTypeException; import org.apache.avro.LogicalTypes; import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -107,11 +111,13 @@ import java.util.Collection; import java.util.Collections; import java.util.Comparator; +import java.util.Date; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Properties; import java.util.Set; import java.util.UUID; import java.util.function.BiFunction; @@ -152,6 +158,8 @@ public class HoodieTableMetadataUtil { public static final String PARTITION_NAME_RECORD_INDEX = "record_index"; public static final String PARTITION_NAME_FUNCTIONAL_INDEX_PREFIX = "func_index_"; + public static final String PARTITION_NAME_SECONDARY_INDEX_PREFIX = "secondary_index_"; + public static final Set> COLUMN_STATS_RECORD_SUPPORTED_TYPES = new HashSet<>(Arrays.asList( IntWrapper.class, BooleanWrapper.class, DateWrapper.class, DoubleWrapper.class, FloatWrapper.class, LongWrapper.class, @@ -438,7 +446,7 @@ public static List convertMetadataToFilesPartitionRecords(HoodieCo return HoodieMetadataPayload.createPartitionFilesRecord(partitionStatName, 
updatedFilesToSizesMapping, Collections.emptyList()); }) - .collect(Collectors.toList()); + .collect(toList()); records.addAll(updatedPartitionFilesRecords); @@ -452,7 +460,7 @@ private static List getPartitionsAdded(HoodieCommitMetadata commitMetada return commitMetadata.getPartitionToWriteStats().keySet().stream() // We need to make sure we properly handle case of non-partitioned tables .map(HoodieTableMetadataUtil::getPartitionIdentifierForFilesPartition) - .collect(Collectors.toList()); + .collect(toList()); } /** @@ -468,7 +476,7 @@ public static HoodieData convertMetadataToBloomFilterRecords( HoodieEngineContext context, HoodieConfig hoodieConfig, HoodieCommitMetadata commitMetadata, String instantTime, MetadataRecordsGenerationParams recordsGenerationParams) { final List allWriteStats = commitMetadata.getPartitionToWriteStats().values().stream() - .flatMap(entry -> entry.stream()).collect(Collectors.toList()); + .flatMap(entry -> entry.stream()).collect(toList()); if (allWriteStats.isEmpty()) { return context.emptyHoodieData(); } @@ -1052,7 +1060,7 @@ private static List getPartitionFileSlices(HoodieTableMetaClient meta } else { fileSliceStream = fsView.getLatestFileSlices(partition); } - return fileSliceStream.sorted(Comparator.comparing(FileSlice::getFileId)).collect(Collectors.toList()); + return fileSliceStream.sorted(Comparator.comparing(FileSlice::getFileId)).collect(toList()); } /** @@ -1070,14 +1078,14 @@ public static List getPartitionLatestFileSlicesIncludingInflight(Hood Stream fileSliceStream = fsView.fetchLatestFileSlicesIncludingInflight(partition); return fileSliceStream .sorted(Comparator.comparing(FileSlice::getFileId)) - .collect(Collectors.toList()); + .collect(toList()); } public static HoodieData convertMetadataToColumnStatsRecords(HoodieCommitMetadata commitMetadata, HoodieEngineContext engineContext, MetadataRecordsGenerationParams recordsGenerationParams) { List allWriteStats = commitMetadata.getPartitionToWriteStats().values().stream() - .flatMap(Collection::stream).collect(Collectors.toList()); + .flatMap(Collection::stream).collect(toList()); if (allWriteStats.isEmpty()) { return engineContext.emptyHoodieData(); @@ -1132,7 +1140,7 @@ private static List getColumnsToIndex(MetadataRecordsGenerationParams re .map(writerSchema -> writerSchema.getFields().stream() .map(Schema.Field::name) - .collect(Collectors.toList())) + .collect(toList())) .orElse(Collections.emptyList()); } @@ -1160,7 +1168,7 @@ private static Stream getColumnStatsRecords(String partitionPath, // TODO we should delete records instead of stubbing them List> columnRangeMetadataList = columnsToIndex.stream() .map(entry -> HoodieColumnRangeMetadata.stub(fileName, entry)) - .collect(Collectors.toList()); + .collect(toList()); return HoodieMetadataPayload.createColumnStatsRecords(partitionPath, columnRangeMetadataList, true); } @@ -1195,7 +1203,7 @@ private static List> readColumnRangeMetada /** * Does an upcast for {@link BigDecimal} instance to align it with scale/precision expected by - * the {@link org.apache.avro.LogicalTypes.Decimal} Avro logical type + * the {@link LogicalTypes.Decimal} Avro logical type */ public static BigDecimal tryUpcastDecimal(BigDecimal value, final LogicalTypes.Decimal decimal) { final int scale = decimal.getScale(); @@ -1335,7 +1343,7 @@ public static Set getValidInstantTimestamps(HoodieTableMetaClient dataMe .filter(instant -> instant.isCompleted() && isValidInstant(instant)) .getInstantsAsStream() .map(HoodieInstant::getTimestamp) - 
.collect(Collectors.toList())); + .collect(toList())); // For any rollbacks and restores, we cannot neglect the instants that they are rolling back. // The rollback instant should be more recent than the start of the timeline for it to have rolled back any @@ -1750,7 +1758,7 @@ public static HoodieRecordGlobalLocation getLocationFromRecordIndexInfo( fileId = originalFileId; } - final java.util.Date instantDate = new java.util.Date(instantTime); + final Date instantDate = new Date(instantTime); return new HoodieRecordGlobalLocation(partition, HoodieActiveTimeline.formatDate(instantDate), fileId); } @@ -1781,7 +1789,7 @@ public static HoodieData readRecordKeysFromBaseFiles(HoodieEngineC final String fileId = baseFile.getFileId(); final String instantTime = baseFile.getCommitTime(); - HoodieFileReader reader = HoodieFileReaderFactory.getReaderFactory(HoodieRecord.HoodieRecordType.AVRO) + HoodieFileReader reader = HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO) .getFileReader(config, configuration.get(), dataFilePath); ClosableIterator recordKeyIterator = reader.getRecordKeyIterator(); @@ -1877,7 +1885,7 @@ public HoodieRecord next() { final String fileId = baseFile.getFileId(); final String instantTime = baseFile.getCommitTime(); HoodieConfig hoodieConfig = getReaderConfigs(configuration.get()); - HoodieFileReader reader = HoodieFileReaderFactory.getReaderFactory(HoodieRecord.HoodieRecordType.AVRO) + HoodieFileReader reader = HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO) .getFileReader(hoodieConfig, configuration.get(), dataFilePath); ClosableIterator recordKeyIterator = reader.getRecordKeyIterator(); @@ -1902,6 +1910,128 @@ public HoodieRecord next() { }); } + public static HoodieData readSecondaryKeysFromFiles(HoodieEngineContext engineContext, + List>>> partitionFiles, + int recordIndexMaxParallelism, + String activeModule, HoodieTableMetaClient metaClient, EngineType engineType) { + if (partitionFiles.isEmpty()) { + return engineContext.emptyHoodieData(); + } + + engineContext.setJobStatus(activeModule, "Secondary Index: reading secondary keys from " + partitionFiles.size() + " partitions"); + final int parallelism = Math.min(partitionFiles.size(), recordIndexMaxParallelism); + final String basePath = metaClient.getBasePathV2().toString(); + final SerializableConfiguration configuration = new SerializableConfiguration(metaClient.getHadoopConf()); + + return engineContext.parallelize(partitionFiles, parallelism).flatMap(partitionAndBaseFile -> { + final String partition = partitionAndBaseFile.getKey(); + final Pair> baseAndLogFiles = partitionAndBaseFile.getValue(); + List logFilePaths = baseAndLogFiles.getValue(); + String filePath = baseAndLogFiles.getKey(); + + Path dataFilePath = filePath(basePath, "", filePath); + Schema tableSchema = BaseFileUtils.getInstance(HoodieFileFormat.PARQUET).readAvroSchema(metaClient.getHadoopConf(), dataFilePath); + + return createSecondaryIndexGenerator(metaClient, engineType, logFilePaths, tableSchema, partition, dataFilePath, !filePath.isEmpty()); + }); + } + + private static ClosableIterator createSecondaryIndexGenerator(HoodieTableMetaClient metaClient, + EngineType engineType, List logFilePaths, + Schema tableSchema, String partition, + Path dataFilePath, boolean createBaseFileReader) throws Exception { + final String basePath = metaClient.getBasePathV2().toString(); + final SerializableConfiguration configuration = new SerializableConfiguration(metaClient.getHadoopConf()); + + HoodieRecordMerger recordMerger = 
HoodieRecordUtils.createRecordMerger( + basePath, + engineType, + Collections.emptyList(), + metaClient.getTableConfig().getRecordMergerStrategy()); + + HoodieMergedLogRecordScanner mergedLogRecordScanner = HoodieMergedLogRecordScanner.newBuilder() + .withFileSystem(metaClient.getFs()) + .withBasePath(basePath) + .withLogFilePaths(logFilePaths) + .withReaderSchema(tableSchema) + .withLatestInstantTime(metaClient.getActiveTimeline().filterCompletedInstants().lastInstant().map(HoodieInstant::getTimestamp).orElse("")) + .withReadBlocksLazily(configuration.get().getBoolean("", true)) + .withReverseReader(false) + .withMaxMemorySizeInBytes(configuration.get().getLongBytes(MAX_MEMORY_FOR_COMPACTION.key(), DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES)) + .withSpillableMapBasePath(FileIOUtils.getDefaultSpillableMapBasePath()) + .withPartition(partition) + .withOptimizedLogBlocksScan(configuration.get().getBoolean("hoodie" + HoodieMetadataConfig.OPTIMIZED_LOG_BLOCKS_SCAN, false)) + .withDiskMapType(configuration.get().getEnum(SPILLABLE_DISK_MAP_TYPE.key(), SPILLABLE_DISK_MAP_TYPE.defaultValue())) + .withBitCaskDiskMapCompressionEnabled(configuration.get().getBoolean(DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(), DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue())) + .withRecordMerger(recordMerger) + .build(); + + Option baseFileReader = Option.empty(); + if (createBaseFileReader) { + baseFileReader = Option.of(HoodieFileReaderFactory.getReaderFactory(recordMerger.getRecordType()).getFileReader(metaClient.getTableConfig(), configuration.get(), dataFilePath)); + } + HoodieFileSliceReader fileSliceReader = new HoodieFileSliceReader(baseFileReader, mergedLogRecordScanner, tableSchema, metaClient.getTableConfig().getPreCombineField(), recordMerger, + metaClient.getTableConfig().getProps(), + Option.empty()); + ClosableIterator fileSliceIterator = ClosableIterator.wrap(fileSliceReader); + return new ClosableIterator() { + @Override + public void close() { + fileSliceIterator.close(); + } + + @Override + public boolean hasNext() { + return fileSliceIterator.hasNext(); + } + + @Override + public HoodieRecord next() { + HoodieRecord record = fileSliceIterator.next(); + String recordKey = record.getRecordKey(tableSchema, HoodieRecord.RECORD_KEY_METADATA_FIELD); + String secondaryKeyFields = String.join(".", metaClient.getTableConfig().getSecondaryKeyFields().get()); + String secondaryKey; + try { + GenericRecord genericRecord = (GenericRecord) (record.toIndexedRecord(tableSchema, new Properties()).get()).getData(); + secondaryKey = HoodieAvroUtils.getNestedFieldValAsString(genericRecord, secondaryKeyFields, true, false); + } catch (IOException e) { + throw new RuntimeException("Failed to fetch records." 
+ e); + } + + return HoodieMetadataPayload.createSecondaryIndex(recordKey, secondaryKey, false); + } + }; + } + + public static HoodieData readSecondaryKeysFromFileSlices(HoodieEngineContext engineContext, + List> partitionFileSlicePairs, + int recordIndexMaxParallelism, + String activeModule, HoodieTableMetaClient metaClient, EngineType engineType) throws IOException { + if (partitionFileSlicePairs.isEmpty()) { + return engineContext.emptyHoodieData(); + } + + engineContext.setJobStatus(activeModule, "Secondary Index: reading secondary keys from " + partitionFileSlicePairs.size() + " file slices"); + final int parallelism = Math.min(partitionFileSlicePairs.size(), recordIndexMaxParallelism); + final String basePath = metaClient.getBasePathV2().toString(); + final SerializableConfiguration configuration = new SerializableConfiguration(metaClient.getHadoopConf()); + return engineContext.parallelize(partitionFileSlicePairs, parallelism).flatMap(partitionAndBaseFile -> { + final String partition = partitionAndBaseFile.getKey(); + final FileSlice fileSlice = partitionAndBaseFile.getValue(); + + final HoodieBaseFile baseFile = fileSlice.getBaseFile().get(); + final String filename = baseFile.getFileName(); + Path dataFilePath = filePath(basePath, partition, filename); + TableSchemaResolver schemaResolver = new TableSchemaResolver(metaClient); + Schema tableSchema = schemaResolver.getTableAvroSchema(); + + List logFilePaths = fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()) + .map(l -> l.getPath().toString()).collect(toList()); + + return createSecondaryIndexGenerator(metaClient, engineType, logFilePaths, tableSchema, partition, dataFilePath, fileSlice.getBaseFile().isPresent()); + }); + } + public static Schema getProjectedSchemaForFunctionalIndex(HoodieFunctionalIndexDefinition indexDefinition, HoodieTableMetaClient metaClient) throws Exception { TableSchemaResolver schemaResolver = new TableSchemaResolver(metaClient); Schema tableSchema = schemaResolver.getTableAvroSchema(); diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java b/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java index 32ce529fa3ba6..0be8751ff5274 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java @@ -29,7 +29,8 @@ public enum MetadataPartitionType { COLUMN_STATS(HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS, "col-stats-"), BLOOM_FILTERS(HoodieTableMetadataUtil.PARTITION_NAME_BLOOM_FILTERS, "bloom-filters-"), RECORD_INDEX(HoodieTableMetadataUtil.PARTITION_NAME_RECORD_INDEX, "record-index-"), - FUNCTIONAL_INDEX(HoodieTableMetadataUtil.PARTITION_NAME_FUNCTIONAL_INDEX_PREFIX, "func-index-"); + FUNCTIONAL_INDEX(HoodieTableMetadataUtil.PARTITION_NAME_FUNCTIONAL_INDEX_PREFIX, "func-index-"), + SECONDARY_INDEX(HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX_PREFIX, "secondary-index-"); // Partition path in metadata table. 
private final String partitionPath; diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala index 99080629e1709..1755906abd2c2 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala @@ -427,6 +427,13 @@ object DataSourceWriteOptions { */ val RECORDKEY_FIELD = KeyGeneratorOptions.RECORDKEY_FIELD_NAME + /** + * Secondary key field. Columns to be used as the secondary index columns. Actual value + * will be obtained by invoking .toString() on the field value. Nested fields can be specified using + * the dot notation eg: `a.b.c` + */ + val SECONDARYKEY_FIELD = KeyGeneratorOptions.SECONDARYKEY_FIELD_NAME + /** * Partition path field. Value to be used at the `partitionPath` component of `HoodieKey`. Actual * value obtained by invoking .toString() diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala index 3444feaecff61..3b92f64499a7e 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala @@ -95,7 +95,7 @@ case class HoodieFileIndex(spark: SparkSession, /** * NOTE: [[ColumnStatsIndexSupport]] is a transient state, since it's only relevant while logical plan - * is handled by the Spark's driver + * is handled by the Spark's driver */ @transient private lazy val columnStatsIndex = new ColumnStatsIndexSupport(spark, schema, metadataConfig, metaClient) @@ -171,7 +171,7 @@ case class HoodieFileIndex(spark: SparkSession, } }).filter(slice => slice != null) val c = fileSlices.filter(f => (includeLogFiles && f.getLogFiles.findAny().isPresent) - || (f.getBaseFile.isPresent && f.getBaseFile.get().getBootstrapBaseFile.isPresent)). + || (f.getBaseFile.isPresent && f.getBaseFile.get().getBootstrapBaseFile.isPresent)). foldLeft(Map[String, FileSlice]()) { (m, f) => m + (f.getFileId -> f) } if (c.nonEmpty) { sparkAdapter.getSparkPartitionedFileUtils.newPartitionDirectory( @@ -205,7 +205,7 @@ case class HoodieFileIndex(spark: SparkSession, } else if (shouldEmbedFileSlices) { assert(partitionSchema.isEmpty) prunedPartitionsAndFilteredFileSlices - }else { + } else { Seq(PartitionDirectory(InternalRow.empty, prunedPartitionsAndFilteredFileSlices.flatMap(_.files))) } } @@ -214,7 +214,7 @@ case class HoodieFileIndex(spark: SparkSession, * The functions prunes the partition paths based on the input partition filters. For every partition path, the file * slices are further filtered after querying metadata table based on the data filters. 
* - * @param dataFilters data columns filters + * @param dataFilters data columns filters * @param partitionFilters partition column filters * @return A sequence of pruned partitions and corresponding filtered file slices */ @@ -235,16 +235,16 @@ case class HoodieFileIndex(spark: SparkSession, // - List of predicates (filters) is present val shouldPushDownFilesFilter = !partitionFilters.isEmpty val candidateFilesNamesOpt: Option[Set[String]] = - lookupCandidateFilesInMetadataTable(dataFilters, prunedPartitionsAndFileSlices, shouldPushDownFilesFilter) match { - case Success(opt) => opt - case Failure(e) => - logError("Failed to lookup candidate files in File Index", e) - - spark.sqlContext.getConf(DataSkippingFailureMode.configName, DataSkippingFailureMode.Fallback.value) match { - case DataSkippingFailureMode.Fallback.value => Option.empty - case DataSkippingFailureMode.Strict.value => throw new HoodieException(e); - } - } + lookupCandidateFilesInMetadataTable(dataFilters, prunedPartitionsAndFileSlices, shouldPushDownFilesFilter) match { + case Success(opt) => opt + case Failure(e) => + logError("Failed to lookup candidate files in File Index", e) + + spark.sqlContext.getConf(DataSkippingFailureMode.configName, DataSkippingFailureMode.Fallback.value) match { + case DataSkippingFailureMode.Fallback.value => Option.empty + case DataSkippingFailureMode.Strict.value => throw new HoodieException(e); + } + } logDebug(s"Overlapping candidate files from Column Stats or Record Level Index: ${candidateFilesNamesOpt.getOrElse(Set.empty)}") @@ -286,7 +286,7 @@ case class HoodieFileIndex(spark: SparkSession, } } - def getFileSlicesForPrunedPartitions(partitionFilters: Seq[Expression]) : Seq[(Option[BaseHoodieTableFileIndex.PartitionPath], Seq[FileSlice])] = { + def getFileSlicesForPrunedPartitions(partitionFilters: Seq[Expression]): Seq[(Option[BaseHoodieTableFileIndex.PartitionPath], Seq[FileSlice])] = { // Prune the partition path by the partition filters // NOTE: Non-partitioned tables are assumed to consist from a single partition // encompassing the whole table @@ -344,11 +344,15 @@ case class HoodieFileIndex(spark: SparkSession, lazy val queryReferencedColumns = collectReferencedColumns(spark, queryFilters, schema) lazy val (_, recordKeys) = recordLevelIndex.filterQueriesWithRecordKey(queryFilters) + lazy val (_, secondaryKeys) = recordLevelIndex.filterQueriesWithSecondaryKey(queryFilters) if (!isMetadataTableEnabled || !isDataSkippingEnabled) { validateConfig() Option.empty } else if (recordKeys.nonEmpty) { Option.apply(recordLevelIndex.getCandidateFiles(getAllFiles(), recordKeys)) + } else if (secondaryKeys.nonEmpty) { + // If available, secondary keys are used to prune candidate files (before other indexes below) + Option.apply(recordLevelIndex.getCandidateFilesFromSecondaryIndex(getAllFiles(), secondaryKeys)) } else if (functionalIndex.isIndexAvailable && !queryFilters.isEmpty) { val prunedFileNames = getPrunedFileNames(prunedPartitionsAndFileSlices) val shouldReadInMemory = functionalIndex.shouldReadInMemory(this, queryReferencedColumns) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index 00ec59c5b8fd7..639afb758374d 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala 
@@ -314,6 +314,7 @@ class HoodieSparkSqlWriterInternal { .setPartitionFields(partitionColumns) .setPopulateMetaFields(populateMetaFields) .setRecordKeyFields(hoodieConfig.getString(RECORDKEY_FIELD)) + .setSecondaryKeyFields(hoodieConfig.getString(SECONDARYKEY_FIELD)) .setCDCEnabled(hoodieConfig.getBooleanOrDefault(HoodieTableConfig.CDC_ENABLED)) .setCDCSupplementalLoggingMode(hoodieConfig.getStringOrDefault(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE)) .setKeyGeneratorClassProp(keyGenProp) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala index 4519334d4d421..626c989280eb7 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala @@ -367,7 +367,7 @@ object LogFileIterator extends SparkAdapterSupport { val relativePartitionPath = getRelativePartitionPath(new Path(tablePath), partitionPath) val logRecordReader = - metadataTable.getLogRecordScanner(logFiles.asJava, relativePartitionPath, toJavaOption(Some(forceFullScan))) + metadataTable.getLogRecordScanner(logFiles.asJava, relativePartitionPath, toJavaOption(Some(forceFullScan)), toJavaOption(None)) .getLeft val recordList = closing(logRecordReader) { diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala index 764ce69795d99..9d76433f78a0f 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala @@ -64,6 +64,32 @@ class RecordLevelIndexSupport(spark: SparkSession, candidateFiles.toSet } + /** + * Returns the list of candidate files which store the records matching the provided secondary keys, based on the + * Metadata Table Secondary Index and the Metadata Table Record Index. + * @param secondaryKeys - List of secondary keys. + * @return Sequence of file names which need to be queried + */ + def getCandidateFilesFromSecondaryIndex(allFiles: Seq[FileStatus], secondaryKeys: List[String]): Set[String] = { + val recordKeyLocationsMap = metadataTable.readSecondaryIndex(JavaConverters.seqAsJavaListConverter(secondaryKeys).asJava) + val fileIdToPartitionMap: mutable.Map[String, String] = mutable.Map.empty + val candidateFiles: mutable.Set[String] = mutable.Set.empty + for (locations <- JavaConverters.collectionAsScalaIterableConverter(recordKeyLocationsMap.values()).asScala) { + for (location <- JavaConverters.collectionAsScalaIterableConverter(locations).asScala) { + fileIdToPartitionMap.put(location.getFileId, location.getPartitionPath) + } + } + + for (file <- allFiles) { + val fileId = FSUtils.getFileIdFromFilePath(file.getPath) + val partitionOpt = fileIdToPartitionMap.get(fileId) + if (partitionOpt.isDefined) { + candidateFiles += file.getPath.getName + } + } + candidateFiles.toSet + } + /** * Returns the configured record key for the table if it is a simple record key else returns empty option. */ @@ -79,16 +105,33 @@ class RecordLevelIndexSupport(spark: SparkSession, } /** - * Matches the configured simple record key with the input attribute name.
+ * Returns the configured secondary key for the table + * TODO: Handle multiple secondary indexes (similar to functional index) + */ + private def getSecondaryKeyConfig: Option[String] = { + val secondaryKeysOpt: org.apache.hudi.common.util.Option[Array[String]] = metaClient.getTableConfig.getSecondaryKeyFields + val secondaryKeyOpt = secondaryKeysOpt.map[String](JFunction.toJavaFunction[Array[String], String](arr => + if (arr.length == 1) { + arr(0) + } else { + null + })) + Option.apply(secondaryKeyOpt.orElse(null)) + } + + /** + * Matches the configured key (record key or secondary key) with the input attribute name. * @param attributeName The attribute name provided in the query - * @return true if input attribute name matches the configured simple record key + * @param isPrimaryKey Should match primary key (record key). If false, it matches secondary key. + * @return true if input attribute name matches the configured key + * TODO: Handle multiple secondary indexes (similar to functional index) */ - private def attributeMatchesRecordKey(attributeName: String): Boolean = { - val recordKeyOpt = getRecordKeyConfig - if (recordKeyOpt.isDefined && recordKeyOpt.get == attributeName) { + private def attributeMatchesKey(attributeName: String, isPrimaryKey: Boolean): Boolean = { + val keyOpt = if (isPrimaryKey) getRecordKeyConfig else getSecondaryKeyConfig + if (keyOpt.isDefined && keyOpt.get == attributeName) { true } else { - HoodieMetadataField.RECORD_KEY_METADATA_FIELD.getFieldName == recordKeyOpt.get + false } } @@ -132,7 +175,7 @@ class RecordLevelIndexSupport(spark: SparkSession, var recordKeyQueries: List[Expression] = List.empty var recordKeys: List[String] = List.empty for (query <- queryFilters) { - filterQueryWithRecordKey(query).foreach({ + filterQueryWithKey(query, true).foreach({ case (exp: Expression, recKeys: List[String]) => recordKeys = recordKeys ++ recKeys recordKeyQueries = recordKeyQueries :+ exp @@ -143,18 +186,37 @@ class RecordLevelIndexSupport(spark: SparkSession, } } + def filterQueriesWithSecondaryKey(queryFilters: Seq[Expression]): (List[Expression], List[String]) = { + if (!isSecondaryIndexAvailable) { + (List.empty, List.empty) + } else { + var secondaryKeyQueries: List[Expression] = List.empty + var secondaryKeys: List[String] = List.empty + for (query <- queryFilters) { + filterQueryWithKey(query, false).foreach({ + case (exp: Expression, recKeys: List[String]) => + secondaryKeys = secondaryKeys ++ recKeys + secondaryKeyQueries = secondaryKeyQueries :+ exp + }) + } + + Tuple2.apply(secondaryKeyQueries, secondaryKeys) + } + } + /** * If the input query is an EqualTo or IN query on simple record key columns, the function returns a tuple of * list of the query and list of record key literals present in the query otherwise returns an empty option. * * @param queryFilter The query that need to be filtered. + * @param isPrimaryKey Should use primary key (record key) to filter. If false, it uses secondary key. 
* @return Tuple of filtered query and list of record key literals that need to be matched */ - private def filterQueryWithRecordKey(queryFilter: Expression): Option[(Expression, List[String])] = { + private def filterQueryWithKey(queryFilter: Expression, isPrimaryKey: Boolean): Option[(Expression, List[String])] = { queryFilter match { case equalToQuery: EqualTo => val (attribute, literal) = getAttributeLiteralTuple(equalToQuery.left, equalToQuery.right).orNull - if (attribute != null && attribute.name != null && attributeMatchesRecordKey(attribute.name)) { + if (attribute != null && attribute.name != null && attributeMatchesKey(attribute.name, isPrimaryKey)) { Option.apply(equalToQuery, List.apply(literal.value.toString)) } else { Option.empty @@ -165,6 +227,7 @@ class RecordLevelIndexSupport(spark: SparkSession, case _: AttributeReference => case _ => validINQuery = false } + var literals: List[String] = List.empty inQuery.list.foreach { case literal: Literal => literals = literals :+ literal.value.toString @@ -185,4 +248,11 @@ class RecordLevelIndexSupport(spark: SparkSession, def isIndexAvailable: Boolean = { metadataConfig.enabled && metaClient.getTableConfig.getMetadataPartitions.contains(HoodieTableMetadataUtil.PARTITION_NAME_RECORD_INDEX) } + + /** + * Return true if metadata table is enabled and secondary index metadata partition is available. + */ + def isSecondaryIndexAvailable: Boolean = { + isIndexAvailable && metaClient.getTableConfig.getMetadataPartitions.contains(HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX_PREFIX) + } } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala index 95c07b987a0bd..47bc5a94a1105 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala @@ -35,6 +35,7 @@ import org.apache.spark.sql._ import org.apache.spark.sql.functions.{col, not} import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue} import org.junit.jupiter.api._ +import org.slf4j.LoggerFactory import java.util.concurrent.atomic.AtomicInteger import java.util.stream.Collectors @@ -57,7 +58,29 @@ class RecordLevelIndexTestBase extends HoodieSparkClientTestBase { PRECOMBINE_FIELD.key -> "timestamp", HoodieTableConfig.POPULATE_META_FIELDS.key -> "true" ) ++ metadataOpts + + val secondaryIndexOpts = Map( + HoodieMetadataConfig.SECONDARY_INDEX_ENABLE_PROP.key -> "true" + ) + + val commonOptsWithSecondaryIndex = commonOpts ++ secondaryIndexOpts ++ metadataOpts + + val commonOptsNewTableSITest = Map( + "hoodie.insert.shuffle.parallelism" -> "4", + "hoodie.upsert.shuffle.parallelism" -> "4", + HoodieWriteConfig.TBL_NAME.key -> "trips_table", + RECORDKEY_FIELD.key -> "uuid", + SECONDARYKEY_FIELD.key -> "city", + PARTITIONPATH_FIELD.key -> "state", + PRECOMBINE_FIELD.key -> "ts", + HoodieTableConfig.POPULATE_META_FIELDS.key -> "true" + ) ++ metadataOpts + + val commonOptsWithSecondaryIndexSITest = commonOptsNewTableSITest ++ secondaryIndexOpts + + var mergedDfList: List[DataFrame] = List.empty + private val log = LoggerFactory.getLogger(classOf[RecordLevelIndexTestBase]) @BeforeEach override def setUp() { @@ -172,7 +195,9 @@ class RecordLevelIndexTestBase extends HoodieSparkClientTestBase { latestBatch = 
recordsToStrings(dataGen.generateInserts(getInstantTime(), 5)).asScala } val latestBatchDf = spark.read.json(spark.sparkContext.parallelize(latestBatch, 2)) + latestBatchDf.cache() + log.debug("latestBatchDf schema is " + latestBatchDf.schema.simpleString) latestBatchDf.write.format("org.apache.hudi") .options(hudiOpts) .option(DataSourceWriteOptions.OPERATION.key, operation) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndex.scala new file mode 100644 index 0000000000000..f0a6a0248d546 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndex.scala @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.functional + +import org.apache.hudi.DataSourceWriteOptions +import org.apache.hudi.DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL +import org.apache.hudi.common.model.HoodieTableType +import org.apache.hudi.DataSourceReadOptions +import org.apache.hudi.config.HoodieWriteConfig +import org.apache.hudi.metadata.HoodieTableMetadata +import org.apache.spark.sql.SaveMode +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Test +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.EnumSource +import org.scalatest.Assertions.assertResult +import org.slf4j.LoggerFactory + +class TestSecondaryIndex extends RecordLevelIndexTestBase { + private val log = LoggerFactory.getLogger(getClass) + @ParameterizedTest + @EnumSource(classOf[HoodieTableType]) + def testSIInitialization(tableType: HoodieTableType): Unit = { + // create a new table + val tableName = "trips_table1" + val basePath = "file:///tmp/trips_table1" + val columns = Seq("ts", "uuid", "rider", "driver", "fare", "city", "state") + val data = + Seq((1695159649087L, "334e26e9-8355-45cc-97c6-c31daf0df330", "rider-A", "driver-K", 19.10, "san_francisco", "california"), + (1695091554787L, "e96c4396-3fad-413a-a942-4cb36106d720", "rider-B", "driver-M", 27.70, "sao_paulo", "texas"), + (1695091554788L, "e96c4396-3fad-413a-a942-4cb36106d721", "rider-C", "driver-K", 27.70, "san_francisco", "california"), + (1695046462179L, "9909a8b1-2d15-4d3d-8ec9-efc48c536a00", "rider-D", "driver-L", 33.90, "san_francisco", "california"), + (1695516137016L, "e3cf430c-889d-4015-bc98-59bdce1e530c", "rider-E", "driver-P", 34.15, "sao_paulo", "texas"), + (1695046462179L, "9909a8b1-2d15-4d3d-8ec9-efc48c536a01", "rider-D", "driver-L", 33.90, "los-angeles", "california"), + (1695516137016L, "e3cf430c-889d-4015-bc98-59bdce1e530b", "rider-E", "driver-P", 34.15, "bengaluru", "karnataka"),
(1695115999911L, "c8abbe79-8d89-47ea-b4ce-4d224bae5bfa", "rider-F", "driver-T", 17.85, "chennai", "tamil-nadu")); + + val inserts = spark.createDataFrame(data).toDF(columns: _*) + inserts.write.format("hudi"). + option(DataSourceWriteOptions.TABLE_TYPE.key, tableType.name()). + option("hoodie.datasource.write.operation", "insert"). + options(commonOptsWithSecondaryIndexSITest). + mode(SaveMode.Overwrite). + save(basePath) + + val tripsDF = spark.read.format("hudi"). + load(basePath). + select("ts", "uuid", "rider", "driver", "fare", "city", "state"). + orderBy("uuid") + tripsDF.show(false) + + assertEquals(tripsDF.count(), data.length) + val rowCount = tripsDF.count().toInt; + assertResult(inserts.orderBy("uuid").take(rowCount))(tripsDF.take(rowCount)) + + // Get the metadata reader and read secondary index + val schemaStr = tripsDF.schema.simpleString + val writeConfig: HoodieWriteConfig = HoodieWriteConfig.newBuilder.withPath(basePath).withSchema(schemaStr).build() + val metadataReader = HoodieTableMetadata.create(context, writeConfig.getMetadataConfig, basePath) + val recordLocationFromSecondaryIndex = metadataReader.readSecondaryIndex(scala.collection.JavaConverters.seqAsJavaList(Seq("san_francisco"))) + assertEquals(recordLocationFromSecondaryIndex.size(), tripsDF.select("city").filter("city='san_francisco'").count()) + } + + @ParameterizedTest + @EnumSource(classOf[HoodieTableType]) + def testSIUpsert(tableType: HoodieTableType): Unit = { + val tableName = "trips_table2" + val basePath1 = "file:///tmp/trips_table2" + val columns = Seq("ts", "uuid", "rider", "driver", "fare", "city", "state") + val data1 = + Seq((1695159649087L, "334e26e9-8355-45cc-97c6-c31daf0df330", "rider-A", "driver-K", 19.10, "san_francisco", "california"), + (1695091554787L, "e96c4396-3fad-413a-a942-4cb36106d720", "rider-B", "driver-M", 27.70, "sao_paulo", "texas"), + (1695091554788L, "e96c4396-3fad-413a-a942-4cb36106d721", "rider-C", "driver-K", 27.70, "delhi", "delhi"), + (1695115999911L, "c8abbe79-8d89-47ea-b4ce-4d224bae5bfa", "rider-F", "driver-T", 17.85, "chennai", "tamil-nadu")); + + val inserts = spark.createDataFrame(data1).toDF(columns: _*) + inserts.write.format("hudi"). + option(DataSourceWriteOptions.TABLE_TYPE.key, tableType.name()). + option("hoodie.datasource.write.operation", "insert"). + options(commonOptsNewTableSITest). + mode(SaveMode.Overwrite). + save(basePath1) + + val data2 = + Seq((1695046462179L, "9909a8b1-2d15-4d3d-8ec9-efc48c536a00", "rider-D", "driver-L", 33.90, "london", "greater-london"), + (1695516137016L, "e96c4396-3fad-413a-a942-4cb36106d722", "rider-E", "driver-P", 34.15, "austin", "texas")) + + val inserts2 = spark.createDataFrame(data2).toDF(columns: _*) + inserts2.write.format("hudi"). + option(DataSourceWriteOptions.TABLE_TYPE.key, tableType.name()). + option("hoodie.datasource.write.operation", "upsert"). + options(commonOptsWithSecondaryIndexSITest). + mode(SaveMode.Append). 
+ save(basePath1) + + val tripsDF = spark.read.format("hudi").load(basePath1) + tripsDF.show(false) + assertEquals(tripsDF.count(), data1.length + data2.length) + + // Get the metadata reader and read secondary index + val schemaStr = tripsDF.schema.simpleString + val writeConfig: HoodieWriteConfig = HoodieWriteConfig.newBuilder.withPath(basePath1).withSchema(schemaStr).build() + val metadataReader = HoodieTableMetadata.create(context, writeConfig.getMetadataConfig, basePath1) + val recordLocationFromSecondaryIndex = metadataReader.readSecondaryIndex(scala.collection.JavaConverters.seqAsJavaList(Seq("san_francisco"))) + assertEquals(recordLocationFromSecondaryIndex.size(), tripsDF.select("city").filter("city='san_francisco'").count()) + } + + @Test + def testSIWithDelete(): Unit = { + val tableName = "trips_table3" + val basePath1 = "file:///tmp/trips_table3" + val columns = Seq("ts", "uuid", "rider", "driver", "fare", "city", "state") + val data1 = + Seq((1695159649087L, "334e26e9-8355-45cc-97c6-c31daf0df330", "rider-A", "driver-K", 19.10, "san_francisco", "california"), + (1695091554787L, "334e26e9-8355-45cc-97c6-c31daf0df331", "rider-B", "driver-M", 27.70, "san_francisco", "california"), + (1695091554787L, "334e26e9-8355-45cc-97c6-c31daf0df332", "rider-B", "driver-M", 27.70, "san_francisco", "california"), + (1695091554788L, "e96c4396-3fad-413a-a942-4cb36106d721", "rider-C", "driver-K", 27.70, "delhi", "delhi"), + (1695115999911L, "c8abbe79-8d89-47ea-b4ce-4d224bae5bfa", "rider-F", "driver-T", 17.85, "chennai", "tamil-nadu")); + + val inserts = spark.createDataFrame(data1).toDF(columns: _*) + inserts.write.format("hudi"). + option(DataSourceWriteOptions.TABLE_TYPE.key, HoodieTableType.COPY_ON_WRITE.name()). + option("hoodie.datasource.write.operation", "insert"). + options(commonOptsWithSecondaryIndexSITest). + mode(SaveMode.Overwrite). + save(basePath1) + + val deleteDF = inserts.filter("uuid='334e26e9-8355-45cc-97c6-c31daf0df330'") + deleteDF.write.format("org.apache.hudi") + .option(DataSourceWriteOptions.TABLE_TYPE.key, HoodieTableType.COPY_ON_WRITE.name()) + .option(DataSourceWriteOptions.OPERATION.key, DELETE_OPERATION_OPT_VAL) + .options(commonOptsWithSecondaryIndexSITest) + .mode(SaveMode.Append) + .save(basePath1) + + // Read entry using secondary key + val tripsDF = spark.read.format("hudi"). + option(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key, "true"). + options(commonOptsWithSecondaryIndexSITest). + load(basePath1). + select("ts", "uuid", "rider", "driver", "fare", "city", "state"). + where("city='san_francisco'"). + orderBy("uuid") + val expectedDF = inserts. + filter("city='san_francisco'"). + filter("uuid != '334e26e9-8355-45cc-97c6-c31daf0df330'"). 
+ orderBy("uuid") + assertEquals(tripsDF.count(), expectedDF.count()) + + val rowCount = tripsDF.count().toInt + assertResult(expectedDF.take(rowCount))(tripsDF.take(rowCount)) + + val schemaStr = tripsDF.schema.simpleString + val writeConfig: HoodieWriteConfig = HoodieWriteConfig.newBuilder.withPath(basePath1).withSchema(schemaStr).build() + val metadataReader = HoodieTableMetadata.create(context, writeConfig.getMetadataConfig, basePath1) + val recordLocationFromSecondaryIndex = metadataReader.readSecondaryIndex(scala.collection.JavaConverters.seqAsJavaList(Seq("san_francisco"))) + assertEquals(recordLocationFromSecondaryIndex.size(), tripsDF.filter("city='san_francisco'").count()) + } + + @ParameterizedTest + @EnumSource(classOf[HoodieTableType]) + def testSIRead(tableType: HoodieTableType): Unit = { + //create a new table + val tableName = "trips_table3" + val basePath1 = "file:///tmp/trips_table3" + val columns = Seq("ts", "uuid", "rider", "driver", "fare", "city", "state") + val data = + Seq((1695159649087L, "334e26e9-8355-45cc-97c6-c31daf0df330", "rider-A", "driver-K", 19.10, "san_francisco", "california"), + (1695091554787L, "e96c4396-3fad-413a-a942-4cb36106d720", "rider-B", "driver-M", 27.70, "sao_paulo", "texas"), + (1695091554788L, "e96c4396-3fad-413a-a942-4cb36106d721", "rider-C", "driver-K", 27.70, "ithaca", "new-york"), + (1695046462179L, "9909a8b1-2d15-4d3d-8ec9-efc48c536a00", "rider-D", "driver-L", 33.90, "seattle", "washington"), + (1695516137016L, "e3cf430c-889d-4015-bc98-59bdce1e530c", "rider-E", "driver-P", 34.15, "chennai", "karnataka"), + (1695115999911L, "c8abbe79-8d89-47ea-b4ce-4d224bae5bfa", "rider-F", "driver-T", 17.85, "chennai", "tamil-nadu")); + + val inserts = spark.createDataFrame(data).toDF(columns: _*) + inserts.write.format("hudi"). + option(DataSourceWriteOptions.TABLE_TYPE.key, tableType.name()). + option("hoodie.datasource.write.operation", "insert"). + options(commonOptsWithSecondaryIndexSITest). + mode(SaveMode.Overwrite). + save(basePath1) + + // Predicate on secondary key + val tripsDF = spark.read.format("hudi"). + option(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key, "true"). + options(commonOptsWithSecondaryIndexSITest). + load(basePath1). + select("ts", "uuid", "rider", "driver", "fare", "city", "state"). + where("city='chennai'"). + orderBy("uuid") + // TODO: Assert the usage of secondary index through query plan + log.info("VINAY: Executed Plan - \n" + tripsDF.queryExecution.executedPlan.toString()) + + val expectedDF = inserts.filter("city='chennai'").orderBy("uuid") + assertEquals(tripsDF.count(), expectedDF.count()) + + val rowCount = tripsDF.count().toInt + assertResult(expectedDF.take(rowCount))(tripsDF.take(rowCount)) + + val schemaStr = tripsDF.schema.simpleString + val writeConfig: HoodieWriteConfig = HoodieWriteConfig.newBuilder.withPath(basePath1).withSchema(schemaStr).build() + val metadataReader = HoodieTableMetadata.create(context, writeConfig.getMetadataConfig, basePath1) + val recordLocationFromSecondaryIndex = metadataReader.readSecondaryIndex(scala.collection.JavaConverters.seqAsJavaList(Seq("chennai"))) + assertEquals(recordLocationFromSecondaryIndex.size(), tripsDF.filter("city='chennai'").count()) + } +}