Skip to content

Commit

Permalink
Merge pull request #47029 from makortel/waitingTaskWithArenaHolder
Browse files Browse the repository at this point in the history
Use `WaitingTaskHolder` to signal `doneWaiting()` instead of `WaitingTaskWithArenaHolder` in framework
  • Loading branch information
cmsbuild authored Jan 10, 2025
2 parents f4e1712 + e77e887 commit 79e4b35
Show file tree
Hide file tree
Showing 29 changed files with 106 additions and 141 deletions.
5 changes: 2 additions & 3 deletions FWCore/Concurrency/interface/WaitingTaskWithArenaHolder.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,8 @@ namespace edm {
// eventually intend for the task to be spawned.
explicit WaitingTaskWithArenaHolder(oneapi::tbb::task_group&, WaitingTask* iTask);

// Takes ownership of the underlying task and uses the current
// arena.
explicit WaitingTaskWithArenaHolder(WaitingTaskHolder&& iTask);
// Captures the current arena.
explicit WaitingTaskWithArenaHolder(WaitingTaskHolder iTask);

~WaitingTaskWithArenaHolder();

Expand Down
2 changes: 1 addition & 1 deletion FWCore/Concurrency/src/WaitingTaskWithArenaHolder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ namespace edm {
m_task->increment_ref_count();
}

WaitingTaskWithArenaHolder::WaitingTaskWithArenaHolder(WaitingTaskHolder&& iTask)
WaitingTaskWithArenaHolder::WaitingTaskWithArenaHolder(WaitingTaskHolder iTask)
: m_task(iTask.release_no_decrement()),
m_group(iTask.group()),
m_arena(std::make_shared<oneapi::tbb::task_arena>(oneapi::tbb::task_arena::attach())) {}
Expand Down
48 changes: 23 additions & 25 deletions FWCore/Framework/interface/CallbackExternalWork.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,10 +114,9 @@ namespace edm {
WaitingTaskHolder produceTask =
Base::makeProduceTask(group, token, record, es, emitPostPrefetchingSignal, std::move(produceFunctor));

WaitingTaskWithArenaHolder waitingTaskWithArenaHolder =
makeExceptionHandlerTask(std::move(produceTask), group);
WaitingTaskHolder waitingTaskHolder = makeExceptionHandlerTask(std::move(produceTask), group);

return makeAcquireTask(std::move(waitingTaskWithArenaHolder), group, token, record, es);
return makeAcquireTask(std::move(waitingTaskHolder), group, token, record, es);
},
std::move(iTask),
iRecord,
Expand All @@ -134,15 +133,15 @@ namespace edm {
const TDecorator& iDec = TDecorator())
: Base(iProd, std::move(iProduceFunc), iID, iDec), acquireFunction_(std::move(iAcquireFunc)) {}

WaitingTaskHolder makeAcquireTask(WaitingTaskWithArenaHolder waitingTaskWithArenaHolder,
WaitingTaskHolder makeAcquireTask(WaitingTaskHolder waitingTaskHolder,
oneapi::tbb::task_group* group,
ServiceWeakToken const& serviceToken,
EventSetupRecordImpl const* record,
EventSetupImpl const* eventSetupImpl) {
return WaitingTaskHolder(
*group,
make_waiting_task(
[this, holder = std::move(waitingTaskWithArenaHolder), group, serviceToken, record, eventSetupImpl](
[this, holder = std::move(waitingTaskHolder), group, serviceToken, record, eventSetupImpl](
std::exception_ptr const* iException) mutable {
std::exception_ptr excptr;
if (iException) {
Expand Down Expand Up @@ -191,7 +190,7 @@ namespace edm {
ESModuleCallingContext const& context_;
};
EndGuard guard(record, context);
acquireCache_ = (*acquireFunction_)(rec, holder);
acquireCache_ = (*acquireFunction_)(rec, WaitingTaskWithArenaHolder(holder));
});
} catch (cms::Exception& iException) {
iException.addContext("Running acquire");
Expand All @@ -202,25 +201,24 @@ namespace edm {
}));
}

WaitingTaskWithArenaHolder makeExceptionHandlerTask(WaitingTaskHolder produceTask,
oneapi::tbb::task_group* group) {
return WaitingTaskWithArenaHolder(*group,
make_waiting_task([this, produceTask = std::move(produceTask)](
std::exception_ptr const* iException) mutable {
std::exception_ptr excptr;
if (iException) {
excptr = *iException;
}
if (excptr) {
try {
convertException::wrap([excptr]() { std::rethrow_exception(excptr); });
} catch (cms::Exception& exception) {
exception.addContext("Running acquire and external work");
edm::exceptionContext(exception, Base::callingContext());
produceTask.doneWaiting(std::current_exception());
}
}
}));
WaitingTaskHolder makeExceptionHandlerTask(WaitingTaskHolder produceTask, oneapi::tbb::task_group* group) {
return WaitingTaskHolder(*group,
make_waiting_task([this, produceTask = std::move(produceTask)](
std::exception_ptr const* iException) mutable {
std::exception_ptr excptr;
if (iException) {
excptr = *iException;
}
if (excptr) {
try {
convertException::wrap([excptr]() { std::rethrow_exception(excptr); });
} catch (cms::Exception& exception) {
exception.addContext("Running acquire and external work");
edm::exceptionContext(exception, Base::callingContext());
produceTask.doneWaiting(std::current_exception());
}
}
}));
}

std::shared_ptr<TAcquireFunc> acquireFunction_;
Expand Down
2 changes: 2 additions & 0 deletions FWCore/Framework/interface/StreamSchedule.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@
#include "FWCore/Utilities/interface/propagate_const.h"
#include "FWCore/Utilities/interface/thread_safety_macros.h"

#include "oneapi/tbb/task_arena.h"

#include <exception>
#include <map>
#include <memory>
Expand Down
8 changes: 2 additions & 6 deletions FWCore/Framework/interface/global/EDFilterBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ namespace edm {
class StreamID;
class ActivityRegistry;
class ThinnedAssociationsHelper;
class WaitingTaskWithArenaHolder;
class EventForTransformer;
class ServiceWeakToken;

Expand Down Expand Up @@ -75,10 +74,7 @@ namespace edm {

private:
bool doEvent(EventTransitionInfo const&, ActivityRegistry*, ModuleCallingContext const*);
void doAcquire(EventTransitionInfo const&,
ActivityRegistry*,
ModuleCallingContext const*,
WaitingTaskWithArenaHolder&);
void doAcquire(EventTransitionInfo const&, ActivityRegistry*, ModuleCallingContext const*, WaitingTaskHolder&&);
void doTransformAsync(WaitingTaskHolder iTask,
size_t iTransformIndex,
EventPrincipal const& iEvent,
Expand Down Expand Up @@ -169,7 +165,7 @@ namespace edm {
virtual bool hasAcquire() const noexcept { return false; }
bool hasAccumulator() const noexcept { return false; }

virtual void doAcquire_(StreamID, Event const&, edm::EventSetup const&, WaitingTaskWithArenaHolder&);
virtual void doAcquire_(StreamID, Event const&, edm::EventSetup const&, WaitingTaskHolder&&);

void setModuleDescription(ModuleDescription const& md) { moduleDescription_ = md; }
ModuleDescription moduleDescription_;
Expand Down
8 changes: 2 additions & 6 deletions FWCore/Framework/interface/global/EDProducerBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ namespace edm {
class GlobalSchedule;
class ActivityRegistry;
class ThinnedAssociationsHelper;
class WaitingTaskWithArenaHolder;
class EventForTransformer;
class ServiceWeakToken;

Expand Down Expand Up @@ -78,10 +77,7 @@ namespace edm {

private:
bool doEvent(EventTransitionInfo const&, ActivityRegistry*, ModuleCallingContext const*);
void doAcquire(EventTransitionInfo const&,
ActivityRegistry*,
ModuleCallingContext const*,
WaitingTaskWithArenaHolder&);
void doAcquire(EventTransitionInfo const&, ActivityRegistry*, ModuleCallingContext const*, WaitingTaskHolder&&);
void doTransformAsync(WaitingTaskHolder iTask,
size_t iTransformIndex,
EventPrincipal const& iEvent,
Expand Down Expand Up @@ -173,7 +169,7 @@ namespace edm {

virtual bool hasAcquire() const noexcept { return false; }

virtual void doAcquire_(StreamID, Event const&, edm::EventSetup const&, WaitingTaskWithArenaHolder&);
virtual void doAcquire_(StreamID, Event const&, edm::EventSetup const&, WaitingTaskHolder&&);

void setModuleDescription(ModuleDescription const& md) { moduleDescription_ = md; }
ModuleDescription moduleDescription_;
Expand Down
7 changes: 2 additions & 5 deletions FWCore/Framework/interface/global/OutputModuleBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,7 @@ namespace edm {
void doEndStream(StreamID id) { doEndStream_(id); }

bool doEvent(EventTransitionInfo const&, ActivityRegistry*, ModuleCallingContext const*);
void doAcquire(EventTransitionInfo const&,
ActivityRegistry*,
ModuleCallingContext const*,
WaitingTaskWithArenaHolder&);
void doAcquire(EventTransitionInfo const&, ActivityRegistry*, ModuleCallingContext const*, WaitingTaskHolder&&);
//For now this is a placeholder
/*virtual*/ void preActionBeforeRunEventAsync(WaitingTaskHolder iTask,
ModuleCallingContext const& iModuleCallingContext,
Expand All @@ -86,7 +83,7 @@ namespace edm {
virtual void doEndRunSummary_(RunForOutput const&, EventSetup const&) {}
virtual void doBeginLuminosityBlockSummary_(LuminosityBlockForOutput const&, EventSetup const&) {}
virtual void doEndLuminosityBlockSummary_(LuminosityBlockForOutput const&, EventSetup const&) {}
virtual void doAcquire_(StreamID, EventForOutput const&, WaitingTaskWithArenaHolder&) {}
virtual void doAcquire_(StreamID, EventForOutput const&, WaitingTaskHolder&&) {}

virtual bool hasAcquire() const noexcept { return false; }
};
Expand Down
4 changes: 2 additions & 2 deletions FWCore/Framework/interface/global/implementors.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@
// forward declarations
namespace edm {

class WaitingTaskWithArenaHolder;
class ServiceWeakToken;
class ActivityRegistry;
class WaitingTaskWithArenaHolder;

namespace global {
namespace impl {
Expand Down Expand Up @@ -436,7 +436,7 @@ namespace edm {
private:
bool hasAcquire() const noexcept override { return true; }

void doAcquire_(StreamID, Event const&, edm::EventSetup const&, WaitingTaskWithArenaHolder&) final;
void doAcquire_(StreamID, Event const&, edm::EventSetup const&, WaitingTaskHolder&&) final;

virtual void acquire(StreamID, Event const&, edm::EventSetup const&, WaitingTaskWithArenaHolder) const = 0;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,8 @@ namespace edm {
private:
bool hasAcquire() const noexcept override { return true; }

void doAcquire_(StreamID id, EventForOutput const& event, WaitingTaskWithArenaHolder& holder) final {
acquire(id, event, holder);
void doAcquire_(StreamID id, EventForOutput const& event, WaitingTaskHolder&& holder) final {
acquire(id, event, WaitingTaskWithArenaHolder(std::move(holder)));
}

virtual void acquire(StreamID, EventForOutput const&, WaitingTaskWithArenaHolder) const = 0;
Expand Down
27 changes: 13 additions & 14 deletions FWCore/Framework/interface/maker/Worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ the worker is reset().
#include "FWCore/Framework/interface/ProductResolverIndexAndSkipBit.h"
#include "FWCore/Concurrency/interface/WaitingTask.h"
#include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
#include "FWCore/Concurrency/interface/WaitingTaskWithArenaHolder.h"
#include "FWCore/Concurrency/interface/WaitingTaskList.h"
#include "FWCore/MessageLogger/interface/MessageLogger.h"
#include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
Expand Down Expand Up @@ -267,9 +266,7 @@ namespace edm {
virtual void itemsToGetForSelection(std::vector<ProductResolverIndexAndSkipBit>&) const = 0;
virtual bool implNeedToRunSelection() const noexcept = 0;

virtual void implDoAcquire(EventTransitionInfo const&,
ModuleCallingContext const*,
WaitingTaskWithArenaHolder&) = 0;
virtual void implDoAcquire(EventTransitionInfo const&, ModuleCallingContext const*, WaitingTaskHolder&&) = 0;

virtual void implDoTransformAsync(WaitingTaskHolder,
size_t iTransformIndex,
Expand Down Expand Up @@ -394,12 +391,14 @@ namespace edm {
ParentContext const&,
typename T::Context const*) noexcept;

void runAcquire(EventTransitionInfo const&, ParentContext const&, WaitingTaskWithArenaHolder&);
// runAcquire() must take a copy of WaitingTaskHolder
// see comment in runAcquireAfterAsyncPrefetch() definition
void runAcquire(EventTransitionInfo const&, ParentContext const&, WaitingTaskHolder);

void runAcquireAfterAsyncPrefetch(std::exception_ptr,
EventTransitionInfo const&,
ParentContext const&,
WaitingTaskWithArenaHolder) noexcept;
WaitingTaskHolder) noexcept;

std::exception_ptr handleExternalWorkException(std::exception_ptr iEPtr,
ParentContext const& parentContext) noexcept;
Expand Down Expand Up @@ -519,7 +518,7 @@ namespace edm {
typename T::TransitionInfoType const&,
ServiceToken const&,
ParentContext const&,
WaitingTaskWithArenaHolder) noexcept {}
WaitingTaskHolder) noexcept {}
void execute() final {}
};

Expand All @@ -530,7 +529,7 @@ namespace edm {
EventTransitionInfo const& eventTransitionInfo,
ServiceToken const& token,
ParentContext const& parentContext,
WaitingTaskWithArenaHolder holder) noexcept
WaitingTaskHolder holder) noexcept
: m_worker(worker),
m_eventTransitionInfo(eventTransitionInfo),
m_parentContext(parentContext),
Expand All @@ -545,7 +544,7 @@ namespace edm {
// to hold the exception_ptr
std::exception_ptr temp_excptr;
auto excptr = exceptionPtr();
// Caught exception is passed to Worker::runModuleAfterAsyncPrefetch(), which propagates it via WaitingTaskWithArenaHolder
// Caught exception is passed to Worker::runModuleAfterAsyncPrefetch(), which propagates it via WaitingTaskHolder
CMS_SA_ALLOW try {
//pre was called in prefetchAsync
m_worker->emitPostModuleEventPrefetchingSignal();
Expand All @@ -563,12 +562,12 @@ namespace edm {
info = m_eventTransitionInfo,
parentContext = m_parentContext,
serviceToken = m_serviceToken,
holder = m_holder]() {
holder = std::move(m_holder)]() {
//Need to make the services available
ServiceRegistry::Operate operateRunAcquire(serviceToken.lock());

std::exception_ptr ptr;
worker->runAcquireAfterAsyncPrefetch(ptr, info, parentContext, holder);
worker->runAcquireAfterAsyncPrefetch(ptr, info, parentContext, std::move(holder));
});
return;
}
Expand All @@ -581,7 +580,7 @@ namespace edm {
Worker* m_worker;
EventTransitionInfo m_eventTransitionInfo;
ParentContext const m_parentContext;
WaitingTaskWithArenaHolder m_holder;
WaitingTaskHolder m_holder;
ServiceWeakToken m_serviceToken;
};

Expand Down Expand Up @@ -1127,7 +1126,7 @@ namespace edm {
auto* group = task.group();
moduleTask = make_waiting_task(
[this, weakToken, transitionInfo, parentContext, ownRunTask, group](std::exception_ptr const* iExcept) {
WaitingTaskWithArenaHolder runTaskHolder(
WaitingTaskHolder runTaskHolder(
*group, new HandleExternalWorkExceptionTask(this, group, ownRunTask->release(), parentContext));
AcquireTask<T> t(this, transitionInfo, weakToken.lock(), parentContext, runTaskHolder);
t.execute();
Expand All @@ -1154,7 +1153,7 @@ namespace edm {
auto group = task.group();
if constexpr (T::isEvent_) {
if (hasAcquire()) {
WaitingTaskWithArenaHolder runTaskHolder(
WaitingTaskHolder runTaskHolder(
*group, new HandleExternalWorkExceptionTask(this, group, moduleTask, parentContext));
moduleTask = new AcquireTask<T>(this, transitionInfo, token, parentContext, std::move(runTaskHolder));
}
Expand Down
3 changes: 1 addition & 2 deletions FWCore/Framework/interface/maker/WorkerT.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ namespace edm {
class ModuleProcessName;
class ProductResolverIndexAndSkipBit;
class ThinnedAssociationsHelper;
class WaitingTaskWithArenaHolder;

template <typename T>
class WorkerT : public Worker {
Expand Down Expand Up @@ -90,7 +89,7 @@ namespace edm {
void itemsToGetForSelection(std::vector<ProductResolverIndexAndSkipBit>&) const final;
bool implNeedToRunSelection() const noexcept final;

void implDoAcquire(EventTransitionInfo const&, ModuleCallingContext const*, WaitingTaskWithArenaHolder&) final;
void implDoAcquire(EventTransitionInfo const&, ModuleCallingContext const*, WaitingTaskHolder&&) final;

size_t transformIndex(edm::BranchDescription const&) const noexcept final;
void implDoTransformAsync(WaitingTaskHolder,
Expand Down
6 changes: 2 additions & 4 deletions FWCore/Framework/interface/stream/EDFilter.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@

namespace edm {

class WaitingTaskWithArenaHolder;

namespace stream {

template <typename... T>
Expand Down Expand Up @@ -66,8 +64,8 @@ namespace edm {
bool hasAbilityToProduceInEndLumis() const final { return HasAbilityToProduceInEndLumis<T...>::value; }

private:
void doAcquire_(Event const& ev, EventSetup const& es, WaitingTaskWithArenaHolder& holder) final {
doAcquireIfNeeded(this, ev, es, holder);
void doAcquire_(Event const& ev, EventSetup const& es, WaitingTaskHolder&& holder) final {
doAcquireIfNeeded(this, ev, es, std::move(holder));
}
};

Expand Down
6 changes: 1 addition & 5 deletions FWCore/Framework/interface/stream/EDFilterAdaptorBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ namespace edm {

class ModuleCallingContext;
class ActivityRegistry;
class WaitingTaskWithArenaHolder;

namespace maker {
template <typename T>
Expand Down Expand Up @@ -70,10 +69,7 @@ namespace edm {
private:
bool doEvent(EventTransitionInfo const&, ActivityRegistry*, ModuleCallingContext const*);

void doAcquire(EventTransitionInfo const&,
ActivityRegistry*,
ModuleCallingContext const*,
WaitingTaskWithArenaHolder&);
void doAcquire(EventTransitionInfo const&, ActivityRegistry*, ModuleCallingContext const*, WaitingTaskHolder&&);

//For now this is a placeholder
/*virtual*/ void preActionBeforeRunEventAsync(WaitingTaskHolder,
Expand Down
Loading

0 comments on commit 79e4b35

Please sign in to comment.