Skip to content

Commit

Permalink
Adds tests
Browse files Browse the repository at this point in the history
  • Loading branch information
noob-se7en committed Jan 16, 2025
1 parent 3297ddd commit 748d0d3
Show file tree
Hide file tree
Showing 3 changed files with 178 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1781,22 +1781,27 @@ private void executeBatch(String tableNameWithType, Set<String> segmentBatchToCo
}
}

private List<Set<String>> getSegmentBatchList(IdealState idealState, Set<String> targetConsumingSegments,
@VisibleForTesting
List<Set<String>> getSegmentBatchList(IdealState idealState, Set<String> targetConsumingSegments,
int batchSize) {
Map<String, Queue<String>> instanceToConsumingSegments =
getInstanceToConsumingSegments(idealState, targetConsumingSegments);

Set<String> segmentsAdded = new HashSet<>();
List<Set<String>> segmentBatchList = new ArrayList<>();
Set<String> currentBatch = new HashSet<>();
Set<String> segmentsAdded = new HashSet<>();
boolean segmentsRemaining = true;

while (segmentsRemaining) {
segmentsRemaining = false;
// pick segments in round-robin fashion to parallelize
// forceCommit across max servers
for (Queue<String> queue : instanceToConsumingSegments.values()) {
if (!queue.isEmpty()) {
segmentsRemaining = true;
String segmentName = queue.poll();
// there might be a segment replica hosted on
// another instance added before
if (segmentsAdded.contains(segmentName)) {
continue;
}
Expand All @@ -1816,7 +1821,8 @@ private List<Set<String>> getSegmentBatchList(IdealState idealState, Set<String>
return segmentBatchList;
}

private Map<String, Queue<String>> getInstanceToConsumingSegments(IdealState idealState,
@VisibleForTesting
Map<String, Queue<String>> getInstanceToConsumingSegments(IdealState idealState,
Set<String> targetConsumingSegments) {
Map<String, Queue<String>> instanceToConsumingSegments = new HashMap<>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
package org.apache.pinot.controller.helix.core.realtime;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import java.io.File;
import java.io.IOException;
Expand All @@ -30,9 +33,12 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Random;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -1247,6 +1253,148 @@ public void testGetPartitionIds()
Assert.assertEquals(partitionIds.size(), 2);
}

@Test
public void testGetInstanceToConsumingSegments() {
  PinotHelixResourceManager mockHelixResourceManager = mock(PinotHelixResourceManager.class);
  FakePinotLLCRealtimeSegmentManager realtimeSegmentManager =
      new FakePinotLLCRealtimeSegmentManager(mockHelixResourceManager);
  IdealState idealState = mock(IdealState.class);
  // segment -> (instance -> state). Only CONSUMING replicas should be collected;
  // ONLINE/OFFLINE replicas (seg0/i4, seg2/i2) must be ignored.
  Map<String, Map<String, String>> map = ImmutableMap.of(
      "seg0", ImmutableMap.of("i1", "CONSUMING", "i4", "ONLINE"),
      "seg1", ImmutableMap.of("i2", "CONSUMING"),
      "seg2", ImmutableMap.of("i3", "CONSUMING", "i2", "OFFLINE"),
      "seg3", ImmutableMap.of("i4", "CONSUMING", "i2", "CONSUMING", "i3", "CONSUMING"),
      "seg4", ImmutableMap.of("i5", "CONSUMING", "i1", "CONSUMING", "i3", "CONSUMING")
  );

  ZNRecord znRecord = mock(ZNRecord.class);
  when(znRecord.getMapFields()).thenReturn(map);
  when(idealState.getRecord()).thenReturn(znRecord);
  Set<String> targetConsumingSegment = new HashSet<>(map.keySet());

  Map<String, Queue<String>> instanceToConsumingSegments =
      realtimeSegmentManager.getInstanceToConsumingSegments(idealState, targetConsumingSegment);
  List<String> instanceList = ImmutableList.of("i1", "i2", "i3", "i4", "i5");

  // Build a canonical string of the ACTUAL per-instance segment sets (sorted for
  // deterministic ordering). Renamed from "expectedSegNames", which was misleading:
  // this accumulates the actual result, not the expectation.
  StringBuilder actualSegNames = new StringBuilder();
  for (String instanceName : instanceList) {
    SortedSet<String> sortedSegNames = new TreeSet<>(instanceToConsumingSegments.get(instanceName));
    actualSegNames.append(sortedSegNames);
  }
  // Assert.assertEquals (class convention) rather than a bare `assert`, which is a
  // silent no-op unless the JVM runs with -ea, and gives no diagnostic on failure.
  Assert.assertEquals(actualSegNames.toString(),
      "[seg0, seg4][seg1, seg3][seg2, seg3, seg4][seg3][seg4]");
}

@Test
public void getSegmentBatchList() {
  PinotHelixResourceManager mockHelixResourceManager = mock(PinotHelixResourceManager.class);
  FakePinotLLCRealtimeSegmentManager realtimeSegmentManager =
      new FakePinotLLCRealtimeSegmentManager(mockHelixResourceManager);
  IdealState idealState = mock(IdealState.class);

  Map<String, Map<String, String>> map = ImmutableMap.of(
      "seg0", ImmutableMap.of("i1", "CONSUMING", "i4", "ONLINE"),
      "seg1", ImmutableMap.of("i2", "CONSUMING"),
      "seg2", ImmutableMap.of("i3", "CONSUMING", "i2", "OFFLINE"),
      "seg3", ImmutableMap.of("i4", "CONSUMING", "i2", "CONSUMING", "i3", "CONSUMING"),
      "seg4", ImmutableMap.of("i5", "CONSUMING", "i1", "CONSUMING", "i3", "CONSUMING"),
      "seg5", ImmutableMap.of("i6", "CONSUMING", "i1", "CONSUMING", "i3", "CONSUMING"),
      "seg6", ImmutableMap.of("i7", "CONSUMING", "i1", "CONSUMING", "i3", "CONSUMING")
  );

  ZNRecord znRecord = mock(ZNRecord.class);
  when(znRecord.getMapFields()).thenReturn(map);
  when(idealState.getRecord()).thenReturn(znRecord);
  Set<String> targetConsumingSegment = new HashSet<>(map.keySet());

  List<Set<String>> segmentBatchList =
      realtimeSegmentManager.getSegmentBatchList(idealState, targetConsumingSegment, 2);

  // Expected per-instance CONSUMING queues:
  // i1 = [seg0, seg4, seg5, seg6]
  // i2 = [seg1, seg3]
  // i3 = [seg2, seg3, seg4, seg5, seg6]
  // i4 = [seg3]
  // i5 = [seg4]
  // i6 = [seg5]
  // i7 = [seg6]
  Assert.assertEquals(segmentBatchList.size(), 4);
  Set<String> segmentsAdded = new HashSet<>();

  for (Set<String> segmentBatch : segmentBatchList) {
    Assert.assertTrue(segmentBatch.size() <= 2);
    for (String segmentName : segmentBatch) {
      // every segment must land in exactly one batch
      Assert.assertFalse(segmentsAdded.contains(segmentName));
      segmentsAdded.add(segmentName);
    }
  }

  // Randomized coverage check. Fixed seed so any failure is reproducible
  // (an unseeded Random made this test flaky and undebuggable).
  Random random = new Random(42);
  int numOfServers = 1 + random.nextInt(20);
  int numOfSegments = Math.max(numOfServers, random.nextInt(500));
  // Replication factor must be >= 1. The previous Math.min(numOfServers, random.nextInt(7))
  // could produce 0, leaving segments with no CONSUMING replica so they never appear in any
  // batch and the coverage assertions below would fail intermittently.
  int rf = 1 + random.nextInt(Math.min(numOfServers, 7));
  // Batch size must be >= 1: random.nextInt(100) could return 0 and divide by zero below.
  int batchSize = 1 + random.nextInt(100);

  map = new HashMap<>();
  for (int segmentIndex = 0; segmentIndex < numOfSegments; segmentIndex++) {
    String segmentName = "seg_" + segmentIndex;
    Map<String, String> instanceToStateMap = new HashMap<>();
    for (int rfIndex = 0; rfIndex < rf; rfIndex++) {
      // random instances may collide, so the effective RF is between 1 and rf
      instanceToStateMap.put("i_" + random.nextInt(numOfServers), "CONSUMING");
    }
    map.put(segmentName, instanceToStateMap);
  }

  znRecord = mock(ZNRecord.class);
  when(znRecord.getMapFields()).thenReturn(map);
  when(idealState.getRecord()).thenReturn(znRecord);
  targetConsumingSegment = new HashSet<>(map.keySet());

  segmentBatchList = realtimeSegmentManager.getSegmentBatchList(idealState, targetConsumingSegment, batchSize);
  // Every batch except possibly the last is filled to batchSize, so ceil(total / batchSize).
  int numBatchesExpected = (targetConsumingSegment.size() + batchSize - 1) / batchSize;

  Assert.assertEquals(segmentBatchList.size(), numBatchesExpected);

  segmentsAdded = new HashSet<>();
  for (Set<String> batch : segmentBatchList) {
    Assert.assertTrue(batch.size() <= batchSize);
    for (String segmentName : batch) {
      Assert.assertFalse(segmentsAdded.contains(segmentName));
      segmentsAdded.add(segmentName);
    }
  }

  // all target consuming segments are covered exactly once across batches
  Assert.assertEquals(segmentsAdded, targetConsumingSegment);
}

@Test
public void getSegmentsYetToBeCommitted() {
  PinotHelixResourceManager mockHelixResourceManager = mock(PinotHelixResourceManager.class);
  FakePinotLLCRealtimeSegmentManager realtimeSegmentManager =
      new FakePinotLLCRealtimeSegmentManager(mockHelixResourceManager);

  // ZK metadata stubs covering each status the method must classify.
  SegmentZKMetadata mockSegmentZKMetadataDone = mock(SegmentZKMetadata.class);
  when(mockSegmentZKMetadataDone.getStatus()).thenReturn(Status.DONE);

  SegmentZKMetadata mockSegmentZKMetadataUploaded = mock(SegmentZKMetadata.class);
  when(mockSegmentZKMetadataUploaded.getStatus()).thenReturn(Status.UPLOADED);

  SegmentZKMetadata mockSegmentZKMetadataInProgress = mock(SegmentZKMetadata.class);
  when(mockSegmentZKMetadataInProgress.getStatus()).thenReturn(Status.IN_PROGRESS);

  // s0/s3: DONE, s2: UPLOADED, s4: IN_PROGRESS, s1: no ZK metadata at all.
  when(mockHelixResourceManager.getSegmentZKMetadata("test", "s0")).thenReturn(mockSegmentZKMetadataDone);
  when(mockHelixResourceManager.getSegmentZKMetadata("test", "s3")).thenReturn(mockSegmentZKMetadataDone);
  when(mockHelixResourceManager.getSegmentZKMetadata("test", "s2")).thenReturn(mockSegmentZKMetadataUploaded);
  when(mockHelixResourceManager.getSegmentZKMetadata("test", "s4")).thenReturn(mockSegmentZKMetadataInProgress);
  when(mockHelixResourceManager.getSegmentZKMetadata("test", "s1")).thenReturn(null);

  Set<String> segmentsToCheck = ImmutableSet.of("s0", "s1", "s2", "s3", "s4");
  Set<String> segmentsYetToBeCommitted = realtimeSegmentManager.getSegmentsYetToBeCommitted("test", segmentsToCheck);

  // Only the IN_PROGRESS segment is still pending; DONE/UPLOADED are committed, and a
  // segment with missing metadata (s1) is expected to be excluded as well.
  // Assert.assertEquals (class convention) rather than a bare `assert`, which is a
  // silent no-op unless the JVM runs with -ea.
  Assert.assertEquals(segmentsYetToBeCommitted, ImmutableSet.of("s4"));
}


//////////////////////////////////////////////////////////////////////////////////
// Fake classes
/////////////////////////////////////////////////////////////////////////////////
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pinot.integration.tests;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import java.io.File;
import java.io.IOException;
Expand Down Expand Up @@ -428,6 +429,19 @@ public void testForceCommit()
throws Exception {
Set<String> consumingSegments = getConsumingSegmentsFromIdealState(getTableName() + "_REALTIME");
String jobId = forceCommit(getTableName());
testForceCommitInternal(jobId, consumingSegments);
}

@Test
public void testForceCommitInBatches()
    throws Exception {
  // Snapshot the consuming segments first, then force-commit them one at a time
  // (batchSize=1) and verify the job completes over that snapshot.
  String realtimeTableName = getTableName() + "_REALTIME";
  Set<String> consumingSegments = getConsumingSegmentsFromIdealState(realtimeTableName);
  String jobId = forceCommit(getTableName(), 1);
  testForceCommitInternal(jobId, consumingSegments);
}

private void testForceCommitInternal(String jobId, Set<String> consumingSegments)
throws JsonProcessingException {
Map<String, String> jobMetadata =
_helixResourceManager.getControllerJobZKMetadata(jobId, ControllerJobType.FORCE_COMMIT);
assert jobMetadata != null;
Expand Down Expand Up @@ -490,6 +504,13 @@ private String forceCommit(String tableName)
return JsonUtils.stringToJsonNode(response).get("forceCommitJobId").asText();
}

/**
 * Issues a force-commit request for the given table with an explicit batch size and
 * returns the controller's force-commit job id parsed from the JSON response.
 */
private String forceCommit(String tableName, int batchSize)
    throws Exception {
  String url = _controllerRequestURLBuilder.forTableForceCommit(tableName) + "?batchSize=" + batchSize;
  String response = sendPostRequest(url, null);
  JsonNode responseJson = JsonUtils.stringToJsonNode(response);
  return responseJson.get("forceCommitJobId").asText();
}

@Test
@Override
public void testHardcodedServerPartitionedSqlQueries()
Expand Down

0 comments on commit 748d0d3

Please sign in to comment.