diff --git a/src/plugins/intel_npu/src/backend/include/zero_pipeline.hpp b/src/plugins/intel_npu/src/backend/include/zero_pipeline.hpp index 29069f0a0cf8cc..a06f579c66c744 100644 --- a/src/plugins/intel_npu/src/backend/include/zero_pipeline.hpp +++ b/src/plugins/intel_npu/src/backend/include/zero_pipeline.hpp @@ -27,7 +27,7 @@ struct Pipeline { Pipeline(const Pipeline&) = delete; Pipeline& operator=(const Pipeline&) = delete; - ~Pipeline(); + virtual ~Pipeline() = default; void push(); void pull(); @@ -66,11 +66,7 @@ struct Pipeline { std::shared_ptr _npu_profiling; Logger _logger; - uint32_t _group_ordinal; std::mutex _mutex; - bool _turbo = false; - ze_command_queue_priority_t _ze_queue_priority; - std::optional _ze_workload_type = std::nullopt; }; } // namespace intel_npu diff --git a/src/plugins/intel_npu/src/backend/src/zero_pipeline.cpp b/src/plugins/intel_npu/src/backend/src/zero_pipeline.cpp index 3cf9b205df2abd..5b29a67435c7fd 100644 --- a/src/plugins/intel_npu/src/backend/src/zero_pipeline.cpp +++ b/src/plugins/intel_npu/src/backend/src/zero_pipeline.cpp @@ -15,6 +15,21 @@ #include "intel_npu/utils/zero/zero_types.hpp" #include "zero_remote_tensor.hpp" +namespace { + +template +bool compare_shared_ptr(const std::shared_ptr& a, const std::shared_ptr& b) { + if (a == b) { + return true; + } + if (a && b) { + return a.get() == b.get(); + } + return false; +} + +} // namespace + namespace intel_npu { Pipeline::Pipeline(const Config& config, @@ -32,8 +47,7 @@ Pipeline::Pipeline(const Config& config, _id(_graph->get_unique_id()), _number_of_command_lists(_graph->get_batch_size().has_value() ? *_graph->get_batch_size() : 1), _npu_profiling(npu_profiling), - _logger("Pipeline", _config.get()), - _group_ordinal(group_ordinal) { + _logger("Pipeline", _config.get()) { OV_ITT_SCOPED_TASK(itt::domains::LevelZeroBackend, "Zero_infer_request::Pipeline::Pipeline"); _logger.debug("Pipeline - initialize started"); @@ -64,34 +78,20 @@ Pipeline::Pipeline(const Config& config, _init_structs->getMutableCommandListVersion() ? true : false)); } - _ze_queue_priority = zeroUtils::toZeQueuePriority(_config.get()); - - if (_config.has()) { - _turbo = _config.get(); - } - - if (config.has()) { - _ze_workload_type = zeroUtils::toZeQueueWorkloadType(config.get()); - } - - _command_queue = CommandQueueManager::getInstance().getCommandQueue(_init_structs, - _ze_queue_priority, - _graph->get_ze_workload_type(), - _group_ordinal, - _turbo); + _command_queue = _graph->get_command_queue(); if (_sync_output_with_fences) { _fences.resize(_number_of_command_lists); for (size_t i = 0; i < _number_of_command_lists; i++) { _logger.debug("Pipeline - getCommandQueue() - create new fence"); - _fences[i] = std::make_unique(*_command_queue); + _fences[i] = std::make_unique(_command_queue); } } for (size_t i = 0; i < _number_of_command_lists; i++) { size_t io_index = 0; - for (const auto& desc : graph->get_input_descriptors()) { + for (const auto& desc : _graph->get_input_descriptors()) { if (input_tensors.at(io_index).size() > 1) { void* data = nullptr; auto remote_tensor = std::dynamic_pointer_cast(input_tensors.at(io_index).at(i)); @@ -101,7 +101,7 @@ Pipeline::Pipeline(const Config& config, data = remote_tensor->get_original_memory(); } - graph->set_argument_value(desc.idx, data); + _graph->set_argument_value(desc.idx, data); ++io_index; continue; @@ -115,7 +115,7 @@ Pipeline::Pipeline(const Config& config, data = remote_tensor->get_original_memory(); } - graph->set_argument_value( + _graph->set_argument_value( desc.idx, static_cast(data) + (i * input_tensors.at(io_index).at(0)->get_byte_size()) / _number_of_command_lists); @@ -124,7 +124,7 @@ Pipeline::Pipeline(const Config& config, } io_index = 0; - for (const auto& desc : graph->get_output_descriptors()) { + for (const auto& desc : _graph->get_output_descriptors()) { void* data = nullptr; auto remote_tensor = std::dynamic_pointer_cast(output_tensors.at(io_index)); if (remote_tensor == nullptr) { @@ -133,7 +133,7 @@ Pipeline::Pipeline(const Config& config, data = remote_tensor->get_original_memory(); } - graph->set_argument_value( + _graph->set_argument_value( desc.idx, static_cast(data) + (i * output_tensors.at(io_index)->get_byte_size()) / _number_of_command_lists); @@ -152,7 +152,7 @@ Pipeline::Pipeline(const Config& config, _command_lists.at(i)->appendNpuTimestamp(reinterpret_cast(_npu_profiling->npu_ts_infer_start)); } - _command_lists.at(i)->appendGraphExecute(static_cast(graph->get_handle()), + _command_lists.at(i)->appendGraphExecute(static_cast(_graph->get_handle()), profiling_query.getHandle()); /// append timestamp command if feature was activated @@ -185,34 +185,15 @@ void Pipeline::getCommandQueue() { std::lock_guard lock(_mutex); - if (_ze_workload_type != _graph->get_ze_workload_type()) { - // fences created for the old command queue shall be destroyed and make new ones - if (_sync_output_with_fences) { - for (size_t i = 0; i < _number_of_command_lists; i++) { - if (_fences[i] != nullptr) { - _logger.debug("Pipeline - getCommandQueue() - destroy old fence"); - _fences[i].reset(); - } - } - } - - _command_queue = CommandQueueManager::getInstance().getCommandQueue(_init_structs, - _ze_queue_priority, - _graph->get_ze_workload_type(), - _group_ordinal, - _turbo); + if (!compare_shared_ptr(_command_queue, _graph->get_command_queue())) { + _command_queue = _graph->get_command_queue(); if (_sync_output_with_fences) { for (size_t i = 0; i < _number_of_command_lists; i++) { _logger.debug("Pipeline - getCommandQueue() - create new fence"); - _fences[i] = std::make_unique(*_command_queue); + _fences[i] = std::make_unique(_command_queue); } } - - _logger.debug("Pipeline - getCommandQueue() - free previous command queue"); - CommandQueueManager::getInstance().freeCommandQueue(_ze_queue_priority, _ze_workload_type, _turbo); - - _ze_workload_type = _graph->get_ze_workload_type(); } _logger.debug("Pipeline - getCommandQueue() completed"); @@ -330,20 +311,4 @@ void Pipeline::closeCommandListIndex(size_t command_list_index) { _command_lists.at(command_list_index)->close(); }; -Pipeline::~Pipeline() { - if (_command_queue) { - if (_sync_output_with_fences) { - // fences shall be destroyed before the command queue is destroyed - for (size_t i = 0; i < _number_of_command_lists; i++) { - if (_fences[i] != nullptr) { - _fences[i].reset(); - } - } - } - - _command_queue.reset(); - CommandQueueManager::getInstance().freeCommandQueue(_ze_queue_priority, _ze_workload_type, _turbo); - } -} - } // namespace intel_npu diff --git a/src/plugins/intel_npu/src/common/include/intel_npu/common/igraph.hpp b/src/plugins/intel_npu/src/common/include/intel_npu/common/igraph.hpp index efb5b6b8978cfc..441169efa5b147 100644 --- a/src/plugins/intel_npu/src/common/include/intel_npu/common/igraph.hpp +++ b/src/plugins/intel_npu/src/common/include/intel_npu/common/igraph.hpp @@ -43,8 +43,9 @@ class IGraph : public std::enable_shared_from_this { const std::vector& get_input_descriptors() const; const std::vector& get_output_descriptors() const; + const std::shared_ptr& get_command_queue() const; + void set_workload_type(const ov::WorkloadType workloadType); - const std::optional get_ze_workload_type() const; std::mutex& get_mutex(); @@ -59,8 +60,8 @@ class IGraph : public std::enable_shared_from_this { protected: /** - * @brief Determines if batching can be addressed inside the plugin. In the positive case, the batch size used by - * the model will also be deduced and returned. + * @brief Determines if batching can be addressed inside the plugin. In the positive case, the batch size used + * by the model will also be deduced and returned. * @details Batching can be handled by the plugin only if: * - The batch axis is the first axis. * - The batch size received by the compiler takes the default value of 1. @@ -72,11 +73,13 @@ class IGraph : public std::enable_shared_from_this { * * @param metadata Metadata containing the shape values as seen by both the compiler and IR model. These will * ultimately be used for determining the batch size. - * @returns The batch size deduced by the algorithm or the default value of 1 if batching cannot be performed inside - * the plugin. + * @returns The batch size deduced by the algorithm or the default value of 1 if batching cannot be performed + * inside the plugin. */ std::optional get_batch_size(const NetworkMetadata& metadata); + virtual void create_new_command_queue() = 0; + ze_graph_handle_t _handle = nullptr; NetworkMetadata _metadata; @@ -85,8 +88,8 @@ class IGraph : public std::enable_shared_from_this { std::vector> _last_submitted_event; - // Used to protect zero pipeline creation in the graph. The pipeline should be created only once per graph when the - // first inference starts running + // Used to protect zero pipeline creation in the graph. The pipeline should be created only once per graph when + // the first inference starts running std::mutex _mutex; std::unique_ptr _blobPtr; @@ -100,7 +103,11 @@ class IGraph : public std::enable_shared_from_this { */ std::optional _batch_size = std::nullopt; + std::shared_ptr _command_queue; + uint32_t _group_ordinal; std::optional _ze_workload_type = std::nullopt; + bool _turbo = false; + ze_command_queue_priority_t _ze_queue_priority; Logger _logger; }; diff --git a/src/plugins/intel_npu/src/common/src/igraph.cpp b/src/plugins/intel_npu/src/common/src/igraph.cpp index ce54b53ea20432..5fa8e4c534f5b1 100644 --- a/src/plugins/intel_npu/src/common/src/igraph.cpp +++ b/src/plugins/intel_npu/src/common/src/igraph.cpp @@ -21,11 +21,7 @@ IGraph::IGraph(ze_graph_handle_t handle, : _handle(handle), _metadata(std::move(metadata)), _blobPtr(std::move(blobPtr)), - _logger("IGraph", config.get()) { - if (config.has()) { - set_workload_type(config.get()); - } -} + _logger("IGraph", config.get()) {} const NetworkMetadata& IGraph::get_metadata() const { return _metadata; @@ -47,8 +43,16 @@ const std::vector& IGraph::get_output_descriptors() const { return _output_descriptors; } +const std::shared_ptr& IGraph::get_command_queue() const { + return _command_queue; +} + void IGraph::set_workload_type(const ov::WorkloadType workloadType) { - _ze_workload_type = zeroUtils::toZeQueueWorkloadType(workloadType); + if (_command_queue) { + _ze_workload_type = zeroUtils::toZeQueueWorkloadType(workloadType); + + create_new_command_queue(); + } } std::mutex& IGraph::get_mutex() { @@ -137,8 +141,4 @@ const std::optional IGraph::get_batch_size() const { return _batch_size; } -const std::optional IGraph::get_ze_workload_type() const { - return _ze_workload_type; -} - } // namespace intel_npu diff --git a/src/plugins/intel_npu/src/compiler_adapter/include/driver_graph.hpp b/src/plugins/intel_npu/src/compiler_adapter/include/driver_graph.hpp index ac89a790291d2e..eba75f9994e6bd 100644 --- a/src/plugins/intel_npu/src/compiler_adapter/include/driver_graph.hpp +++ b/src/plugins/intel_npu/src/compiler_adapter/include/driver_graph.hpp @@ -37,6 +37,8 @@ class DriverGraph final : public IGraph { private: bool release_blob(const Config& config); + void create_new_command_queue() override; + std::shared_ptr _zeGraphExt; std::shared_ptr _zeroInitStruct; diff --git a/src/plugins/intel_npu/src/compiler_adapter/include/plugin_graph.hpp b/src/plugins/intel_npu/src/compiler_adapter/include/plugin_graph.hpp index 61d4a6ed866529..a9dd4c05ae04cb 100644 --- a/src/plugins/intel_npu/src/compiler_adapter/include/plugin_graph.hpp +++ b/src/plugins/intel_npu/src/compiler_adapter/include/plugin_graph.hpp @@ -38,6 +38,8 @@ class PluginGraph final : public IGraph { ~PluginGraph() override; private: + void create_new_command_queue() override; + std::shared_ptr _zeGraphExt; std::shared_ptr _zeroInitStruct; diff --git a/src/plugins/intel_npu/src/compiler_adapter/src/driver_graph.cpp b/src/plugins/intel_npu/src/compiler_adapter/src/driver_graph.cpp index 97f77ca644dc08..2b08f8fb7bbee1 100644 --- a/src/plugins/intel_npu/src/compiler_adapter/src/driver_graph.cpp +++ b/src/plugins/intel_npu/src/compiler_adapter/src/driver_graph.cpp @@ -99,11 +99,6 @@ void DriverGraph::initialize(const Config& config) { _input_descriptors.shrink_to_fit(); _output_descriptors.shrink_to_fit(); - ze_device_properties_t deviceProperties = {}; - deviceProperties.stype = ZE_STRUCTURE_TYPE_DEVICE_PROPERTIES; - THROW_ON_FAIL_FOR_LEVELZERO("zeDeviceGetProperties", - zeDeviceGetProperties(_zeroInitStruct->getDevice(), &deviceProperties)); - _zeGraphExt->initializeGraph(_handle); _logger.debug("Graph initialize finish"); @@ -113,6 +108,25 @@ void DriverGraph::initialize(const Config& config) { // releasing it here to avoid unnecessary memory usage. _blobIsReleased = release_blob(config); + // Find the corresponding command queue group. + ze_device_properties_t deviceProperties = {}; + deviceProperties.stype = ZE_STRUCTURE_TYPE_DEVICE_PROPERTIES; + THROW_ON_FAIL_FOR_LEVELZERO("zeDeviceGetProperties", + zeDeviceGetProperties(_zeroInitStruct->getDevice(), &deviceProperties)); + _group_ordinal = zeroUtils::findGroupOrdinal(_zeroInitStruct->getDevice(), deviceProperties); + + _ze_queue_priority = zeroUtils::toZeQueuePriority(config.get()); + + if (config.has()) { + _turbo = config.get(); + } + + if (config.has()) { + _ze_workload_type = zeroUtils::toZeQueueWorkloadType(config.get()); + } + + create_new_command_queue(); + if (config.get() != ov::intel_npu::BatchMode::COMPILER) { _batch_size = get_batch_size(_metadata); } @@ -124,6 +138,14 @@ void DriverGraph::initialize(const Config& config) { } } +void DriverGraph::create_new_command_queue() { + _command_queue = CommandQueuePool::getInstance().getCommandQueue(_zeroInitStruct, + _ze_queue_priority, + _ze_workload_type, + _group_ordinal, + _turbo); +} + bool DriverGraph::release_blob(const Config& config) { if (_blobPtr == nullptr || _zeroInitStruct->getGraphDdiTable().version() < ZE_GRAPH_EXT_VERSION_1_8 || config.get()) { diff --git a/src/plugins/intel_npu/src/compiler_adapter/src/plugin_graph.cpp b/src/plugins/intel_npu/src/compiler_adapter/src/plugin_graph.cpp index 3c491fd81fb1c8..bc18ab07361c6a 100644 --- a/src/plugins/intel_npu/src/compiler_adapter/src/plugin_graph.cpp +++ b/src/plugins/intel_npu/src/compiler_adapter/src/plugin_graph.cpp @@ -99,12 +99,26 @@ void PluginGraph::initialize(const Config& config) { _input_descriptors.shrink_to_fit(); _output_descriptors.shrink_to_fit(); + _zeGraphExt->initializeGraph(_handle); + + // Find the corresponding command queue group. ze_device_properties_t deviceProperties = {}; deviceProperties.stype = ZE_STRUCTURE_TYPE_DEVICE_PROPERTIES; THROW_ON_FAIL_FOR_LEVELZERO("zeDeviceGetProperties", zeDeviceGetProperties(_zeroInitStruct->getDevice(), &deviceProperties)); + _group_ordinal = zeroUtils::findGroupOrdinal(_zeroInitStruct->getDevice(), deviceProperties); - _zeGraphExt->initializeGraph(_handle); + _ze_queue_priority = zeroUtils::toZeQueuePriority(config.get()); + + if (config.has()) { + _turbo = config.get(); + } + + if (config.has()) { + _ze_workload_type = zeroUtils::toZeQueueWorkloadType(config.get()); + } + + create_new_command_queue(); if (config.get() != ov::intel_npu::BatchMode::COMPILER) { _batch_size = get_batch_size(_metadata); @@ -119,6 +133,14 @@ void PluginGraph::initialize(const Config& config) { _logger.debug("Graph initialize finish"); } +void PluginGraph::create_new_command_queue() { + _command_queue = CommandQueuePool::getInstance().getCommandQueue(_zeroInitStruct, + _ze_queue_priority, + _ze_workload_type, + _group_ordinal, + _turbo); +} + PluginGraph::~PluginGraph() { if (_handle != nullptr) { auto result = _zeGraphExt->destroyGraph(_handle); diff --git a/src/plugins/intel_npu/src/compiler_adapter/src/ze_graph_ext_wrappers.cpp b/src/plugins/intel_npu/src/compiler_adapter/src/ze_graph_ext_wrappers.cpp index 0a13cc075be601..5de26b3aa0a089 100644 --- a/src/plugins/intel_npu/src/compiler_adapter/src/ze_graph_ext_wrappers.cpp +++ b/src/plugins/intel_npu/src/compiler_adapter/src/ze_graph_ext_wrappers.cpp @@ -192,7 +192,8 @@ void ZeGraphExtWrappers::initialize_graph_through_command_list(ze_graph_handle_t _logger.debug("initialize_graph_through_command_list init start - create graph_command_list"); CommandList graph_command_list(_zeroInitStruct, groupOrdinal); _logger.debug("initialize_graph_through_command_list - create graph_command_queue"); - CommandQueue graph_command_queue(_zeroInitStruct, ZE_COMMAND_QUEUE_PRIORITY_NORMAL, groupOrdinal, false); + auto graph_command_queue = + std::make_shared(_zeroInitStruct, ZE_COMMAND_QUEUE_PRIORITY_NORMAL, groupOrdinal, false); _logger.debug("initialize_graph_through_command_list - create fence"); Fence fence(graph_command_queue); @@ -202,7 +203,7 @@ void ZeGraphExtWrappers::initialize_graph_through_command_list(ze_graph_handle_t graph_command_list.close(); _logger.debug("initialize_graph_through_command_list - performing executeCommandList"); - graph_command_queue.executeCommandList(graph_command_list, fence); + graph_command_queue->executeCommandList(graph_command_list, fence); _logger.debug("initialize_graph_through_command_list - performing hostSynchronize"); fence.hostSynchronize(); _logger.debug("initialize_graph_through_command_list - hostSynchronize completed"); diff --git a/src/plugins/intel_npu/src/utils/include/intel_npu/utils/zero/zero_utils.hpp b/src/plugins/intel_npu/src/utils/include/intel_npu/utils/zero/zero_utils.hpp index 8af0dcd2e1d9a3..3650c4ab0a1042 100644 --- a/src/plugins/intel_npu/src/utils/include/intel_npu/utils/zero/zero_utils.hpp +++ b/src/plugins/intel_npu/src/utils/include/intel_npu/utils/zero/zero_utils.hpp @@ -17,29 +17,6 @@ namespace intel_npu { -enum priority { - NORMAL, - LOW, - HIGH, - - PRIORITY_COUNT -}; - -enum turbo { - DISABLED, - ENABLED, - - TURBO_COUNT -}; - -enum workload { - NOT_SET, - DEFAULT, - EFFICIENT, - - WORKLOAD_COUNT -}; - struct ArgumentDescriptor { ze_graph_argument_properties_3_t info; uint32_t idx; @@ -75,37 +52,29 @@ namespace zeroUtils { ze_result_to_description(result)); \ } -static inline priority toPriorityEnum(const ze_command_queue_priority_t& val) { +static inline size_t toPriorityVal(const ze_command_queue_priority_t& val) { switch (val) { case ZE_COMMAND_QUEUE_PRIORITY_PRIORITY_LOW: - return priority::LOW; + return 0; case ZE_COMMAND_QUEUE_PRIORITY_NORMAL: - return priority::NORMAL; + return 1; case ZE_COMMAND_QUEUE_PRIORITY_PRIORITY_HIGH: - return priority::HIGH; + return 2; default: OPENVINO_THROW("Incorrect queue priority."); } } -static inline turbo toTurboEnum(bool val) { - if (val) { - return turbo::ENABLED; - } - - return turbo::DISABLED; -} - -static inline workload toWorkloadEnum(const std::optional& val) { +static inline size_t toWorkloadVal(const std::optional& val) { if (!val.has_value()) { - return workload::NOT_SET; + return 0; } switch (*val) { case ZE_WORKLOAD_TYPE_DEFAULT: - return workload::DEFAULT; + return 1; case ZE_WORKLOAD_TYPE_BACKGROUND: - return workload::EFFICIENT; + return 2; default: OPENVINO_THROW("Incorrect workload type."); } diff --git a/src/plugins/intel_npu/src/utils/include/intel_npu/utils/zero/zero_wrappers.hpp b/src/plugins/intel_npu/src/utils/include/intel_npu/utils/zero/zero_wrappers.hpp index d85725c530fb14..b13921972d3b53 100644 --- a/src/plugins/intel_npu/src/utils/include/intel_npu/utils/zero/zero_wrappers.hpp +++ b/src/plugins/intel_npu/src/utils/include/intel_npu/utils/zero/zero_wrappers.hpp @@ -17,6 +17,12 @@ namespace intel_npu { class CommandList; class CommandQueue; +struct CommandQueueDesc { + size_t priority; + size_t workload; + bool turbo; +}; + class EventPool { public: EventPool() = delete; @@ -98,7 +104,7 @@ class CommandList { class Fence { public: Fence() = delete; - Fence(const CommandQueue& command_queue); + Fence(const std::shared_ptr& command_queue); Fence(const Fence&) = delete; Fence(Fence&&) = delete; Fence& operator=(const Fence&) = delete; @@ -112,6 +118,8 @@ class Fence { } private: + std::shared_ptr _command_queue; + ze_fence_handle_t _handle = nullptr; Logger _log; @@ -145,15 +153,15 @@ class CommandQueue { ze_command_queue_handle_t _handle = nullptr; }; -class CommandQueueManager { +class CommandQueuePool { public: - CommandQueueManager(); - CommandQueueManager(const CommandQueueManager& other) = delete; - CommandQueueManager(CommandQueueManager&& other) = delete; - void operator=(const CommandQueueManager&) = delete; - void operator=(CommandQueueManager&&) = delete; + CommandQueuePool(); + CommandQueuePool(const CommandQueuePool& other) = delete; + CommandQueuePool(CommandQueuePool&& other) = delete; + void operator=(const CommandQueuePool&) = delete; + void operator=(CommandQueuePool&&) = delete; - static CommandQueueManager& getInstance(); + static CommandQueuePool& getInstance(); std::shared_ptr getCommandQueue(const std::shared_ptr& init_structs, const ze_command_queue_priority_t& priority, @@ -161,18 +169,14 @@ class CommandQueueManager { const uint32_t& group_ordinal, bool turbo); - void freeCommandQueue(const ze_command_queue_priority_t& priority, - const std::optional& workload_type, - bool turbo); - private: + int computeHash(CommandQueueDesc desc); + + std::unordered_map> _pool; + Logger _log; std::mutex _mutex; - - std::array, workload::WORKLOAD_COUNT>, turbo::TURBO_COUNT>, - priority::PRIORITY_COUNT> - _gloabal_command_queues; }; } // namespace intel_npu diff --git a/src/plugins/intel_npu/src/utils/src/zero/zero_wrappers.cpp b/src/plugins/intel_npu/src/utils/src/zero/zero_wrappers.cpp index a6d7fe812c4169..a12b080106340b 100644 --- a/src/plugins/intel_npu/src/utils/src/zero/zero_wrappers.cpp +++ b/src/plugins/intel_npu/src/utils/src/zero/zero_wrappers.cpp @@ -187,9 +187,11 @@ CommandQueue::~CommandQueue() { _handle = nullptr; } -Fence::Fence(const CommandQueue& command_queue) : _log("Fence", Logger::global().level()) { +Fence::Fence(const std::shared_ptr& command_queue) + : _command_queue(command_queue), + _log("Fence", Logger::global().level()) { ze_fence_desc_t fence_desc = {ZE_STRUCTURE_TYPE_FENCE_DESC, nullptr, 0}; - auto result = zeFenceCreate(command_queue.handle(), &fence_desc, &_handle); + auto result = zeFenceCreate(command_queue->handle(), &fence_desc, &_handle); THROW_ON_FAIL_FOR_LEVELZERO("zeFenceCreate", result); } void Fence::reset() const { @@ -209,58 +211,54 @@ Fence::~Fence() { _handle = nullptr; } -CommandQueueManager::CommandQueueManager() : _log("CommandQueue", Logger::global().level()) {} -CommandQueueManager& CommandQueueManager::getInstance() { - static CommandQueueManager instance; +CommandQueuePool::CommandQueuePool() : _log("CommandQueue", Logger::global().level()) {} +int CommandQueuePool::computeHash(CommandQueueDesc desc) { + return (desc.priority & 0xFF) | (desc.workload & 0xFF) << 8 | (desc.turbo << 16); +} +CommandQueuePool& CommandQueuePool::getInstance() { + static CommandQueuePool instance; return instance; } -std::shared_ptr CommandQueueManager::getCommandQueue( +std::shared_ptr CommandQueuePool::getCommandQueue( const std::shared_ptr& init_structs, const ze_command_queue_priority_t& priority, const std::optional& workload_type, const uint32_t& group_ordinal, bool turbo) { - std::lock_guard lock(_mutex); + CommandQueueDesc desc = {zeroUtils::toPriorityVal(priority), zeroUtils::toWorkloadVal(workload_type), turbo}; - if (_gloabal_command_queues[zeroUtils::toPriorityEnum(priority)][zeroUtils::toTurboEnum(turbo)] - [zeroUtils::toWorkloadEnum(workload_type)] == nullptr) { - _log.debug("Create new command queue"); - _gloabal_command_queues[zeroUtils::toPriorityEnum(priority)][zeroUtils::toTurboEnum(turbo)] - [zeroUtils::toWorkloadEnum(workload_type)] = - std::make_shared(init_structs, priority, group_ordinal, turbo); + int hash = computeHash(desc); - if (zeroUtils::toWorkloadEnum(workload_type) != workload::NOT_SET) { - try { - _log.debug("Set workload type"); - _gloabal_command_queues[zeroUtils::toPriorityEnum(priority)][zeroUtils::toTurboEnum(turbo)] - [zeroUtils::toWorkloadEnum(workload_type)] - ->setWorkloadType(*workload_type); - } catch (const std::exception& ex) { - _log.error("Destroy pipeline if workload type is not supported!"); - _gloabal_command_queues[zeroUtils::toPriorityEnum(priority)][zeroUtils::toTurboEnum(turbo)] - [zeroUtils::toWorkloadEnum(workload_type)] - .reset(); - OPENVINO_THROW(ex.what()); - } + std::lock_guard lock(_mutex); + if (_pool.find(hash) != _pool.end()) { + // found one weak pointer in the pool + // is it valid? + auto obj = _pool.at(hash).lock(); + if (obj) { + _log.debug("Get Command Queue"); + return obj; } - } + } // otherwise create a new object - return _gloabal_command_queues[zeroUtils::toPriorityEnum(priority)][zeroUtils::toTurboEnum(turbo)] - [zeroUtils::toWorkloadEnum(workload_type)]; -} -void CommandQueueManager::freeCommandQueue(const ze_command_queue_priority_t& priority, - const std::optional& workload_type, - bool turbo) { - std::lock_guard lock(_mutex); + // Create shared_ptr with a deleter + _log.debug("Create Command Queue"); + auto new_obj = std::shared_ptr( + new CommandQueue(init_structs, priority, group_ordinal, turbo), + [this, hash](CommandQueue* ptr) { + std::lock_guard lock(_mutex); + if (_pool.at(hash).lock()) { + _log.debug("Don't destroy the command queue in case the shared ptr is in use!"); + return; + } + _pool.erase(hash); + _log.debug("Destroy Command Queue"); + delete ptr; + }); - if (_gloabal_command_queues[zeroUtils::toPriorityEnum(priority)][zeroUtils::toTurboEnum(turbo)] - [zeroUtils::toWorkloadEnum(workload_type)] - .use_count() == 1) { - _log.debug("Destroy command queue"); - _gloabal_command_queues[zeroUtils::toPriorityEnum(priority)][zeroUtils::toTurboEnum(turbo)] - [zeroUtils::toWorkloadEnum(workload_type)] - .reset(); - } + auto pair = std::make_pair(hash, new_obj); + _pool.emplace(pair); + + return new_obj; } } // namespace intel_npu diff --git a/src/plugins/intel_npu/tests/functional/behavior/infer_request_run.hpp b/src/plugins/intel_npu/tests/functional/behavior/infer_request_run.hpp index 2889bf04f1dc2f..b4eddae85e3b5c 100644 --- a/src/plugins/intel_npu/tests/functional/behavior/infer_request_run.hpp +++ b/src/plugins/intel_npu/tests/functional/behavior/infer_request_run.hpp @@ -1080,9 +1080,10 @@ TEST_P(InferRunTestsOnNewerDrivers, MultipleCompiledModelsTestsSyncInfers) { } for (int i = 0; i < no_of_iterations; ++i) { - infer_reqs_threads[i] = std::thread([&infer_reqs, i]() -> void { + infer_reqs_threads[i] = std::thread([&compiled_models, &infer_reqs, i]() -> void { OV_ASSERT_NO_THROW(infer_reqs[i].infer()); infer_reqs[i] = {}; + compiled_models[i] = {}; }); } diff --git a/src/plugins/intel_npu/tests/functional/internal/overload/compile_and_infer.hpp b/src/plugins/intel_npu/tests/functional/internal/overload/compile_and_infer.hpp index aa555cdf97dd55..e6ec534ef6157c 100644 --- a/src/plugins/intel_npu/tests/functional/internal/overload/compile_and_infer.hpp +++ b/src/plugins/intel_npu/tests/functional/internal/overload/compile_and_infer.hpp @@ -235,11 +235,13 @@ TEST_P(OVCompileAndInferRequest, CompiledModelWorkloadTypeUpdateAfterCompilation OV_ASSERT_NO_THROW(req3.infer()); + req1 = {}; + ov::AnyMap modelConfiguration; modelConfiguration[workload_type.name()] = WorkloadType::DEFAULT; OV_ASSERT_NO_THROW(execNet.set_property(modelConfiguration)); ASSERT_EQ(execNet.get_property(workload_type.name()).as(), WorkloadType::DEFAULT); - OV_ASSERT_NO_THROW(req2 = execNet.create_infer_request()); + OV_ASSERT_NO_THROW(req2 = execNet.create_infer_request()) OV_ASSERT_NO_THROW(req2.infer()); modelConfiguration[workload_type.name()] = WorkloadType::EFFICIENT; @@ -254,7 +256,6 @@ TEST_P(OVCompileAndInferRequest, CompiledModelWorkloadTypeUpdateAfterCompilation OV_ASSERT_NO_THROW(req2.wait()); ASSERT_TRUE(isCalled); - req1 = {}; req2 = {}; req3 = {}; @@ -372,6 +373,7 @@ TEST_P(OVCompileAndInferRequesOnNewerDrivers, MultipleCompiledModelsTestsSyncInf OV_ASSERT_NO_THROW(infer_reqs[i].infer()); infer_reqs[i] = {}; + compiled_models[i] = {}; }); }