Skip to content

Commit

Permalink
HPCC-29657 Change following review
Browse files Browse the repository at this point in the history
Signed-off-by: Shamser Ahmed <shamser.ahmed@lexisnexis.com>
  • Loading branch information
shamser committed Nov 9, 2023
1 parent 48fb0d6 commit 9db0c5f
Show file tree
Hide file tree
Showing 10 changed files with 115 additions and 231 deletions.
104 changes: 8 additions & 96 deletions common/workunit/workunit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2669,7 +2669,7 @@ GlobalStatisticCollection::GlobalStatisticCollection() : aggregateKindsMapping(a
statsCollection.setown(createStatisticCollection(nullptr, globalScopeId));
}

void GlobalStatisticCollection::load(IConstWorkUnit &workunit, const char * graphName, bool aggregatesOnly)
void GlobalStatisticCollection::loadExistingAggregates(IConstWorkUnit &workunit)
{
const char * _wuid = workunit.queryWuid();
if (!streq(_wuid, wuid.str())) // New statsCollection if collection for different workunit
Expand All @@ -2679,57 +2679,6 @@ void GlobalStatisticCollection::load(IConstWorkUnit &workunit, const char * grap
wuid.set(_wuid);
}

loadGlobalAggregates(workunit);
if (isEmptyString(graphName))
{
Owned<IPropertyTree> root = getWUGraphProgress(wuid, true);
if (root)
{
Owned<IPropertyTree> graphPT = root->getPropTree(graphName);
if (!graphPT)
return;

StatsScopeId wfScopeId(SSTworkflow, graphPT->getPropInt("@wfid", 0));
StatsScopeId graphScopeId(SSTgraph, graphName);

Owned<IPropertyTreeIterator> iter = graphPT->getElements("*");
ForEach(*iter)
{
StatsScopeId sgScopeId;
IPropertyTree * sgPT = & iter->query();
const char * sgName = sgPT->queryName();
if (strcmp(sgName, "node")==0)
continue;
verifyex(sgScopeId.setScopeText(sgName));
MemoryBuffer compressed;
sgPT->getPropBin("Stats", compressed);
if (!compressed.length())
break;
MemoryBuffer serialized;
decompressToBuffer(serialized, compressed);

unsigned version;
serialized.read(version);
byte kind;
serialized.read(kind);

StatsScopeId childId;
childId.deserialize(serialized, version);
int statsMinDepth = 0, statsMaxDepth = INT_MAX;
if (aggregatesOnly)
{
// Only store stats for subgraph level
statsMinDepth = 3; // this is subgraph level
statsMaxDepth = 3;
}
statsCollection->deserializeChild(childId, serialized, version, statsMinDepth, statsMaxDepth);
}
}
}
}

void GlobalStatisticCollection::loadGlobalAggregates(IConstWorkUnit &workunit)
{
class StatsCollectionAggregatesLoader : public IWuScopeVisitor
{
public:
Expand Down Expand Up @@ -2781,57 +2730,20 @@ IStatisticCollection * GlobalStatisticCollection::getCollectionForUpdate(Statist
return createRootStatisticCollection(creatorType, creator, wfScopeId, graphScopeId, sgScopeCollection);
}

// Recalculate aggregates for global, workflow and graph scopes
bool GlobalStatisticCollection::refreshAggregates()
{
return statsCollection->refreshAggregates(aggregateKindsMapping);
}

// Recalculate aggregates and then write the aggregates to global stats (dali)
void GlobalStatisticCollection::updateAggregates(IWorkUnit *wu)
{
class StatisticsAggregatesWriter : implements IStatisticVisitor
struct AggregateUpdatedCallBackFunc : implements IWhenAggregateUpdatedCallBack
{
const StatisticsMapping & aggregateKindsMapping;
const unsigned numStats;
Linked<IWorkUnit> wu;
public:
StatisticsAggregatesWriter(IWorkUnit * _wu, const StatisticsMapping & _aggregateKindsMapping): wu(_wu), aggregateKindsMapping(_aggregateKindsMapping), numStats(aggregateKindsMapping.numStatistics()) {}

virtual bool visitScope(const IStatisticCollection & cur)
AggregateUpdatedCallBackFunc(IWorkUnit *_wu) : wu(_wu) {}
void operator () (const char * scope, StatisticScopeType sst, StatisticKind kind, stat_type value)
{
switch (cur.queryScopeType())
{
case SSTglobal:
case SSTworkflow:
case SSTgraph:
for (unsigned i=0; i<numStats; ++i)
{
StatisticKind kind = aggregateKindsMapping.getKind(i);
stat_type value;
if (cur.getStatistic(kind, value))
{
if (value || includeStatisticIfZero(kind))
{
StringBuffer s;
wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), cur.queryScopeType(), cur.getFullScope(s).str(), kind, nullptr, value, 1, 0, StatsMergeReplace);
}
}
}
if (cur.queryScopeType()==SSTgraph)
return false;
else
return true;
default:
return false;
}
wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), sst, scope, kind, nullptr, value, 1, 0, StatsMergeReplace);
}
};
if (refreshAggregates()) // Only serialize if the aggregates have changed
{
StatisticsAggregatesWriter statsAggregatorWriter(wu, aggregateKindsMapping);
statsCollection->visit(statsAggregatorWriter);
}
} aggregateUpdatedCallBackFunc(wu);

statsCollection->refreshAggregates(aggregateKindsMapping, aggregateUpdatedCallBackFunc);
}

// Prune all subgraph descendent stats (leaving subgraph stats for future aggregation)
Expand Down
4 changes: 1 addition & 3 deletions common/workunit/workunit.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1788,10 +1788,8 @@ class WORKUNIT_API GlobalStatisticCollection : public CInterface
public:
GlobalStatisticCollection();

void load(IConstWorkUnit &workunit, const char * graphName, bool aggregatesOnly);
void loadGlobalAggregates(IConstWorkUnit &workunit);
void loadExistingAggregates(IConstWorkUnit &workunit);
IStatisticCollection * getCollectionForUpdate(StatisticCreatorType creatorType, const char * creator, unsigned wfid, const char *graphName, unsigned sgId, bool clearStats);
bool refreshAggregates();
IStatisticCollection * queryCollection() { return statsCollection; }
void updateAggregates(IWorkUnit *wu);
void pruneSubGraphDescendants(unsigned wfid, const char *graphName, unsigned sgId);
Expand Down
2 changes: 1 addition & 1 deletion ecl/eclagent/agentctx.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ struct IAgentContext : extends IGlobalCodeContext
virtual bool forceNewDiskReadActivity() const = 0;
virtual void addWuExceptionEx(const char * text, unsigned code, unsigned severity, unsigned audience, char const * source) = 0;
virtual double queryAgentMachineCost() const = 0;
virtual IWUGraphStats *updateStats(StatisticCreatorType creatorType, const char * creator, unsigned activeWfid, const char *graphName, unsigned subgraph) = 0;
virtual IWUGraphStats *updateStats(unsigned activeWfid, const char *graphName, unsigned subgraph) = 0;
virtual void updateAggregates(IWorkUnit* lockedwu) = 0;
};

Expand Down
10 changes: 5 additions & 5 deletions ecl/eclagent/eclagent.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -250,9 +250,9 @@ public:
{
return ctx->queryAgentMachineCost();
};
virtual IWUGraphStats *updateStats(StatisticCreatorType creatorType, const char * creator, unsigned activeWfid, const char *graphName, unsigned subgraph) override
virtual IWUGraphStats *updateStats(unsigned activeWfid, const char *graphName, unsigned subgraph) override
{
return ctx->updateStats(creatorType, creator, activeWfid, graphName, subgraph);
return ctx->updateStats(activeWfid, graphName, subgraph);
};
virtual void updateAggregates(IWorkUnit* lockedwu) override
{
Expand Down Expand Up @@ -714,10 +714,10 @@ public:
{
return agentMachineCost;
}
virtual IWUGraphStats *updateStats(StatisticCreatorType creatorType, const char * creator, unsigned activeWfid, const char *graphName, unsigned subgraph) override
virtual IWUGraphStats *updateStats(unsigned activeWfid, const char *graphName, unsigned subgraph) override
{
Owned<IStatisticCollection> sgCollection = globalStats.getCollectionForUpdate(creatorType, creator, activeWfid, graphName, subgraph, true); // true=>clear existing stats
return wuRead->updateStats(graphName, creatorType, creator, activeWfid, subgraph, false, sgCollection);
Owned<IStatisticCollection> sgCollection = globalStats.getCollectionForUpdate(queryStatisticsComponentType(), queryStatisticsComponentName(), activeWfid, graphName, subgraph, true); // true=>clear existing stats
return wuRead->updateStats(graphName, queryStatisticsComponentType(), queryStatisticsComponentName(), activeWfid, subgraph, false, sgCollection);
}
virtual void updateAggregates(IWorkUnit* lockedwu) override
{
Expand Down
8 changes: 4 additions & 4 deletions ecl/eclagent/eclgraph.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -925,7 +925,7 @@ void EclSubGraph::updateProgress(IStatisticGatherer &progress)
subgraphs.item(i2).updateProgress(progress);

Owned<IStatisticCollection> statsCollection = progress.getResult();
const cost_type costDiskAccess = aggregateStatistic(StCostFileAccess, statsCollection);
const cost_type costDiskAccess = statsCollection->aggregateStatistic(StCostFileAccess);
if (costDiskAccess)
progress.addStatistic(StCostFileAccess, costDiskAccess);
}
Expand Down Expand Up @@ -1347,7 +1347,7 @@ void EclGraph::updateLibraryProgress()
EclSubGraph & cur = graphs.item(idx);
unsigned wfid = cur.parent.queryWfid();

Owned<IWUGraphStats> progress = agent->updateStats(queryStatisticsComponentType(), queryStatisticsComponentName(), wfid, queryGraphName(), cur.id);
Owned<IWUGraphStats> progress = agent->updateStats(wfid, queryGraphName(), cur.id);
cur.updateProgress(progress->queryStatsBuilder());
}
}
Expand Down Expand Up @@ -1490,7 +1490,7 @@ void GraphResults::setResult(unsigned id, IHThorGraphResult * result)

IWUGraphStats *EclGraph::updateStats(StatisticCreatorType creatorType, const char * creator, unsigned activeWfid, unsigned subgraph)
{
return agent->updateStats(creatorType, creator, activeWfid, queryGraphName(), subgraph);
return agent->updateStats(activeWfid, queryGraphName(), subgraph);
}

void EclGraph::updateAggregates(IWorkUnit* lockedwu)
Expand Down Expand Up @@ -1547,7 +1547,7 @@ EclGraph * EclAgent::loadGraph(const char * graphName, IConstWorkUnit * wu, ILoa

Owned<EclGraph> eclGraph = new EclGraph(*this, graphName, wu, isLibrary, debugContext, probeManager, wuGraph->getWfid());
eclGraph->createFromXGMML(dll, xgmml);
globalStats.load(*wu, nullptr, true);
globalStats.loadExistingAggregates(*wu);
return eclGraph.getClear();
}

Expand Down
Loading

0 comments on commit 9db0c5f

Please sign in to comment.