diff --git a/common/thorhelper/thorcommon.hpp b/common/thorhelper/thorcommon.hpp index c36f57583d6..2393b041301 100644 --- a/common/thorhelper/thorcommon.hpp +++ b/common/thorhelper/thorcommon.hpp @@ -245,6 +245,7 @@ class THORHELPER_API ActivityTimeAccumulator unsigned __int64 firstRow; // Timestamp of first row (nanoseconds since epoch) cycle_t firstExitCycles; // Wall clock time of first exit from this activity cycle_t blockedCycles; // Time spent blocked + cycle_t lookAheadCycles; // Time spent by lookahead thread // Return the total amount of time (in nanoseconds) spent in this activity (first entry to last exit) inline unsigned __int64 elapsed() const { return cycle_to_nanosec(endCycles-startCycles); } @@ -264,6 +265,7 @@ class THORHELPER_API ActivityTimeAccumulator firstRow = 0; firstExitCycles = 0; blockedCycles = 0; + lookAheadCycles = 0; } }; @@ -335,31 +337,20 @@ class SimpleActivityTimer } }; -class BlockedActivityTimer +class BlockedActivityTimer : public SimpleActivityTimer { - unsigned __int64 startCycles; - ActivityTimeAccumulator &accumulator; -protected: - const bool enabled; public: BlockedActivityTimer(ActivityTimeAccumulator &_accumulator, const bool _enabled) - : accumulator(_accumulator), enabled(_enabled) - { - if (enabled) - startCycles = get_cycles_now(); - else - startCycles = 0; - } + : SimpleActivityTimer(_accumulator.blockedCycles, _enabled) { } +}; - ~BlockedActivityTimer() - { - if (enabled) - { - cycle_t elapsedCycles = get_cycles_now() - startCycles; - accumulator.blockedCycles += elapsedCycles; - } - } +class LookAheadTimer : public SimpleActivityTimer +{ +public: + inline LookAheadTimer(ActivityTimeAccumulator &_accumulator, const bool _enabled) + : SimpleActivityTimer(_accumulator.lookAheadCycles, _enabled) { } }; + #else class ActivityTimer { @@ -373,7 +364,13 @@ struct SimpleActivityTimer struct BlockedActivityTimer { inline BlockedActivityTimer(ActivityTimeAccumulator &_accumulator, const bool _enabled) { } + }; +struct LookAheadTimer +{ + inline LookAheadTimer(ActivityTimeAccumulator &_accumulator, const bool _enabled){ } +}; + #endif class THORHELPER_API IndirectCodeContextEx : public IndirectCodeContext diff --git a/system/jlib/jstatcodes.h b/system/jlib/jstatcodes.h index bfabe4648cf..a4744cd246e 100644 --- a/system/jlib/jstatcodes.h +++ b/system/jlib/jstatcodes.h @@ -319,6 +319,8 @@ enum StatisticKind StCycleSoapcallDNSCycles, StCycleSoapcallConnectCycles, StNumSoapcallConnectFailures, + StTimeLookAhead, + StCycleLookAheadCycles, StMax, //For any quantity there is potentially the following variants. diff --git a/system/jlib/jstats.cpp b/system/jlib/jstats.cpp index cb927bdae69..29f47ee54d9 100644 --- a/system/jlib/jstats.cpp +++ b/system/jlib/jstats.cpp @@ -991,6 +991,8 @@ static const constexpr StatisticMeta statsMetaData[StMax] = { { CYCLESTAT(SoapcallDNS) }, { CYCLESTAT(SoapcallConnect) }, { NUMSTAT(SoapcallConnectFailures), "The number of SOAPCALL connect failures" }, + { TIMESTAT(LookAhead), "The total time lookahead thread spend prefetching rows from upstream activities" }, + { CYCLESTAT(LookAhead) }, }; static MapStringTo statisticNameMap(true); diff --git a/thorlcr/activities/nsplitter/thnsplitterslave.cpp b/thorlcr/activities/nsplitter/thnsplitterslave.cpp index 2cb322e6730..d1d8f2e2e8e 100644 --- a/thorlcr/activities/nsplitter/thnsplitterslave.cpp +++ b/thorlcr/activities/nsplitter/thnsplitterslave.cpp @@ -60,6 +60,7 @@ class CSplitterOutput : public CSimpleInterfaceOf, pu virtual IStrandJunction *getOutputStreams(CActivityBase &ctx, unsigned idx, PointerArrayOf &streams, const CThorStrandOptions * consumerOptions, bool consumerOrdered, IOrderedCallbackCollection * orderedCallbacks) override; virtual unsigned __int64 queryTotalCycles() const override { return COutputTiming::queryTotalCycles(); } virtual unsigned __int64 queryEndCycles() const override { return COutputTiming::queryEndCycles(); } + virtual unsigned __int64 queryBlockedCycles() const { return COutputTiming::queryBlockedCycles(); } virtual void debugRequest(MemoryBuffer &mb) override; // Stepping methods virtual IInputSteppingMeta *querySteppingMeta() { return nullptr; } diff --git a/thorlcr/activities/thactivityutil.cpp b/thorlcr/activities/thactivityutil.cpp index f3ac67d48e8..a12336860cc 100644 --- a/thorlcr/activities/thactivityutil.cpp +++ b/thorlcr/activities/thactivityutil.cpp @@ -119,10 +119,17 @@ class CRowStreamLookAhead : public CSimpleInterfaceOf { while (requiredLeft&&running) { - OwnedConstThorRow row = inputStream->nextRow(); - if (!row) + OwnedConstThorRow row; { + LookAheadTimer timer(activity.getActivityTimerAccumulator(), activity.queryTimeActivities()); row.setown(inputStream->nextRow()); + } + if (!row) + { + { + LookAheadTimer timer(activity.getActivityTimerAccumulator(), activity.queryTimeActivities()); + row.setown(inputStream->nextRow()); + } if (!row) break; else @@ -138,7 +145,11 @@ class CRowStreamLookAhead : public CSimpleInterfaceOf { while (requiredLeft&&running) { - OwnedConstThorRow row = inputStream->ungroupedNextRow(); + OwnedConstThorRow row; + { + LookAheadTimer timer(activity.getActivityTimerAccumulator(), activity.queryTimeActivities()); + row.setown(inputStream->ungroupedNextRow()); + } if (!row) break; ++count; @@ -234,7 +245,15 @@ class CRowStreamLookAhead : public CSimpleInterfaceOf // IEngineRowStream virtual const void *nextRow() override { - OwnedConstThorRow row = smartbuf->nextRow(); + OwnedConstThorRow row; + { + // smartbuf->nextRow should return immediately if a row is available. + // smartbuf->nextRow will take time if blocked, so record time taken as blocked time. + // N.b. smartbuf->next may take a trivial amount of time if row is available but + // for the purposes of stats this will still be considered blocked. + BlockedActivityTimer timer(activity.getActivityTimerAccumulator(), activity.queryTimeActivities()); + row.setown(smartbuf->nextRow()); + } if (getexception) throw getexception.getClear(); if (!row) diff --git a/thorlcr/graph/thgraphslave.cpp b/thorlcr/graph/thgraphslave.cpp index 486772febd3..d0c87d8e7f0 100644 --- a/thorlcr/graph/thgraphslave.cpp +++ b/thorlcr/graph/thgraphslave.cpp @@ -586,14 +586,17 @@ unsigned __int64 CSlaveActivity::queryLocalCycles() const break; } } - unsigned __int64 localCycles = queryTotalCycles(); - if (localCycles < inputCycles) // not sure how/if possible, but guard against + unsigned __int64 processCycles = queryTotalCycles() + queryLookAheadCycles(); + if (processCycles < inputCycles) // not sure how/if possible, but guard against return 0; - localCycles -= inputCycles; + processCycles -= inputCycles; const unsigned __int64 blockedCycles = queryBlockedCycles(); - if (localCycles < blockedCycles) + if (processCycles < blockedCycles) + { + IWARNLOG("CSlaveActivity::queryLocalCycles - processCycles %" I64F "u < blockedCycles %" I64F "u", processCycles, blockedCycles); return 0; - return localCycles-blockedCycles; + } + return processCycles-blockedCycles; } void CSlaveActivity::serializeStats(MemoryBuffer &mb) @@ -618,6 +621,8 @@ void CSlaveActivity::serializeStats(MemoryBuffer &mb) serializedStats.setStatistic(StTimeLocalExecute, (unsigned __int64)cycle_to_nanosec(queryLocalCycles())); serializedStats.setStatistic(StTimeTotalExecute, (unsigned __int64)cycle_to_nanosec(queryTotalCycles())); serializedStats.setStatistic(StTimeBlocked, (unsigned __int64)cycle_to_nanosec(queryBlockedCycles())); + serializedStats.setStatistic(StTimeLookAhead, (unsigned __int64)cycle_to_nanosec(queryLookAheadCycles())); + serializedStats.serialize(mb); ForEachItemIn(i, outputs) { diff --git a/thorlcr/graph/thgraphslave.hpp b/thorlcr/graph/thgraphslave.hpp index b40bda65164..685784ddc86 100644 --- a/thorlcr/graph/thgraphslave.hpp +++ b/thorlcr/graph/thgraphslave.hpp @@ -46,10 +46,11 @@ class COutputTiming COutputTiming() { } void resetTiming() { slaveTimerStats.reset(); } - ActivityTimeAccumulator &getTotalCyclesRef() { return slaveTimerStats; } + ActivityTimeAccumulator &getActivityTimerAccumulator() { return slaveTimerStats; } unsigned __int64 queryTotalCycles() const { return slaveTimerStats.totalCycles; } unsigned __int64 queryEndCycles() const { return slaveTimerStats.endCycles; } unsigned __int64 queryBlockedCycles() const { return slaveTimerStats.blockedCycles; } + unsigned __int64 queryLookAheadCycles() const { return slaveTimerStats.lookAheadCycles; } }; class CEdgeProgress @@ -289,8 +290,9 @@ class graphslave_decl CSlaveActivity : public CActivityBase, public CEdgeProgres return consumerOrdered; } virtual unsigned __int64 queryTotalCycles() const { return COutputTiming::queryTotalCycles(); } - virtual unsigned __int64 queryBlockedCycles() const { return COutputTiming::queryBlockedCycles();} + virtual unsigned __int64 queryBlockedCycles() const { return COutputTiming::queryBlockedCycles(); } virtual unsigned __int64 queryEndCycles() const { return COutputTiming::queryEndCycles(); } + virtual unsigned __int64 queryLookAheadCycles() const { return COutputTiming::queryLookAheadCycles(); } virtual void debugRequest(MemoryBuffer &msg) override; // IThorDataLink diff --git a/thorlcr/thorutil/thormisc.cpp b/thorlcr/thorutil/thormisc.cpp index 9c6cda182b9..85c15ef2007 100644 --- a/thorlcr/thorutil/thormisc.cpp +++ b/thorlcr/thorutil/thormisc.cpp @@ -76,7 +76,7 @@ static Owned ClusterMPAllocator; const StatisticsMapping spillStatistics({StTimeSpillElapsed, StTimeSortElapsed, StNumSpills, StSizeSpillFile, StSizePeakTempDisk}); const StatisticsMapping executeStatistics({StTimeTotalExecute, StTimeLocalExecute, StTimeBlocked}); const StatisticsMapping soapcallStatistics({StTimeSoapcall, StTimeSoapcallDNS, StTimeSoapcallConnect, StNumSoapcallConnectFailures}); -const StatisticsMapping basicActivityStatistics({StNumParallelExecute}, executeStatistics, spillStatistics); +const StatisticsMapping basicActivityStatistics({StNumParallelExecute, StTimeLookAhead}, executeStatistics, spillStatistics); const StatisticsMapping groupActivityStatistics({StNumGroups, StNumGroupMax}, basicActivityStatistics); const StatisticsMapping indexReadFileStatistics({}, diskReadRemoteStatistics, jhtreeCacheStatistics); const StatisticsMapping indexReadActivityStatistics({StNumRowsProcessed}, indexReadFileStatistics, basicActivityStatistics);