From 4ff1b51e87ee2a356a73357e68225863b881327a Mon Sep 17 00:00:00 2001 From: Shamser Ahmed Date: Thu, 3 Oct 2024 13:10:01 +0100 Subject: [PATCH] HPCC-32483 Capture global sort/join blocked time Capture blocked time for global sort and joins. Signed-off-by: Shamser Ahmed --- thorlcr/activities/join/thjoinslave.cpp | 24 ++++++++++++----- .../activities/keyedjoin/thkeyedjoinslave.cpp | 11 +++++--- .../lookupjoin/thlookupjoinslave.cpp | 1 + thorlcr/activities/msort/thmsortslave.cpp | 9 ++++--- thorlcr/msort/tsorts.cpp | 27 +++++++++++-------- thorlcr/msort/tsorts.hpp | 4 +-- 6 files changed, 51 insertions(+), 25 deletions(-) diff --git a/thorlcr/activities/join/thjoinslave.cpp b/thorlcr/activities/join/thjoinslave.cpp index f2262256ab7..3ec676fd7cd 100644 --- a/thorlcr/activities/join/thjoinslave.cpp +++ b/thorlcr/activities/join/thjoinslave.cpp @@ -299,7 +299,10 @@ class JoinSlaveActivity : public CSlaveActivity, implements ILookAheadStopNotify stopOtherInput(); throw; } - asyncSecondaryStart.wait(); + { + BlockedActivityTimer t(slaveTimerStats, timeActivities); + asyncSecondaryStart.wait(); + } if (secondaryStartException) { IException *e=secondaryStartException.getClear(); @@ -387,7 +390,10 @@ class JoinSlaveActivity : public CSlaveActivity, implements ILookAheadStopNotify { unsigned bn=noSortPartitionSide()?2:4; ActPrintLog("JOIN waiting barrier.%d",bn); - barrier->wait(false); + { + BlockedActivityTimer t(slaveTimerStats, timeActivities); + barrier->wait(false); + } ActPrintLog("JOIN barrier.%d raised",bn); sorter->stopMerge(); } @@ -564,8 +570,11 @@ class JoinSlaveActivity : public CSlaveActivity, implements ILookAheadStopNotify return false; } ActPrintLog("JOIN waiting barrier.1"); - if (!barrier->wait(false)) - return false; + { + BlockedActivityTimer t(slaveTimerStats, timeActivities); + if (!barrier->wait(false)) + return false; + } ActPrintLog("JOIN barrier.1 raised"); // primaryWriter will keep as much in memory as possible. @@ -575,8 +584,11 @@ class JoinSlaveActivity : public CSlaveActivity, implements ILookAheadStopNotify primaryStream.setown(primaryWriter->getReader()); // NB: rhsWriter no longer needed after this point ActPrintLog("JOIN waiting barrier.2"); - if (!barrier->wait(false)) - return false; + { + BlockedActivityTimer t(slaveTimerStats, timeActivities); + if (!barrier->wait(false)) + return false; + } ActPrintLog("JOIN barrier.2 raised"); sorter->stopMerge(); if (0 == sorter->getGlobalCount()) diff --git a/thorlcr/activities/keyedjoin/thkeyedjoinslave.cpp b/thorlcr/activities/keyedjoin/thkeyedjoinslave.cpp index 8844a99b7f3..29af630511a 100644 --- a/thorlcr/activities/keyedjoin/thkeyedjoinslave.cpp +++ b/thorlcr/activities/keyedjoin/thkeyedjoinslave.cpp @@ -829,6 +829,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem Semaphore sem; CriticalSection crit; bool enabled = true; + CSlaveActivity &activity; void unblock() { @@ -848,10 +849,10 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem } } public: - CLimiter() + CLimiter(CSlaveActivity &_activity) : activity(_activity) { } - CLimiter(unsigned _max, unsigned _leewayPercent=0) + CLimiter(CSlaveActivity &_activity, unsigned _max, unsigned _leewayPercent=0): activity(_activity) { _set(_max, _leewayPercent); } @@ -894,7 +895,10 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem void inc() { while (incNonBlocking()) + { + BlockedActivityTimer t(activity.getTotalCyclesRef(), activity.queryTimeActivities()); sem.wait(); + } } void dec() { @@ -907,6 +911,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem } void block() { + BlockedActivityTimer t(activity.getTotalCyclesRef(), activity.queryTimeActivities()); sem.wait(); } void disable() @@ -2965,7 +2970,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem public: IMPLEMENT_IINTERFACE_USING(PARENT); - CKeyedJoinSlave(CGraphElementBase *_container) : PARENT(_container, keyedJoinActivityStatistics), readAheadThread(*this) + CKeyedJoinSlave(CGraphElementBase *_container) : PARENT(_container, keyedJoinActivityStatistics), readAheadThread(*this), lookupThreadLimiter(*this), fetchThreadLimiter(*this), pendingKeyLookupLimiter(*this), doneListLimiter(*this) { helper = static_cast (queryHelper()); reInit = 0 != (helper->getFetchFlags() & (FFvarfilename|FFdynamicfilename)) || (helper->getJoinFlags() & JFvarindexfilename); diff --git a/thorlcr/activities/lookupjoin/thlookupjoinslave.cpp b/thorlcr/activities/lookupjoin/thlookupjoinslave.cpp index c1e62740c2f..0462a80a1ea 100644 --- a/thorlcr/activities/lookupjoin/thlookupjoinslave.cpp +++ b/thorlcr/activities/lookupjoin/thlookupjoinslave.cpp @@ -1027,6 +1027,7 @@ class CInMemJoinBase : public CSlaveActivity, public CAllOrLookupHelper, } inline void interChannelBarrier() { + BlockedActivityTimer t(slaveTimerStats, timeActivities); if (queryJob().queryJobChannels()>1) { if (channels[0]->incNotifyCountAndCheck()) diff --git a/thorlcr/activities/msort/thmsortslave.cpp b/thorlcr/activities/msort/thmsortslave.cpp index 14acc3dedb4..a5b9f50e5a1 100644 --- a/thorlcr/activities/msort/thmsortslave.cpp +++ b/thorlcr/activities/msort/thmsortslave.cpp @@ -140,9 +140,12 @@ class MSortSlaveActivity : public CSlaveActivity throw; } ActPrintLog("SORT waiting barrier.1"); - if (!barrier->wait(false)) { - Sleep(1000); // let original error through - throw MakeThorException(TE_BarrierAborted,"SORT: Barrier Aborted"); + { + BlockedActivityTimer t(slaveTimerStats, timeActivities); + if (!barrier->wait(false)) { + Sleep(1000); // let original error through + throw MakeThorException(TE_BarrierAborted,"SORT: Barrier Aborted"); + } } ActPrintLog("SORT barrier.1 raised"); output.setown(sorter->startMerge(totalrows)); diff --git a/thorlcr/msort/tsorts.cpp b/thorlcr/msort/tsorts.cpp index fe9ff077883..09f1d61bf87 100644 --- a/thorlcr/msort/tsorts.cpp +++ b/thorlcr/msort/tsorts.cpp @@ -35,6 +35,7 @@ #include "thbuf.hpp" #include "thbufdef.hpp" #include "thgraph.hpp" +#include "thgraphslave.hpp" #ifdef _DEBUG //#define TRACE_UNIQUE @@ -59,8 +60,6 @@ inline void traceWait(const char *name, T &sem,unsigned interval=60*1000) #define MINCOMPRESSEDROWSIZE 16 #define MAXCOMPRESSEDROWSIZE 0x2000 -#define MPBLOCKTIMEOUT (1000*60*15) - class CWriteIntercept : public CSimpleInterface { @@ -607,7 +606,7 @@ class CMiniSort class CThorSorter : public CSimpleInterface, implements IThorSorter, implements ISortSlaveBase, implements ISortSlaveMP, private IThreaded { - CActivityBase *activity; + CSlaveActivity *activity; SocketEndpoint myendpoint; Linked clusterComm; mptag_t mpTagRPC; @@ -816,7 +815,7 @@ class CThorSorter : public CSimpleInterface, implements IThorSorter, implements public: IMPLEMENT_IINTERFACE_USING(CSimpleInterface); - CThorSorter(CActivityBase *_activity, SocketEndpoint &ep, ICommunicator *_clusterComm, mptag_t _mpTagRPC) + CThorSorter(CSlaveActivity *_activity, SocketEndpoint &ep, ICommunicator *_clusterComm, mptag_t _mpTagRPC) : activity(_activity), myendpoint(ep), clusterComm(_clusterComm), mpTagRPC(_mpTagRPC), rowArray(*_activity, _activity), threaded("CThorSorter", this), spillStats(spillStatistics) { @@ -1268,11 +1267,14 @@ class CThorSorter : public CSimpleInterface, implements IThorSorter, implements { ActPrintLog(activity, "Gather in"); globalCount = 0; - for (;;) { - if (abort) - return; - if (startgathersem.wait(10000)) - break; + { + BlockedActivityTimer t(activity->getTotalCyclesRef(), activity->queryTimeActivities()); + for (;;) { + if (abort) + return; + if (startgathersem.wait(10000)) + break; + } } ActPrintLog(activity, "SORT: Gather"); assertex(!rowif); @@ -1374,7 +1376,10 @@ class CThorSorter : public CSimpleInterface, implements IThorSorter, implements virtual IRowStream * startMerge(rowcount_t &_totalrows) { ActPrintLog(activity, "SORT Merge Waiting"); - traceWait("startmergesem",startmergesem); + { + BlockedActivityTimer t(activity->getTotalCyclesRef(), activity->queryTimeActivities()); + traceWait("startmergesem",startmergesem); + } ActPrintLog(activity, "SORT Merge Start"); _totalrows = totalrows; return merger.getLink(); @@ -1406,7 +1411,7 @@ class CThorSorter : public CSimpleInterface, implements IThorSorter, implements //============================================================================== -THORSORT_API IThorSorter *CreateThorSorter(CActivityBase *activity, SocketEndpoint &ep, ICommunicator *clusterComm, mptag_t _mpTagRPC) +THORSORT_API IThorSorter *CreateThorSorter(CSlaveActivity *activity, SocketEndpoint &ep, ICommunicator *clusterComm, mptag_t _mpTagRPC) { return new CThorSorter(activity, ep, clusterComm, _mpTagRPC); } diff --git a/thorlcr/msort/tsorts.hpp b/thorlcr/msort/tsorts.hpp index 865b9affdfa..37f666ddfdb 100644 --- a/thorlcr/msort/tsorts.hpp +++ b/thorlcr/msort/tsorts.hpp @@ -74,8 +74,8 @@ interface ISocketRowWriter: extends IRowWriter virtual void stop()=0; }; -class CActivityBase; -THORSORT_API IThorSorter *CreateThorSorter(CActivityBase *activity, SocketEndpoint &ep, ICommunicator *clusterComm, mptag_t _mpTagRPC); +class CSlaveActivity; +THORSORT_API IThorSorter *CreateThorSorter(CSlaveActivity *activity, SocketEndpoint &ep, ICommunicator *clusterComm, mptag_t _mpTagRPC); IRowStream *ConnectMergeRead(unsigned id,IThorRowInterfaces *rowif,SocketEndpoint &nodeaddr,rowcount_t startrec,rowcount_t numrecs, ISocket *socket); ISocketRowWriter *ConnectMergeWrite(IThorRowInterfaces *rowif,ISocket *socket,size32_t bufsize,rowcount_t &startrec,rowcount_t &numrecs); #define SOCKETSERVERINC 1