Skip to content

Commit

Permalink
HPCC-32958 Roxie dynamic priority
Browse files Browse the repository at this point in the history
Signed-off-by: M Kelly <mark.kelly+copilot@lexisnexisrisk.com>
  • Loading branch information
mckellyln committed Jan 10, 2025
1 parent 4993fb1 commit e5c65f2
Show file tree
Hide file tree
Showing 9 changed files with 130 additions and 49 deletions.
14 changes: 12 additions & 2 deletions roxie/ccd/ccd.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,13 @@ void setMulticastEndpoints(unsigned numChannels);
#define ROXIE_BG_PRIORITY 0xc0000000 // mask in activityId indicating it goes on the bg queue
#define ROXIE_PRIORITY_MASK (ROXIE_SLA_PRIORITY | ROXIE_HIGH_PRIORITY | ROXIE_LOW_PRIORITY)

#define QUERY_BG_PRIORITY_VALUE -1
#define QUERY_LOW_PRIORITY_VALUE 0
#define QUERY_HIGH_PRIORITY_VALUE 1
#define QUERY_SLA_PRIORITY_VALUE 2
static constexpr int queryMinPriorityValue = QUERY_BG_PRIORITY_VALUE;
static constexpr int queryMaxPriorityValue = QUERY_SLA_PRIORITY_VALUE;

#define ROXIE_ACTIVITY_FETCH 0x20000000 // or'ed into activityId for fetch part of full keyed join activities

// Status information returned in the activityId field of the header:
Expand Down Expand Up @@ -305,6 +312,7 @@ extern StringArray allQuerySetNames;
extern bool blockedLocalAgent;
extern bool acknowledgeAllRequests;
extern unsigned packetAcknowledgeTimeout;
extern cycle_t dynPriorityAdjustCycles;
extern bool alwaysTrustFormatCrcs;
extern bool allFilesDynamic;
extern bool lockSuperFiles;
Expand Down Expand Up @@ -334,8 +342,8 @@ extern unsigned memoryStatsInterval;
extern unsigned pingInterval;
extern unsigned socketCheckInterval;
extern memsize_t defaultMemoryLimit;
extern unsigned defaultTimeLimit[4];
extern unsigned defaultWarnTimeLimit[4];
extern unsigned defaultTimeLimit[3];
extern unsigned defaultWarnTimeLimit[3];
extern unsigned defaultThorConnectTimeout;
extern bool pretendAllOpt;
extern ClientCertificate clientCert;
Expand Down Expand Up @@ -484,6 +492,8 @@ inline unsigned getBondedChannel(unsigned partNo)
return ((partNo - 1) % numChannels) + 1;
}

extern unsigned getPriorityMask(int priority);

extern void FatalError(const char *format, ...) __attribute__((format(printf, 1, 2)));
extern unsigned getNextInstanceId();
extern void closedown();
Expand Down
5 changes: 5 additions & 0 deletions roxie/ccd/ccdcontext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1530,6 +1530,11 @@ class CRoxieContextBase : implements IRoxieAgentContext, implements ICodeContext
return options;
}

virtual cycle_t queryElapsedCycles() const
{
return elapsedTimer.elapsedCycles();
}

const char *queryAuthToken()
{
return authToken.str();
Expand Down
1 change: 1 addition & 0 deletions roxie/ccd/ccdcontext.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ interface IRoxieAgentContext : extends IRoxieContextLogger
virtual void noteChildGraph(unsigned id, IActivityGraph *childGraph) = 0;
virtual roxiemem::IRowManager &queryRowManager() = 0;
virtual const QueryOptions &queryOptions() const = 0;
virtual cycle_t queryElapsedCycles() const = 0;
virtual void addAgentsReplyLen(unsigned len, unsigned duplicates, unsigned resends) = 0;
virtual const char *queryAuthToken() = 0;
virtual const IResolvedFile *resolveLFN(const char *filename, bool isOpt, bool isPrivilegedUser) = 0;
Expand Down
26 changes: 13 additions & 13 deletions roxie/ccd/ccdlistener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1241,10 +1241,10 @@ class RoxieWorkUnitWorker : public RoxieQueryWorker
{
switch((int)priority)
{
case 0: loQueryStats.noteQuery(failed, elapsedTime); break;
case 1: hiQueryStats.noteQuery(failed, elapsedTime); break;
case 2: slaQueryStats.noteQuery(failed, elapsedTime); break;
case -1: bgQueryStats.noteQuery(failed, elapsedTime); break;
case QUERY_LOW_PRIORITY_VALUE: loQueryStats.noteQuery(failed, elapsedTime); break;
case QUERY_HIGH_PRIORITY_VALUE: hiQueryStats.noteQuery(failed, elapsedTime); break;
case QUERY_SLA_PRIORITY_VALUE: slaQueryStats.noteQuery(failed, elapsedTime); break;
case QUERY_BG_PRIORITY_VALUE: bgQueryStats.noteQuery(failed, elapsedTime); break;
}
combinedQueryStats.noteQuery(failed, elapsedTime);
}
Expand Down Expand Up @@ -1334,7 +1334,7 @@ class RoxieWorkUnitWorker : public RoxieQueryWorker
unsigned agentsReplyLen = 0;
unsigned agentsDuplicates = 0;
unsigned agentsResends = 0;
unsigned priority = (unsigned) -2;
unsigned priority = (unsigned) -2; // NB -2 is outside of priority range
try
{
bool isBlind = wu->getDebugValueBool("blindLogging", false);
Expand All @@ -1358,10 +1358,10 @@ class RoxieWorkUnitWorker : public RoxieQueryWorker
priority = queryFactory->queryOptions().priority;
switch ((int)priority)
{
case 0: loQueryStats.noteActive(); break;
case 1: hiQueryStats.noteActive(); break;
case 2: slaQueryStats.noteActive(); break;
case -1: bgQueryStats.noteActive(); break;
case QUERY_LOW_PRIORITY_VALUE: loQueryStats.noteActive(); break;
case QUERY_HIGH_PRIORITY_VALUE: hiQueryStats.noteActive(); break;
case QUERY_SLA_PRIORITY_VALUE: slaQueryStats.noteActive(); break;
case QUERY_BG_PRIORITY_VALUE: bgQueryStats.noteActive(); break;
}
combinedQueryStats.noteActive();
Owned<IRoxieServerContext> ctx = queryFactory->createContext(wu, logctx);
Expand Down Expand Up @@ -1528,10 +1528,10 @@ class RoxieProtocolMsgContext : implements IHpccProtocolMsgContext, public CInte
unsigned priority = getQueryPriority();
switch ((int)priority)
{
case 0: loQueryStats.noteActive(); break;
case 1: hiQueryStats.noteActive(); break;
case 2: slaQueryStats.noteActive(); break;
case -1: bgQueryStats.noteActive(); break;
case QUERY_LOW_PRIORITY_VALUE: loQueryStats.noteActive(); break;
case QUERY_HIGH_PRIORITY_VALUE: hiQueryStats.noteActive(); break;
case QUERY_SLA_PRIORITY_VALUE: slaQueryStats.noteActive(); break;
case QUERY_BG_PRIORITY_VALUE: bgQueryStats.noteActive(); break;
}
unknownQueryStats.noteComplete();
combinedQueryStats.noteActive();
Expand Down
10 changes: 6 additions & 4 deletions roxie/ccd/ccdmain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ unsigned numRequestArrayThreads = 5;
bool blockedLocalAgent = true;
bool acknowledgeAllRequests = true;
unsigned packetAcknowledgeTimeout = 100;
cycle_t dynPriorityAdjustCycles = 0; // default off (0)
unsigned headRegionSize;
unsigned ccdMulticastPort;
bool enableHeartBeat = true;
Expand Down Expand Up @@ -164,8 +165,8 @@ int backgroundCopyPrio = 0;

unsigned memoryStatsInterval = 0;
memsize_t defaultMemoryLimit;
unsigned defaultTimeLimit[4] = {0, 0, 0, 0};
unsigned defaultWarnTimeLimit[4] = {0, 5000, 5000, 10000};
unsigned defaultTimeLimit[3] = {0, 0, 0};
unsigned defaultWarnTimeLimit[3] = {0, 5000, 5000};
unsigned defaultThorConnectTimeout;

unsigned defaultParallelJoinPreload = 0;
Expand Down Expand Up @@ -1007,6 +1008,9 @@ int CCD_API roxie_main(int argc, const char *argv[], const char * defaultYaml)
acknowledgeAllRequests = topology->getPropBool("@acknowledgeAllRequests", acknowledgeAllRequests);
headRegionSize = topology->getPropInt("@headRegionSize", 0);
packetAcknowledgeTimeout = topology->getPropInt("@packetAcknowledgeTimeout", packetAcknowledgeTimeout);
unsigned dynAdjustMsec = topology->getPropInt("@dynPriorityAdjustTime", 0);
if (dynAdjustMsec)
dynPriorityAdjustCycles = dynAdjustMsec * (queryOneSecCycles() / 1000ULL);
ccdMulticastPort = topology->getPropInt("@multicastPort", CCD_MULTICAST_PORT);
statsExpiryTime = topology->getPropInt("@statsExpiryTime", 3600);
roxiemem::setMemTraceSizeLimit((memsize_t) topology->getPropInt64("@memTraceSizeLimit", 0));
Expand Down Expand Up @@ -1169,11 +1173,9 @@ int CCD_API roxie_main(int argc, const char *argv[], const char * defaultYaml)
defaultTimeLimit[0] = (unsigned) topology->getPropInt64("@defaultLowPriorityTimeLimit", 0);
defaultTimeLimit[1] = (unsigned) topology->getPropInt64("@defaultHighPriorityTimeLimit", 0);
defaultTimeLimit[2] = (unsigned) topology->getPropInt64("@defaultSLAPriorityTimeLimit", 0);
defaultTimeLimit[3] = (unsigned) topology->getPropInt64("@defaultBGPriorityTimeLimit", 0);
defaultWarnTimeLimit[0] = (unsigned) topology->getPropInt64("@defaultLowPriorityTimeWarning", 0);
defaultWarnTimeLimit[1] = (unsigned) topology->getPropInt64("@defaultHighPriorityTimeWarning", 0);
defaultWarnTimeLimit[2] = (unsigned) topology->getPropInt64("@defaultSLAPriorityTimeWarning", 0);
defaultWarnTimeLimit[3] = (unsigned) topology->getPropInt64("@defaultBGPriorityTimeWarning", 0);
defaultThorConnectTimeout = (unsigned) topology->getPropInt64("@defaultThorConnectTimeout", 60);
continuationCompressThreshold = (unsigned) topology->getPropInt64("@continuationCompressThreshold", 1024);

Expand Down
54 changes: 36 additions & 18 deletions roxie/ccd/ccdquery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,8 @@ class CSharedOnceContext : public CInterfaceOf<ISharedOnceContext>

QueryOptions::QueryOptions()
{
priority = 0;
priority = QUERY_LOW_PRIORITY_VALUE;
dynPriority = QUERY_LOW_PRIORITY_VALUE;
timeLimit = defaultTimeLimit[0];
warnTimeLimit = defaultWarnTimeLimit[0];

Expand Down Expand Up @@ -358,6 +359,7 @@ QueryOptions::QueryOptions()
QueryOptions::QueryOptions(const QueryOptions &other)
{
priority = other.priority;
dynPriority = other.dynPriority.load();
timeLimit = other.timeLimit;
warnTimeLimit = other.warnTimeLimit;

Expand Down Expand Up @@ -394,23 +396,31 @@ QueryOptions::QueryOptions(const QueryOptions &other)
numWorkflowThreads = other.numWorkflowThreads;
}

void QueryOptions::setFromWorkUnit(IConstWorkUnit &wu, const IPropertyTree *stateInfo)
void QueryOptions::updateDynPriority(int _priority)
{
// calculate priority before others since it affects the defaults of others
updateFromWorkUnit(priority, wu, "priority");
if (stateInfo)
updateFromContext(priority, stateInfo, "@priority");
if ((int)priority < 0)
dynPriority = _priority;
if (dynPriority < QUERY_LOW_PRIORITY_VALUE)
{
// use LOW queue time limits ...
timeLimit = defaultTimeLimit[0];
warnTimeLimit = defaultWarnTimeLimit[0];
}
else
{
timeLimit = defaultTimeLimit[priority];
warnTimeLimit = defaultWarnTimeLimit[priority];
timeLimit = defaultTimeLimit[_priority];
warnTimeLimit = defaultWarnTimeLimit[_priority];
}
}

void QueryOptions::setFromWorkUnit(IConstWorkUnit &wu, const IPropertyTree *stateInfo)
{
// calculate priority before others since it affects the defaults of others
updateFromWorkUnit(priority, wu, "priority");
if (stateInfo)
updateFromContext(priority, stateInfo, "@priority");

updateDynPriority((int)priority);

updateFromWorkUnit(timeLimit, wu, "timeLimit");
updateFromWorkUnit(warnTimeLimit, wu, "warnTimeLimit");
updateFromWorkUnitM(memoryLimit, wu, "memoryLimit");
Expand Down Expand Up @@ -495,6 +505,20 @@ void QueryOptions::setFromContext(const IPropertyTree *ctx)
if (ctx)
{
// Note: priority cannot be set at context level
// b/c this is after activities have been created, but we could
// dynamically adj priority in the header activityId before sending
int tmpPriority = (int)priority;
updateFromContext(tmpPriority, ctx, "@priority", "_Priority");

if (tmpPriority > queryMaxPriorityValue)
tmpPriority = queryMaxPriorityValue;
if (tmpPriority < queryMinPriorityValue)
tmpPriority = queryMinPriorityValue;

// only adjust lower ...
if (tmpPriority < (int)priority)
updateDynPriority(tmpPriority);

updateFromContext(timeLimit, ctx, "@timeLimit", "_TimeLimit");
updateFromContext(warnTimeLimit, ctx, "@warnTimeLimit", "_WarnTimeLimit");
updateFromContextM(memoryLimit, ctx, "@memoryLimit", "_MemoryLimit");
Expand Down Expand Up @@ -624,15 +648,9 @@ class CQueryFactory : implements IQueryFactory, implements IResourceContext, pub

if (isSuspended)
return createRoxieServerDummyActivityFactory(id, subgraphId, *this, NULL, TAKnone, node, false); // Is there actually any point?
switch (options.priority)
{
case 1:
rid |= ROXIE_HIGH_PRIORITY;
break;
case 2:
rid |= ROXIE_SLA_PRIORITY;
break;
}

rid |= getPriorityMask(options.priority);

StringBuffer helperName;
helperName.append("fAc").append(id);
HelperFactory *helperFactory = dll->getFactory(helperName);
Expand Down
3 changes: 2 additions & 1 deletion roxie/ccd/ccdquery.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,10 @@ class QueryOptions
void setFromWorkUnit(IConstWorkUnit &wu, const IPropertyTree *stateInfo);
void setFromContext(const IPropertyTree *ctx);
void setFromAgentLoggingFlags(unsigned loggingFlags);

void updateDynPriority(int _priority);

unsigned priority;
mutable std::atomic<int> dynPriority;
unsigned timeLimit;
unsigned warnTimeLimit;
unsigned traceLimit;
Expand Down
56 changes: 55 additions & 1 deletion roxie/ccd/ccdserver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,10 @@ class IndirectAgentContext : implements IRoxieAgentContext, public CInterface
{
return ctx->queryOptions();
}
virtual cycle_t queryElapsedCycles() const override
{
return ctx->queryElapsedCycles();
}
virtual void addAgentsReplyLen(unsigned len, unsigned duplicates, unsigned resends) override
{
ctx->addAgentsReplyLen(len, duplicates, resends);
Expand Down Expand Up @@ -3625,6 +3629,24 @@ void throwRemoteException(IMessageUnpackCursor *extra)
throwUnexpected();
}

unsigned getPriorityMask(int priority)
{
unsigned newPri = ROXIE_BG_PRIORITY;
switch (priority)
{
case QUERY_SLA_PRIORITY_VALUE:
newPri = ROXIE_SLA_PRIORITY;
break;
case QUERY_HIGH_PRIORITY_VALUE:
newPri = ROXIE_HIGH_PRIORITY;
break;
case QUERY_LOW_PRIORITY_VALUE:
newPri = ROXIE_LOW_PRIORITY;
break;
}
return newPri;
}

class CRemoteResultAdaptor : implements IEngineRowStream, implements IFinalRoxieInput, implements IExceptionHandler, public CInterface
{
friend class CRemoteResultMerger;
Expand Down Expand Up @@ -4557,6 +4579,27 @@ class CRemoteResultAdaptor : implements IEngineRowStream, implements IFinalRoxie
// But this could still cause too many reply packets on the fastlane
// (higher priority output Q), which may cause the activities on the
// low priority output Q to not get service on time.

int origPriority = (int)ctx->queryOptions().priority;
int dynPriority = ctx->queryOptions().dynPriority;
if (dynPriority < origPriority)
{
unsigned newPri = getPriorityMask(dynPriority);
p->queryHeader().activityId &= ~ROXIE_PRIORITY_MASK;
p->queryHeader().activityId |= newPri;
}

// TODO: perhaps check elapsed every Nth msg ?
if ( (dynPriorityAdjustCycles > 0) && (origPriority == QUERY_LOW_PRIORITY_VALUE) && (dynPriority == QUERY_LOW_PRIORITY_VALUE) &&
(ctx->queryElapsedCycles() > dynPriorityAdjustCycles) )
{
ctx->queryOptions().dynPriority = QUERY_BG_PRIORITY_VALUE;
unsigned dynAdjustMsec = (dynPriorityAdjustCycles * 1000ULL) / queryOneSecCycles();
UWARNLOG("WARNING: %d msec dynamic adjustment threshold reached, shifting query to BG queue", dynAdjustMsec);
p->queryHeader().activityId |= ROXIE_BG_PRIORITY;
// TODO: what to do about still running activities' continuation/ack priorities ?
}

unsigned pmask = p->queryHeader().activityId & ROXIE_PRIORITY_MASK;
if ((colocalArg == 0) && // not a child query activity??
( (pmask == ROXIE_SLA_PRIORITY) || (pmask == ROXIE_HIGH_PRIORITY) ) &&
Expand Down Expand Up @@ -5014,7 +5057,18 @@ class CRemoteResultAdaptor : implements IEngineRowStream, implements IFinalRoxie
mu.clear();
SimpleActivityTimer t(unpackerWaitCycles, timeActivities);
unsigned ctxTraceLevel = activity.queryLogCtx().queryTraceLevel();
unsigned timeout = remoteId.isSLAPriority() ? slaTimeout : (remoteId.isHighPriority() ? highTimeout : lowTimeout);

unsigned timeout = lowTimeout;
switch (activity.queryContext()->queryOptions().dynPriority)
{
case QUERY_SLA_PRIORITY_VALUE:
timeout = slaTimeout;
break;
case QUERY_HIGH_PRIORITY_VALUE:
timeout = highTimeout;
break;
}

unsigned checkInterval = activity.queryContext()->checkInterval();
if (checkInterval > timeout)
checkInterval = timeout;
Expand Down
10 changes: 0 additions & 10 deletions roxie/ccd/ccdstate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2439,16 +2439,6 @@ class CRoxiePackageSetManager : implements IRoxieQueryPackageManagerSet, impleme
defaultWarnTimeLimit[2] = control->getPropInt("@limit", 0);
topology->setPropInt("@defaultSLAPriorityTimeWarning", defaultWarnTimeLimit[2]);
}
else if (stricmp(queryName, "control:defaultBGPriorityTimeLimit")==0)
{
defaultTimeLimit[3] = control->getPropInt("@limit", 0);
topology->setPropInt("@defaultBGPriorityTimeLimit", defaultTimeLimit[3]);
}
else if (stricmp(queryName, "control:defaultBGPriorityTimeWarning")==0)
{
defaultWarnTimeLimit[3] = control->getPropInt("@limit", 0);
topology->setPropInt("@defaultBGPriorityTimeWarning", defaultWarnTimeLimit[3]);
}
else if (stricmp(queryName, "control:deleteUnneededPhysicalFiles")==0)
{
UNIMPLEMENTED;
Expand Down

0 comments on commit e5c65f2

Please sign in to comment.