From a4f279c542618b2dad4eb84bdf7db7ccfd2bf7bd Mon Sep 17 00:00:00 2001 From: Jake Smith Date: Wed, 13 Dec 2023 19:40:42 +0000 Subject: [PATCH] HPCC-31009 Ensure Thor startup errors are properly reported. This also closes a few paths in k8s where an early error, e.g. an error during worker registration, would not be correctly reported back to the workunit, and cause the agent to be unaware of the failure and deadlock. Signed-off-by: Jake Smith --- common/workunit/workunit.cpp | 6 +- thorlcr/master/thgraphmanager.cpp | 2 +- thorlcr/master/thmastermain.cpp | 134 +++++++++++++----------------- 3 files changed, 62 insertions(+), 80 deletions(-) diff --git a/common/workunit/workunit.cpp b/common/workunit/workunit.cpp index 0675947fb51..22174c28342 100644 --- a/common/workunit/workunit.cpp +++ b/common/workunit/workunit.cpp @@ -14269,16 +14269,16 @@ void executeThorGraph(const char * graphName, IConstWorkUnit &workunit, const IP unsigned runningTimeLimit = workunit.getDebugValueInt("maxRunTime", 0); runningTimeLimit = runningTimeLimit ? runningTimeLimit : INFINITE; - std::list expectedStates = { WUStateRunning, WUStateWait }; + std::list expectedStates = { WUStateRunning, WUStateWait, WUStateFailed }; unsigned __int64 blockedTime = 0; for (unsigned i=0; i<2; i++) { WUState state = waitForWorkUnitToComplete(wuid, timelimit*1000, expectedStates); DBGLOG("Got state: %s", getWorkunitStateStr(state)); - if (WUStateWait == state) // already finished + if ((WUStateWait == state) || (WUStateFailed == state)) // already finished or failed { // workunit may have spent time in blocked state, but then transitioned to - // wait state quickly such that this code did not see its running state. + // wait or failed state quickly such that this code did not see its running state. if (!blockedTime) blockedTime = elapsedTimer.elapsedNs(); break; diff --git a/thorlcr/master/thgraphmanager.cpp b/thorlcr/master/thgraphmanager.cpp index 7d940ecaf74..4cf6300e651 100644 --- a/thorlcr/master/thgraphmanager.cpp +++ b/thorlcr/master/thgraphmanager.cpp @@ -69,7 +69,7 @@ void relayWuidException(IConstWorkUnit *workunit, const IException *exception) if (WUStateWait != state) { Owned we = wu->createException(); - we->setSeverity(SeverityInformation); + we->setSeverity(SeverityError); StringBuffer errStr; exception->errorMessage(errStr); we->setExceptionMessage(errStr); diff --git a/thorlcr/master/thmastermain.cpp b/thorlcr/master/thmastermain.cpp index 12bcdbba09f..15799a5c593 100644 --- a/thorlcr/master/thmastermain.cpp +++ b/thorlcr/master/thmastermain.cpp @@ -265,7 +265,7 @@ class CRegistryServer : public CSimpleInterface watchdog->addSlave(ep); ++slavesRegistered; } - bool connect(unsigned slaves) + void connect(unsigned slaves) { IPointerArrayOf connectedSlaves; connectedSlaves.ensureCapacity(slaves); @@ -287,10 +287,7 @@ class CRegistryServer : public CSimpleInterface { ::Release(_sender); if (registerTM.timedout()) - { - WARNLOG("Timeout waiting for all workers to register within timeout period (%u mins)", maxRegistrationMins); - return false; - } + throw makeStringExceptionV(TE_AbortException, "Timeout waiting for all workers to register within timeout period (%u mins)", maxRegistrationMins); if (isContainerized()) { @@ -307,8 +304,7 @@ class CRegistryServer : public CSimpleInterface if (NotFound != connectedSlaves.find(sender)) { StringBuffer epStr; - PROGLOG("Same slave registered twice!! : %s", sender->endpoint().getEndpointHostText(epStr).str()); - return false; + throw makeStringExceptionV(TE_AbortException, "Same slave registered twice!! : %s", sender->endpoint().getEndpointHostText(epStr).str()); } /* NB: in base metal setup, the slaves know which slave number they are in advance, and send their slavenum at registration. @@ -395,13 +391,11 @@ class CRegistryServer : public CSimpleInterface msg.append(masterSlaveMpTag); msg.append(kjServiceMpTag); if (!queryNodeComm().send(msg, RANK_ALL_OTHER, MPTAG_THORREGISTRATION, MP_ASYNC_SEND)) - { - PROGLOG("Failed to initialize slaves"); - return false; - } + throw makeStringException(TE_AbortException, "Failed to initialize slaves"); // Wait for confirmation from slaves PROGLOG("Initialization sent to slave group"); + Owned exception; try { while (slavesRegistered < slaves) @@ -429,11 +423,7 @@ class CRegistryServer : public CSimpleInterface { Owned e = deserializeException(msg); EXCLOG(e, "Registration error"); - if (TE_FailedToRegisterSlave == e->errorCode()) - { - setExitCode(0); // to avoid thor auto-recycling - return false; - } + throw e.getClear(); } registerNode(sender-1); } @@ -445,23 +435,21 @@ class CRegistryServer : public CSimpleInterface { CMessageBuffer msg; if (!queryNodeComm().send(msg, s+1, MPTAG_THORREGISTRATION)) - { - PROGLOG("Failed to acknowledge slave %d registration", s+1); - return false; - } + throw makeStringExceptionV(TE_AbortException, "Failed to acknowledge slave %d registration", s+1); } if (watchdog) watchdog->start(); deregistrationWatch.start(); - return true; + return; } catch (IException *e) { EXCLOG(e, "Slave registration exception"); - e->Release(); + exception.setown(e); } shutdown(); - return false; + if (exception) + throw exception.getClear(); } void stop() { @@ -999,7 +987,6 @@ int main( int argc, const char *argv[] ) kjServiceMpTag = allocateClusterMPTag(); unsigned numWorkers = 0; - bool doWorkerRegistration = false; if (isContainerized()) { saveWuidToFile(workunit); @@ -1044,14 +1031,11 @@ int main( int argc, const char *argv[] ) StringBuffer myEp; getRemoteAccessibleHostText(myEp, queryMyNode()->endpoint()); - workerNSInstalled = k8s::applyYaml("thorworker", workunit, cloudJobName, "networkpolicy", { }, false, true); - if (workerNSInstalled) - { - k8s::KeepJobs keepJob = k8s::translateKeepJobs(globals->queryProp("@keepJobs")); - workerJobInstalled = k8s::applyYaml("thorworker", workunit, cloudJobName, "job", { { "graphName", graphName}, { "master", myEp.str() }, { "_HPCC_NUM_WORKERS_", std::to_string(numWorkers/numWorkersPerPod)} }, false, k8s::KeepJobs::none == keepJob); - if (workerJobInstalled) - doWorkerRegistration = true; - } + if (!k8s::applyYaml("thorworker", workunit, cloudJobName, "networkpolicy", { }, false, true)) + throw makeStringException(TE_AbortException, "Failed to apply worker networkpolicy manifest"); + k8s::KeepJobs keepJob = k8s::translateKeepJobs(globals->queryProp("@keepJobs")); + if (!k8s::applyYaml("thorworker", workunit, cloudJobName, "job", { { "graphName", graphName}, { "master", myEp.str() }, { "_HPCC_NUM_WORKERS_", std::to_string(numWorkers/numWorkersPerPod)} }, false, k8s::KeepJobs::none == keepJob)) + throw makeStringException(TE_AbortException, "Failed to apply worker job manifest"); } else { @@ -1064,7 +1048,6 @@ int main( int argc, const char *argv[] ) unsigned numWorkersPerNode = globals->getPropInt("@slavesPerNode", 1); setClusterGroup(queryMyNode(), rawGroup, numWorkersPerNode, channelsPerWorker, slaveBasePort, localThorPortInc); numWorkers = queryNodeClusterWidth(); - doWorkerRegistration = true; if (numWorkersPerNode > 1) { // Split memory based on numWorkersPerNode @@ -1075,41 +1058,44 @@ int main( int argc, const char *argv[] ) } } - if (doWorkerRegistration && registry->connect(numWorkers)) + registry->connect(numWorkers); + if (!isContainerized()) { + // bare-metal - check health of dafilesrv's on the Thor cluster. if (globals->getPropBool("@replicateOutputs")&&globals->getPropBool("@validateDAFS",true)&&!checkClusterRelicateDAFS(queryNodeGroup())) { FLLOG(MCoperatorError, thorJob, "ERROR: Validate failure(s) detected, exiting Thor"); return globals->getPropBool("@validateDAFSretCode"); // default is no recycle! } + } - unsigned totSlaveProcs = queryNodeClusterWidth(); - for (unsigned s=0; s1) - virtStr.append("virtual slaves:"); - else - virtStr.append("slave:"); - PROGLOG("Slave log %u contains %s %s", s+1, virtStr.str(), slaveStr.str()); + unsigned o = s + (c * totSlaveProcs); + if (c) + slaveStr.append(","); + slaveStr.append(o+1); } + StringBuffer virtStr; + if (channelsPerWorker>1) + virtStr.append("virtual slaves:"); + else + virtStr.append("slave:"); + PROGLOG("Slave log %u contains %s %s", s+1, virtStr.str(), slaveStr.str()); + } - PROGLOG("verifying mp connection to rest of cluster"); - if (!queryNodeComm().verifyAll(false, 1000*60*30, 1000*60)) - throwStringExceptionV(0, "Failed to connect to all nodes"); - PROGLOG("verified mp connection to rest of cluster"); + PROGLOG("verifying mp connection to rest of cluster"); + if (!queryNodeComm().verifyAll(false, 1000*60*30, 1000*60)) + throwStringExceptionV(0, "Failed to connect to all nodes"); + PROGLOG("verified mp connection to rest of cluster"); #ifdef _CONTAINERIZED - if (globals->getPropBool("@_dafsStorage")) - { + if (globals->getPropBool("@_dafsStorage")) + { /* NB: This option is a developer option only. * It is intended to be used to bring up a temporary Thor instance that uses local node storage, @@ -1128,32 +1114,28 @@ int main( int argc, const char *argv[] ) * NB: This isn't a real StoragePlane, and it will not be accessible by any other component. * */ - StringBuffer uniqueGrpName; - queryNamedGroupStore().addUnique(&queryProcessGroup(), uniqueGrpName); - // change default plane - getComponentConfigSP()->setProp("@dataPlane", uniqueGrpName); - PROGLOG("Persistent Thor group created with group name: %s", uniqueGrpName.str()); - } + StringBuffer uniqueGrpName; + queryNamedGroupStore().addUnique(&queryProcessGroup(), uniqueGrpName); + // change default plane + getComponentConfigSP()->setProp("@dataPlane", uniqueGrpName); + PROGLOG("Persistent Thor group created with group name: %s", uniqueGrpName.str()); + } #endif - LOG(MCauditInfo, ",Progress,Thor,Startup,%s,%s,%s,%s",nodeGroup.str(),thorname,queueName.str(),logUrl.str()); - auditStartLogged = true; + LOG(MCauditInfo, ",Progress,Thor,Startup,%s,%s,%s,%s",nodeGroup.str(),thorname,queueName.str(),logUrl.str()); + auditStartLogged = true; - writeSentinelFile(sentinelFile); + writeSentinelFile(sentinelFile); #ifndef _CONTAINERIZED - unsigned pinterval = globals->getPropInt("@system_monitor_interval",1000*60); - if (pinterval) - startPerformanceMonitor(pinterval, PerfMonStandard, nullptr); + unsigned pinterval = globals->getPropInt("@system_monitor_interval",1000*60); + if (pinterval) + startPerformanceMonitor(pinterval, PerfMonStandard, nullptr); #endif - configurePreferredPlanes(); + configurePreferredPlanes(); - // NB: workunit/graphName only set in one-shot mode (if isCloud()) - thorMain(logHandler, workunit, graphName); - LOG(MCauditInfo, ",Progress,Thor,Terminate,%s,%s,%s",thorname,nodeGroup.str(),queueName.str()); - } - else - PROGLOG("Registration aborted"); - registry.clear(); + // NB: workunit/graphName only set in one-shot mode (if isCloud()) + thorMain(logHandler, workunit, graphName); + LOG(MCauditInfo, ",Progress,Thor,Terminate,%s,%s,%s",thorname,nodeGroup.str(),queueName.str()); LOG(MCdebugProgress, thorJob, "ThorMaster terminated OK"); } catch (IException *e)