Skip to content

Commit

Permalink
HPCC-32964 Add a Roxie Background priority queue
Browse files Browse the repository at this point in the history
Signed-off-by: M Kelly <mark.kelly+copilot@lexisnexisrisk.com>
  • Loading branch information
mckellyln committed Nov 11, 2024
1 parent 7067a3c commit b0c6f2b
Show file tree
Hide file tree
Showing 9 changed files with 79 additions and 39 deletions.
5 changes: 3 additions & 2 deletions roxie/ccd/ccd.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ void setMulticastEndpoints(unsigned numChannels);
#define ROXIE_SLA_PRIORITY 0x40000000 // mask in activityId indicating it goes SLA priority queue
#define ROXIE_HIGH_PRIORITY 0x80000000 // mask in activityId indicating it goes on the fast queue
#define ROXIE_LOW_PRIORITY 0x00000000 // mask in activityId indicating it goes on the slow queue (= default)
// background priority queue is when both ROXIE_SLA_PRIORITY and ROXIE_HIGH_PRIORITY are set
#define ROXIE_PRIORITY_MASK (ROXIE_SLA_PRIORITY | ROXIE_HIGH_PRIORITY | ROXIE_LOW_PRIORITY)

#define ROXIE_ACTIVITY_FETCH 0x20000000 // or'ed into activityId for fetch part of full keyed join activities
Expand Down Expand Up @@ -332,8 +333,8 @@ extern unsigned memoryStatsInterval;
extern unsigned pingInterval;
extern unsigned socketCheckInterval;
extern memsize_t defaultMemoryLimit;
extern unsigned defaultTimeLimit[3];
extern unsigned defaultWarnTimeLimit[3];
extern unsigned defaultTimeLimit[4];
extern unsigned defaultWarnTimeLimit[4];
extern unsigned defaultThorConnectTimeout;
extern bool pretendAllOpt;
extern ClientCertificate clientCert;
Expand Down
12 changes: 8 additions & 4 deletions roxie/ccd/ccdlistener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1239,11 +1239,12 @@ class RoxieWorkUnitWorker : public RoxieQueryWorker
{
void noteQuery(bool failed, unsigned elapsedTime, unsigned priority)
{
switch(priority)
switch((int)priority)
{
case 0: loQueryStats.noteQuery(failed, elapsedTime); break;
case 1: hiQueryStats.noteQuery(failed, elapsedTime); break;
case 2: slaQueryStats.noteQuery(failed, elapsedTime); break;
case -1: bgQueryStats.noteQuery(failed, elapsedTime); break;
}
combinedQueryStats.noteQuery(failed, elapsedTime);
}
Expand Down Expand Up @@ -1355,11 +1356,12 @@ class RoxieWorkUnitWorker : public RoxieQueryWorker
isBlind = isBlind || blindLogging;
logctx.setBlind(isBlind);
priority = queryFactory->queryOptions().priority;
switch (priority)
switch ((int)priority)
{
case 0: loQueryStats.noteActive(); break;
case 1: hiQueryStats.noteActive(); break;
case 2: slaQueryStats.noteActive(); break;
case -1: bgQueryStats.noteActive(); break;
}
combinedQueryStats.noteActive();
Owned<IRoxieServerContext> ctx = queryFactory->createContext(wu, logctx);
Expand Down Expand Up @@ -1524,11 +1526,12 @@ class RoxieProtocolMsgContext : implements IHpccProtocolMsgContext, public CInte
virtual void noteQueryActive()
{
unsigned priority = getQueryPriority();
switch (priority)
switch ((int)priority)
{
case 0: loQueryStats.noteActive(); break;
case 1: hiQueryStats.noteActive(); break;
case 2: slaQueryStats.noteActive(); break;
case -1: bgQueryStats.noteActive(); break;
}
unknownQueryStats.noteComplete();
combinedQueryStats.noteActive();
Expand Down Expand Up @@ -1677,11 +1680,12 @@ class RoxieProtocolMsgContext : implements IHpccProtocolMsgContext, public CInte
}
else
{
switch(getQueryPriority())
switch((int)getQueryPriority())
{
case 0: loQueryStats.noteQuery(failed, elapsedTime); break;
case 1: hiQueryStats.noteQuery(failed, elapsedTime); break;
case 2: slaQueryStats.noteQuery(failed, elapsedTime); break;
case -1: bgQueryStats.noteQuery(failed, elapsedTime); break;
default: unknownQueryStats.noteQuery(failed, elapsedTime); return; // Don't include unknown in the combined stats
}
combinedQueryStats.noteQuery(failed, elapsedTime);
Expand Down
6 changes: 4 additions & 2 deletions roxie/ccd/ccdmain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -164,8 +164,8 @@ int backgroundCopyPrio = 0;

unsigned memoryStatsInterval = 0;
memsize_t defaultMemoryLimit;
unsigned defaultTimeLimit[3] = {0, 0, 0};
unsigned defaultWarnTimeLimit[3] = {0, 5000, 5000};
unsigned defaultTimeLimit[4] = {0, 0, 0, 0};
unsigned defaultWarnTimeLimit[4] = {0, 5000, 5000, 10000};
unsigned defaultThorConnectTimeout;

unsigned defaultParallelJoinPreload = 0;
Expand Down Expand Up @@ -1169,9 +1169,11 @@ int CCD_API roxie_main(int argc, const char *argv[], const char * defaultYaml)
defaultTimeLimit[0] = (unsigned) topology->getPropInt64("@defaultLowPriorityTimeLimit", 0);
defaultTimeLimit[1] = (unsigned) topology->getPropInt64("@defaultHighPriorityTimeLimit", 0);
defaultTimeLimit[2] = (unsigned) topology->getPropInt64("@defaultSLAPriorityTimeLimit", 0);
defaultTimeLimit[3] = (unsigned) topology->getPropInt64("@defaultBGPriorityTimeLimit", 0);
defaultWarnTimeLimit[0] = (unsigned) topology->getPropInt64("@defaultLowPriorityTimeWarning", 0);
defaultWarnTimeLimit[1] = (unsigned) topology->getPropInt64("@defaultHighPriorityTimeWarning", 0);
defaultWarnTimeLimit[2] = (unsigned) topology->getPropInt64("@defaultSLAPriorityTimeWarning", 0);
defaultWarnTimeLimit[3] = (unsigned) topology->getPropInt64("@defaultBGPriorityTimeWarning", 0);
defaultThorConnectTimeout = (unsigned) topology->getPropInt64("@defaultThorConnectTimeout", 60);
continuationCompressThreshold = (unsigned) topology->getPropInt64("@continuationCompressThreshold", 1024);

Expand Down
12 changes: 10 additions & 2 deletions roxie/ccd/ccdquery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -400,8 +400,16 @@ void QueryOptions::setFromWorkUnit(IConstWorkUnit &wu, const IPropertyTree *stat
updateFromWorkUnit(priority, wu, "priority");
if (stateInfo)
updateFromContext(priority, stateInfo, "@priority");
timeLimit = defaultTimeLimit[priority];
warnTimeLimit = defaultWarnTimeLimit[priority];
if ((int)priority < 0)
{
timeLimit = defaultTimeLimit[3];
warnTimeLimit = defaultWarnTimeLimit[3];
}
else
{
timeLimit = defaultTimeLimit[priority];
warnTimeLimit = defaultWarnTimeLimit[priority];
}
updateFromWorkUnit(timeLimit, wu, "timeLimit");
updateFromWorkUnit(warnTimeLimit, wu, "warnTimeLimit");
updateFromWorkUnitM(memoryLimit, wu, "memoryLimit");
Expand Down
67 changes: 39 additions & 28 deletions roxie/ccd/ccdqueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ StringBuffer &RoxiePacketHeader::toString(StringBuffer &ret) const
case ROXIE_SLA_PRIORITY: ret.append("SLA"); break;
case ROXIE_HIGH_PRIORITY: ret.append("HIGH"); break;
case ROXIE_LOW_PRIORITY: ret.append("LOW"); break;
case ROXIE_SLA_PRIORITY + ROXIE_HIGH_PRIORITY: ret.append("BG"); break;
default: ret.append("???"); break;
}
ret.appendf(" queryHash=%" I64F "x ch=%u seq=%d cont=%d server=", queryHash, channel, overflowSequence, continueSequence);
Expand Down Expand Up @@ -1166,11 +1167,14 @@ class RoxieQueue : public CInterface, implements IThreadFactory
public:
IMPLEMENT_IINTERFACE;

RoxieQueue(unsigned _headRegionSize, unsigned _numWorkers)
RoxieQueue(unsigned _headRegionSize, unsigned _numWorkers, const char *qname=nullptr)
{
headRegionSize = _headRegionSize;
numWorkers = _numWorkers;
workers.setown(createThreadPool("RoxieWorkers", this, false, nullptr, numWorkers));
StringBuffer tname("RoxieWorkers");
if (qname && *qname)
tname.appendf(" (%s)", qname);
workers.setown(createThreadPool(tname.str(), this, false, nullptr, numWorkers));
started = 0;
idle = 0;
if (IBYTIbufferSize)
Expand Down Expand Up @@ -1904,12 +1908,13 @@ class RoxieReceiverBase : implements IRoxieOutputQueueManager, public CInterface
RoxieQueue slaQueue;
RoxieQueue hiQueue;
RoxieQueue loQueue;
RoxieQueue bgQueue;
unsigned numWorkers;

public:
IMPLEMENT_IINTERFACE;

RoxieReceiverBase(unsigned _numWorkers) : slaQueue(headRegionSize, _numWorkers), hiQueue(headRegionSize, _numWorkers), loQueue(headRegionSize, _numWorkers), numWorkers(_numWorkers)
RoxieReceiverBase(unsigned _numWorkers) : slaQueue(headRegionSize, _numWorkers, "SLA"), hiQueue(headRegionSize, _numWorkers, "HIGH"), loQueue(headRegionSize, _numWorkers, "LOW"), bgQueue(headRegionSize, _numWorkers/2 + 1, "BG"), numWorkers(_numWorkers)
{
}

Expand All @@ -1923,27 +1928,31 @@ class RoxieReceiverBase : implements IRoxieOutputQueueManager, public CInterface
slaQueue.setHeadRegionSize(newSize);
hiQueue.setHeadRegionSize(newSize);
loQueue.setHeadRegionSize(newSize);
bgQueue.setHeadRegionSize(newSize);
}

virtual void start()
{
loQueue.start();
hiQueue.start();
slaQueue.start();
bgQueue.start();
}

virtual void stop()
{
loQueue.stopAll();
hiQueue.stopAll();
slaQueue.stopAll();
bgQueue.stopAll();
}

virtual void join()
{
loQueue.join();
hiQueue.join();
slaQueue.join();
bgQueue.join();
}

IArrayOf<CallbackEntry> callbacks;
Expand Down Expand Up @@ -2254,7 +2263,7 @@ class DelayedPacketQueue
}

// Move any that we are done waiting for our buddy onto the active queue
void checkExpired(unsigned now, RoxieQueue &slaQueue, RoxieQueue &hiQueue, RoxieQueue &loQueue)
void checkExpired(unsigned now, RoxieQueue &slaQueue, RoxieQueue &hiQueue, RoxieQueue &loQueue, RoxieQueue &bgQueue)
{
assert(GetCurrentThreadId()==roxiePacketReaderThread);
DelayedPacketEntry *finger = head;
Expand All @@ -2270,12 +2279,13 @@ class DelayedPacketQueue
DBGLOG("No IBYTI received in time for delayed packet %s - enqueuing", header.toString(s).str());
}
unsigned __int64 IBYTIdelay = nsTick()-packet->queryEnqueuedTimeStamp();
if (header.activityId & ROXIE_SLA_PRIORITY)
slaQueue.enqueue(packet, IBYTIdelay);
else if (header.activityId & ROXIE_HIGH_PRIORITY)
hiQueue.enqueue(packet, IBYTIdelay);
else
loQueue.enqueue(packet, IBYTIdelay);
switch(header.activityId & ROXIE_PRIORITY_MASK)
{
case ROXIE_SLA_PRIORITY: slaQueue.enqueue(packet, IBYTIdelay); break;
case ROXIE_HIGH_PRIORITY: hiQueue.enqueue(packet, IBYTIdelay); break;
case ROXIE_LOW_PRIORITY: loQueue.enqueue(packet, IBYTIdelay); break;
default: bgQueue.enqueue(packet, IBYTIdelay); break;
}
for (unsigned subChannel = 0; subChannel < MAX_SUBCHANNEL; subChannel++)
{
if (header.subChannels[subChannel].isMe() || header.subChannels[subChannel].isNull())
Expand Down Expand Up @@ -2354,11 +2364,11 @@ class DelayedPacketQueueChannel : public CInterface
}
return min;
}
void checkExpired(unsigned now, RoxieQueue &slaQueue, RoxieQueue &hiQueue, RoxieQueue &loQueue)
void checkExpired(unsigned now, RoxieQueue &slaQueue, RoxieQueue &hiQueue, RoxieQueue &loQueue, RoxieQueue &bgQueue)
{
for (unsigned queue = 0; queue <= maxSeen; queue++)
{
queues[queue].checkExpired(now, slaQueue, hiQueue, loQueue);
queues[queue].checkExpired(now, slaQueue, hiQueue, loQueue, bgQueue);
}
}
private:
Expand Down Expand Up @@ -2398,11 +2408,11 @@ class DelayedPacketQueueManager
}
return ret;
}
void checkExpired(unsigned now, RoxieQueue &slaQueue, RoxieQueue &hiQueue, RoxieQueue &loQueue)
void checkExpired(unsigned now, RoxieQueue &slaQueue, RoxieQueue &hiQueue, RoxieQueue &loQueue, RoxieQueue &bgQueue)
{
ForEachItemIn(idx, channels)
{
channels.item(idx).checkExpired(now, slaQueue, hiQueue, loQueue);
channels.item(idx).checkExpired(now, slaQueue, hiQueue, loQueue, bgQueue);
}
}
private:
Expand Down Expand Up @@ -2894,12 +2904,13 @@ class RoxieSocketQueueManager : public RoxieReceiverBase
StringBuffer s;
DBGLOG("Read roxie packet: %s", header.toString(s).str());
}
if (header.activityId & ROXIE_SLA_PRIORITY)
processMessage(mb, header, slaQueue);
else if (header.activityId & ROXIE_HIGH_PRIORITY)
processMessage(mb, header, hiQueue);
else
processMessage(mb, header, loQueue);
switch(header.activityId & ROXIE_PRIORITY_MASK)
{
case ROXIE_SLA_PRIORITY: processMessage(mb, header, slaQueue); break;
case ROXIE_HIGH_PRIORITY: processMessage(mb, header, hiQueue); break;
case ROXIE_LOW_PRIORITY: processMessage(mb, header, loQueue); break;
default: processMessage(mb, header, bgQueue); break;
}
}
catch (IException *E)
{
Expand Down Expand Up @@ -2938,7 +2949,7 @@ class RoxieSocketQueueManager : public RoxieReceiverBase
}
}
#ifdef NEW_IBYTI
delayed.checkExpired(msTick(), slaQueue, hiQueue, loQueue);
delayed.checkExpired(msTick(), slaQueue, hiQueue, loQueue, bgQueue);
#endif
}
return 0;
Expand Down Expand Up @@ -3708,13 +3719,13 @@ class RoxieLocalQueueManager : public RoxieReceiverBase
return; // No point sending the retry in localAgent mode
}
RoxieQueue *targetQueue;
if (header.activityId & ROXIE_SLA_PRIORITY)
targetQueue = &slaQueue;
else if (header.activityId & ROXIE_HIGH_PRIORITY)
targetQueue = &hiQueue;
else
targetQueue = &loQueue;

switch(header.activityId & ROXIE_PRIORITY_MASK)
{
case ROXIE_SLA_PRIORITY: targetQueue = &slaQueue; break;
case ROXIE_HIGH_PRIORITY: targetQueue = &hiQueue; break;
case ROXIE_LOW_PRIORITY: targetQueue = &loQueue; break;
default: targetQueue = &bgQueue; break;
}
Owned<ISerializedRoxieQueryPacket> serialized = packet->serialize();
if (header.channel)
{
Expand Down
3 changes: 2 additions & 1 deletion roxie/ccd/ccdserver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4557,8 +4557,9 @@ class CRemoteResultAdaptor : implements IEngineRowStream, implements IFinalRoxie
// But this could still cause too many reply packets on the fastlane
// (higher priority output Q), which may cause the activities on the
// low priority output Q to not get service on time.
unsigned pmask = p->queryHeader().activityId & ROXIE_PRIORITY_MASK;
if ((colocalArg == 0) && // not a child query activity??
(p->queryHeader().activityId & (ROXIE_SLA_PRIORITY | ROXIE_HIGH_PRIORITY)) &&
(pmask && (pmask != ROXIE_PRIORITY_MASK)) &&
(p->queryHeader().overflowSequence == 0) &&
(p->queryHeader().continueSequence & ~CONTINUE_SEQUENCE_SKIPTO)==0)
p->queryHeader().retries |= ROXIE_FASTLANE;
Expand Down
2 changes: 2 additions & 0 deletions roxie/ccd/ccdsnmp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ RoxieQueryStats unknownQueryStats;
RoxieQueryStats loQueryStats;
RoxieQueryStats hiQueryStats;
RoxieQueryStats slaQueryStats;
RoxieQueryStats bgQueryStats;
RoxieQueryStats combinedQueryStats;

#define addMetric(a) doAddMetric(a, #a)
Expand Down Expand Up @@ -185,6 +186,7 @@ CRoxieMetricsManager::CRoxieMetricsManager()
loQueryStats.addMetrics(this, "lo");
hiQueryStats.addMetrics(this, "hi");
slaQueryStats.addMetrics(this, "sla");
bgQueryStats.addMetrics(this, "bg");
combinedQueryStats.addMetrics(this, "all");
addMetric(restarts);

Expand Down
1 change: 1 addition & 0 deletions roxie/ccd/ccdsnmp.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ extern RoxieQueryStats unknownQueryStats;
extern RoxieQueryStats loQueryStats;
extern RoxieQueryStats hiQueryStats;
extern RoxieQueryStats slaQueryStats;
extern RoxieQueryStats bgQueryStats;
extern RoxieQueryStats combinedQueryStats;

interface IRoxieMetricsManager : extends IInterface
Expand Down
10 changes: 10 additions & 0 deletions roxie/ccd/ccdstate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2439,6 +2439,16 @@ class CRoxiePackageSetManager : implements IRoxieQueryPackageManagerSet, impleme
defaultWarnTimeLimit[2] = control->getPropInt("@limit", 0);
topology->setPropInt("@defaultSLAPriorityTimeWarning", defaultWarnTimeLimit[2]);
}
else if (stricmp(queryName, "control:defaultBGPriorityTimeLimit")==0)
{
defaultTimeLimit[3] = control->getPropInt("@limit", 0);
topology->setPropInt("@defaultBGPriorityTimeLimit", defaultTimeLimit[3]);
}
else if (stricmp(queryName, "control:defaultBGPriorityTimeWarning")==0)
{
defaultWarnTimeLimit[3] = control->getPropInt("@limit", 0);
topology->setPropInt("@defaultBGPriorityTimeWarning", defaultWarnTimeLimit[3]);
}
else if (stricmp(queryName, "control:deleteUnneededPhysicalFiles")==0)
{
UNIMPLEMENTED;
Expand Down

0 comments on commit b0c6f2b

Please sign in to comment.