Skip to content

Commit

Permalink
HPCC-32479 Record lookahead timings and use it to calculate localtime
Browse files Browse the repository at this point in the history
Track execution cycles and blocked cycles used by lookahead in the
activities timers.  New activity statistic "TimeLookAhead" has been
created to report the lookahead execution time.  Use the lookahead
execution time and lookahead blocked time to calculate each activities
localtime more accurately.

This improvement makes activities that use lookahead more accurate and
also makes upstream activities (upstream from activities that use
lookahead) more accurate.

Signed-off-by: Shamser Ahmed <shamser.ahmed@lexisnexis.com>
  • Loading branch information
shamser committed Oct 14, 2024
1 parent 8b07c8a commit 93b585a
Show file tree
Hide file tree
Showing 8 changed files with 60 additions and 32 deletions.
37 changes: 17 additions & 20 deletions common/thorhelper/thorcommon.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ class THORHELPER_API ActivityTimeAccumulator
unsigned __int64 firstRow; // Timestamp of first row (nanoseconds since epoch)
cycle_t firstExitCycles; // Wall clock time of first exit from this activity
cycle_t blockedCycles; // Time spent blocked
cycle_t lookAheadCycles; // Time spent by lookahead thread

// Return the total amount of time (in nanoseconds) spent in this activity (first entry to last exit)
inline unsigned __int64 elapsed() const { return cycle_to_nanosec(endCycles-startCycles); }
Expand All @@ -265,6 +266,7 @@ class THORHELPER_API ActivityTimeAccumulator
firstRow = 0;
firstExitCycles = 0;
blockedCycles = 0;
lookAheadCycles = 0;
}
};

Expand Down Expand Up @@ -336,31 +338,20 @@ class SimpleActivityTimer
}
};

class BlockedActivityTimer
class BlockedActivityTimer : public SimpleActivityTimer
{
unsigned __int64 startCycles;
ActivityTimeAccumulator &accumulator;
protected:
const bool enabled;
public:
BlockedActivityTimer(ActivityTimeAccumulator &_accumulator, const bool _enabled)
: accumulator(_accumulator), enabled(_enabled)
{
if (enabled)
startCycles = get_cycles_now();
else
startCycles = 0;
}
: SimpleActivityTimer(_accumulator.blockedCycles, _enabled) { }
};

~BlockedActivityTimer()
{
if (enabled)
{
cycle_t elapsedCycles = get_cycles_now() - startCycles;
accumulator.blockedCycles += elapsedCycles;
}
}
class LookAheadTimer : public SimpleActivityTimer
{
public:
inline LookAheadTimer(ActivityTimeAccumulator &_accumulator, const bool _enabled)
: SimpleActivityTimer(_accumulator.lookAheadCycles, _enabled) { }
};

#else
struct ActivityTimer
{
Expand All @@ -373,7 +364,13 @@ struct SimpleActivityTimer
struct BlockedActivityTimer
{
inline BlockedActivityTimer(ActivityTimeAccumulator &_accumulator, const bool _enabled) { }

};
struct LookAheadTimer
{
inline LookAheadTimer(ActivityTimeAccumulator &_accumulator, const bool _enabled){ }
};

#endif

class THORHELPER_API IndirectCodeContextEx : public IndirectCodeContext
Expand Down
2 changes: 2 additions & 0 deletions system/jlib/jstatcodes.h
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,8 @@ enum StatisticKind
StCycleSoapcallDNSCycles,
StCycleSoapcallConnectCycles,
StNumSoapcallConnectFailures,
StTimeLookAhead,
StCycleLookAheadCycles,
StMax,

//For any quantity there is potentially the following variants.
Expand Down
2 changes: 2 additions & 0 deletions system/jlib/jstats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -991,6 +991,8 @@ static const constexpr StatisticMeta statsMetaData[StMax] = {
{ CYCLESTAT(SoapcallDNS) },
{ CYCLESTAT(SoapcallConnect) },
{ NUMSTAT(SoapcallConnectFailures), "The number of SOAPCALL connect failures" },
{ TIMESTAT(LookAhead), "The total time lookahead thread spend prefetching rows from upstream activities" },
{ CYCLESTAT(LookAhead) },
};

static MapStringTo<StatisticKind, StatisticKind> statisticNameMap(true);
Expand Down
1 change: 1 addition & 0 deletions thorlcr/activities/nsplitter/thnsplitterslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ class CSplitterOutput : public CSimpleInterfaceOf<IStartableEngineRowStream>, pu
virtual IStrandJunction *getOutputStreams(CActivityBase &ctx, unsigned idx, PointerArrayOf<IEngineRowStream> &streams, const CThorStrandOptions * consumerOptions, bool consumerOrdered, IOrderedCallbackCollection * orderedCallbacks) override;
virtual unsigned __int64 queryTotalCycles() const override { return COutputTiming::queryTotalCycles(); }
virtual unsigned __int64 queryEndCycles() const override { return COutputTiming::queryEndCycles(); }
virtual unsigned __int64 queryBlockedCycles() const { return COutputTiming::queryBlockedCycles(); }
virtual void debugRequest(MemoryBuffer &mb) override;
// Stepping methods
virtual IInputSteppingMeta *querySteppingMeta() { return nullptr; }
Expand Down
27 changes: 23 additions & 4 deletions thorlcr/activities/thactivityutil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,10 +119,17 @@ class CRowStreamLookAhead : public CSimpleInterfaceOf<IStartableEngineRowStream>
{
while (requiredLeft&&running)
{
OwnedConstThorRow row = inputStream->nextRow();
if (!row)
OwnedConstThorRow row;
{
LookAheadTimer timer(activity.getActivityTimerAccumulator(), activity.queryTimeActivities());
row.setown(inputStream->nextRow());
}
if (!row)
{
{
LookAheadTimer timer(activity.getActivityTimerAccumulator(), activity.queryTimeActivities());
row.setown(inputStream->nextRow());
}
if (!row)
break;
else
Expand All @@ -138,7 +145,11 @@ class CRowStreamLookAhead : public CSimpleInterfaceOf<IStartableEngineRowStream>
{
while (requiredLeft&&running)
{
OwnedConstThorRow row = inputStream->ungroupedNextRow();
OwnedConstThorRow row;
{
LookAheadTimer timer(activity.getActivityTimerAccumulator(), activity.queryTimeActivities());
row.setown(inputStream->ungroupedNextRow());
}
if (!row)
break;
++count;
Expand Down Expand Up @@ -234,7 +245,15 @@ class CRowStreamLookAhead : public CSimpleInterfaceOf<IStartableEngineRowStream>
// IEngineRowStream
virtual const void *nextRow() override
{
OwnedConstThorRow row = smartbuf->nextRow();
OwnedConstThorRow row;
{
// smartbuf->nextRow should return immediately if a row is available.
// smartbuf->nextRow will take time if blocked, so record time taken as blocked time.
// N.b. smartbuf->next may take a trivial amount of time if row is available but
// for the purposes of stats this will still be considered blocked.
BlockedActivityTimer timer(activity.getActivityTimerAccumulator(), activity.queryTimeActivities());
row.setown(smartbuf->nextRow());
}
if (getexception)
throw getexception.getClear();
if (!row)
Expand Down
15 changes: 10 additions & 5 deletions thorlcr/graph/thgraphslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -586,14 +586,17 @@ unsigned __int64 CSlaveActivity::queryLocalCycles() const
break;
}
}
unsigned __int64 localCycles = queryTotalCycles();
if (localCycles < inputCycles) // not sure how/if possible, but guard against
unsigned __int64 processCycles = queryTotalCycles() + queryLookAheadCycles();
if (processCycles < inputCycles) // not sure how/if possible, but guard against
return 0;
localCycles -= inputCycles;
processCycles -= inputCycles;
const unsigned __int64 blockedCycles = queryBlockedCycles();
if (localCycles < blockedCycles)
if (processCycles < blockedCycles)
{
IWARNLOG("CSlaveActivity::queryLocalCycles - processCycles %" I64F "u < blockedCycles %" I64F "u", processCycles, blockedCycles);
return 0;
return localCycles-blockedCycles;
}
return processCycles-blockedCycles;
}

void CSlaveActivity::serializeStats(MemoryBuffer &mb)
Expand All @@ -618,6 +621,8 @@ void CSlaveActivity::serializeStats(MemoryBuffer &mb)
serializedStats.setStatistic(StTimeLocalExecute, (unsigned __int64)cycle_to_nanosec(queryLocalCycles()));
serializedStats.setStatistic(StTimeTotalExecute, (unsigned __int64)cycle_to_nanosec(queryTotalCycles()));
serializedStats.setStatistic(StTimeBlocked, (unsigned __int64)cycle_to_nanosec(queryBlockedCycles()));
serializedStats.setStatistic(StTimeLookAhead, (unsigned __int64)cycle_to_nanosec(queryLookAheadCycles()));

serializedStats.serialize(mb);
ForEachItemIn(i, outputs)
{
Expand Down
6 changes: 4 additions & 2 deletions thorlcr/graph/thgraphslave.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,11 @@ class COutputTiming
COutputTiming() { }

void resetTiming() { slaveTimerStats.reset(); }
ActivityTimeAccumulator &getTotalCyclesRef() { return slaveTimerStats; }
ActivityTimeAccumulator &getActivityTimerAccumulator() { return slaveTimerStats; }
unsigned __int64 queryTotalCycles() const { return slaveTimerStats.totalCycles; }
unsigned __int64 queryEndCycles() const { return slaveTimerStats.endCycles; }
unsigned __int64 queryBlockedCycles() const { return slaveTimerStats.blockedCycles; }
unsigned __int64 queryLookAheadCycles() const { return slaveTimerStats.lookAheadCycles; }
};

class CEdgeProgress
Expand Down Expand Up @@ -289,8 +290,9 @@ class graphslave_decl CSlaveActivity : public CActivityBase, public CEdgeProgres
return consumerOrdered;
}
virtual unsigned __int64 queryTotalCycles() const { return COutputTiming::queryTotalCycles(); }
virtual unsigned __int64 queryBlockedCycles() const { return COutputTiming::queryBlockedCycles();}
virtual unsigned __int64 queryBlockedCycles() const { return COutputTiming::queryBlockedCycles(); }
virtual unsigned __int64 queryEndCycles() const { return COutputTiming::queryEndCycles(); }
virtual unsigned __int64 queryLookAheadCycles() const { return COutputTiming::queryLookAheadCycles(); }
virtual void debugRequest(MemoryBuffer &msg) override;

// IThorDataLink
Expand Down
2 changes: 1 addition & 1 deletion thorlcr/thorutil/thormisc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ static Owned<IMPtagAllocator> ClusterMPAllocator;
const StatisticsMapping spillStatistics({StTimeSpillElapsed, StTimeSortElapsed, StNumSpills, StSizeSpillFile, StSizePeakTempDisk});
const StatisticsMapping executeStatistics({StTimeTotalExecute, StTimeLocalExecute, StTimeBlocked});
const StatisticsMapping soapcallStatistics({StTimeSoapcall, StTimeSoapcallDNS, StTimeSoapcallConnect, StNumSoapcallConnectFailures});
const StatisticsMapping basicActivityStatistics({StNumParallelExecute}, executeStatistics, spillStatistics);
const StatisticsMapping basicActivityStatistics({StNumParallelExecute, StTimeLookAhead}, executeStatistics, spillStatistics);
const StatisticsMapping groupActivityStatistics({StNumGroups, StNumGroupMax}, basicActivityStatistics);
const StatisticsMapping indexReadFileStatistics({}, diskReadRemoteStatistics, jhtreeCacheStatistics);
const StatisticsMapping indexReadActivityStatistics({StNumRowsProcessed}, indexReadFileStatistics, basicActivityStatistics);
Expand Down

0 comments on commit 93b585a

Please sign in to comment.