Skip to content

Commit

Permalink
remove getInstantReader for active timeline
Browse files Browse the repository at this point in the history
  • Loading branch information
Davis-Zhang-Onehouse committed Feb 10, 2025
1 parent fb6fe7f commit acc4e2e
Show file tree
Hide file tree
Showing 7 changed files with 13 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ private HoodieTableFileSystemView buildFileSystemView(String globRegex, String m
instantsStream = instantsStream.filter(is -> predicate.test(maxInstant, is.requestedTime()));
}
TimelineFactory timelineFactory = metaClient.getTimelineLayout().getTimelineFactory();
HoodieTimeline filteredTimeline = timelineFactory.createDefaultTimeline(instantsStream, metaClient.getActiveTimeline()::getInstantDetails);
HoodieTimeline filteredTimeline = timelineFactory.createDefaultTimeline(instantsStream, metaClient.getActiveTimeline());
return new HoodieTableFileSystemView(metaClient, filteredTimeline, pathInfoList);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -531,7 +531,7 @@ public void testCheckIfValidCommit() throws Exception {
setUp(IndexType.BLOOM, true, false);

// When timeline is empty, all commits are invalid
HoodieTimeline timeline = TIMELINE_FACTORY.createDefaultTimeline(Collections.EMPTY_LIST.stream(), metaClient.getActiveTimeline().getInstantReader());
HoodieTimeline timeline = TIMELINE_FACTORY.createDefaultTimeline(Collections.EMPTY_LIST.stream(), metaClient.getActiveTimeline());
assertTrue(timeline.empty());
assertFalse(HoodieIndexUtils.checkIfValidCommit(timeline, "001"));
assertFalse(HoodieIndexUtils.checkIfValidCommit(timeline, writeClient.createNewInstantTime()));
Expand All @@ -541,7 +541,7 @@ public void testCheckIfValidCommit() throws Exception {
final HoodieInstant instant1 = INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "010");
String instantTimestamp = writeClient.createNewInstantTime();
final HoodieInstant instant2 = INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, writeClient.createNewInstantTime());
timeline = TIMELINE_FACTORY.createDefaultTimeline(Stream.of(instant1, instant2), metaClient.getActiveTimeline().getInstantReader());
timeline = TIMELINE_FACTORY.createDefaultTimeline(Stream.of(instant1, instant2), metaClient.getActiveTimeline());
assertFalse(timeline.empty());
assertTrue(HoodieIndexUtils.checkIfValidCommit(timeline, instant1.requestedTime()));
assertTrue(HoodieIndexUtils.checkIfValidCommit(timeline, instant2.requestedTime()));
Expand All @@ -557,15 +557,15 @@ public void testCheckIfValidCommit() throws Exception {
instantTimestamp = writeClient.createNewInstantTime();
String instantTimestampSec = instantTimestamp.substring(0, instantTimestamp.length() - HoodieInstantTimeGenerator.DEFAULT_MILLIS_EXT.length());
final HoodieInstant instant3 = INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, instantTimestampSec);
timeline = TIMELINE_FACTORY.createDefaultTimeline(Stream.of(instant1, instant3), metaClient.getActiveTimeline().getInstantReader());
timeline = TIMELINE_FACTORY.createDefaultTimeline(Stream.of(instant1, instant3), metaClient.getActiveTimeline());
assertFalse(timeline.empty());
assertFalse(HoodieIndexUtils.checkIfValidCommit(timeline, instantTimestamp));
assertTrue(HoodieIndexUtils.checkIfValidCommit(timeline, instantTimestampSec));

// With a sec format instant time lesser than first entry in the active timeline, checkifContainsOrBefore() should return true
instantTimestamp = writeClient.createNewInstantTime();
final HoodieInstant instant4 = INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, instantTimestamp);
timeline = TIMELINE_FACTORY.createDefaultTimeline(Stream.of(instant4), metaClient.getActiveTimeline().getInstantReader());
timeline = TIMELINE_FACTORY.createDefaultTimeline(Stream.of(instant4), metaClient.getActiveTimeline());
instantTimestampSec = instantTimestamp.substring(0, instantTimestamp.length() - HoodieInstantTimeGenerator.DEFAULT_MILLIS_EXT.length());
assertFalse(timeline.empty());
assertTrue(HoodieIndexUtils.checkIfValidCommit(timeline, instantTimestamp));
Expand All @@ -588,14 +588,14 @@ public void testCheckIfValidCommit() throws Exception {
String newTimestamp = writeClient.createNewInstantTime();
String newTimestampSec = newTimestamp.substring(0, newTimestamp.length() - HoodieInstantTimeGenerator.DEFAULT_MILLIS_EXT.length());
final HoodieInstant instant5 = INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.DELTA_COMMIT_ACTION, newTimestamp);
timeline = TIMELINE_FACTORY.createDefaultTimeline(Stream.of(instant5), metaClient.getActiveTimeline().getInstantReader());
timeline = TIMELINE_FACTORY.createDefaultTimeline(Stream.of(instant5), metaClient.getActiveTimeline());
assertFalse(timeline.empty());
assertFalse(timeline.containsInstant(checkInstantTimestamp));
assertFalse(timeline.containsInstant(checkInstantTimestampSec));

final HoodieInstant instant6 = INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.INFLIGHT,
HoodieTimeline.DELTA_COMMIT_ACTION, newTimestampSec + HoodieInstantTimeGenerator.DEFAULT_MILLIS_EXT);
timeline = TIMELINE_FACTORY.createDefaultTimeline(Stream.of(instant6), metaClient.getActiveTimeline().getInstantReader());
timeline = TIMELINE_FACTORY.createDefaultTimeline(Stream.of(instant6), metaClient.getActiveTimeline());
assertFalse(timeline.empty());
assertFalse(timeline.containsInstant(newTimestamp));
assertFalse(timeline.containsInstant(checkInstantTimestamp));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public void testAverageRecordSize(List<Pair<HoodieInstant, List<HWriteStat>>> in
HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath("/tmp")
.build();
HoodieTableMetaClient metaClient = HoodieTestUtils.init("/tmp");
HoodieTimeline commitsTimeline = new BaseTimelineV2(metaClient.getActiveTimeline().getInstantReader());
HoodieTimeline commitsTimeline = new BaseTimelineV2(metaClient.getActiveTimeline());
List<HoodieInstant> instants = new ArrayList<>();
instantSizePairs.forEach(entry -> {
HoodieInstant hoodieInstant = entry.getKey();
Expand Down Expand Up @@ -104,7 +104,7 @@ public void testErrorHandling() throws IOException {
HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
.withProps(Collections.singletonMap(COPY_ON_WRITE_RECORD_SIZE_ESTIMATE.key(), String.valueOf(recordSize)))
.build(false);
BaseTimelineV2 commitsTimeline = new BaseTimelineV2(metaClient.getActiveTimeline().getInstantReader());
BaseTimelineV2 commitsTimeline = new BaseTimelineV2(metaClient.getActiveTimeline());
List<HoodieInstant> instants = Collections.singletonList(
INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "1"));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -624,6 +624,6 @@ HoodieTimeline getSchemaEvolutionTimelineInReverseOrder() {
.sorted(reversedComparator);
return timelineLayout.getTimelineFactory().createDefaultTimeline(
reversedTimelineWithTableSchema,
metaClient.getActiveTimeline().getInstantReader());
metaClient.getActiveTimeline());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -596,7 +596,7 @@ public static HoodieTimeline concatTimeline(HoodieTimeline timeline1, HoodieTime
HoodieTableMetaClient metaClient) {
return metaClient.getTimelineLayout().getTimelineFactory().createDefaultTimeline(
Stream.concat(timeline1.getInstantsAsStream(), timeline2.getInstantsAsStream()).sorted(),
metaClient.getActiveTimeline().getInstantReader());
metaClient.getActiveTimeline());
}

public static boolean isDeletePartition(WriteOperationType operation) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,6 @@ public static HoodieTimeline toTimeline(TimelineDTO dto, HoodieTableMetaClient m
TimelineFactory factory = metaClient.getTimelineLayout().getTimelineFactory();
// TODO: For Now, we will assume, only active-timeline will be transferred.
return factory.createDefaultTimeline(dto.instants.stream().map(d -> InstantDTO.toInstant(d, instantGenerator)),
metaClient.getActiveTimeline().getInstantReader());
metaClient.getActiveTimeline());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1394,7 +1394,7 @@ public static HoodieTableFileSystemView getFileSystemView(HoodieTableMetaClient
if (timeline.empty()) {
final HoodieInstant instant = metaClient.createNewInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION,
metaClient.createNewInstantTime(false));
timeline = factory.createDefaultTimeline(Stream.of(instant), metaClient.getActiveTimeline().getInstantReader());
timeline = factory.createDefaultTimeline(Stream.of(instant), metaClient.getActiveTimeline());
}
return new HoodieTableFileSystemView(metaClient, timeline);
}
Expand Down

0 comments on commit acc4e2e

Please sign in to comment.