Skip to content

Commit

Permalink
Fix NPEs when serializing timeline
Browse files Browse the repository at this point in the history
  • Loading branch information
Davis-Zhang-Onehouse committed Feb 12, 2025
1 parent acc4e2e commit b29cd6b
Show file tree
Hide file tree
Showing 14 changed files with 100 additions and 131 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,11 @@
import java.util.List;

import static org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_GENERATOR;
import static org.apache.hudi.common.testutils.HoodieTestUtils.TIMELINE_FACTORY;
import static org.apache.hudi.common.testutils.HoodieTestUtils.getDefaultStorageConf;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertSame;

/**
* Test cases for {@link HoodieArchivedTimeline}.
Expand Down Expand Up @@ -74,6 +76,11 @@ public void testLoadingInstantsIncrementally() throws Exception {
assertThat(archivedTimeline.firstInstant().map(HoodieInstant::requestedTime).orElse(""), is("10000011"));
}

@Test
void getInstantReaderReferencesSelf() {
// Guards against NPEs during timeline serialization: an archived timeline must
// act as its own instant reader rather than delegating to a (possibly null)
// external reader, so getInstantReader() must return the timeline itself.
HoodieArchivedTimeline timeline = TIMELINE_FACTORY.createArchivedTimeline(metaClient);
assertSame(timeline, timeline.getInstantReader());
}
// -------------------------------------------------------------------------
// Utilities
// -------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ void testPartitionsForIncrCleaning(boolean isPartitioned, HoodieWriteConfig conf
getCleanCommitMetadata(partitionsInLastClean, lastCleanInstant, earliestInstantsInLastClean, lastCompletedTimeInLastClean,
savepointsTrackedInLastClean.keySet(), expectedEarliestSavepointInLastClean);
HoodieCleanerPlan cleanerPlan = mockLastCleanCommit(mockHoodieTable, lastCleanInstant, earliestInstantsInLastClean, activeTimeline, cleanMetadataOptionPair, savepointsTrackedInLastClean.keySet());
mockFewActiveInstants(instantReader, mockHoodieTable, activeTimeline, activeInstantsPartitions, savepointsTrackedInLastClean, areCommitsForSavepointsRemoved, replaceCommits);
mockFewActiveInstants(mockHoodieTable, activeTimeline, activeInstantsPartitions, savepointsTrackedInLastClean, areCommitsForSavepointsRemoved, replaceCommits);

// mock getAllPartitions
HoodieStorage storage = mock(HoodieStorage.class);
Expand Down Expand Up @@ -655,10 +655,10 @@ private static HoodieCleanerPlan mockLastCleanCommit(HoodieTable hoodieTable, St
return cleanerPlan;
}

private static void mockFewActiveInstants(HoodieInstantReader instantReader, HoodieTable hoodieTable, HoodieActiveTimeline activeTimeline, Map<String, List<String>> activeInstantsToPartitions,
private static void mockFewActiveInstants(HoodieTable hoodieTable, HoodieActiveTimeline activeTimeline, Map<String, List<String>> activeInstantsToPartitions,
Map<String, List<String>> savepointedCommitsToAdd, boolean areCommitsForSavepointsRemoved, List<String> replaceCommits)
throws IOException {
BaseTimelineV2 commitsTimeline = new BaseTimelineV2(instantReader);
BaseTimelineV2 commitsTimeline = new BaseTimelineV2();
List<HoodieInstant> instants = new ArrayList<>();
Map<String, List<String>> instantstoProcess = new HashMap<>();
instantstoProcess.putAll(activeInstantsToPartitions);
Expand Down Expand Up @@ -688,12 +688,12 @@ private static void mockFewActiveInstants(HoodieInstantReader instantReader, Hoo
when(hoodieTable.getActiveTimeline().getInstantsAsStream()).thenReturn(instants.stream());
when(hoodieTable.getCompletedCommitsTimeline()).thenReturn(commitsTimeline);

BaseTimelineV2 savepointTimeline = new BaseTimelineV2(instantReader);
BaseTimelineV2 savepointTimeline = new BaseTimelineV2();
List<HoodieInstant> savepointInstants = savepointedCommitsToAdd.keySet().stream().map(sp -> INSTANT_GENERATOR.createNewInstant(COMPLETED, HoodieTimeline.SAVEPOINT_ACTION, sp))
.collect(Collectors.toList());
savepointTimeline.setInstants(savepointInstants);

BaseTimelineV2 completedReplaceTimeline = new BaseTimelineV2(instantReader);
BaseTimelineV2 completedReplaceTimeline = new BaseTimelineV2();
List<HoodieInstant> completedReplaceInstants = replaceCommits.stream().map(rc -> INSTANT_GENERATOR.createNewInstant(COMPLETED, HoodieTimeline.REPLACE_COMMIT_ACTION, rc))
.collect(Collectors.toList());
completedReplaceTimeline.setInstants(completedReplaceInstants);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public void testAverageRecordSize(List<Pair<HoodieInstant, List<HWriteStat>>> in
HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath("/tmp")
.build();
HoodieTableMetaClient metaClient = HoodieTestUtils.init("/tmp");
HoodieTimeline commitsTimeline = new BaseTimelineV2(metaClient.getActiveTimeline());
HoodieTimeline commitsTimeline = new BaseTimelineV2();
List<HoodieInstant> instants = new ArrayList<>();
instantSizePairs.forEach(entry -> {
HoodieInstant hoodieInstant = entry.getKey();
Expand Down Expand Up @@ -99,12 +99,11 @@ public void testAverageRecordSize(List<Pair<HoodieInstant, List<HWriteStat>>> in

@Test
public void testErrorHandling() throws IOException {
HoodieTableMetaClient metaClient = HoodieTestUtils.init("/tmp");
int recordSize = 10000;
HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
.withProps(Collections.singletonMap(COPY_ON_WRITE_RECORD_SIZE_ESTIMATE.key(), String.valueOf(recordSize)))
.build(false);
BaseTimelineV2 commitsTimeline = new BaseTimelineV2(metaClient.getActiveTimeline());
BaseTimelineV2 commitsTimeline = new BaseTimelineV2();
List<HoodieInstant> instants = Collections.singletonList(
INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "1"));

Expand Down
Loading

0 comments on commit b29cd6b

Please sign in to comment.