Skip to content

Commit

Permalink
Use WaitingTaskHolder to signal doneWaiting() instead of WaitingTaskW…
Browse files Browse the repository at this point in the history
…ithArenalHolder in framework

In the framework the doneWaiting() is always called from within the
main arena in the TBB thread pool, and therefore using
WaitingTaskHolder is safe (WaitingTaskWithArenaHolder is needed only
when doneWaiting() is called outside of the TBB arena).

Avoiding WaitingTaskWithArenaHolder allows to avoid enqueue()
operation when the doneWaiting() calls in the framework are the ones
that decrease the task reference count to 0.
  • Loading branch information
makortel committed Dec 27, 2024
1 parent ab76956 commit 1d9168d
Show file tree
Hide file tree
Showing 5 changed files with 16 additions and 15 deletions.
2 changes: 1 addition & 1 deletion FWCore/Concurrency/interface/WaitingTaskWithArenaHolder.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ namespace edm {

// Takes ownership of the underlying task and uses the current
// arena.
explicit WaitingTaskWithArenaHolder(WaitingTaskHolder&& iTask);
explicit WaitingTaskWithArenaHolder(WaitingTaskHolder iTask);

~WaitingTaskWithArenaHolder();

Expand Down
2 changes: 1 addition & 1 deletion FWCore/Concurrency/src/WaitingTaskWithArenaHolder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ namespace edm {
m_task->increment_ref_count();
}

WaitingTaskWithArenaHolder::WaitingTaskWithArenaHolder(WaitingTaskHolder&& iTask)
WaitingTaskWithArenaHolder::WaitingTaskWithArenaHolder(WaitingTaskHolder iTask)
: m_task(iTask.release_no_decrement()),
m_group(iTask.group()),
m_arena(std::make_shared<oneapi::tbb::task_arena>(oneapi::tbb::task_arena::attach())) {}
Expand Down
14 changes: 7 additions & 7 deletions FWCore/Framework/interface/maker/Worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -394,12 +394,12 @@ namespace edm {
ParentContext const&,
typename T::Context const*) noexcept;

void runAcquire(EventTransitionInfo const&, ParentContext const&, WaitingTaskWithArenaHolder&);
void runAcquire(EventTransitionInfo const&, ParentContext const&, WaitingTaskHolder&);

void runAcquireAfterAsyncPrefetch(std::exception_ptr,
EventTransitionInfo const&,
ParentContext const&,
WaitingTaskWithArenaHolder) noexcept;
WaitingTaskHolder) noexcept;

std::exception_ptr handleExternalWorkException(std::exception_ptr iEPtr,
ParentContext const& parentContext) noexcept;
Expand Down Expand Up @@ -519,7 +519,7 @@ namespace edm {
typename T::TransitionInfoType const&,
ServiceToken const&,
ParentContext const&,
WaitingTaskWithArenaHolder) noexcept {}
WaitingTaskHolder) noexcept {}
void execute() final {}
};

Expand All @@ -530,7 +530,7 @@ namespace edm {
EventTransitionInfo const& eventTransitionInfo,
ServiceToken const& token,
ParentContext const& parentContext,
WaitingTaskWithArenaHolder holder) noexcept
WaitingTaskHolder holder) noexcept
: m_worker(worker),
m_eventTransitionInfo(eventTransitionInfo),
m_parentContext(parentContext),
Expand Down Expand Up @@ -581,7 +581,7 @@ namespace edm {
Worker* m_worker;
EventTransitionInfo m_eventTransitionInfo;
ParentContext const m_parentContext;
WaitingTaskWithArenaHolder m_holder;
WaitingTaskHolder m_holder;
ServiceWeakToken m_serviceToken;
};

Expand Down Expand Up @@ -1127,7 +1127,7 @@ namespace edm {
auto* group = task.group();
moduleTask = make_waiting_task(
[this, weakToken, transitionInfo, parentContext, ownRunTask, group](std::exception_ptr const* iExcept) {
WaitingTaskWithArenaHolder runTaskHolder(
WaitingTaskHolder runTaskHolder(
*group, new HandleExternalWorkExceptionTask(this, group, ownRunTask->release(), parentContext));
AcquireTask<T> t(this, transitionInfo, weakToken.lock(), parentContext, runTaskHolder);
t.execute();
Expand All @@ -1154,7 +1154,7 @@ namespace edm {
auto group = task.group();
if constexpr (T::isEvent_) {
if (hasAcquire()) {
WaitingTaskWithArenaHolder runTaskHolder(
WaitingTaskHolder runTaskHolder(
*group, new HandleExternalWorkExceptionTask(this, group, moduleTask, parentContext));
moduleTask = new AcquireTask<T>(this, transitionInfo, token, parentContext, std::move(runTaskHolder));
}
Expand Down
4 changes: 2 additions & 2 deletions FWCore/Framework/src/TransformerBase.cc
Original file line number Diff line number Diff line change
Expand Up @@ -139,9 +139,9 @@ namespace edm {
handle);
}
});
WaitingTaskWithArenaHolder wta(*iHolder.group(), nextTask);
WaitingTaskHolder wta(*iHolder.group(), nextTask);
CMS_SA_ALLOW try {
*cache = transformInfo_.get<kPreTransform>(iIndex)(*(handle->wrapper()), wta);
*cache = transformInfo_.get<kPreTransform>(iIndex)(*(handle->wrapper()), WaitingTaskWithArenaHolder(wta));
} catch (...) {
wta.doneWaiting(std::current_exception());
}
Expand Down
9 changes: 5 additions & 4 deletions FWCore/Framework/src/Worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -395,10 +395,11 @@ namespace edm {

void Worker::runAcquire(EventTransitionInfo const& info,
ParentContext const& parentContext,
WaitingTaskWithArenaHolder& holder) {
WaitingTaskHolder& holder) {
ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
try {
convertException::wrap([&]() { this->implDoAcquire(info, &moduleCallingContext_, holder); });
WaitingTaskWithArenaHolder holderWithArena{holder};
convertException::wrap([&]() { this->implDoAcquire(info, &moduleCallingContext_, holderWithArena); });
} catch (cms::Exception& ex) {
edm::exceptionContext(ex, moduleCallingContext_);
if (shouldRethrowException(std::current_exception(), parentContext, true, shouldTryToContinue_)) {
Expand All @@ -411,7 +412,7 @@ namespace edm {
void Worker::runAcquireAfterAsyncPrefetch(std::exception_ptr iEPtr,
EventTransitionInfo const& eventTransitionInfo,
ParentContext const& parentContext,
WaitingTaskWithArenaHolder holder) noexcept {
WaitingTaskHolder holder) noexcept {
ranAcquireWithoutException_ = false;
std::exception_ptr exceptionPtr;
if (iEPtr) {
Expand All @@ -420,7 +421,7 @@ namespace edm {
}
moduleCallingContext_.setContext(ModuleCallingContext::State::kInvalid, ParentContext(), nullptr);
} else {
// Caught exception is propagated via WaitingTaskWithArenaHolder
// Caught exception is propagated via WaitingTaskHolder
CMS_SA_ALLOW try {
runAcquire(eventTransitionInfo, parentContext, holder);
ranAcquireWithoutException_ = true;
Expand Down

0 comments on commit 1d9168d

Please sign in to comment.