Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HPCC-32483 Capture global sort/join blocked time #19181

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 18 additions & 6 deletions thorlcr/activities/join/thjoinslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,10 @@ class JoinSlaveActivity : public CSlaveActivity, implements ILookAheadStopNotify
stopOtherInput();
throw;
}
asyncSecondaryStart.wait();
{
BlockedActivityTimer t(slaveTimerStats, timeActivities);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure about this.

why is this blocked time?

input is on a thread - asynchronously starting.
Totalcycles at this point, will be the total time both took to start (overlapping).
The timing in queryLocalCycles() can go wrong because it is considering totalcycles - (input1-time+intput2-time), i.e. as if started synchronously.

This timer will be : input2 time - input1 time (or 0 if input2 was quicker).
Counting it as lookahead time, means "processCycles" in queryLocalCycles() will be total+this lookahead time, and mean that when the sum total for both inputs are deduced it should be correct.

Whatever the conclusion, needs a good comment here explaining the logic clearly.

asyncSecondaryStart.wait();
}
if (secondaryStartException)
{
IException *e=secondaryStartException.getClear();
Expand Down Expand Up @@ -387,7 +390,10 @@ class JoinSlaveActivity : public CSlaveActivity, implements ILookAheadStopNotify
{
unsigned bn=noSortPartitionSide()?2:4;
ActPrintLog("JOIN waiting barrier.%d",bn);
barrier->wait(false);
{
BlockedActivityTimer t(slaveTimerStats, timeActivities);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I realize there's BlockActivityTimer's in other stops already (via hash agg stream),

but .. stop() time in general is not being timed (as part of total cycles).. may be it should be - but that's a separate question.
So that means, if blocked time is, then processCycles could be < blockedCycles ( and result in warnings from queryLocalCycles)

If any act measures blocked in stop(), it would mean it needs to consider stop() in the calculation in general which it doesn't.
Can you remove this (and separately other Blocked time considerations in other stop'd contexts) and open a separate JIRA to revisit measuring time in general in activities stop()'s.

barrier->wait(false);
}
ActPrintLog("JOIN barrier.%d raised",bn);
sorter->stopMerge();
}
Expand Down Expand Up @@ -564,8 +570,11 @@ class JoinSlaveActivity : public CSlaveActivity, implements ILookAheadStopNotify
return false;
}
ActPrintLog("JOIN waiting barrier.1");
if (!barrier->wait(false))
return false;
{
BlockedActivityTimer t(slaveTimerStats, timeActivities);
if (!barrier->wait(false))
return false;
}
ActPrintLog("JOIN barrier.1 raised");

// primaryWriter will keep as much in memory as possible.
Expand All @@ -575,8 +584,11 @@ class JoinSlaveActivity : public CSlaveActivity, implements ILookAheadStopNotify
primaryStream.setown(primaryWriter->getReader()); // NB: rhsWriter no longer needed after this point

ActPrintLog("JOIN waiting barrier.2");
if (!barrier->wait(false))
return false;
{
BlockedActivityTimer t(slaveTimerStats, timeActivities);
if (!barrier->wait(false))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what about the barrier->wait on line 613?

return false;
}
ActPrintLog("JOIN barrier.2 raised");
sorter->stopMerge();
if (0 == sorter->getGlobalCount())
Expand Down
11 changes: 8 additions & 3 deletions thorlcr/activities/keyedjoin/thkeyedjoinslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -829,6 +829,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem
Semaphore sem;
CriticalSection crit;
bool enabled = true;
CSlaveActivity &activity;

void unblock()
{
Expand All @@ -848,10 +849,10 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem
}
}
public:
CLimiter()
CLimiter(CSlaveActivity &_activity) : activity(_activity)
{
}
CLimiter(unsigned _max, unsigned _leewayPercent=0)
CLimiter(CSlaveActivity &_activity, unsigned _max, unsigned _leewayPercent=0): activity(_activity)
{
_set(_max, _leewayPercent);
}
Expand Down Expand Up @@ -894,7 +895,10 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem
void inc()
{
while (incNonBlocking())
{
BlockedActivityTimer t(activity.getTotalCyclesRef(), activity.queryTimeActivities());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this block time actually holding up the nextRow (or start) of the KJ act?
If it's not and overlapping time, e.g. because full, but nextRow is not blocked, then it would be wrong to consider it "blocked" time.
The time we are trying to track is the time the totalcycles was blocked.

sem.wait();
}
}
void dec()
{
Expand All @@ -907,6 +911,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem
}
void block()
{
BlockedActivityTimer t(activity.getTotalCyclesRef(), activity.queryTimeActivities());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same q. as per prev. comments.
Need to think carefully whether overlapping concurrent threads that are pending on something are blocking the acts start/nextrow from making progress.

sem.wait();
}
void disable()
Expand Down Expand Up @@ -2965,7 +2970,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem
public:
IMPLEMENT_IINTERFACE_USING(PARENT);

CKeyedJoinSlave(CGraphElementBase *_container) : PARENT(_container, keyedJoinActivityStatistics), readAheadThread(*this)
CKeyedJoinSlave(CGraphElementBase *_container) : PARENT(_container, keyedJoinActivityStatistics), readAheadThread(*this), lookupThreadLimiter(*this), fetchThreadLimiter(*this), pendingKeyLookupLimiter(*this), doneListLimiter(*this)
{
helper = static_cast <IHThorKeyedJoinArg *> (queryHelper());
reInit = 0 != (helper->getFetchFlags() & (FFvarfilename|FFdynamicfilename)) || (helper->getJoinFlags() & JFvarindexfilename);
Expand Down
1 change: 1 addition & 0 deletions thorlcr/activities/lookupjoin/thlookupjoinslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1027,6 +1027,7 @@ class CInMemJoinBase : public CSlaveActivity, public CAllOrLookupHelper<HELPER>,
}
inline void interChannelBarrier()
{
BlockedActivityTimer t(slaveTimerStats, timeActivities);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is generally corrected to consider this block time, because most are being called in the context (on the stack of) start/nextRow,
but there are a couple which are in the context of stop() [ see other comments re. not trackingstop() time ]

if (queryJob().queryJobChannels()>1)
{
if (channels[0]->incNotifyCountAndCheck())
Expand Down
9 changes: 6 additions & 3 deletions thorlcr/activities/msort/thmsortslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,9 +140,12 @@ class MSortSlaveActivity : public CSlaveActivity
throw;
}
ActPrintLog("SORT waiting barrier.1");
if (!barrier->wait(false)) {
Sleep(1000); // let original error through
throw MakeThorException(TE_BarrierAborted,"SORT: Barrier Aborted");
{
BlockedActivityTimer t(slaveTimerStats, timeActivities);
if (!barrier->wait(false)) {
Sleep(1000); // let original error through
throw MakeThorException(TE_BarrierAborted,"SORT: Barrier Aborted");
}
}
ActPrintLog("SORT barrier.1 raised");
output.setown(sorter->startMerge(totalrows));
Expand Down
27 changes: 16 additions & 11 deletions thorlcr/msort/tsorts.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include "thbuf.hpp"
#include "thbufdef.hpp"
#include "thgraph.hpp"
#include "thgraphslave.hpp"

#ifdef _DEBUG
//#define TRACE_UNIQUE
Expand All @@ -59,8 +60,6 @@ inline void traceWait(const char *name, T &sem,unsigned interval=60*1000)
#define MINCOMPRESSEDROWSIZE 16
#define MAXCOMPRESSEDROWSIZE 0x2000

#define MPBLOCKTIMEOUT (1000*60*15)


class CWriteIntercept : public CSimpleInterface
{
Expand Down Expand Up @@ -607,7 +606,7 @@ class CMiniSort
class CThorSorter : public CSimpleInterface, implements IThorSorter, implements ISortSlaveBase, implements ISortSlaveMP,
private IThreaded
{
CActivityBase *activity;
CSlaveActivity *activity;
SocketEndpoint myendpoint;
Linked<ICommunicator> clusterComm;
mptag_t mpTagRPC;
Expand Down Expand Up @@ -816,7 +815,7 @@ class CThorSorter : public CSimpleInterface, implements IThorSorter, implements
public:
IMPLEMENT_IINTERFACE_USING(CSimpleInterface);

CThorSorter(CActivityBase *_activity, SocketEndpoint &ep, ICommunicator *_clusterComm, mptag_t _mpTagRPC)
CThorSorter(CSlaveActivity *_activity, SocketEndpoint &ep, ICommunicator *_clusterComm, mptag_t _mpTagRPC)
: activity(_activity), myendpoint(ep), clusterComm(_clusterComm), mpTagRPC(_mpTagRPC),
rowArray(*_activity, _activity), threaded("CThorSorter", this), spillStats(spillStatistics)
{
Expand Down Expand Up @@ -1269,11 +1268,14 @@ class CThorSorter : public CSimpleInterface, implements IThorSorter, implements
{
ActPrintLog(activity, "Gather in");
globalCount = 0;
for (;;) {
if (abort)
return;
if (startgathersem.wait(10000))
break;
{
BlockedActivityTimer t(activity->getTotalCyclesRef(), activity->queryTimeActivities());
for (;;) {
if (abort)
return;
if (startgathersem.wait(10000))
break;
}
}
ActPrintLog(activity, "SORT: Gather");
assertex(!rowif);
Expand Down Expand Up @@ -1375,7 +1377,10 @@ class CThorSorter : public CSimpleInterface, implements IThorSorter, implements
virtual IRowStream * startMerge(rowcount_t &_totalrows)
{
ActPrintLog(activity, "SORT Merge Waiting");
traceWait("startmergesem",startmergesem);
{
BlockedActivityTimer t(activity->getTotalCyclesRef(), activity->queryTimeActivities());
traceWait("startmergesem",startmergesem);
}
ActPrintLog(activity, "SORT Merge Start");
_totalrows = totalrows;
return merger.getLink();
Expand Down Expand Up @@ -1407,7 +1412,7 @@ class CThorSorter : public CSimpleInterface, implements IThorSorter, implements

//==============================================================================

THORSORT_API IThorSorter *CreateThorSorter(CActivityBase *activity, SocketEndpoint &ep, ICommunicator *clusterComm, mptag_t _mpTagRPC)
THORSORT_API IThorSorter *CreateThorSorter(CSlaveActivity *activity, SocketEndpoint &ep, ICommunicator *clusterComm, mptag_t _mpTagRPC)
{
return new CThorSorter(activity, ep, clusterComm, _mpTagRPC);
}
Expand Down
4 changes: 2 additions & 2 deletions thorlcr/msort/tsorts.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ interface ISocketRowWriter: extends IRowWriter
virtual void stop()=0;
};

class CActivityBase;
THORSORT_API IThorSorter *CreateThorSorter(CActivityBase *activity, SocketEndpoint &ep, ICommunicator *clusterComm, mptag_t _mpTagRPC);
class CSlaveActivity;
THORSORT_API IThorSorter *CreateThorSorter(CSlaveActivity *activity, SocketEndpoint &ep, ICommunicator *clusterComm, mptag_t _mpTagRPC);
IRowStream *ConnectMergeRead(unsigned id,IThorRowInterfaces *rowif,SocketEndpoint &nodeaddr,rowcount_t startrec,rowcount_t numrecs, ISocket *socket);
ISocketRowWriter *ConnectMergeWrite(IThorRowInterfaces *rowif,ISocket *socket,size32_t bufsize,rowcount_t &startrec,rowcount_t &numrecs);
#define SOCKETSERVERINC 1
Expand Down