Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
Signed-off-by: Shamser Ahmed <shamser.ahmed@lexisnexis.com>
  • Loading branch information
shamser committed Oct 11, 2023
1 parent 1347444 commit c838bcd
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 215 deletions.
146 changes: 14 additions & 132 deletions common/workunit/workunit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -183,20 +183,20 @@ void doDescheduleWorkkunit(char const * wuid)
* Graph progress support
*/

CWuGraphStats::CWuGraphStats(StatisticCreatorType _creatorType, const char * _creator, unsigned wfid, const char * _rootScope, unsigned _id, bool _merge, GlobalStatisticCollection * statsCache)
CWuGraphStats::CWuGraphStats(StatisticCreatorType _creatorType, const char * _creator, unsigned wfid, const char * _rootScope, unsigned _id, bool _merge, GlobalStatisticCollection * globalStatsCollection)
: creatorType(_creatorType), creator(_creator), id(_id), merge(_merge)
{
StatsScopeId graphScopeId;
verifyex(graphScopeId.setScopeText(_rootScope));
StatsScopeId rootScopeId(SSTworkflow,wfid);
StatsScopeId wfScopeId(SSTworkflow,wfid);

if (statsCache)
if (globalStatsCollection)
{
StatsScopeId sgScopeId(SSTsubgraph, id);
collector.setown(createStatisticsGatherer(statsCache->getCollection(rootScopeId, graphScopeId, sgScopeId, _creatorType, _creator)));
collector.setown(createStatisticsGatherer(globalStatsCollection->getCollection(wfScopeId, graphScopeId, sgScopeId, _creatorType, _creator, true)));
}
else
collector.setown(createStatisticsGatherer(_creatorType, _creator, rootScopeId));
collector.setown(createStatisticsGatherer(_creatorType, _creator, wfScopeId));
collector->beginScope(graphScopeId);
}

Expand Down Expand Up @@ -2665,133 +2665,13 @@ cost_type aggregateCost(const IConstWorkUnit * wu, const char *scope, bool exclu
}
}

class SubGraphUpdaterCollection : public CInterfaceOf<IStatisticCollection>
{
Owned<IStatisticCollection> rootCollection;
StatsScopeId sgScopeId;
bool statsCleared = false;
public:
SubGraphUpdaterCollection(IStatisticCollection * _sgCollection, const StatsScopeId & wfScopeId, const StatsScopeId & graphScopeId, const StatsScopeId & _sgScopeId, StatisticCreatorType creatorType, const char * creator, bool clearStats) : sgScopeId(_sgScopeId)
{
rootCollection.setown(createRootStatisticCollection(creatorType, creator, wfScopeId, graphScopeId, _sgCollection));
statsCleared = !clearStats;
}

virtual StatisticScopeType queryScopeType() const override
{
return rootCollection->queryScopeType();
}
virtual StringBuffer & getFullScope(StringBuffer & str) const override
{
return rootCollection->getFullScope(str);
}
virtual StringBuffer & getScope(StringBuffer & str) const override
{
return rootCollection->getScope(str);
}
virtual unsigned __int64 queryStatistic(StatisticKind kind) const override
{
return rootCollection->queryStatistic(kind);
}
virtual unsigned getNumStatistics() const override
{
return rootCollection->getNumStatistics();
}
virtual bool getStatistic(StatisticKind kind, unsigned __int64 & value) const override
{
return rootCollection->getStatistic(kind, value);
}
virtual void getStatistic(StatisticKind & kind, unsigned __int64 & value, unsigned idx) const
{
rootCollection->getStatistic(kind, value, idx);
}
virtual IStatisticCollectionIterator & getScopes(const char * filter, bool sorted) override
{
return rootCollection->getScopes(filter, sorted);
}
virtual void getMinMaxScope(IStringVal & minValue, IStringVal & maxValue, StatisticScopeType searchScopeType) const override
{
rootCollection->getMinMaxScope(minValue, maxValue, searchScopeType);
}
virtual void getMinMaxActivity(unsigned & minValue, unsigned & maxValue) const override
{
rootCollection->getMinMaxActivity(minValue, maxValue);
}
virtual void serialize(MemoryBuffer & out) const
{
rootCollection->serialize(out, &sgScopeId);
}
virtual void serialize(MemoryBuffer & out, const StatsScopeId * matchScope) const override
{
rootCollection->serialize(out, matchScope);
}
virtual unsigned __int64 queryWhenCreated() const override
{
return rootCollection->queryWhenCreated();
}
virtual void mergeInto(IStatisticGatherer & target) const override
{
rootCollection->mergeInto(target);
}
virtual StringBuffer &toXML(StringBuffer &out) const override
{
return rootCollection->toXML(out);
}
virtual void visit(IStatisticVisitor & target) const override
{
return rootCollection->visit(target);
}
virtual void visitChildren(IStatisticVisitor & target) const override
{
return rootCollection->visitChildren(target);
}
virtual IStatisticCollection * ensureSubScope(const StatsScopeId & search, bool hasChildren) override
{
IStatisticCollection * stats = rootCollection->ensureSubScope(search, hasChildren);
if (!statsCleared)
{
clearStats();
statsCleared = true;
}
return stats;
}
virtual void addStatistic(StatisticKind kind, unsigned __int64 value) override
{
return rootCollection->addStatistic(kind,value);
}
virtual bool updateStatistic(StatisticKind kind, unsigned __int64 value, StatsMergeAction mergeAction) override
{
return rootCollection->updateStatistic(kind, value, mergeAction);
}
virtual bool refreshAggregates(std::vector<StatisticKind> & aggregateKinds, std::vector<unsigned __int64> & totals) override
{
return rootCollection->refreshAggregates(aggregateKinds, totals);
}
virtual void deserialize(MemoryBuffer & in, unsigned version, StatisticScopeType minScope=SSTnone) override
{
return rootCollection->deserialize(in, version, minScope);
}
virtual void deserializeChild(StatsScopeId scopeId, MemoryBuffer & in, unsigned version, StatisticScopeType minScope=SSTnone) override
{
rootCollection->deserializeChild(scopeId, in, version, minScope);
}
virtual void clearStats() override
{
rootCollection->clearStats();
}
virtual void addChild(IStatisticCollection *stats) override
{
rootCollection->addChild(stats);
}
};

GlobalStatisticCollection::GlobalStatisticCollection()
{
StatsScopeId globalScopeId(SSTglobal, (unsigned)0);
statsCollection.setown(createStatisticCollection(nullptr, globalScopeId));
}

void GlobalStatisticCollection::load(const char *wuid, StatisticScopeType minScope)
void GlobalStatisticCollection::load(const char *wuid, StatisticScopeType maxDepth)
{
Owned<IPropertyTree> root = getWUGraphProgress(wuid, true);
if (!root)
Expand Down Expand Up @@ -2823,7 +2703,7 @@ void GlobalStatisticCollection::load(const char *wuid, StatisticScopeType minSco

StatsScopeId childId;
childId.deserialize(serialized, version);
statsCollection->deserializeChild(childId, serialized, version, minScope);
statsCollection->deserializeChild(childId, serialized, version, 3);
}
}
}
Expand All @@ -2839,12 +2719,14 @@ void GlobalStatisticCollection::visit(IStatisticVisitor & target) const
statsCollection->visit(target);
}

IStatisticCollection * GlobalStatisticCollection::getCollection(const StatsScopeId & wfScope, const StatsScopeId & graphScope, const StatsScopeId & sgScope, StatisticCreatorType creatorType, const char * creator)
IStatisticCollection * GlobalStatisticCollection::getCollection(const StatsScopeId & wfScopeId, const StatsScopeId & graphScopeId, const StatsScopeId & sgScopeId, StatisticCreatorType creatorType, const char * creator, bool clearStats)
{
IStatisticCollection * wfScopeCollection = statsCollection->ensureSubScope(wfScope, true);
IStatisticCollection * graphScopeCollection = wfScopeCollection->ensureSubScope(graphScope, true);
IStatisticCollection * sgScopeCollection = graphScopeCollection->ensureSubScope(sgScope, true);
return new SubGraphUpdaterCollection(sgScopeCollection, wfScope, graphScope, sgScope, creatorType, creator, true);
IStatisticCollection * wfScopeCollection = statsCollection->ensureSubScope(wfScopeId, true);
IStatisticCollection * graphScopeCollection = wfScopeCollection->ensureSubScope(graphScopeId, true);
IStatisticCollection * sgScopeCollection = graphScopeCollection->ensureSubScope(sgScopeId, true);
if (clearStats)
sgScopeCollection->clearStats();
return createRootStatisticCollection(creatorType, creator, wfScopeId, graphScopeId, sgScopeCollection);
}

class StatisticsAggregatesWriter : implements IStatisticVisitor
Expand Down
6 changes: 3 additions & 3 deletions common/workunit/workunit.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1182,9 +1182,9 @@ class WORKUNIT_API GlobalStatisticCollection : public CInterface
bool refreshAggregates(std::vector<StatisticKind> & aggregateKinds);
void visit(IStatisticVisitor & target) const;
// getCollection() returns IStatisticCollection for given rootScope
// if clear==true, clears the stats at this and below scope
IStatisticCollection * getCollection(const StatsScopeId & wfScope, const StatsScopeId & graphScope, const StatsScopeId & sgScope, StatisticCreatorType creatorType, const char * creator);
void load(const char *wuid, StatisticScopeType minScope=SSTnone);
// if clearStats==true then the existing stats are cleared for the given scope
IStatisticCollection * getCollection(const StatsScopeId & wfScopeId, const StatsScopeId & graphScopeId, const StatsScopeId & sgScopeId, StatisticCreatorType creatorType, const char * creator, bool clearStats);
void load(const char *wuid, StatisticScopeType maxDepth=SSTnone);
private:
Owned<IStatisticCollection> statsCollection;
};
Expand Down
98 changes: 24 additions & 74 deletions system/jlib/jstats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1940,96 +1940,49 @@ class CStatisticCollection : public CInterfaceOf<IStatisticCollection>
children.add(*ret);
return ret;
}
virtual void serialize(MemoryBuffer & out) const
{
serialize(out, nullptr);
}
virtual void serialize(MemoryBuffer & out, const StatsScopeId * matchScopeId) const
virtual void serialize(MemoryBuffer & out) const override
{
out.append(getCollectionType());
id.serialize(out);

// The following is required to maintain compatibility with existing serialization:
// Subgraph stats blob has a workflow scope (no stats) followed by graph scope (not stats) then
// subgraph scope (with all stats including descendent scopes and stats)
// This compatibility is achieved by ignoring all stats until matchin the scope specified by matchScope:
// - If matchScopeId matches current id, then use nullptr for child matches so it doesn't that
// it doesn't constrain serialization of children (i.e. all children will be serialized)
// - If matchScopeIddoesn't match current id, only serialize scope id but not the stats. Use
// matchScopeId recursively for all child scopes.
const StatsScopeId * matchChildScope = (matchScopeId && id.matches(*matchScopeId)) ? nullptr : matchScopeId;
if (matchChildScope)
{
out.append((aindex_t) 0);
}
else
{
out.append(stats.ordinality());
ForEachItemIn(iStat, stats)
stats.item(iStat).serialize(out);
}
out.append(stats.ordinality());
ForEachItemIn(iStat, stats)
stats.item(iStat).serialize(out);

out.append(children.ordinality());
SuperHashIteratorOf<CStatisticCollection> iter(children, false);
for (iter.first(); iter.isValid(); iter.next())
iter.query().serialize(out, matchChildScope);
iter.query().serialize(out);
}

void deserializeIgnore(MemoryBuffer & in, unsigned version)
{
unsigned numStats;
in.read(numStats);
while (numStats-- > 0)
Statistic next (in, version);
unsigned numChildren;
in.read(numChildren);
while (numChildren-- > 0)
deserializeIgnoreChild(in, version);
}
void deserializeIgnoreChild(MemoryBuffer & in, unsigned version)
{
byte kind;
in.read(kind);
StatsScopeId childId;
childId.deserialize(in, version);
deserializeIgnore(in, version);
}

virtual void deserialize(MemoryBuffer & in, unsigned version, StatisticScopeType minScope=SSTnone) override
virtual void deserialize(MemoryBuffer & in, unsigned version, unsigned statsMinDepth=0) override
{
unsigned numStats;
in.read(numStats);
stats.ensureCapacity(numStats);
while (numStats-- > 0)
{
Statistic next (in, version);
stats.append(next);
if (statsMinDepth==0) stats.append(next);
}
unsigned numChildren;
in.read(numChildren);
if (minScope!=SSTnone && queryScopeType()==minScope)
{
while (numChildren-- > 0)
deserializeIgnoreChild(in, version);
}
else
children.ensure(numChildren);
statsMinDepth = (statsMinDepth > 0) ? (statsMinDepth - 1) : 0;
while (numChildren-- > 0)
{
children.ensure(numChildren);
while (numChildren-- > 0)
{
byte kind;
in.read(kind);
StatsScopeId childId;
childId.deserialize(in, version);
deserializeChild(childId, in, version);
}
byte kind;
in.read(kind);
StatsScopeId childId;
childId.deserialize(in, version);
deserializeChild(childId, in, version, statsMinDepth);
}
}

virtual void deserializeChild(StatsScopeId childId, MemoryBuffer & in, unsigned version, StatisticScopeType minScope=SSTnone) override
virtual void deserializeChild(StatsScopeId childId, MemoryBuffer & in, unsigned version, unsigned statsMinDepth=0) override
{
IStatisticCollection * childCollection = ensureSubScope(childId, true);
childCollection->deserialize(in, version, minScope);
childCollection->deserialize(in, version, statsMinDepth);
}

inline const StatsScopeId & queryScopeId() const { return id; }
Expand Down Expand Up @@ -2110,6 +2063,7 @@ class CStatisticCollection : public CInterfaceOf<IStatisticCollection>
virtual void clearStats()
{
stats.clear(true);
// Note: Children should NOT be deleted as all pointers return by ensureSubScope must still be valid
SuperHashIteratorOf<CStatisticCollection> iter(children, false);
for (iter.first(); iter.isValid(); iter.next())
iter.query().clearStats();
Expand Down Expand Up @@ -2195,12 +2149,12 @@ bool CollectionHashTable::matchesElement(const void *et, const void *searchET) c
class CRootStatisticCollection : public CStatisticCollection
{
public:
CRootStatisticCollection(StatisticCreatorType _creatorType, const char * _creator, const StatsScopeId & rootScopeId, const StatsScopeId & graphScopeId, IStatisticCollection * childCollection)
CRootStatisticCollection(StatisticCreatorType _creatorType, const char * _creator, const StatsScopeId & rootScopeId, const StatsScopeId & graphScopeId, IStatisticCollection * sgCollection)
: CStatisticCollection(nullptr, rootScopeId), creatorType(_creatorType), creator(_creator)
{
whenCreated = getTimeStampNowValue();
IStatisticCollection * child = ensureSubScope(graphScopeId, true);
child->addChild(childCollection);
child->addChild(sgCollection);
}
CRootStatisticCollection(StatisticCreatorType _creatorType, const char * _creator, const StatsScopeId & rootScopeId) : CStatisticCollection(nullptr, rootScopeId), creatorType(_creatorType), creator(_creator)
{
Expand All @@ -2221,13 +2175,9 @@ class CRootStatisticCollection : public CStatisticCollection
{
return whenCreated;
}
virtual void serialize(MemoryBuffer & out) const
{
serialize(out, nullptr);
}
virtual void serialize(MemoryBuffer & out, const StatsScopeId * matchScopeId) const override
virtual void serialize(MemoryBuffer & out) const override
{
CStatisticCollection::serialize(out, matchScopeId);
CStatisticCollection::serialize(out);
out.append((byte)creatorType);
out.append(creator);
out.append(whenCreated);
Expand Down Expand Up @@ -3972,9 +3922,9 @@ void StatisticsFilter::setScopeDepth(unsigned _scopeDepth)
scopeFilter.setDepth(_scopeDepth);
}

void StatisticsFilter::setScopeDepth(unsigned _minScopeDepth, unsigned _maxScopeDepth)
void StatisticsFilter::setScopeDepth(unsigned _maxDepthDepth, unsigned _maxScopeDepth)
{
scopeFilter.setDepth(_minScopeDepth, _maxScopeDepth);
scopeFilter.setDepth(_maxDepthDepth, _maxScopeDepth);
}

void StatisticsFilter::setScope(const char * _scope)
Expand Down
9 changes: 4 additions & 5 deletions system/jlib/jstats.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,6 @@ interface IStatisticCollection : public IInterface
virtual void getMinMaxScope(IStringVal & minValue, IStringVal & maxValue, StatisticScopeType searchScopeType) const = 0;
virtual void getMinMaxActivity(unsigned & minValue, unsigned & maxValue) const = 0;
virtual void serialize(MemoryBuffer & out) const = 0;
virtual void serialize(MemoryBuffer & out, const StatsScopeId * matchScopeId) const = 0;
virtual unsigned __int64 queryWhenCreated() const = 0;
virtual void mergeInto(IStatisticGatherer & target) const = 0;
virtual StringBuffer &toXML(StringBuffer &out) const = 0;
Expand All @@ -140,8 +139,8 @@ interface IStatisticCollection : public IInterface
virtual void addStatistic(StatisticKind kind, unsigned __int64 value) = 0;
virtual bool updateStatistic(StatisticKind kind, unsigned __int64 value, StatsMergeAction mergeAction) = 0;
virtual bool refreshAggregates(std::vector<StatisticKind> & aggregateKinds, std::vector<unsigned __int64> & totals) = 0;
virtual void deserialize(MemoryBuffer & in, unsigned version, StatisticScopeType minScope=SSTnone) = 0;
virtual void deserializeChild(StatsScopeId scopeId, MemoryBuffer & in, unsigned version, StatisticScopeType minScope=SSTnone) = 0;
virtual void deserialize(MemoryBuffer & in, unsigned version, unsigned statsMinDepth) = 0;
virtual void deserializeChild(StatsScopeId scopeId, MemoryBuffer & in, unsigned version, unsigned statsMinDepth) = 0;
virtual void clearStats() = 0;
virtual void addChild(IStatisticCollection *stats) = 0;
};
Expand Down Expand Up @@ -406,8 +405,8 @@ class jlib_decl StatisticsFilter : public CInterface
void setCreator(const char * _creator);
void setCreatorType(StatisticCreatorType _creatorType);
void setFilter(const char * filter);
void setScopeDepth(unsigned _minScopeDepth);
void setScopeDepth(unsigned _minScopeDepth, unsigned _maxScopeDepth);
void setScopeDepth(unsigned _maxDepthDepth);
void setScopeDepth(unsigned _maxDepthDepth, unsigned _maxScopeDepth);
void setScope(const char * _scope);
void setScopeType(StatisticScopeType _scopeType);
void setValueRange(unsigned __int64 minValue, unsigned __int64 _maxValue);
Expand Down
Loading

0 comments on commit c838bcd

Please sign in to comment.