Skip to content

Commit

Permalink
Change logic to use dynamic unordered map
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan Pereanu <bogdan.pereanu@intel.com>
  • Loading branch information
pereanub committed Jan 30, 2025
1 parent 937f700 commit 6d4ab81
Show file tree
Hide file tree
Showing 14 changed files with 183 additions and 192 deletions.
6 changes: 1 addition & 5 deletions src/plugins/intel_npu/src/backend/include/zero_pipeline.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ struct Pipeline {

Pipeline(const Pipeline&) = delete;
Pipeline& operator=(const Pipeline&) = delete;
~Pipeline();
virtual ~Pipeline() = default;

void push();
void pull();
Expand Down Expand Up @@ -66,11 +66,7 @@ struct Pipeline {
std::shared_ptr<zeroProfiling::NpuInferProfiling> _npu_profiling;
Logger _logger;

uint32_t _group_ordinal;
std::mutex _mutex;
bool _turbo = false;
ze_command_queue_priority_t _ze_queue_priority;
std::optional<ze_command_queue_workload_type_t> _ze_workload_type = std::nullopt;
};

} // namespace intel_npu
89 changes: 27 additions & 62 deletions src/plugins/intel_npu/src/backend/src/zero_pipeline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,21 @@
#include "intel_npu/utils/zero/zero_types.hpp"
#include "zero_remote_tensor.hpp"

namespace {

/// @brief Returns true when both shared pointers refer to the same object
///        (or are both empty).
/// @details For std::shared_ptr, operator== is already defined as
///          `a.get() == b.get()`, so the original extra
///          `if (a && b) return a.get() == b.get();` branch was dead code —
///          it could never return true once `a == b` had returned false.
template <class T>
bool compare_shared_ptr(const std::shared_ptr<T>& a, const std::shared_ptr<T>& b) {
    return a == b;
}

}  // namespace

namespace intel_npu {

Pipeline::Pipeline(const Config& config,
Expand All @@ -32,8 +47,7 @@ Pipeline::Pipeline(const Config& config,
_id(_graph->get_unique_id()),
_number_of_command_lists(_graph->get_batch_size().has_value() ? *_graph->get_batch_size() : 1),
_npu_profiling(npu_profiling),
_logger("Pipeline", _config.get<LOG_LEVEL>()),
_group_ordinal(group_ordinal) {
_logger("Pipeline", _config.get<LOG_LEVEL>()) {
OV_ITT_SCOPED_TASK(itt::domains::LevelZeroBackend, "Zero_infer_request::Pipeline::Pipeline");
_logger.debug("Pipeline - initialize started");

Expand Down Expand Up @@ -64,34 +78,20 @@ Pipeline::Pipeline(const Config& config,
_init_structs->getMutableCommandListVersion() ? true : false));
}

_ze_queue_priority = zeroUtils::toZeQueuePriority(_config.get<MODEL_PRIORITY>());

if (_config.has<TURBO>()) {
_turbo = _config.get<TURBO>();
}

if (config.has<WORKLOAD_TYPE>()) {
_ze_workload_type = zeroUtils::toZeQueueWorkloadType(config.get<WORKLOAD_TYPE>());
}

_command_queue = CommandQueueManager::getInstance().getCommandQueue(_init_structs,
_ze_queue_priority,
_graph->get_ze_workload_type(),
_group_ordinal,
_turbo);
_command_queue = _graph->get_command_queue();

if (_sync_output_with_fences) {
_fences.resize(_number_of_command_lists);

for (size_t i = 0; i < _number_of_command_lists; i++) {
_logger.debug("Pipeline - getCommandQueue() - create new fence");
_fences[i] = std::make_unique<Fence>(*_command_queue);
_fences[i] = std::make_unique<Fence>(_command_queue);
}
}

for (size_t i = 0; i < _number_of_command_lists; i++) {
size_t io_index = 0;
for (const auto& desc : graph->get_input_descriptors()) {
for (const auto& desc : _graph->get_input_descriptors()) {
if (input_tensors.at(io_index).size() > 1) {
void* data = nullptr;
auto remote_tensor = std::dynamic_pointer_cast<ZeroRemoteTensor>(input_tensors.at(io_index).at(i));
Expand All @@ -101,7 +101,7 @@ Pipeline::Pipeline(const Config& config,
data = remote_tensor->get_original_memory();
}

graph->set_argument_value(desc.idx, data);
_graph->set_argument_value(desc.idx, data);

++io_index;
continue;
Expand All @@ -115,7 +115,7 @@ Pipeline::Pipeline(const Config& config,
data = remote_tensor->get_original_memory();
}

graph->set_argument_value(
_graph->set_argument_value(
desc.idx,
static_cast<unsigned char*>(data) +
(i * input_tensors.at(io_index).at(0)->get_byte_size()) / _number_of_command_lists);
Expand All @@ -124,7 +124,7 @@ Pipeline::Pipeline(const Config& config,
}

io_index = 0;
for (const auto& desc : graph->get_output_descriptors()) {
for (const auto& desc : _graph->get_output_descriptors()) {
void* data = nullptr;
auto remote_tensor = std::dynamic_pointer_cast<ZeroRemoteTensor>(output_tensors.at(io_index));
if (remote_tensor == nullptr) {
Expand All @@ -133,7 +133,7 @@ Pipeline::Pipeline(const Config& config,
data = remote_tensor->get_original_memory();
}

graph->set_argument_value(
_graph->set_argument_value(
desc.idx,
static_cast<unsigned char*>(data) +
(i * output_tensors.at(io_index)->get_byte_size()) / _number_of_command_lists);
Expand All @@ -152,7 +152,7 @@ Pipeline::Pipeline(const Config& config,
_command_lists.at(i)->appendNpuTimestamp(reinterpret_cast<uint64_t*>(_npu_profiling->npu_ts_infer_start));
}

_command_lists.at(i)->appendGraphExecute(static_cast<ze_graph_handle_t>(graph->get_handle()),
_command_lists.at(i)->appendGraphExecute(static_cast<ze_graph_handle_t>(_graph->get_handle()),
profiling_query.getHandle());

/// append timestamp command if feature was activated
Expand Down Expand Up @@ -185,34 +185,15 @@ void Pipeline::getCommandQueue() {

std::lock_guard<std::mutex> lock(_mutex);

if (_ze_workload_type != _graph->get_ze_workload_type()) {
// fences created for the old command queue shall be destroyed and make new ones
if (_sync_output_with_fences) {
for (size_t i = 0; i < _number_of_command_lists; i++) {
if (_fences[i] != nullptr) {
_logger.debug("Pipeline - getCommandQueue() - destroy old fence");
_fences[i].reset();
}
}
}

_command_queue = CommandQueueManager::getInstance().getCommandQueue(_init_structs,
_ze_queue_priority,
_graph->get_ze_workload_type(),
_group_ordinal,
_turbo);
if (!compare_shared_ptr(_command_queue, _graph->get_command_queue())) {
_command_queue = _graph->get_command_queue();

if (_sync_output_with_fences) {
for (size_t i = 0; i < _number_of_command_lists; i++) {
_logger.debug("Pipeline - getCommandQueue() - create new fence");
_fences[i] = std::make_unique<Fence>(*_command_queue);
_fences[i] = std::make_unique<Fence>(_command_queue);
}
}

_logger.debug("Pipeline - getCommandQueue() - free previous command queue");
CommandQueueManager::getInstance().freeCommandQueue(_ze_queue_priority, _ze_workload_type, _turbo);

_ze_workload_type = _graph->get_ze_workload_type();
}

_logger.debug("Pipeline - getCommandQueue() completed");
Expand Down Expand Up @@ -330,20 +311,4 @@ void Pipeline::closeCommandListIndex(size_t command_list_index) {
_command_lists.at(command_list_index)->close();
};

Pipeline::~Pipeline() {
if (_command_queue) {
if (_sync_output_with_fences) {
// fences shall be destroyed before the command queue is destroyed
for (size_t i = 0; i < _number_of_command_lists; i++) {
if (_fences[i] != nullptr) {
_fences[i].reset();
}
}
}

_command_queue.reset();
CommandQueueManager::getInstance().freeCommandQueue(_ze_queue_priority, _ze_workload_type, _turbo);
}
}

} // namespace intel_npu
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,9 @@ class IGraph : public std::enable_shared_from_this<IGraph> {
const std::vector<ArgumentDescriptor>& get_input_descriptors() const;
const std::vector<ArgumentDescriptor>& get_output_descriptors() const;

const std::shared_ptr<CommandQueue>& get_command_queue() const;

void set_workload_type(const ov::WorkloadType workloadType);
const std::optional<ze_command_queue_workload_type_t> get_ze_workload_type() const;

std::mutex& get_mutex();

Expand All @@ -59,8 +60,8 @@ class IGraph : public std::enable_shared_from_this<IGraph> {

protected:
/**
* @brief Determines if batching can be addressed inside the plugin. In the positive case, the batch size used by
* the model will also be deduced and returned.
* @brief Determines if batching can be addressed inside the plugin. In the positive case, the batch size used
* by the model will also be deduced and returned.
* @details Batching can be handled by the plugin only if:
* - The batch axis is the first axis.
* - The batch size received by the compiler takes the default value of 1.
Expand All @@ -72,11 +73,13 @@ class IGraph : public std::enable_shared_from_this<IGraph> {
*
* @param metadata Metadata containing the shape values as seen by both the compiler and IR model. These will
* ultimately be used for determining the batch size.
* @returns The batch size deduced by the algorithm or the default value of 1 if batching cannot be performed inside
* the plugin.
* @returns The batch size deduced by the algorithm or the default value of 1 if batching cannot be performed
* inside the plugin.
*/
std::optional<size_t> get_batch_size(const NetworkMetadata& metadata);

virtual void create_new_command_queue() = 0;

ze_graph_handle_t _handle = nullptr;
NetworkMetadata _metadata;

Expand All @@ -85,8 +88,8 @@ class IGraph : public std::enable_shared_from_this<IGraph> {

std::vector<std::shared_ptr<Event>> _last_submitted_event;

// Used to protect zero pipeline creation in the graph. The pipeline should be created only once per graph when the
// first inference starts running
// Used to protect zero pipeline creation in the graph. The pipeline should be created only once per graph when
// the first inference starts running
std::mutex _mutex;

std::unique_ptr<BlobContainer> _blobPtr;
Expand All @@ -100,7 +103,11 @@ class IGraph : public std::enable_shared_from_this<IGraph> {
*/
std::optional<std::size_t> _batch_size = std::nullopt;

std::shared_ptr<CommandQueue> _command_queue;
uint32_t _group_ordinal;
std::optional<ze_command_queue_workload_type_t> _ze_workload_type = std::nullopt;
bool _turbo = false;
ze_command_queue_priority_t _ze_queue_priority;

Logger _logger;
};
Expand Down
20 changes: 10 additions & 10 deletions src/plugins/intel_npu/src/common/src/igraph.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,7 @@ IGraph::IGraph(ze_graph_handle_t handle,
: _handle(handle),
_metadata(std::move(metadata)),
_blobPtr(std::move(blobPtr)),
_logger("IGraph", config.get<LOG_LEVEL>()) {
if (config.has<WORKLOAD_TYPE>()) {
set_workload_type(config.get<WORKLOAD_TYPE>());
}
}
_logger("IGraph", config.get<LOG_LEVEL>()) {}

const NetworkMetadata& IGraph::get_metadata() const {
return _metadata;
Expand All @@ -47,8 +43,16 @@ const std::vector<ArgumentDescriptor>& IGraph::get_output_descriptors() const {
return _output_descriptors;
}

// Accessor for the graph-owned command queue. Returns the member by const
// reference, so callers share ownership only if they copy the shared_ptr.
// May be empty until the queue has been created (see create_new_command_queue).
const std::shared_ptr<CommandQueue>& IGraph::get_command_queue() const {
    return _command_queue;
}

// Applies a new workload type to this graph's command queue.
// Converts the OV-level workload type to its Level Zero equivalent and
// recreates the command queue so the new setting takes effect.
void IGraph::set_workload_type(const ov::WorkloadType workloadType) {
    if (!_command_queue) {
        // No queue exists yet — nothing to update. NOTE(review): the new value
        // is dropped here; presumably the type is picked up from the config
        // when the queue is first created — confirm against initialize().
        return;
    }

    _ze_workload_type = zeroUtils::toZeQueueWorkloadType(workloadType);
    create_new_command_queue();
}

std::mutex& IGraph::get_mutex() {
Expand Down Expand Up @@ -137,8 +141,4 @@ const std::optional<std::size_t> IGraph::get_batch_size() const {
return _batch_size;
}

const std::optional<ze_command_queue_workload_type_t> IGraph::get_ze_workload_type() const {
return _ze_workload_type;
}

} // namespace intel_npu
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ class DriverGraph final : public IGraph {
private:
bool release_blob(const Config& config);

void create_new_command_queue() override;

std::shared_ptr<ZeGraphExtWrappers> _zeGraphExt;
std::shared_ptr<ZeroInitStructsHolder> _zeroInitStruct;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ class PluginGraph final : public IGraph {
~PluginGraph() override;

private:
void create_new_command_queue() override;

std::shared_ptr<ZeGraphExtWrappers> _zeGraphExt;
std::shared_ptr<ZeroInitStructsHolder> _zeroInitStruct;

Expand Down
32 changes: 27 additions & 5 deletions src/plugins/intel_npu/src/compiler_adapter/src/driver_graph.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,6 @@ void DriverGraph::initialize(const Config& config) {
_input_descriptors.shrink_to_fit();
_output_descriptors.shrink_to_fit();

ze_device_properties_t deviceProperties = {};
deviceProperties.stype = ZE_STRUCTURE_TYPE_DEVICE_PROPERTIES;
THROW_ON_FAIL_FOR_LEVELZERO("zeDeviceGetProperties",
zeDeviceGetProperties(_zeroInitStruct->getDevice(), &deviceProperties));

_zeGraphExt->initializeGraph(_handle);

_logger.debug("Graph initialize finish");
Expand All @@ -113,6 +108,25 @@ void DriverGraph::initialize(const Config& config) {
// releasing it here to avoid unnecessary memory usage.
_blobIsReleased = release_blob(config);

// Find the corresponding command queue group.
ze_device_properties_t deviceProperties = {};
deviceProperties.stype = ZE_STRUCTURE_TYPE_DEVICE_PROPERTIES;
THROW_ON_FAIL_FOR_LEVELZERO("zeDeviceGetProperties",
zeDeviceGetProperties(_zeroInitStruct->getDevice(), &deviceProperties));
_group_ordinal = zeroUtils::findGroupOrdinal(_zeroInitStruct->getDevice(), deviceProperties);

_ze_queue_priority = zeroUtils::toZeQueuePriority(config.get<MODEL_PRIORITY>());

if (config.has<TURBO>()) {
_turbo = config.get<TURBO>();
}

if (config.has<WORKLOAD_TYPE>()) {
_ze_workload_type = zeroUtils::toZeQueueWorkloadType(config.get<WORKLOAD_TYPE>());
}

create_new_command_queue();

if (config.get<BATCH_MODE>() != ov::intel_npu::BatchMode::COMPILER) {
_batch_size = get_batch_size(_metadata);
}
Expand All @@ -124,6 +138,14 @@ void DriverGraph::initialize(const Config& config) {
}
}

// Obtains a command queue from the process-wide pool that matches this
// graph's scheduling parameters (priority, workload type, group ordinal,
// turbo flag) and stores it in _command_queue.
void DriverGraph::create_new_command_queue() {
    auto& queue_pool = CommandQueuePool::getInstance();
    _command_queue =
        queue_pool.getCommandQueue(_zeroInitStruct, _ze_queue_priority, _ze_workload_type, _group_ordinal, _turbo);
}

bool DriverGraph::release_blob(const Config& config) {
if (_blobPtr == nullptr || _zeroInitStruct->getGraphDdiTable().version() < ZE_GRAPH_EXT_VERSION_1_8 ||
config.get<PERF_COUNT>()) {
Expand Down
Loading

0 comments on commit 6d4ab81

Please sign in to comment.