Skip to content

Commit

Permalink
Use WaitingTaskHolder to signal doneWaiting() instead of WaitingTaskW…
Browse files Browse the repository at this point in the history
…ithArenalHolder in framework

In the framework the doneWaiting() is always called from within the
main arena in the TBB thread pool, and therefore using
WaitingTaskHolder is safe (WaitingTaskWithArenaHolder is needed only
when doneWaiting() is called outside of the TBB arena).

Avoiding WaitingTaskWithArenaHolder allows to avoid enqueue()
operation when the doneWaiting() calls in the framework are the ones
that decrease the task reference count to 0.
  • Loading branch information
makortel committed Jan 8, 2025
1 parent b5cf409 commit e77e887
Show file tree
Hide file tree
Showing 29 changed files with 106 additions and 141 deletions.
5 changes: 2 additions & 3 deletions FWCore/Concurrency/interface/WaitingTaskWithArenaHolder.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,8 @@ namespace edm {
// eventually intend for the task to be spawned.
explicit WaitingTaskWithArenaHolder(oneapi::tbb::task_group&, WaitingTask* iTask);

// Takes ownership of the underlying task and uses the current
// arena.
explicit WaitingTaskWithArenaHolder(WaitingTaskHolder&& iTask);
// Captures the current arena.
explicit WaitingTaskWithArenaHolder(WaitingTaskHolder iTask);

~WaitingTaskWithArenaHolder();

Expand Down
2 changes: 1 addition & 1 deletion FWCore/Concurrency/src/WaitingTaskWithArenaHolder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ namespace edm {
m_task->increment_ref_count();
}

WaitingTaskWithArenaHolder::WaitingTaskWithArenaHolder(WaitingTaskHolder&& iTask)
WaitingTaskWithArenaHolder::WaitingTaskWithArenaHolder(WaitingTaskHolder iTask)
: m_task(iTask.release_no_decrement()),
m_group(iTask.group()),
m_arena(std::make_shared<oneapi::tbb::task_arena>(oneapi::tbb::task_arena::attach())) {}
Expand Down
48 changes: 23 additions & 25 deletions FWCore/Framework/interface/CallbackExternalWork.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,10 +114,9 @@ namespace edm {
WaitingTaskHolder produceTask =
Base::makeProduceTask(group, token, record, es, emitPostPrefetchingSignal, std::move(produceFunctor));

WaitingTaskWithArenaHolder waitingTaskWithArenaHolder =
makeExceptionHandlerTask(std::move(produceTask), group);
WaitingTaskHolder waitingTaskHolder = makeExceptionHandlerTask(std::move(produceTask), group);

return makeAcquireTask(std::move(waitingTaskWithArenaHolder), group, token, record, es);
return makeAcquireTask(std::move(waitingTaskHolder), group, token, record, es);
},
std::move(iTask),
iRecord,
Expand All @@ -134,15 +133,15 @@ namespace edm {
const TDecorator& iDec = TDecorator())
: Base(iProd, std::move(iProduceFunc), iID, iDec), acquireFunction_(std::move(iAcquireFunc)) {}

WaitingTaskHolder makeAcquireTask(WaitingTaskWithArenaHolder waitingTaskWithArenaHolder,
WaitingTaskHolder makeAcquireTask(WaitingTaskHolder waitingTaskHolder,
oneapi::tbb::task_group* group,
ServiceWeakToken const& serviceToken,
EventSetupRecordImpl const* record,
EventSetupImpl const* eventSetupImpl) {
return WaitingTaskHolder(
*group,
make_waiting_task(
[this, holder = std::move(waitingTaskWithArenaHolder), group, serviceToken, record, eventSetupImpl](
[this, holder = std::move(waitingTaskHolder), group, serviceToken, record, eventSetupImpl](
std::exception_ptr const* iException) mutable {
std::exception_ptr excptr;
if (iException) {
Expand Down Expand Up @@ -191,7 +190,7 @@ namespace edm {
ESModuleCallingContext const& context_;
};
EndGuard guard(record, context);
acquireCache_ = (*acquireFunction_)(rec, holder);
acquireCache_ = (*acquireFunction_)(rec, WaitingTaskWithArenaHolder(holder));
});
} catch (cms::Exception& iException) {
iException.addContext("Running acquire");
Expand All @@ -202,25 +201,24 @@ namespace edm {
}));
}

WaitingTaskWithArenaHolder makeExceptionHandlerTask(WaitingTaskHolder produceTask,
oneapi::tbb::task_group* group) {
return WaitingTaskWithArenaHolder(*group,
make_waiting_task([this, produceTask = std::move(produceTask)](
std::exception_ptr const* iException) mutable {
std::exception_ptr excptr;
if (iException) {
excptr = *iException;
}
if (excptr) {
try {
convertException::wrap([excptr]() { std::rethrow_exception(excptr); });
} catch (cms::Exception& exception) {
exception.addContext("Running acquire and external work");
edm::exceptionContext(exception, Base::callingContext());
produceTask.doneWaiting(std::current_exception());
}
}
}));
WaitingTaskHolder makeExceptionHandlerTask(WaitingTaskHolder produceTask, oneapi::tbb::task_group* group) {
return WaitingTaskHolder(*group,
make_waiting_task([this, produceTask = std::move(produceTask)](
std::exception_ptr const* iException) mutable {
std::exception_ptr excptr;
if (iException) {
excptr = *iException;
}
if (excptr) {
try {
convertException::wrap([excptr]() { std::rethrow_exception(excptr); });
} catch (cms::Exception& exception) {
exception.addContext("Running acquire and external work");
edm::exceptionContext(exception, Base::callingContext());
produceTask.doneWaiting(std::current_exception());
}
}
}));
}

std::shared_ptr<TAcquireFunc> acquireFunction_;
Expand Down
2 changes: 2 additions & 0 deletions FWCore/Framework/interface/StreamSchedule.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@
#include "FWCore/Utilities/interface/propagate_const.h"
#include "FWCore/Utilities/interface/thread_safety_macros.h"

#include "oneapi/tbb/task_arena.h"

#include <exception>
#include <map>
#include <memory>
Expand Down
8 changes: 2 additions & 6 deletions FWCore/Framework/interface/global/EDFilterBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ namespace edm {
class StreamID;
class ActivityRegistry;
class ThinnedAssociationsHelper;
class WaitingTaskWithArenaHolder;
class EventForTransformer;
class ServiceWeakToken;

Expand Down Expand Up @@ -75,10 +74,7 @@ namespace edm {

private:
bool doEvent(EventTransitionInfo const&, ActivityRegistry*, ModuleCallingContext const*);
void doAcquire(EventTransitionInfo const&,
ActivityRegistry*,
ModuleCallingContext const*,
WaitingTaskWithArenaHolder&);
void doAcquire(EventTransitionInfo const&, ActivityRegistry*, ModuleCallingContext const*, WaitingTaskHolder&&);
void doTransformAsync(WaitingTaskHolder iTask,
size_t iTransformIndex,
EventPrincipal const& iEvent,
Expand Down Expand Up @@ -169,7 +165,7 @@ namespace edm {
virtual bool hasAcquire() const noexcept { return false; }
bool hasAccumulator() const noexcept { return false; }

virtual void doAcquire_(StreamID, Event const&, edm::EventSetup const&, WaitingTaskWithArenaHolder&);
virtual void doAcquire_(StreamID, Event const&, edm::EventSetup const&, WaitingTaskHolder&&);

void setModuleDescription(ModuleDescription const& md) { moduleDescription_ = md; }
ModuleDescription moduleDescription_;
Expand Down
8 changes: 2 additions & 6 deletions FWCore/Framework/interface/global/EDProducerBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ namespace edm {
class GlobalSchedule;
class ActivityRegistry;
class ThinnedAssociationsHelper;
class WaitingTaskWithArenaHolder;
class EventForTransformer;
class ServiceWeakToken;

Expand Down Expand Up @@ -78,10 +77,7 @@ namespace edm {

private:
bool doEvent(EventTransitionInfo const&, ActivityRegistry*, ModuleCallingContext const*);
void doAcquire(EventTransitionInfo const&,
ActivityRegistry*,
ModuleCallingContext const*,
WaitingTaskWithArenaHolder&);
void doAcquire(EventTransitionInfo const&, ActivityRegistry*, ModuleCallingContext const*, WaitingTaskHolder&&);
void doTransformAsync(WaitingTaskHolder iTask,
size_t iTransformIndex,
EventPrincipal const& iEvent,
Expand Down Expand Up @@ -173,7 +169,7 @@ namespace edm {

virtual bool hasAcquire() const noexcept { return false; }

virtual void doAcquire_(StreamID, Event const&, edm::EventSetup const&, WaitingTaskWithArenaHolder&);
virtual void doAcquire_(StreamID, Event const&, edm::EventSetup const&, WaitingTaskHolder&&);

void setModuleDescription(ModuleDescription const& md) { moduleDescription_ = md; }
ModuleDescription moduleDescription_;
Expand Down
7 changes: 2 additions & 5 deletions FWCore/Framework/interface/global/OutputModuleBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,7 @@ namespace edm {
void doEndStream(StreamID id) { doEndStream_(id); }

bool doEvent(EventTransitionInfo const&, ActivityRegistry*, ModuleCallingContext const*);
void doAcquire(EventTransitionInfo const&,
ActivityRegistry*,
ModuleCallingContext const*,
WaitingTaskWithArenaHolder&);
void doAcquire(EventTransitionInfo const&, ActivityRegistry*, ModuleCallingContext const*, WaitingTaskHolder&&);
//For now this is a placeholder
/*virtual*/ void preActionBeforeRunEventAsync(WaitingTaskHolder iTask,
ModuleCallingContext const& iModuleCallingContext,
Expand All @@ -86,7 +83,7 @@ namespace edm {
virtual void doEndRunSummary_(RunForOutput const&, EventSetup const&) {}
virtual void doBeginLuminosityBlockSummary_(LuminosityBlockForOutput const&, EventSetup const&) {}
virtual void doEndLuminosityBlockSummary_(LuminosityBlockForOutput const&, EventSetup const&) {}
virtual void doAcquire_(StreamID, EventForOutput const&, WaitingTaskWithArenaHolder&) {}
virtual void doAcquire_(StreamID, EventForOutput const&, WaitingTaskHolder&&) {}

virtual bool hasAcquire() const noexcept { return false; }
};
Expand Down
4 changes: 2 additions & 2 deletions FWCore/Framework/interface/global/implementors.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@
// forward declarations
namespace edm {

class WaitingTaskWithArenaHolder;
class ServiceWeakToken;
class ActivityRegistry;
class WaitingTaskWithArenaHolder;

namespace global {
namespace impl {
Expand Down Expand Up @@ -436,7 +436,7 @@ namespace edm {
private:
bool hasAcquire() const noexcept override { return true; }

void doAcquire_(StreamID, Event const&, edm::EventSetup const&, WaitingTaskWithArenaHolder&) final;
void doAcquire_(StreamID, Event const&, edm::EventSetup const&, WaitingTaskHolder&&) final;

virtual void acquire(StreamID, Event const&, edm::EventSetup const&, WaitingTaskWithArenaHolder) const = 0;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,8 @@ namespace edm {
private:
bool hasAcquire() const noexcept override { return true; }

void doAcquire_(StreamID id, EventForOutput const& event, WaitingTaskWithArenaHolder& holder) final {
acquire(id, event, holder);
void doAcquire_(StreamID id, EventForOutput const& event, WaitingTaskHolder&& holder) final {
acquire(id, event, WaitingTaskWithArenaHolder(std::move(holder)));
}

virtual void acquire(StreamID, EventForOutput const&, WaitingTaskWithArenaHolder) const = 0;
Expand Down
27 changes: 13 additions & 14 deletions FWCore/Framework/interface/maker/Worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ the worker is reset().
#include "FWCore/Framework/interface/ProductResolverIndexAndSkipBit.h"
#include "FWCore/Concurrency/interface/WaitingTask.h"
#include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
#include "FWCore/Concurrency/interface/WaitingTaskWithArenaHolder.h"
#include "FWCore/Concurrency/interface/WaitingTaskList.h"
#include "FWCore/MessageLogger/interface/MessageLogger.h"
#include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
Expand Down Expand Up @@ -267,9 +266,7 @@ namespace edm {
virtual void itemsToGetForSelection(std::vector<ProductResolverIndexAndSkipBit>&) const = 0;
virtual bool implNeedToRunSelection() const noexcept = 0;

virtual void implDoAcquire(EventTransitionInfo const&,
ModuleCallingContext const*,
WaitingTaskWithArenaHolder&) = 0;
virtual void implDoAcquire(EventTransitionInfo const&, ModuleCallingContext const*, WaitingTaskHolder&&) = 0;

virtual void implDoTransformAsync(WaitingTaskHolder,
size_t iTransformIndex,
Expand Down Expand Up @@ -394,12 +391,14 @@ namespace edm {
ParentContext const&,
typename T::Context const*) noexcept;

void runAcquire(EventTransitionInfo const&, ParentContext const&, WaitingTaskWithArenaHolder&);
// runAcquire() must take a copy of WaitingTaskHolder
// see comment in runAcquireAfterAsyncPrefetch() definition
void runAcquire(EventTransitionInfo const&, ParentContext const&, WaitingTaskHolder);

void runAcquireAfterAsyncPrefetch(std::exception_ptr,
EventTransitionInfo const&,
ParentContext const&,
WaitingTaskWithArenaHolder) noexcept;
WaitingTaskHolder) noexcept;

std::exception_ptr handleExternalWorkException(std::exception_ptr iEPtr,
ParentContext const& parentContext) noexcept;
Expand Down Expand Up @@ -519,7 +518,7 @@ namespace edm {
typename T::TransitionInfoType const&,
ServiceToken const&,
ParentContext const&,
WaitingTaskWithArenaHolder) noexcept {}
WaitingTaskHolder) noexcept {}
void execute() final {}
};

Expand All @@ -530,7 +529,7 @@ namespace edm {
EventTransitionInfo const& eventTransitionInfo,
ServiceToken const& token,
ParentContext const& parentContext,
WaitingTaskWithArenaHolder holder) noexcept
WaitingTaskHolder holder) noexcept
: m_worker(worker),
m_eventTransitionInfo(eventTransitionInfo),
m_parentContext(parentContext),
Expand All @@ -545,7 +544,7 @@ namespace edm {
// to hold the exception_ptr
std::exception_ptr temp_excptr;
auto excptr = exceptionPtr();
// Caught exception is passed to Worker::runModuleAfterAsyncPrefetch(), which propagates it via WaitingTaskWithArenaHolder
// Caught exception is passed to Worker::runModuleAfterAsyncPrefetch(), which propagates it via WaitingTaskHolder
CMS_SA_ALLOW try {
//pre was called in prefetchAsync
m_worker->emitPostModuleEventPrefetchingSignal();
Expand All @@ -563,12 +562,12 @@ namespace edm {
info = m_eventTransitionInfo,
parentContext = m_parentContext,
serviceToken = m_serviceToken,
holder = m_holder]() {
holder = std::move(m_holder)]() {
//Need to make the services available
ServiceRegistry::Operate operateRunAcquire(serviceToken.lock());

std::exception_ptr ptr;
worker->runAcquireAfterAsyncPrefetch(ptr, info, parentContext, holder);
worker->runAcquireAfterAsyncPrefetch(ptr, info, parentContext, std::move(holder));
});
return;
}
Expand All @@ -581,7 +580,7 @@ namespace edm {
Worker* m_worker;
EventTransitionInfo m_eventTransitionInfo;
ParentContext const m_parentContext;
WaitingTaskWithArenaHolder m_holder;
WaitingTaskHolder m_holder;
ServiceWeakToken m_serviceToken;
};

Expand Down Expand Up @@ -1127,7 +1126,7 @@ namespace edm {
auto* group = task.group();
moduleTask = make_waiting_task(
[this, weakToken, transitionInfo, parentContext, ownRunTask, group](std::exception_ptr const* iExcept) {
WaitingTaskWithArenaHolder runTaskHolder(
WaitingTaskHolder runTaskHolder(
*group, new HandleExternalWorkExceptionTask(this, group, ownRunTask->release(), parentContext));
AcquireTask<T> t(this, transitionInfo, weakToken.lock(), parentContext, runTaskHolder);
t.execute();
Expand All @@ -1154,7 +1153,7 @@ namespace edm {
auto group = task.group();
if constexpr (T::isEvent_) {
if (hasAcquire()) {
WaitingTaskWithArenaHolder runTaskHolder(
WaitingTaskHolder runTaskHolder(
*group, new HandleExternalWorkExceptionTask(this, group, moduleTask, parentContext));
moduleTask = new AcquireTask<T>(this, transitionInfo, token, parentContext, std::move(runTaskHolder));
}
Expand Down
3 changes: 1 addition & 2 deletions FWCore/Framework/interface/maker/WorkerT.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ namespace edm {
class ModuleProcessName;
class ProductResolverIndexAndSkipBit;
class ThinnedAssociationsHelper;
class WaitingTaskWithArenaHolder;

template <typename T>
class WorkerT : public Worker {
Expand Down Expand Up @@ -90,7 +89,7 @@ namespace edm {
void itemsToGetForSelection(std::vector<ProductResolverIndexAndSkipBit>&) const final;
bool implNeedToRunSelection() const noexcept final;

void implDoAcquire(EventTransitionInfo const&, ModuleCallingContext const*, WaitingTaskWithArenaHolder&) final;
void implDoAcquire(EventTransitionInfo const&, ModuleCallingContext const*, WaitingTaskHolder&&) final;

size_t transformIndex(edm::BranchDescription const&) const noexcept final;
void implDoTransformAsync(WaitingTaskHolder,
Expand Down
6 changes: 2 additions & 4 deletions FWCore/Framework/interface/stream/EDFilter.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@

namespace edm {

class WaitingTaskWithArenaHolder;

namespace stream {

template <typename... T>
Expand Down Expand Up @@ -66,8 +64,8 @@ namespace edm {
bool hasAbilityToProduceInEndLumis() const final { return HasAbilityToProduceInEndLumis<T...>::value; }

private:
void doAcquire_(Event const& ev, EventSetup const& es, WaitingTaskWithArenaHolder& holder) final {
doAcquireIfNeeded(this, ev, es, holder);
void doAcquire_(Event const& ev, EventSetup const& es, WaitingTaskHolder&& holder) final {
doAcquireIfNeeded(this, ev, es, std::move(holder));
}
};

Expand Down
6 changes: 1 addition & 5 deletions FWCore/Framework/interface/stream/EDFilterAdaptorBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ namespace edm {

class ModuleCallingContext;
class ActivityRegistry;
class WaitingTaskWithArenaHolder;

namespace maker {
template <typename T>
Expand Down Expand Up @@ -70,10 +69,7 @@ namespace edm {
private:
bool doEvent(EventTransitionInfo const&, ActivityRegistry*, ModuleCallingContext const*);

void doAcquire(EventTransitionInfo const&,
ActivityRegistry*,
ModuleCallingContext const*,
WaitingTaskWithArenaHolder&);
void doAcquire(EventTransitionInfo const&, ActivityRegistry*, ModuleCallingContext const*, WaitingTaskHolder&&);

//For now this is a placeholder
/*virtual*/ void preActionBeforeRunEventAsync(WaitingTaskHolder,
Expand Down
Loading

0 comments on commit e77e887

Please sign in to comment.