Skip to content

Commit

Permalink
HPCC-32657 review changes
Browse files Browse the repository at this point in the history
Signed-off-by: Jake Smith <jake.smith@lexisnexisrisk.com>
  • Loading branch information
jakesmith committed Sep 24, 2024
1 parent d464c56 commit 7869ef7
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 126 deletions.
30 changes: 5 additions & 25 deletions helm/hpcc/templates/_helpers.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -799,23 +799,15 @@ Pass in dict with root, component (in case of error), optional (true if daliArg
{{- end -}}

{{/*
Returns the image pattern
Get image name
*/}}
{{- define "hpcc.getImagePattern" -}}
{{- define "hpcc.imageName" -}}
{{- /* Pass in a dictionary with root and me defined */ -}}
{{- $imageRoot := "" -}}
{{- $imageName := "" -}}
{{- $imageVersion := "" -}}
{{- if .me.image -}}
{{- $imageRoot = .me.image.root | default .root.Values.global.image.root | default "hpccsystems" -}}
{{- $imageName = .me.image.name | default .root.Values.global.image.name | default "platform-core" -}}
{{- $imageVersion = .me.image.version | default .root.Values.global.image.version | default .root.Chart.Version -}}
{{ .me.image.root | default .root.Values.global.image.root | default "hpccsystems" }}/{{ .me.image.name | default .root.Values.global.image.name | default "platform-core" }}:{{ .me.image.version | default .root.Values.global.image.version | default .root.Chart.Version }}
{{- else -}}
{{- $imageRoot = .root.Values.global.image.root | default "hpccsystems" -}}
{{- $imageName = .root.Values.global.image.name | default "platform-core" -}}
{{- $imageVersion = .root.Values.global.image.version | default .root.Chart.Version -}}
{{ .root.Values.global.image.root | default "hpccsystems" }}/{{ .root.Values.global.image.name | default "platform-core" }}:{{ .root.Values.global.image.version | default .root.Chart.Version }}
{{- end -}}
{{- printf "%s/%s:%s" $imageRoot $imageName $imageVersion -}}
{{- end -}}

{{/*
Expand All @@ -824,19 +816,7 @@ Generates imagePattern as env variable
{{- define "hpcc.generateImageEnv" -}}
{{- /* Pass in a dictionary with root and me defined */ -}}
- name: imagePattern
value: {{ include "hpcc.getImagePattern" . }}
{{- end -}}

{{/*
Get image name
*/}}
{{- define "hpcc.imageName" -}}
{{- /* Pass in a dictionary with root and me defined */ -}}
{{- if .me.image -}}
{{ .me.image.root | default .root.Values.global.image.root | default "hpccsystems" }}/{{ .me.image.name | default .root.Values.global.image.name | default "platform-core" }}:{{ .me.image.version | default .root.Values.global.image.version | default .root.Chart.Version }}
{{- else -}}
{{ .root.Values.global.image.root | default "hpccsystems" }}/{{ .root.Values.global.image.name | default "platform-core" }}:{{ .root.Values.global.image.version | default .root.Chart.Version }}
{{- end -}}
value: {{ include "hpcc.imageName" . }}
{{- end -}}

{{/*
Expand Down
10 changes: 2 additions & 8 deletions system/jlib/jcontainerized.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -229,23 +229,17 @@ bool applyYaml(const char *componentName, const char *wuid, const char *job, con
const char *imagePattern = getenv("imagePattern");
if (imagePattern)
{
DBGLOG("Changing image version");
DBGLOG("Old yaml");
PROGLOG("%s", jobYaml.str());
PROGLOG("========");
const char *imageVersionStart = strstr(imagePattern, ":");
if (imageVersionStart)
{
VStringBuffer oriImagePatternSpec("image: %s", imagePattern);
VStringBuffer newImagePatternSpec("image: %.*s:%s", (int)(imageVersionStart-imagePattern), imagePattern, p.second.c_str());
jobYaml.replaceString(oriImagePatternSpec, newImagePatternSpec);
DBGLOG("New yaml");
PROGLOG("%s", jobYaml.str());
PROGLOG("========");
DBGLOG("Job image version changed from '%s' to '%s'", imageVersionStart, p.second.c_str());
}
}
}
if (hasPrefix(p.first.c_str(), "_HPCC_", false)) // job yaml substitution
else if (hasPrefix(p.first.c_str(), "_HPCC_", false)) // job yaml substitution
jobYaml.replaceString(p.first.c_str(), p.second.c_str());
else
args.append(" \"--").append(p.first.c_str()).append('=').append(p.second.c_str()).append("\"");
Expand Down
179 changes: 86 additions & 93 deletions thorlcr/master/thgraphmanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1499,118 +1499,111 @@ void thorMain(ILogMsgHandler *logHandler, const char *wuid, const char *graphNam
{
SCMStringBuffer jobVersion;
workunit->getDebugValue("platformVersion", jobVersion);
if (jobVersion.length() && !thorQueue)
bool mismatch = false;
const char *imagePattern = getenv("imagePattern");
const char *imageVersion = nullptr;
if (isEmptyString(imagePattern))
{
Owned<IException> exception = MakeStringException(TE_AbortException, "Custom platformVersion with multiJobLinger=false not supported");
relayWuidException(workunit, exception);
IWARNLOG("imagePattern not set in environment");
imageVersion = "unknown";
runtimePlatformVersion.set(imageVersion);
}
else
{
bool mismatch = false;
const char *imagePattern = getenv("imagePattern");
const char *imageVersion = nullptr;
if (isEmptyString(imagePattern))
IWARNLOG("imagePattern not set in environment");
imageVersion = strrchr(imagePattern, ':');
if (!imageVersion)
{
IWARNLOG("imagePattern has unexpected format: %s", imagePattern);
imageVersion = "unknown";
runtimePlatformVersion.set(imageVersion);
}
else
{
imageVersion = strrchr(imagePattern, ':');
if (!imageVersion)
IWARNLOG("imagePattern has unexpected format: %s", imagePattern);
else
imageVersion++;
if (jobVersion.length())
{
imageVersion++;
if (jobVersion.length())
{
if (0 == runtimePlatformVersion.length()) // 1st time
{
// check if jobVersion is same as original helm chart image
// if it is, then we are not a custom runtime version
if (!streq(jobVersion.str(), imageVersion))
{
// record the runtime job version, to be checked on subsequent runs
runtimePlatformVersion.set(jobVersion.str());
}
}
else if (!streq(jobVersion.str(), runtimePlatformVersion.str()))
mismatch = true;
}
else
if (0 == runtimePlatformVersion.length()) // 1st time
runtimePlatformVersion.set(jobVersion.str());
else if (!streq(jobVersion.str(), runtimePlatformVersion.str()))
mismatch = true;
}
else if (runtimePlatformVersion.length())
{
if (!streq(imageVersion, runtimePlatformVersion.str())) // can only happen if a previous job had launched instance with a jobVersion
{
if (runtimePlatformVersion.length())
{
if (!streq(imageVersion, runtimePlatformVersion.str()))
{
// this is a custom runtime version, but this job has not specified a jobVersion
// therefore we mismatch
mismatch = true;
}
}
else
runtimePlatformVersion.set(imageVersion);
// this is a custom runtime version, but this job has not specified a jobVersion
// therefore we mismatch
mismatch = true;
}
}
else
runtimePlatformVersion.set(imageVersion);
}
}

if (mismatch)
{
// This Thor has picked up a job that was submitted with a different #option platformVersion.
// Requeue it and wait a bit, so that it can be picked up either by an existing compatible Thor or
// by an agent.

VStringBuffer job("%s/%s/%s", currentWfId.str(), currentWuid.str(), currentGraphName.str());
Owned<IJobQueueItem> item = createJobQueueItem(job);
item->setOwner(workunit->queryUser());
item->setPriority(workunit->getPriorityValue());
thorQueue->enqueue(item.getClear());
currentWuid.clear();
constexpr unsigned pauseSecs = 10;
const char *version = jobVersion.length() ? jobVersion.str() : imageVersion;
WARNLOG("Job=%s requeued due to version mismatch (this Thor version=%s, Job version=%s). Pausing for %u seconds", job.str(), runtimePlatformVersion.str(), version, pauseSecs);
MilliSleep(pauseSecs*1000);
}
if (mismatch)
{
assertex(thorQueue); // it should never be possible for a non-lingering Thor to have a mismatch

// This Thor has picked up a job that was submitted with a different #option platformVersion.
// Requeue it and wait a bit, so that it can be picked up either by an existing compatible Thor or
// by an agent.

VStringBuffer job("%s/%s/%s", currentWfId.str(), currentWuid.str(), currentGraphName.str());
Owned<IJobQueueItem> item = createJobQueueItem(job);
item->setOwner(workunit->queryUser());
item->setPriority(workunit->getPriorityValue());
thorQueue->enqueue(item.getClear());
currentWuid.clear();
constexpr unsigned pauseSecs = 10;
if (jobVersion.length())
WARNLOG("Job=%s requeued due to version mismatch (this Thor version=%s, Job version=%s). Pausing for %u seconds", job.str(), runtimePlatformVersion.str(), jobVersion.str(), pauseSecs);
else
{
JobNameScope activeJobName(currentWuid.str());
saveWuidToFile(currentWuid);
VStringBuffer msg("Executing: wuid=%s, graph=%s", currentWuid.str(), currentGraphName.str());
if (!streq(imageVersion, runtimePlatformVersion.str()))
msg.appendf(" (custom runtime version=%s)", runtimePlatformVersion.str());
PROGLOG("%s", msg.str());
WARNLOG("Job=%s requeued due to version mismatch (this Thor version=%s, Job version not specified, uses helm version=%s). Pausing for %u seconds", job.str(), runtimePlatformVersion.str(), imageVersion, pauseSecs);
MilliSleep(pauseSecs*1000);
}
else
{
JobNameScope activeJobName(currentWuid.str());
saveWuidToFile(currentWuid);
VStringBuffer msg("Executing: wuid=%s, graph=%s", currentWuid.str(), currentGraphName.str());
if (!streq(imageVersion, runtimePlatformVersion.str()))
msg.appendf(" (custom runtime version=%s)", runtimePlatformVersion.str());
PROGLOG("%s", msg.str());

{
Owned<IWorkUnit> wu = &workunit->lock();
publishPodNames(wu, currentGraphName, nullptr);
}
SocketEndpoint dummyAgentEp;
jobManager->execute(workunit, currentWuid, currentGraphName, dummyAgentEp);
{
Owned<IWorkUnit> wu = &workunit->lock();
publishPodNames(wu, currentGraphName, nullptr);
}
SocketEndpoint dummyAgentEp;
jobManager->execute(workunit, currentWuid, currentGraphName, dummyAgentEp);

Owned<IWorkUnit> w = &workunit->lock();
if (!multiJobLinger && lingerPeriod)
w->setDebugValue(instance, "1", true);
Owned<IWorkUnit> w = &workunit->lock();
if (!multiJobLinger && lingerPeriod)
w->setDebugValue(instance, "1", true);

if (jobManager->queryExitException())
{
// NB: exitException has already been relayed.
jobManager->clearExitException();
}
else
if (jobManager->queryExitException())
{
// NB: exitException has already been relayed.
jobManager->clearExitException();
}
else
{
switch (w->getState())
{
switch (w->getState())
{
case WUStateRunning:
w->setState(WUStateWait);
break;
case WUStateAborting:
case WUStateAborted:
case WUStateFailed:
break;
default:
w->setState(WUStateFailed);
break;
}
case WUStateRunning:
w->setState(WUStateWait);
break;
case WUStateAborting:
case WUStateAborted:
case WUStateFailed:
break;
default:
w->setState(WUStateFailed);
break;
}
lingerTimer.reset(lingerPeriod);
}
lingerTimer.reset(lingerPeriod);
}
}
}
Expand Down

0 comments on commit 7869ef7

Please sign in to comment.