From 162a27c927a474ede12223ecc8b30bf784a4661e Mon Sep 17 00:00:00 2001
From: Pedro Eugenio Rocha Pedreira
Date: Wed, 5 Feb 2025 08:25:36 -0800
Subject: [PATCH] feat(cursor): Enable TaskCursor to take a memory pool (#12261)

Summary:
Pull Request resolved: https://github.com/facebookincubator/velox/pull/12261

When task cursor runs in multi-threaded mode, vectors are copied from the
input memory pool to an output one created by the TaskCursor object. Adding a
configuration parameter to enable clients of this API to manually pass a
custom pool and allow them to control their lifetime.

Reviewed By: Yuhta

Differential Revision: D69163034

fbshipit-source-id: 84be6db845260e1b40cfabe32b0c92d7c25b55fb
---
 velox/exec/Cursor.cpp |  4 +++-
 velox/exec/Cursor.h   | 15 +++++++++++++--
 2 files changed, 16 insertions(+), 3 deletions(-)

diff --git a/velox/exec/Cursor.cpp b/velox/exec/Cursor.cpp
index 9b394b985d09..378ff4abbcf3 100644
--- a/velox/exec/Cursor.cpp
+++ b/velox/exec/Cursor.cpp
@@ -219,7 +219,9 @@ class MultiThreadedTaskCursor : public TaskCursorBase {
         queryCtx_->isExecutorSupplied(),
         "Executor should be set in parallel task cursor");
 
-    queue_ = std::make_shared<TaskQueue>(params.bufferedBytes);
+    queue_ =
+        std::make_shared<TaskQueue>(params.bufferedBytes, params.outputPool);
+
     // Captured as a shared_ptr by the consumer callback of task_.
     auto queue = queue_;
     task_ = Task::create(
diff --git a/velox/exec/Cursor.h b/velox/exec/Cursor.h
index 02bcb1d813cc..45f3cacc934f 100644
--- a/velox/exec/Cursor.h
+++ b/velox/exec/Cursor.h
@@ -49,6 +49,12 @@ struct CursorParameters {
 
   uint64_t bufferedBytes{512 * 1024};
 
+  /// An optional memory pool to be used to allocate vectors returned by
+  /// MultiThreadedTaskCursor. A new pool is created if not specified.
+  ///
+  /// Only used if serialExecution is false.
+  std::shared_ptr<memory::MemoryPool> outputPool;
+
   /// Ungrouped (by default) or grouped (bucketed) execution.
   core::ExecutionStrategy executionStrategy{
       core::ExecutionStrategy::kUngrouped};
@@ -82,8 +88,13 @@ class TaskQueue {
     uint64_t bytes;
   };
 
-  explicit TaskQueue(uint64_t maxBytes)
-      : pool_(memory::memoryManager()->addLeafPool()), maxBytes_(maxBytes) {}
+  explicit TaskQueue(
+      uint64_t maxBytes,
+      const std::shared_ptr<memory::MemoryPool>& outputPool)
+      : pool_(
+            outputPool != nullptr ? outputPool
+                                  : memory::memoryManager()->addLeafPool()),
+        maxBytes_(maxBytes) {}
 
   void setNumProducers(int32_t n) {
     numProducers_ = n;