Skip to content

Commit

Permalink
HPCC-29657 Produce aggregate stats (e.g. spill, cost) whilst a job is…
Browse files Browse the repository at this point in the history
… running

Signed-off-by: Shamser Ahmed <shamser.ahmed@lexisnexis.com>
  • Loading branch information
shamser committed Oct 31, 2023
1 parent 9f4aa1e commit fb9a872
Show file tree
Hide file tree
Showing 14 changed files with 573 additions and 177 deletions.
252 changes: 180 additions & 72 deletions common/workunit/workunit.cpp

Large diffs are not rendered by default.

24 changes: 20 additions & 4 deletions common/workunit/workunit.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1174,7 +1174,6 @@ interface IConstWUScopeIterator : extends IScmIterator
};

//---------------------------------------------------------------------------------------------------------------------

//! IWorkUnit
//! Provides high level access to WorkUnit "header" data.
interface IWorkUnit;
Expand Down Expand Up @@ -1302,7 +1301,7 @@ interface IConstWorkUnit : extends IConstWorkUnitInfo
virtual WUGraphState queryNodeState(const char *graphName, WUGraphIDType nodeId) const = 0;
virtual void setGraphState(const char *graphName, unsigned wfid, WUGraphState state) const = 0;
virtual void setNodeState(const char *graphName, WUGraphIDType nodeId, WUGraphState state) const = 0;
virtual IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned _wfid, unsigned subgraph, bool merge) const = 0;
virtual IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned _wfid, unsigned subgraph, bool merge, IStatisticCollection * stats=nullptr) const = 0;
virtual void clearGraphProgress() const = 0;
virtual IStringVal & getAbortBy(IStringVal & str) const = 0;
virtual unsigned __int64 getAbortTimeStamp() const = 0;
Expand Down Expand Up @@ -1725,8 +1724,6 @@ extern WORKUNIT_API void updateWorkunitTimings(IWorkUnit * wu, ITimeReporter *ti
extern WORKUNIT_API void updateWorkunitTimings(IWorkUnit * wu, StatisticScopeType scopeType, StatisticKind kind, ITimeReporter *timer);
extern WORKUNIT_API void aggregateStatistic(StatsAggregation & result, IConstWorkUnit * wu, const WuScopeFilter & filter, StatisticKind search);
extern WORKUNIT_API cost_type aggregateCost(const IConstWorkUnit * wu, const char *scope=nullptr, bool excludehThor=false);
extern WORKUNIT_API cost_type aggregateDiskAccessCost(const IConstWorkUnit * wu, const char *scope);
extern WORKUNIT_API void updateSpillSize(IWorkUnit * wu, const char * scope, StatisticScopeType scopeType);
extern WORKUNIT_API const char *getTargetClusterComponentName(const char *clustname, const char *processType, StringBuffer &name);
extern WORKUNIT_API void descheduleWorkunit(char const * wuid);
#if 0
Expand Down Expand Up @@ -1785,4 +1782,23 @@ extern WORKUNIT_API TraceFlags loadTraceFlags(IConstWorkUnit * wu, const std::in

extern WORKUNIT_API bool executeGraphOnLingeringThor(IConstWorkUnit &workunit, unsigned wfid, const char *graphName);


class WORKUNIT_API GlobalStatisticCollection : public CInterface
{
public:
GlobalStatisticCollection();

void load(IConstWorkUnit &workunit, const char * graphName, bool aggregatesOnly);
void loadGlobalAggregates(IConstWorkUnit &workunit);
IStatisticCollection * getCollectionForUpdate(StatisticCreatorType creatorType, const char * creator, unsigned wfid, const char *graphName, unsigned sgId, bool clearStats);
bool refreshAggregates();
IStatisticCollection * queryCollection() { return statsCollection; }
void updateAggregates(IWorkUnit *wu);
void pruneSubGraphDescendants(unsigned wfid, const char *graphName, unsigned sgId);
private:
Owned<IStatisticCollection> statsCollection;
const StatisticsMapping & aggregateKindsMapping;
StringBuffer wuid;
};

#endif
4 changes: 2 additions & 2 deletions common/workunit/workunit.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ public:
virtual void setGraphState(const char *graphName, unsigned wfid, WUGraphState state) const;
virtual void setNodeState(const char *graphName, WUGraphIDType nodeId, WUGraphState state) const;
virtual WUGraphState queryNodeState(const char *graphName, WUGraphIDType nodeId) const;
virtual IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned _wfid, unsigned subgraph, bool merge) const override;
virtual IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned _wfid, unsigned subgraph, bool merge, IStatisticCollection * stats=nullptr) const override;
void clearGraphProgress() const;
virtual void import(IPropertyTree *wuTree, IPropertyTree *graphProgressTree) {}; //No GraphProgressTree in CLocalWorkUnit.

Expand Down Expand Up @@ -661,7 +661,7 @@ public:
class WORKUNIT_API CWuGraphStats : public CInterfaceOf<IWUGraphStats>
{
public:
CWuGraphStats(StatisticCreatorType _creatorType, const char * _creator, unsigned wfid, const char * _rootScope, unsigned _id, bool _merge);
CWuGraphStats(StatisticCreatorType _creatorType, const char * _creator, unsigned wfid, const char * _rootScope, unsigned _id, bool _merge, IStatisticCollection * stats);
virtual void beforeDispose();
virtual IStatisticGatherer & queryStatsBuilder();
protected:
Expand Down
2 changes: 2 additions & 0 deletions ecl/eclagent/agentctx.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,8 @@ struct IAgentContext : extends IGlobalCodeContext
virtual bool forceNewDiskReadActivity() const = 0;
virtual void addWuExceptionEx(const char * text, unsigned code, unsigned severity, unsigned audience, char const * source) = 0;
virtual double queryAgentMachineCost() const = 0;
virtual IWUGraphStats *updateStats(StatisticCreatorType creatorType, const char * creator, unsigned activeWfid, const char *graphName, unsigned subgraph) = 0;
virtual void updateAggregates(IWorkUnit* lockedwu) = 0;
};

#endif // AGENTCTX_HPP_INCL
8 changes: 0 additions & 8 deletions ecl/eclagent/eclagent.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1988,10 +1988,6 @@ void EclAgent::doProcess()
const cost_type cost = aggregateCost(w, nullptr, false);
if (cost)
w->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTglobal, "", StCostExecute, NULL, cost, 1, 0, StatsMergeReplace);
const cost_type diskAccessCost = aggregateDiskAccessCost(w, nullptr);
if (diskAccessCost)
w->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTglobal, "", StCostFileAccess, NULL, diskAccessCost, 1, 0, StatsMergeReplace);
updateSpillSize(w, nullptr, SSTglobal);
addTimings(w);

switch (w->getState())
Expand Down Expand Up @@ -2513,10 +2509,6 @@ void EclAgentWorkflowMachine::noteTiming(unsigned wfid, timestamp_type startTime
const cost_type cost = money2cost_type(calcCost(agent.queryAgentMachineCost(), nanoToMilli(elapsedNs))) + aggregateCost(wu, scope, true);
if (cost)
wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTworkflow, scope, StCostExecute, NULL, cost, 1, 0, StatsMergeReplace);
const cost_type diskAccessCost = aggregateDiskAccessCost(wu, scope);
if (diskAccessCost)
wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTworkflow, scope, StCostFileAccess, NULL, diskAccessCost, 1, 0, StatsMergeReplace);
updateSpillSize(wu, scope, SSTworkflow);
}

void EclAgentWorkflowMachine::doExecutePersistItem(IRuntimeWorkflowItem & item)
Expand Down
20 changes: 19 additions & 1 deletion ecl/eclagent/eclagent.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,14 @@ public:
{
return ctx->queryAgentMachineCost();
};
virtual IWUGraphStats *updateStats(StatisticCreatorType creatorType, const char * creator, unsigned activeWfid, const char *graphName, unsigned subgraph) override
{
return ctx->updateStats(creatorType, creator, activeWfid, graphName, subgraph);
};
virtual void updateAggregates(IWorkUnit* lockedwu) override
{
ctx->updateAggregates(lockedwu);
}

protected:
IAgentContext * ctx;
Expand Down Expand Up @@ -392,6 +400,7 @@ private:
Owned<IOrderedOutputSerializer> outputSerializer;
int retcode;
double agentMachineCost = 0;
GlobalStatisticCollection globalStats;

private:
void doSetResultString(type_t type, const char * stepname, unsigned sequence, int len, const char *val);
Expand Down Expand Up @@ -705,6 +714,15 @@ public:
{
return agentMachineCost;
}
virtual IWUGraphStats *updateStats(StatisticCreatorType creatorType, const char * creator, unsigned activeWfid, const char *graphName, unsigned subgraph) override
{
Owned<IStatisticCollection> sgCollection = globalStats.getCollectionForUpdate(creatorType, creator, activeWfid, graphName, subgraph, true); // true=>clear existing stats
return wuRead->updateStats(graphName, creatorType, creator, activeWfid, subgraph, false, sgCollection);
}
virtual void updateAggregates(IWorkUnit* lockedwu) override
{
globalStats.updateAggregates(lockedwu);
}
};

//---------------------------------------------------------------------------
Expand Down Expand Up @@ -1055,7 +1073,7 @@ public:
void executeLibrary(const byte * parentExtract, IHThorGraphResults * results);
IWUGraphStats *updateStats(StatisticCreatorType creatorType, const char * creator, unsigned wfid, unsigned subgraph);
void updateWUStatistic(IWorkUnit* lockedwu, StatisticScopeType scopeType, const char* scope, StatisticKind kind, const char* descr, long long unsigned int value);

void updateAggregates(IWorkUnit* lockedwu);
EclSubGraph * idToGraph(unsigned id);
EclGraphElement * idToActivity(unsigned id);
const char *queryGraphName() { return graphName; }
Expand Down
26 changes: 15 additions & 11 deletions ecl/eclagent/eclgraph.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -879,10 +879,10 @@ void EclSubGraph::updateProgress()
Owned<IWUGraphStats> progress = parent.updateStats(queryStatisticsComponentType(), queryStatisticsComponentName(), parent.queryWfid(), id);
IStatisticGatherer & stats = progress->queryStatsBuilder();
updateProgress(stats);

if (startGraphTime || elapsedGraphCycles)
{
WorkunitUpdate lockedwu(agent->updateWorkUnit());
parent.updateAggregates(lockedwu);
StringBuffer subgraphid;
subgraphid.append(parent.queryGraphName()).append(":").append(SubGraphScopePrefix).append(id);
if (startGraphTime)
Expand All @@ -897,10 +897,6 @@ void EclSubGraph::updateProgress()
if (cost)
lockedwu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTsubgraph, scope, StCostExecute, NULL, cost, 1, 0, StatsMergeReplace);
}
Owned<IStatisticCollection> statsCollection = stats.getResult();
const cost_type costDiskAccess = aggregateStatistic(StCostFileAccess, statsCollection) ;
if (costDiskAccess)
lockedwu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTsubgraph, scope, StCostFileAccess, NULL, costDiskAccess, 1, 0, StatsMergeReplace);
}
}
}
Expand All @@ -927,6 +923,11 @@ void EclSubGraph::updateProgress(IStatisticGatherer &progress)
}
ForEachItemIn(i2, subgraphs)
subgraphs.item(i2).updateProgress(progress);

Owned<IStatisticCollection> statsCollection = progress.getResult();
const cost_type costDiskAccess = aggregateStatistic(StCostFileAccess, statsCollection);
if (costDiskAccess)
progress.addStatistic(StCostFileAccess, costDiskAccess);
}

bool EclSubGraph::prepare(const byte * parentExtract, bool checkDependencies)
Expand Down Expand Up @@ -1277,10 +1278,6 @@ void EclGraph::execute(const byte * parentExtract)
const cost_type cost = money2cost_type(calcCost(agent->queryAgentMachineCost(), elapsed));
if (cost)
wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTgraph, scope, StCostExecute, NULL, cost, 1, 0, StatsMergeReplace);

const cost_type costDiskAccess = aggregateDiskAccessCost(wu, scope);
if (costDiskAccess)
wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTgraph, scope, StCostFileAccess, NULL, costDiskAccess, 1, 0, StatsMergeReplace);
}

if (agent->queryRemoteWorkunit())
Expand Down Expand Up @@ -1349,7 +1346,8 @@ void EclGraph::updateLibraryProgress()
{
EclSubGraph & cur = graphs.item(idx);
unsigned wfid = cur.parent.queryWfid();
Owned<IWUGraphStats> progress = wu->updateStats(queryGraphName(), queryStatisticsComponentType(), queryStatisticsComponentName(), wfid, cur.id, false);

Owned<IWUGraphStats> progress = agent->updateStats(queryStatisticsComponentType(), queryStatisticsComponentName(), wfid, queryGraphName(), cur.id);
cur.updateProgress(progress->queryStatsBuilder());
}
}
Expand Down Expand Up @@ -1492,7 +1490,12 @@ void GraphResults::setResult(unsigned id, IHThorGraphResult * result)

IWUGraphStats *EclGraph::updateStats(StatisticCreatorType creatorType, const char * creator, unsigned activeWfid, unsigned subgraph)
{
return wu->updateStats (queryGraphName(), creatorType, creator, activeWfid, subgraph, false);
return agent->updateStats(creatorType, creator, activeWfid, queryGraphName(), subgraph);
}

void EclGraph::updateAggregates(IWorkUnit* lockedwu)
{
agent->updateAggregates(lockedwu);
}

void EclGraph::updateWUStatistic(IWorkUnit *lockedwu, StatisticScopeType scopeType, const char * scope, StatisticKind kind, const char * descr, unsigned __int64 value)
Expand Down Expand Up @@ -1544,6 +1547,7 @@ EclGraph * EclAgent::loadGraph(const char * graphName, IConstWorkUnit * wu, ILoa

Owned<EclGraph> eclGraph = new EclGraph(*this, graphName, wu, isLibrary, debugContext, probeManager, wuGraph->getWfid());
eclGraph->createFromXGMML(dll, xgmml);
globalStats.load(*wu, nullptr, true);
return eclGraph.getClear();
}

Expand Down
8 changes: 4 additions & 4 deletions plugins/cassandra/cassandrawu.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2743,8 +2743,8 @@ class CCassandraWorkUnit : public CPersistedWorkUnit
class CCassandraWuGraphStats : public CWuGraphStats
{
public:
CCassandraWuGraphStats(const CCassandraWorkUnit *_parent, StatisticCreatorType _creatorType, const char * _creator, unsigned _wfid, const char * _rootScope, unsigned _id, bool _merge)
: CWuGraphStats(_creatorType, _creator, _wfid, _rootScope, _id, _merge),
CCassandraWuGraphStats(const CCassandraWorkUnit *_parent, StatisticCreatorType _creatorType, const char * _creator, unsigned _wfid, const char * _rootScope, unsigned _id, bool _merge, IStatisticCollection * stats)
: CWuGraphStats(_creatorType, _creator, _wfid, _rootScope, _id, _merge, stats),
progress(createPTree(_rootScope)), parent(_parent)
{
}
Expand All @@ -2764,9 +2764,9 @@ class CCassandraWorkUnit : public CPersistedWorkUnit
StringAttr wuid;
};

IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned wfid, unsigned subgraph, bool merge) const override
IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned wfid, unsigned subgraph, bool merge, IStatisticCollection * stats) const override
{
return new CCassandraWuGraphStats(this, creatorType, creator, wfid, graphName, subgraph, merge);
return new CCassandraWuGraphStats(this, creatorType, creator, wfid, graphName, subgraph, merge, stats);
}


Expand Down
Loading

0 comments on commit fb9a872

Please sign in to comment.