From d8e637f7c07ecfcbcfafb694551b9790f70bd182 Mon Sep 17 00:00:00 2001 From: Shamser Ahmed Date: Fri, 8 Nov 2024 10:24:06 +0000 Subject: [PATCH] HPCC-32922 Changes following review Signed-off-by: Shamser Ahmed --- .../lookupjoin/thlookupjoinslave.cpp | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/thorlcr/activities/lookupjoin/thlookupjoinslave.cpp b/thorlcr/activities/lookupjoin/thlookupjoinslave.cpp index 1b2d53db7d4..6c84c53e7e0 100644 --- a/thorlcr/activities/lookupjoin/thlookupjoinslave.cpp +++ b/thorlcr/activities/lookupjoin/thlookupjoinslave.cpp @@ -1445,12 +1445,17 @@ class CInMemJoinBase : public CSlaveActivity, public CAllOrLookupHelper, } HTHELPER *queryTable() { return table; } IBitSet *queryRhsChannelStopSet() { dbgassertex(0 == queryJobChannelNumber()); return rhsChannelStop; } - void startLeftInput() + void startLeftInput(bool async=false) { - LookAheadTimer t(slaveTimerStats, timeActivities); try { - startInput(0); + if (async) + { + LookAheadTimer t(slaveTimerStats, timeActivities); + startInput(0); + } + else + startInput(0); if (ensureStartFTLookAhead(0)) setLookAhead(0, createRowStreamLookAhead(this, inputStream, queryRowInterfaces(input), LOOKUPJOINL_SMART_BUFFER_SIZE, ::canStall(input), grouped, RCUNBOUND, this), false); left.set(inputStream); // can be replaced by loader stream @@ -1460,6 +1465,10 @@ class CInMemJoinBase : public CSlaveActivity, public CAllOrLookupHelper, leftexception.setown(e); } } + void startLeftInputAsync() + { + startLeftInput(true); + } virtual bool isRhsConstant() const { return rhsConstant; } // IThorSlaveActivity overloaded methods @@ -1531,6 +1540,7 @@ class CInMemJoinBase : public CSlaveActivity, public CAllOrLookupHelper, } virtual void start() override { + ActivityTimer s(slaveTimerStats, timeActivities); joined = 0; joinCounter = 0; candidateCounter = 0; @@ -1563,10 +1573,9 @@ class CInMemJoinBase : public CSlaveActivity, public CAllOrLookupHelper, } else { - CAsyncCallStart asyncLeftStart(std::bind(&CInMemJoinBase::startLeftInput, this)); + CAsyncCallStart asyncLeftStart(std::bind(&CInMemJoinBase::startLeftInputAsync, this)); try { - ActivityTimer s(slaveTimerStats, timeActivities); startInput(1); rhsStartedBefore = true; }