Skip to content

Commit

Permalink
Make sure that the pipeline is still alive when fences are destroyed
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan Pereanu <bogdan.pereanu@intel.com>
  • Loading branch information
pereanub committed Jan 30, 2025
1 parent ad75cf1 commit 484873c
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 29 deletions.
66 changes: 39 additions & 27 deletions src/plugins/intel_npu/src/backend/src/zero_pipeline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ Pipeline::Pipeline(const Config& config,

_ze_queue_priority = zeroUtils::toZeQueuePriority(_config.get<MODEL_PRIORITY>());

if (config.has<WORKLOAD_TYPE>()) {
_ze_workload_type = zeroUtils::toZeQueueWorkloadType(config.get<WORKLOAD_TYPE>());
}

OPENVINO_ASSERT(_sync_output_with_fences || !_config.get<RUN_INFERENCES_SEQUENTIALLY>(),
"In-order execution doesn't work in case synchronization of the inferences is done using events");

Expand Down Expand Up @@ -74,6 +78,19 @@ Pipeline::Pipeline(const Config& config,
_fences.resize(_number_of_command_lists);
}

_command_queue = CommandQueueManager::getInstance().getCommandQueue(_init_structs,
_ze_queue_priority,
_graph->get_ze_workload_type(),
_group_ordinal,
_turbo);

if (_sync_output_with_fences) {
for (size_t i = 0; i < _number_of_command_lists; i++) {
_logger.debug("Pipeline - getCommandQueue() - create new fence");
_fences[i] = std::make_unique<Fence>(*_command_queue);
}
}

for (size_t i = 0; i < _number_of_command_lists; i++) {
size_t io_index = 0;
for (const auto& desc : graph->get_input_descriptors()) {
Expand Down Expand Up @@ -168,41 +185,36 @@ Pipeline::Pipeline(const Config& config,
void Pipeline::getCommandQueue() {
_logger.debug("Pipeline - getCommandQueue() started");

_command_queue = CommandQueueManager::getInstance().getCommandQueue(_init_structs,
_ze_queue_priority,
_graph->get_ze_workload_type(),
_group_ordinal,
_turbo);
{
std::lock_guard<std::mutex> lock(_mutex);

if (_ze_workload_type != _graph->get_ze_workload_type()) {
if (_ze_workload_type.has_value()) {
// fences created for the old command queue shall be destroyed and make new ones
if (_sync_output_with_fences) {
for (size_t i = 0; i < _number_of_command_lists; i++) {
if (_fences[i] != nullptr) {
_logger.debug("Pipeline - getCommandQueue() - destroy old fence");
_fences[i].reset();
}
}
}
std::lock_guard<std::mutex> lock(_mutex);

_logger.debug("Pipeline - getCommandQueue() - free command queue");
CommandQueueManager::getInstance().freeCommandQueue(_ze_queue_priority, _ze_workload_type, _turbo);
if (_ze_workload_type != _graph->get_ze_workload_type()) {
// fences created for the old command queue shall be destroyed and make new ones
if (_sync_output_with_fences) {
for (size_t i = 0; i < _number_of_command_lists; i++) {
if (_fences[i] != nullptr) {
_logger.debug("Pipeline - getCommandQueue() - destroy old fence");
_fences[i].reset();
}
}

_ze_workload_type = _graph->get_ze_workload_type();
}

_command_queue = CommandQueueManager::getInstance().getCommandQueue(_init_structs,
_ze_queue_priority,
_graph->get_ze_workload_type(),
_group_ordinal,
_turbo);

if (_sync_output_with_fences) {
for (size_t i = 0; i < _number_of_command_lists; i++) {
if (_fences[i] == nullptr) {
_logger.debug("Pipeline - getCommandQueue() - create new fence");
_fences[i] = std::make_unique<Fence>(*_command_queue);
}
_logger.debug("Pipeline - getCommandQueue() - create new fence");
_fences[i] = std::make_unique<Fence>(*_command_queue);
}
}

_logger.debug("Pipeline - getCommandQueue() - free previous command queue");
CommandQueueManager::getInstance().freeCommandQueue(_ze_queue_priority, _ze_workload_type, _turbo);

_ze_workload_type = _graph->get_ze_workload_type();
}

_logger.debug("Pipeline - getCommandQueue() completed");
Expand Down
4 changes: 2 additions & 2 deletions src/plugins/intel_npu/src/utils/src/zero/zero_wrappers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ std::shared_ptr<CommandQueue> CommandQueueManager::getCommandQueue(
[zeroUtils::toWorkloadEnum(workload_type)]
->setWorkloadType(*workload_type);
} catch (const std::exception& ex) {
_log.debug("Destroy pipeline if workload type is not supported!");
_log.error("Destroy pipeline if workload type is not supported!");
_gloabal_command_queues[zeroUtils::toPriorityEnum(priority)][zeroUtils::toTurboEnum(turbo)]
[zeroUtils::toWorkloadEnum(workload_type)]
.reset();
Expand All @@ -255,7 +255,7 @@ void CommandQueueManager::freeCommandQueue(const ze_command_queue_priority_t& pr

if (_gloabal_command_queues[zeroUtils::toPriorityEnum(priority)][zeroUtils::toTurboEnum(turbo)]
[zeroUtils::toWorkloadEnum(workload_type)]
.use_count() <= 1) {
.use_count() == 1) {
_log.debug("Destroy command queue");
_gloabal_command_queues[zeroUtils::toPriorityEnum(priority)][zeroUtils::toTurboEnum(turbo)]
[zeroUtils::toWorkloadEnum(workload_type)]
Expand Down

0 comments on commit 484873c

Please sign in to comment.