Skip to content

Commit

Permalink
Include the task scheduling errors in schedule response
Browse files Browse the repository at this point in the history
  • Loading branch information
shounakmk219 committed Jan 3, 2025
1 parent f8fa1a8 commit acad7ec
Show file tree
Hide file tree
Showing 6 changed files with 160 additions and 82 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -642,21 +641,35 @@ public Map<String, String> scheduleTasks(
@ApiParam(value = "Minion Instance tag to schedule the task explicitly on") @QueryParam("minionInstanceTag")
@Nullable String minionInstanceTag, @Context HttpHeaders headers) {
String database = headers != null ? headers.getHeaderString(DATABASE) : DEFAULT_DATABASE;
Map<String, String> response = new HashMap<>();
List<String> generationErrors = new ArrayList<>();
List<String> schedulingErrors = new ArrayList<>();
if (taskType != null) {
// Schedule task for the given task type
List<String> taskNames = tableName != null ? _pinotTaskManager.scheduleTaskForTable(taskType,
DatabaseUtils.translateTableName(tableName, headers), minionInstanceTag)
PinotTaskManager.TaskSchedulingInfo taskInfos = tableName != null
? _pinotTaskManager.scheduleTaskForTable(taskType, DatabaseUtils.translateTableName(tableName, headers),
minionInstanceTag)
: _pinotTaskManager.scheduleTaskForDatabase(taskType, database, minionInstanceTag);
return Collections.singletonMap(taskType, taskNames == null ? null : StringUtils.join(taskNames, ','));
response.put(taskType, StringUtils.join(taskInfos.getScheduledTaskNames(), ','));
generationErrors.addAll(taskInfos.getGenerationErrors());
schedulingErrors.addAll(taskInfos.getSchedulingErrors());
} else {
// Schedule tasks for all task types
Map<String, List<String>> allTaskNames = tableName != null ? _pinotTaskManager.scheduleAllTasksForTable(
DatabaseUtils.translateTableName(tableName, headers), minionInstanceTag)
Map<String, PinotTaskManager.TaskSchedulingInfo> allTaskInfos = tableName != null
? _pinotTaskManager.scheduleAllTasksForTable(DatabaseUtils.translateTableName(tableName, headers),
minionInstanceTag)
: _pinotTaskManager.scheduleAllTasksForDatabase(database, minionInstanceTag);
Map<String, String> result = allTaskNames.entrySet().stream().filter(entry -> entry.getValue() != null)
.collect(Collectors.toMap(Map.Entry::getKey, entry -> String.join(",", entry.getValue())));
return result.isEmpty() ? null : result;
allTaskInfos.forEach((key, value) -> {
if (value.getScheduledTaskNames() != null) {
response.put(key, String.join(",", value.getScheduledTaskNames()));
}
generationErrors.addAll(value.getGenerationErrors());
schedulingErrors.addAll(value.getSchedulingErrors());
});
}
response.put("generationErrors", String.join(",", generationErrors));
response.put("schedulingErrors", String.join(",", schedulingErrors));
return response;
}

@POST
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -486,70 +486,79 @@ public void registerTaskGenerator(PinotTaskGenerator taskGenerator) {
/**
* Schedules tasks (all task types) for all tables.
* It might be called from the non-leader controller.
* Returns a map from the task type to the list of tasks scheduled.
* Returns a map from the task type to the {@link TaskSchedulingInfo} of tasks scheduled.
*/
public synchronized Map<String, List<String>> scheduleAllTasksForAllTables(@Nullable String minionInstanceTag) {
public synchronized Map<String, TaskSchedulingInfo> scheduleAllTasksForAllTables(@Nullable String minionInstanceTag) {
return scheduleTasks(_pinotHelixResourceManager.getAllTables(), false, minionInstanceTag);
}

/**
* Schedules tasks (all task types) for all tables in the given database.
* It might be called from the non-leader controller.
* Returns a map from the task type to the list of tasks scheduled.
* Returns a map from the task type to the {@link TaskSchedulingInfo} of tasks scheduled.
*/
public synchronized Map<String, List<String>> scheduleAllTasksForDatabase(@Nullable String database,
public synchronized Map<String, TaskSchedulingInfo> scheduleAllTasksForDatabase(@Nullable String database,
@Nullable String minionInstanceTag) {
return scheduleTasks(_pinotHelixResourceManager.getAllTables(database), false, minionInstanceTag);
}

/**
* Schedules tasks (all task types) for the given table.
* It might be called from the non-leader controller.
* Returns a map from the task type to the list of tasks scheduled.
* Returns a map from the task type to the {@link TaskSchedulingInfo} of tasks scheduled.
*/
public synchronized Map<String, List<String>> scheduleAllTasksForTable(String tableNameWithType,
public synchronized Map<String, TaskSchedulingInfo> scheduleAllTasksForTable(String tableNameWithType,
@Nullable String minionInstanceTag) {
return scheduleTasks(List.of(tableNameWithType), false, minionInstanceTag);
}

/**
* Schedules task for the given task type for all tables.
* It might be called from the non-leader controller.
* Returns a list of tasks scheduled, or {@code null} if no task is scheduled.
* Returns {@link TaskSchedulingInfo} which consists of:
* - list of scheduled task names (empty list if nothing to schedule),
* or {@code null} if no task is scheduled due to scheduling errors.
* - list of task generation errors if any
* - list of task scheduling errors if any
*/
@Nullable
public synchronized List<String> scheduleTaskForAllTables(String taskType, @Nullable String minionInstanceTag) {
public synchronized TaskSchedulingInfo scheduleTaskForAllTables(String taskType, @Nullable String minionInstanceTag) {
return scheduleTask(taskType, _pinotHelixResourceManager.getAllTables(), minionInstanceTag);
}

/**
* Schedules task for the given task type for all tables in the given database.
* It might be called from the non-leader controller.
* Returns a list of tasks scheduled, or {@code null} if no task is scheduled.
* Returns {@link TaskSchedulingInfo} which consists of:
* - list of scheduled task names (empty list if nothing to schedule),
* or {@code null} if no task is scheduled due to scheduling errors.
* - list of task generation errors if any
* - list of task scheduling errors if any
*/
@Nullable
public synchronized List<String> scheduleTaskForDatabase(String taskType, @Nullable String database,
public synchronized TaskSchedulingInfo scheduleTaskForDatabase(String taskType, @Nullable String database,
@Nullable String minionInstanceTag) {
return scheduleTask(taskType, _pinotHelixResourceManager.getAllTables(database), minionInstanceTag);
}

/**
* Schedules task for the given task type for the given table.
* It might be called from the non-leader controller.
* Returns a list of tasks scheduled, or {@code null} if no task is scheduled.
* Returns {@link TaskSchedulingInfo} which consists of:
* - list of scheduled task names (empty list if nothing to schedule),
* or {@code null} if no task is scheduled due to scheduling errors.
* - list of task generation errors if any
* - list of task scheduling errors if any
*/
@Nullable
public synchronized List<String> scheduleTaskForTable(String taskType, String tableNameWithType,
public synchronized TaskSchedulingInfo scheduleTaskForTable(String taskType, String tableNameWithType,
@Nullable String minionInstanceTag) {
return scheduleTask(taskType, List.of(tableNameWithType), minionInstanceTag);
}

/**
* Helper method to schedule tasks (all task types) for the given tables that have the tasks enabled. Returns a map
* from the task type to the list of the tasks scheduled.
* Helper method to schedule tasks (all task types) for the given tables that have the tasks enabled.
* Returns a map from the task type to the {@link TaskSchedulingInfo} of the tasks scheduled.
*/
protected synchronized Map<String, List<String>> scheduleTasks(List<String> tableNamesWithType, boolean isLeader,
@Nullable String minionInstanceTag) {
protected synchronized Map<String, TaskSchedulingInfo> scheduleTasks(List<String> tableNamesWithType,
boolean isLeader, @Nullable String minionInstanceTag) {
_controllerMetrics.addMeteredGlobalValue(ControllerMeter.NUMBER_TIMES_SCHEDULE_TASKS_CALLED, 1L);

// Scan all table configs to get the tables with tasks enabled
Expand All @@ -565,7 +574,7 @@ protected synchronized Map<String, List<String>> scheduleTasks(List<String> tabl
}

// Generate each type of tasks
Map<String, List<String>> tasksScheduled = new HashMap<>();
Map<String, TaskSchedulingInfo> tasksScheduled = new HashMap<>();
for (Map.Entry<String, List<TableConfig>> entry : enabledTableConfigMap.entrySet()) {
String taskType = entry.getKey();
List<TableConfig> enabledTableConfigs = entry.getValue();
Expand All @@ -577,16 +586,18 @@ protected synchronized Map<String, List<String>> scheduleTasks(List<String> tabl
addTaskTypeMetricsUpdaterIfNeeded(taskType);
tasksScheduled.put(taskType, scheduleTask(taskGenerator, enabledTableConfigs, isLeader, minionInstanceTag));
} else {
LOGGER.warn("Task type: {} is not registered, cannot enable it for tables: {}", taskType, enabledTables);
tasksScheduled.put(taskType, null);
String message = "Task type: " + taskType + " is not registered, cannot enable it for tables: " + enabledTables;
LOGGER.warn(message);
TaskSchedulingInfo taskSchedulingInfo = new TaskSchedulingInfo();
taskSchedulingInfo.addSchedulingError(message);
tasksScheduled.put(taskType, taskSchedulingInfo);
}
}

return tasksScheduled;
}

@Nullable
protected synchronized List<String> scheduleTask(String taskType, List<String> tables,
protected synchronized TaskSchedulingInfo scheduleTask(String taskType, List<String> tables,
@Nullable String minionInstanceTag) {
PinotTaskGenerator taskGenerator = _taskGeneratorRegistry.getTaskGenerator(taskType);
Preconditions.checkState(taskGenerator != null, "Task type: %s is not registered", taskType);
Expand All @@ -608,17 +619,23 @@ protected synchronized List<String> scheduleTask(String taskType, List<String> t

/**
* Helper method to schedule task with the given task generator for the given tables that have the task enabled.
* Returns the list of task names, or {@code null} if no task is scheduled.
* Returns
* - list of scheduled task names (empty list if nothing to schedule),
* or {@code null} if no task is scheduled due to scheduling errors.
* - list of task generation errors if any
* - list of task scheduling errors if any
*/
@Nullable
protected List<String> scheduleTask(PinotTaskGenerator taskGenerator, List<TableConfig> enabledTableConfigs,
protected TaskSchedulingInfo scheduleTask(PinotTaskGenerator taskGenerator, List<TableConfig> enabledTableConfigs,
boolean isLeader, @Nullable String minionInstanceTagForTask) {
TaskSchedulingInfo response = new TaskSchedulingInfo();
String taskType = taskGenerator.getTaskType();
List<String> enabledTables =
enabledTableConfigs.stream().map(TableConfig::getTableName).collect(Collectors.toList());
LOGGER.info("Trying to schedule task type: {}, for tables: {}, isLeader: {}", taskType, enabledTables, isLeader);
if (!isTaskSchedulable(taskType, enabledTables)) {
return null;
response.addSchedulingError("Unable to start scheduling for task type " + taskType
+ " as task queue may be stopped. Please check the task queue status.");
return response;
}
Map<String, List<PinotTaskConfig>> minionInstanceTagToTaskConfigs = new HashMap<>();
for (TableConfig tableConfig : enabledTableConfigs) {
Expand All @@ -645,6 +662,8 @@ protected List<String> scheduleTask(PinotTaskGenerator taskGenerator, List<Table
try (PrintWriter pw = new PrintWriter(errors)) {
e.printStackTrace(pw);
}
response.addGenerationError("Failed to generate tasks for task type " + taskType + " for table " + tableName
+ "\n Reason : " + errors);
long failureRunTimestamp = System.currentTimeMillis();
_taskManagerStatusCache.saveTaskGeneratorInfo(tableName, taskType,
taskGeneratorMostRecentRunInfo -> taskGeneratorMostRecentRunInfo.addErrorRunMessage(failureRunTimestamp,
Expand Down Expand Up @@ -684,17 +703,17 @@ protected List<String> scheduleTask(PinotTaskGenerator taskGenerator, List<Table
numErrorTasksScheduled++;
LOGGER.error("Failed to schedule task type {} on minion instance {} with task configs: {}", taskType,
minionInstanceTag, pinotTaskConfigs, e);
response.addSchedulingError(e.getMessage());
}
}
if (numErrorTasksScheduled > 0) {
LOGGER.warn("Failed to schedule {} tasks for task type type {}", numErrorTasksScheduled, taskType);
// No job got scheduled due to errors
if (numErrorTasksScheduled == minionInstanceTagToTaskConfigs.size()) {
return response;
}
}
// No job got scheduled
if (numErrorTasksScheduled == minionInstanceTagToTaskConfigs.size() || submittedTaskNames.isEmpty()) {
return null;
}
// at least one job got scheduled
return submittedTaskNames;
return response.setScheduledTaskNames(submittedTaskNames);
}

@Override
Expand Down Expand Up @@ -762,4 +781,36 @@ protected boolean isTaskSchedulable(String taskType, List<String> tables) {
}
return true;
}

/**
* Result holder for a task scheduling attempt: the names of the tasks that were scheduled, plus any errors
* collected while generating or scheduling them.
* <p>Instances are mutable; the scheduling code fills them in incrementally through the add and set methods.
*/
public static class TaskSchedulingInfo {
// Left unset (null) until setScheduledTaskNames is called.
private List<String> _scheduledTaskNames;
private final List<String> _generationErrors = new ArrayList<>();
private final List<String> _schedulingErrors = new ArrayList<>();

/**
* Returns the names of the scheduled tasks.
* @return the list passed to {@link #setScheduledTaskNames(List)} (an empty list when there was nothing to
*         schedule), or {@code null} if it was never set, which is the case when no task could be scheduled
*         due to scheduling errors
*/
@Nullable
public List<String> getScheduledTaskNames() {
return _scheduledTaskNames;
}

/**
* Sets the names of the scheduled tasks. The given list is stored as-is (no copy is made).
* @param scheduledTaskNames names of the tasks that were scheduled
* @return this instance, so the call can be chained or returned directly
*/
public TaskSchedulingInfo setScheduledTaskNames(List<String> scheduledTaskNames) {
_scheduledTaskNames = scheduledTaskNames;
return this;
}

/**
* Returns the task generation errors recorded so far.
* @return the internal list of generation error messages (never {@code null}, empty if none were recorded);
*         it is not a copy
*/
public List<String> getGenerationErrors() {
return _generationErrors;
}

/**
* Records an error encountered while generating tasks.
* @param generationError message describing the generation failure
*/
public void addGenerationError(String generationError) {
_generationErrors.add(generationError);
}

/**
* Returns the task scheduling errors recorded so far.
* @return the internal list of scheduling error messages (never {@code null}, empty if none were recorded);
*         it is not a copy
*/
public List<String> getSchedulingErrors() {
return _schedulingErrors;
}

/**
* Records an error encountered while scheduling tasks.
* @param schedulingError message describing the scheduling failure
*/
public void addSchedulingError(String schedulingError) {
_schedulingErrors.add(schedulingError);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,13 @@ const TaskQueueTable = (props) => {
if (get(res, `${taskType}`, null) === null) {
dispatch({
type: 'error',
message: `Could not schedule task`,
message: `Could not schedule task.\nTask generation errors : ${get(res, 'generationErrors', 'none')}.\nTask scheduling errors : ${get(res, 'schedulingErrors', 'none')}`,
show: true
});
} else if (get(res, `${taskType}`, null) === '') {
dispatch({
type: 'success',
message: `No task to schedule`,
show: true
});
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,8 @@ public void testPinotTaskManagerScheduleTaskWithStoppedTaskQueue()
throws Exception {
testValidateTaskGeneration(taskManager -> {
// Validate schedule tasks for table when task queue is in stopped state
List<String> taskIDs = taskManager.scheduleTaskForTable("SegmentGenerationAndPushTask", "myTable", null);
List<String> taskIDs = taskManager.scheduleTaskForTable("SegmentGenerationAndPushTask", "myTable", null)
.getScheduledTaskNames();
assertNull(taskIDs);
return null;
});
Expand Down
Loading

0 comments on commit acad7ec

Please sign in to comment.