Skip to content

Commit

Permalink
HPCC-32922 Changes following review
Browse files Browse the repository at this point in the history
Signed-off-by: Shamser Ahmed <shamser.ahmed@lexisnexis.com>
  • Loading branch information
shamser committed Nov 8, 2024
1 parent 888ad90 commit d8e637f
Showing 1 changed file with 14 additions and 5 deletions.
19 changes: 14 additions & 5 deletions thorlcr/activities/lookupjoin/thlookupjoinslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1445,12 +1445,17 @@ class CInMemJoinBase : public CSlaveActivity, public CAllOrLookupHelper<HELPER>,
}
HTHELPER *queryTable() { return table; }
IBitSet *queryRhsChannelStopSet() { dbgassertex(0 == queryJobChannelNumber()); return rhsChannelStop; }
void startLeftInput()
void startLeftInput(bool async=false)
{
LookAheadTimer t(slaveTimerStats, timeActivities);
try
{
startInput(0);
if (async)
{
LookAheadTimer t(slaveTimerStats, timeActivities);
startInput(0);
}
else
startInput(0);
if (ensureStartFTLookAhead(0))
setLookAhead(0, createRowStreamLookAhead(this, inputStream, queryRowInterfaces(input), LOOKUPJOINL_SMART_BUFFER_SIZE, ::canStall(input), grouped, RCUNBOUND, this), false);
left.set(inputStream); // can be replaced by loader stream
Expand All @@ -1460,6 +1465,10 @@ class CInMemJoinBase : public CSlaveActivity, public CAllOrLookupHelper<HELPER>,
leftexception.setown(e);
}
}
void startLeftInputAsync()
{
startLeftInput(true);
}
virtual bool isRhsConstant() const { return rhsConstant; }

// IThorSlaveActivity overloaded methods
Expand Down Expand Up @@ -1531,6 +1540,7 @@ class CInMemJoinBase : public CSlaveActivity, public CAllOrLookupHelper<HELPER>,
}
virtual void start() override
{
ActivityTimer s(slaveTimerStats, timeActivities);
joined = 0;
joinCounter = 0;
candidateCounter = 0;
Expand Down Expand Up @@ -1563,10 +1573,9 @@ class CInMemJoinBase : public CSlaveActivity, public CAllOrLookupHelper<HELPER>,
}
else
{
CAsyncCallStart asyncLeftStart(std::bind(&CInMemJoinBase::startLeftInput, this));
CAsyncCallStart asyncLeftStart(std::bind(&CInMemJoinBase::startLeftInputAsync, this));
try
{
ActivityTimer s(slaveTimerStats, timeActivities);
startInput(1);
rhsStartedBefore = true;
}
Expand Down

0 comments on commit d8e637f

Please sign in to comment.