[HUDI-7384] Secondary index support #10625

Closed
wants to merge 1 commit

Conversation

bhat-vinay (Contributor) commented Feb 5, 2024

Change Logs

Initial commit. Supports the following features:

  1. Modify the schema to add a secondary index to the metadata
  2. New partition type in the metadata table to store the secondary_keys-to-record_keys mapping
  3. Various options to support secondary index enablement, column mappings (for secondary keys), etc. (a usage sketch follows this section)
  4. Initialization of secondary keys
  5. Update secondary keys on inserts/upserts/deletes
  6. Add hooks in HoodieFileIndex to prune candidate files (to scan) based on secondary-key column filters
  7. Add the ability in HoodieMergedLogRecordScanner to buffer non-unique-key (i.e. secondary-key) records and merge 'similar' records
  8. Add support for merging secondary index records (from delta log files and base files)
  9. Ability to merge secondary index records across a group of log files and across log file/base file

Limitations:

  1. Supports only one secondary index at the moment.
  2. Scanning of the secondary index partition is done sequentially (on both the query side and the index-maintenance side).

Pending items:

  1. Integrate with compaction
  2. Handle rollback
  3. Clean up existing tests and add more
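
For illustration, a minimal sketch of what enabling the index might look like from a Spark writer, per item 3 above. The two secondary-index option keys are assumptions made for this sketch (the PR adds KeyGeneratorOptions.SECONDARYKEY_FIELD_NAME, but its exact key string is not shown here); the table-name, record-key, and record-index options are existing Hudi configs.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;

public class SecondaryIndexWriteSketch {
  // Writes a dataframe with a (hypothetical) secondary index on "rider".
  public static void write(Dataset<Row> df, String basePath) {
    df.write().format("hudi")
        .option("hoodie.table.name", "trips")
        .option("hoodie.datasource.write.recordkey.field", "uuid")
        // The PR enables the secondary index only when the record index is enabled.
        .option("hoodie.metadata.record.index.enable", "true")
        // Hypothetical option keys for the new index:
        .option("hoodie.metadata.index.secondary.enable", "true")
        .option("hoodie.datasource.write.secondarykey.field", "rider")
        .mode(SaveMode.Append)
        .save(basePath);
  }
}
```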

Impact

Support secondary index on columns (similar to record index, but for non-unique columns)

Risk level (write none, low medium or high below)

Medium. New and existing tests

Documentation Update

NA. Will be done later

Contributor's checklist

  • Read through contributor's guide
  • Change Logs and Impact were stated clearly
  • Adequate tests were added if applicable
  • CI passed

@bhat-vinay force-pushed the secondary-index-write branch from 264059f to 50f2165 on February 12, 2024 17:29
@vinothchandar self-assigned this Feb 13, 2024
@bhat-vinay force-pushed the secondary-index-write branch 2 times, most recently from 804d739 to 1a69c6a on February 20, 2024 15:57
@bhat-vinay changed the title from "[HUDI-7384][secondary-index][write] build secondary index on the keys…" to "[HUDI-7384] [HUDI-7405] [secondary-index] Secondary index support" on Feb 20, 2024
@bhat-vinay mentioned this pull request on Feb 20, 2024
@bhat-vinay force-pushed the secondary-index-write branch 2 times, most recently from aceed21 to 2656b3e on February 22, 2024 06:48
@bhat-vinay (Contributor Author):

Rebase and resolve conflicts. Fix a bug related to MOR tables with secondary index.

@bhat-vinay force-pushed the secondary-index-write branch from 2656b3e to a65a927 on February 22, 2024 19:34
@bhat-vinay changed the title from "[HUDI-7384] [HUDI-7405] [secondary-index] Secondary index support" to "[HUDI-7384] Secondary index support" on Feb 22, 2024
@bhat-vinay (Contributor Author):

Moved away from using HoodieUnMergedLogRecordScanner. Added a new buffer in HoodieMergedLogRecordScanner (based on SpillableDiskMap) to handle non-unique keys (secondary keys).

"doc": "Refers to the record key that this secondary key maps to"
},
{
"name": "isDeleted",
Contributor Author:

This field acts as a tombstone marker.

@@ -240,6 +255,11 @@ public static HoodieMergedLogRecordScanner.Builder newBuilder() {

@Override
public <T> void processNextRecord(HoodieRecord<T> newRecord) throws IOException {
if (logContainsNonUniqueKeys) {
Contributor Author:

If the log files are for partitions that can have non-unique keys, then this logic makes use of the new map to buffer the scanned records.

String key = newRecord.getRecordKey();
HoodieMetadataPayload newPayload = (HoodieMetadataPayload) newRecord.getData();

// The rules for merging the prevRecord and the latestRecord is noted below. Note that this only applies for SecondaryIndex
Contributor Author:

The crux of the merging logic is here. The main issue with using the existing preCombine(...) method is that it returns 'either-or', i.e. it chooses only one record. Changing the API was a little tedious, hence this approach of moving the merge logic directly into the scanner. @vinothchandar @codope. (A sketch of the merge semantics follows this exchange.)

Member:

hmmm. we should be using the combineAndGet... method of the MetadataPayload?

Contributor Author:

combineAndGet() (which internally calls preCombine()) is an either-or operation, i.e. the caller should ensure that the prevRecord and newRecord are similar. For the secondary index, this similarity depends on the payload of the record (HoodieMetadataPayload). Maybe I can add a new API in HoodieRecordPayload, implemented by HoodieMetadataPayload, and avoid exposing HoodieMetadataPayload to this layer?
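
To make the either-or limitation concrete, here is a minimal, self-contained sketch (assumed shapes, not the PR's actual classes) of merging two secondary-index payloads for the same secondary key, where each payload maps record keys to an isDeleted tombstone flag. An either-or preCombine() would keep one whole payload; the index instead needs the union of mappings, with the newer tombstone flag winning per record key, and tombstones retained until the final merge with the base file.

```java
import java.util.HashMap;
import java.util.Map;

final class SecondaryIndexMergeSketch {

  // Merges two payloads that share the same secondary key. Per record key,
  // the newer isDeleted flag wins; unlike an either-or preCombine(), the
  // union of record-key mappings is kept.
  static Map<String, Boolean> merge(Map<String, Boolean> older,
                                    Map<String, Boolean> newer) {
    Map<String, Boolean> merged = new HashMap<>(older);
    merged.putAll(newer);
    // Tombstones (isDeleted == true) are retained here so they can still
    // cancel matching entries when this result is merged with the base file.
    return merged;
  }

  // Final merge against the base file: after applying the log result, the
  // tombstoned mappings are dropped from the materialized view.
  static Map<String, Boolean> mergeWithBaseFile(Map<String, Boolean> base,
                                                Map<String, Boolean> logResult) {
    Map<String, Boolean> merged = merge(base, logResult);
    merged.values().removeIf(deleted -> deleted);
    return merged;
  }
}
```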


// TODO: Merger need not be called here as the merging logic is handled explicitly in this function.
// Retain until Secondary Index feature is tested and stabilized
HoodieRecord<T> combinedRecord = (HoodieRecord<T>) recordMerger.merge(prevRecord, readerSchema,
Contributor Author:

Will remove this later. The merging logic does not really rely on the merger (because of the comment made earlier).

@@ -33,6 +33,7 @@
import org.apache.hadoop.fs.FileSystem;

Contributor Author:

Please ignore the changes in this file. Will revert in the next upload.

@@ -54,6 +54,13 @@ public class KeyGeneratorOptions extends HoodieConfig {
+ "Actual value will be obtained by invoking .toString() on the field value. Nested fields can be specified using\n"
+ "the dot notation eg: `a.b.c`");

public static final ConfigProperty<String> SECONDARYKEY_FIELD_NAME = ConfigProperty
Contributor Author:

One of the current limitations: only one secondary index (per table) is supported. Will remove the limitation once the functionality is working end-to-end. Thinking of using the same approach as the functional index (a different partition for each secondary index, based on a config file/json). A sketch of that partition-naming idea follows below.

Member:

+1
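
For illustration, a tiny sketch of the partition-per-index idea from the preceding comment, mirroring how functional indexes get their own metadata-table partition; the naming scheme below is an assumption, not the PR's code.

```java
final class SecondaryIndexPartitionsSketch {
  // Hypothetical naming scheme: each secondary index gets its own
  // metadata-table partition so several indexes can coexist.
  static String partitionPathFor(String indexName) {
    return "secondary_index_" + indexName; // e.g. "secondary_index_rider"
  }
}
```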

@@ -402,6 +422,31 @@ public HoodieMetadataPayload preCombine(HoodieMetadataPayload previousRecord) {
// 2. A key moved to a different file due to clustering

// No need to merge with previous record index, always pick the latest payload.
return this;
case METADATA_TYPE_SECONDARY_INDEX:
// TODO: This block and checks are just for validation and to detect all callers.
Contributor Author:

This is here mainly for asserting and will be removed later. Ideally, all merging logic for secondary (or non-unique-key) index records needs to be handled at the upper layers directly (when the scanner is running the scan).

.withFileSystem(metaClient.getFs())
.withBasePath(basePath)
.withLogFilePaths(logFilePaths)
.withReaderSchema(tableSchema)
Contributor Author:

Will change to filter only the required columns. Could not find an easy way/API to do that yet.

@@ -0,0 +1,222 @@
/*
Contributor Author:

Please ignore the repetitive/duplicated code blocks. Will change once the functionality is ready and subsequently add more tests.

// the secondary index partition for each of these keys. For a commit which is deleting/updating a lot of records, this
// operation is going to be expensive (in CPU, memory and IO)
List<String> keysToRemove = new ArrayList<>();
writeStatus.collectAsList().forEach(status -> {
Contributor Author:

This needs to be fixed. If the design is changed so that WriteStatus includes the (record-key, old-secondary-key, new-secondary-key) tuple, then this needs to change anyway.

Member:

this collect can OOM right?

Contributor Author:

Yes, the logic here is probably going to change if one uses the WriteStatus to hold the (old-secondary-key, new-secondary-key) pair. Hence I did not think of optimising here yet.
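
For illustration, a sketch of the distributed alternative to the collect above, under the assumption that the extraction can run on the executors; extractDeletedKeys is a hypothetical helper standing in for however WriteStatus would expose the affected record keys.

```java
import java.util.List;
import org.apache.hudi.client.WriteStatus;
import org.apache.spark.api.java.JavaRDD;

final class DeletedKeysSketch {
  // Hypothetical helper: however WriteStatus ends up exposing the record
  // keys deleted by a commit, extract them here per status object.
  static List<String> extractDeletedKeys(WriteStatus status) {
    throw new UnsupportedOperationException("illustrative only");
  }

  // Keys stay distributed on the executors; nothing is materialized on the
  // driver, so a delete-heavy commit cannot OOM it.
  static JavaRDD<String> keysToRemove(JavaRDD<WriteStatus> writeStatuses) {
    return writeStatuses.flatMap(status -> extractDeletedKeys(status).iterator());
  }
}
```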

// Reuse record index parallelism config to build secondary index
int parallelism = Math.min(partitionFilePairs.size(), dataWriteConfig.getMetadataConfig().getRecordIndexMaxParallelism());

return deletedRecords.union(readSecondaryKeysFromFiles(
Contributor Author:

This is convoluted. deletedRecords are the tombstone records. For correctness, these tombstone records should be emitted before the regular records. Could not find anywhere in the docs whether this ordering is preserved, but noticed in the tests that it is.

@github-actions bot added the size:XL label (PR with lines of changes > 1000) on Feb 26, 2024
@bhat-vinay force-pushed the secondary-index-write branch from a65a927 to 890599a on February 27, 2024 06:09
@vinothchandar (Member) left a comment:

Initial comments. Only halfway through the review.

@@ -2493,6 +2493,10 @@ public boolean isLogCompactionEnabledOnMetadata() {
return getBoolean(HoodieMetadataConfig.ENABLE_LOG_COMPACTION_ON_METADATA_TABLE);
}

public boolean isSecondaryIndexEnabled() {
Member:

nts: revisit if this is needed or we need to consolidate using the record-level index flag


// Enable secondary index only iff record index is enabled
if (dataWriteConfig.isSecondaryIndexEnabled() || dataMetaClient.getTableConfig().isMetadataPartitionAvailable(SECONDARY_INDEX)) {
this.enabledPartitionTypes.add(SECONDARY_INDEX);
Member:

nts: rename SECONDARY_INDEX? Technically the record index itself is a unique secondary index. This is just a non-unique secondary index.

dataMetaClient,
EngineType.SPARK);

// Initialize the file groups - using the same estimation logic as that of record index
Member:

scope for code reuse?

fsView.loadAllPartitions();

List<Pair<String, FileSlice>> partitionFileSlicePairs = new ArrayList<>();
partitions.forEach(partition -> {
Member:

wouldn't this take a long time, if done on the driver?

Contributor Author:

This is actually borrowed from how the existing indexes/partitions are built in the metadata table (like the RLI partition in initializeRecordIndexPartition()).

EngineType.SPARK);

// Initialize the file groups - using the same estimation logic as that of record index
final int fileGroupCount = HoodieTableMetadataUtil.estimateFileGroupCount(RECORD_INDEX, records.count(),
Member:

this is going to compute records twice? no?
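
One possible way to avoid the double computation flagged above, assuming records is backed by a Spark RDD: persist it before the count so the file-group estimation and the subsequent write share one materialization. A sketch:

```java
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.storage.StorageLevel;

final class FileGroupCountSketch {
  static long countOnce(JavaRDD<?> records) {
    // Cache before counting; the count() used by estimateFileGroupCount()
    // and the later write then reuse one materialization of the pipeline.
    records.persist(StorageLevel.MEMORY_AND_DISK());
    return records.count();
  }
}
```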

this.nonUniqueKeyRecords = new ExternalSpillableMap<>(maxMemorySizeInBytes, spillableMapBasePath, new DefaultSizeEstimator(),
new HoodieRecordSizeEstimator(readerSchema), diskMapType, isBitCaskDiskMapCompressionEnabled);

if (logFilePaths.size() > 0 && HoodieTableMetadata.isMetadataTableSecondaryIndexPartition(basePath, partitionName)) {
Member:

this layer cannot be aware of the metadata table.

Contributor Author (@bhat-vinay, Feb 28, 2024):

Can this all be hidden inside a method (still in this layer)? There needs to be some way to determine if the logs can have non-unique keys. The initial implementation had it one layer above (i.e. the callers instantiating HoodieMergedLogRecordScanner passed in the flag), but having it here looked cleaner. (A sketch of such a helper follows.)
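
A sketch of that encapsulation, reusing the exact check from the diff above; the method name and its placement inside the scanner are assumptions for illustration.

```java
// Inside HoodieMergedLogRecordScanner (sketch): keeps the layering question
// behind one predicate so the scanner body stays agnostic about why keys
// may be non-unique.
private boolean logsMayContainNonUniqueKeys(List<String> logFilePaths,
                                            String basePath,
                                            String partitionName) {
  return !logFilePaths.isEmpty()
      && HoodieTableMetadata.isMetadataTableSecondaryIndexPartition(basePath, partitionName);
}
```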

@@ -269,6 +350,68 @@ public <T> void processNextRecord(HoodieRecord<T> newRecord) throws IOException
}
}

private <T> void processNextNonUniqueKeyRecord(HoodieRecord<T> newRecord) throws IOException {
Member:

I think we should probably subclass / find some way to avoid having this code right inside this class?
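
A self-contained sketch of the subclassing idea (class and method names are assumptions, not the PR's types): the base scanner keeps its unique-key path, and a subclass overrides the processing hook for non-unique (secondary) keys.

```java
import java.io.IOException;

abstract class MergedLogRecordScannerSketch<R> {
  // Unique-key path: one buffered record per key, merged via preCombine().
  public void processNextRecord(R record) throws IOException {
    // ... existing merge-by-record-key logic ...
  }
}

final class NonUniqueKeyLogRecordScannerSketch<R> extends MergedLogRecordScannerSketch<R> {
  @Override
  public void processNextRecord(R record) throws IOException {
    // Buffer into the spillable non-unique-key map and merge 'similar'
    // records (same secondary key) instead of replacing by record key.
  }
}
```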

@bhat-vinay force-pushed the secondary-index-write branch from 890599a to 32d4469 on March 5, 2024 09:37
@hudi-bot commented Mar 5, 2024:

CI report:

Bot commands: @hudi-bot supports the following commands:
  • @hudi-bot run azure: re-run the last Azure build

@skyshineb (Contributor):

Hi @bhat-vinay! Is this design of a secondary index through the MDT the only one to be implemented, or are there plans for other index types? As I remember, there was an RFC for a Lucene index, and maybe other types in the future?

@bhat-vinay (Contributor Author):

Please get in touch with @codope for the latest update on this. AFAIK, a Lucene-based secondary index is not planned at this time, and the MDT-based secondary index is the one being developed.

@codope (Member) commented Jun 11, 2024:

Hi @skyshineb, we do plan to add more index types. If you are interested in contributing a Lucene-based secondary index, I can help you get started with the multi-modal indexing framework.

@codope closed this on Jun 11, 2024
@skyshineb (Contributor):

Hi @codope! I planned to test this MDT implementation and the Lucene one (which I took from the previous SI attempt and finished myself), and figure out whether using Lucene is profitable or not. But why was this PR closed?

Labels: index, release-1.0.0, size:XL (PR with lines of changes > 1000)
6 participants