Skip to content

Commit

Permalink
util changes done
Browse files Browse the repository at this point in the history
  • Loading branch information
Davis-Zhang-Onehouse committed Feb 12, 2025
1 parent ae04c98 commit c441530
Show file tree
Hide file tree
Showing 4 changed files with 131 additions and 119 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import static org.apache.hudi.cli.utils.CommitUtil.getTimeDaysAgo;
import static org.apache.hudi.common.table.timeline.InstantComparison.GREATER_THAN;
import static org.apache.hudi.common.table.timeline.InstantComparison.compareTimestamps;
import static org.apache.hudi.common.table.timeline.TimelineUtils.getCommitMetadata;
import static org.apache.hudi.common.table.timeline.TimelineUtils.getTimeline;

/**
Expand All @@ -72,9 +73,8 @@ private String printCommits(HoodieTimeline timeline,

for (final HoodieInstant commit : commits) {
if (timeline.getInstantDetails(commit).isPresent()) {
final HoodieCommitMetadata commitMetadata = HoodieCLI.getTableMetaClient().getCommitMetadataSerDe().deserialize(
final HoodieCommitMetadata commitMetadata = getCommitMetadata(HoodieCLI.getTableMetaClient().getActiveTimeline(),
commit,
timeline.getInstantDetails(commit).get(),
HoodieCommitMetadata.class);
rows.add(new Comparable[] {commit.requestedTime(),
commitMetadata.fetchTotalBytesWritten(),
Expand Down Expand Up @@ -421,7 +421,7 @@ private Option<HoodieInstant> getCommitForInstant(HoodieTimeline timeline, Strin

private Option<HoodieCommitMetadata> getHoodieCommitMetadata(HoodieTimeline timeline, Option<HoodieInstant> hoodieInstant) throws IOException {
if (hoodieInstant.isPresent()) {
return Option.of(TimelineUtils.getCommitMetadata(hoodieInstant.get(), timeline));
return Option.of(getCommitMetadata(hoodieInstant.get(), timeline));
}
return Option.empty();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,16 @@
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.JsonUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.exception.HoodieException;

import com.fasterxml.jackson.databind.exc.MismatchedInputException;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.io.DatumReader;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -565,19 +568,23 @@ public InputStream getInstantContentStream(HoodieInstant instant) {
}

@Override
public <T> T deserializeAvroInstantContent(HoodieInstant instant, Class<T> clazz) throws IOException {
try (InputStream inputStream = getInstantContentStream(instant)) {
DatumReader<T> reader = new SpecificDatumReader<>(clazz);
DataFileStream<T> fileReader = new DataFileStream<>(inputStream, reader);
ValidationUtils.checkArgument(fileReader.hasNext(), "Could not deserialize metadata of type " + clazz);
return fileReader.next();
}
}

@Override
public <T> T deserializeJsonInstantContent(HoodieInstant instant, Class<T> clazz) throws IOException {
try (InputStream inputStream = getInstantContentStream(instant)) {
return JsonUtils.getObjectMapper().readValue(inputStream, clazz);
public <T> T deserializeInstantContent(HoodieInstant instant, Class<T> clazz) throws IOException {
if (SpecificRecord.class.isAssignableFrom(clazz)) {
try (InputStream inputStream = getInstantContentStream(instant)) {
DatumReader<T> reader = new SpecificDatumReader<>(clazz);
DataFileStream<T> fileReader = new DataFileStream<>(inputStream, reader);
ValidationUtils.checkArgument(fileReader.hasNext(), "Could not deserialize metadata of type " + clazz);
return fileReader.next();
}
} else {
try (InputStream inputStream = getInstantContentStream(instant)) {
return JsonUtils.getObjectMapper().readValue(inputStream, clazz);
} catch (MismatchedInputException ex) {
if (ex.getMessage().startsWith("No content to map")) {
return ReflectionUtils.loadClass(clazz.getName());
}
throw ex;
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -457,9 +457,7 @@ public interface HoodieTimeline extends HoodieInstantReader, Serializable {
*/
InputStream getInstantContentStream(HoodieInstant instant);

<T> T deserializeAvroInstantContent(HoodieInstant instant, Class<T> clazz) throws IOException;

<T> T deserializeJsonInstantContent(HoodieInstant instant, Class<T> clazz) throws IOException;
<T> T deserializeInstantContent(HoodieInstant instant, Class<T> clazz) throws IOException;

boolean isEmpty(HoodieInstant instant);

Expand Down
Loading

0 comments on commit c441530

Please sign in to comment.