Make sure that the pipeline is still alive when fences are destroyed
Signed-off-by: Bogdan Pereanu <bogdan.pereanu@intel.com>
pereanub committed Jan 30, 2025
1 parent ad75cf1 commit 9c9cc88
Showing 3 changed files with 49 additions and 39 deletions.
76 changes: 43 additions & 33 deletions src/plugins/intel_npu/src/backend/src/zero_pipeline.cpp
@@ -41,12 +41,6 @@ Pipeline::Pipeline(const Config& config,
profiling_query.create(profiling_pool._handle);
}

if (_config.has<TURBO>()) {
_turbo = _config.get<TURBO>();
}

_ze_queue_priority = zeroUtils::toZeQueuePriority(_config.get<MODEL_PRIORITY>());

OPENVINO_ASSERT(_sync_output_with_fences || !_config.get<RUN_INFERENCES_SEQUENTIALLY>(),
"In-order execution doesn't work in case synchronization of the inferences is done using events");

@@ -70,8 +64,29 @@ Pipeline::Pipeline(const Config& config,
_init_structs->getMutableCommandListVersion() ? true : false));
}

_ze_queue_priority = zeroUtils::toZeQueuePriority(_config.get<MODEL_PRIORITY>());

if (_config.has<TURBO>()) {
_turbo = _config.get<TURBO>();
}

if (config.has<WORKLOAD_TYPE>()) {
_ze_workload_type = zeroUtils::toZeQueueWorkloadType(config.get<WORKLOAD_TYPE>());
}

_command_queue = CommandQueueManager::getInstance().getCommandQueue(_init_structs,
_ze_queue_priority,
_graph->get_ze_workload_type(),
_group_ordinal,
_turbo);

if (_sync_output_with_fences) {
_fences.resize(_number_of_command_lists);

for (size_t i = 0; i < _number_of_command_lists; i++) {
_logger.debug("Pipeline - getCommandQueue() - create new fence");
_fences[i] = std::make_unique<Fence>(*_command_queue);
}
}

for (size_t i = 0; i < _number_of_command_lists; i++) {
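This constructor hunk moves command-queue acquisition and fence creation up front, so a fence can never exist before the queue it targets. Below is a minimal sketch of the lifetime rule being enforced, using hypothetical stand-in types rather than the plugin's real classes: a Level Zero fence is created against a specific command queue, so the queue must outlive every fence that references it. Declaring the queue member before the fences gives the correct teardown order for free, since C++ destroys members in reverse declaration order.

#include <memory>
#include <vector>

struct CommandQueue { /* would own a ze_command_queue_handle_t */ };

struct Fence {
    explicit Fence(CommandQueue& queue) : _queue(queue) {}
    // a real ~Fence() would call zeFenceDestroy, which needs _queue alive
    CommandQueue& _queue;
};

class Pipeline {
    std::shared_ptr<CommandQueue> _command_queue;  // declared first, destroyed last
    std::vector<std::unique_ptr<Fence>> _fences;   // destroyed before the queue
public:
    Pipeline() : _command_queue(std::make_shared<CommandQueue>()) {
        // the queue exists before any fence is created, as in the hunk above
        _fences.push_back(std::make_unique<Fence>(*_command_queue));
    }
};

int main() {
    Pipeline pipeline;
}  // members die in reverse order: fences first, then the queue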
@@ -168,41 +183,36 @@ Pipeline::Pipeline(const Config& config,
void Pipeline::getCommandQueue() {
_logger.debug("Pipeline - getCommandQueue() started");

_command_queue = CommandQueueManager::getInstance().getCommandQueue(_init_structs,
_ze_queue_priority,
_graph->get_ze_workload_type(),
_group_ordinal,
_turbo);
{
std::lock_guard<std::mutex> lock(_mutex);

if (_ze_workload_type != _graph->get_ze_workload_type()) {
if (_ze_workload_type.has_value()) {
// fences created for the old command queue must be destroyed before new ones are made
if (_sync_output_with_fences) {
for (size_t i = 0; i < _number_of_command_lists; i++) {
if (_fences[i] != nullptr) {
_logger.debug("Pipeline - getCommandQueue() - destroy old fence");
_fences[i].reset();
}
}
}
std::lock_guard<std::mutex> lock(_mutex);

_logger.debug("Pipeline - getCommandQueue() - free command queue");
CommandQueueManager::getInstance().freeCommandQueue(_ze_queue_priority, _ze_workload_type, _turbo);
if (_ze_workload_type != _graph->get_ze_workload_type()) {
// fences created for the old command queue must be destroyed before new ones are made
if (_sync_output_with_fences) {
for (size_t i = 0; i < _number_of_command_lists; i++) {
if (_fences[i] != nullptr) {
_logger.debug("Pipeline - getCommandQueue() - destroy old fence");
_fences[i].reset();
}
}

_ze_workload_type = _graph->get_ze_workload_type();
}

_command_queue = CommandQueueManager::getInstance().getCommandQueue(_init_structs,
_ze_queue_priority,
_graph->get_ze_workload_type(),
_group_ordinal,
_turbo);

if (_sync_output_with_fences) {
for (size_t i = 0; i < _number_of_command_lists; i++) {
if (_fences[i] == nullptr) {
_logger.debug("Pipeline - getCommandQueue() - create new fence");
_fences[i] = std::make_unique<Fence>(*_command_queue);
}
_logger.debug("Pipeline - getCommandQueue() - create new fence");
_fences[i] = std::make_unique<Fence>(*_command_queue);
}
}

_logger.debug("Pipeline - getCommandQueue() - free previous command queue");
CommandQueueManager::getInstance().freeCommandQueue(_ze_queue_priority, _ze_workload_type, _turbo);

_ze_workload_type = _graph->get_ze_workload_type();
}

_logger.debug("Pipeline - getCommandQueue() completed");
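The reworked getCommandQueue() now performs the entire queue swap under the lock and frees the previous queue only as the last step. Here is a condensed sketch of that ordering, using a hypothetical manager API rather than the plugin's CommandQueueManager:

#include <memory>
#include <mutex>
#include <vector>

struct CommandQueue {};
struct Fence {
    explicit Fence(CommandQueue&) {}
};

struct QueueManager {
    std::shared_ptr<CommandQueue> get() { return std::make_shared<CommandQueue>(); }
    void free(std::shared_ptr<CommandQueue>& q) { q.reset(); }
};

void swapQueue(QueueManager& manager,
               std::shared_ptr<CommandQueue>& queue,
               std::vector<std::unique_ptr<Fence>>& fences,
               std::mutex& mutex) {
    std::lock_guard<std::mutex> lock(mutex);      // the whole swap happens under the lock
    for (auto& fence : fences) {
        fence.reset();                            // 1. drop fences tied to the old queue
    }
    auto oldQueue = queue;
    queue = manager.get();                        // 2. acquire the replacement queue
    for (auto& fence : fences) {
        fence = std::make_unique<Fence>(*queue);  // 3. recreate fences on the new queue
    }
    manager.free(oldQueue);                       // 4. only now release the old queue
}

Freeing the old queue only after the old fences are gone and the new fences exist leaves no window in which a fence references a destroyed queue, which is exactly what the commit title promises.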
8 changes: 4 additions & 4 deletions src/plugins/intel_npu/src/utils/include/intel_npu/utils/zero/zero_wrappers.hpp
@@ -145,10 +145,6 @@ class CommandQueue {
ze_command_queue_handle_t _handle = nullptr;
};

static std::array<std::array<std::array<std::shared_ptr<CommandQueue>, workload::WORKLOAD_COUNT>, turbo::TURBO_COUNT>,
priority::PRIORITY_COUNT>
_gloabal_command_queues;

class CommandQueueManager {
public:
CommandQueueManager();
Expand All @@ -173,6 +169,10 @@ class CommandQueueManager {
Logger _log;

std::mutex _mutex;

std::array<std::array<std::array<std::shared_ptr<CommandQueue>, workload::WORKLOAD_COUNT>, turbo::TURBO_COUNT>,
priority::PRIORITY_COUNT>
_gloabal_command_queues;
};

} // namespace intel_npu
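This header change retires the file-scope _gloabal_command_queues array and makes the cache a member of CommandQueueManager, so its lifetime is tied to the singleton rather than to static-initialization order across translation units. A simplified sketch of the resulting shape, assuming the manager is a Meyers-style singleton (the actual getInstance() implementation is not shown in this diff):

#include <array>
#include <memory>
#include <mutex>

struct CommandQueue {};

class CommandQueueManager {
public:
    static CommandQueueManager& getInstance() {
        static CommandQueueManager instance;  // constructed on first use, destroyed at exit
        return instance;
    }

private:
    CommandQueueManager() = default;
    std::mutex _mutex;
    // the cache is owned by the manager and guarded by _mutex like its other state
    std::array<std::shared_ptr<CommandQueue>, 3> _commandQueues{};
};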
4 changes: 2 additions & 2 deletions src/plugins/intel_npu/src/utils/src/zero/zero_wrappers.cpp
@@ -236,7 +236,7 @@ std::shared_ptr<CommandQueue> CommandQueueManager::getCommandQueue(
[zeroUtils::toWorkloadEnum(workload_type)]
->setWorkloadType(*workload_type);
} catch (const std::exception& ex) {
_log.debug("Destroy pipeline if workload type is not supported!");
_log.error("Destroy pipeline if workload type is not supported!");
_gloabal_command_queues[zeroUtils::toPriorityEnum(priority)][zeroUtils::toTurboEnum(turbo)]
[zeroUtils::toWorkloadEnum(workload_type)]
.reset();
@@ -255,7 +255,7 @@ void CommandQueueManager::freeCommandQueue(const ze_command_queue_priority_t& pr

if (_gloabal_command_queues[zeroUtils::toPriorityEnum(priority)][zeroUtils::toTurboEnum(turbo)]
[zeroUtils::toWorkloadEnum(workload_type)]
.use_count() <= 1) {
.use_count() == 1) {
_log.debug("Destroy command queue");
_gloabal_command_queues[zeroUtils::toPriorityEnum(priority)][zeroUtils::toTurboEnum(turbo)]
[zeroUtils::toWorkloadEnum(workload_type)]
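The use_count() == 1 check makes freeCommandQueue() release the cached queue only when the manager is its sole remaining owner; with the previous <= 1, an already-empty slot (use_count() of 0) would also be logged and reset. A small self-contained sketch of this reference-counted release, with hypothetical names:

#include <cassert>
#include <memory>

struct CommandQueue {};

struct Cache {
    std::shared_ptr<CommandQueue> slot;

    void freeIfUnused() {
        if (slot.use_count() == 1) {  // the cache is the last owner
            slot.reset();             // safe to destroy the queue now
        }
    }
};

int main() {
    Cache cache;
    cache.slot = std::make_shared<CommandQueue>();

    auto pipelineRef = cache.slot;  // a pipeline still uses the queue
    cache.freeIfUnused();
    assert(cache.slot != nullptr);  // kept alive for the pipeline

    pipelineRef.reset();            // pipeline is done with the queue
    cache.freeIfUnused();
    assert(cache.slot == nullptr);  // now actually destroyed
}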
