diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java index 3b42edc383a5..7a3d6fdcc9b2 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java @@ -50,6 +50,7 @@ import static org.apache.hudi.cli.utils.CommitUtil.getTimeDaysAgo; import static org.apache.hudi.common.table.timeline.InstantComparison.GREATER_THAN; import static org.apache.hudi.common.table.timeline.InstantComparison.compareTimestamps; +import static org.apache.hudi.common.table.timeline.TimelineUtils.getCommitMetadata; import static org.apache.hudi.common.table.timeline.TimelineUtils.getTimeline; /** @@ -72,9 +73,8 @@ private String printCommits(HoodieTimeline timeline, for (final HoodieInstant commit : commits) { if (timeline.getInstantDetails(commit).isPresent()) { - final HoodieCommitMetadata commitMetadata = HoodieCLI.getTableMetaClient().getCommitMetadataSerDe().deserialize( + final HoodieCommitMetadata commitMetadata = getCommitMetadata(HoodieCLI.getTableMetaClient().getActiveTimeline(), commit, - timeline.getInstantDetails(commit).get(), HoodieCommitMetadata.class); rows.add(new Comparable[] {commit.requestedTime(), commitMetadata.fetchTotalBytesWritten(), @@ -421,7 +421,7 @@ private Option getCommitForInstant(HoodieTimeline timeline, Strin private Option getHoodieCommitMetadata(HoodieTimeline timeline, Option hoodieInstant) throws IOException { if (hoodieInstant.isPresent()) { - return Option.of(TimelineUtils.getCommitMetadata(hoodieInstant.get(), timeline)); + return Option.of(getCommitMetadata(hoodieInstant.get(), timeline)); } return Option.empty(); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/BaseHoodieTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/BaseHoodieTimeline.java index 3243ad73aaf5..b1f5ea6f1e18 100644 --- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/BaseHoodieTimeline.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/BaseHoodieTimeline.java @@ -22,13 +22,16 @@ import org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.common.util.JsonUtils; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ReflectionUtils; import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.exception.HoodieException; +import com.fasterxml.jackson.databind.exc.MismatchedInputException; import org.apache.avro.file.DataFileStream; import org.apache.avro.io.DatumReader; import org.apache.avro.specific.SpecificDatumReader; +import org.apache.avro.specific.SpecificRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -565,19 +568,23 @@ public InputStream getInstantContentStream(HoodieInstant instant) { } @Override - public T deserializeAvroInstantContent(HoodieInstant instant, Class clazz) throws IOException { - try (InputStream inputStream = getInstantContentStream(instant)) { - DatumReader reader = new SpecificDatumReader<>(clazz); - DataFileStream fileReader = new DataFileStream<>(inputStream, reader); - ValidationUtils.checkArgument(fileReader.hasNext(), "Could not deserialize metadata of type " + clazz); - return fileReader.next(); - } - } - - @Override - public T deserializeJsonInstantContent(HoodieInstant instant, Class clazz) throws IOException { - try (InputStream inputStream = getInstantContentStream(instant)) { - return JsonUtils.getObjectMapper().readValue(inputStream, clazz); + public T deserializeInstantContent(HoodieInstant instant, Class clazz) throws IOException { + if (SpecificRecord.class.isAssignableFrom(clazz)) { + try (InputStream inputStream = getInstantContentStream(instant)) { + DatumReader reader = new SpecificDatumReader<>(clazz); + DataFileStream fileReader = new DataFileStream<>(inputStream, reader); + 
ValidationUtils.checkArgument(fileReader.hasNext(), "Could not deserialize metadata of type " + clazz); + return fileReader.next(); + } + } else { + try (InputStream inputStream = getInstantContentStream(instant)) { + return JsonUtils.getObjectMapper().readValue(inputStream, clazz); + } catch (MismatchedInputException ex) { + if (ex.getMessage().startsWith("No content to map")) { + return ReflectionUtils.loadClass(clazz.getName()); + } + throw ex; + } } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java index 692b00b8a254..06c0f9883f1e 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java @@ -457,9 +457,7 @@ public interface HoodieTimeline extends HoodieInstantReader, Serializable { */ InputStream getInstantContentStream(HoodieInstant instant); - T deserializeAvroInstantContent(HoodieInstant instant, Class clazz) throws IOException; - - T deserializeJsonInstantContent(HoodieInstant instant, Class clazz) throws IOException; + T deserializeInstantContent(HoodieInstant instant, Class clazz) throws IOException; boolean isEmpty(HoodieInstant instant); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/MetadataConversionUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/MetadataConversionUtils.java index 8a49841d341e..23c3d276101c 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/MetadataConversionUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/MetadataConversionUtils.java @@ -29,6 +29,7 @@ import org.apache.hudi.common.model.ActionType; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieReplaceCommitMetadata; +import 
org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.HoodieTableVersion; import org.apache.hudi.common.table.timeline.versioning.v2.ArchivedTimelineV2; @@ -56,100 +57,103 @@ */ public class MetadataConversionUtils { - public static HoodieArchivedMetaEntry createMetaWrapper(HoodieInstant hoodieInstant, HoodieTableMetaClient metaClient) throws IOException { - Option instantDetails = metaClient.getActiveTimeline().getInstantDetails(hoodieInstant); - if (hoodieInstant.isCompleted() && instantDetails.get().length == 0) { - // in local FS and HDFS, there could be empty completed instants due to crash. - // let's add an entry to the archival, even if not for the plan. - return createMetaWrapperForEmptyInstant(hoodieInstant); - } - HoodieArchivedMetaEntry archivedMetaWrapper = new HoodieArchivedMetaEntry(); - archivedMetaWrapper.setCommitTime(hoodieInstant.requestedTime()); - archivedMetaWrapper.setActionState(hoodieInstant.getState().name()); - archivedMetaWrapper.setStateTransitionTime(hoodieInstant.getCompletionTime()); - switch (hoodieInstant.getAction()) { - case HoodieTimeline.CLEAN_ACTION: { - if (hoodieInstant.isCompleted()) { - archivedMetaWrapper.setHoodieCleanMetadata(CleanerUtils.getCleanerMetadata(metaClient, instantDetails.get())); - } else { - archivedMetaWrapper.setHoodieCleanerPlan(CleanerUtils.getCleanerPlan(metaClient, instantDetails.get())); + public static HoodieArchivedMetaEntry createMetaWrapper(HoodieInstant hoodieInstant, HoodieTableMetaClient metaClient) { + try { + HoodieArchivedMetaEntry archivedMetaWrapper = new HoodieArchivedMetaEntry(); + archivedMetaWrapper.setCommitTime(hoodieInstant.requestedTime()); + archivedMetaWrapper.setActionState(hoodieInstant.getState().name()); + archivedMetaWrapper.setStateTransitionTime(hoodieInstant.getCompletionTime()); + switch (hoodieInstant.getAction()) { + case HoodieTimeline.CLEAN_ACTION: { + if 
(hoodieInstant.isCompleted()) { + archivedMetaWrapper.setHoodieCleanMetadata(CleanerUtils.getCleanerMetadata(metaClient, metaClient.getActiveTimeline().getInstantDetails(hoodieInstant).get())); + } else { + archivedMetaWrapper.setHoodieCleanerPlan(CleanerUtils.getCleanerPlan(metaClient, metaClient.getActiveTimeline().getInstantDetails(hoodieInstant).get())); + } + archivedMetaWrapper.setActionType(ActionType.clean.name()); + break; } - archivedMetaWrapper.setActionType(ActionType.clean.name()); - break; - } - case HoodieTimeline.COMMIT_ACTION: { - HoodieCommitMetadata commitMetadata = metaClient.getCommitMetadataSerDe().deserialize(hoodieInstant, instantDetails.get(), HoodieCommitMetadata.class); - archivedMetaWrapper.setHoodieCommitMetadata(convertCommitMetadata(commitMetadata)); - archivedMetaWrapper.setActionType(ActionType.commit.name()); - break; - } - case HoodieTimeline.DELTA_COMMIT_ACTION: { - HoodieCommitMetadata deltaCommitMetadata = metaClient.getCommitMetadataSerDe().deserialize(hoodieInstant, instantDetails.get(), HoodieCommitMetadata.class); - archivedMetaWrapper.setHoodieCommitMetadata(convertCommitMetadata(deltaCommitMetadata)); - archivedMetaWrapper.setActionType(ActionType.deltacommit.name()); - break; - } - case HoodieTimeline.REPLACE_COMMIT_ACTION: - case HoodieTimeline.CLUSTERING_ACTION: { - if (hoodieInstant.isCompleted()) { - HoodieReplaceCommitMetadata replaceCommitMetadata = HoodieReplaceCommitMetadata.fromBytes(instantDetails.get(), HoodieReplaceCommitMetadata.class); - archivedMetaWrapper.setHoodieReplaceCommitMetadata(convertReplaceCommitMetadata(replaceCommitMetadata)); - } else if (hoodieInstant.isInflight()) { - // inflight replacecommit files have the same metadata body as HoodieCommitMetadata - // so we could re-use it without further creating an inflight extension. 
- // Or inflight replacecommit files are empty under clustering circumstance - Option inflightCommitMetadata = getInflightCommitMetadata(metaClient, hoodieInstant, instantDetails); - if (inflightCommitMetadata.isPresent()) { - archivedMetaWrapper.setHoodieInflightReplaceMetadata(convertCommitMetadata(inflightCommitMetadata.get())); + case HoodieTimeline.COMMIT_ACTION: { + getCommitMetadata(metaClient.getActiveTimeline(), hoodieInstant, HoodieCommitMetadata.class) + .ifPresent(commitMetadata -> archivedMetaWrapper.setHoodieCommitMetadata(convertCommitMetadata(commitMetadata))); + archivedMetaWrapper.setActionType(ActionType.commit.name()); + break; + } + case HoodieTimeline.DELTA_COMMIT_ACTION: { + getCommitMetadata(metaClient.getActiveTimeline(), hoodieInstant, HoodieCommitMetadata.class) + .ifPresent(deltaCommitMetadata -> archivedMetaWrapper.setHoodieCommitMetadata(convertCommitMetadata(deltaCommitMetadata))); + archivedMetaWrapper.setActionType(ActionType.deltacommit.name()); + break; + } + case HoodieTimeline.REPLACE_COMMIT_ACTION: + case HoodieTimeline.CLUSTERING_ACTION: { + if (hoodieInstant.isCompleted()) { + getCommitMetadata(metaClient.getActiveTimeline(), hoodieInstant, HoodieReplaceCommitMetadata.class) + .ifPresent(replaceCommitMetadata -> archivedMetaWrapper.setHoodieReplaceCommitMetadata(convertReplaceCommitMetadata(replaceCommitMetadata))); + } else if (hoodieInstant.isInflight()) { + // inflight replacecommit files have the same metadata body as HoodieCommitMetadata + // so we could re-use it without further creating an inflight extension. 
+ // Or inflight replacecommit files are empty under clustering circumstance + Option inflightCommitMetadata = getCommitMetadata(metaClient.getActiveTimeline(), hoodieInstant, HoodieCommitMetadata.class); + if (inflightCommitMetadata.isPresent()) { + archivedMetaWrapper.setHoodieInflightReplaceMetadata(convertCommitMetadata(inflightCommitMetadata.get())); + } + } else { + // we may have cases with empty HoodieRequestedReplaceMetadata e.g. insert_overwrite_table or insert_overwrite + // without clustering. However, we should revisit the requested commit file standardization + Option requestedReplaceMetadata = getRequestedReplaceMetadata(metaClient.getActiveTimeline().getInstantDetails(hoodieInstant)); + if (requestedReplaceMetadata.isPresent()) { + archivedMetaWrapper.setHoodieRequestedReplaceMetadata(requestedReplaceMetadata.get()); + } } - } else { - // we may have cases with empty HoodieRequestedReplaceMetadata e.g. insert_overwrite_table or insert_overwrite - // without clustering. However, we should revisit the requested commit file standardization - Option requestedReplaceMetadata = getRequestedReplaceMetadata(instantDetails); - if (requestedReplaceMetadata.isPresent()) { - archivedMetaWrapper.setHoodieRequestedReplaceMetadata(requestedReplaceMetadata.get()); + archivedMetaWrapper.setActionType( + hoodieInstant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION) ? ActionType.replacecommit.name() : ActionType.clustering.name()); + break; + } + case HoodieTimeline.ROLLBACK_ACTION: { + if (hoodieInstant.isCompleted()) { + archivedMetaWrapper.setHoodieRollbackMetadata( + TimelineMetadataUtils.deserializeAvroMetadata(metaClient.getActiveTimeline().getInstantDetails(hoodieInstant).get(), HoodieRollbackMetadata.class)); } + archivedMetaWrapper.setActionType(ActionType.rollback.name()); + break; } - archivedMetaWrapper.setActionType( - hoodieInstant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION) ? 
ActionType.replacecommit.name() : ActionType.clustering.name()); - break; - } - case HoodieTimeline.ROLLBACK_ACTION: { - if (hoodieInstant.isCompleted()) { - archivedMetaWrapper.setHoodieRollbackMetadata(TimelineMetadataUtils.deserializeAvroMetadata(instantDetails.get(), HoodieRollbackMetadata.class)); + case HoodieTimeline.SAVEPOINT_ACTION: { + archivedMetaWrapper.setHoodieSavePointMetadata( + TimelineMetadataUtils.deserializeAvroMetadata(metaClient.getActiveTimeline().getInstantDetails(hoodieInstant).get(), HoodieSavepointMetadata.class)); + archivedMetaWrapper.setActionType(ActionType.savepoint.name()); + break; } - archivedMetaWrapper.setActionType(ActionType.rollback.name()); - break; - } - case HoodieTimeline.SAVEPOINT_ACTION: { - archivedMetaWrapper.setHoodieSavePointMetadata(TimelineMetadataUtils.deserializeAvroMetadata(instantDetails.get(), HoodieSavepointMetadata.class)); - archivedMetaWrapper.setActionType(ActionType.savepoint.name()); - break; - } - case HoodieTimeline.COMPACTION_ACTION: { - if (hoodieInstant.isRequested()) { - HoodieCompactionPlan plan = CompactionUtils.getCompactionPlan(metaClient, instantDetails); - archivedMetaWrapper.setHoodieCompactionPlan(plan); + case HoodieTimeline.COMPACTION_ACTION: { + if (hoodieInstant.isRequested()) { + HoodieCompactionPlan plan = CompactionUtils.getCompactionPlan(metaClient, metaClient.getActiveTimeline().getInstantDetails(hoodieInstant)); + archivedMetaWrapper.setHoodieCompactionPlan(plan); + } + archivedMetaWrapper.setActionType(ActionType.compaction.name()); + break; } - archivedMetaWrapper.setActionType(ActionType.compaction.name()); - break; - } - case HoodieTimeline.LOG_COMPACTION_ACTION: { - if (hoodieInstant.isRequested()) { - HoodieCompactionPlan plan = CompactionUtils.getCompactionPlan(metaClient, instantDetails); - archivedMetaWrapper.setHoodieCompactionPlan(plan); + case HoodieTimeline.LOG_COMPACTION_ACTION: { + if (hoodieInstant.isRequested()) { + HoodieCompactionPlan plan = 
CompactionUtils.getCompactionPlan(metaClient, metaClient.getActiveTimeline().getInstantDetails(hoodieInstant)); + archivedMetaWrapper.setHoodieCompactionPlan(plan); + } + archivedMetaWrapper.setActionType(ActionType.logcompaction.name()); + break; + } + default: { + throw new UnsupportedOperationException("Action not fully supported yet"); } - archivedMetaWrapper.setActionType(ActionType.logcompaction.name()); - break; - } - default: { - throw new UnsupportedOperationException("Action not fully supported yet"); } + return archivedMetaWrapper; + } catch (IOException | HoodieIOException ex) { + // in local FS and HDFS, there could be empty completed instants due to crash. + // let's add an entry to the archival, even if not for the plan. + return createMetaWrapperForEmptyInstant(hoodieInstant); } - return archivedMetaWrapper; } /** + * TODO(review): this overload converts archived LSM-timeline records, yet the refactored branches below fetch details via metaClient.getActiveTimeline() — confirm archived instants still resolve, or use the passed-in instantDetails/planBytes as before. * Creates the legacy archived metadata entry from the new LSM-timeline read. *

For legacy archive log, 3 entries are persisted for one instant, here only one summary entry is converted into. @@ -173,7 +177,7 @@ public static HoodieArchivedMetaEntry createMetaWrapper( archivedMetaWrapper.setActionState(HoodieInstant.State.COMPLETED.name()); archivedMetaWrapper.setStateTransitionTime(completionTime); String actionType = lsmTimelineRecord.get(ArchivedTimelineV2.ACTION_ARCHIVED_META_FIELD).toString(); - HoodieInstant instant = metaClient.getInstantGenerator().createNewInstant(HoodieInstant.State.COMPLETED, actionType, instantTime, completionTime); + HoodieInstant hoodieInstant = metaClient.getInstantGenerator().createNewInstant(HoodieInstant.State.COMPLETED, actionType, instantTime, completionTime); switch (actionType) { case HoodieTimeline.CLEAN_ACTION: { archivedMetaWrapper.setHoodieCleanMetadata(CleanerUtils.getCleanerMetadata(metaClient, instantDetails.get())); @@ -182,8 +186,8 @@ public static HoodieArchivedMetaEntry createMetaWrapper( break; } case HoodieTimeline.COMMIT_ACTION: { - HoodieCommitMetadata commitMetadata = metaClient.getCommitMetadataSerDe().deserialize(instant, instantDetails.get(), HoodieCommitMetadata.class); - archivedMetaWrapper.setHoodieCommitMetadata(convertCommitMetadata(commitMetadata)); + getCommitMetadata(metaClient.getActiveTimeline(), hoodieInstant, HoodieCommitMetadata.class) + .ifPresent(commitMetadata -> archivedMetaWrapper.setHoodieCommitMetadata(convertCommitMetadata(commitMetadata))); archivedMetaWrapper.setActionType(ActionType.commit.name()); if (planBytes.isPresent()) { @@ -194,8 +198,8 @@ public static HoodieArchivedMetaEntry createMetaWrapper( break; } case HoodieTimeline.DELTA_COMMIT_ACTION: { - HoodieCommitMetadata deltaCommitMetadata = metaClient.getCommitMetadataSerDe().deserialize(instant, instantDetails.get(), HoodieCommitMetadata.class); - archivedMetaWrapper.setHoodieCommitMetadata(convertCommitMetadata(deltaCommitMetadata)); + getCommitMetadata(metaClient.getActiveTimeline(), hoodieInstant, 
HoodieCommitMetadata.class) + .ifPresent(commitMetadata -> archivedMetaWrapper.setHoodieCommitMetadata(convertCommitMetadata(commitMetadata))); archivedMetaWrapper.setActionType(ActionType.deltacommit.name()); if (planBytes.isPresent()) { @@ -207,38 +211,41 @@ public static HoodieArchivedMetaEntry createMetaWrapper( } case HoodieTimeline.REPLACE_COMMIT_ACTION: case HoodieTimeline.CLUSTERING_ACTION: { - HoodieReplaceCommitMetadata replaceCommitMetadata = HoodieReplaceCommitMetadata.fromBytes(instantDetails.get(), HoodieReplaceCommitMetadata.class); - archivedMetaWrapper.setHoodieReplaceCommitMetadata(convertReplaceCommitMetadata(replaceCommitMetadata)); + getCommitMetadata(metaClient.getActiveTimeline(), hoodieInstant, HoodieReplaceCommitMetadata.class) + .ifPresent(replaceCommitMetadata -> archivedMetaWrapper.setHoodieReplaceCommitMetadata(convertReplaceCommitMetadata(replaceCommitMetadata))); // inflight replacecommit files have the same metadata body as HoodieCommitMetadata // so we could re-use it without further creating an inflight extension. // Or inflight replacecommit files are empty under clustering circumstance - Option inflightCommitMetadata = getInflightCommitMetadata(metaClient, instant, instantDetails); + Option inflightCommitMetadata = getCommitMetadata(metaClient.getActiveTimeline(), hoodieInstant, HoodieCommitMetadata.class); if (inflightCommitMetadata.isPresent()) { archivedMetaWrapper.setHoodieInflightReplaceMetadata(convertCommitMetadata(inflightCommitMetadata.get())); } - archivedMetaWrapper.setActionType(ActionType.replacecommit.name()); + archivedMetaWrapper.setActionType( + hoodieInstant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION) ? 
ActionType.replacecommit.name() : ActionType.clustering.name()); break; } case HoodieTimeline.ROLLBACK_ACTION: { - archivedMetaWrapper.setHoodieRollbackMetadata(TimelineMetadataUtils.deserializeAvroMetadata(instantDetails.get(), HoodieRollbackMetadata.class)); + archivedMetaWrapper.setHoodieRollbackMetadata( + TimelineMetadataUtils.deserializeAvroMetadata(metaClient.getActiveTimeline().getInstantDetails(hoodieInstant).get(), HoodieRollbackMetadata.class)); archivedMetaWrapper.setActionType(ActionType.rollback.name()); break; } case HoodieTimeline.SAVEPOINT_ACTION: { - archivedMetaWrapper.setHoodieSavePointMetadata(TimelineMetadataUtils.deserializeAvroMetadata(instantDetails.get(), HoodieSavepointMetadata.class)); + archivedMetaWrapper.setHoodieSavePointMetadata( + TimelineMetadataUtils.deserializeAvroMetadata(metaClient.getActiveTimeline().getInstantDetails(hoodieInstant).get(), HoodieSavepointMetadata.class)); archivedMetaWrapper.setActionType(ActionType.savepoint.name()); break; } case HoodieTimeline.COMPACTION_ACTION: { // should be handled by commit_action branch though, this logic is redundant. 
- HoodieCompactionPlan plan = CompactionUtils.getCompactionPlan(metaClient, planBytes); + HoodieCompactionPlan plan = CompactionUtils.getCompactionPlan(metaClient, metaClient.getActiveTimeline().getInstantDetails(hoodieInstant)); archivedMetaWrapper.setHoodieCompactionPlan(plan); archivedMetaWrapper.setActionType(ActionType.compaction.name()); break; } case HoodieTimeline.LOG_COMPACTION_ACTION: { - HoodieCompactionPlan plan = CompactionUtils.getCompactionPlan(metaClient, planBytes); + HoodieCompactionPlan plan = CompactionUtils.getCompactionPlan(metaClient, metaClient.getActiveTimeline().getInstantDetails(hoodieInstant)); archivedMetaWrapper.setHoodieCompactionPlan(plan); archivedMetaWrapper.setActionType(ActionType.logcompaction.name()); break; @@ -332,13 +339,13 @@ public static HoodieArchivedMetaEntry createMetaWrapperForEmptyInstant(HoodieIns return archivedMetaWrapper; } - private static Option getInflightCommitMetadata(HoodieTableMetaClient metaClient, HoodieInstant instant, - Option inflightContent) throws IOException { - if (!inflightContent.isPresent() || inflightContent.get().length == 0) { - // inflight files can be empty in some certain cases, e.g. when users opt in clustering + private static Option getCommitMetadata(HoodieActiveTimeline timeline, HoodieInstant instant, Class clazz) throws IOException { + T commitMetadata = timeline.deserializeInstantContent(instant, clazz); + // an empty file will return the default instance with an UNKNOWN operation type and in that case we return an empty option + if (commitMetadata.getOperationType() == WriteOperationType.UNKNOWN) { return Option.empty(); } - return Option.of(metaClient.getCommitMetadataSerDe().deserialize(instant, inflightContent.get(), HoodieCommitMetadata.class)); + return Option.of(commitMetadata); } private static Option getRequestedReplaceMetadata(Option requestedContent) throws IOException {