Commit

Use global command queues
Signed-off-by: Bogdan Pereanu <bogdan.pereanu@intel.com>

Adding test case

Signed-off-by: Bogdan Pereanu <bogdan.pereanu@intel.com>

Destroy pipeline if it was created but workload type is not supported

Signed-off-by: Bogdan Pereanu <bogdan.pereanu@intel.com>

Update tests, command queue is created and set at the first infer

Signed-off-by: Bogdan Pereanu <bogdan.pereanu@intel.com>

Update the names of the variables, methods, classes

Signed-off-by: Bogdan Pereanu <bogdan.pereanu@intel.com>

Create a static instance for CommandQueueManager class and lock get and free methods

Signed-off-by: Bogdan Pereanu <bogdan.pereanu@intel.com>

Add new func test

Signed-off-by: Bogdan Pereanu <bogdan.pereanu@intel.com>

Print correct error message

Signed-off-by: Bogdan Pereanu <bogdan.pereanu@intel.com>

Run test only on newer drivers

Signed-off-by: Bogdan Pereanu <bogdan.pereanu@intel.com>

Create event pool and events only if they are used

Signed-off-by: Bogdan Pereanu <bogdan.pereanu@intel.com>

Add new test case for changing priority, turbo and workload type

Signed-off-by: Bogdan Pereanu <bogdan.pereanu@intel.com>

Destroy pipeline even when use count is 0

Signed-off-by: Bogdan Pereanu <bogdan.pereanu@intel.com>

Make sure that the pipeline is still alive when fences are destroyed

Signed-off-by: Bogdan Pereanu <bogdan.pereanu@intel.com>

Change logic to use dynamic unordered map

Signed-off-by: Bogdan Pereanu <bogdan.pereanu@intel.com>

Set workload type

Signed-off-by: Bogdan Pereanu <bogdan.pereanu@intel.com>

Don't need to create a new deleter, add extra test case

Signed-off-by: Bogdan Pereanu <bogdan.pereanu@intel.com>
pereanub committed Jan 31, 2025
1 parent de91371 commit faa72a9
Showing 17 changed files with 582 additions and 160 deletions.
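The commit messages above describe the core of the change: command queues become process-global, handed out by a CommandQueueManager with a static instance whose get and free methods are locked, and each pipeline picks up its graph's queue at the first inference. That manager class is not among the diffs shown below, so the following is only a rough sketch, under assumed names and signatures, of how such a manager might be structured.

```cpp
// Rough sketch only: a process-wide command queue manager with a static
// instance and mutex-guarded get/free methods, as the commit messages
// describe. The member names, key type, and CommandQueue stub below are
// assumptions, not the plugin's actual implementation.
#include <map>
#include <memory>
#include <mutex>
#include <tuple>

struct CommandQueue {};  // stand-in for the Level Zero backed queue wrapper

struct QueueSettings {
    int priority = 0;       // e.g. a ze_command_queue_priority_t value
    int workload_type = 0;  // e.g. a ze_command_queue_workload_type_t value
    bool turbo = false;

    bool operator<(const QueueSettings& o) const {
        return std::tie(priority, workload_type, turbo) <
               std::tie(o.priority, o.workload_type, o.turbo);
    }
};

class CommandQueueManager {
public:
    // Single global instance shared by all compiled models in the process.
    static CommandQueueManager& getInstance() {
        static CommandQueueManager instance;
        return instance;
    }

    // "get" is locked so concurrent first inferences don't race on creation.
    std::shared_ptr<CommandQueue> getCommandQueue(const QueueSettings& settings) {
        std::lock_guard<std::mutex> lock(_mutex);
        auto& queue = _queues[settings];
        if (!queue) {
            queue = std::make_shared<CommandQueue>();
        }
        return queue;
    }

    // "free" is locked too; the queue is dropped once no graph still uses it.
    void freeCommandQueue(const QueueSettings& settings) {
        std::lock_guard<std::mutex> lock(_mutex);
        auto it = _queues.find(settings);
        if (it != _queues.end() && it->second.use_count() == 1) {
            _queues.erase(it);
        }
    }

private:
    CommandQueueManager() = default;
    std::mutex _mutex;
    // The commit mentions a dynamic unordered map; an ordered map is used
    // here only to keep the sketch short (no custom hash needed).
    std::map<QueueSettings, std::shared_ptr<CommandQueue>> _queues;
};
```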
7 changes: 6 additions & 1 deletion src/plugins/intel_npu/src/backend/include/zero_pipeline.hpp
@@ -40,6 +40,9 @@ struct Pipeline {
void closeCommandListIndex(size_t command_list_index);

protected:
void getCommandQueue();

std::shared_ptr<ZeroInitStructsHolder> _init_structs;
std::shared_ptr<IGraph> _graph;
const Config _config;
const uint32_t _id;
@@ -59,9 +62,11 @@ struct Pipeline {
std::vector<std::unique_ptr<Fence>> _fences;
std::shared_ptr<EventPool> _event_pool;
std::vector<std::shared_ptr<Event>> _events;
bool sync_output_with_fences_ = true;
bool _sync_output_with_fences = true;
std::shared_ptr<zeroProfiling::NpuInferProfiling> _npu_profiling;
Logger _logger;

std::mutex _mutex;
};

} // namespace intel_npu
102 changes: 76 additions & 26 deletions src/plugins/intel_npu/src/backend/src/zero_pipeline.cpp
@@ -28,6 +28,17 @@ Type extract_object(const ov::AnyMap& params, const ov::Property<Type>& p) {
return res.as<Type>();
}

template <class t>
bool compare_shared_ptr(const std::shared_ptr<t>& a, const std::shared_ptr<t>& b) {
if (a == b) {
return true;
}
if (a && b) {
return a.get() == b.get();
}
return false;
}

} // namespace

namespace intel_npu {
@@ -41,14 +52,11 @@ Pipeline::Pipeline(const Config& config,
const std::vector<std::vector<std::shared_ptr<ov::ITensor>>>& input_tensors,
const std::vector<std::shared_ptr<ov::ITensor>>& output_tensors,
uint32_t group_ordinal)
: _graph(graph),
: _init_structs(init_structs),
_graph(graph),
_config(config),
_id(_graph->get_unique_id()),
_number_of_command_lists(_graph->get_batch_size().has_value() ? *_graph->get_batch_size() : 1),
_event_pool{
std::make_shared<EventPool>(init_structs->getDevice(),
init_structs->getContext(),
_number_of_command_lists ? static_cast<uint32_t>(_number_of_command_lists) : 1)},
_npu_profiling(npu_profiling),
_logger("Pipeline", _config.get<LOG_LEVEL>()) {
OV_ITT_SCOPED_TASK(itt::domains::LevelZeroBackend, "Zero_infer_request::Pipeline::Pipeline");
@@ -58,22 +66,43 @@ Pipeline::Pipeline(const Config& config,
profiling_query.create(profiling_pool._handle);
}

OPENVINO_ASSERT(_sync_output_with_fences || !_config.get<RUN_INFERENCES_SEQUENTIALLY>(),
"In-order execution doesn't work in case synchronization of the inferences is done using events");

if (!_sync_output_with_fences || _config.get<RUN_INFERENCES_SEQUENTIALLY>()) {
_event_pool =
std::make_shared<EventPool>(_init_structs->getDevice(),
_init_structs->getContext(),
_number_of_command_lists ? static_cast<uint32_t>(_number_of_command_lists) : 1);

_events.reserve(_number_of_command_lists);
for (size_t i = 0; i < _number_of_command_lists; i++) {
_events.emplace_back(std::make_shared<Event>(_event_pool, static_cast<uint32_t>(i)));
}
}

_command_lists.reserve(_number_of_command_lists);
_events.reserve(_number_of_command_lists);
_fences.reserve(_number_of_command_lists);
_logger.debug("Pipeline - emplace_back _event_pool and _command_queue");
for (size_t i = 0; i < _number_of_command_lists; i++) {
_command_lists.emplace_back(
std::make_unique<CommandList>(init_structs,
std::make_unique<CommandList>(_init_structs,
group_ordinal,
init_structs->getMutableCommandListVersion() ? true : false));
_events.emplace_back(std::make_shared<Event>(_event_pool, static_cast<uint32_t>(i)));
_fences.emplace_back(std::make_unique<Fence>(*_graph->get_command_queue()));
_init_structs->getMutableCommandListVersion() ? true : false));
}

_command_queue = _graph->get_command_queue();

if (_sync_output_with_fences) {
_fences.resize(_number_of_command_lists);

for (size_t i = 0; i < _number_of_command_lists; i++) {
_logger.debug("Pipeline - getCommandQueue() - create new fence");
_fences[i] = std::make_unique<Fence>(_command_queue);
}
}

for (size_t i = 0; i < _number_of_command_lists; i++) {
size_t io_index = 0;
for (const auto& desc : graph->get_input_descriptors()) {
for (const auto& desc : _graph->get_input_descriptors()) {
if (input_tensors.at(io_index).size() > 1) {
void* data = nullptr;
auto remote_tensor = std::dynamic_pointer_cast<ZeroRemoteTensor>(input_tensors.at(io_index).at(i));
@@ -83,7 +112,7 @@ Pipeline::Pipeline(const Config& config,
data = extract_object(remote_tensor->get_properties(), ov::intel_npu::mem_handle);
}

graph->set_argument_value(desc.idx, data);
_graph->set_argument_value(desc.idx, data);

++io_index;
continue;
@@ -97,7 +126,7 @@ Pipeline::Pipeline(const Config& config,
data = extract_object(remote_tensor->get_properties(), ov::intel_npu::mem_handle);
}

graph->set_argument_value(
_graph->set_argument_value(
desc.idx,
static_cast<unsigned char*>(data) +
(i * input_tensors.at(io_index).at(0)->get_byte_size()) / _number_of_command_lists);
@@ -106,7 +135,7 @@ Pipeline::Pipeline(const Config& config,
}

io_index = 0;
for (const auto& desc : graph->get_output_descriptors()) {
for (const auto& desc : _graph->get_output_descriptors()) {
void* data = nullptr;
auto remote_tensor = std::dynamic_pointer_cast<ZeroRemoteTensor>(output_tensors.at(io_index));
if (remote_tensor == nullptr) {
@@ -115,7 +144,7 @@ Pipeline::Pipeline(const Config& config,
data = extract_object(remote_tensor->get_properties(), ov::intel_npu::mem_handle);
}

graph->set_argument_value(
_graph->set_argument_value(
desc.idx,
static_cast<unsigned char*>(data) +
(i * output_tensors.at(io_index)->get_byte_size()) / _number_of_command_lists);
@@ -134,7 +163,7 @@ Pipeline::Pipeline(const Config& config,
_command_lists.at(i)->appendNpuTimestamp(reinterpret_cast<uint64_t*>(_npu_profiling->npu_ts_infer_start));
}

_command_lists.at(i)->appendGraphExecute(static_cast<ze_graph_handle_t>(graph->get_handle()),
_command_lists.at(i)->appendGraphExecute(static_cast<ze_graph_handle_t>(_graph->get_handle()),
profiling_query.getHandle());

/// append timestamp command if feature was activated
@@ -153,7 +182,7 @@ Pipeline::Pipeline(const Config& config,
}

// appendBarrier used in L0 as well
if (!sync_output_with_fences_) {
if (!_sync_output_with_fences) {
_command_lists.at(i)->appendBarrier();
_events.at(i)->AppendSignalEvent(*_command_lists.at(i));
}
@@ -162,9 +191,30 @@ Pipeline::Pipeline(const Config& config,
_logger.debug("Pipeline - initialize completed");
}

void Pipeline::getCommandQueue() {
_logger.debug("Pipeline - getCommandQueue() started");

std::lock_guard<std::mutex> lock(_mutex);

if (!compare_shared_ptr(_command_queue, _graph->get_command_queue())) {
_command_queue = _graph->get_command_queue();

if (_sync_output_with_fences) {
for (size_t i = 0; i < _number_of_command_lists; i++) {
_logger.debug("Pipeline - getCommandQueue() - create new fence");
_fences[i] = std::make_unique<Fence>(_command_queue);
}
}
}

_logger.debug("Pipeline - getCommandQueue() completed");
}

void Pipeline::push() {
_logger.debug("Pipeline - push() started");

getCommandQueue();

if (_config.get<RUN_INFERENCES_SEQUENTIALLY>()) {
if (_id) {
auto previousIndex = _graph->get_last_submitted_id();
@@ -179,10 +229,10 @@ void Pipeline::push() {

for (size_t i = 0; i < _command_lists.size(); ++i) {
OV_ITT_TASK_CHAIN(ZERO_PIPELINE_IP_PUSH, itt::domains::LevelZeroBackend, "Pipeline", "push");
if (sync_output_with_fences_) {
_graph->get_command_queue()->executeCommandList(*_command_lists.at(i), *_fences.at(i));
if (_sync_output_with_fences) {
_command_queue->executeCommandList(*_command_lists.at(i), *_fences.at(i));
} else {
_graph->get_command_queue()->executeCommandList(*_command_lists.at(i));
_command_queue->executeCommandList(*_command_lists.at(i));
}
}

Expand All @@ -194,7 +244,7 @@ void Pipeline::pull() {
OV_ITT_TASK_CHAIN(ZERO_PIPELINE_IP_PULL, itt::domains::LevelZeroBackend, "Pipeline", "pull");

for (size_t i = 0; i < _command_lists.size(); ++i) {
if (sync_output_with_fences_) {
if (_sync_output_with_fences) {
_fences.at(i)->hostSynchronize();
} else {
_events.at(i)->hostSynchronize();
@@ -209,17 +259,17 @@ void Pipeline::pull() {
};

void Pipeline::reset() const {
_logger.debug("Pipeline - rest() started");
_logger.debug("Pipeline - reset() started");

for (size_t i = 0; i < _command_lists.size(); ++i) {
if (sync_output_with_fences_) {
if (_sync_output_with_fences) {
_fences.at(i)->reset();
} else {
_events.at(i)->reset();
}
}

_logger.debug("Pipeline - rest() completed");
_logger.debug("Pipeline - reset() completed");
};

void Pipeline::updateCommandList(uint32_t arg_index, const void* arg_data, size_t byte_size) {
@@ -41,9 +41,10 @@ class IGraph : public std::enable_shared_from_this<IGraph> {

const std::vector<ArgumentDescriptor>& get_input_descriptors() const;
const std::vector<ArgumentDescriptor>& get_output_descriptors() const;

const std::shared_ptr<CommandQueue>& get_command_queue() const;

void set_workload_type(const ov::WorkloadType workloadType) const;
virtual void set_workload_type(const ov::WorkloadType workloadType) = 0;

std::mutex& get_mutex();

@@ -58,8 +59,8 @@ class IGraph : public std::enable_shared_from_this<IGraph> {

protected:
/**
* @brief Determines if batching can be addressed inside the plugin. In the positive case, the batch size used by
* the model will also be deduced and returned.
* @brief Determines if batching can be addressed inside the plugin. In the positive case, the batch size used
* by the model will also be deduced and returned.
* @details Batching can be handled by the plugin only if:
* - The batch axis is the first axis.
* - The batch size received by the compiler takes the default value of 1.
@@ -71,22 +72,23 @@ class IGraph : public std::enable_shared_from_this<IGraph> {
*
* @param metadata Metadata containing the shape values as seen by both the compiler and IR model. These will
* ultimately be used for determining the batch size.
* @returns The batch size deduced by the algorithm or the default value of 1 if batching cannot be performed inside
* the plugin.
* @returns The batch size deduced by the algorithm or the default value of 1 if batching cannot be performed
* inside the plugin.
*/
std::optional<size_t> get_batch_size(const NetworkMetadata& metadata);

virtual void create_new_command_queue() = 0;

ze_graph_handle_t _handle = nullptr;
NetworkMetadata _metadata;

std::vector<ArgumentDescriptor> _input_descriptors;
std::vector<ArgumentDescriptor> _output_descriptors;

std::shared_ptr<CommandQueue> _command_queue;
std::vector<std::shared_ptr<Event>> _last_submitted_event;

// Used to protect zero pipeline creation in the graph. The pipeline should be created only once per graph when the
// first inference starts running
// Used to protect zero pipeline creation in the graph. The pipeline should be created only once per graph when
// the first inference starts running
std::mutex _mutex;

std::vector<uint8_t> _blob;
@@ -100,6 +102,12 @@ class IGraph : public std::enable_shared_from_this<IGraph> {
*/
std::optional<std::size_t> _batch_size = std::nullopt;

std::shared_ptr<CommandQueue> _command_queue;
uint32_t _group_ordinal;
ze_command_queue_workload_type_t _ze_workload_type;
bool _turbo = false;
ze_command_queue_priority_t _ze_queue_priority;

Logger _logger;
};

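The new members stored on IGraph above (_ze_queue_priority, _ze_workload_type, _turbo) keep the queue settings so the queue can be recreated when they change. From the API side, the behavior the new functional tests apparently exercise — changing priority, turbo and workload type, with the command queue created and set at the first infer — might look roughly like the sketch below; the property names and model path are illustrative assumptions, not taken from this commit's tests.

```cpp
// Hedged usage sketch: runtime changes of workload type, turbo and priority
// on an NPU compiled model. Assumes the standard OpenVINO 2.x property API;
// "model.xml" is a placeholder path.
#include <openvino/openvino.hpp>
#include <openvino/runtime/intel_npu/properties.hpp>

int main() {
    ov::Core core;
    auto compiled = core.compile_model("model.xml", "NPU");
    auto request = compiled.create_infer_request();

    request.infer();  // per the commit, the command queue is created and set here

    // Changing these afterwards should make the plugin pick up (or recreate)
    // a command queue with the new settings before the next inference.
    compiled.set_property(ov::workload_type(ov::WorkloadType::EFFICIENT));
    compiled.set_property(ov::intel_npu::turbo(true));
    compiled.set_property(ov::hint::model_priority(ov::hint::Priority::HIGH));

    request.infer();
    return 0;
}
```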
20 changes: 0 additions & 20 deletions src/plugins/intel_npu/src/common/src/igraph.cpp
@@ -50,26 +50,6 @@ const std::shared_ptr<CommandQueue>& IGraph::get_command_queue() const {
return _command_queue;
}

void IGraph::set_workload_type(const ov::WorkloadType workloadType) const {
if (_command_queue == nullptr) {
return;
}

ze_command_queue_workload_type_t zeWorkloadType;
switch (workloadType) {
case ov::WorkloadType::DEFAULT:
zeWorkloadType = ze_command_queue_workload_type_t::ZE_WORKLOAD_TYPE_DEFAULT;
break;
case ov::WorkloadType::EFFICIENT:
zeWorkloadType = ze_command_queue_workload_type_t::ZE_WORKLOAD_TYPE_BACKGROUND;
break;
default:
OPENVINO_THROW("Unknown value for WorkloadType!");
}

_command_queue->setWorkloadType(zeWorkloadType);
}

std::mutex& IGraph::get_mutex() {
return _mutex;
}
@@ -32,11 +32,15 @@ class DriverGraph final : public IGraph {

void initialize(const Config& config) override;

void set_workload_type(const ov::WorkloadType workloadType) override;

~DriverGraph() override;

private:
bool release_blob(const Config& config);

void create_new_command_queue() override;

std::shared_ptr<ZeGraphExtWrappers> _zeGraphExt;
std::shared_ptr<ZeroInitStructsHolder> _zeroInitStruct;

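With the base-class implementation removed from igraph.cpp above and set_workload_type now pure virtual, the mapping from ov::WorkloadType to ze_command_queue_workload_type_t presumably moves into the DriverGraph / PluginGraph overrides declared here, whose bodies are not part of this excerpt. A hedged sketch of what such an override might look like (a fragment relying on the plugin's own headers and the new create_new_command_queue() hook):

```cpp
// Sketch only - the real DriverGraph/PluginGraph overrides are not shown in
// this diff excerpt; member and hook names follow the headers above.
void DriverGraph::set_workload_type(const ov::WorkloadType workloadType) {
    switch (workloadType) {
    case ov::WorkloadType::DEFAULT:
        _ze_workload_type = ze_command_queue_workload_type_t::ZE_WORKLOAD_TYPE_DEFAULT;
        break;
    case ov::WorkloadType::EFFICIENT:
        _ze_workload_type = ze_command_queue_workload_type_t::ZE_WORKLOAD_TYPE_BACKGROUND;
        break;
    default:
        OPENVINO_THROW("Unknown value for WorkloadType!");
    }

    if (_command_queue) {
        // Recreate the queue so the new workload type takes effect; the
        // pipeline then picks the new queue up via Pipeline::getCommandQueue().
        create_new_command_queue();
    }
}
```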
@@ -35,9 +35,13 @@ class PluginGraph final : public IGraph {

void initialize(const Config& config) override;

void set_workload_type(const ov::WorkloadType workloadType) override;

~PluginGraph() override;

private:
void create_new_command_queue() override;

std::shared_ptr<ZeGraphExtWrappers> _zeGraphExt;
std::shared_ptr<ZeroInitStructsHolder> _zeroInitStruct;

@@ -48,7 +48,7 @@ class ZeGraphExtWrappers {

void setGraphArgumentValue(ze_graph_handle_t graphHandle, uint32_t argi_, const void* argv) const;

void initializeGraph(ze_graph_handle_t graphHandle, const Config& config) const;
void initializeGraph(ze_graph_handle_t graphHandle) const;

private:
std::unordered_set<std::string> getQueryResultFromSupportedLayers(
@@ -60,7 +60,7 @@ class ZeGraphExtWrappers {
std::vector<IODescriptor>& inputs,
std::vector<IODescriptor>& outputs) const;

void initialize_graph_through_command_list(ze_graph_handle_t graphHandle, const Config& config) const;
void initialize_graph_through_command_list(ze_graph_handle_t graphHandle) const;

std::shared_ptr<ZeroInitStructsHolder> _zeroInitStruct;
uint32_t _graphExtVersion;