Skip to content

Commit

Permalink
HPCC-29657 Changes following review
Browse files Browse the repository at this point in the history
Signed-off-by: Shamser Ahmed <shamser.ahmed@lexisnexis.com>
  • Loading branch information
shamser committed Oct 13, 2023
1 parent be295cb commit 68305df
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 32 deletions.
27 changes: 14 additions & 13 deletions common/workunit/workunit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2667,20 +2667,19 @@ cost_type aggregateCost(const IConstWorkUnit * wu, const char *scope, bool exclu

GlobalStatisticCollection::GlobalStatisticCollection()
{
// Construct statsCollection here as call to GlobalStatisticCollection::load() is optional
StatsScopeId globalScopeId(SSTglobal, (unsigned)0);
statsCollection.setown(createStatisticCollection(nullptr, globalScopeId));
}

void GlobalStatisticCollection::load(IConstWorkUnit &workunit, const char * graphName, bool aggregatesOnly, bool missingScopesOnly)
{
const char * _wuid = workunit.queryWuid();
if (strcmp(_wuid, wuid.str())!=0) // Make sure stats collection is for this workunit
if (!streq(_wuid, wuid.str())) // New statsCollection if collection for different workunit
{
if (!wuid.isEmpty())
{
StatsScopeId globalScopeId(SSTglobal, (unsigned)0);
statsCollection.setown(createStatisticCollection(nullptr, globalScopeId));
}
// future: consider caching so that stats are not lost between jobs
StatsScopeId globalScopeId(SSTglobal, (unsigned)0);
statsCollection.setown(createStatisticCollection(nullptr, globalScopeId));
wuid.set(_wuid);
}

Expand All @@ -2695,7 +2694,7 @@ void GlobalStatisticCollection::load(IConstWorkUnit &workunit, const char * grap
StatsScopeId wfScopeId(SSTworkflow, graphPT->getPropInt("@wfid", 0));
StatsScopeId graphScopeId(SSTgraph, graphName);

Owned<IPropertyTreeIterator> iter = graphPT->getElements("./*");
Owned<IPropertyTreeIterator> iter = graphPT->getElements("*");
ForEach(*iter)
{
StatsScopeId sgScopeId;
Expand Down Expand Up @@ -2729,11 +2728,11 @@ void GlobalStatisticCollection::load(IConstWorkUnit &workunit, const char * grap
int statsMinDepth = 0, statsMaxDepth = INT_MAX;
if (aggregatesOnly)
{
// Only store stats for subgraph level
statsMinDepth = 3;
statsMaxDepth = 3;
}
// deserialize the subgraph stats, excluding any descendant stats
// Note: not all sg stats required for generating aggregates, so it would be
// future: not all sg stats required for generating aggregates, so consider if it is
// more efficient to deserialize just the stats in the aggregateKinds list.
statsCollection->deserializeChild(childId, serialized, version, statsMinDepth, statsMaxDepth);
}
Expand Down Expand Up @@ -2773,11 +2772,12 @@ void GlobalStatisticCollection::loadGlobalAggregates(IConstWorkUnit &workunit)
}

// getCollectionForUpdate() returns IStatisticCollection for the given rootScope
// if clearStats==true then the existing stats are cleared for the given scope
// if clearStats==true then the existing stats are cleared for the given scope and descendants
IStatisticCollection * GlobalStatisticCollection::getCollectionForUpdate(const StatsScopeId & wfScopeId, const StatsScopeId & graphScopeId, const StatsScopeId & sgScopeId, StatisticCreatorType creatorType, const char * creator, bool clearStats)
{
IStatisticCollection * sgScopeCollection = statsCollection->ensureSubScopePath({wfScopeId,graphScopeId, sgScopeId});
if (clearStats)
bool wasCreated;
IStatisticCollection * sgScopeCollection = statsCollection->ensureSubScopePath({wfScopeId,graphScopeId, sgScopeId}, &wasCreated);
if (clearStats && wasCreated)
sgScopeCollection->clearStats();
sgScopeCollection->markDirty();
return createRootStatisticCollection(creatorType, creator, wfScopeId, graphScopeId, sgScopeCollection);
Expand All @@ -2787,7 +2787,8 @@ IStatisticCollection * GlobalStatisticCollection::getCollectionForUpdate(const S
bool GlobalStatisticCollection::refreshAggregates()
{
std::vector<unsigned __int64> totals(aggregateKinds.size());
return statsCollection->refreshAggregates(aggregateKinds, totals);
std::vector<bool> isTotalUpdated(aggregateKinds.size());
return statsCollection->refreshAggregates(aggregateKinds, totals, isTotalUpdated);
}

// Refresh aggregates and write them to global stats
Expand Down
48 changes: 32 additions & 16 deletions system/jlib/jstats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1799,8 +1799,6 @@ class CStatisticCollection : public CInterfaceOf<IStatisticCollection>
}
}
public:
CStatisticCollection(IStatisticCollection * _parent=nullptr) : parent(_parent) {}

CStatisticCollection(IStatisticCollection * _parent, const StatsScopeId & _id) : id(_id), parent(_parent)
{
}
Expand Down Expand Up @@ -1966,24 +1964,30 @@ class CStatisticCollection : public CInterfaceOf<IStatisticCollection>
stats.append(s);
return true;
}

virtual IStatisticCollection * ensureSubScope(const StatsScopeId & search, bool hasChildren) override
virtual IStatisticCollection * ensureSubScope(const StatsScopeId & search, bool hasChildren)
{
bool wasCreated;
return ensureSubScope(search, hasChildren, &wasCreated);
}
virtual IStatisticCollection * ensureSubScope(const StatsScopeId & search, bool hasChildren, bool * wasCreated) override
{
//Once the CStatisticCollection is created it should not be replaced - so that returned pointers remain valid.
*wasCreated = false;
IStatisticCollection * match = children.find(&search);
if (match)
return match;

IStatisticCollection * ret = new CStatisticCollection(this, search);
children.add(*ret);
*wasCreated=true;
return ret;
}

virtual IStatisticCollection * ensureSubScopePath(std::initializer_list<StatsScopeId> path) override
virtual IStatisticCollection * ensureSubScopePath(std::initializer_list<StatsScopeId> path, bool * wasCreated) override
{
IStatisticCollection * curScope = this;
for (auto it = path.begin(); it != path.end(); ++it)
curScope = curScope->ensureSubScope(*it, true); // n.b. this will always return a valid pointer
curScope = curScope->ensureSubScope(*it, true, wasCreated); // n.b. this will always return a valid pointer
return curScope;
}

Expand Down Expand Up @@ -2021,7 +2025,7 @@ class CStatisticCollection : public CInterfaceOf<IStatisticCollection>
stats.ensureCapacity(numStats);
while (numStats-- > 0)
{
Statistic next (in, version);
Statistic next(in, version);
if (minDepth <= 0)
stats.append(next);
}
Expand All @@ -2038,7 +2042,7 @@ class CStatisticCollection : public CInterfaceOf<IStatisticCollection>
}
}

virtual void deserializeChild(StatsScopeId childId, MemoryBuffer & in, unsigned version, int minDepth, int maxDepth) override
virtual void deserializeChild(const StatsScopeId childId, MemoryBuffer & in, unsigned version, int minDepth, int maxDepth) override
{
if (maxDepth > 0)
{
Expand Down Expand Up @@ -2078,7 +2082,7 @@ class CStatisticCollection : public CInterfaceOf<IStatisticCollection>
cur.visit(visitor);
}

virtual bool refreshAggregates(std::vector<StatisticKind> & aggregateKinds, std::vector<unsigned __int64> & totals) override
virtual bool refreshAggregates(const std::vector<StatisticKind> & aggregateKinds, std::vector<unsigned __int64> & totals, std::vector<bool> & isTotalUpdated) override
{
assertex(aggregateKinds.size()==totals.size());

Expand All @@ -2096,38 +2100,47 @@ class CStatisticCollection : public CInterfaceOf<IStatisticCollection>
unsigned pos = iteratorVec-aggregateKinds.begin();
totals[pos] += mergeStatisticValue(totals[pos], stat.queryValue(), mergeAction);
updated = true;
isTotalUpdated[pos]=true;
}
}
}
else
{
std::vector<unsigned __int64> childTotals(aggregateKinds.size());
std::vector<bool> isChildTotalUpdated(aggregateKinds.size()); // Every entry defaults false
for (auto & child : children)
{
if (child.refreshAggregates(aggregateKinds, childTotals))
if (child.refreshAggregates(aggregateKinds, childTotals, isChildTotalUpdated))
updated = true;
}
if (updated)
{
updated=false;
std::vector<unsigned __int64>::iterator totalIter = totals.begin();
std::vector<unsigned __int64>::iterator subTotalIter = childTotals.begin();
std::vector<StatisticKind>::iterator kindIter = aggregateKinds.begin();
std::vector<StatisticKind>::const_iterator kindIter = aggregateKinds.begin();
std::vector<bool>::const_iterator isChildTotalUpdatedIter = isChildTotalUpdated.begin();
std::vector<bool>::iterator isTotalUpdatedIter = isTotalUpdated.begin();
while (totalIter != totals.end())
{
if (updateStatistic(*kindIter, *subTotalIter, StatsMergeReplace))
updated=true;
StatsMergeAction mergeAction = queryMergeMode(*kindIter);
(*totalIter) += mergeStatisticValue(*totalIter, *subTotalIter, mergeAction);
if (*isChildTotalUpdatedIter)
{
updateStatistic(*kindIter, *subTotalIter, StatsMergeReplace);
StatsMergeAction mergeAction = queryMergeMode(*kindIter);
*totalIter = mergeStatisticValue(*totalIter, *subTotalIter, mergeAction);
*isTotalUpdatedIter = true;
}
++totalIter;
++subTotalIter;
++kindIter;
++isChildTotalUpdatedIter;
++isTotalUpdatedIter;
}
}
isDirty=false;
}
return updated;
}

virtual void clearStats() override
{
stats.clear(true);
Expand All @@ -2136,15 +2149,18 @@ class CStatisticCollection : public CInterfaceOf<IStatisticCollection>
for (iter.first(); iter.isValid(); iter.next())
iter.query().clearStats();
}

virtual void addChild(IStatisticCollection *stats) override
{
children.add(*LINK(stats));
}

virtual void markDirty() override
{
isDirty=true;
if (parent) parent->markDirty();
}

private:
StatsScopeId id;
IStatisticCollection * parent;
Expand Down
7 changes: 4 additions & 3 deletions system/jlib/jstats.h
Original file line number Diff line number Diff line change
Expand Up @@ -137,13 +137,14 @@ interface IStatisticCollection : public IInterface
virtual void visit(IStatisticVisitor & target) const = 0;
virtual void visitChildren(IStatisticVisitor & target) const = 0;
virtual IStatisticCollection * ensureSubScope(const StatsScopeId & search, bool hasChildren) = 0;
virtual IStatisticCollection * ensureSubScopePath(std::initializer_list<StatsScopeId> path) = 0;
virtual IStatisticCollection * ensureSubScope(const StatsScopeId & search, bool hasChildren, bool * wasCreated) = 0;
virtual IStatisticCollection * ensureSubScopePath(std::initializer_list<StatsScopeId> path, bool * wasCreated) = 0;
virtual IStatisticCollection * querySubScopePath(std::initializer_list<StatsScopeId> path) = 0;
virtual void addStatistic(StatisticKind kind, unsigned __int64 value) = 0;
virtual bool updateStatistic(StatisticKind kind, unsigned __int64 value, StatsMergeAction mergeAction) = 0;
virtual bool refreshAggregates(std::vector<StatisticKind> & aggregateKinds, std::vector<unsigned __int64> & totals) = 0;
virtual bool refreshAggregates(const std::vector<StatisticKind> & aggregateKinds, std::vector<unsigned __int64> & totals, std::vector<bool> & isKindUpdated) = 0;
virtual void deserialize(MemoryBuffer & in, unsigned version, int minDepth, int maxDepth) = 0;
virtual void deserializeChild(StatsScopeId scopeId, MemoryBuffer & in, unsigned version, int minDepth, int maxDepth) = 0;
virtual void deserializeChild(const StatsScopeId scopeId, MemoryBuffer & in, unsigned version, int minDepth, int maxDepth) = 0;
virtual void clearStats() = 0;
virtual void addChild(IStatisticCollection *stats) = 0;
virtual void markDirty() = 0;
Expand Down

0 comments on commit 68305df

Please sign in to comment.