Skip to content

Commit

Permalink
feat(cursor): Enable TaskCursor to take a memory pool (facebookincuba…
Browse files Browse the repository at this point in the history
…tor#12261)

Summary:
Pull Request resolved: facebookincubator#12261

When task cursor runs in multi-threaded mode, vectors are copied from
the input memory pool to an output one created by the TaskCursor object. Adding
a configuration parameter to enable clients of this API to manually pass a
custom pool and allow them to control their lifetime.

Reviewed By: Yuhta

Differential Revision: D69163034

fbshipit-source-id: 84be6db845260e1b40cfabe32b0c92d7c25b55fb
  • Loading branch information
pedroerp authored and facebook-github-bot committed Feb 5, 2025
1 parent ddc20e6 commit 162a27c
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 3 deletions.
4 changes: 3 additions & 1 deletion velox/exec/Cursor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,9 @@ class MultiThreadedTaskCursor : public TaskCursorBase {
queryCtx_->isExecutorSupplied(),
"Executor should be set in parallel task cursor");

queue_ = std::make_shared<TaskQueue>(params.bufferedBytes);
queue_ =
std::make_shared<TaskQueue>(params.bufferedBytes, params.outputPool);

// Captured as a shared_ptr by the consumer callback of task_.
auto queue = queue_;
task_ = Task::create(
Expand Down
15 changes: 13 additions & 2 deletions velox/exec/Cursor.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,12 @@ struct CursorParameters {

uint64_t bufferedBytes{512 * 1024};

/// An optional memory pool to be used to allocate vectors returned by
/// MultiThreadedTaskCursor. A new pool is created if not specified.
///
/// Only used if serialExecution is false.
std::shared_ptr<memory::MemoryPool> outputPool;

/// Ungrouped (by default) or grouped (bucketed) execution.
core::ExecutionStrategy executionStrategy{
core::ExecutionStrategy::kUngrouped};
Expand Down Expand Up @@ -82,8 +88,13 @@ class TaskQueue {
uint64_t bytes;
};

explicit TaskQueue(uint64_t maxBytes)
: pool_(memory::memoryManager()->addLeafPool()), maxBytes_(maxBytes) {}
explicit TaskQueue(
uint64_t maxBytes,
const std::shared_ptr<memory::MemoryPool>& outputPool)
: pool_(
outputPool != nullptr ? outputPool
: memory::memoryManager()->addLeafPool()),
maxBytes_(maxBytes) {}

void setNumProducers(int32_t n) {
numProducers_ = n;
Expand Down

0 comments on commit 162a27c

Please sign in to comment.