Skip to content

Commit

Permalink
Fetch active task payloads from memory (apache#15377)
Browse files Browse the repository at this point in the history
The TaskQueue maintains a map of active task ids to tasks, which can be utilized to get active task payloads, before falling back to the metadata store.
  • Loading branch information
AmatyaAvadhanula authored Nov 17, 2023
1 parent 6a5da5a commit 77828be
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -947,6 +947,17 @@ public CoordinatorRunStats getQueueStats()
return stats;
}

public Optional<Task> getActiveTask(String id)
{
giant.lock();
try {
return Optional.fromNullable(tasks.get(id));
}
finally {
giant.unlock();
}
}

@VisibleForTesting
List<Task> getTasks()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,14 @@ public class TaskStorageQueryAdapter
{
private final TaskStorage storage;
private final TaskLockbox taskLockbox;
private final Optional<TaskQueue> taskQueue;

@Inject
public TaskStorageQueryAdapter(TaskStorage storage, TaskLockbox taskLockbox)
public TaskStorageQueryAdapter(TaskStorage storage, TaskLockbox taskLockbox, TaskMaster taskMaster)
{
this.storage = storage;
this.taskLockbox = taskLockbox;
this.taskQueue = taskMaster.getTaskQueue();
}

public List<Task> getActiveTasks()
Expand Down Expand Up @@ -104,6 +106,12 @@ public List<TaskStatusPlus> getTaskStatusPlusList(

public Optional<Task> getTask(final String taskid)
{
if (taskQueue.isPresent()) {
Optional<Task> activeTask = taskQueue.get().getActiveTask(taskid);
if (activeTask.isPresent()) {
return activeTask;
}
}
return storage.getTask(taskid);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,10 @@ private TaskStorage setUpTaskStorage()
default:
throw new RE("Unknown task storage type [%s]", taskStorageType);
}
tsqa = new TaskStorageQueryAdapter(taskStorage, taskLockbox);
TaskMaster taskMaster = EasyMock.createMock(TaskMaster.class);
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.absent()).anyTimes();
EasyMock.replay(taskMaster);
tsqa = new TaskStorageQueryAdapter(taskStorage, taskLockbox, taskMaster);
return taskStorage;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ public void testOverlordRun() throws Exception
Assert.assertEquals(taskMaster.getCurrentLeader(), druidNode.getHostAndPort());
Assert.assertEquals(Optional.absent(), taskMaster.getRedirectLocation());

final TaskStorageQueryAdapter taskStorageQueryAdapter = new TaskStorageQueryAdapter(taskStorage, taskLockbox);
final TaskStorageQueryAdapter taskStorageQueryAdapter = new TaskStorageQueryAdapter(taskStorage, taskLockbox, taskMaster);
final WorkerTaskRunnerQueryAdapter workerTaskRunnerQueryAdapter = new WorkerTaskRunnerQueryAdapter(taskMaster, null);
// Test Overlord resource stuff
overlordResource = new OverlordResource(
Expand Down

0 comments on commit 77828be

Please sign in to comment.