Skip to content

Commit

Permalink
HPCC-29657 Load serialized graph stats when resuming job
Browse files Browse the repository at this point in the history
Load serialized graph stats when resuming job to ensure aggregates are
calculated correctly.

Signed-off-by: Shamser Ahmed <shamser.ahmed@lexisnexis.com>
  • Loading branch information
shamser committed Oct 4, 2023
1 parent 4930b80 commit c0f357e
Show file tree
Hide file tree
Showing 6 changed files with 23 additions and 24 deletions.
26 changes: 9 additions & 17 deletions common/workunit/workunit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2777,10 +2777,15 @@ class SubGraphUpdaterCollection : public CInterfaceOf<IStatisticCollection>
}
};

GlobalStatisticCollection::GlobalStatisticCollection(IPropertyTree * root, StatisticScopeType minScope)
GlobalStatisticCollection::GlobalStatisticCollection()
{
StatsScopeId globalScopeId(SSTglobal, (unsigned)0);
statsCollection.setown(createStatisticCollection(nullptr, globalScopeId));
}

void GlobalStatisticCollection::load(const char *wuid, StatisticScopeType minScope)
{
Owned<IPropertyTree> root = getWUGraphProgress(wuid, true);
if (!root)
return;
Owned<IPropertyTreeIterator> iter = root->getElements("*");
Expand Down Expand Up @@ -2867,28 +2872,15 @@ class StatisticsAggregatesWriter : implements IStatisticVisitor
}
};

void updateAggregates(IWorkUnit *wu, GlobalStatisticCollection *statsCollection)
void updateAggregates(IWorkUnit *wu, GlobalStatisticCollection & statsCollection)
{
// Further improvements:
// 1) maintain a dirty flag for each subgraph scope so that only modified subgraphs are serialized.
// 2) Serialize the aggregates into a blob in GraphProgress rather than to global stats
std::vector<StatisticKind> aggregateKinds = {StCostFileAccess, StSizeGraphSpill, StSizeSpillFile};
StatisticsAggregatesWriter statsAggregatorWriter(wu, aggregateKinds);
if (statsCollection)
{
statsCollection->refreshAggregates(aggregateKinds);
statsCollection->visit(statsAggregatorWriter);
}
else
{
Owned<IPropertyTree> root = getWUGraphProgress(wu->queryWuid(), true);
if (root)
{
Owned<GlobalStatisticCollection> stats = new GlobalStatisticCollection(root, SSTsubgraph);
stats->refreshAggregates(aggregateKinds);
stats->visit(statsAggregatorWriter);
}
}
statsCollection.refreshAggregates(aggregateKinds);
statsCollection.visit(statsAggregatorWriter);
}

//---------------------------------------------------------------------------------------------------------------------
Expand Down
5 changes: 3 additions & 2 deletions common/workunit/workunit.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1178,12 +1178,13 @@ interface IConstWUScopeIterator : extends IScmIterator
class WORKUNIT_API GlobalStatisticCollection : public CInterface
{
public:
GlobalStatisticCollection(IPropertyTree * root=nullptr, StatisticScopeType minScope=SSTnone);
GlobalStatisticCollection();
bool refreshAggregates(std::vector<StatisticKind> & aggregateKinds);
void visit(IStatisticVisitor & target) const;
// getCollection() returns IStatisticCollection for given rootScope
// if clear==true, clears the stats at this and below scope
IStatisticCollection * getCollection(const StatsScopeId & wfScope, const StatsScopeId & graphScope, const StatsScopeId & sgScope, StatisticCreatorType creatorType, const char * creator);
void load(const char *wuid, StatisticScopeType minScope=SSTnone);
private:
Owned<IStatisticCollection> statsCollection;
};
Expand Down Expand Up @@ -1738,7 +1739,7 @@ extern WORKUNIT_API void updateWorkunitTimings(IWorkUnit * wu, ITimeReporter *ti
extern WORKUNIT_API void updateWorkunitTimings(IWorkUnit * wu, StatisticScopeType scopeType, StatisticKind kind, ITimeReporter *timer);
extern WORKUNIT_API void aggregateStatistic(StatsAggregation & result, IConstWorkUnit * wu, const WuScopeFilter & filter, StatisticKind search);
extern WORKUNIT_API cost_type aggregateCost(const IConstWorkUnit * wu, const char *scope=nullptr, bool excludehThor=false);
extern WORKUNIT_API void updateAggregates(IWorkUnit *wu, GlobalStatisticCollection *statsCollection=nullptr);
extern WORKUNIT_API void updateAggregates(IWorkUnit *wu, GlobalStatisticCollection & statsCollection);
extern WORKUNIT_API const char *getTargetClusterComponentName(const char *clustname, const char *processType, StringBuffer &name);
extern WORKUNIT_API void descheduleWorkunit(char const * wuid);
#if 0
Expand Down
4 changes: 2 additions & 2 deletions ecl/eclagent/eclgraph.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -882,7 +882,7 @@ void EclSubGraph::updateProgress()
if (startGraphTime || elapsedGraphCycles)
{
WorkunitUpdate lockedwu(agent->updateWorkUnit());
updateAggregates(lockedwu);
parent.updateAggregates(lockedwu);
StringBuffer subgraphid;
subgraphid.append(parent.queryGraphName()).append(":").append(SubGraphScopePrefix).append(id);
if (startGraphTime)
Expand Down Expand Up @@ -1494,7 +1494,7 @@ IWUGraphStats *EclGraph::updateStats(StatisticCreatorType creatorType, const cha

void EclGraph::updateAggregates(IWorkUnit* lockedwu)
{
::updateAggregates(lockedwu, &statsCache);
::updateAggregates(lockedwu, statsCache);
}

void EclGraph::updateWUStatistic(IWorkUnit *lockedwu, StatisticScopeType scopeType, const char * scope, StatisticKind kind, const char * descr, unsigned __int64 value)
Expand Down
8 changes: 6 additions & 2 deletions thorlcr/master/thdemonserver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ class DeMonServer : public CSimpleInterface, implements IDeMonServer
unsigned startTime = graphStarts.item(g2);
reportStatus(wu, graph, startTime, finished, success);
}
::updateAggregates(wu, &statsCache);
::updateAggregates(wu, statsCache);
queryServerStatus().commitProperties();
}
catch (IException *E)
Expand Down Expand Up @@ -304,7 +304,11 @@ class DeMonServer : public CSimpleInterface, implements IDeMonServer
}
virtual void updateAggregates(IWorkUnit * lockedWu) override
{
::updateAggregates(lockedWu, &statsCache);
::updateAggregates(lockedWu, statsCache);
}
virtual void loadStats(const char *wuid) override
{
statsCache.load(wuid, SSTsubgraph);
}
};

Expand Down
1 change: 1 addition & 0 deletions thorlcr/master/thdemonserver.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ interface IDeMonServer : extends IInterface
virtual void endGraph(CGraphBase *graph, bool success) = 0;
virtual void endGraphs() = 0;
virtual void updateAggregates(IWorkUnit * lockedWu) = 0;
virtual void loadStats(const char *wuid) = 0;
};


Expand Down
3 changes: 2 additions & 1 deletion thorlcr/master/thgraphmanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1086,7 +1086,8 @@ bool CJobManager::executeGraph(IConstWorkUnit &workunit, const char *graphName,
}

setWuid(workunit.queryWuid(), workunit.queryClusterName());

if (job->queryResumed() && globals->getPropBool("@watchdogProgressEnabled"))
queryDeMonServer()->loadStats(workunit.queryWuid());
allDone = job->go();

Owned<IWorkUnit> wu = &workunit.lock();
Expand Down

0 comments on commit c0f357e

Please sign in to comment.