-
Notifications
You must be signed in to change notification settings - Fork 305
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
HPCC-31009 Ensure Thor startup errors are properly reported. #18143
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -265,7 +265,7 @@ class CRegistryServer : public CSimpleInterface | |
watchdog->addSlave(ep); | ||
++slavesRegistered; | ||
} | ||
bool connect(unsigned slaves) | ||
void connect(unsigned slaves) | ||
{ | ||
IPointerArrayOf<INode> connectedSlaves; | ||
connectedSlaves.ensureCapacity(slaves); | ||
|
@@ -287,10 +287,7 @@ class CRegistryServer : public CSimpleInterface | |
{ | ||
::Release(_sender); | ||
if (registerTM.timedout()) | ||
{ | ||
WARNLOG("Timeout waiting for all workers to register within timeout period (%u mins)", maxRegistrationMins); | ||
return false; | ||
} | ||
throw makeStringExceptionV(TE_AbortException, "Timeout waiting for all workers to register within timeout period (%u mins)", maxRegistrationMins); | ||
|
||
if (isContainerized()) | ||
{ | ||
|
@@ -307,8 +304,7 @@ class CRegistryServer : public CSimpleInterface | |
if (NotFound != connectedSlaves.find(sender)) | ||
{ | ||
StringBuffer epStr; | ||
PROGLOG("Same slave registered twice!! : %s", sender->endpoint().getEndpointHostText(epStr).str()); | ||
return false; | ||
throw makeStringExceptionV(TE_AbortException, "Same slave registered twice!! : %s", sender->endpoint().getEndpointHostText(epStr).str()); | ||
} | ||
|
||
/* NB: in base metal setup, the slaves know which slave number they are in advance, and send their slavenum at registration. | ||
|
@@ -395,13 +391,11 @@ class CRegistryServer : public CSimpleInterface | |
msg.append(masterSlaveMpTag); | ||
msg.append(kjServiceMpTag); | ||
if (!queryNodeComm().send(msg, RANK_ALL_OTHER, MPTAG_THORREGISTRATION, MP_ASYNC_SEND)) | ||
{ | ||
PROGLOG("Failed to initialize slaves"); | ||
return false; | ||
} | ||
throw makeStringException(TE_AbortException, "Failed to initialize slaves"); | ||
|
||
// Wait for confirmation from slaves | ||
PROGLOG("Initialization sent to slave group"); | ||
Owned<IException> exception; | ||
try | ||
{ | ||
while (slavesRegistered < slaves) | ||
|
@@ -429,11 +423,7 @@ class CRegistryServer : public CSimpleInterface | |
{ | ||
Owned<IException> e = deserializeException(msg); | ||
EXCLOG(e, "Registration error"); | ||
if (TE_FailedToRegisterSlave == e->errorCode()) | ||
{ | ||
setExitCode(0); // to avoid thor auto-recycling | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. NB: this was not needed before, the deliberate suppression of Thor restarts in bare-metal at this point, happens [deliberatly] because it hasn't got as far as creating the sentinel file (see init_thor script for non-restart when sentinel missing) |
||
return false; | ||
} | ||
throw e.getClear(); | ||
} | ||
registerNode(sender-1); | ||
} | ||
|
@@ -445,23 +435,21 @@ class CRegistryServer : public CSimpleInterface | |
{ | ||
CMessageBuffer msg; | ||
if (!queryNodeComm().send(msg, s+1, MPTAG_THORREGISTRATION)) | ||
{ | ||
PROGLOG("Failed to acknowledge slave %d registration", s+1); | ||
return false; | ||
} | ||
throw makeStringExceptionV(TE_AbortException, "Failed to acknowledge slave %d registration", s+1); | ||
} | ||
if (watchdog) | ||
watchdog->start(); | ||
deregistrationWatch.start(); | ||
return true; | ||
return; | ||
} | ||
catch (IException *e) | ||
{ | ||
EXCLOG(e, "Slave registration exception"); | ||
e->Release(); | ||
exception.setown(e); | ||
} | ||
shutdown(); | ||
return false; | ||
if (exception) | ||
throw exception.getClear(); | ||
} | ||
void stop() | ||
{ | ||
|
@@ -999,7 +987,6 @@ int main( int argc, const char *argv[] ) | |
kjServiceMpTag = allocateClusterMPTag(); | ||
|
||
unsigned numWorkers = 0; | ||
bool doWorkerRegistration = false; | ||
if (isContainerized()) | ||
{ | ||
saveWuidToFile(workunit); | ||
|
@@ -1044,14 +1031,11 @@ int main( int argc, const char *argv[] ) | |
StringBuffer myEp; | ||
getRemoteAccessibleHostText(myEp, queryMyNode()->endpoint()); | ||
|
||
workerNSInstalled = k8s::applyYaml("thorworker", workunit, cloudJobName, "networkpolicy", { }, false, true); | ||
if (workerNSInstalled) | ||
{ | ||
k8s::KeepJobs keepJob = k8s::translateKeepJobs(globals->queryProp("@keepJobs")); | ||
workerJobInstalled = k8s::applyYaml("thorworker", workunit, cloudJobName, "job", { { "graphName", graphName}, { "master", myEp.str() }, { "_HPCC_NUM_WORKERS_", std::to_string(numWorkers/numWorkersPerPod)} }, false, k8s::KeepJobs::none == keepJob); | ||
if (workerJobInstalled) | ||
doWorkerRegistration = true; | ||
} | ||
if (!k8s::applyYaml("thorworker", workunit, cloudJobName, "networkpolicy", { }, false, true)) | ||
throw makeStringException(TE_AbortException, "Failed to apply worker networkpolicy manifest"); | ||
k8s::KeepJobs keepJob = k8s::translateKeepJobs(globals->queryProp("@keepJobs")); | ||
if (!k8s::applyYaml("thorworker", workunit, cloudJobName, "job", { { "graphName", graphName}, { "master", myEp.str() }, { "_HPCC_NUM_WORKERS_", std::to_string(numWorkers/numWorkersPerPod)} }, false, k8s::KeepJobs::none == keepJob)) | ||
throw makeStringException(TE_AbortException, "Failed to apply worker job manifest"); | ||
} | ||
else | ||
{ | ||
|
@@ -1064,7 +1048,6 @@ int main( int argc, const char *argv[] ) | |
unsigned numWorkersPerNode = globals->getPropInt("@slavesPerNode", 1); | ||
setClusterGroup(queryMyNode(), rawGroup, numWorkersPerNode, channelsPerWorker, slaveBasePort, localThorPortInc); | ||
numWorkers = queryNodeClusterWidth(); | ||
doWorkerRegistration = true; | ||
if (numWorkersPerNode > 1) | ||
{ | ||
// Split memory based on numWorkersPerNode | ||
|
@@ -1075,41 +1058,44 @@ int main( int argc, const char *argv[] ) | |
} | ||
} | ||
|
||
if (doWorkerRegistration && registry->connect(numWorkers)) | ||
registry->connect(numWorkers); | ||
if (!isContainerized()) | ||
{ | ||
// bare-metal - check health of dafilesrv's on the Thor cluster. | ||
if (globals->getPropBool("@replicateOutputs")&&globals->getPropBool("@validateDAFS",true)&&!checkClusterRelicateDAFS(queryNodeGroup())) | ||
{ | ||
FLLOG(MCoperatorError, thorJob, "ERROR: Validate failure(s) detected, exiting Thor"); | ||
return globals->getPropBool("@validateDAFSretCode"); // default is no recycle! | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Would be clearer if this explicitly returned an integer error code. E.g. return globals->getPropBool("@validateDAFSretCode") ? TEC_DAFSdown : 0); |
||
} | ||
} | ||
|
||
unsigned totSlaveProcs = queryNodeClusterWidth(); | ||
for (unsigned s=0; s<totSlaveProcs; s++) | ||
unsigned totSlaveProcs = queryNodeClusterWidth(); | ||
for (unsigned s=0; s<totSlaveProcs; s++) | ||
{ | ||
StringBuffer slaveStr; | ||
for (unsigned c=0; c<channelsPerWorker; c++) | ||
{ | ||
StringBuffer slaveStr; | ||
for (unsigned c=0; c<channelsPerWorker; c++) | ||
{ | ||
unsigned o = s + (c * totSlaveProcs); | ||
if (c) | ||
slaveStr.append(","); | ||
slaveStr.append(o+1); | ||
} | ||
StringBuffer virtStr; | ||
if (channelsPerWorker>1) | ||
virtStr.append("virtual slaves:"); | ||
else | ||
virtStr.append("slave:"); | ||
PROGLOG("Slave log %u contains %s %s", s+1, virtStr.str(), slaveStr.str()); | ||
unsigned o = s + (c * totSlaveProcs); | ||
if (c) | ||
slaveStr.append(","); | ||
slaveStr.append(o+1); | ||
} | ||
StringBuffer virtStr; | ||
if (channelsPerWorker>1) | ||
virtStr.append("virtual slaves:"); | ||
else | ||
virtStr.append("slave:"); | ||
PROGLOG("Slave log %u contains %s %s", s+1, virtStr.str(), slaveStr.str()); | ||
} | ||
|
||
PROGLOG("verifying mp connection to rest of cluster"); | ||
if (!queryNodeComm().verifyAll(false, 1000*60*30, 1000*60)) | ||
throwStringExceptionV(0, "Failed to connect to all nodes"); | ||
PROGLOG("verified mp connection to rest of cluster"); | ||
PROGLOG("verifying mp connection to rest of cluster"); | ||
if (!queryNodeComm().verifyAll(false, 1000*60*30, 1000*60)) | ||
throwStringExceptionV(0, "Failed to connect to all nodes"); | ||
PROGLOG("verified mp connection to rest of cluster"); | ||
|
||
#ifdef _CONTAINERIZED | ||
if (globals->getPropBool("@_dafsStorage")) | ||
{ | ||
if (globals->getPropBool("@_dafsStorage")) | ||
{ | ||
/* NB: This option is a developer option only. | ||
|
||
* It is intended to be used to bring up a temporary Thor instance that uses local node storage, | ||
|
@@ -1128,32 +1114,28 @@ int main( int argc, const char *argv[] ) | |
* NB: This isn't a real StoragePlane, and it will not be accessible by any other component. | ||
* | ||
*/ | ||
StringBuffer uniqueGrpName; | ||
queryNamedGroupStore().addUnique(&queryProcessGroup(), uniqueGrpName); | ||
// change default plane | ||
getComponentConfigSP()->setProp("@dataPlane", uniqueGrpName); | ||
PROGLOG("Persistent Thor group created with group name: %s", uniqueGrpName.str()); | ||
} | ||
StringBuffer uniqueGrpName; | ||
queryNamedGroupStore().addUnique(&queryProcessGroup(), uniqueGrpName); | ||
// change default plane | ||
getComponentConfigSP()->setProp("@dataPlane", uniqueGrpName); | ||
PROGLOG("Persistent Thor group created with group name: %s", uniqueGrpName.str()); | ||
} | ||
#endif | ||
LOG(MCauditInfo, ",Progress,Thor,Startup,%s,%s,%s,%s",nodeGroup.str(),thorname,queueName.str(),logUrl.str()); | ||
auditStartLogged = true; | ||
LOG(MCauditInfo, ",Progress,Thor,Startup,%s,%s,%s,%s",nodeGroup.str(),thorname,queueName.str(),logUrl.str()); | ||
auditStartLogged = true; | ||
|
||
writeSentinelFile(sentinelFile); | ||
writeSentinelFile(sentinelFile); | ||
|
||
#ifndef _CONTAINERIZED | ||
unsigned pinterval = globals->getPropInt("@system_monitor_interval",1000*60); | ||
if (pinterval) | ||
startPerformanceMonitor(pinterval, PerfMonStandard, nullptr); | ||
unsigned pinterval = globals->getPropInt("@system_monitor_interval",1000*60); | ||
if (pinterval) | ||
startPerformanceMonitor(pinterval, PerfMonStandard, nullptr); | ||
#endif | ||
configurePreferredPlanes(); | ||
configurePreferredPlanes(); | ||
|
||
// NB: workunit/graphName only set in one-shot mode (if isCloud()) | ||
thorMain(logHandler, workunit, graphName); | ||
LOG(MCauditInfo, ",Progress,Thor,Terminate,%s,%s,%s",thorname,nodeGroup.str(),queueName.str()); | ||
} | ||
else | ||
PROGLOG("Registration aborted"); | ||
registry.clear(); | ||
// NB: workunit/graphName only set in one-shot mode (if isCloud()) | ||
thorMain(logHandler, workunit, graphName); | ||
LOG(MCauditInfo, ",Progress,Thor,Terminate,%s,%s,%s",thorname,nodeGroup.str(),queueName.str()); | ||
LOG(MCdebugProgress, thorJob, "ThorMaster terminated OK"); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does it make any difference that this is now before the destruction of registry? |
||
} | ||
catch (IException *e) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
NB: not absolutely required in this commit, but related. This code should have handled the workunit having put in the failed state before. The only downside before was the production of an extra spurious "Query failed, state: ..." exception