Skip to content

Commit

Permalink
Merge pull request #19073 from shamser/issue32479
Browse files Browse the repository at this point in the history
HPCC-32479 Record lookahead timings and use it to calculate activity localtime

Reviewed-by: Jake Smith <jake.smith@lexisnexisrisk.com>
Reviewed-by: Gavin Halliday <ghalliday@hpccsystems.com>
Merged-by: Gavin Halliday <ghalliday@hpccsystems.com>
  • Loading branch information
ghalliday authored Oct 15, 2024
2 parents 2c4c9ca + 93b585a commit 8321d83
Show file tree
Hide file tree
Showing 8 changed files with 60 additions and 32 deletions.
37 changes: 17 additions & 20 deletions common/thorhelper/thorcommon.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ class THORHELPER_API ActivityTimeAccumulator
unsigned __int64 firstRow; // Timestamp of first row (nanoseconds since epoch)
cycle_t firstExitCycles; // Wall clock time of first exit from this activity
cycle_t blockedCycles; // Time spent blocked
cycle_t lookAheadCycles; // Time spent by lookahead thread

// Return the total amount of time (in nanoseconds) spent in this activity (first entry to last exit)
inline unsigned __int64 elapsed() const { return cycle_to_nanosec(endCycles-startCycles); }
Expand All @@ -264,6 +265,7 @@ class THORHELPER_API ActivityTimeAccumulator
firstRow = 0;
firstExitCycles = 0;
blockedCycles = 0;
lookAheadCycles = 0;
}
};

Expand Down Expand Up @@ -335,31 +337,20 @@ class SimpleActivityTimer
}
};

class BlockedActivityTimer
class BlockedActivityTimer : public SimpleActivityTimer
{
unsigned __int64 startCycles;
ActivityTimeAccumulator &accumulator;
protected:
const bool enabled;
public:
BlockedActivityTimer(ActivityTimeAccumulator &_accumulator, const bool _enabled)
: accumulator(_accumulator), enabled(_enabled)
{
if (enabled)
startCycles = get_cycles_now();
else
startCycles = 0;
}
: SimpleActivityTimer(_accumulator.blockedCycles, _enabled) { }
};

~BlockedActivityTimer()
{
if (enabled)
{
cycle_t elapsedCycles = get_cycles_now() - startCycles;
accumulator.blockedCycles += elapsedCycles;
}
}
class LookAheadTimer : public SimpleActivityTimer
{
public:
inline LookAheadTimer(ActivityTimeAccumulator &_accumulator, const bool _enabled)
: SimpleActivityTimer(_accumulator.lookAheadCycles, _enabled) { }
};

#else
class ActivityTimer
{
Expand All @@ -373,7 +364,13 @@ struct SimpleActivityTimer
struct BlockedActivityTimer
{
inline BlockedActivityTimer(ActivityTimeAccumulator &_accumulator, const bool _enabled) { }

};
struct LookAheadTimer
{
inline LookAheadTimer(ActivityTimeAccumulator &_accumulator, const bool _enabled){ }
};

#endif

class THORHELPER_API IndirectCodeContextEx : public IndirectCodeContext
Expand Down
2 changes: 2 additions & 0 deletions system/jlib/jstatcodes.h
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,8 @@ enum StatisticKind
StCycleSoapcallDNSCycles,
StCycleSoapcallConnectCycles,
StNumSoapcallConnectFailures,
StTimeLookAhead,
StCycleLookAheadCycles,
StMax,

//For any quantity there is potentially the following variants.
Expand Down
2 changes: 2 additions & 0 deletions system/jlib/jstats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -991,6 +991,8 @@ static const constexpr StatisticMeta statsMetaData[StMax] = {
{ CYCLESTAT(SoapcallDNS) },
{ CYCLESTAT(SoapcallConnect) },
{ NUMSTAT(SoapcallConnectFailures), "The number of SOAPCALL connect failures" },
{ TIMESTAT(LookAhead), "The total time lookahead thread spend prefetching rows from upstream activities" },
{ CYCLESTAT(LookAhead) },
};

static MapStringTo<StatisticKind, StatisticKind> statisticNameMap(true);
Expand Down
1 change: 1 addition & 0 deletions thorlcr/activities/nsplitter/thnsplitterslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ class CSplitterOutput : public CSimpleInterfaceOf<IStartableEngineRowStream>, pu
virtual IStrandJunction *getOutputStreams(CActivityBase &ctx, unsigned idx, PointerArrayOf<IEngineRowStream> &streams, const CThorStrandOptions * consumerOptions, bool consumerOrdered, IOrderedCallbackCollection * orderedCallbacks) override;
virtual unsigned __int64 queryTotalCycles() const override { return COutputTiming::queryTotalCycles(); }
virtual unsigned __int64 queryEndCycles() const override { return COutputTiming::queryEndCycles(); }
virtual unsigned __int64 queryBlockedCycles() const { return COutputTiming::queryBlockedCycles(); }
virtual void debugRequest(MemoryBuffer &mb) override;
// Stepping methods
virtual IInputSteppingMeta *querySteppingMeta() { return nullptr; }
Expand Down
27 changes: 23 additions & 4 deletions thorlcr/activities/thactivityutil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,10 +119,17 @@ class CRowStreamLookAhead : public CSimpleInterfaceOf<IStartableEngineRowStream>
{
while (requiredLeft&&running)
{
OwnedConstThorRow row = inputStream->nextRow();
if (!row)
OwnedConstThorRow row;
{
LookAheadTimer timer(activity.getActivityTimerAccumulator(), activity.queryTimeActivities());
row.setown(inputStream->nextRow());
}
if (!row)
{
{
LookAheadTimer timer(activity.getActivityTimerAccumulator(), activity.queryTimeActivities());
row.setown(inputStream->nextRow());
}
if (!row)
break;
else
Expand All @@ -138,7 +145,11 @@ class CRowStreamLookAhead : public CSimpleInterfaceOf<IStartableEngineRowStream>
{
while (requiredLeft&&running)
{
OwnedConstThorRow row = inputStream->ungroupedNextRow();
OwnedConstThorRow row;
{
LookAheadTimer timer(activity.getActivityTimerAccumulator(), activity.queryTimeActivities());
row.setown(inputStream->ungroupedNextRow());
}
if (!row)
break;
++count;
Expand Down Expand Up @@ -234,7 +245,15 @@ class CRowStreamLookAhead : public CSimpleInterfaceOf<IStartableEngineRowStream>
// IEngineRowStream
virtual const void *nextRow() override
{
OwnedConstThorRow row = smartbuf->nextRow();
OwnedConstThorRow row;
{
// smartbuf->nextRow should return immediately if a row is available.
// smartbuf->nextRow will take time if blocked, so record time taken as blocked time.
// N.b. smartbuf->next may take a trivial amount of time if row is available but
// for the purposes of stats this will still be considered blocked.
BlockedActivityTimer timer(activity.getActivityTimerAccumulator(), activity.queryTimeActivities());
row.setown(smartbuf->nextRow());
}
if (getexception)
throw getexception.getClear();
if (!row)
Expand Down
15 changes: 10 additions & 5 deletions thorlcr/graph/thgraphslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -586,14 +586,17 @@ unsigned __int64 CSlaveActivity::queryLocalCycles() const
break;
}
}
unsigned __int64 localCycles = queryTotalCycles();
if (localCycles < inputCycles) // not sure how/if possible, but guard against
unsigned __int64 processCycles = queryTotalCycles() + queryLookAheadCycles();
if (processCycles < inputCycles) // not sure how/if possible, but guard against
return 0;
localCycles -= inputCycles;
processCycles -= inputCycles;
const unsigned __int64 blockedCycles = queryBlockedCycles();
if (localCycles < blockedCycles)
if (processCycles < blockedCycles)
{
IWARNLOG("CSlaveActivity::queryLocalCycles - processCycles %" I64F "u < blockedCycles %" I64F "u", processCycles, blockedCycles);
return 0;
return localCycles-blockedCycles;
}
return processCycles-blockedCycles;
}

void CSlaveActivity::serializeStats(MemoryBuffer &mb)
Expand All @@ -618,6 +621,8 @@ void CSlaveActivity::serializeStats(MemoryBuffer &mb)
serializedStats.setStatistic(StTimeLocalExecute, (unsigned __int64)cycle_to_nanosec(queryLocalCycles()));
serializedStats.setStatistic(StTimeTotalExecute, (unsigned __int64)cycle_to_nanosec(queryTotalCycles()));
serializedStats.setStatistic(StTimeBlocked, (unsigned __int64)cycle_to_nanosec(queryBlockedCycles()));
serializedStats.setStatistic(StTimeLookAhead, (unsigned __int64)cycle_to_nanosec(queryLookAheadCycles()));

serializedStats.serialize(mb);
ForEachItemIn(i, outputs)
{
Expand Down
6 changes: 4 additions & 2 deletions thorlcr/graph/thgraphslave.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,11 @@ class COutputTiming
COutputTiming() { }

void resetTiming() { slaveTimerStats.reset(); }
ActivityTimeAccumulator &getTotalCyclesRef() { return slaveTimerStats; }
ActivityTimeAccumulator &getActivityTimerAccumulator() { return slaveTimerStats; }
unsigned __int64 queryTotalCycles() const { return slaveTimerStats.totalCycles; }
unsigned __int64 queryEndCycles() const { return slaveTimerStats.endCycles; }
unsigned __int64 queryBlockedCycles() const { return slaveTimerStats.blockedCycles; }
unsigned __int64 queryLookAheadCycles() const { return slaveTimerStats.lookAheadCycles; }
};

class CEdgeProgress
Expand Down Expand Up @@ -289,8 +290,9 @@ class graphslave_decl CSlaveActivity : public CActivityBase, public CEdgeProgres
return consumerOrdered;
}
virtual unsigned __int64 queryTotalCycles() const { return COutputTiming::queryTotalCycles(); }
virtual unsigned __int64 queryBlockedCycles() const { return COutputTiming::queryBlockedCycles();}
virtual unsigned __int64 queryBlockedCycles() const { return COutputTiming::queryBlockedCycles(); }
virtual unsigned __int64 queryEndCycles() const { return COutputTiming::queryEndCycles(); }
virtual unsigned __int64 queryLookAheadCycles() const { return COutputTiming::queryLookAheadCycles(); }
virtual void debugRequest(MemoryBuffer &msg) override;

// IThorDataLink
Expand Down
2 changes: 1 addition & 1 deletion thorlcr/thorutil/thormisc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ static Owned<IMPtagAllocator> ClusterMPAllocator;
const StatisticsMapping spillStatistics({StTimeSpillElapsed, StTimeSortElapsed, StNumSpills, StSizeSpillFile, StSizePeakTempDisk});
const StatisticsMapping executeStatistics({StTimeTotalExecute, StTimeLocalExecute, StTimeBlocked});
const StatisticsMapping soapcallStatistics({StTimeSoapcall, StTimeSoapcallDNS, StTimeSoapcallConnect, StNumSoapcallConnectFailures});
const StatisticsMapping basicActivityStatistics({StNumParallelExecute}, executeStatistics, spillStatistics);
const StatisticsMapping basicActivityStatistics({StNumParallelExecute, StTimeLookAhead}, executeStatistics, spillStatistics);
const StatisticsMapping groupActivityStatistics({StNumGroups, StNumGroupMax}, basicActivityStatistics);
const StatisticsMapping indexReadFileStatistics({}, diskReadRemoteStatistics, jhtreeCacheStatistics);
const StatisticsMapping indexReadActivityStatistics({StNumRowsProcessed}, indexReadFileStatistics, basicActivityStatistics);
Expand Down

0 comments on commit 8321d83

Please sign in to comment.