Skip to content

Commit

Permalink
Merge pull request #18143 from jakesmith/HPCC-31009-registration-timeout
Browse files: browse the repository at this point in the history
HPCC-31009 Ensure Thor startup errors are properly reported.

Reviewed-by: Shamser Ahmed <shamser.ahmed@lexisnexis.com>
Reviewed-by: Gavin Halliday <ghalliday@hpccsystems.com>
Merged-by: Gavin Halliday <ghalliday@hpccsystems.com>
  • Loading branch information
ghalliday authored Jan 3, 2024
2 parents 5d5418b + a4f279c commit c9afd94
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 80 deletions.
6 changes: 3 additions & 3 deletions common/workunit/workunit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14269,16 +14269,16 @@ void executeThorGraph(const char * graphName, IConstWorkUnit &workunit, const IP
unsigned runningTimeLimit = workunit.getDebugValueInt("maxRunTime", 0);
runningTimeLimit = runningTimeLimit ? runningTimeLimit : INFINITE;

std::list<WUState> expectedStates = { WUStateRunning, WUStateWait };
std::list<WUState> expectedStates = { WUStateRunning, WUStateWait, WUStateFailed };
unsigned __int64 blockedTime = 0;
for (unsigned i=0; i<2; i++)
{
WUState state = waitForWorkUnitToComplete(wuid, timelimit*1000, expectedStates);
DBGLOG("Got state: %s", getWorkunitStateStr(state));
if (WUStateWait == state) // already finished
if ((WUStateWait == state) || (WUStateFailed == state)) // already finished or failed
{
// workunit may have spent time in blocked state, but then transitioned to
// wait state quickly such that this code did not see its running state.
// wait or failed state quickly such that this code did not see its running state.
if (!blockedTime)
blockedTime = elapsedTimer.elapsedNs();
break;
Expand Down
2 changes: 1 addition & 1 deletion thorlcr/master/thgraphmanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ void relayWuidException(IConstWorkUnit *workunit, const IException *exception)
if (WUStateWait != state)
{
Owned<IWUException> we = wu->createException();
we->setSeverity(SeverityInformation);
we->setSeverity(SeverityError);
StringBuffer errStr;
exception->errorMessage(errStr);
we->setExceptionMessage(errStr);
Expand Down
134 changes: 58 additions & 76 deletions thorlcr/master/thmastermain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ class CRegistryServer : public CSimpleInterface
watchdog->addSlave(ep);
++slavesRegistered;
}
bool connect(unsigned slaves)
void connect(unsigned slaves)
{
IPointerArrayOf<INode> connectedSlaves;
connectedSlaves.ensureCapacity(slaves);
Expand All @@ -287,10 +287,7 @@ class CRegistryServer : public CSimpleInterface
{
::Release(_sender);
if (registerTM.timedout())
{
WARNLOG("Timeout waiting for all workers to register within timeout period (%u mins)", maxRegistrationMins);
return false;
}
throw makeStringExceptionV(TE_AbortException, "Timeout waiting for all workers to register within timeout period (%u mins)", maxRegistrationMins);

if (isContainerized())
{
Expand All @@ -307,8 +304,7 @@ class CRegistryServer : public CSimpleInterface
if (NotFound != connectedSlaves.find(sender))
{
StringBuffer epStr;
PROGLOG("Same slave registered twice!! : %s", sender->endpoint().getEndpointHostText(epStr).str());
return false;
throw makeStringExceptionV(TE_AbortException, "Same slave registered twice!! : %s", sender->endpoint().getEndpointHostText(epStr).str());
}

/* NB: in bare metal setup, the slaves know which slave number they are in advance, and send their slavenum at registration.
Expand Down Expand Up @@ -395,13 +391,11 @@ class CRegistryServer : public CSimpleInterface
msg.append(masterSlaveMpTag);
msg.append(kjServiceMpTag);
if (!queryNodeComm().send(msg, RANK_ALL_OTHER, MPTAG_THORREGISTRATION, MP_ASYNC_SEND))
{
PROGLOG("Failed to initialize slaves");
return false;
}
throw makeStringException(TE_AbortException, "Failed to initialize slaves");

// Wait for confirmation from slaves
PROGLOG("Initialization sent to slave group");
Owned<IException> exception;
try
{
while (slavesRegistered < slaves)
Expand Down Expand Up @@ -429,11 +423,7 @@ class CRegistryServer : public CSimpleInterface
{
Owned<IException> e = deserializeException(msg);
EXCLOG(e, "Registration error");
if (TE_FailedToRegisterSlave == e->errorCode())
{
setExitCode(0); // to avoid thor auto-recycling
return false;
}
throw e.getClear();
}
registerNode(sender-1);
}
Expand All @@ -445,23 +435,21 @@ class CRegistryServer : public CSimpleInterface
{
CMessageBuffer msg;
if (!queryNodeComm().send(msg, s+1, MPTAG_THORREGISTRATION))
{
PROGLOG("Failed to acknowledge slave %d registration", s+1);
return false;
}
throw makeStringExceptionV(TE_AbortException, "Failed to acknowledge slave %d registration", s+1);
}
if (watchdog)
watchdog->start();
deregistrationWatch.start();
return true;
return;
}
catch (IException *e)
{
EXCLOG(e, "Slave registration exception");
e->Release();
exception.setown(e);
}
shutdown();
return false;
if (exception)
throw exception.getClear();
}
void stop()
{
Expand Down Expand Up @@ -999,7 +987,6 @@ int main( int argc, const char *argv[] )
kjServiceMpTag = allocateClusterMPTag();

unsigned numWorkers = 0;
bool doWorkerRegistration = false;
if (isContainerized())
{
saveWuidToFile(workunit);
Expand Down Expand Up @@ -1044,14 +1031,11 @@ int main( int argc, const char *argv[] )
StringBuffer myEp;
getRemoteAccessibleHostText(myEp, queryMyNode()->endpoint());

workerNSInstalled = k8s::applyYaml("thorworker", workunit, cloudJobName, "networkpolicy", { }, false, true);
if (workerNSInstalled)
{
k8s::KeepJobs keepJob = k8s::translateKeepJobs(globals->queryProp("@keepJobs"));
workerJobInstalled = k8s::applyYaml("thorworker", workunit, cloudJobName, "job", { { "graphName", graphName}, { "master", myEp.str() }, { "_HPCC_NUM_WORKERS_", std::to_string(numWorkers/numWorkersPerPod)} }, false, k8s::KeepJobs::none == keepJob);
if (workerJobInstalled)
doWorkerRegistration = true;
}
if (!k8s::applyYaml("thorworker", workunit, cloudJobName, "networkpolicy", { }, false, true))
throw makeStringException(TE_AbortException, "Failed to apply worker networkpolicy manifest");
k8s::KeepJobs keepJob = k8s::translateKeepJobs(globals->queryProp("@keepJobs"));
if (!k8s::applyYaml("thorworker", workunit, cloudJobName, "job", { { "graphName", graphName}, { "master", myEp.str() }, { "_HPCC_NUM_WORKERS_", std::to_string(numWorkers/numWorkersPerPod)} }, false, k8s::KeepJobs::none == keepJob))
throw makeStringException(TE_AbortException, "Failed to apply worker job manifest");
}
else
{
Expand All @@ -1064,7 +1048,6 @@ int main( int argc, const char *argv[] )
unsigned numWorkersPerNode = globals->getPropInt("@slavesPerNode", 1);
setClusterGroup(queryMyNode(), rawGroup, numWorkersPerNode, channelsPerWorker, slaveBasePort, localThorPortInc);
numWorkers = queryNodeClusterWidth();
doWorkerRegistration = true;
if (numWorkersPerNode > 1)
{
// Split memory based on numWorkersPerNode
Expand All @@ -1075,41 +1058,44 @@ int main( int argc, const char *argv[] )
}
}

if (doWorkerRegistration && registry->connect(numWorkers))
registry->connect(numWorkers);
if (!isContainerized())
{
// bare-metal - check health of dafilesrv's on the Thor cluster.
if (globals->getPropBool("@replicateOutputs")&&globals->getPropBool("@validateDAFS",true)&&!checkClusterRelicateDAFS(queryNodeGroup()))
{
FLLOG(MCoperatorError, thorJob, "ERROR: Validate failure(s) detected, exiting Thor");
return globals->getPropBool("@validateDAFSretCode"); // default is no recycle!
}
}

unsigned totSlaveProcs = queryNodeClusterWidth();
for (unsigned s=0; s<totSlaveProcs; s++)
unsigned totSlaveProcs = queryNodeClusterWidth();
for (unsigned s=0; s<totSlaveProcs; s++)
{
StringBuffer slaveStr;
for (unsigned c=0; c<channelsPerWorker; c++)
{
StringBuffer slaveStr;
for (unsigned c=0; c<channelsPerWorker; c++)
{
unsigned o = s + (c * totSlaveProcs);
if (c)
slaveStr.append(",");
slaveStr.append(o+1);
}
StringBuffer virtStr;
if (channelsPerWorker>1)
virtStr.append("virtual slaves:");
else
virtStr.append("slave:");
PROGLOG("Slave log %u contains %s %s", s+1, virtStr.str(), slaveStr.str());
unsigned o = s + (c * totSlaveProcs);
if (c)
slaveStr.append(",");
slaveStr.append(o+1);
}
StringBuffer virtStr;
if (channelsPerWorker>1)
virtStr.append("virtual slaves:");
else
virtStr.append("slave:");
PROGLOG("Slave log %u contains %s %s", s+1, virtStr.str(), slaveStr.str());
}

PROGLOG("verifying mp connection to rest of cluster");
if (!queryNodeComm().verifyAll(false, 1000*60*30, 1000*60))
throwStringExceptionV(0, "Failed to connect to all nodes");
PROGLOG("verified mp connection to rest of cluster");
PROGLOG("verifying mp connection to rest of cluster");
if (!queryNodeComm().verifyAll(false, 1000*60*30, 1000*60))
throwStringExceptionV(0, "Failed to connect to all nodes");
PROGLOG("verified mp connection to rest of cluster");

#ifdef _CONTAINERIZED
if (globals->getPropBool("@_dafsStorage"))
{
if (globals->getPropBool("@_dafsStorage"))
{
/* NB: This option is a developer option only.
* It is intended to be used to bring up a temporary Thor instance that uses local node storage,
Expand All @@ -1128,32 +1114,28 @@ int main( int argc, const char *argv[] )
* NB: This isn't a real StoragePlane, and it will not be accessible by any other component.
*
*/
StringBuffer uniqueGrpName;
queryNamedGroupStore().addUnique(&queryProcessGroup(), uniqueGrpName);
// change default plane
getComponentConfigSP()->setProp("@dataPlane", uniqueGrpName);
PROGLOG("Persistent Thor group created with group name: %s", uniqueGrpName.str());
}
StringBuffer uniqueGrpName;
queryNamedGroupStore().addUnique(&queryProcessGroup(), uniqueGrpName);
// change default plane
getComponentConfigSP()->setProp("@dataPlane", uniqueGrpName);
PROGLOG("Persistent Thor group created with group name: %s", uniqueGrpName.str());
}
#endif
LOG(MCauditInfo, ",Progress,Thor,Startup,%s,%s,%s,%s",nodeGroup.str(),thorname,queueName.str(),logUrl.str());
auditStartLogged = true;
LOG(MCauditInfo, ",Progress,Thor,Startup,%s,%s,%s,%s",nodeGroup.str(),thorname,queueName.str(),logUrl.str());
auditStartLogged = true;

writeSentinelFile(sentinelFile);
writeSentinelFile(sentinelFile);

#ifndef _CONTAINERIZED
unsigned pinterval = globals->getPropInt("@system_monitor_interval",1000*60);
if (pinterval)
startPerformanceMonitor(pinterval, PerfMonStandard, nullptr);
unsigned pinterval = globals->getPropInt("@system_monitor_interval",1000*60);
if (pinterval)
startPerformanceMonitor(pinterval, PerfMonStandard, nullptr);
#endif
configurePreferredPlanes();
configurePreferredPlanes();

// NB: workunit/graphName only set in one-shot mode (if isCloud())
thorMain(logHandler, workunit, graphName);
LOG(MCauditInfo, ",Progress,Thor,Terminate,%s,%s,%s",thorname,nodeGroup.str(),queueName.str());
}
else
PROGLOG("Registration aborted");
registry.clear();
// NB: workunit/graphName only set in one-shot mode (if isCloud())
thorMain(logHandler, workunit, graphName);
LOG(MCauditInfo, ",Progress,Thor,Terminate,%s,%s,%s",thorname,nodeGroup.str(),queueName.str());
LOG(MCdebugProgress, thorJob, "ThorMaster terminated OK");
}
catch (IException *e)
Expand Down

0 comments on commit c9afd94

Please sign in to comment.