Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HPCC-31009 Ensure Thor startup errors are properly reported. #18143

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions common/workunit/workunit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14269,16 +14269,16 @@ void executeThorGraph(const char * graphName, IConstWorkUnit &workunit, const IP
unsigned runningTimeLimit = workunit.getDebugValueInt("maxRunTime", 0);
runningTimeLimit = runningTimeLimit ? runningTimeLimit : INFINITE;

std::list<WUState> expectedStates = { WUStateRunning, WUStateWait };
std::list<WUState> expectedStates = { WUStateRunning, WUStateWait, WUStateFailed };
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NB: not absolutely required in this commit, but related. This code should have handled the workunit having put in the failed state before. The only downside before was the production of an extra spurious "Query failed, state: ..." exception

unsigned __int64 blockedTime = 0;
for (unsigned i=0; i<2; i++)
{
WUState state = waitForWorkUnitToComplete(wuid, timelimit*1000, expectedStates);
DBGLOG("Got state: %s", getWorkunitStateStr(state));
if (WUStateWait == state) // already finished
if ((WUStateWait == state) || (WUStateFailed == state)) // already finished or failed
{
// workunit may have spent time in blocked state, but then transitioned to
// wait state quickly such that this code did not see its running state.
// wait or failed state quickly such that this code did not see its running state.
if (!blockedTime)
blockedTime = elapsedTimer.elapsedNs();
break;
Expand Down
2 changes: 1 addition & 1 deletion thorlcr/master/thgraphmanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ void relayWuidException(IConstWorkUnit *workunit, const IException *exception)
if (WUStateWait != state)
{
Owned<IWUException> we = wu->createException();
we->setSeverity(SeverityInformation);
we->setSeverity(SeverityError);
StringBuffer errStr;
exception->errorMessage(errStr);
we->setExceptionMessage(errStr);
Expand Down
134 changes: 58 additions & 76 deletions thorlcr/master/thmastermain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ class CRegistryServer : public CSimpleInterface
watchdog->addSlave(ep);
++slavesRegistered;
}
bool connect(unsigned slaves)
void connect(unsigned slaves)
{
IPointerArrayOf<INode> connectedSlaves;
connectedSlaves.ensureCapacity(slaves);
Expand All @@ -287,10 +287,7 @@ class CRegistryServer : public CSimpleInterface
{
::Release(_sender);
if (registerTM.timedout())
{
WARNLOG("Timeout waiting for all workers to register within timeout period (%u mins)", maxRegistrationMins);
return false;
}
throw makeStringExceptionV(TE_AbortException, "Timeout waiting for all workers to register within timeout period (%u mins)", maxRegistrationMins);

if (isContainerized())
{
Expand All @@ -307,8 +304,7 @@ class CRegistryServer : public CSimpleInterface
if (NotFound != connectedSlaves.find(sender))
{
StringBuffer epStr;
PROGLOG("Same slave registered twice!! : %s", sender->endpoint().getEndpointHostText(epStr).str());
return false;
throw makeStringExceptionV(TE_AbortException, "Same slave registered twice!! : %s", sender->endpoint().getEndpointHostText(epStr).str());
}

/* NB: in base metal setup, the slaves know which slave number they are in advance, and send their slavenum at registration.
Expand Down Expand Up @@ -395,13 +391,11 @@ class CRegistryServer : public CSimpleInterface
msg.append(masterSlaveMpTag);
msg.append(kjServiceMpTag);
if (!queryNodeComm().send(msg, RANK_ALL_OTHER, MPTAG_THORREGISTRATION, MP_ASYNC_SEND))
{
PROGLOG("Failed to initialize slaves");
return false;
}
throw makeStringException(TE_AbortException, "Failed to initialize slaves");

// Wait for confirmation from slaves
PROGLOG("Initialization sent to slave group");
Owned<IException> exception;
try
{
while (slavesRegistered < slaves)
Expand Down Expand Up @@ -429,11 +423,7 @@ class CRegistryServer : public CSimpleInterface
{
Owned<IException> e = deserializeException(msg);
EXCLOG(e, "Registration error");
if (TE_FailedToRegisterSlave == e->errorCode())
{
setExitCode(0); // to avoid thor auto-recycling
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NB: this was not needed before, the deliberate suppression of Thor restarts in bare-metal at this point, happens [deliberatly] because it hasn't got as far as creating the sentinel file (see init_thor script for non-restart when sentinel missing)

return false;
}
throw e.getClear();
}
registerNode(sender-1);
}
Expand All @@ -445,23 +435,21 @@ class CRegistryServer : public CSimpleInterface
{
CMessageBuffer msg;
if (!queryNodeComm().send(msg, s+1, MPTAG_THORREGISTRATION))
{
PROGLOG("Failed to acknowledge slave %d registration", s+1);
return false;
}
throw makeStringExceptionV(TE_AbortException, "Failed to acknowledge slave %d registration", s+1);
}
if (watchdog)
watchdog->start();
deregistrationWatch.start();
return true;
return;
}
catch (IException *e)
{
EXCLOG(e, "Slave registration exception");
e->Release();
exception.setown(e);
}
shutdown();
return false;
if (exception)
throw exception.getClear();
}
void stop()
{
Expand Down Expand Up @@ -999,7 +987,6 @@ int main( int argc, const char *argv[] )
kjServiceMpTag = allocateClusterMPTag();

unsigned numWorkers = 0;
bool doWorkerRegistration = false;
if (isContainerized())
{
saveWuidToFile(workunit);
Expand Down Expand Up @@ -1044,14 +1031,11 @@ int main( int argc, const char *argv[] )
StringBuffer myEp;
getRemoteAccessibleHostText(myEp, queryMyNode()->endpoint());

workerNSInstalled = k8s::applyYaml("thorworker", workunit, cloudJobName, "networkpolicy", { }, false, true);
if (workerNSInstalled)
{
k8s::KeepJobs keepJob = k8s::translateKeepJobs(globals->queryProp("@keepJobs"));
workerJobInstalled = k8s::applyYaml("thorworker", workunit, cloudJobName, "job", { { "graphName", graphName}, { "master", myEp.str() }, { "_HPCC_NUM_WORKERS_", std::to_string(numWorkers/numWorkersPerPod)} }, false, k8s::KeepJobs::none == keepJob);
if (workerJobInstalled)
doWorkerRegistration = true;
}
if (!k8s::applyYaml("thorworker", workunit, cloudJobName, "networkpolicy", { }, false, true))
throw makeStringException(TE_AbortException, "Failed to apply worker networkpolicy manifest");
k8s::KeepJobs keepJob = k8s::translateKeepJobs(globals->queryProp("@keepJobs"));
if (!k8s::applyYaml("thorworker", workunit, cloudJobName, "job", { { "graphName", graphName}, { "master", myEp.str() }, { "_HPCC_NUM_WORKERS_", std::to_string(numWorkers/numWorkersPerPod)} }, false, k8s::KeepJobs::none == keepJob))
throw makeStringException(TE_AbortException, "Failed to apply worker job manifest");
}
else
{
Expand All @@ -1064,7 +1048,6 @@ int main( int argc, const char *argv[] )
unsigned numWorkersPerNode = globals->getPropInt("@slavesPerNode", 1);
setClusterGroup(queryMyNode(), rawGroup, numWorkersPerNode, channelsPerWorker, slaveBasePort, localThorPortInc);
numWorkers = queryNodeClusterWidth();
doWorkerRegistration = true;
if (numWorkersPerNode > 1)
{
// Split memory based on numWorkersPerNode
Expand All @@ -1075,41 +1058,44 @@ int main( int argc, const char *argv[] )
}
}

if (doWorkerRegistration && registry->connect(numWorkers))
registry->connect(numWorkers);
if (!isContainerized())
{
// bare-metal - check health of dafilesrv's on the Thor cluster.
if (globals->getPropBool("@replicateOutputs")&&globals->getPropBool("@validateDAFS",true)&&!checkClusterRelicateDAFS(queryNodeGroup()))
{
FLLOG(MCoperatorError, thorJob, "ERROR: Validate failure(s) detected, exiting Thor");
return globals->getPropBool("@validateDAFSretCode"); // default is no recycle!
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would be clearer if this explicitly returned an integer error code. E.g.

return globals->getPropBool("@validateDAFSretCode") ? TEC_DAFSdown : 0);

}
}

unsigned totSlaveProcs = queryNodeClusterWidth();
for (unsigned s=0; s<totSlaveProcs; s++)
unsigned totSlaveProcs = queryNodeClusterWidth();
for (unsigned s=0; s<totSlaveProcs; s++)
{
StringBuffer slaveStr;
for (unsigned c=0; c<channelsPerWorker; c++)
{
StringBuffer slaveStr;
for (unsigned c=0; c<channelsPerWorker; c++)
{
unsigned o = s + (c * totSlaveProcs);
if (c)
slaveStr.append(",");
slaveStr.append(o+1);
}
StringBuffer virtStr;
if (channelsPerWorker>1)
virtStr.append("virtual slaves:");
else
virtStr.append("slave:");
PROGLOG("Slave log %u contains %s %s", s+1, virtStr.str(), slaveStr.str());
unsigned o = s + (c * totSlaveProcs);
if (c)
slaveStr.append(",");
slaveStr.append(o+1);
}
StringBuffer virtStr;
if (channelsPerWorker>1)
virtStr.append("virtual slaves:");
else
virtStr.append("slave:");
PROGLOG("Slave log %u contains %s %s", s+1, virtStr.str(), slaveStr.str());
}

PROGLOG("verifying mp connection to rest of cluster");
if (!queryNodeComm().verifyAll(false, 1000*60*30, 1000*60))
throwStringExceptionV(0, "Failed to connect to all nodes");
PROGLOG("verified mp connection to rest of cluster");
PROGLOG("verifying mp connection to rest of cluster");
if (!queryNodeComm().verifyAll(false, 1000*60*30, 1000*60))
throwStringExceptionV(0, "Failed to connect to all nodes");
PROGLOG("verified mp connection to rest of cluster");

#ifdef _CONTAINERIZED
if (globals->getPropBool("@_dafsStorage"))
{
if (globals->getPropBool("@_dafsStorage"))
{
/* NB: This option is a developer option only.

* It is intended to be used to bring up a temporary Thor instance that uses local node storage,
Expand All @@ -1128,32 +1114,28 @@ int main( int argc, const char *argv[] )
* NB: This isn't a real StoragePlane, and it will not be accessible by any other component.
*
*/
StringBuffer uniqueGrpName;
queryNamedGroupStore().addUnique(&queryProcessGroup(), uniqueGrpName);
// change default plane
getComponentConfigSP()->setProp("@dataPlane", uniqueGrpName);
PROGLOG("Persistent Thor group created with group name: %s", uniqueGrpName.str());
}
StringBuffer uniqueGrpName;
queryNamedGroupStore().addUnique(&queryProcessGroup(), uniqueGrpName);
// change default plane
getComponentConfigSP()->setProp("@dataPlane", uniqueGrpName);
PROGLOG("Persistent Thor group created with group name: %s", uniqueGrpName.str());
}
#endif
LOG(MCauditInfo, ",Progress,Thor,Startup,%s,%s,%s,%s",nodeGroup.str(),thorname,queueName.str(),logUrl.str());
auditStartLogged = true;
LOG(MCauditInfo, ",Progress,Thor,Startup,%s,%s,%s,%s",nodeGroup.str(),thorname,queueName.str(),logUrl.str());
auditStartLogged = true;

writeSentinelFile(sentinelFile);
writeSentinelFile(sentinelFile);

#ifndef _CONTAINERIZED
unsigned pinterval = globals->getPropInt("@system_monitor_interval",1000*60);
if (pinterval)
startPerformanceMonitor(pinterval, PerfMonStandard, nullptr);
unsigned pinterval = globals->getPropInt("@system_monitor_interval",1000*60);
if (pinterval)
startPerformanceMonitor(pinterval, PerfMonStandard, nullptr);
#endif
configurePreferredPlanes();
configurePreferredPlanes();

// NB: workunit/graphName only set in one-shot mode (if isCloud())
thorMain(logHandler, workunit, graphName);
LOG(MCauditInfo, ",Progress,Thor,Terminate,%s,%s,%s",thorname,nodeGroup.str(),queueName.str());
}
else
PROGLOG("Registration aborted");
registry.clear();
// NB: workunit/graphName only set in one-shot mode (if isCloud())
thorMain(logHandler, workunit, graphName);
LOG(MCauditInfo, ",Progress,Thor,Terminate,%s,%s,%s",thorname,nodeGroup.str(),queueName.str());
LOG(MCdebugProgress, thorJob, "ThorMaster terminated OK");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it make any difference that this is now before the destruction of registry?

}
catch (IException *e)
Expand Down
Loading