Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HPCC-30648 Check worker job and abort if unhealthy. #18067

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions system/jlib/jcontainerized.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ bool checkExitCodes(StringBuffer &output, const char *podStatuses)
return false;
}

void waitJob(const char *componentName, const char *resourceType, const char *job, unsigned pendingTimeoutSecs, KeepJobs keepJob)
void waitJob(const char *componentName, const char *resourceType, const char *job, unsigned pendingTimeoutSecs, unsigned totalWaitTimeSecs, KeepJobs keepJob)
{
VStringBuffer jobName("%s-%s-%s", componentName, resourceType, job);
jobName.toLowerCase();
Expand Down Expand Up @@ -160,6 +160,11 @@ void waitJob(const char *componentName, const char *resourceType, const char *jo
runKubectlCommand(componentName, getReason, nullptr, &output.clear());
throw makeStringExceptionV(0, "Failed to run %s - pod not scheduled after %u seconds: %s ", jobName.str(), pendingTimeoutSecs, output.str());
}
if (0 == totalWaitTimeSecs)
break;
if ((INFINITE != totalWaitTimeSecs) && msTick()-start > totalWaitTimeSecs*1000)
throw makeStringExceptionV(0, "Wait job timeout (%u secs) expired, whilst running: %s", totalWaitTimeSecs, jobName.str());

MilliSleep(delay);
if (delay < 10000)
delay = delay * 2;
Expand Down Expand Up @@ -236,7 +241,7 @@ void runJob(const char *componentName, const char *wuid, const char *jobName, co
Owned<IException> exception;
try
{
waitJob(componentName, "job", jobName, pendingTimeoutSecs, keepJob);
waitJob(componentName, "job", jobName, pendingTimeoutSecs, INFINITE, keepJob);
}
catch (IException *e)
{
Expand Down
2 changes: 1 addition & 1 deletion system/jlib/jcontainerized.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ jlib_decl KeepJobs translateKeepJobs(const char *keepJobs);

jlib_decl bool isActiveService(const char *serviceName);
jlib_decl void deleteResource(const char *componentName, const char *job, const char *resource);
jlib_decl void waitJob(const char *componentName, const char *job, unsigned pendingTimeoutSecs, KeepJobs keepJob);
jlib_decl void waitJob(const char *componentName, const char *resourceType, const char *job, unsigned pendingTimeoutSecs, unsigned totalWaitTimeSecs, KeepJobs keepJob);
jlib_decl bool applyYaml(const char *componentName, const char *wuid, const char *job, const char *resourceType, const std::list<std::pair<std::string, std::string>> &extraParams, bool optional, bool autoCleanup);
jlib_decl void runJob(const char *componentName, const char *wuid, const char *job, const std::list<std::pair<std::string, std::string>> &extraParams={});

Expand Down
90 changes: 56 additions & 34 deletions thorlcr/master/thmastermain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ class CThorEndHandler : implements IThreaded
};

static CThorEndHandler *thorEndHandler = nullptr;
static StringBuffer cloudJobName;

MODULE_INIT(INIT_PRIORITY_STANDARD)
{
Expand Down Expand Up @@ -266,58 +267,80 @@ class CRegistryServer : public CSimpleInterface
}
bool connect(unsigned slaves)
{
LOG(MCdebugProgress, thorJob, "Waiting for %d slaves to register", slaves);

IPointerArrayOf<INode> connectedSlaves;
connectedSlaves.ensureCapacity(slaves);
unsigned remaining = slaves;
INode *_sender = nullptr;
CMessageBuffer msg;

// Will wait for all workers to register within timelimit (default = 15 mins bare-metal, 60 mins containerized)
constexpr unsigned defaultMaxRegistrationMins = isContainerized() ? 60 : 15;
unsigned maxRegistrationMins = (unsigned)getExpertOptInt64("maxWorkerRegistrationMins", defaultMaxRegistrationMins);
constexpr unsigned oneMinMs = 60000;

PROGLOG("Waiting for %u workers to register - max registration time = %u minutes", slaves, maxRegistrationMins);
CTimeMon registerTM(maxRegistrationMins * oneMinMs);
while (remaining)
{
if (!queryWorldCommunicator().recv(msg, nullptr, MPTAG_THORREGISTRATION, &_sender, MP_WAIT_FOREVER))
// on timeout, check for any failed k8s worker job
if (!queryWorldCommunicator().recv(msg, nullptr, MPTAG_THORREGISTRATION, &_sender, oneMinMs))
{
::Release(_sender);
PROGLOG("Failed to initialize slaves");
return false;
}
Owned<INode> sender = _sender;
if (NotFound != connectedSlaves.find(sender))
{
StringBuffer epStr;
PROGLOG("Same slave registered twice!! : %s", sender->endpoint().getUrlStr(epStr).str());
return false;
}
if (registerTM.timedout())
{
WARNLOG("Timeout waiting for all workers to register within timeout period (%u mins)", maxRegistrationMins);
return false;
}

/* NB: in base metal setup, the slaves know which slave number they are in advance, and send their slavenum at registration.
* In non attached storage setup, they do not send a slave by default and instead are given a # once all are registered
*/
unsigned slaveNum;
msg.read(slaveNum);
StringBuffer slavePodName;
if (NotFound == slaveNum)
{
connectedSlaves.append(sender.getLink());
slaveNum = connectedSlaves.ordinality();
if (isContainerized())
{
msg.read(slavePodName);
addConnectedWorkerPod(slavePodName); // NB: these are added in worker # order
// NB: this is checking for error only, will throw an exception if any found.
k8s::waitJob("thorworker", "job", cloudJobName.str(), 0, 0, k8s::KeepJobs::all);
}

// NB: will not reach here if waitJob fails.
PROGLOG("Waiting for %u remaining workers to register", remaining);
}
else
{
unsigned pos = slaveNum - 1; // NB: slaveNum is 1 based
while (connectedSlaves.ordinality() < pos)
connectedSlaves.append(nullptr);
if (connectedSlaves.ordinality() == pos)
Owned<INode> sender = _sender;
if (NotFound != connectedSlaves.find(sender))
{
StringBuffer epStr;
PROGLOG("Same slave registered twice!! : %s", sender->endpoint().getUrlStr(epStr).str());
return false;
}

/* NB: in base metal setup, the slaves know which slave number they are in advance, and send their slavenum at registration.
* In non attached storage setup, they do not send a slave by default and instead are given a # once all are registered
*/
unsigned slaveNum;
msg.read(slaveNum);
StringBuffer slavePodName;
if (NotFound == slaveNum)
{
connectedSlaves.append(sender.getLink());
slaveNum = connectedSlaves.ordinality();
if (isContainerized())
{
msg.read(slavePodName);
addConnectedWorkerPod(slavePodName); // NB: these are added in worker # order
}
}
else
connectedSlaves.replace(sender.getLink(), pos);
{
unsigned pos = slaveNum - 1; // NB: slaveNum is 1 based
while (connectedSlaves.ordinality() < pos)
connectedSlaves.append(nullptr);
if (connectedSlaves.ordinality() == pos)
connectedSlaves.append(sender.getLink());
else
connectedSlaves.replace(sender.getLink(), pos);
}
StringBuffer epStr;
PROGLOG("Slave %u connected from %s", slaveNum, sender->endpoint().getUrlStr(epStr).str());
--remaining;
}
StringBuffer epStr;
PROGLOG("Slave %u connected from %s", slaveNum, sender->endpoint().getUrlStr(epStr).str());
--remaining;
}
assertex(slaves == connectedSlaves.ordinality());

Expand Down Expand Up @@ -946,7 +969,6 @@ int main( int argc, const char *argv[] )
StringBuffer queueName;

// only for K8s
StringBuffer cloudJobName;
bool workerNSInstalled = false;
bool workerJobInstalled = false;

Expand Down
Loading