Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Retain the error message in task execution even when the next pod succeeds #614

Merged
merged 1 commit into from
Feb 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ Changelog for Cass Operator, new PRs should update the `main / unreleased` secti

* [FEATURE] [#601](https://github.com/k8ssandra/cass-operator/pull/601) Add additionalAnnotations field to CR so that all resources created by the operator can be annotated.
* [BUGFIX] [#607](https://github.com/k8ssandra/cass-operator/issues/607) Add missing additional labels and annotations to the superuserSecret.
* [BUGFIX] [#612](https://github.com/k8ssandra/cass-operator/issues/612) Improve error message handling in the task jobs by retaining the message that the previously failed pod generated

## v1.18.2

Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ CONTROLLER_TOOLS_VERSION ?= v0.12.0
OPERATOR_SDK_VERSION ?= 1.29.0
HELM_VERSION ?= 3.12.0
OPM_VERSION ?= 1.26.5
GOLINT_VERSION ?= 1.52.2
GOLINT_VERSION ?= 1.55.2

.PHONY: cert-manager
cert-manager: ## Install cert-manager to the cluster
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ require (
require (
github.com/Jeffail/gabs/v2 v2.7.0
github.com/onsi/ginkgo/v2 v2.9.4
github.com/prometheus/client_golang v1.15.1
go.uber.org/zap v1.24.0
golang.org/x/mod v0.12.0
k8s.io/utils v0.0.0-20230406110748-d93618cff8a2
Expand Down Expand Up @@ -55,7 +56,6 @@ require (
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_golang v1.15.1 // indirect
github.com/prometheus/client_model v0.4.0 // indirect
github.com/prometheus/common v0.42.0 // indirect
github.com/prometheus/procfs v0.9.0 // indirect
Expand Down
61 changes: 32 additions & 29 deletions internal/controllers/control/cassandratask_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,7 @@ func (r *CassandraTaskReconciler) Reconcile(ctx context.Context, req ctrl.Reques
taskId := string(cassTask.UID)

var err error
var errMsg string
var failed, completed int
JobDefinition:
for _, job := range cassTask.Spec.Jobs {
Expand Down Expand Up @@ -337,7 +338,7 @@ JobDefinition:
}
}

res, failed, completed, err = r.reconcileEveryPodTask(ctx, dc, taskConfig)
res, failed, completed, errMsg, err = r.reconcileEveryPodTask(ctx, dc, taskConfig)

if err != nil {
return ctrl.Result{}, err
Expand Down Expand Up @@ -368,7 +369,6 @@ JobDefinition:
SetCondition(&cassTask, api.JobRunning, metav1.ConditionFalse, "")

if failed > 0 {
errMsg := ""
if err != nil {
errMsg = err.Error()
}
Expand Down Expand Up @@ -579,13 +579,13 @@ func (r *CassandraTaskReconciler) cleanupJobAnnotations(ctx context.Context, dc
}

// reconcileEveryPodTask executes the given task against all the Datacenter pods
func (r *CassandraTaskReconciler) reconcileEveryPodTask(ctx context.Context, dc *cassapi.CassandraDatacenter, taskConfig *TaskConfiguration) (ctrl.Result, int, int, error) {
func (r *CassandraTaskReconciler) reconcileEveryPodTask(ctx context.Context, dc *cassapi.CassandraDatacenter, taskConfig *TaskConfiguration) (ctrl.Result, int, int, string, error) {
logger := log.FromContext(ctx)

// We sort to ensure we process the dcPods in the same order
dcPods, err := r.getDatacenterPods(ctx, dc)
if err != nil {
return ctrl.Result{}, 0, 0, err
return ctrl.Result{}, 0, 0, "", err
}

sort.Slice(dcPods, func(i, j int) bool {
Expand All @@ -601,10 +601,11 @@ func (r *CassandraTaskReconciler) reconcileEveryPodTask(ctx context.Context, dc

nodeMgmtClient, err := httphelper.NewMgmtClient(ctx, r.Client, dc)
if err != nil {
return ctrl.Result{}, 0, 0, err
return ctrl.Result{}, 0, 0, "", err
}

failed, completed := 0, 0
errMsg := ""

for idx, pod := range dcPods {
// TODO Do we need post-pod processing functionality also? In case we need to wait for some other event to happen (processed by cass-operator).
Expand All @@ -617,7 +618,7 @@ func (r *CassandraTaskReconciler) reconcileEveryPodTask(ctx context.Context, dc

features, err := nodeMgmtClient.FeatureSet(&pod)
if err != nil {
return ctrl.Result{}, failed, completed, err
return ctrl.Result{}, failed, completed, errMsg, err
}

if pod.Annotations == nil {
Expand All @@ -626,7 +627,7 @@ func (r *CassandraTaskReconciler) reconcileEveryPodTask(ctx context.Context, dc

jobStatus, err := GetJobStatusFromPodAnnotations(taskConfig.Id, pod.Annotations)
if err != nil {
return ctrl.Result{}, failed, completed, err
return ctrl.Result{}, failed, completed, errMsg, err
}

if jobStatus.Id != "" {
Expand All @@ -645,20 +646,22 @@ func (r *CassandraTaskReconciler) reconcileEveryPodTask(ctx context.Context, dc
details, err := nodeMgmtClient.JobDetails(&pod, jobStatus.Id)
if err != nil {
logger.Error(err, "Could not get JobDetails for pod", "Pod", pod)
return ctrl.Result{}, failed, completed, err
return ctrl.Result{}, failed, completed, errMsg, err
}

if details.Id == "" {
// This job was not found, pod most likely restarted. Let's retry..
delete(pod.Annotations, getJobAnnotationKey(taskConfig.Id))
err = r.Client.Update(ctx, &pod)
if err != nil {
return ctrl.Result{}, failed, completed, err
return ctrl.Result{}, failed, completed, errMsg, err
}
return ctrl.Result{RequeueAfter: 1 * time.Second}, failed, completed, nil
return ctrl.Result{RequeueAfter: 1 * time.Second}, failed, completed, errMsg, nil
} else if details.Status == podJobError {
// Log the error, move on
logger.Error(fmt.Errorf("task failed: %s", details.Error), "Job failed to successfully complete the task", "Pod", pod)
errMsg = details.Error
err = fmt.Errorf("task failed: %s", errMsg)
logger.Error(err, "Job failed to successfully complete the task", "Pod", pod)
if taskConfig.RestartPolicy != corev1.RestartPolicyOnFailure || jobStatus.Retries >= 1 {
jobStatus.Status = podJobError
} else {
Expand All @@ -669,11 +672,11 @@ func (r *CassandraTaskReconciler) reconcileEveryPodTask(ctx context.Context, dc
}

if err = JobStatusToPodAnnotations(taskConfig.Id, pod.Annotations, jobStatus); err != nil {
return ctrl.Result{}, failed, completed, err
return ctrl.Result{}, failed, completed, errMsg, err
}

if err = r.Client.Update(ctx, &pod); err != nil {
return ctrl.Result{}, failed, completed, err
return ctrl.Result{}, failed, completed, errMsg, err
}

if jobStatus.Status == podJobError {
Expand All @@ -684,32 +687,32 @@ func (r *CassandraTaskReconciler) reconcileEveryPodTask(ctx context.Context, dc
// Pod has finished, remove the job_id and let us move to the next pod
jobStatus.Status = podJobCompleted
if err = JobStatusToPodAnnotations(taskConfig.Id, pod.Annotations, jobStatus); err != nil {
return ctrl.Result{}, failed, completed, err
return ctrl.Result{}, failed, completed, errMsg, err
}

if err = r.Client.Update(ctx, &pod); err != nil {
return ctrl.Result{}, failed, completed, err
return ctrl.Result{}, failed, completed, errMsg, err
}
completed++
continue
} else if details.Status == podJobWaiting {
// Job is still running or waiting
return ctrl.Result{RequeueAfter: JobRunningRequeue}, failed, completed, nil
return ctrl.Result{RequeueAfter: JobRunningRequeue}, failed, completed, errMsg, nil
}
} else {
if len(jobRunner) > 0 {
// Something is still holding the worker
return ctrl.Result{RequeueAfter: JobRunningRequeue}, failed, completed, nil
return ctrl.Result{RequeueAfter: JobRunningRequeue}, failed, completed, errMsg, nil
}

// Nothing is holding the job, this pod has finished
jobStatus.Status = podJobCompleted
if err = JobStatusToPodAnnotations(taskConfig.Id, pod.Annotations, jobStatus); err != nil {
return ctrl.Result{}, failed, completed, err
return ctrl.Result{}, failed, completed, errMsg, err
}

if err = r.Client.Update(ctx, &pod); err != nil {
return ctrl.Result{}, failed, completed, err
return ctrl.Result{}, failed, completed, errMsg, err
}
completed++
continue
Expand All @@ -720,32 +723,32 @@ func (r *CassandraTaskReconciler) reconcileEveryPodTask(ctx context.Context, dc
// Pod isn't running anything at the moment, this pod should run next
jobId, err := taskConfig.AsyncFunc(nodeMgmtClient, &pod, taskConfig)
if err != nil {
return ctrl.Result{}, failed, completed, err
return ctrl.Result{}, failed, completed, errMsg, err
}
jobStatus.Handler = jobHandlerMgmtApi
jobStatus.Id = jobId

if err = JobStatusToPodAnnotations(taskConfig.Id, pod.Annotations, jobStatus); err != nil {
return ctrl.Result{}, failed, completed, err
return ctrl.Result{}, failed, completed, errMsg, err
}

err = r.Client.Update(ctx, &pod)
if err != nil {
logger.Error(err, "Failed to patch pod's status to include jobId", "Pod", pod)
return ctrl.Result{}, failed, completed, err
return ctrl.Result{}, failed, completed, errMsg, err
}
} else {
if len(jobRunner) > 0 {
// Something is still holding the worker
return ctrl.Result{RequeueAfter: JobRunningRequeue}, failed, completed, nil
return ctrl.Result{RequeueAfter: JobRunningRequeue}, failed, completed, errMsg, nil
}

if taskConfig.SyncFunc == nil {
// This feature is not supported in sync mode, mark everything as done
err := fmt.Errorf("this job isn't supported by the target pod")
logger.Error(err, "unable to execute requested job against pod", "Pod", pod)
failed++
return ctrl.Result{}, failed, completed, err
return ctrl.Result{}, failed, completed, errMsg, err
}

jobId := strconv.Itoa(idx)
Expand All @@ -755,13 +758,13 @@ func (r *CassandraTaskReconciler) reconcileEveryPodTask(ctx context.Context, dc
jobStatus.Id = jobId

if err = JobStatusToPodAnnotations(taskConfig.Id, pod.Annotations, jobStatus); err != nil {
return ctrl.Result{}, failed, completed, err
return ctrl.Result{}, failed, completed, errMsg, err
}

err = r.Client.Update(ctx, &pod)
if err != nil {
logger.Error(err, "Failed to patch pod's status to indicate its running a local job", "Pod", pod)
return ctrl.Result{}, failed, completed, err
return ctrl.Result{}, failed, completed, errMsg, err
}

pod := pod
Expand Down Expand Up @@ -806,13 +809,13 @@ func (r *CassandraTaskReconciler) reconcileEveryPodTask(ctx context.Context, dc
}

// We have a job going on, return back later to check the status
return ctrl.Result{RequeueAfter: JobRunningRequeue}, failed, completed, nil
return ctrl.Result{RequeueAfter: JobRunningRequeue}, failed, completed, errMsg, nil
}

if len(jobRunner) > 0 {
// Something is still holding the worker while none of the existing pods are, probably the replace job.
return ctrl.Result{RequeueAfter: JobRunningRequeue}, failed, completed, nil
return ctrl.Result{RequeueAfter: JobRunningRequeue}, failed, completed, errMsg, nil
}

return ctrl.Result{}, failed, completed, nil
return ctrl.Result{}, failed, completed, errMsg, nil
}
16 changes: 10 additions & 6 deletions internal/controllers/control/cassandratask_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,9 @@ var _ = Describe("CassandraTask controller tests", func() {
Expect(callDetails.URLCounts["/api/v0/ops/executor/job"]).To(BeNumerically(">=", nodeCount))
Expect(callDetails.URLCounts["/api/v0/metadata/versions/features"]).To(BeNumerically(">", nodeCount))

Expect(completedTask.Status.Failed).To(BeNumerically(">=", nodeCount))
Expect(completedTask.Status.Failed).To(BeNumerically("==", nodeCount))
Expect(completedTask.Status.Conditions[2].Type).To(Equal(string(api.JobFailed)))
Expect(completedTask.Status.Conditions[2].Message).To(Equal("any error"))
})
It("If retryPolicy is set, we should see a retry", func() {
By("Creating fake mgmt-api server")
Expand All @@ -492,12 +494,14 @@ var _ = Describe("CassandraTask controller tests", func() {

completedTask := waitForTaskCompletion(taskKey)

// Due to retry, we have double the amount of calls
Expect(callDetails.URLCounts["/api/v1/ops/keyspace/cleanup"]).To(Equal(nodeCount * 2))
Expect(callDetails.URLCounts["/api/v0/ops/executor/job"]).To(BeNumerically(">=", nodeCount*2))
Expect(callDetails.URLCounts["/api/v0/metadata/versions/features"]).To(BeNumerically(">", nodeCount*2))
// Due to retry, we try twice and then bail out
Expect(callDetails.URLCounts["/api/v1/ops/keyspace/cleanup"]).To(Equal(2 * nodeCount))
Expect(callDetails.URLCounts["/api/v0/ops/executor/job"]).To(BeNumerically(">=", 2*nodeCount))
Expect(callDetails.URLCounts["/api/v0/metadata/versions/features"]).To(BeNumerically(">", 2*nodeCount))

Expect(completedTask.Status.Failed).To(BeNumerically(">=", nodeCount))
Expect(completedTask.Status.Failed).To(BeNumerically("==", nodeCount))
Expect(completedTask.Status.Conditions[2].Type).To(Equal(string(api.JobFailed)))
Expect(completedTask.Status.Conditions[2].Message).To(Equal("any error"))
})
It("Replace a node in the datacenter without specifying the pod", func() {
testFailedNamespaceName := fmt.Sprintf("test-task-failed-%d", rand.Int31())
Expand Down
2 changes: 1 addition & 1 deletion pkg/httphelper/server_test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ var featuresReply = `{

var jobDetailsCompleted = `{"submit_time":"1638545895255","end_time":"1638545895255","id":"%s","type":"Cleanup","status":"COMPLETED"}`

var jobDetailsFailed = `{"submit_time":"1638545895255","end_time":"1638545895255","id":"%s","type":"Cleanup","status":"ERROR"}`
var jobDetailsFailed = `{"submit_time":"1638545895255","end_time":"1638545895255","id":"%s","type":"Cleanup","status":"ERROR","error":"any error"}`

func mgmtApiListener() (net.Listener, error) {
mgmtApiListener, err := net.Listen("tcp", "127.0.0.1:8080")
Expand Down
Loading