diff --git a/system/jlib/jcontainerized.cpp b/system/jlib/jcontainerized.cpp index 7f9ee6d5b9a..bb5d9347611 100644 --- a/system/jlib/jcontainerized.cpp +++ b/system/jlib/jcontainerized.cpp @@ -96,7 +96,7 @@ bool checkExitCodes(StringBuffer &output, const char *podStatuses) return false; } -void waitJob(const char *componentName, const char *resourceType, const char *job, unsigned pendingTimeoutSecs, KeepJobs keepJob) +void waitJob(const char *componentName, const char *resourceType, const char *job, unsigned pendingTimeoutSecs, unsigned totalWaitTimeSecs, KeepJobs keepJob) { VStringBuffer jobName("%s-%s-%s", componentName, resourceType, job); jobName.toLowerCase(); @@ -160,6 +160,11 @@ void waitJob(const char *componentName, const char *resourceType, const char *jo runKubectlCommand(componentName, getReason, nullptr, &output.clear()); throw makeStringExceptionV(0, "Failed to run %s - pod not scheduled after %u seconds: %s ", jobName.str(), pendingTimeoutSecs, output.str()); } + if (0 == totalWaitTimeSecs) + break; + if ((INFINITE != totalWaitTimeSecs) && msTick()-start > totalWaitTimeSecs*1000) + throw makeStringExceptionV(0, "Wait job timeout (%u secs) expired, whilst running: %s", totalWaitTimeSecs, jobName.str()); + MilliSleep(delay); if (delay < 10000) delay = delay * 2; @@ -236,7 +241,7 @@ void runJob(const char *componentName, const char *wuid, const char *jobName, co Owned exception; try { - waitJob(componentName, "job", jobName, pendingTimeoutSecs, keepJob); + waitJob(componentName, "job", jobName, pendingTimeoutSecs, INFINITE, keepJob); } catch (IException *e) { diff --git a/system/jlib/jcontainerized.hpp b/system/jlib/jcontainerized.hpp index de942b9636d..961db8523aa 100644 --- a/system/jlib/jcontainerized.hpp +++ b/system/jlib/jcontainerized.hpp @@ -33,7 +33,7 @@ jlib_decl KeepJobs translateKeepJobs(const char *keepJobs); jlib_decl bool isActiveService(const char *serviceName); jlib_decl void deleteResource(const char *componentName, const char *job, const char *resource); -jlib_decl void waitJob(const char *componentName, const char *job, unsigned pendingTimeoutSecs, KeepJobs keepJob); +jlib_decl void waitJob(const char *componentName, const char *resourceType, const char *job, unsigned pendingTimeoutSecs, unsigned totalWaitTimeSecs, KeepJobs keepJob); jlib_decl bool applyYaml(const char *componentName, const char *wuid, const char *job, const char *resourceType, const std::list> &extraParams, bool optional, bool autoCleanup); jlib_decl void runJob(const char *componentName, const char *wuid, const char *job, const std::list> &extraParams={}); diff --git a/thorlcr/master/thmastermain.cpp b/thorlcr/master/thmastermain.cpp index 99f38596b4f..019c3b1d3f7 100644 --- a/thorlcr/master/thmastermain.cpp +++ b/thorlcr/master/thmastermain.cpp @@ -124,6 +124,7 @@ class CThorEndHandler : implements IThreaded }; static CThorEndHandler *thorEndHandler = nullptr; +static StringBuffer cloudJobName; MODULE_INIT(INIT_PRIORITY_STANDARD) { @@ -266,58 +267,80 @@ class CRegistryServer : public CSimpleInterface } bool connect(unsigned slaves) { - LOG(MCdebugProgress, thorJob, "Waiting for %d slaves to register", slaves); - IPointerArrayOf connectedSlaves; connectedSlaves.ensureCapacity(slaves); unsigned remaining = slaves; INode *_sender = nullptr; CMessageBuffer msg; + + // Will wait for all workers to register within timelimit (default = 15 mins bare-metal, 60 mins containerized) + constexpr unsigned defaultMaxRegistrationMins = isContainerized() ? 60 : 15; + unsigned maxRegistrationMins = (unsigned)getExpertOptInt64("maxWorkerRegistrationMins", defaultMaxRegistrationMins); + constexpr unsigned oneMinMs = 60000; + + PROGLOG("Waiting for %u workers to register - max registration time = %u minutes", slaves, maxRegistrationMins); + CTimeMon registerTM(maxRegistrationMins * oneMinMs); while (remaining) { - if (!queryWorldCommunicator().recv(msg, nullptr, MPTAG_THORREGISTRATION, &_sender, MP_WAIT_FOREVER)) + // on timeout, check for any failed k8s worker job + if (!queryWorldCommunicator().recv(msg, nullptr, MPTAG_THORREGISTRATION, &_sender, oneMinMs)) { ::Release(_sender); - PROGLOG("Failed to initialize slaves"); - return false; - } - Owned sender = _sender; - if (NotFound != connectedSlaves.find(sender)) - { - StringBuffer epStr; - PROGLOG("Same slave registered twice!! : %s", sender->endpoint().getUrlStr(epStr).str()); - return false; - } + if (registerTM.timedout()) + { + WARNLOG("Timeout waiting for all workers to register within timeout period (%u mins)", maxRegistrationMins); + return false; + } - /* NB: in base metal setup, the slaves know which slave number they are in advance, and send their slavenum at registration. - * In non attached storage setup, they do not send a slave by default and instead are given a # once all are registered - */ - unsigned slaveNum; - msg.read(slaveNum); - StringBuffer slavePodName; - if (NotFound == slaveNum) - { - connectedSlaves.append(sender.getLink()); - slaveNum = connectedSlaves.ordinality(); if (isContainerized()) { - msg.read(slavePodName); - addConnectedWorkerPod(slavePodName); // NB: these are added in worker # order + // NB: this is checking for error only, will throw an exception if any found. + k8s::waitJob("thorworker", "job", cloudJobName.str(), 0, 0, k8s::KeepJobs::all); } + + // NB: will not reach here if waitJob fails. + PROGLOG("Waiting for %u remaining workers to register", remaining); } else { - unsigned pos = slaveNum - 1; // NB: slaveNum is 1 based - while (connectedSlaves.ordinality() < pos) - connectedSlaves.append(nullptr); - if (connectedSlaves.ordinality() == pos) + Owned sender = _sender; + if (NotFound != connectedSlaves.find(sender)) + { + StringBuffer epStr; + PROGLOG("Same slave registered twice!! : %s", sender->endpoint().getUrlStr(epStr).str()); + return false; + } + + /* NB: in base metal setup, the slaves know which slave number they are in advance, and send their slavenum at registration. + * In non attached storage setup, they do not send a slave by default and instead are given a # once all are registered + */ + unsigned slaveNum; + msg.read(slaveNum); + StringBuffer slavePodName; + if (NotFound == slaveNum) + { connectedSlaves.append(sender.getLink()); + slaveNum = connectedSlaves.ordinality(); + if (isContainerized()) + { + msg.read(slavePodName); + addConnectedWorkerPod(slavePodName); // NB: these are added in worker # order + } + } else - connectedSlaves.replace(sender.getLink(), pos); + { + unsigned pos = slaveNum - 1; // NB: slaveNum is 1 based + while (connectedSlaves.ordinality() < pos) + connectedSlaves.append(nullptr); + if (connectedSlaves.ordinality() == pos) + connectedSlaves.append(sender.getLink()); + else + connectedSlaves.replace(sender.getLink(), pos); + } + StringBuffer epStr; + PROGLOG("Slave %u connected from %s", slaveNum, sender->endpoint().getUrlStr(epStr).str()); + --remaining; } - StringBuffer epStr; - PROGLOG("Slave %u connected from %s", slaveNum, sender->endpoint().getUrlStr(epStr).str()); - --remaining; } assertex(slaves == connectedSlaves.ordinality()); @@ -946,7 +969,6 @@ int main( int argc, const char *argv[] ) StringBuffer queueName; // only for K8s - StringBuffer cloudJobName; bool workerNSInstalled = false; bool workerJobInstalled = false;