[HUDI-8992] Deprecate all byte array usage in metadata deserialization path #12826
@@ -394,4 +423,12 @@ public static <T extends SpecificRecordBase> byte[] convertCommitMetadataToJsonB
throw new HoodieIOException("Failed to convert to JSON.", e);
}
}

public static boolean isEmptyStream(InputStream inputStream) throws IOException {
It looks like this method is unused?
}
return JsonUtils.getObjectMapper().convertValue(replaceCommitMetadata, HoodieReplaceCommitMetadata.class);
}
The method below this one is now showing as unused, can it be cleaned up?
}

public static HoodieCleanerPlan deserializeCleanerPlan(byte[] bytes) throws IOException {
return deserializeAvroMetadata(bytes, HoodieCleanerPlan.class);
public static HoodieCleanMetadata deserializeHoodieCleanMetadataLegacy(byte[] bytes) throws IOException {
There are only 2 callers of this and it looks like they can both be migrated to use the content stream.
switch (actionType) {
case HoodieTimeline.CLEAN_ACTION: {
archivedMetaWrapper.setHoodieCleanMetadata(CleanerUtils.getCleanerMetadata(metaClient, instantDetails.get()));
archivedMetaWrapper.setHoodieCleanerPlan(CleanerUtils.getCleanerPlan(metaClient, planBytes.get()));
archivedMetaWrapper.setHoodieCleanerPlan(CleanerUtils.getCleanerPlanLegacy(metaClient, planBytes.get()));
We could replace all the "legacy" methods by wrapping the bytes in an input stream. This seems to be the main non-test class where these methods are used so we can also try to remove those methods after cleaning this up. This could help avoid confusion for future developers.
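A minimal sketch of the wrapping idea, using only the JDK. The class `LegacyBytesAdapter` and both method names are hypothetical stand-ins, not Hudi APIs, and `java.util.Optional` is used in place of Hudi's `Option`; the point is only that a caller holding a `byte[]` can reach a stream-based deserializer without a separate "legacy" overload.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Optional;

class LegacyBytesAdapter {

  // Hypothetical stand-in for a stream-based deserializer (not Hudi's API).
  // It simply decodes the stream content as UTF-8 text.
  public static String deserializeFromStream(Optional<InputStream> in) throws IOException {
    if (!in.isPresent()) {
      return "";
    }
    try (InputStream stream = in.get()) {
      ByteArrayOutputStream out = new ByteArrayOutputStream();
      byte[] buffer = new byte[4096];
      int read;
      // Copy the stream in fixed-size chunks until it is exhausted.
      while ((read = stream.read(buffer)) != -1) {
        out.write(buffer, 0, read);
      }
      return new String(out.toByteArray(), StandardCharsets.UTF_8);
    }
  }

  // Adapter for callers that still hold a byte[]: wrap it in a stream
  // so that only the stream-based entry point is needed.
  public static Optional<InputStream> asStream(byte[] bytes) {
    return Optional.of(new ByteArrayInputStream(bytes));
  }

  public static void main(String[] args) throws IOException {
    byte[] planBytes = "cleaner-plan".getBytes(StandardCharsets.UTF_8);
    System.out.println(deserializeFromStream(asStream(planBytes)));
  }
}
```

With an adapter like this at the few remaining byte-array call sites, the "legacy" overloads would no longer need to exist.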
@@ -202,7 +202,7 @@ private Long getLatestCleanTimeTakenInMillis() throws IOException {
HoodieInstant clean = timeline.getReverseOrderedInstants().findFirst().orElse(null);
if (clean != null) {
HoodieCleanMetadata cleanMetadata =
TimelineMetadataUtils.deserializeHoodieCleanMetadata(timeline.getInstantDetails(clean).get());
TimelineMetadataUtils.deserializeHoodieCleanMetadataLegacy(timeline.getInstantDetails(clean).get());
Can this be changed as follows?
- TimelineMetadataUtils.deserializeHoodieCleanMetadataLegacy(timeline.getInstantDetails(clean).get());
+ TimelineMetadataUtils.deserializeHoodieCleanMetadata(timeline.getInstantContentStream(clean));
@Override
public boolean isEmpty(HoodieInstant instant) {
return TimelineUtils.isEmpty(metaClient, instant);
Similarly here, do we want to reference the readCommits?
public static HoodieCleanMetadata getCleanerMetadata(HoodieTableMetaClient metaClient, Option<InputStream> details)
throws IOException {
CleanMetadataMigrator metadataMigrator = new CleanMetadataMigrator(metaClient);
HoodieCleanMetadata cleanMetadata = TimelineMetadataUtils.deserializeHoodieCleanMetadata(details);
return metadataMigrator.upgradeToLatest(cleanMetadata, cleanMetadata.getVersion());
}

public static HoodieCleanMetadata getCleanerMetadataFromInputStream(HoodieTableMetaClient metaClient, Option<InputStream> in)
throws IOException {
CleanMetadataMigrator metadataMigrator = new CleanMetadataMigrator(metaClient);
HoodieCleanMetadata cleanMetadata = TimelineMetadataUtils.deserializeHoodieCleanMetadata(in);
return metadataMigrator.upgradeToLatest(cleanMetadata, cleanMetadata.getVersion());
}
These two methods look like they are the same?
@@ -190,22 +190,21 @@ private static Option<HoodieRequestedReplaceMetadata> getRequestedReplaceMetadat
String action = factory instanceof InstantGeneratorV2 ? HoodieTimeline.CLUSTERING_ACTION : HoodieTimeline.REPLACE_COMMIT_ACTION;
requestedInstant = factory.createNewInstant(HoodieInstant.State.REQUESTED, action, pendingReplaceOrClusterInstant.requestedTime());
}
Option<byte[]> content = Option.empty();
if (timeline.isEmpty(requestedInstant)) {
Can we move this to the error handling?
}
}

public static HoodieCompactionPlan getCompactionPlanFromInputStream(HoodieTableMetaClient metaClient, Option<InputStream> in) {
This seems similar to the method at line 196
@@ -1094,7 +1094,7 @@ private static void reAddLogFilesFromRollbackPlan(HoodieTableMetaClient dataTabl
HoodieInstant requested = factory.getRollbackRequestedInstant(rollbackInstant);
try {
HoodieRollbackPlan rollbackPlan = TimelineMetadataUtils.deserializeAvroMetadata(
dataTableMetaClient.getActiveTimeline().readRollbackInfoAsBytes(requested).get(), HoodieRollbackPlan.class);
readRollbackInfoAsBytes looks like it is no longer used in the codebase and can be removed.
Reviewers, please start reviewing from the commit titled "util changes done"; the upstream PR is tracked in #12829.
Change Logs
All metadata deserialization now uses input streams instead of byte arrays. A few places that require more extensive refactoring will be followed up separately.
This also fixes the logic for checking whether a given instant file is empty: previously we loaded the file into a byte array and checked its length; now we check the file stats, which is much more lightweight.
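To illustrate the difference between the two emptiness checks, here is a rough sketch against the local file system using `java.nio.file`. The class and method names are hypothetical, and the PR itself goes through Hudi's own storage and timeline abstractions rather than `java.nio`, so treat this as an analogy only.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

class EmptyInstantCheck {

  // Previous style of check: loads the entire file into memory
  // just to look at the array length.
  public static boolean isEmptyByReading(Path file) throws IOException {
    return Files.readAllBytes(file).length == 0;
  }

  // New style of check: asks the file system for the size only,
  // so no file content is read.
  public static boolean isEmptyByStat(Path file) throws IOException {
    return Files.size(file) == 0L;
  }

  public static void main(String[] args) throws IOException {
    Path empty = Files.createTempFile("instant", ".requested");
    Path nonEmpty = Files.createTempFile("instant", ".commit");
    Files.write(nonEmpty, new byte[] {1, 2, 3});

    System.out.println(isEmptyByStat(empty));
    System.out.println(isEmptyByStat(nonEmpty));

    Files.delete(empty);
    Files.delete(nonEmpty);
  }
}
```

Both checks return the same answer; the stat-based one avoids allocating a buffer proportional to the file size, which matters for large commit metadata.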
Impact
Allows for reading of large commit metadata without OOM.
Risk level (write none, low medium or high below)
low
Documentation Update
NA
Contributor's checklist