-
Notifications
You must be signed in to change notification settings - Fork 304
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
HPCC-32483 Capture global sort/join blocked time #19181
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -299,7 +299,10 @@ class JoinSlaveActivity : public CSlaveActivity, implements ILookAheadStopNotify | |
stopOtherInput(); | ||
throw; | ||
} | ||
asyncSecondaryStart.wait(); | ||
{ | ||
BlockedActivityTimer t(slaveTimerStats, timeActivities); | ||
asyncSecondaryStart.wait(); | ||
} | ||
if (secondaryStartException) | ||
{ | ||
IException *e=secondaryStartException.getClear(); | ||
|
@@ -387,7 +390,10 @@ class JoinSlaveActivity : public CSlaveActivity, implements ILookAheadStopNotify | |
{ | ||
unsigned bn=noSortPartitionSide()?2:4; | ||
ActPrintLog("JOIN waiting barrier.%d",bn); | ||
barrier->wait(false); | ||
{ | ||
BlockedActivityTimer t(slaveTimerStats, timeActivities); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I realize there are BlockedActivityTimers in other stop() methods already (via the hash agg stream), but stop() time in general is not being timed (as part of total cycles). Maybe it should be, but that's a separate question. If any activity measures blocked time in stop(), it would need to account for stop() in the calculation in general, which it currently doesn't. |
||
barrier->wait(false); | ||
} | ||
ActPrintLog("JOIN barrier.%d raised",bn); | ||
sorter->stopMerge(); | ||
} | ||
|
@@ -564,8 +570,11 @@ class JoinSlaveActivity : public CSlaveActivity, implements ILookAheadStopNotify | |
return false; | ||
} | ||
ActPrintLog("JOIN waiting barrier.1"); | ||
if (!barrier->wait(false)) | ||
return false; | ||
{ | ||
BlockedActivityTimer t(slaveTimerStats, timeActivities); | ||
if (!barrier->wait(false)) | ||
return false; | ||
} | ||
ActPrintLog("JOIN barrier.1 raised"); | ||
|
||
// primaryWriter will keep as much in memory as possible. | ||
|
@@ -575,8 +584,11 @@ class JoinSlaveActivity : public CSlaveActivity, implements ILookAheadStopNotify | |
primaryStream.setown(primaryWriter->getReader()); // NB: rhsWriter no longer needed after this point | ||
|
||
ActPrintLog("JOIN waiting barrier.2"); | ||
if (!barrier->wait(false)) | ||
return false; | ||
{ | ||
BlockedActivityTimer t(slaveTimerStats, timeActivities); | ||
if (!barrier->wait(false)) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. What about the barrier->wait on line 613? |
||
return false; | ||
} | ||
ActPrintLog("JOIN barrier.2 raised"); | ||
sorter->stopMerge(); | ||
if (0 == sorter->getGlobalCount()) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -829,6 +829,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem | |
Semaphore sem; | ||
CriticalSection crit; | ||
bool enabled = true; | ||
CSlaveActivity &activity; | ||
|
||
void unblock() | ||
{ | ||
|
@@ -848,10 +849,10 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem | |
} | ||
} | ||
public: | ||
CLimiter() | ||
CLimiter(CSlaveActivity &_activity) : activity(_activity) | ||
{ | ||
} | ||
CLimiter(unsigned _max, unsigned _leewayPercent=0) | ||
CLimiter(CSlaveActivity &_activity, unsigned _max, unsigned _leewayPercent=0): activity(_activity) | ||
{ | ||
_set(_max, _leewayPercent); | ||
} | ||
|
@@ -894,7 +895,10 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem | |
void inc() | ||
{ | ||
while (incNonBlocking()) | ||
{ | ||
BlockedActivityTimer t(activity.getTotalCyclesRef(), activity.queryTimeActivities()); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Is this blocked time actually holding up the nextRow (or start) of the keyed join activity? |
||
sem.wait(); | ||
} | ||
} | ||
void dec() | ||
{ | ||
|
@@ -907,6 +911,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem | |
} | ||
void block() | ||
{ | ||
BlockedActivityTimer t(activity.getTotalCyclesRef(), activity.queryTimeActivities()); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Same question as per the previous comments. |
||
sem.wait(); | ||
} | ||
void disable() | ||
|
@@ -2965,7 +2970,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem | |
public: | ||
IMPLEMENT_IINTERFACE_USING(PARENT); | ||
|
||
CKeyedJoinSlave(CGraphElementBase *_container) : PARENT(_container, keyedJoinActivityStatistics), readAheadThread(*this) | ||
CKeyedJoinSlave(CGraphElementBase *_container) : PARENT(_container, keyedJoinActivityStatistics), readAheadThread(*this), lookupThreadLimiter(*this), fetchThreadLimiter(*this), pendingKeyLookupLimiter(*this), doneListLimiter(*this) | ||
{ | ||
helper = static_cast <IHThorKeyedJoinArg *> (queryHelper()); | ||
reInit = 0 != (helper->getFetchFlags() & (FFvarfilename|FFdynamicfilename)) || (helper->getJoinFlags() & JFvarindexfilename); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1027,6 +1027,7 @@ class CInMemJoinBase : public CSlaveActivity, public CAllOrLookupHelper<HELPER>, | |
} | ||
inline void interChannelBarrier() | ||
{ | ||
BlockedActivityTimer t(slaveTimerStats, timeActivities); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I think it is generally correct to consider this blocked time, because most are being called in the context (on the stack of) start/nextRow, |
||
if (queryJob().queryJobChannels()>1) | ||
{ | ||
if (channels[0]->incNotifyCountAndCheck()) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not sure about this.
why is this blocked time?
input is on a thread - asynchronously starting.
Total cycles at this point will be the total time both took to start (overlapping).
The timing in queryLocalCycles() can go wrong because it is considering total cycles - (input1-time + input2-time), i.e. as if they had started synchronously.
This timer will be: input2 time - input1 time (or 0 if input2 was quicker).
Counting it as lookahead time means "processCycles" in queryLocalCycles() will be total + this lookahead time, which means that when the sum total for both inputs is deducted the result should be correct.
Whatever the conclusion, this needs a good comment here explaining the logic clearly.