Skip to content

Commit

Permalink
deprecate all deserialize code using byte array
Browse files (browse the repository at this point in the history)
  • Loading branch information
Davis-Zhang-Onehouse committed Feb 12, 2025
1 parent c441530 commit 22489a2
Show file tree
Hide file tree
Showing 118 changed files with 508 additions and 428 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public String showCleans(
List<Comparable[]> rows = new ArrayList<>();
for (HoodieInstant clean : cleans) {
HoodieCleanMetadata cleanMetadata =
TimelineMetadataUtils.deserializeHoodieCleanMetadata(timeline.getInstantDetails(clean).get());
TimelineMetadataUtils.deserializeHoodieCleanMetadata(timeline.getInstantContentStream(clean));
rows.add(new Comparable[] {clean.requestedTime(), cleanMetadata.getEarliestCommitToRetain(),
cleanMetadata.getTotalFilesDeleted(), cleanMetadata.getTimeTakenInMillis()});
}
Expand Down Expand Up @@ -110,7 +110,7 @@ public String showCleanPartitions(
}

HoodieCleanMetadata cleanMetadata =
TimelineMetadataUtils.deserializeHoodieCleanMetadata(timeline.getInstantDetails(cleanInstant).get());
TimelineMetadataUtils.deserializeHoodieCleanMetadata(timeline.getInstantContentStream(cleanInstant));
List<Comparable[]> rows = new ArrayList<>();
for (Map.Entry<String, HoodieCleanPartitionMetadata> entry : cleanMetadata.getPartitionMetadata().entrySet()) {
String path = entry.getKey();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.CommitMetadataSerDe;
import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.InstantComparator;
import org.apache.hudi.common.table.timeline.TimelineUtils;
import org.apache.hudi.common.util.NumericUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
Expand Down Expand Up @@ -72,19 +72,18 @@ private String printCommits(HoodieTimeline timeline,
.getInstantsAsStream().sorted(instantComparator.requestedTimeOrderedComparator().reversed()).collect(Collectors.toList());

for (final HoodieInstant commit : commits) {
if (timeline.getInstantDetails(commit).isPresent()) {
final HoodieCommitMetadata commitMetadata = getCommitMetadata(HoodieCLI.getTableMetaClient().getActiveTimeline(),
commit,
HoodieCommitMetadata.class);
rows.add(new Comparable[] {commit.requestedTime(),
commitMetadata.fetchTotalBytesWritten(),
commitMetadata.fetchTotalFilesInsert(),
commitMetadata.fetchTotalFilesUpdated(),
commitMetadata.fetchTotalPartitionsWritten(),
commitMetadata.fetchTotalRecordsWritten(),
commitMetadata.fetchTotalUpdateRecordsWritten(),
commitMetadata.fetchTotalWriteErrors()});
}
CommitMetadataSerDe serDe = HoodieCLI.getTableMetaClient().getCommitMetadataSerDe();
final HoodieCommitMetadata commitMetadata = serDe.deserialize(
commit, timeline.getInstantContentStream(commit),
HoodieCommitMetadata.class);
rows.add(new Comparable[] {commit.requestedTime(),
commitMetadata.fetchTotalBytesWritten(),
commitMetadata.fetchTotalFilesInsert(),
commitMetadata.fetchTotalFilesUpdated(),
commitMetadata.fetchTotalPartitionsWritten(),
commitMetadata.fetchTotalRecordsWritten(),
commitMetadata.fetchTotalUpdateRecordsWritten(),
commitMetadata.fetchTotalWriteErrors()});
}

final Map<String, Function<Object, String>> fieldNameToConverterMap = new HashMap<>();
Expand All @@ -111,25 +110,23 @@ private String printCommitsWithMetadata(HoodieTimeline timeline,
.getInstantsAsStream().sorted(instantComparator.requestedTimeOrderedComparator().reversed()).collect(Collectors.toList());

for (final HoodieInstant commit : commits) {
if (timeline.getInstantDetails(commit).isPresent()) {
final HoodieCommitMetadata commitMetadata = HoodieCLI.getTableMetaClient().getCommitMetadataSerDe().deserialize(
commit,
timeline.getInstantDetails(commit).get(),
HoodieCommitMetadata.class);

for (Map.Entry<String, List<HoodieWriteStat>> partitionWriteStat :
commitMetadata.getPartitionToWriteStats().entrySet()) {
for (HoodieWriteStat hoodieWriteStat : partitionWriteStat.getValue()) {
if (StringUtils.isNullOrEmpty(partition) || partition.equals(hoodieWriteStat.getPartitionPath())) {
rows.add(new Comparable[] {commit.getAction(), commit.requestedTime(), hoodieWriteStat.getPartitionPath(),
hoodieWriteStat.getFileId(), hoodieWriteStat.getPrevCommit(), hoodieWriteStat.getNumWrites(),
hoodieWriteStat.getNumInserts(), hoodieWriteStat.getNumDeletes(),
hoodieWriteStat.getNumUpdateWrites(), hoodieWriteStat.getTotalWriteErrors(),
hoodieWriteStat.getTotalLogBlocks(), hoodieWriteStat.getTotalCorruptLogBlock(),
hoodieWriteStat.getTotalRollbackBlocks(), hoodieWriteStat.getTotalLogRecords(),
hoodieWriteStat.getTotalUpdatedRecordsCompacted(), hoodieWriteStat.getTotalWriteBytes()
});
}
final HoodieCommitMetadata commitMetadata = HoodieCLI.getTableMetaClient().getCommitMetadataSerDe().deserialize(
commit,
timeline.getInstantContentStream(commit),
HoodieCommitMetadata.class);

for (Map.Entry<String, List<HoodieWriteStat>> partitionWriteStat :
commitMetadata.getPartitionToWriteStats().entrySet()) {
for (HoodieWriteStat hoodieWriteStat : partitionWriteStat.getValue()) {
if (StringUtils.isNullOrEmpty(partition) || partition.equals(hoodieWriteStat.getPartitionPath())) {
rows.add(new Comparable[] {commit.getAction(), commit.requestedTime(), hoodieWriteStat.getPartitionPath(),
hoodieWriteStat.getFileId(), hoodieWriteStat.getPrevCommit(), hoodieWriteStat.getNumWrites(),
hoodieWriteStat.getNumInserts(), hoodieWriteStat.getNumDeletes(),
hoodieWriteStat.getNumUpdateWrites(), hoodieWriteStat.getTotalWriteErrors(),
hoodieWriteStat.getTotalLogBlocks(), hoodieWriteStat.getTotalCorruptLogBlock(),
hoodieWriteStat.getTotalRollbackBlocks(), hoodieWriteStat.getTotalLogRecords(),
hoodieWriteStat.getTotalUpdatedRecordsCompacted(), hoodieWriteStat.getTotalWriteBytes()
});
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ public String compactionShowArchived(
try {
archivedTimeline.loadCompactionDetailsInMemory(compactionInstantTime);
HoodieCompactionPlan compactionPlan =
TimelineMetadataUtils.deserializeCompactionPlan(archivedTimeline.getInstantDetails(instant).get());
TimelineMetadataUtils.deserializeCompactionPlan(archivedTimeline.getInstantContentStream(instant));
return printCompaction(compactionPlan, sortByField, descending, limit, headerOnly, partition);
} finally {
archivedTimeline.clearInstantDetailsFromMemory(compactionInstantTime);
Expand Down Expand Up @@ -371,7 +371,7 @@ Function<HoodieInstant, HoodieCompactionPlan> compactionPlanReader(
private HoodieCompactionPlan readCompactionPlanForArchivedTimeline(HoodieArchivedTimeline archivedTimeline,
HoodieInstant instant) {
try {
return TimelineMetadataUtils.deserializeCompactionPlan(archivedTimeline.getInstantDetails(instant).get());
return TimelineMetadataUtils.deserializeCompactionPlan(archivedTimeline.getInstantContentStream(instant));
} catch (Exception e) {
throw new HoodieException(e.getMessage(), e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.apache.hudi.common.table.timeline.InstantComparator;
import org.apache.hudi.common.table.timeline.TimelineLayout;
import org.apache.hudi.common.util.NumericUtils;
import org.apache.hudi.common.util.Option;

import org.springframework.shell.standard.ShellComponent;
import org.springframework.shell.standard.ShellMethod;
Expand Down Expand Up @@ -118,14 +117,12 @@ private String printDiffWithMetadata(HoodieTimeline timeline, Integer limit, Str
.getInstantsAsStream().sorted(instantComparator.requestedTimeOrderedComparator().reversed()).collect(Collectors.toList());

for (final HoodieInstant commit : commits) {
Option<byte[]> instantDetails = timeline.getInstantDetails(commit);
if (instantDetails.isPresent()) {
HoodieCommitMetadata commitMetadata = layout.getCommitMetadataSerDe().deserialize(commit, instantDetails.get(), HoodieCommitMetadata.class);
for (Map.Entry<String, List<HoodieWriteStat>> partitionWriteStat :
commitMetadata.getPartitionToWriteStats().entrySet()) {
for (HoodieWriteStat hoodieWriteStat : partitionWriteStat.getValue()) {
populateRows(rows, commit, hoodieWriteStat, diffEntity, diffEntityChecker);
}
HoodieCommitMetadata commitMetadata = layout.getCommitMetadataSerDe().deserialize(
commit, timeline.getInstantContentStream(commit), HoodieCommitMetadata.class);
for (Map.Entry<String, List<HoodieWriteStat>> partitionWriteStat :
commitMetadata.getPartitionToWriteStats().entrySet()) {
for (HoodieWriteStat hoodieWriteStat : partitionWriteStat.getValue()) {
populateRows(rows, commit, hoodieWriteStat, diffEntity, diffEntityChecker);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.util.Option;

import org.springframework.shell.standard.ShellComponent;
import org.springframework.shell.standard.ShellMethod;
Expand Down Expand Up @@ -98,9 +97,8 @@ public String showRestore(
private void addDetailsOfCompletedRestore(HoodieActiveTimeline activeTimeline, List<Comparable[]> rows,
HoodieInstant restoreInstant) throws IOException {
HoodieRestoreMetadata instantMetadata;
Option<byte[]> instantDetails = activeTimeline.getInstantDetails(restoreInstant);
instantMetadata = TimelineMetadataUtils
.deserializeAvroMetadata(instantDetails.get(), HoodieRestoreMetadata.class);
.deserializeAvroMetadata(activeTimeline.getInstantContentStream(restoreInstant), HoodieRestoreMetadata.class);

for (String rolledbackInstant : instantMetadata.getInstantsToRollback()) {
Comparable[] row = createDataRow(instantMetadata.getStartRestoreTime(), rolledbackInstant,
Expand All @@ -122,9 +120,8 @@ private void addDetailsOfInflightRestore(HoodieActiveTimeline activeTimeline, Li
private HoodieRestorePlan getRestorePlan(HoodieActiveTimeline activeTimeline, HoodieInstant restoreInstant) throws IOException {
HoodieInstant instantKey = HoodieCLI.getTableMetaClient().createNewInstant(HoodieInstant.State.REQUESTED, RESTORE_ACTION,
restoreInstant.requestedTime());
Option<byte[]> instantDetails = activeTimeline.getInstantDetails(instantKey);
HoodieRestorePlan restorePlan = TimelineMetadataUtils
.deserializeAvroMetadata(instantDetails.get(), HoodieRestorePlan.class);
.deserializeAvroMetadata(activeTimeline.getInstantContentStream(instantKey), HoodieRestorePlan.class);
return restorePlan;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public String showRollbacks(
rollback.getInstants().forEach(instant -> {
try {
HoodieRollbackMetadata metadata = TimelineMetadataUtils
.deserializeAvroMetadata(activeTimeline.getInstantDetails(instant).get(), HoodieRollbackMetadata.class);
.deserializeAvroMetadata(activeTimeline.getInstantContentStream(instant), HoodieRollbackMetadata.class);
metadata.getCommitsRollback().forEach(c -> {
Comparable[] row = new Comparable[5];
row[0] = metadata.getStartRollbackTime();
Expand Down Expand Up @@ -98,7 +98,7 @@ public String showRollback(
HoodieActiveTimeline activeTimeline = HoodieCLI.getTableMetaClient().getActiveTimeline();
final List<Comparable[]> rows = new ArrayList<>();
HoodieRollbackMetadata metadata = TimelineMetadataUtils.deserializeAvroMetadata(
activeTimeline.getInstantDetails(HoodieCLI.getTableMetaClient().createNewInstant(State.COMPLETED, ROLLBACK_ACTION, rollbackInstant)).get(),
activeTimeline.getInstantContentStream(HoodieCLI.getTableMetaClient().createNewInstant(State.COMPLETED, ROLLBACK_ACTION, rollbackInstant)),
HoodieRollbackMetadata.class);
metadata.getPartitionMetadata().forEach((key, value) -> Stream
.concat(value.getSuccessDeleteFiles().stream().map(f -> Pair.of(f, true)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public String writeAmplificationStats(
TimelineLayout layout = TimelineLayout.fromVersion(timeline.getTimelineLayoutVersion());
for (HoodieInstant instantTime : timeline.getInstants()) {
String waf = "0";
HoodieCommitMetadata commit = layout.getCommitMetadataSerDe().deserialize(instantTime, activeTimeline.getInstantDetails(instantTime).get(),
HoodieCommitMetadata commit = layout.getCommitMetadataSerDe().deserialize(instantTime, activeTimeline.getInstantContentStream(instantTime),
HoodieCommitMetadata.class);
if (commit.fetchTotalUpdateRecordsWritten() > 0) {
waf = df.format((float) commit.fetchTotalRecordsWritten() / commit.fetchTotalUpdateRecordsWritten());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,11 +310,11 @@ private String getInstantToRollback(HoodieTimeline timeline, HoodieInstant insta
HoodieInstant instantToUse = HoodieCLI.getTableMetaClient().createNewInstant(
HoodieInstant.State.REQUESTED, instant.getAction(), instant.requestedTime());
HoodieRollbackPlan metadata = TimelineMetadataUtils
.deserializeAvroMetadata(timeline.getInstantDetails(instantToUse).get(), HoodieRollbackPlan.class);
.deserializeAvroMetadata(timeline.getInstantContentStream(instantToUse), HoodieRollbackPlan.class);
return metadata.getInstantToRollback().getCommitTime();
} else {
HoodieRollbackMetadata metadata = TimelineMetadataUtils
.deserializeAvroMetadata(timeline.getInstantDetails(instant).get(), HoodieRollbackMetadata.class);
.deserializeAvroMetadata(timeline.getInstantContentStream(instant), HoodieRollbackMetadata.class);
return String.join(",", metadata.getCommitsRollback());
}
} catch (IOException e) {
Expand All @@ -335,12 +335,12 @@ private Map<String, List<String>> getRolledBackInstantInfo(HoodieTimeline timeli
HoodieInstant instantToUse = HoodieCLI.getTableMetaClient().createNewInstant(
HoodieInstant.State.REQUESTED, rollbackInstant.getAction(), rollbackInstant.requestedTime());
HoodieRollbackPlan metadata = TimelineMetadataUtils
.deserializeAvroMetadata(timeline.getInstantDetails(instantToUse).get(), HoodieRollbackPlan.class);
.deserializeAvroMetadata(timeline.getInstantContentStream(instantToUse), HoodieRollbackPlan.class);
rollbackInfoMap.computeIfAbsent(metadata.getInstantToRollback().getCommitTime(), k -> new ArrayList<>())
.add(rollbackInstant.requestedTime());
} else {
HoodieRollbackMetadata metadata = TimelineMetadataUtils
.deserializeAvroMetadata(timeline.getInstantDetails(rollbackInstant).get(), HoodieRollbackMetadata.class);
.deserializeAvroMetadata(timeline.getInstantContentStream(rollbackInstant), HoodieRollbackMetadata.class);
metadata.getCommitsRollback().forEach(instant -> {
rollbackInfoMap.computeIfAbsent(instant, k -> new ArrayList<>())
.add(rollbackInstant.requestedTime());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public static long countNewRecords(HoodieTableMetaClient metaClient, List<String
HoodieInstant instant = metaClient.createNewInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, commit);
HoodieCommitMetadata c = metaClient.getCommitMetadataSerDe().deserialize(
instant,
timeline.getInstantDetails(instant).get(),
timeline.getInstantContentStream(instant),
HoodieCommitMetadata.class);
totalNew += c.fetchTotalRecordsWritten() - c.fetchTotalUpdateRecordsWritten();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ private Long getLatestCleanTimeTakenInMillis() throws IOException {
HoodieInstant clean = timeline.getReverseOrderedInstants().findFirst().orElse(null);
if (clean != null) {
HoodieCleanMetadata cleanMetadata =
TimelineMetadataUtils.deserializeHoodieCleanMetadata(timeline.getInstantDetails(clean).get());
TimelineMetadataUtils.deserializeHoodieCleanMetadata(timeline.getInstantContentStream(clean));
return cleanMetadata.getTimeTakenInMillis();
}
return -1L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ public void testShowRestores() {
restores.sorted().forEach(instant -> {
try {
HoodieRestoreMetadata metadata = TimelineMetadataUtils
.deserializeAvroMetadata(activeTimeline.getInstantDetails(instant).get(), HoodieRestoreMetadata.class);
.deserializeAvroMetadata(activeTimeline.getInstantContentStream(instant), HoodieRestoreMetadata.class);
metadata.getInstantsToRollback().forEach(c -> {
Comparable[] row = new Comparable[4];
row[0] = metadata.getStartRestoreTime();
Expand Down Expand Up @@ -182,7 +182,7 @@ public void testShowRestore() throws IOException {

// get metadata of instant
HoodieRestoreMetadata instantMetadata = TimelineMetadataUtils.deserializeAvroMetadata(
activeTimeline.getInstantDetails(instant).get(), HoodieRestoreMetadata.class);
activeTimeline.getInstantContentStream(instant), HoodieRestoreMetadata.class);

// generate expected result
TableHeader header = new TableHeader()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ public void testShowRollbacks() {
try {
// get pair of rollback time and instant time
HoodieRollbackMetadata metadata = TimelineMetadataUtils
.deserializeAvroMetadata(activeTimeline.getInstantDetails(instant).get(), HoodieRollbackMetadata.class);
.deserializeAvroMetadata(activeTimeline.getInstantContentStream(instant), HoodieRollbackMetadata.class);
metadata.getCommitsRollback().forEach(c -> {
Comparable[] row = new Comparable[5];
row[0] = metadata.getStartRollbackTime();
Expand Down Expand Up @@ -183,7 +183,7 @@ public void testShowRollback() throws IOException {
List<Comparable[]> rows = new ArrayList<>();
// get metadata of instant
HoodieRollbackMetadata metadata = TimelineMetadataUtils.deserializeAvroMetadata(
activeTimeline.getInstantDetails(instant).get(), HoodieRollbackMetadata.class);
activeTimeline.getInstantContentStream(instant), HoodieRollbackMetadata.class);
// generate expect result
metadata.getPartitionMetadata().forEach((key, value) -> Stream
.concat(value.getSuccessDeleteFiles().stream().map(f -> Pair.of(f, true)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1425,7 +1425,7 @@ protected void setWriteSchemaForDeletes(HoodieTableMetaClient metaClient) {
.lastInstant();
if (lastInstant.isPresent()) {
HoodieCommitMetadata commitMetadata = metaClient.getCommitMetadataSerDe().deserialize(lastInstant.get(),
activeTimeline.getInstantDetails(lastInstant.get()).get(), HoodieCommitMetadata.class);
activeTimeline.getInstantContentStream(lastInstant.get()), HoodieCommitMetadata.class);
String extraSchema = commitMetadata.getExtraMetadata().get(SCHEMA_KEY);
if (!StringUtils.isNullOrEmpty(extraSchema)) {
config.setSchema(commitMetadata.getExtraMetadata().get(SCHEMA_KEY));
Expand Down
Loading

0 comments on commit 22489a2

Please sign in to comment.