diff --git a/common/workunit/workunit.cpp b/common/workunit/workunit.cpp index 0675947fb51..22174c28342 100644 --- a/common/workunit/workunit.cpp +++ b/common/workunit/workunit.cpp @@ -14269,16 +14269,16 @@ void executeThorGraph(const char * graphName, IConstWorkUnit &workunit, const IP unsigned runningTimeLimit = workunit.getDebugValueInt("maxRunTime", 0); runningTimeLimit = runningTimeLimit ? runningTimeLimit : INFINITE; - std::list expectedStates = { WUStateRunning, WUStateWait }; + std::list expectedStates = { WUStateRunning, WUStateWait, WUStateFailed }; unsigned __int64 blockedTime = 0; for (unsigned i=0; i<2; i++) { WUState state = waitForWorkUnitToComplete(wuid, timelimit*1000, expectedStates); DBGLOG("Got state: %s", getWorkunitStateStr(state)); - if (WUStateWait == state) // already finished + if ((WUStateWait == state) || (WUStateFailed == state)) // already finished or failed { // workunit may have spent time in blocked state, but then transitioned to - // wait state quickly such that this code did not see its running state. + // wait or failed state quickly such that this code did not see its running state. if (!blockedTime) blockedTime = elapsedTimer.elapsedNs(); break; diff --git a/thorlcr/master/thgraphmanager.cpp b/thorlcr/master/thgraphmanager.cpp index 7d940ecaf74..4cf6300e651 100644 --- a/thorlcr/master/thgraphmanager.cpp +++ b/thorlcr/master/thgraphmanager.cpp @@ -69,7 +69,7 @@ void relayWuidException(IConstWorkUnit *workunit, const IException *exception) if (WUStateWait != state) { Owned we = wu->createException(); - we->setSeverity(SeverityInformation); + we->setSeverity(SeverityError); StringBuffer errStr; exception->errorMessage(errStr); we->setExceptionMessage(errStr); diff --git a/thorlcr/master/thmastermain.cpp b/thorlcr/master/thmastermain.cpp index 12bcdbba09f..15799a5c593 100644 --- a/thorlcr/master/thmastermain.cpp +++ b/thorlcr/master/thmastermain.cpp @@ -265,7 +265,7 @@ class CRegistryServer : public CSimpleInterface watchdog->addSlave(ep); ++slavesRegistered; } - bool connect(unsigned slaves) + void connect(unsigned slaves) { IPointerArrayOf connectedSlaves; connectedSlaves.ensureCapacity(slaves); @@ -287,10 +287,7 @@ class CRegistryServer : public CSimpleInterface { ::Release(_sender); if (registerTM.timedout()) - { - WARNLOG("Timeout waiting for all workers to register within timeout period (%u mins)", maxRegistrationMins); - return false; - } + throw makeStringExceptionV(TE_AbortException, "Timeout waiting for all workers to register within timeout period (%u mins)", maxRegistrationMins); if (isContainerized()) { @@ -307,8 +304,7 @@ class CRegistryServer : public CSimpleInterface if (NotFound != connectedSlaves.find(sender)) { StringBuffer epStr; - PROGLOG("Same slave registered twice!! : %s", sender->endpoint().getEndpointHostText(epStr).str()); - return false; + throw makeStringExceptionV(TE_AbortException, "Same slave registered twice!! : %s", sender->endpoint().getEndpointHostText(epStr).str()); } /* NB: in base metal setup, the slaves know which slave number they are in advance, and send their slavenum at registration. @@ -395,13 +391,11 @@ class CRegistryServer : public CSimpleInterface msg.append(masterSlaveMpTag); msg.append(kjServiceMpTag); if (!queryNodeComm().send(msg, RANK_ALL_OTHER, MPTAG_THORREGISTRATION, MP_ASYNC_SEND)) - { - PROGLOG("Failed to initialize slaves"); - return false; - } + throw makeStringException(TE_AbortException, "Failed to initialize slaves"); // Wait for confirmation from slaves PROGLOG("Initialization sent to slave group"); + Owned exception; try { while (slavesRegistered < slaves) @@ -429,11 +423,7 @@ class CRegistryServer : public CSimpleInterface { Owned e = deserializeException(msg); EXCLOG(e, "Registration error"); - if (TE_FailedToRegisterSlave == e->errorCode()) - { - setExitCode(0); // to avoid thor auto-recycling - return false; - } + throw e.getClear(); } registerNode(sender-1); } @@ -445,23 +435,21 @@ class CRegistryServer : public CSimpleInterface { CMessageBuffer msg; if (!queryNodeComm().send(msg, s+1, MPTAG_THORREGISTRATION)) - { - PROGLOG("Failed to acknowledge slave %d registration", s+1); - return false; - } + throw makeStringExceptionV(TE_AbortException, "Failed to acknowledge slave %d registration", s+1); } if (watchdog) watchdog->start(); deregistrationWatch.start(); - return true; + return; } catch (IException *e) { EXCLOG(e, "Slave registration exception"); - e->Release(); + exception.setown(e); } shutdown(); - return false; + if (exception) + throw exception.getClear(); } void stop() { @@ -999,7 +987,6 @@ int main( int argc, const char *argv[] ) kjServiceMpTag = allocateClusterMPTag(); unsigned numWorkers = 0; - bool doWorkerRegistration = false; if (isContainerized()) { saveWuidToFile(workunit); @@ -1044,14 +1031,11 @@ int main( int argc, const char *argv[] ) StringBuffer myEp; getRemoteAccessibleHostText(myEp, queryMyNode()->endpoint()); - workerNSInstalled = k8s::applyYaml("thorworker", workunit, cloudJobName, "networkpolicy", { }, false, true); - if (workerNSInstalled) - { - k8s::KeepJobs keepJob = k8s::translateKeepJobs(globals->queryProp("@keepJobs")); - workerJobInstalled = k8s::applyYaml("thorworker", workunit, cloudJobName, "job", { { "graphName", graphName}, { "master", myEp.str() }, { "_HPCC_NUM_WORKERS_", std::to_string(numWorkers/numWorkersPerPod)} }, false, k8s::KeepJobs::none == keepJob); - if (workerJobInstalled) - doWorkerRegistration = true; - } + if (!k8s::applyYaml("thorworker", workunit, cloudJobName, "networkpolicy", { }, false, true)) + throw makeStringException(TE_AbortException, "Failed to apply worker networkpolicy manifest"); + k8s::KeepJobs keepJob = k8s::translateKeepJobs(globals->queryProp("@keepJobs")); + if (!k8s::applyYaml("thorworker", workunit, cloudJobName, "job", { { "graphName", graphName}, { "master", myEp.str() }, { "_HPCC_NUM_WORKERS_", std::to_string(numWorkers/numWorkersPerPod)} }, false, k8s::KeepJobs::none == keepJob)) + throw makeStringException(TE_AbortException, "Failed to apply worker job manifest"); } else { @@ -1064,7 +1048,6 @@ int main( int argc, const char *argv[] ) unsigned numWorkersPerNode = globals->getPropInt("@slavesPerNode", 1); setClusterGroup(queryMyNode(), rawGroup, numWorkersPerNode, channelsPerWorker, slaveBasePort, localThorPortInc); numWorkers = queryNodeClusterWidth(); - doWorkerRegistration = true; if (numWorkersPerNode > 1) { // Split memory based on numWorkersPerNode @@ -1075,41 +1058,44 @@ int main( int argc, const char *argv[] ) } } - if (doWorkerRegistration && registry->connect(numWorkers)) + registry->connect(numWorkers); + if (!isContainerized()) { + // bare-metal - check health of dafilesrv's on the Thor cluster. if (globals->getPropBool("@replicateOutputs")&&globals->getPropBool("@validateDAFS",true)&&!checkClusterRelicateDAFS(queryNodeGroup())) { FLLOG(MCoperatorError, thorJob, "ERROR: Validate failure(s) detected, exiting Thor"); return globals->getPropBool("@validateDAFSretCode"); // default is no recycle! } + } - unsigned totSlaveProcs = queryNodeClusterWidth(); - for (unsigned s=0; s1) - virtStr.append("virtual slaves:"); - else - virtStr.append("slave:"); - PROGLOG("Slave log %u contains %s %s", s+1, virtStr.str(), slaveStr.str()); + unsigned o = s + (c * totSlaveProcs); + if (c) + slaveStr.append(","); + slaveStr.append(o+1); } + StringBuffer virtStr; + if (channelsPerWorker>1) + virtStr.append("virtual slaves:"); + else + virtStr.append("slave:"); + PROGLOG("Slave log %u contains %s %s", s+1, virtStr.str(), slaveStr.str()); + } - PROGLOG("verifying mp connection to rest of cluster"); - if (!queryNodeComm().verifyAll(false, 1000*60*30, 1000*60)) - throwStringExceptionV(0, "Failed to connect to all nodes"); - PROGLOG("verified mp connection to rest of cluster"); + PROGLOG("verifying mp connection to rest of cluster"); + if (!queryNodeComm().verifyAll(false, 1000*60*30, 1000*60)) + throwStringExceptionV(0, "Failed to connect to all nodes"); + PROGLOG("verified mp connection to rest of cluster"); #ifdef _CONTAINERIZED - if (globals->getPropBool("@_dafsStorage")) - { + if (globals->getPropBool("@_dafsStorage")) + { /* NB: This option is a developer option only. * It is intended to be used to bring up a temporary Thor instance that uses local node storage, @@ -1128,32 +1114,28 @@ int main( int argc, const char *argv[] ) * NB: This isn't a real StoragePlane, and it will not be accessible by any other component. * */ - StringBuffer uniqueGrpName; - queryNamedGroupStore().addUnique(&queryProcessGroup(), uniqueGrpName); - // change default plane - getComponentConfigSP()->setProp("@dataPlane", uniqueGrpName); - PROGLOG("Persistent Thor group created with group name: %s", uniqueGrpName.str()); - } + StringBuffer uniqueGrpName; + queryNamedGroupStore().addUnique(&queryProcessGroup(), uniqueGrpName); + // change default plane + getComponentConfigSP()->setProp("@dataPlane", uniqueGrpName); + PROGLOG("Persistent Thor group created with group name: %s", uniqueGrpName.str()); + } #endif - LOG(MCauditInfo, ",Progress,Thor,Startup,%s,%s,%s,%s",nodeGroup.str(),thorname,queueName.str(),logUrl.str()); - auditStartLogged = true; + LOG(MCauditInfo, ",Progress,Thor,Startup,%s,%s,%s,%s",nodeGroup.str(),thorname,queueName.str(),logUrl.str()); + auditStartLogged = true; - writeSentinelFile(sentinelFile); + writeSentinelFile(sentinelFile); #ifndef _CONTAINERIZED - unsigned pinterval = globals->getPropInt("@system_monitor_interval",1000*60); - if (pinterval) - startPerformanceMonitor(pinterval, PerfMonStandard, nullptr); + unsigned pinterval = globals->getPropInt("@system_monitor_interval",1000*60); + if (pinterval) + startPerformanceMonitor(pinterval, PerfMonStandard, nullptr); #endif - configurePreferredPlanes(); + configurePreferredPlanes(); - // NB: workunit/graphName only set in one-shot mode (if isCloud()) - thorMain(logHandler, workunit, graphName); - LOG(MCauditInfo, ",Progress,Thor,Terminate,%s,%s,%s",thorname,nodeGroup.str(),queueName.str()); - } - else - PROGLOG("Registration aborted"); - registry.clear(); + // NB: workunit/graphName only set in one-shot mode (if isCloud()) + thorMain(logHandler, workunit, graphName); + LOG(MCauditInfo, ",Progress,Thor,Terminate,%s,%s,%s",thorname,nodeGroup.str(),queueName.str()); LOG(MCdebugProgress, thorJob, "ThorMaster terminated OK"); } catch (IException *e)