Skip to content

Commit

Permalink
Merge pull request #47036 from makortel/transformerStream
Browse files Browse the repository at this point in the history
Add StreamID parameter to Transformer callback functions
  • Loading branch information
cmsbuild authored Jan 7, 2025
2 parents 7227919 + 0e28aeb commit b5cf409
Show file tree
Hide file tree
Showing 17 changed files with 123 additions and 69 deletions.
6 changes: 4 additions & 2 deletions FWCore/Framework/interface/TransformerBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

#include "FWCore/Utilities/interface/EDPutToken.h"
#include "FWCore/Utilities/interface/SoATuple.h"
#include "FWCore/Utilities/interface/StreamID.h"
#include "FWCore/Utilities/interface/TypeID.h"
#include "FWCore/Utilities/interface/ProductResolverIndex.h"

Expand Down Expand Up @@ -38,8 +39,9 @@ namespace edm {
protected:
//The function takes the WrapperBase corresponding to the data product from the EDPutToken
// and returns the WrapperBase associated to the id and instanceName
using TransformFunction = std::function<std::unique_ptr<edm::WrapperBase>(std::any)>;
using PreTransformFunction = std::function<std::any(edm::WrapperBase const&, edm::WaitingTaskWithArenaHolder)>;
using TransformFunction = std::function<std::unique_ptr<edm::WrapperBase>(edm::StreamID, std::any)>;
using PreTransformFunction =
std::function<std::any(edm::StreamID, edm::WrapperBase const&, edm::WaitingTaskWithArenaHolder)>;

void registerTransformImp(ProducerBase&, EDPutToken, const TypeID& id, std::string instanceName, TransformFunction);
void registerTransformAsyncImp(
Expand Down
20 changes: 11 additions & 9 deletions FWCore/Framework/interface/global/implementors.h
Original file line number Diff line number Diff line change
Expand Up @@ -474,17 +474,17 @@ namespace edm {

template <typename G, typename F>
void registerTransform(edm::EDPutTokenT<G> iToken, F iF, std::string productInstance = std::string()) {
using ReturnTypeT = decltype(iF(std::declval<G>()));
using ReturnTypeT = decltype(iF(std::declval<edm::StreamID>(), std::declval<G>()));
TypeID returnType(typeid(ReturnTypeT));
TransformerBase::registerTransformImp(
*this,
EDPutToken(iToken),
returnType,
std::move(productInstance),
[f = std::move(iF)](std::any const& iGotProduct) {
[f = std::move(iF)](edm::StreamID id, std::any const& iGotProduct) {
auto pGotProduct = std::any_cast<edm::WrapperBase const*>(iGotProduct);
return std::make_unique<edm::Wrapper<ReturnTypeT>>(
WrapperBase::Emplace{}, f(*static_cast<edm::Wrapper<G> const*>(pGotProduct)->product()));
WrapperBase::Emplace{}, f(id, *static_cast<edm::Wrapper<G> const*>(pGotProduct)->product()));
});
}

Expand All @@ -493,20 +493,22 @@ namespace edm {
P iPre,
F iF,
std::string productInstance = std::string()) {
using CacheTypeT = decltype(iPre(std::declval<G>(), WaitingTaskWithArenaHolder()));
using ReturnTypeT = decltype(iF(std::declval<CacheTypeT>()));
using CacheTypeT =
decltype(iPre(std::declval<edm::StreamID>(), std::declval<G>(), WaitingTaskWithArenaHolder()));
using ReturnTypeT = decltype(iF(std::declval<edm::StreamID>(), std::declval<CacheTypeT>()));
TypeID returnType(typeid(ReturnTypeT));
TransformerBase::registerTransformAsyncImp(
*this,
EDPutToken(iToken),
returnType,
std::move(productInstance),
[p = std::move(iPre)](edm::WrapperBase const& iGotProduct, WaitingTaskWithArenaHolder iHolder) {
return std::any(p(*static_cast<edm::Wrapper<G> const&>(iGotProduct).product(), std::move(iHolder)));
[p = std::move(iPre)](
edm::StreamID id, edm::WrapperBase const& iGotProduct, WaitingTaskWithArenaHolder iHolder) {
return std::any(p(id, *static_cast<edm::Wrapper<G> const&>(iGotProduct).product(), std::move(iHolder)));
},
[f = std::move(iF)](std::any const& iCache) {
[f = std::move(iF)](edm::StreamID id, std::any const& iCache) {
return std::make_unique<edm::Wrapper<ReturnTypeT>>(WrapperBase::Emplace{},
f(std::any_cast<CacheTypeT>(iCache)));
f(id, std::any_cast<CacheTypeT>(iCache)));
});
}

Expand Down
19 changes: 10 additions & 9 deletions FWCore/Framework/interface/limited/implementors.h
Original file line number Diff line number Diff line change
Expand Up @@ -462,17 +462,17 @@ namespace edm {

template <typename G, typename F>
void registerTransform(edm::EDPutTokenT<G> iToken, F iF, std::string productInstance = std::string()) {
using ReturnTypeT = decltype(iF(std::declval<G>()));
using ReturnTypeT = decltype(iF(std::declval<edm::StreamID>(), std::declval<G>()));
TypeID returnType(typeid(ReturnTypeT));
TransformerBase::registerTransformImp(
*this,
EDPutToken(iToken),
returnType,
std::move(productInstance),
[f = std::move(iF)](std::any const& iGotProduct) {
[f = std::move(iF)](edm::StreamID id, std::any const& iGotProduct) {
auto pGotProduct = std::any_cast<edm::WrapperBase const*>(iGotProduct);
return std::make_unique<edm::Wrapper<ReturnTypeT>>(
WrapperBase::Emplace{}, f(*static_cast<edm::Wrapper<G> const*>(pGotProduct)->product()));
WrapperBase::Emplace{}, f(id, *static_cast<edm::Wrapper<G> const*>(pGotProduct)->product()));
});
}

Expand All @@ -481,20 +481,21 @@ namespace edm {
P iPre,
F iF,
std::string productInstance = std::string()) {
using CacheTypeT = decltype(iPre(std::declval<G>(), WaitingTaskWithArenaHolder()));
using ReturnTypeT = decltype(iF(std::declval<CacheTypeT>()));
using CacheTypeT = decltype(iPre(std::declval<StreamID>(), std::declval<G>(), WaitingTaskWithArenaHolder()));
using ReturnTypeT = decltype(iF(std::declval<StreamID>(), std::declval<CacheTypeT>()));
TypeID returnType(typeid(ReturnTypeT));
TransformerBase::registerTransformAsyncImp(
*this,
EDPutToken(iToken),
returnType,
std::move(productInstance),
[p = std::move(iPre)](edm::WrapperBase const& iGotProduct, WaitingTaskWithArenaHolder iHolder) {
return std::any(p(*static_cast<edm::Wrapper<G> const&>(iGotProduct).product(), std::move(iHolder)));
[p = std::move(iPre)](
edm::StreamID id, edm::WrapperBase const& iGotProduct, WaitingTaskWithArenaHolder iHolder) {
return std::any(p(id, *static_cast<edm::Wrapper<G> const&>(iGotProduct).product(), std::move(iHolder)));
},
[f = std::move(iF)](std::any const& iCache) {
[f = std::move(iF)](edm::StreamID id, std::any const& iCache) {
auto cache = std::any_cast<CacheTypeT>(iCache);
return std::make_unique<edm::Wrapper<ReturnTypeT>>(WrapperBase::Emplace{}, f(cache));
return std::make_unique<edm::Wrapper<ReturnTypeT>>(WrapperBase::Emplace{}, f(id, cache));
});
}

Expand Down
35 changes: 19 additions & 16 deletions FWCore/Framework/interface/one/implementors.h
Original file line number Diff line number Diff line change
Expand Up @@ -354,38 +354,41 @@ namespace edm {

template <typename G, typename F>
void registerTransform(edm::EDPutTokenT<G> iToken, F iF, std::string productInstance = std::string()) {
using ReturnTypeT = decltype(iF(std::declval<G>()));
using ReturnTypeT = decltype(iF(std::declval<edm::StreamID>(), std::declval<G>()));
TypeID returnType(typeid(ReturnTypeT));
TransformerBase::registerTransformImp(*this,
EDPutToken(iToken),
returnType,
std::move(productInstance),
[f = std::move(iF)](edm::WrapperBase const& iGotProduct) {
return std::make_unique<edm::Wrapper<ReturnTypeT>>(
WrapperBase::Emplace{},
f(*static_cast<edm::Wrapper<G> const&>(iGotProduct).product()));
});
TransformerBase::registerTransformImp(
*this,
EDPutToken(iToken),
returnType,
std::move(productInstance),
[f = std::move(iF)](edm::StreamID id, std::any const& iGotProduct) {
auto pGotProduct = std::any_cast<edm::WrapperBase const*>(iGotProduct);
return std::make_unique<edm::Wrapper<ReturnTypeT>>(
WrapperBase::Emplace{}, f(id, *static_cast<edm::Wrapper<G> const*>(pGotProduct)->product()));
});
}

template <typename G, typename P, typename F>
void registerTransformAsync(edm::EDPutTokenT<G> iToken,
P iPre,
F iF,
std::string productInstance = std::string()) {
using CacheTypeT = decltype(iPre(std::declval<G>(), WaitingTaskWithArenaHolder()));
using ReturnTypeT = decltype(iF(std::declval<CacheTypeT>()));
using CacheTypeT =
decltype(iPre(std::declval<edm::StreamID>(), std::declval<G>(), WaitingTaskWithArenaHolder()));
using ReturnTypeT = decltype(iF(std::declval<edm::StreamID>(), std::declval<CacheTypeT>()));
TypeID returnType(typeid(ReturnTypeT));
TransformerBase::registerTransformAsyncImp(
*this,
EDPutToken(iToken),
returnType,
std::move(productInstance),
[p = std::move(iPre)](edm::WrapperBase const& iGotProduct, WaitingTaskWithArenaHolder iHolder) {
return std::any(p(*static_cast<edm::Wrapper<G> const&>(iGotProduct).product(), std::move(iHolder)));
[p = std::move(iPre)](
edm::StreamID id, edm::WrapperBase const& iGotProduct, WaitingTaskWithArenaHolder iHolder) {
return std::any(p(id, *static_cast<edm::Wrapper<G> const&>(iGotProduct).product(), std::move(iHolder)));
},
[f = std::move(iF)](std::any const& iCache) {
[f = std::move(iF)](edm::StreamID id, std::any const& iCache) {
auto cache = std::any_cast<CacheTypeT>(iCache);
return std::make_unique<edm::Wrapper<ReturnTypeT>>(WrapperBase::Emplace{}, f(cache));
return std::make_unique<edm::Wrapper<ReturnTypeT>>(WrapperBase::Emplace{}, f(id, cache));
});
}

Expand Down
20 changes: 11 additions & 9 deletions FWCore/Framework/interface/stream/implementors.h
Original file line number Diff line number Diff line change
Expand Up @@ -328,17 +328,17 @@ namespace edm {

template <typename G, typename F>
void registerTransform(edm::EDPutTokenT<G> iToken, F iF, std::string productInstance = std::string()) {
using ReturnTypeT = decltype(iF(std::declval<G>()));
using ReturnTypeT = decltype(iF(std::declval<edm::StreamID>(), std::declval<G>()));
TypeID returnType(typeid(ReturnTypeT));
TransformerBase::registerTransformImp(
*this,
EDPutToken(iToken),
returnType,
std::move(productInstance),
[f = std::move(iF)](std::any const& iGotProduct) {
[f = std::move(iF)](edm::StreamID id, std::any const& iGotProduct) {
auto pGotProduct = std::any_cast<edm::WrapperBase const*>(iGotProduct);
return std::make_unique<edm::Wrapper<ReturnTypeT>>(
WrapperBase::Emplace{}, f(*static_cast<edm::Wrapper<G> const*>(pGotProduct)->product()));
WrapperBase::Emplace{}, f(id, *static_cast<edm::Wrapper<G> const*>(pGotProduct)->product()));
});
}

Expand All @@ -347,20 +347,22 @@ namespace edm {
P iPre,
F iF,
std::string productInstance = std::string()) {
using CacheTypeT = decltype(iPre(std::declval<G>(), WaitingTaskWithArenaHolder()));
using ReturnTypeT = decltype(iF(std::declval<CacheTypeT>()));
using CacheTypeT =
decltype(iPre(std::declval<edm::StreamID>(), std::declval<G>(), WaitingTaskWithArenaHolder()));
using ReturnTypeT = decltype(iF(std::declval<edm::StreamID>(), std::declval<CacheTypeT>()));
TypeID returnType(typeid(ReturnTypeT));
TransformerBase::registerTransformAsyncImp(
*this,
EDPutToken(iToken),
returnType,
std::move(productInstance),
[p = std::move(iPre)](edm::WrapperBase const& iGotProduct, WaitingTaskWithArenaHolder iHolder) {
return std::any(p(*static_cast<edm::Wrapper<G> const&>(iGotProduct).product(), std::move(iHolder)));
[p = std::move(iPre)](
edm::StreamID id, edm::WrapperBase const& iGotProduct, WaitingTaskWithArenaHolder iHolder) {
return std::any(p(id, *static_cast<edm::Wrapper<G> const&>(iGotProduct).product(), std::move(iHolder)));
},
[f = std::move(iF)](std::any const& iCache) {
[f = std::move(iF)](edm::StreamID id, std::any const& iCache) {
auto cache = std::any_cast<CacheTypeT>(iCache);
return std::make_unique<edm::Wrapper<ReturnTypeT>>(WrapperBase::Emplace{}, f(cache));
return std::make_unique<edm::Wrapper<ReturnTypeT>>(WrapperBase::Emplace{}, f(id, cache));
});
}

Expand Down
17 changes: 11 additions & 6 deletions FWCore/Framework/src/TransformerBase.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@

#include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
#include "FWCore/ServiceRegistry/interface/ModuleCallingContext.h"
#include "FWCore/ServiceRegistry/interface/StreamContext.h"

#include <optional>

namespace {
Expand Down Expand Up @@ -116,7 +118,8 @@ namespace edm {
std::optional<decltype(iEvent.get(transformInfo_.get<kType>(iIndex), transformInfo_.get<kResolverIndex>(iIndex)))>
handle;
//transform acquiring signal
TransformAcquiringSignalSentry sentry(iAct, *mcc.getStreamContext(), mcc);
auto const& streamContext = *mcc.getStreamContext();
TransformAcquiringSignalSentry sentry(iAct, streamContext, mcc);
CMS_SA_ALLOW try {
handle = iEvent.get(transformInfo_.get<kType>(iIndex), transformInfo_.get<kResolverIndex>(iIndex));
} catch (...) {
Expand All @@ -133,15 +136,16 @@ namespace edm {
} else {
//transform signal
auto mcc = iEvent.moduleCallingContext();
TransformSignalSentry sentry(iAct, *mcc.getStreamContext(), mcc);
auto const& streamContext = *mcc.getStreamContext();
TransformSignalSentry sentry(iAct, streamContext, mcc);
iEvent.put(iBase.putTokenIndexToProductResolverIndex()[transformInfo_.get<kToken>(iIndex).index()],
transformInfo_.get<kTransform>(iIndex)(std::move(*cache)),
transformInfo_.get<kTransform>(iIndex)(streamContext.streamID(), std::move(*cache)),
handle);
}
});
WaitingTaskWithArenaHolder wta(*iHolder.group(), nextTask);
CMS_SA_ALLOW try {
*cache = transformInfo_.get<kPreTransform>(iIndex)(*(handle->wrapper()), wta);
*cache = transformInfo_.get<kPreTransform>(iIndex)(streamContext.streamID(), *(handle->wrapper()), wta);
} catch (...) {
wta.doneWaiting(std::current_exception());
}
Expand All @@ -153,9 +157,10 @@ namespace edm {
if (handle.wrapper()) {
std::any v = handle.wrapper();
//transform signal
TransformSignalSentry sentry(iAct, *mcc.getStreamContext(), mcc);
auto const& streamContext = *mcc.getStreamContext();
TransformSignalSentry sentry(iAct, streamContext, mcc);
iEvent.put(iBase.putTokenIndexToProductResolverIndex()[transformInfo_.get<kToken>(iIndex).index()],
transformInfo_.get<kTransform>(iIndex)(std::move(v)),
transformInfo_.get<kTransform>(iIndex)(streamContext.streamID(), std::move(v)),
handle);
}
} catch (...) {
Expand Down
6 changes: 3 additions & 3 deletions FWCore/Framework/test/global_filter_t.cppunit.cc
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ class testGlobalFilter : public CppUnit::TestFixture {
public:
TransformProd(edm::ParameterSet const&) {
token_ = produces<float>();
registerTransform(token_, [](float iV) { return int(iV); });
registerTransform(token_, [](edm::StreamID, float iV) { return int(iV); });
}

bool filter(edm::StreamID, edm::Event& iEvent, edm::EventSetup const&) const {
Expand All @@ -380,8 +380,8 @@ class testGlobalFilter : public CppUnit::TestFixture {
token_ = produces<float>();
registerTransformAsync(
token_,
[](float iV, edm::WaitingTaskWithArenaHolder iHolder) { return IntHolder(iV); },
[](IntHolder iWaitValue) { return iWaitValue.value_; });
[](edm::StreamID, float iV, edm::WaitingTaskWithArenaHolder iHolder) { return IntHolder(iV); },
[](edm::StreamID, IntHolder iWaitValue) { return iWaitValue.value_; });
}

bool filter(edm::StreamID, edm::Event& iEvent, edm::EventSetup const&) const {
Expand Down
6 changes: 3 additions & 3 deletions FWCore/Framework/test/global_producer_t.cppunit.cc
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ class testGlobalProducer : public CppUnit::TestFixture {
public:
TransformProd(edm::ParameterSet const&) {
token_ = produces<float>();
registerTransform(token_, [](float iV) { return int(iV); });
registerTransform(token_, [](edm::StreamID, float iV) { return int(iV); });
}

void produce(edm::StreamID, edm::Event& iEvent, edm::EventSetup const&) const {
Expand All @@ -347,8 +347,8 @@ class testGlobalProducer : public CppUnit::TestFixture {
token_ = produces<float>();
registerTransformAsync(
token_,
[](float iV, edm::WaitingTaskWithArenaHolder iHolder) { return IntHolder(iV); },
[](IntHolder iWaitValue) { return iWaitValue.value_; });
[](edm::StreamID, float iV, edm::WaitingTaskWithArenaHolder iHolder) { return IntHolder(iV); },
[](edm::StreamID, IntHolder iWaitValue) { return iWaitValue.value_; });
}

void produce(edm::StreamID, edm::Event& iEvent, edm::EventSetup const&) const {
Expand Down
2 changes: 1 addition & 1 deletion FWCore/Framework/test/limited_filter_t.cppunit.cc
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ class testLimitedFilter : public CppUnit::TestFixture {
TransformProd(edm::ParameterSet const&)
: edm::limited::EDFilterBase(s_pset), edm::limited::EDFilter<edm::Transformer>(s_pset) {
token_ = produces<float>();
registerTransform(token_, [](float iV) { return int(iV); });
registerTransform(token_, [](edm::StreamID, float iV) { return int(iV); });
}

bool filter(edm::StreamID, edm::Event& iEvent, edm::EventSetup const&) const {
Expand Down
2 changes: 1 addition & 1 deletion FWCore/Framework/test/limited_producer_t.cppunit.cc
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ class testLimitedProducer : public CppUnit::TestFixture {
TransformProd(edm::ParameterSet const&)
: edm::limited::EDProducerBase(s_pset), edm::limited::EDProducer<edm::Transformer>(s_pset) {
token_ = produces<float>();
registerTransform(token_, [](float iV) { return int(iV); });
registerTransform(token_, [](edm::StreamID, float iV) { return int(iV); });
}

void produce(edm::StreamID, edm::Event& iEvent, edm::EventSetup const&) const {
Expand Down
Loading

0 comments on commit b5cf409

Please sign in to comment.