diff --git a/roxie/ccd/ccd.hpp b/roxie/ccd/ccd.hpp index 7da23b7d033..af0d00649eb 100644 --- a/roxie/ccd/ccd.hpp +++ b/roxie/ccd/ccd.hpp @@ -314,6 +314,8 @@ extern bool blockedLocalAgent; extern bool acknowledgeAllRequests; extern unsigned packetAcknowledgeTimeout; extern cycle_t dynPriorityAdjustCycles; +extern bool traceThreadStartDelay; +extern int adjustBGThreadNiceValue; extern bool alwaysTrustFormatCrcs; extern bool allFilesDynamic; extern bool lockSuperFiles; diff --git a/roxie/ccd/ccdmain.cpp b/roxie/ccd/ccdmain.cpp index 92db1614993..0c00c4f98b2 100644 --- a/roxie/ccd/ccdmain.cpp +++ b/roxie/ccd/ccdmain.cpp @@ -76,6 +76,8 @@ bool blockedLocalAgent = true; bool acknowledgeAllRequests = true; unsigned packetAcknowledgeTimeout = 100; cycle_t dynPriorityAdjustCycles = 0; // default off (0) +bool traceThreadStartDelay = true; +int adjustBGThreadNiceValue = 5; unsigned ccdMulticastPort; bool enableHeartBeat = true; unsigned parallelLoopFlowLimit = 100; @@ -1010,6 +1012,12 @@ int CCD_API roxie_main(int argc, const char *argv[], const char * defaultYaml) unsigned dynAdjustMsec = topology->getPropInt("@dynPriorityAdjustTime", 0); if (dynAdjustMsec) dynPriorityAdjustCycles = dynAdjustMsec * (queryOneSecCycles() / 1000ULL); + traceThreadStartDelay = topology->getPropBool("@traceThreadStartDelay", traceThreadStartDelay); + adjustBGThreadNiceValue = topology->getPropInt("@adjustBGThreadNiceValue", adjustBGThreadNiceValue); + if (adjustBGThreadNiceValue < 0) + adjustBGThreadNiceValue = 0; + if (adjustBGThreadNiceValue > 19) + adjustBGThreadNiceValue = 19; ccdMulticastPort = topology->getPropInt("@multicastPort", CCD_MULTICAST_PORT); statsExpiryTime = topology->getPropInt("@statsExpiryTime", 3600); roxiemem::setMemTraceSizeLimit((memsize_t) topology->getPropInt64("@memTraceSizeLimit", 0)); diff --git a/roxie/ccd/ccdqueue.cpp b/roxie/ccd/ccdqueue.cpp index 0918f6df969..7046420cc08 100644 --- a/roxie/ccd/ccdqueue.cpp +++ b/roxie/ccd/ccdqueue.cpp @@ -1188,6 +1188,13 @@ class RoxieQueue : public CInterface, implements IThreadFactory if (qname && *qname) tname.appendf(" (%s)", qname); workers.setown(createThreadPool(tname.str(), this, false, nullptr, numWorkers)); + if (traceThreadStartDelay) + workers->setStartDelayTracing(60); + if (qname && *qname) + { + if (streq(qname, "BG")) + workers->setNiceValue(adjustBGThreadNiceValue); + } started = 0; idle = 0; if (IBYTIbufferSize) @@ -1893,7 +1900,7 @@ class RoxieReceiverBase : implements IRoxieOutputQueueManager, public CInterface public: IMPLEMENT_IINTERFACE; - RoxieReceiverBase(unsigned _numWorkers) : slaQueue(_numWorkers, "SLA"), hiQueue(_numWorkers, "HIGH"), loQueue(_numWorkers, "LOW"), bgQueue(_numWorkers/2 + 1, "BG"), numWorkers(_numWorkers) + RoxieReceiverBase(unsigned _numWorkers) : slaQueue(_numWorkers, "SLA"), hiQueue(_numWorkers, "HIGH"), loQueue(_numWorkers, "LOW"), bgQueue(_numWorkers, "BG"), numWorkers(_numWorkers) { } @@ -1902,7 +1909,7 @@ class RoxieReceiverBase : implements IRoxieOutputQueueManager, public CInterface loQueue.start(); hiQueue.start(); slaQueue.start(); - bgQueue.start(); // consider nice(+3) BG threads + bgQueue.start(); // NB BG thread priority can be adjusted } virtual void stop() diff --git a/roxie/ccd/ccdserver.cpp b/roxie/ccd/ccdserver.cpp index a4d611da13f..c17b2ecd5e7 100644 --- a/roxie/ccd/ccdserver.cpp +++ b/roxie/ccd/ccdserver.cpp @@ -4611,6 +4611,7 @@ class CRemoteResultAdaptor : implements IEngineRowStream, implements IFinalRoxie ctx->queryOptions().dynPriority = QUERY_BG_PRIORITY_VALUE; unsigned dynAdjustMsec = (dynPriorityAdjustCycles * 1000ULL) / queryOneSecCycles(); UWARNLOG("WARNING: %d msec dynamic adjustment threshold reached, shifting query to BG queue", dynAdjustMsec); + p->queryHeader().activityId &= ~ROXIE_PRIORITY_MASK; p->queryHeader().activityId |= ROXIE_BG_PRIORITY; // TODO: what to do about still running activities' continuation/ack priorities ? } diff --git a/system/jlib/jthread.cpp b/system/jlib/jthread.cpp index 8545d2bd641..769f04e9595 100644 --- a/system/jlib/jthread.cpp +++ b/system/jlib/jthread.cpp @@ -995,6 +995,7 @@ class CThreadPool: public CThreadPoolBase, implements IThreadPool, public CInter unsigned stacksize; unsigned timeoutOnRelease; unsigned traceStartDelayPeriod = 0; + int niceValue = 0; unsigned startsInPeriod = 0; cycle_t startDelayInPeriod = 0; CCycleTimer overAllTimer; @@ -1114,6 +1115,8 @@ class CThreadPool: public CThreadPoolBase, implements IThreadPool, public CInter CPooledThreadWrapper &ret = *new CPooledThreadWrapper(*this,newid,factory->createNew()); if (stacksize) ret.setStackSize(stacksize); + if (niceValue) + ret.setNice(niceValue); ret.start(false); threadwrappers.append(ret); return ret; @@ -1281,6 +1284,10 @@ class CThreadPool: public CThreadPoolBase, implements IThreadPool, public CInter { traceStartDelayPeriod = secs; } + void setNiceValue(int value) + { + niceValue = value; + } bool waitAvailable(unsigned timeout) { if (!defaultmax) diff --git a/system/jlib/jthread.hpp b/system/jlib/jthread.hpp index 5d312aa9d2f..b48dff635a1 100644 --- a/system/jlib/jthread.hpp +++ b/system/jlib/jthread.hpp @@ -289,6 +289,7 @@ interface IThreadPool : extends IInterface virtual unsigned runningCount()=0; // number of currently running threads virtual PooledThreadHandle startNoBlock(void *param)=0; // starts a new thread if it can do so without blocking, else throws exception virtual void setStartDelayTracing(unsigned secs) = 0; // set start delay tracing period + virtual void setNiceValue(int value) = 0; // set priority for thread virtual bool waitAvailable(unsigned timeout) = 0; // wait until a pool member is available };