Skip to content

Commit

Permalink
HPCC-32964 Add a Roxie Background priority queue
Browse files Browse the repository at this point in the history
Signed-off-by: M Kelly <mark.kelly+copilot@lexisnexisrisk.com>
  • Loading branch information
mckellyln committed Jan 3, 2025
1 parent fc98a1c commit 0913ec4
Show file tree
Hide file tree
Showing 7 changed files with 67 additions and 37 deletions.
2 changes: 2 additions & 0 deletions roxie/ccd/ccd.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ void setMulticastEndpoints(unsigned numChannels);
#define ROXIE_SLA_PRIORITY 0x40000000 // mask in activityId indicating it goes SLA priority queue
#define ROXIE_HIGH_PRIORITY 0x80000000 // mask in activityId indicating it goes on the fast queue
#define ROXIE_LOW_PRIORITY 0x00000000 // mask in activityId indicating it goes on the slow queue (= default)
// background priority queue is when both ROXIE_SLA_PRIORITY and ROXIE_HIGH_PRIORITY are set
#define ROXIE_BG_PRIORITY 0xc0000000 // mask in activityId indicating it goes on the bg queue
#define ROXIE_PRIORITY_MASK (ROXIE_SLA_PRIORITY | ROXIE_HIGH_PRIORITY | ROXIE_LOW_PRIORITY)

#define ROXIE_ACTIVITY_FETCH 0x20000000 // or'ed into activityId for fetch part of full keyed join activities
Expand Down
12 changes: 8 additions & 4 deletions roxie/ccd/ccdlistener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1239,11 +1239,12 @@ class RoxieWorkUnitWorker : public RoxieQueryWorker
{
void noteQuery(bool failed, unsigned elapsedTime, unsigned priority)
{
switch(priority)
switch((int)priority)
{
case 0: loQueryStats.noteQuery(failed, elapsedTime); break;
case 1: hiQueryStats.noteQuery(failed, elapsedTime); break;
case 2: slaQueryStats.noteQuery(failed, elapsedTime); break;
case -1: bgQueryStats.noteQuery(failed, elapsedTime); break;
}
combinedQueryStats.noteQuery(failed, elapsedTime);
}
Expand Down Expand Up @@ -1355,11 +1356,12 @@ class RoxieWorkUnitWorker : public RoxieQueryWorker
isBlind = isBlind || blindLogging;
logctx.setBlind(isBlind);
priority = queryFactory->queryOptions().priority;
switch (priority)
switch ((int)priority)
{
case 0: loQueryStats.noteActive(); break;
case 1: hiQueryStats.noteActive(); break;
case 2: slaQueryStats.noteActive(); break;
case -1: bgQueryStats.noteActive(); break;
}
combinedQueryStats.noteActive();
Owned<IRoxieServerContext> ctx = queryFactory->createContext(wu, logctx);
Expand Down Expand Up @@ -1524,11 +1526,12 @@ class RoxieProtocolMsgContext : implements IHpccProtocolMsgContext, public CInte
virtual void noteQueryActive()
{
unsigned priority = getQueryPriority();
switch (priority)
switch ((int)priority)
{
case 0: loQueryStats.noteActive(); break;
case 1: hiQueryStats.noteActive(); break;
case 2: slaQueryStats.noteActive(); break;
case -1: bgQueryStats.noteActive(); break;
}
unknownQueryStats.noteComplete();
combinedQueryStats.noteActive();
Expand Down Expand Up @@ -1677,11 +1680,12 @@ class RoxieProtocolMsgContext : implements IHpccProtocolMsgContext, public CInte
}
else
{
switch(getQueryPriority())
switch((int)getQueryPriority())
{
case 0: loQueryStats.noteQuery(failed, elapsedTime); break;
case 1: hiQueryStats.noteQuery(failed, elapsedTime); break;
case 2: slaQueryStats.noteQuery(failed, elapsedTime); break;
case -1: bgQueryStats.noteQuery(failed, elapsedTime); break;
default: unknownQueryStats.noteQuery(failed, elapsedTime); return; // Don't include unknown in the combined stats
}
combinedQueryStats.noteQuery(failed, elapsedTime);
Expand Down
13 changes: 11 additions & 2 deletions roxie/ccd/ccdquery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -400,8 +400,17 @@ void QueryOptions::setFromWorkUnit(IConstWorkUnit &wu, const IPropertyTree *stat
updateFromWorkUnit(priority, wu, "priority");
if (stateInfo)
updateFromContext(priority, stateInfo, "@priority");
timeLimit = defaultTimeLimit[priority];
warnTimeLimit = defaultWarnTimeLimit[priority];
if ((int)priority < 0)
{
// use LOW queue time limits ...
timeLimit = defaultTimeLimit[0];
warnTimeLimit = defaultWarnTimeLimit[0];
}
else
{
timeLimit = defaultTimeLimit[priority];
warnTimeLimit = defaultWarnTimeLimit[priority];
}
updateFromWorkUnit(timeLimit, wu, "timeLimit");
updateFromWorkUnit(warnTimeLimit, wu, "warnTimeLimit");
updateFromWorkUnitM(memoryLimit, wu, "memoryLimit");
Expand Down
71 changes: 41 additions & 30 deletions roxie/ccd/ccdqueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ StringBuffer &RoxiePacketHeader::toString(StringBuffer &ret) const
{
const IpAddress serverIP = serverId.getIpAddress();
ret.append("activityId=");
switch(activityId & ~ROXIE_PRIORITY_MASK)
switch (activityId & ~ROXIE_PRIORITY_MASK)
{
case 0: ret.append("IBYTI"); break;
case ROXIE_UNLOAD: ret.append("ROXIE_UNLOAD"); break;
Expand All @@ -157,11 +157,12 @@ StringBuffer &RoxiePacketHeader::toString(StringBuffer &ret) const
break;
}
ret.appendf(" uid=" RUIDF " pri=", uid);
switch(activityId & ROXIE_PRIORITY_MASK)
switch (activityId & ROXIE_PRIORITY_MASK)
{
case ROXIE_SLA_PRIORITY: ret.append("SLA"); break;
case ROXIE_HIGH_PRIORITY: ret.append("HIGH"); break;
case ROXIE_LOW_PRIORITY: ret.append("LOW"); break;
case ROXIE_BG_PRIORITY: ret.append("BG"); break;
default: ret.append("???"); break;
}
ret.appendf(" queryHash=%" I64F "x ch=%u seq=%d cont=%d server=", queryHash, channel, overflowSequence, continueSequence);
Expand Down Expand Up @@ -1166,11 +1167,14 @@ class RoxieQueue : public CInterface, implements IThreadFactory
public:
IMPLEMENT_IINTERFACE;

RoxieQueue(unsigned _headRegionSize, unsigned _numWorkers)
RoxieQueue(unsigned _headRegionSize, unsigned _numWorkers, const char *qname=nullptr)
{
headRegionSize = _headRegionSize;
numWorkers = _numWorkers;
workers.setown(createThreadPool("RoxieWorkers", this, false, nullptr, numWorkers));
StringBuffer tname("RoxieWorkers");
if (qname && *qname)
tname.appendf(" (%s)", qname);
workers.setown(createThreadPool(tname.str(), this, false, nullptr, numWorkers));
started = 0;
idle = 0;
if (IBYTIbufferSize)
Expand Down Expand Up @@ -1904,12 +1908,13 @@ class RoxieReceiverBase : implements IRoxieOutputQueueManager, public CInterface
RoxieQueue slaQueue;
RoxieQueue hiQueue;
RoxieQueue loQueue;
RoxieQueue bgQueue;
unsigned numWorkers;

public:
IMPLEMENT_IINTERFACE;

RoxieReceiverBase(unsigned _numWorkers) : slaQueue(headRegionSize, _numWorkers), hiQueue(headRegionSize, _numWorkers), loQueue(headRegionSize, _numWorkers), numWorkers(_numWorkers)
RoxieReceiverBase(unsigned _numWorkers) : slaQueue(headRegionSize, _numWorkers, "SLA"), hiQueue(headRegionSize, _numWorkers, "HIGH"), loQueue(headRegionSize, _numWorkers, "LOW"), bgQueue(headRegionSize, _numWorkers/2 + 1, "BG"), numWorkers(_numWorkers)
{
}

Expand All @@ -1923,27 +1928,31 @@ class RoxieReceiverBase : implements IRoxieOutputQueueManager, public CInterface
slaQueue.setHeadRegionSize(newSize);
hiQueue.setHeadRegionSize(newSize);
loQueue.setHeadRegionSize(newSize);
bgQueue.setHeadRegionSize(newSize);
}

virtual void start()
{
loQueue.start();
hiQueue.start();
slaQueue.start();
bgQueue.start(); // consider nice(+3) BG threads
}

virtual void stop()
{
loQueue.stopAll();
hiQueue.stopAll();
slaQueue.stopAll();
bgQueue.stopAll();
}

virtual void join()
{
loQueue.join();
hiQueue.join();
slaQueue.join();
bgQueue.join();
}

IArrayOf<CallbackEntry> callbacks;
Expand Down Expand Up @@ -2254,7 +2263,7 @@ class DelayedPacketQueue
}

// Move any that we are done waiting for our buddy onto the active queue
void checkExpired(unsigned now, RoxieQueue &slaQueue, RoxieQueue &hiQueue, RoxieQueue &loQueue)
void checkExpired(unsigned now, RoxieQueue &slaQueue, RoxieQueue &hiQueue, RoxieQueue &loQueue, RoxieQueue &bgQueue)
{
assert(GetCurrentThreadId()==roxiePacketReaderThread);
DelayedPacketEntry *finger = head;
Expand All @@ -2270,12 +2279,13 @@ class DelayedPacketQueue
DBGLOG("No IBYTI received in time for delayed packet %s - enqueuing", header.toString(s).str());
}
unsigned __int64 IBYTIdelay = nsTick()-packet->queryEnqueuedTimeStamp();
if (header.activityId & ROXIE_SLA_PRIORITY)
slaQueue.enqueue(packet, IBYTIdelay);
else if (header.activityId & ROXIE_HIGH_PRIORITY)
hiQueue.enqueue(packet, IBYTIdelay);
else
loQueue.enqueue(packet, IBYTIdelay);
switch (header.activityId & ROXIE_PRIORITY_MASK)
{
case ROXIE_SLA_PRIORITY: slaQueue.enqueue(packet, IBYTIdelay); break;
case ROXIE_HIGH_PRIORITY: hiQueue.enqueue(packet, IBYTIdelay); break;
case ROXIE_LOW_PRIORITY: loQueue.enqueue(packet, IBYTIdelay); break;
default: bgQueue.enqueue(packet, IBYTIdelay); break;
}
for (unsigned subChannel = 0; subChannel < MAX_SUBCHANNEL; subChannel++)
{
if (header.subChannels[subChannel].isMe() || header.subChannels[subChannel].isNull())
Expand Down Expand Up @@ -2354,11 +2364,11 @@ class DelayedPacketQueueChannel : public CInterface
}
return min;
}
void checkExpired(unsigned now, RoxieQueue &slaQueue, RoxieQueue &hiQueue, RoxieQueue &loQueue)
void checkExpired(unsigned now, RoxieQueue &slaQueue, RoxieQueue &hiQueue, RoxieQueue &loQueue, RoxieQueue &bgQueue)
{
for (unsigned queue = 0; queue <= maxSeen; queue++)
{
queues[queue].checkExpired(now, slaQueue, hiQueue, loQueue);
queues[queue].checkExpired(now, slaQueue, hiQueue, loQueue, bgQueue);
}
}
private:
Expand Down Expand Up @@ -2398,11 +2408,11 @@ class DelayedPacketQueueManager
}
return ret;
}
void checkExpired(unsigned now, RoxieQueue &slaQueue, RoxieQueue &hiQueue, RoxieQueue &loQueue)
void checkExpired(unsigned now, RoxieQueue &slaQueue, RoxieQueue &hiQueue, RoxieQueue &loQueue, RoxieQueue &bgQueue)
{
ForEachItemIn(idx, channels)
{
channels.item(idx).checkExpired(now, slaQueue, hiQueue, loQueue);
channels.item(idx).checkExpired(now, slaQueue, hiQueue, loQueue, bgQueue);
}
}
private:
Expand Down Expand Up @@ -2894,12 +2904,13 @@ class RoxieSocketQueueManager : public RoxieReceiverBase
StringBuffer s;
DBGLOG("Read roxie packet: %s", header.toString(s).str());
}
if (header.activityId & ROXIE_SLA_PRIORITY)
processMessage(mb, header, slaQueue);
else if (header.activityId & ROXIE_HIGH_PRIORITY)
processMessage(mb, header, hiQueue);
else
processMessage(mb, header, loQueue);
switch (header.activityId & ROXIE_PRIORITY_MASK)
{
case ROXIE_SLA_PRIORITY: processMessage(mb, header, slaQueue); break;
case ROXIE_HIGH_PRIORITY: processMessage(mb, header, hiQueue); break;
case ROXIE_LOW_PRIORITY: processMessage(mb, header, loQueue); break;
default: processMessage(mb, header, bgQueue); break;
}
}
catch (IException *E)
{
Expand Down Expand Up @@ -2938,7 +2949,7 @@ class RoxieSocketQueueManager : public RoxieReceiverBase
}
}
#ifdef NEW_IBYTI
delayed.checkExpired(msTick(), slaQueue, hiQueue, loQueue);
delayed.checkExpired(msTick(), slaQueue, hiQueue, loQueue, bgQueue);
#endif
}
return 0;
Expand Down Expand Up @@ -3708,13 +3719,13 @@ class RoxieLocalQueueManager : public RoxieReceiverBase
return; // No point sending the retry in localAgent mode
}
RoxieQueue *targetQueue;
if (header.activityId & ROXIE_SLA_PRIORITY)
targetQueue = &slaQueue;
else if (header.activityId & ROXIE_HIGH_PRIORITY)
targetQueue = &hiQueue;
else
targetQueue = &loQueue;

switch (header.activityId & ROXIE_PRIORITY_MASK)
{
case ROXIE_SLA_PRIORITY: targetQueue = &slaQueue; break;
case ROXIE_HIGH_PRIORITY: targetQueue = &hiQueue; break;
case ROXIE_LOW_PRIORITY: targetQueue = &loQueue; break;
default: targetQueue = &bgQueue; break;
}
Owned<ISerializedRoxieQueryPacket> serialized = packet->serialize();
if (header.channel)
{
Expand Down
3 changes: 2 additions & 1 deletion roxie/ccd/ccdserver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4557,8 +4557,9 @@ class CRemoteResultAdaptor : implements IEngineRowStream, implements IFinalRoxie
// But this could still cause too many reply packets on the fastlane
// (higher priority output Q), which may cause the activities on the
// low priority output Q to not get service on time.
unsigned pmask = p->queryHeader().activityId & ROXIE_PRIORITY_MASK;
if ((colocalArg == 0) && // not a child query activity??
(p->queryHeader().activityId & (ROXIE_SLA_PRIORITY | ROXIE_HIGH_PRIORITY)) &&
( (pmask == ROXIE_SLA_PRIORITY) || (pmask == ROXIE_HIGH_PRIORITY) ) &&
(p->queryHeader().overflowSequence == 0) &&
(p->queryHeader().continueSequence & ~CONTINUE_SEQUENCE_SKIPTO)==0)
p->queryHeader().retries |= ROXIE_FASTLANE;
Expand Down
2 changes: 2 additions & 0 deletions roxie/ccd/ccdsnmp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ RoxieQueryStats unknownQueryStats;
RoxieQueryStats loQueryStats;
RoxieQueryStats hiQueryStats;
RoxieQueryStats slaQueryStats;
RoxieQueryStats bgQueryStats;
RoxieQueryStats combinedQueryStats;

#define addMetric(a) doAddMetric(a, #a)
Expand Down Expand Up @@ -185,6 +186,7 @@ CRoxieMetricsManager::CRoxieMetricsManager()
loQueryStats.addMetrics(this, "lo");
hiQueryStats.addMetrics(this, "hi");
slaQueryStats.addMetrics(this, "sla");
bgQueryStats.addMetrics(this, "bg");
combinedQueryStats.addMetrics(this, "all");
addMetric(restarts);

Expand Down
1 change: 1 addition & 0 deletions roxie/ccd/ccdsnmp.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ extern RoxieQueryStats unknownQueryStats;
extern RoxieQueryStats loQueryStats;
extern RoxieQueryStats hiQueryStats;
extern RoxieQueryStats slaQueryStats;
extern RoxieQueryStats bgQueryStats;
extern RoxieQueryStats combinedQueryStats;

interface IRoxieMetricsManager : extends IInterface
Expand Down

0 comments on commit 0913ec4

Please sign in to comment.