[HUDI-7384] Secondary index support #10625
Conversation
Force-pushed from 264059f to 50f2165
Force-pushed from 804d739 to 1a69c6a
Force-pushed from aceed21 to 2656b3e
Rebase and resolve conflicts. Fix a bug related to MOR tables with secondary index.
Force-pushed from 2656b3e to a65a927
Moved away from using …
"doc": "Refers to the record key that this secondary key maps to" | ||
}, | ||
{ | ||
"name": "isDeleted", |
This field acts as a tombstone marker.
@@ -240,6 +255,11 @@ public static HoodieMergedLogRecordScanner.Builder newBuilder() {

@Override
public <T> void processNextRecord(HoodieRecord<T> newRecord) throws IOException {
  if (logContainsNonUniqueKeys) {
If the log files are for partitions that can have non-unique keys, then this logic makes use of the new map to buffer the scanned records.
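A rough, self-contained model of that buffering (illustrative names only; per a later diff in this PR, the real scanner buffers HoodieRecords in an ExternalSpillableMap):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model of the non-unique-key path: instead of replacing the previous
// entry for a key (the unique-key behavior), each scanned record is appended to a
// per-key list so that every record sharing a key survives the scan for later merging.
class NonUniqueKeyBufferSketch {
  private final Map<String, List<String>> nonUniqueKeyRecords = new HashMap<>();

  void processNextNonUniqueKeyRecord(String key, String payload) {
    nonUniqueKeyRecords.computeIfAbsent(key, k -> new ArrayList<>()).add(payload);
  }

  List<String> bufferedRecordsFor(String key) {
    return nonUniqueKeyRecords.getOrDefault(key, new ArrayList<>());
  }
}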
String key = newRecord.getRecordKey();
HoodieMetadataPayload newPayload = (HoodieMetadataPayload) newRecord.getData();

// The rules for merging the prevRecord and the latestRecord is noted below. Note that this only applies for SecondaryIndex
The crux of the merging logic is here. The main issue with using the existing preCombine(...) method is that it is 'either-or', i.e. it chooses only one record. Changing the API was a little tedious, hence this approach of moving the merge logic directly into the scanner. @vinothchandar @codope
hmmm. we should be using the combineAndGet... method of the MetadataPayload?
combineAndGet() (which internally calls preCombine()) is an either-or operation, i.e. the caller should ensure that the prevRecord and newRecord are similar. For secondary-index, this similarity depends on the payload of the record (HoodieMetadataPayload). Maybe I can add a new API in HoodieRecordPayload which is implemented by HoodieMetadataPayload, and avoid exposing HoodieMetadataPayload to this layer?
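To make the 'either-or' limitation concrete, here is a minimal sketch under assumed secondary-index semantics (the Entry shape and the rules are illustrative, based on the schema fragment above, not lifted from the PR). The merge result is a set of mappings, not a single winner, which is why a pick-one preCombine() does not fit:

import java.util.ArrayList;
import java.util.List;

class SecondaryIndexMergeSketch {
  // Illustrative entry shape: one (secondaryKey -> recordKey) mapping plus tombstone flag.
  record Entry(String secondaryKey, String recordKey, boolean isDeleted) {}

  // A tombstone cancels the matching live mapping; a live entry for a different
  // record key is retained alongside the existing ones rather than replacing them.
  static List<Entry> merge(List<Entry> previous, Entry latest) {
    List<Entry> merged = new ArrayList<>(previous);
    if (latest.isDeleted()) {
      merged.removeIf(e -> e.recordKey().equals(latest.recordKey()));
    } else {
      merged.add(latest);
    }
    return merged;
  }
}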
// TODO: Merger need not be called here as the merging logic is handled explicitly in this function.
// Retain until Secondary Index feature is tested and stabilized
HoodieRecord<T> combinedRecord = (HoodieRecord<T>) recordMerger.merge(prevRecord, readerSchema,
Will remove this later. The merging logic does not really rely on the merger (because of the comment made earlier).
@@ -33,6 +33,7 @@
import org.apache.hadoop.fs.FileSystem;
Please ignore the changes in this file. Will revert in the next upload.
@@ -54,6 +54,13 @@ public class KeyGeneratorOptions extends HoodieConfig {
    + "Actual value will be obtained by invoking .toString() on the field value. Nested fields can be specified using\n"
    + "the dot notation eg: `a.b.c`");

public static final ConfigProperty<String> SECONDARYKEY_FIELD_NAME = ConfigProperty
One of the current limitations: only one secondary index (per table) is supported. Will remove the limitation once the functionality is working end-to-end. Thinking of using the same approach as functional index (a different partition for each secondary index, based on a config file/JSON).
+1
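For illustration, the new option presumably follows the same ConfigProperty builder pattern as the surrounding KeyGeneratorOptions entries; the key string and documentation text below are assumptions, not the PR's actual values:

// Assumed shape only: the real key name, default handling and docs may differ.
public static final ConfigProperty<String> SECONDARYKEY_FIELD_NAME = ConfigProperty
    .key("hoodie.datasource.write.secondarykey.column")
    .noDefaultValue()
    .withDocumentation("Column to be used as the secondary key. The current version "
        + "supports a single secondary index per table.");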
@@ -402,6 +422,31 @@ public HoodieMetadataPayload preCombine(HoodieMetadataPayload previousRecord) {
    // 2. A key moved to a different file due to clustering

    // No need to merge with previous record index, always pick the latest payload.
    return this;
  case METADATA_TYPE_SECONDARY_INDEX:
    // TODO: This block and checks are just for validation and to detect all callers.
This is here mainly for asserting and will be removed later. Ideally all merging logic of secondary (or non-unique-key) index records needs to be handled at upper layers directly (when the scanner is running the scan).
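Given that intent, the branch presumably reduces to a defensive check along these lines (a hedged sketch; the field access and message are assumptions, not the PR's code):

case METADATA_TYPE_SECONDARY_INDEX:
  // Assumed sketch: once merging lives in the scanner, preCombine should only ever
  // see a matching payload type here; anything else flags a caller still relying on
  // payload-level merging that needs to be migrated.
  ValidationUtils.checkArgument(previousRecord.type == METADATA_TYPE_SECONDARY_INDEX,
      "preCombine invoked with mismatched metadata payload types");
  return this;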
.withFileSystem(metaClient.getFs())
.withBasePath(basePath)
.withLogFilePaths(logFilePaths)
.withReaderSchema(tableSchema)
Will change to filter only the required columns. Could not find an easy way/API to get that yet.
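One possible direction (hedged; this assumes HoodieAvroUtils.generateProjectionSchema is applicable here and that only the record key and secondary key columns are needed):

// Assumed sketch: project the reader schema down to the columns the index needs
// before handing it to the scanner, instead of reading the full table schema.
Schema projectedSchema = HoodieAvroUtils.generateProjectionSchema(
    tableSchema, Arrays.asList(recordKeyField, secondaryKeyField));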
@@ -0,0 +1,222 @@
/*
Please ignore the repetitive/duplicated code blocks. Will change once the functionality is ready and subsequently add more tests.
// the secondary index partition for each of these keys. For a commit which is deleting/updating a lot of records, this
// operation is going to be expensive (in CPU, memory and IO)
List<String> keysToRemove = new ArrayList<>();
writeStatus.collectAsList().forEach(status -> {
This needs to be fixed. If the design is changed so that WriteStatus includes the (record-key, old-secondary-key, new-secondary-key), then this needs to change anyway.
this collect can OOM right?
Yes, the logic here is probably going to change if one uses the WriteStatus to hold the (old-secondary-key, new-secondary-key) pair. Hence did not think of optimising here yet.
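A hedged sketch of that direction: if WriteStatus carried the key material, the extraction could stay distributed instead of collecting on the driver (getDeletedRecordKeys() is a hypothetical accessor, not an existing WriteStatus API):

import org.apache.spark.api.java.JavaRDD;

// Each executor emits only the keys from the statuses it owns, so no single node
// ever materializes the full list the way collectAsList() does.
JavaRDD<String> keysToRemove =
    writeStatuses.flatMap(status -> status.getDeletedRecordKeys().iterator());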
// Reuse record index parallelism config to build secondary index
int parallelism = Math.min(partitionFilePairs.size(), dataWriteConfig.getMetadataConfig().getRecordIndexMaxParallelism());

return deletedRecords.union(readSecondaryKeysFromFiles(
This is convoluted. deletedRecords are the tombstone records. For correctness, these tombstone records should be emitted before the regular records. Could not find anywhere in the docs whether this ordering is preserved, but noticed in the tests that it is.
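If that ordering guarantee turns out not to hold, one way to make it explicit is sketched below, assuming both inputs are JavaRDDs of index records (liveRecords stands in for the readSecondaryKeysFromFiles(...) output; sortByKey() adds a shuffle, so this is a correctness-first illustration, not a tuned one):

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import scala.Tuple2;

// Tag tombstones with a lower sort key than live records so that, after sortByKey(),
// every tombstone is emitted before the regular records regardless of union() internals.
JavaPairRDD<Integer, HoodieRecord> tagged =
    deletedRecords.mapToPair(r -> new Tuple2<>(0, r))
        .union(liveRecords.mapToPair(r -> new Tuple2<>(1, r)));
JavaRDD<HoodieRecord> ordered = tagged.sortByKey().values();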
Force-pushed from a65a927 to 890599a
Initial comments; only halfway through the review.
@@ -2493,6 +2493,10 @@ public boolean isLogCompactionEnabledOnMetadata() {
  return getBoolean(HoodieMetadataConfig.ENABLE_LOG_COMPACTION_ON_METADATA_TABLE);
}

public boolean isSecondaryIndexEnabled() {
nts: revisit whether this is needed, or whether we should consolidate with the record-level index flag
// Enable secondary index only iff record index is enabled
if (dataWriteConfig.isSecondaryIndexEnabled() || dataMetaClient.getTableConfig().isMetadataPartitionAvailable(SECONDARY_INDEX)) {
  this.enabledPartitionTypes.add(SECONDARY_INDEX);
nts: rename SECONDARY_INDEX? technically the record index itself is a unique secondary index. This is just a non-unique secondary index.
dataMetaClient,
EngineType.SPARK);

// Initialize the file groups - using the same estimation logic as that of record index
scope for code reuse?
fsView.loadAllPartitions();

List<Pair<String, FileSlice>> partitionFileSlicePairs = new ArrayList<>();
partitions.forEach(partition -> {
wouldn't this take a long time, if done on the driver?
This is actually borrowed from how the existing indexes/partitions are built in the metadata table (like the RLI partition in initializeRecordIndexPartition()).
EngineType.SPARK);

// Initialize the file groups - using the same estimation logic as that of record index
final int fileGroupCount = HoodieTableMetadataUtil.estimateFileGroupCount(RECORD_INDEX, records.count(),
this is going to compute records twice? no?
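If so, a standard remedy is to persist before counting, sketched here under the assumption that records is a Spark-backed collection exposing RDD-style caching:

import org.apache.spark.storage.StorageLevel;

// Cache once so that records.count() (feeding the file group estimate) and the
// subsequent write of the same records compute the lineage a single time, not twice.
records.persist(StorageLevel.MEMORY_AND_DISK());
long totalRecords = records.count();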
this.nonUniqueKeyRecords = new ExternalSpillableMap<>(maxMemorySizeInBytes, spillableMapBasePath, new DefaultSizeEstimator(),
    new HoodieRecordSizeEstimator(readerSchema), diskMapType, isBitCaskDiskMapCompressionEnabled);

if (logFilePaths.size() > 0 && HoodieTableMetadata.isMetadataTableSecondaryIndexPartition(basePath, partitionName)) {
this layer cannot be aware of the metadata table.
Can this all be hidden inside a method (still in this layer)? There needs to be some way to determine if the logs can have non-unique keys. The initial implementation had it one layer above (i.e. the callers instantiating HoodieMergedLogRecordScanner passing in the flag), but having it here looked cleaner.
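For example, a builder flag along these lines would keep the decision with the (metadata-aware) callers while the scanner itself stays metadata-agnostic; withLogContainsNonUniqueKeys is hypothetical, not part of the current builder:

// Hypothetical API sketch: the caller sets the flag; the scanner only learns
// 'keys may repeat' and never inspects the partition path itself.
HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
    .withBasePath(basePath)
    .withLogFilePaths(logFilePaths)
    .withReaderSchema(readerSchema)
    .withLogContainsNonUniqueKeys(true) // hypothetical builder method
    .build();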
@@ -269,6 +350,68 @@ public <T> void processNextRecord(HoodieRecord<T> newRecord) throws IOException
  }
}

private <T> void processNextNonUniqueKeyRecord(HoodieRecord<T> newRecord) throws IOException {
I think we should probably subclass, or find some other way to avoid having this code right inside this class?
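A generic model of that suggestion (names simplified, constructor plumbing elided): pick the scanner variant up front so processNextRecord() itself never branches:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

abstract class ScannerSketch {
  abstract void processNextRecord(String key, String payload);

  // Factory keeps the unique/non-unique decision out of the record-processing path.
  static ScannerSketch create(boolean logContainsNonUniqueKeys) {
    return logContainsNonUniqueKeys ? new NonUniqueKeyScannerSketch() : new UniqueKeyScannerSketch();
  }
}

class UniqueKeyScannerSketch extends ScannerSketch {
  private final Map<String, String> records = new HashMap<>();

  @Override
  void processNextRecord(String key, String payload) {
    records.put(key, payload); // unique keys: latest record for a key wins
  }
}

class NonUniqueKeyScannerSketch extends ScannerSketch {
  private final Map<String, List<String>> records = new HashMap<>();

  @Override
  void processNextRecord(String key, String payload) {
    records.computeIfAbsent(key, k -> new ArrayList<>()).add(payload); // buffer all
  }
}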
hudi-common/src/main/java/org/apache/hudi/common/table/log/LogFileIterator.java (review comment outdated and resolved)
Initial commit. Supports the following features:
1. Modify the schema to add secondary index support to metadata
2. New partition type in the metadata table to store the secondary_keys-to-record_keys mapping
3. Various options to support secondary index enablement, column mappings (for secondary keys), etc.
4. Initialization of secondary keys
5. Update secondary keys on inserts/upserts/deletes
6. Add hooks in HoodieFileIndex to prune candidate files (to scan) based on secondary key column filters
7. Add ability in HoodieMergedLogRecordScanner to buffer non-unique-key (i.e. secondary key) records and merge 'similar' records
8. Add support for merging secondary index records (from delta log files and base files)
9. Ability to merge secondary index records across a group of log files and across log-file/base-file

Limitations:
1. Supports only one secondary index at the moment.
2. Scanning of the secondary index partition is done sequentially (both on the query side and the index-maintenance side)

Pending items:
1. Integrate with compaction
2. Handle rollback
3. Clean up existing tests and add more
Force-pushed from 890599a to 32d4469
Hi @bhat-vinay! Is this design of secondary index through MDT the only one to be implemented, or are there plans for other index types? As I remember, there was an RFC for a Lucene index, and maybe some other types in the future?
Please get in touch with @codope for the latest update on this. AFAIK, a Lucene-based secondary index is not planned at this time; the MDT-based secondary index is the one being developed.
Hi @skyshineb, we do plan to add more index types. If you are interested in contributing a Lucene-based secondary index, I can help you get started with the multi-modal indexing framework.
Hi @codope! I planned to test this MDT implementation against the Lucene one (which I took from the previous SI attempt and finished myself) and figure out whether using Lucene is profitable. But why was this PR closed?
Change Logs
Initial commit. Supports the features listed in the commit message above.

Limitations: one secondary index per table; sequential scan of the secondary index partition.

Pending items: compaction integration, rollback handling, test cleanup and additional tests.
Impact
Support secondary index on columns (similar to record index, but for non-unique columns)
Risk level (write none, low medium or high below)
Medium. New and existing tests
Documentation Update
NA. Will be done later
Contributor's checklist