From 283220be31647004950d3efc3c86a926e667fdbd Mon Sep 17 00:00:00 2001 From: Jake Smith Date: Thu, 2 Nov 2023 11:47:19 +0000 Subject: [PATCH] HPCC-30648 Check worker job and abort if unhealthy. Periodically check the worker job during registration to check it exists and hasn't failed. Abort the manager if has. Also set maximum registration time, fail if exceeded. Signed-off-by: Jake Smith --- system/jlib/jcontainerized.cpp | 9 +++- system/jlib/jcontainerized.hpp | 2 +- thorlcr/master/thmastermain.cpp | 90 ++++++++++++++++++++------------- 3 files changed, 64 insertions(+), 37 deletions(-) diff --git a/system/jlib/jcontainerized.cpp b/system/jlib/jcontainerized.cpp index 7f9ee6d5b9a..bb5d9347611 100644 --- a/system/jlib/jcontainerized.cpp +++ b/system/jlib/jcontainerized.cpp @@ -96,7 +96,7 @@ bool checkExitCodes(StringBuffer &output, const char *podStatuses) return false; } -void waitJob(const char *componentName, const char *resourceType, const char *job, unsigned pendingTimeoutSecs, KeepJobs keepJob) +void waitJob(const char *componentName, const char *resourceType, const char *job, unsigned pendingTimeoutSecs, unsigned totalWaitTimeSecs, KeepJobs keepJob) { VStringBuffer jobName("%s-%s-%s", componentName, resourceType, job); jobName.toLowerCase(); @@ -160,6 +160,11 @@ void waitJob(const char *componentName, const char *resourceType, const char *jo runKubectlCommand(componentName, getReason, nullptr, &output.clear()); throw makeStringExceptionV(0, "Failed to run %s - pod not scheduled after %u seconds: %s ", jobName.str(), pendingTimeoutSecs, output.str()); } + if (0 == totalWaitTimeSecs) + break; + if ((INFINITE != totalWaitTimeSecs) && msTick()-start > totalWaitTimeSecs*1000) + throw makeStringExceptionV(0, "Wait job timeout (%u secs) expired, whilst running: %s", totalWaitTimeSecs, jobName.str()); + MilliSleep(delay); if (delay < 10000) delay = delay * 2; @@ -236,7 +241,7 @@ void runJob(const char *componentName, const char *wuid, const char *jobName, co Owned exception; try { - waitJob(componentName, "job", jobName, pendingTimeoutSecs, keepJob); + waitJob(componentName, "job", jobName, pendingTimeoutSecs, INFINITE, keepJob); } catch (IException *e) { diff --git a/system/jlib/jcontainerized.hpp b/system/jlib/jcontainerized.hpp index de942b9636d..961db8523aa 100644 --- a/system/jlib/jcontainerized.hpp +++ b/system/jlib/jcontainerized.hpp @@ -33,7 +33,7 @@ jlib_decl KeepJobs translateKeepJobs(const char *keepJobs); jlib_decl bool isActiveService(const char *serviceName); jlib_decl void deleteResource(const char *componentName, const char *job, const char *resource); -jlib_decl void waitJob(const char *componentName, const char *job, unsigned pendingTimeoutSecs, KeepJobs keepJob); +jlib_decl void waitJob(const char *componentName, const char *resourceType, const char *job, unsigned pendingTimeoutSecs, unsigned totalWaitTimeSecs, KeepJobs keepJob); jlib_decl bool applyYaml(const char *componentName, const char *wuid, const char *job, const char *resourceType, const std::list> &extraParams, bool optional, bool autoCleanup); jlib_decl void runJob(const char *componentName, const char *wuid, const char *job, const std::list> &extraParams={}); diff --git a/thorlcr/master/thmastermain.cpp b/thorlcr/master/thmastermain.cpp index 99f38596b4f..019c3b1d3f7 100644 --- a/thorlcr/master/thmastermain.cpp +++ b/thorlcr/master/thmastermain.cpp @@ -124,6 +124,7 @@ class CThorEndHandler : implements IThreaded }; static CThorEndHandler *thorEndHandler = nullptr; +static StringBuffer cloudJobName; MODULE_INIT(INIT_PRIORITY_STANDARD) { @@ -266,58 +267,80 @@ class CRegistryServer : public CSimpleInterface } bool connect(unsigned slaves) { - LOG(MCdebugProgress, thorJob, "Waiting for %d slaves to register", slaves); - IPointerArrayOf connectedSlaves; connectedSlaves.ensureCapacity(slaves); unsigned remaining = slaves; INode *_sender = nullptr; CMessageBuffer msg; + + // Will wait for all workers to register within timelimit (default = 15 mins bare-metal, 60 mins containerized) + constexpr unsigned defaultMaxRegistrationMins = isContainerized() ? 60 : 15; + unsigned maxRegistrationMins = (unsigned)getExpertOptInt64("maxWorkerRegistrationMins", defaultMaxRegistrationMins); + constexpr unsigned oneMinMs = 60000; + + PROGLOG("Waiting for %u workers to register - max registration time = %u minutes", slaves, maxRegistrationMins); + CTimeMon registerTM(maxRegistrationMins * oneMinMs); while (remaining) { - if (!queryWorldCommunicator().recv(msg, nullptr, MPTAG_THORREGISTRATION, &_sender, MP_WAIT_FOREVER)) + // on timeout, check for any failed k8s worker job + if (!queryWorldCommunicator().recv(msg, nullptr, MPTAG_THORREGISTRATION, &_sender, oneMinMs)) { ::Release(_sender); - PROGLOG("Failed to initialize slaves"); - return false; - } - Owned sender = _sender; - if (NotFound != connectedSlaves.find(sender)) - { - StringBuffer epStr; - PROGLOG("Same slave registered twice!! : %s", sender->endpoint().getUrlStr(epStr).str()); - return false; - } + if (registerTM.timedout()) + { + WARNLOG("Timeout waiting for all workers to register within timeout period (%u mins)", maxRegistrationMins); + return false; + } - /* NB: in base metal setup, the slaves know which slave number they are in advance, and send their slavenum at registration. - * In non attached storage setup, they do not send a slave by default and instead are given a # once all are registered - */ - unsigned slaveNum; - msg.read(slaveNum); - StringBuffer slavePodName; - if (NotFound == slaveNum) - { - connectedSlaves.append(sender.getLink()); - slaveNum = connectedSlaves.ordinality(); if (isContainerized()) { - msg.read(slavePodName); - addConnectedWorkerPod(slavePodName); // NB: these are added in worker # order + // NB: this is checking for error only, will throw an exception if any found. + k8s::waitJob("thorworker", "job", cloudJobName.str(), 0, 0, k8s::KeepJobs::all); } + + // NB: will not reach here if waitJob fails. + PROGLOG("Waiting for %u remaining workers to register", remaining); } else { - unsigned pos = slaveNum - 1; // NB: slaveNum is 1 based - while (connectedSlaves.ordinality() < pos) - connectedSlaves.append(nullptr); - if (connectedSlaves.ordinality() == pos) + Owned sender = _sender; + if (NotFound != connectedSlaves.find(sender)) + { + StringBuffer epStr; + PROGLOG("Same slave registered twice!! : %s", sender->endpoint().getUrlStr(epStr).str()); + return false; + } + + /* NB: in base metal setup, the slaves know which slave number they are in advance, and send their slavenum at registration. + * In non attached storage setup, they do not send a slave by default and instead are given a # once all are registered + */ + unsigned slaveNum; + msg.read(slaveNum); + StringBuffer slavePodName; + if (NotFound == slaveNum) + { connectedSlaves.append(sender.getLink()); + slaveNum = connectedSlaves.ordinality(); + if (isContainerized()) + { + msg.read(slavePodName); + addConnectedWorkerPod(slavePodName); // NB: these are added in worker # order + } + } else - connectedSlaves.replace(sender.getLink(), pos); + { + unsigned pos = slaveNum - 1; // NB: slaveNum is 1 based + while (connectedSlaves.ordinality() < pos) + connectedSlaves.append(nullptr); + if (connectedSlaves.ordinality() == pos) + connectedSlaves.append(sender.getLink()); + else + connectedSlaves.replace(sender.getLink(), pos); + } + StringBuffer epStr; + PROGLOG("Slave %u connected from %s", slaveNum, sender->endpoint().getUrlStr(epStr).str()); + --remaining; } - StringBuffer epStr; - PROGLOG("Slave %u connected from %s", slaveNum, sender->endpoint().getUrlStr(epStr).str()); - --remaining; } assertex(slaves == connectedSlaves.ordinality()); @@ -946,7 +969,6 @@ int main( int argc, const char *argv[] ) StringBuffer queueName; // only for K8s - StringBuffer cloudJobName; bool workerNSInstalled = false; bool workerJobInstalled = false;