Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HPCC-33055 Support KEYED JOIN where rhs may evaluate to a null dataset #19415

Merged
merged 2 commits into from
Jan 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 26 additions & 2 deletions ecl/hqlcpp/hqlckey.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -305,14 +305,38 @@ IHqlExpression * KeyedJoinInfo::querySimplifiedKey(IHqlExpression * expr)

// Resolves the expression that acts as the base index for the right-hand side of a keyed join.
// Wrapper operators are looked through recursively:
//   no_if        - children 1 and 2 are both resolved; a result is only returned if both resolve
//   no_chooseds  - every child from position 1 onwards must resolve
//   no_null      - returned as itself
//   no_split     - resolved via child 0
// Any other operator is delegated to queryPhysicalRootTable().
// Returns nullptr when a branch of a conditional cannot be resolved.
IHqlExpression * queryBaseIndexForKeyedJoin(IHqlExpression * expr)
{
if (expr->getOperator() == no_if)
node_operator op = expr->getOperator();
if (op == no_if)
{
IHqlExpression * left = queryBaseIndexForKeyedJoin(expr->queryChild(1));
IHqlExpression * right = queryBaseIndexForKeyedJoin(expr->queryChild(2));
if (left && right)
return left;
{
//IF (cond, index) and IF(cond, null, index) should be allowed, and will return the index
// Prefer whichever branch is not a no_null expression; if left is no_null, right is returned as-is
if (left->getOperator() != no_null)
return left;
return right;
}
return nullptr;
}
else if (op == no_chooseds)
{
IHqlExpression * result = nullptr;
ForEachChildFrom(i, expr, 1)
{
IHqlExpression * match = queryBaseIndexForKeyedJoin(expr->queryChild(i));
// A single unresolvable child rejects the whole expression
if (!match)
return nullptr;
// Keep the first resolved child, but allow a later child to replace a no_null result
if (!result || result->getOperator() == no_null)
result = match;
}
return result;
}
else if (op == no_null)
return expr;
else if (op == no_split)
return queryBaseIndexForKeyedJoin(expr->queryChild(0));

return queryPhysicalRootTable(expr);
}

Expand Down
18 changes: 11 additions & 7 deletions roxie/ccd/ccdactivities.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2509,15 +2509,19 @@ class CRoxieKeyedActivity : public CRoxieAgentActivity
else
{
IKeyIndexBase *kib = keyArray->queryKeyPart(lastPartNo.partNo);
assertex(kib != NULL);
IKeyIndex *k = kib->queryPart(lastPartNo.fileNo);
if (filechanged)
if (!kib)
tlk.clear();
else
{
tlk.setown(createLocalKeyManager(*keyRecInfo, k, &logctx, hasNewSegmentMonitors(), !logctx.isBlind()));
createSegmentMonitorsPending = true;
IKeyIndex *k = kib->queryPart(lastPartNo.fileNo);
if (filechanged || !tlk)
{
tlk.setown(createLocalKeyManager(*keyRecInfo, k, &logctx, hasNewSegmentMonitors(), !logctx.isBlind()));
createSegmentMonitorsPending = true;
}
else
tlk->setKey(k);
}
else
tlk->setKey(k);
}
}

Expand Down
37 changes: 35 additions & 2 deletions roxie/ccd/ccdserver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1501,7 +1501,11 @@ class CRoxieServerActivity : implements CInterfaceOf<IRoxieServerActivity>, impl
virtual IEngineRowStream *queryConcreteOutputStream(unsigned whichInput) { assertex(whichInput==0); return this; }
virtual IStrandJunction *queryConcreteOutputJunction(unsigned idx) const { assertex(idx==0); return junction; }
virtual IRoxieServerActivity *queryActivity() { return this; }
virtual IIndexReadActivityInfo *queryIndexReadActivity() { return NULL; }
// Default implementation: logs that the request is unsupported by this
// activity and reports that no index-read information is available.
virtual IIndexReadActivityInfo *queryIndexReadActivity()
{
    CTXLOG("Activity does not implement queryIndexReadActivity");
    return nullptr;
}

virtual bool needsAllocator() const { return false; }

Expand Down Expand Up @@ -5527,6 +5531,18 @@ IRoxieServerActivityFactory *createRoxieServerApplyActivityFactory(unsigned _id,

//=================================================================================

static class CDummyIndexReadInfo : public CInterfaceOf<IIndexReadActivityInfo>
{
public:
virtual IKeyArray *getKeySet() const { return nullptr; }
virtual const IResolvedFile *getVarFileInfo() const { return nullptr; }
virtual ITranslatorSet *getTranslators() const { return nullptr; }

virtual void mergeSegmentMonitors(IIndexReadContext *irc) const { }
virtual IRoxieServerActivity *queryActivity() { throwUnexpected(); }; // Should never involve remote agent if keyset has returned nullptr
virtual const RemoteActivityId &queryRemoteId() const { throwUnexpected(); }
} dummyIndexReadInfo;

class CRoxieServerNullActivity : public CRoxieServerActivity
{
public:
Expand All @@ -5540,6 +5556,10 @@ class CRoxieServerNullActivity : public CRoxieServerActivity
return NULL;
}

// Returns the address of the shared dummyIndexReadInfo object instead of a
// null pointer, so callers always receive a usable info object.
// Marked override so the compiler verifies the signature against the base class.
virtual IIndexReadActivityInfo *queryIndexReadActivity() override
{
    return &dummyIndexReadInfo;
}
};

IRoxieServerActivity * createRoxieServerNullActivity(IRoxieAgentContext *_ctx, const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager)
Expand Down Expand Up @@ -9554,6 +9574,11 @@ class CRoxieServerThroughSpillActivity : public CRoxieServerActivity
CRoxieServerActivity::stop();
};

// Forwards the request to this activity's input and returns whatever it reports.
virtual IIndexReadActivityInfo *queryIndexReadActivity() override
{
return input->queryIndexReadActivity();
}

void reset(unsigned oid)
{
if (state != STATEreset) // make sure input is only reset once
Expand Down Expand Up @@ -21060,6 +21085,14 @@ class CRoxieServerCaseActivity : public CRoxieServerMultiInputBaseActivity
CRoxieServerMultiInputBaseActivity::reset();
}

// Returns the index-read information of the input selected by cond.
// An out-of-range cond is clamped to the last input first; note that the
// clamp is written back to the cond member, not to a local copy.
// Marked override so the compiler verifies the signature against the base class.
virtual IIndexReadActivityInfo *queryIndexReadActivity() override
{
    //CHOOSE defaults to the last argument if out of range.
    if (cond >= numInputs)
        cond = numInputs - 1;
    return inputArray[cond]->queryIndexReadActivity();
}

virtual const void *nextRow()
{
ActivityTimer t(activityStats, timeActivities);
Expand Down Expand Up @@ -21198,7 +21231,7 @@ class CRoxieServerIfActivity : public CRoxieServerActivity
IFinalRoxieInput *in = cond ? inputTrue : inputFalse;
if (in)
return in->queryIndexReadActivity();
return NULL;
return &dummyIndexReadInfo;
}

virtual void reset()
Expand Down
6 changes: 6 additions & 0 deletions testing/regress/ecl/key/stresstext_if.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
<Dataset name='Result 1'>
<Row><Result_1>true</Result_1></Row>
</Dataset>
<Dataset name='Result 2'>
<Row><Result_2>Done</Result_2></Row>
</Dataset>
1 change: 1 addition & 0 deletions testing/regress/ecl/setup/files.ecl
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ EXPORT NameSearchSource := indexPrefix + 'searchSource';
EXPORT getWordIndex() := INDEX(TS.textSearchIndex, NameWordIndex());
EXPORT getSearchIndex() := INDEX(TS.textSearchIndex, NameSearchIndex);
EXPORT getSearchIndexVariant(string variant) := INDEX(TS.textSearchIndex, NameSearchIndex + IF(variant != '', '_' + variant, ''));
EXPORT getOptSearchIndexVariant(string variant) := INDEX(TS.textSearchIndex, NameSearchIndex + IF(variant != '', '_' + variant, ''), OPT);

EXPORT getSearchSuperIndex() := INDEX(TS.textSearchIndex, '{' + NameSearchIndex + ',' + NameWordIndex() + '}');
EXPORT getSearchSource() := DATASET(NameSearchSource, TS.textSourceRecord, THOR);
Expand Down
114 changes: 114 additions & 0 deletions testing/regress/ecl/stresstext_if.ecl
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*##############################################################################

HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
############################################################################## */

//nothor
//nohthor

//version multiPart=false
//version multiPart=true
//version multiPart=true,variant='inplace'
//version multiPart=true,variant='default'
//version multiPart=true,variant='inplace',conditionVersion=2
//version multiPart=true,variant='inplace',conditionVersion=3
//version multiPart=true,variant='',conditionVersion=2
//version multiPart=true,variant='',conditionVersion=4

// The settings below may be useful when trying to analyse Roxie keyed join behaviour, as they will
// eliminate some wait time for an agent queue to become available

//#option('roxie:minPayloadSize', 10000)
//#option('roxie:agentThreads', 400)
//#option('roxie:prestartAgentThreads', true)

import ^ as root;
multiPart := #IFDEFINED(root.multiPart, true);
variant := #IFDEFINED(root.variant, 'inplace') : stored('variant');
numJoins := #IFDEFINED(root.numJoins, 1);

conditionVersion := #IFDEFINED(root.conditionVersion, 1);

#option ('allowActivityForKeyedJoin', true);
#onwarning (4523, ignore);

trueExpr := true : stored('true');

//--- end of version configuration ---

import $.setup;
files := setup.files(multiPart, false);


// Builds one keyed-join sample.
//   i, num  - select the slice of the search source where HASH32(...) % num = i
//   numRows - maximum number of left-hand rows (applied with CHOOSEN)
// The right-hand side of the join is a conditional index expression; which
// conditional form is used is fixed at compile time by conditionVersion.
// Returns the join result wrapped in NOFOLD.
createSample(unsigned i, unsigned num, unsigned numRows) := FUNCTION

//Add a keyed filter to ensure that no splitter is generated.
//The splitter performs pathologically on roxie - it may be worth further investigation
filtered := files.getSearchSource()(HASH32(kind, word, doc, segment, wpos) % num = i, keyed(word != ''));
inputFile := choosen(filtered, numRows);
// Variant 1: CASE on variant, defaulting to an OPT index on a name that does not exist
keyFile1 := CASE(variant,
'inplace' => files.getSearchIndexVariant('inplace'),
'default' => files.getSearchIndexVariant('default'),
files.getOptSearchIndexVariant('doesnotexist')
);
// Variant 2: the same choices as variant 1, expressed with MAP
keyFile2 := MAP(variant = 'inplace' => files.getSearchIndexVariant('inplace'),
variant = 'default' => files.getSearchIndexVariant('default'),
files.getOptSearchIndexVariant('doesnotexist')
);
// Variant 3: two-way IF between the 'inplace' index and the OPT missing index
keyFile3 := IF(variant = 'inplace', files.getSearchIndexVariant('inplace'),
files.getOptSearchIndexVariant('doesnotexist')
);

// Variant 4: MAP whose default is the 'inplace' index with a constant false filter applied
keyFile4 := MAP(variant = 'inplace' => files.getSearchIndexVariant('inplace'),
variant = 'default' => files.getSearchIndexVariant('default'),
files.getSearchIndexVariant('inplace')(false)
);
// Pick the conditional form under test; any conditionVersion other than 1, 2 or 3 selects keyFile4
#if (conditionVersion = 1)
keyFile := keyFile1;
#elif (conditionVersion = 2)
keyFile := keyFile2;
#elif (conditionVersion = 3)
keyFile := keyFile3;
#else
keyFile := keyFile4;
#end

// Keyed join matching on all five fields, limited with ATMOST(10)
j := JOIN(inputFile, keyFile,
(LEFT.kind = RIGHT.kind) AND
(LEFT.word = RIGHT.word) AND
(LEFT.doc = RIGHT.doc) AND
(LEFT.segment = RIGHT.segment) AND
(LEFT.wpos = RIGHT.wpos), ATMOST(10), KEYED);
RETURN NOFOLD(j);
END;

// Generates a PARALLEL action containing one output per sample, for sample
// numbers 0 .. iters-1, followed by a final output of 'Done'.
// Each output is a boolean: whether count(createSample(n, iters, numRows))
// equals expectedCount, which is numRows when variant is non-empty and 0
// when variant is the empty string.
createSamples(iters, numRows) := FUNCTIONMACRO
expectedCount := IF(variant != '', numRows, 0);
o := PARALLEL(
#DECLARE (count)
#SET (count, 0)
#LOOP
#IF (%count%>=iters)
#BREAK
#END
output(count(createSample(%count%, iters, numRows)) = expectedCount),
#SET (count, %count%+1)
#END
output('Done')
);
RETURN o;
ENDMACRO;

createSamples(numJoins, 60000);
Loading