Skip to content

Commit

Permalink
fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
shounakmk219 committed Jan 6, 2025
1 parent acad7ec commit 8976428
Show file tree
Hide file tree
Showing 7 changed files with 102 additions and 35 deletions.
14 changes: 13 additions & 1 deletion pinot-controller/src/main/resources/app/pages/TaskQueueTable.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,19 @@ const TaskQueueTable = (props) => {
if (get(res, `${taskType}`, null) === null) {
dispatch({
type: 'error',
message: `Could not schedule task.\nTask generation errors : ${get(res, 'generationErrors', 'none')}.\nTask scheduling errors : ${get(res, 'schedulingErrors', 'none')}`,
message: (
<Box>
<Typography>
Could not schedule task
</Typography>
<Typography>
Task generation errors : {get(res, 'generationErrors', 'none')}
</Typography>
<Typography>
Task scheduling errors : {get(res, 'schedulingErrors', 'none')}
</Typography>
</Box>
),
show: true
});
} else if (get(res, `${taskType}`, null) === '') {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.integration.tests;

import java.util.Map;
import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;

import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;


public class MinionTaskTestUtils {
private MinionTaskTestUtils() {
}

public static void assertNoTaskSchedule(String offlineTableName, String taskType, PinotTaskManager taskManager) {
PinotTaskManager.TaskSchedulingInfo info =
taskManager.scheduleAllTasksForTable(offlineTableName, null).get(taskType);
assertNoTaskSchedule(info);
}

public static void assertNoTaskSchedule(String taskType, PinotTaskManager taskManager) {
PinotTaskManager.TaskSchedulingInfo info = taskManager.scheduleTaskForAllTables(taskType, null);
assertNoTaskSchedule(info);
}

public static void assertNoTaskSchedule(PinotTaskManager taskManager) {
Map<String, PinotTaskManager.TaskSchedulingInfo> infoMap = taskManager.scheduleAllTasksForAllTables(null);
infoMap.forEach((key, value) -> assertNoTaskSchedule(value));
}

public static void assertNoTaskSchedule(PinotTaskManager.TaskSchedulingInfo info) {
assertNotNull(info.getScheduledTaskNames());
assertTrue(info.getScheduledTaskNames().isEmpty());
assertNotNull(info.getGenerationErrors());
assertTrue(info.getGenerationErrors().isEmpty());
assertNotNull(info.getSchedulingErrors());
assertTrue(info.getSchedulingErrors().isEmpty());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ public void testOfflineTableSingleLevelConcat()
tasks != null;
taskList = _taskManager.scheduleAllTasksForTable(offlineTableName, null)
.get(MinionConstants.MergeRollupTask.TASK_TYPE).getScheduledTaskNames(),
tasks = taskList != null ? taskList.get(0) : null, numTasks++) {
tasks = taskList != null && !taskList.isEmpty() ? taskList.get(0) : null, numTasks++) {
assertEquals(_helixTaskResourceManager.getSubtaskConfigs(tasks).size(), expectedNumSubTasks[numTasks]);
assertTrue(_helixTaskResourceManager.getTaskQueues()
.contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.MergeRollupTask.TASK_TYPE)));
Expand Down Expand Up @@ -530,7 +530,7 @@ public void testOfflineTableSingleLevelConcatWithMetadataPush()
tasks != null;
taskList = _taskManager.scheduleAllTasksForTable(offlineTableName, null)
.get(MinionConstants.MergeRollupTask.TASK_TYPE).getScheduledTaskNames(),
tasks = taskList != null ? taskList.get(0) : null, numTasks++) {
tasks = taskList != null && !taskList.isEmpty() ? taskList.get(0) : null, numTasks++) {
assertEquals(_helixTaskResourceManager.getSubtaskConfigs(tasks).size(), expectedNumSubTasks[numTasks]);
assertTrue(_helixTaskResourceManager.getTaskQueues()
.contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.MergeRollupTask.TASK_TYPE)));
Expand Down Expand Up @@ -639,7 +639,7 @@ public void testOfflineTableSingleLevelRollup()
tasks != null;
taskList = _taskManager.scheduleAllTasksForTable(offlineTableName, null)
.get(MinionConstants.MergeRollupTask.TASK_TYPE).getScheduledTaskNames(),
tasks = taskList != null ? taskList.get(0) : null, numTasks++) {
tasks = taskList != null && !taskList.isEmpty() ? taskList.get(0) : null, numTasks++) {
assertEquals(_helixTaskResourceManager.getSubtaskConfigs(tasks).size(), 1);
assertTrue(_helixTaskResourceManager.getTaskQueues()
.contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.MergeRollupTask.TASK_TYPE)));
Expand Down Expand Up @@ -791,7 +791,7 @@ public void testOfflineTableMultiLevelConcat()
tasks != null;
taskList = _taskManager.scheduleAllTasksForTable(offlineTableName, null)
.get(MinionConstants.MergeRollupTask.TASK_TYPE).getScheduledTaskNames(),
tasks = taskList != null ? taskList.get(0) : null, numTasks++) {
tasks = taskList != null && !taskList.isEmpty() ? taskList.get(0) : null, numTasks++) {
assertEquals(_helixTaskResourceManager.getSubtaskConfigs(tasks).size(), expectedNumSubTasks[numTasks]);
assertTrue(_helixTaskResourceManager.getTaskQueues()
.contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.MergeRollupTask.TASK_TYPE)));
Expand Down Expand Up @@ -924,7 +924,7 @@ public void testRealtimeTableSingleLevelConcat()
tasks != null;
taskList = taskManager.scheduleAllTasksForTable(realtimeTableName, null)
.get(MinionConstants.MergeRollupTask.TASK_TYPE).getScheduledTaskNames(),
tasks = taskList != null ? taskList.get(0) : null, numTasks++) {
tasks = taskList != null && !taskList.isEmpty() ? taskList.get(0) : null, numTasks++) {
// assertEquals(helixTaskResourceManager.getSubtaskConfigs(tasks).size(), expectedNumSubTasks[numTasks]);
assertTrue(helixTaskResourceManager.getTaskQueues()
.contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.MergeRollupTask.TASK_TYPE)));
Expand Down Expand Up @@ -1029,7 +1029,7 @@ public void testRealtimeTableProcessAllModeMultiLevelConcat()
get(MinionConstants.MergeRollupTask.TASK_TYPE).getScheduledTaskNames().get(0); tasks != null;
taskList = taskManager.scheduleAllTasksForTable(realtimeTableName, null)
.get(MinionConstants.MergeRollupTask.TASK_TYPE).getScheduledTaskNames(),
tasks = taskList != null ? taskList.get(0) : null, numTasks++) {
tasks = taskList != null && !taskList.isEmpty() ? taskList.get(0) : null, numTasks++) {
assertTrue(helixTaskResourceManager.getTaskQueues()
.contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.MergeRollupTask.TASK_TYPE)));

Expand Down Expand Up @@ -1071,7 +1071,7 @@ public void testRealtimeTableProcessAllModeMultiLevelConcat()
tasks != null;
taskList = taskManager.scheduleAllTasksForTable(realtimeTableName, null)
.get(MinionConstants.MergeRollupTask.TASK_TYPE).getScheduledTaskNames(),
tasks = taskList != null ? taskList.get(0) : null, numTasks++) {
tasks = taskList != null && !taskList.isEmpty() ? taskList.get(0) : null, numTasks++) {
waitForTaskToComplete();
// Check metrics
long numBucketsToProcess = MetricValueUtils.getGaugeValue(_controllerStarter.getControllerMetrics(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;


Expand Down Expand Up @@ -191,7 +190,7 @@ public void testFirstRunPurge()
assertTrue(_helixTaskResourceManager.getTaskQueues()
.contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.PurgeTask.TASK_TYPE)));
// Will not schedule task if there's incomplete task
assertNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null).get(MinionConstants.PurgeTask.TASK_TYPE));
MinionTaskTestUtils.assertNoTaskSchedule(offlineTableName, MinionConstants.PurgeTask.TASK_TYPE, _taskManager);
waitForTaskToComplete();

// Check that metadata contains expected values
Expand All @@ -201,7 +200,7 @@ public void testFirstRunPurge()
metadata.getCustomMap().containsKey(MinionConstants.PurgeTask.TASK_TYPE + MinionConstants.TASK_TIME_SUFFIX));
}
// Should not generate new purge task as the last time purge is not greater than last + 1day (default purge delay)
assertNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null).get(MinionConstants.PurgeTask.TASK_TYPE));
MinionTaskTestUtils.assertNoTaskSchedule(offlineTableName, MinionConstants.PurgeTask.TASK_TYPE, _taskManager);

// 52 rows with ArrTime = 1
// 115545 totals rows
Expand Down Expand Up @@ -236,7 +235,7 @@ public void testPassedDelayTimePurge()
assertTrue(_helixTaskResourceManager.getTaskQueues()
.contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.PurgeTask.TASK_TYPE)));
// Will not schedule task if there's incomplete task
assertNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null).get(MinionConstants.PurgeTask.TASK_TYPE));
MinionTaskTestUtils.assertNoTaskSchedule(offlineTableName, MinionConstants.PurgeTask.TASK_TYPE, _taskManager);
waitForTaskToComplete();

// Check that metadata contains expected values
Expand All @@ -248,7 +247,7 @@ public void testPassedDelayTimePurge()
assertTrue(System.currentTimeMillis() - Long.parseLong(purgeTime) < 86400000);
}
// Should not generate new purge task as the last time purge is not greater than last + 1day (default purge delay)
assertNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null).get(MinionConstants.PurgeTask.TASK_TYPE));
MinionTaskTestUtils.assertNoTaskSchedule(offlineTableName, MinionConstants.PurgeTask.TASK_TYPE, _taskManager);

// 52 rows with ArrTime = 1
// 115545 totals rows
Expand Down Expand Up @@ -280,7 +279,7 @@ public void testNotPassedDelayTimePurge()
String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(PURGE_DELTA_NOT_PASSED_TABLE);

// No task should be schedule as the delay is not passed
assertNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null).get(MinionConstants.PurgeTask.TASK_TYPE));
MinionTaskTestUtils.assertNoTaskSchedule(offlineTableName, MinionConstants.PurgeTask.TASK_TYPE, _taskManager);
for (SegmentZKMetadata metadata : _pinotHelixResourceManager.getSegmentsZKMetadata(offlineTableName)) {
// Check purge time
String purgeTime =
Expand Down Expand Up @@ -335,7 +334,7 @@ public void testPurgeOnOldSegmentsWithIndicesOnNewColumns()
_taskManager.scheduleAllTasksForTable(offlineTableName, null).get(MinionConstants.PurgeTask.TASK_TYPE));
assertTrue(_helixTaskResourceManager.getTaskQueues()
.contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.PurgeTask.TASK_TYPE)));
assertNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null).get(MinionConstants.PurgeTask.TASK_TYPE));
MinionTaskTestUtils.assertNoTaskSchedule(offlineTableName, MinionConstants.PurgeTask.TASK_TYPE, _taskManager);
waitForTaskToComplete();

// Check that metadata contains expected values
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,8 +236,8 @@ public void testRealtimeToOfflineSegmentsTask()
assertTrue(_taskResourceManager.getTaskQueues().contains(
PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE)));
// Should not generate more tasks
assertNull(_taskManager.scheduleAllTasksForTable(_realtimeTableName, null)
.get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
MinionTaskTestUtils.assertNoTaskSchedule(_realtimeTableName,
MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, _taskManager);

// Wait at most 600 seconds for all tasks COMPLETED
waitForTaskToComplete(expectedWatermark, _realtimeTableName);
Expand Down Expand Up @@ -288,8 +288,8 @@ public void testRealtimeToOfflineSegmentsMetadataPushTask()
assertTrue(_taskResourceManager.getTaskQueues().contains(
PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE)));
// Should not generate more tasks
assertNull(_taskManager.scheduleAllTasksForTable(_realtimeMetadataTableName, null)
.get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
MinionTaskTestUtils.assertNoTaskSchedule(_realtimeMetadataTableName,
MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, _taskManager);

// Wait at most 600 seconds for all tasks COMPLETED
waitForTaskToComplete(expectedWatermark, _realtimeMetadataTableName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,8 @@ public void testFirstSegmentRefresh() {
assertTrue(_helixTaskResourceManager.getTaskQueues()
.contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.RefreshSegmentTask.TASK_TYPE)));
// Will not schedule task if there's incomplete task
assertNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null)
.get(MinionConstants.RefreshSegmentTask.TASK_TYPE));
MinionTaskTestUtils.assertNoTaskSchedule(offlineTableName, MinionConstants.RefreshSegmentTask.TASK_TYPE,
_taskManager);
waitForTaskToComplete();

// Check that metadata contains expected values
Expand All @@ -128,8 +128,8 @@ public void testFirstSegmentRefresh() {
}

// This should be no-op as nothing changes.
assertNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null)
.get(MinionConstants.RefreshSegmentTask.TASK_TYPE));
MinionTaskTestUtils.assertNoTaskSchedule(offlineTableName, MinionConstants.RefreshSegmentTask.TASK_TYPE,
_taskManager);
for (SegmentZKMetadata metadata : _pinotHelixResourceManager.getSegmentsZKMetadata(offlineTableName)) {
// Get the value in segment metadata
Map<String, String> customMap = metadata.getCustomMap();
Expand Down Expand Up @@ -158,8 +158,8 @@ public void testValidDatatypeChange() throws Exception {
assertTrue(_helixTaskResourceManager.getTaskQueues()
.contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.RefreshSegmentTask.TASK_TYPE)));
// Will not schedule task if there's incomplete task
assertNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null)
.get(MinionConstants.RefreshSegmentTask.TASK_TYPE));
MinionTaskTestUtils.assertNoTaskSchedule(offlineTableName, MinionConstants.RefreshSegmentTask.TASK_TYPE,
_taskManager);
waitForTaskToComplete();

waitForServerSegmentDownload(aVoid -> {
Expand Down Expand Up @@ -237,8 +237,8 @@ public void testIndexChanges() throws Exception {
assertTrue(_helixTaskResourceManager.getTaskQueues()
.contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.RefreshSegmentTask.TASK_TYPE)));
// Will not schedule task if there's incomplete task
assertNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null)
.get(MinionConstants.RefreshSegmentTask.TASK_TYPE));
MinionTaskTestUtils.assertNoTaskSchedule(offlineTableName, MinionConstants.RefreshSegmentTask.TASK_TYPE,
_taskManager);
waitForTaskToComplete();

waitForServerSegmentDownload(aVoid -> {
Expand Down Expand Up @@ -328,8 +328,8 @@ public void checkColumnAddition() throws Exception {
assertTrue(_helixTaskResourceManager.getTaskQueues()
.contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.RefreshSegmentTask.TASK_TYPE)));
// Will not schedule task if there's incomplete task
assertNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null)
.get(MinionConstants.RefreshSegmentTask.TASK_TYPE));
MinionTaskTestUtils.assertNoTaskSchedule(offlineTableName, MinionConstants.RefreshSegmentTask.TASK_TYPE,
_taskManager);
waitForTaskToComplete();

// Check that metadata contains processed times.
Expand Down Expand Up @@ -406,8 +406,8 @@ public void checkRefreshNotNecessary() throws Exception {
assertTrue(_helixTaskResourceManager.getTaskQueues()
.contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.RefreshSegmentTask.TASK_TYPE)));
// Will not schedule task if there's incomplete task
assertNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null)
.get(MinionConstants.RefreshSegmentTask.TASK_TYPE));
MinionTaskTestUtils.assertNoTaskSchedule(offlineTableName, MinionConstants.RefreshSegmentTask.TASK_TYPE,
_taskManager);
waitForTaskToComplete();

// Check that metadata contains expected values
Expand All @@ -423,8 +423,8 @@ public void checkRefreshNotNecessary() throws Exception {
}

// This should be no-op as nothing changes.
assertNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null)
.get(MinionConstants.RefreshSegmentTask.TASK_TYPE));
MinionTaskTestUtils.assertNoTaskSchedule(offlineTableName, MinionConstants.RefreshSegmentTask.TASK_TYPE,
_taskManager);
for (SegmentZKMetadata metadata : _pinotHelixResourceManager.getSegmentsZKMetadata(offlineTableName)) {
// Get the value in segment metadata
Map<String, String> customMap = metadata.getCustomMap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,8 @@ public void testStopResumeDeleteTaskQueue() {
// Should not generate more tasks since SimpleMinionClusterIntegrationTests.NUM_TASKS is 2.
// Our test task generator does not generate if there are already this many sub-tasks in the
// running+waiting count already.
assertNull(_taskManager.scheduleAllTasksForAllTables(null).get(TASK_TYPE));
assertNull(_taskManager.scheduleTaskForAllTables(TASK_TYPE, null));
MinionTaskTestUtils.assertNoTaskSchedule(_taskManager);
MinionTaskTestUtils.assertNoTaskSchedule(TASK_TYPE, _taskManager);

// Wait at most 60 seconds for all tasks IN_PROGRESS
TestUtils.waitForCondition(input -> {
Expand Down

0 comments on commit 8976428

Please sign in to comment.