From eaad280239d075d3dfaf7a9e116ca22babefe918 Mon Sep 17 00:00:00 2001
From: Michael Burman
Date: Mon, 5 Feb 2024 15:01:19 +0200
Subject: [PATCH] Retain the error message in task execution even when the
 next pod succeeds

---
 CHANGELOG.md                                  |  1 +
 Makefile                                      |  2 +-
 go.mod                                        |  2 +-
 .../control/cassandratask_controller.go       | 61 ++++++++++---------
 .../control/cassandratask_controller_test.go  | 16 +++--
 pkg/httphelper/server_test_utils.go           |  2 +-
 6 files changed, 46 insertions(+), 38 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 598ce5c24..0c1b7237d 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -13,6 +13,7 @@ Changelog for Cass Operator, new PRs should update the `main / unreleased` secti
 
 * [FEATURE] [#601](https://github.com/k8ssandra/cass-operator/pull/601) Add additionalAnnotations field to CR so that all resources created by the operator can be annotated.
 * [BUGFIX] [#607](https://github.com/k8ssandra/cass-operator/issues/607) Add missing additional labels and annotations to the superuserSecret.
+* [BUGFIX] [#612](https://github.com/k8ssandra/cass-operator/issues/612) Improve error message handling in the task jobs by retaining the message that the previously failed pod has generated.
 
 ## v1.18.2
 
diff --git a/Makefile b/Makefile
index d12fb0d96..4917407c0 100644
--- a/Makefile
+++ b/Makefile
@@ -245,7 +245,7 @@ CONTROLLER_TOOLS_VERSION ?= v0.12.0
 OPERATOR_SDK_VERSION ?= 1.29.0
 HELM_VERSION ?= 3.12.0
 OPM_VERSION ?= 1.26.5
-GOLINT_VERSION ?= 1.52.2
+GOLINT_VERSION ?= 1.55.2
 
 .PHONY: cert-manager
 cert-manager: ## Install cert-manager to the cluster
diff --git a/go.mod b/go.mod
index 540cdad16..00d827633 100644
--- a/go.mod
+++ b/go.mod
@@ -22,6 +22,7 @@ require (
 require (
 	github.com/Jeffail/gabs/v2 v2.7.0
 	github.com/onsi/ginkgo/v2 v2.9.4
+	github.com/prometheus/client_golang v1.15.1
 	go.uber.org/zap v1.24.0
 	golang.org/x/mod v0.12.0
 	k8s.io/utils v0.0.0-20230406110748-d93618cff8a2
@@ -55,7 +56,6 @@ require (
 	github.com/modern-go/reflect2 v1.0.2 // indirect
 	github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
 	github.com/pmezard/go-difflib v1.0.0 // indirect
-	github.com/prometheus/client_golang v1.15.1 // indirect
 	github.com/prometheus/client_model v0.4.0 // indirect
 	github.com/prometheus/common v0.42.0 // indirect
 	github.com/prometheus/procfs v0.9.0 // indirect
diff --git a/internal/controllers/control/cassandratask_controller.go b/internal/controllers/control/cassandratask_controller.go
index 78b1803f0..c99207df9 100644
--- a/internal/controllers/control/cassandratask_controller.go
+++ b/internal/controllers/control/cassandratask_controller.go
@@ -262,6 +262,7 @@ func (r *CassandraTaskReconciler) Reconcile(ctx context.Context, req ctrl.Reques
 	taskId := string(cassTask.UID)
 
 	var err error
+	var errMsg string
 	var failed, completed int
 JobDefinition:
 	for _, job := range cassTask.Spec.Jobs {
@@ -337,7 +338,7 @@ JobDefinition:
 			}
 		}
 
-		res, failed, completed, err = r.reconcileEveryPodTask(ctx, dc, taskConfig)
+		res, failed, completed, errMsg, err = r.reconcileEveryPodTask(ctx, dc, taskConfig)
 		if err != nil {
 			return ctrl.Result{}, err
 		}
@@ -368,7 +369,6 @@ JobDefinition:
 	SetCondition(&cassTask, api.JobRunning, metav1.ConditionFalse, "")
 
 	if failed > 0 {
-		errMsg := ""
 		if err != nil {
 			errMsg = err.Error()
 		}
@@ -579,13 +579,13 @@ func (r *CassandraTaskReconciler) cleanupJobAnnotations(ctx context.Context, dc
 }
 
 // reconcileEveryPodTask executes the given task against all the Datacenter pods
-func (r *CassandraTaskReconciler) reconcileEveryPodTask(ctx context.Context, dc *cassapi.CassandraDatacenter, taskConfig *TaskConfiguration) (ctrl.Result, int, int, error) {
+func (r *CassandraTaskReconciler) reconcileEveryPodTask(ctx context.Context, dc *cassapi.CassandraDatacenter, taskConfig *TaskConfiguration) (ctrl.Result, int, int, string, error) {
 	logger := log.FromContext(ctx)
 
 	// We sort to ensure we process the dcPods in the same order
 	dcPods, err := r.getDatacenterPods(ctx, dc)
 	if err != nil {
-		return ctrl.Result{}, 0, 0, err
+		return ctrl.Result{}, 0, 0, "", err
 	}
 
 	sort.Slice(dcPods, func(i, j int) bool {
@@ -601,10 +601,11 @@ func (r *CassandraTaskReconciler) reconcileEveryPodTask(ctx context.Context, dc
 
 	nodeMgmtClient, err := httphelper.NewMgmtClient(ctx, r.Client, dc)
 	if err != nil {
-		return ctrl.Result{}, 0, 0, err
+		return ctrl.Result{}, 0, 0, "", err
 	}
 
 	failed, completed := 0, 0
+	errMsg := ""
 
 	for idx, pod := range dcPods {
 		// TODO Do we need post-pod processing functionality also? In case we need to wait for some other event to happen (processed by cass-operator).
@@ -617,7 +618,7 @@ func (r *CassandraTaskReconciler) reconcileEveryPodTask(ctx context.Context, dc
 
 		features, err := nodeMgmtClient.FeatureSet(&pod)
 		if err != nil {
-			return ctrl.Result{}, failed, completed, err
+			return ctrl.Result{}, failed, completed, errMsg, err
 		}
 
 		if pod.Annotations == nil {
@@ -626,7 +627,7 @@ func (r *CassandraTaskReconciler) reconcileEveryPodTask(ctx context.Context, dc
 
 		jobStatus, err := GetJobStatusFromPodAnnotations(taskConfig.Id, pod.Annotations)
 		if err != nil {
-			return ctrl.Result{}, failed, completed, err
+			return ctrl.Result{}, failed, completed, errMsg, err
 		}
 
 		if jobStatus.Id != "" {
@@ -645,7 +646,7 @@ func (r *CassandraTaskReconciler) reconcileEveryPodTask(ctx context.Context, dc
 			details, err := nodeMgmtClient.JobDetails(&pod, jobStatus.Id)
 			if err != nil {
 				logger.Error(err, "Could not get JobDetails for pod", "Pod", pod)
-				return ctrl.Result{}, failed, completed, err
+				return ctrl.Result{}, failed, completed, errMsg, err
 			}
 
 			if details.Id == "" {
@@ -653,12 +654,14 @@ func (r *CassandraTaskReconciler) reconcileEveryPodTask(ctx context.Context, dc
 				delete(pod.Annotations, getJobAnnotationKey(taskConfig.Id))
 				err = r.Client.Update(ctx, &pod)
 				if err != nil {
-					return ctrl.Result{}, failed, completed, err
+					return ctrl.Result{}, failed, completed, errMsg, err
 				}
-				return ctrl.Result{RequeueAfter: 1 * time.Second}, failed, completed, nil
+				return ctrl.Result{RequeueAfter: 1 * time.Second}, failed, completed, errMsg, nil
 			} else if details.Status == podJobError {
 				// Log the error, move on
-				logger.Error(fmt.Errorf("task failed: %s", details.Error), "Job failed to successfully complete the task", "Pod", pod)
+				errMsg = details.Error
+				err = fmt.Errorf("task failed: %s", errMsg)
+				logger.Error(err, "Job failed to successfully complete the task", "Pod", pod)
 				if taskConfig.RestartPolicy != corev1.RestartPolicyOnFailure || jobStatus.Retries >= 1 {
 					jobStatus.Status = podJobError
 				} else {
@@ -669,11 +672,11 @@ func (r *CassandraTaskReconciler) reconcileEveryPodTask(ctx context.Context, dc
 				}
 
 				if err = JobStatusToPodAnnotations(taskConfig.Id, pod.Annotations, jobStatus); err != nil {
-					return ctrl.Result{}, failed, completed, err
+					return ctrl.Result{}, failed, completed, errMsg, err
 				}
 
 				if err = r.Client.Update(ctx, &pod); err != nil {
-					return ctrl.Result{}, failed, completed, err
+					return ctrl.Result{}, failed, completed, errMsg, err
 				}
 
 				if jobStatus.Status == podJobError {
@@ -684,32 +687,32 @@ func (r *CassandraTaskReconciler) reconcileEveryPodTask(ctx context.Context, dc
 				// Pod has finished, remove the job_id and let us move to the next pod
 				jobStatus.Status = podJobCompleted
 				if err = JobStatusToPodAnnotations(taskConfig.Id, pod.Annotations, jobStatus); err != nil {
-					return ctrl.Result{}, failed, completed, err
+					return ctrl.Result{}, failed, completed, errMsg, err
 				}
 
 				if err = r.Client.Update(ctx, &pod); err != nil {
-					return ctrl.Result{}, failed, completed, err
+					return ctrl.Result{}, failed, completed, errMsg, err
 				}
 
 				completed++
 				continue
 			} else if details.Status == podJobWaiting {
 				// Job is still running or waiting
-				return ctrl.Result{RequeueAfter: JobRunningRequeue}, failed, completed, nil
+				return ctrl.Result{RequeueAfter: JobRunningRequeue}, failed, completed, errMsg, nil
 			}
 		} else {
 			if len(jobRunner) > 0 {
 				// Something is still holding the worker
-				return ctrl.Result{RequeueAfter: JobRunningRequeue}, failed, completed, nil
+				return ctrl.Result{RequeueAfter: JobRunningRequeue}, failed, completed, errMsg, nil
 			}
 
 			// Nothing is holding the job, this pod has finished
 			jobStatus.Status = podJobCompleted
 			if err = JobStatusToPodAnnotations(taskConfig.Id, pod.Annotations, jobStatus); err != nil {
-				return ctrl.Result{}, failed, completed, err
+				return ctrl.Result{}, failed, completed, errMsg, err
 			}
 
 			if err = r.Client.Update(ctx, &pod); err != nil {
-				return ctrl.Result{}, failed, completed, err
+				return ctrl.Result{}, failed, completed, errMsg, err
 			}
 
 			completed++
 			continue
@@ -720,24 +723,24 @@ func (r *CassandraTaskReconciler) reconcileEveryPodTask(ctx context.Context, dc
 			// Pod isn't running anything at the moment, this pod should run next
 			jobId, err := taskConfig.AsyncFunc(nodeMgmtClient, &pod, taskConfig)
 			if err != nil {
-				return ctrl.Result{}, failed, completed, err
+				return ctrl.Result{}, failed, completed, errMsg, err
 			}
 			jobStatus.Handler = jobHandlerMgmtApi
 			jobStatus.Id = jobId
 
 			if err = JobStatusToPodAnnotations(taskConfig.Id, pod.Annotations, jobStatus); err != nil {
-				return ctrl.Result{}, failed, completed, err
+				return ctrl.Result{}, failed, completed, errMsg, err
 			}
 
 			err = r.Client.Update(ctx, &pod)
 			if err != nil {
 				logger.Error(err, "Failed to patch pod's status to include jobId", "Pod", pod)
-				return ctrl.Result{}, failed, completed, err
+				return ctrl.Result{}, failed, completed, errMsg, err
 			}
 		} else {
 			if len(jobRunner) > 0 {
 				// Something is still holding the worker
-				return ctrl.Result{RequeueAfter: JobRunningRequeue}, failed, completed, nil
+				return ctrl.Result{RequeueAfter: JobRunningRequeue}, failed, completed, errMsg, nil
 			}
 
 			if taskConfig.SyncFunc == nil {
@@ -745,7 +748,7 @@ func (r *CassandraTaskReconciler) reconcileEveryPodTask(ctx context.Context, dc
 				err := fmt.Errorf("this job isn't supported by the target pod")
 				logger.Error(err, "unable to execute requested job against pod", "Pod", pod)
 				failed++
-				return ctrl.Result{}, failed, completed, err
+				return ctrl.Result{}, failed, completed, errMsg, err
 			}
 
 			jobId := strconv.Itoa(idx)
@@ -755,13 +758,13 @@ func (r *CassandraTaskReconciler) reconcileEveryPodTask(ctx context.Context, dc
 			jobStatus.Id = jobId
 
 			if err = JobStatusToPodAnnotations(taskConfig.Id, pod.Annotations, jobStatus); err != nil {
-				return ctrl.Result{}, failed, completed, err
+				return ctrl.Result{}, failed, completed, errMsg, err
 			}
 
 			err = r.Client.Update(ctx, &pod)
 			if err != nil {
 				logger.Error(err, "Failed to patch pod's status to indicate its running a local job", "Pod", pod)
-				return ctrl.Result{}, failed, completed, err
+				return ctrl.Result{}, failed, completed, errMsg, err
 			}
 
 			pod := pod
@@ -806,13 +809,13 @@ func (r *CassandraTaskReconciler) reconcileEveryPodTask(ctx context.Context, dc
 		}
 
 		// We have a job going on, return back later to check the status
-		return ctrl.Result{RequeueAfter: JobRunningRequeue}, failed, completed, nil
+		return ctrl.Result{RequeueAfter: JobRunningRequeue}, failed, completed, errMsg, nil
 	}
 
 	if len(jobRunner) > 0 {
 		// Something is still holding the worker while none of the existing pods are, probably the replace job.
-		return ctrl.Result{RequeueAfter: JobRunningRequeue}, failed, completed, nil
+		return ctrl.Result{RequeueAfter: JobRunningRequeue}, failed, completed, errMsg, nil
 	}
 
-	return ctrl.Result{}, failed, completed, nil
+	return ctrl.Result{}, failed, completed, errMsg, nil
 }
diff --git a/internal/controllers/control/cassandratask_controller_test.go b/internal/controllers/control/cassandratask_controller_test.go
index 75912fab4..b157bd3ea 100644
--- a/internal/controllers/control/cassandratask_controller_test.go
+++ b/internal/controllers/control/cassandratask_controller_test.go
@@ -473,7 +473,9 @@ var _ = Describe("CassandraTask controller tests", func() {
 				Expect(callDetails.URLCounts["/api/v0/ops/executor/job"]).To(BeNumerically(">=", nodeCount))
 				Expect(callDetails.URLCounts["/api/v0/metadata/versions/features"]).To(BeNumerically(">", nodeCount))
 
-				Expect(completedTask.Status.Failed).To(BeNumerically(">=", nodeCount))
+				Expect(completedTask.Status.Failed).To(BeNumerically("==", nodeCount))
+				Expect(completedTask.Status.Conditions[2].Type).To(Equal(string(api.JobFailed)))
+				Expect(completedTask.Status.Conditions[2].Message).To(Equal("any error"))
 			})
 			It("If retryPolicy is set, we should see a retry", func() {
 				By("Creating fake mgmt-api server")
@@ -492,12 +494,14 @@ var _ = Describe("CassandraTask controller tests", func() {
 
 				completedTask := waitForTaskCompletion(taskKey)
 
-				// Due to retry, we have double the amount of calls
-				Expect(callDetails.URLCounts["/api/v1/ops/keyspace/cleanup"]).To(Equal(nodeCount * 2))
-				Expect(callDetails.URLCounts["/api/v0/ops/executor/job"]).To(BeNumerically(">=", nodeCount*2))
-				Expect(callDetails.URLCounts["/api/v0/metadata/versions/features"]).To(BeNumerically(">", nodeCount*2))
+				// Due to retry, we try twice and then bail out
+				Expect(callDetails.URLCounts["/api/v1/ops/keyspace/cleanup"]).To(Equal(2 * nodeCount))
+				Expect(callDetails.URLCounts["/api/v0/ops/executor/job"]).To(BeNumerically(">=", 2*nodeCount))
+				Expect(callDetails.URLCounts["/api/v0/metadata/versions/features"]).To(BeNumerically(">", 2*nodeCount))
 
-				Expect(completedTask.Status.Failed).To(BeNumerically(">=", nodeCount))
+				Expect(completedTask.Status.Failed).To(BeNumerically("==", nodeCount))
+				Expect(completedTask.Status.Conditions[2].Type).To(Equal(string(api.JobFailed)))
+				Expect(completedTask.Status.Conditions[2].Message).To(Equal("any error"))
 			})
 			It("Replace a node in the datacenter without specifying the pod", func() {
 				testFailedNamespaceName := fmt.Sprintf("test-task-failed-%d", rand.Int31())
diff --git a/pkg/httphelper/server_test_utils.go b/pkg/httphelper/server_test_utils.go
index 7b555aae0..26cf8909a 100644
--- a/pkg/httphelper/server_test_utils.go
+++ b/pkg/httphelper/server_test_utils.go
@@ -26,7 +26,7 @@ var featuresReply = `{
 
 var jobDetailsCompleted = `{"submit_time":"1638545895255","end_time":"1638545895255","id":"%s","type":"Cleanup","status":"COMPLETED"}`
 
-var jobDetailsFailed = `{"submit_time":"1638545895255","end_time":"1638545895255","id":"%s","type":"Cleanup","status":"ERROR"}`
+var jobDetailsFailed = `{"submit_time":"1638545895255","end_time":"1638545895255","id":"%s","type":"Cleanup","status":"ERROR","error":"any error"}`
 
 func mgmtApiListener() (net.Listener, error) {
 	mgmtApiListener, err := net.Listen("tcp", "127.0.0.1:8080")
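
Reviewer note, appended after the patch: below is a minimal, self-contained Go sketch of the pattern this change introduces, for illustration only. The names podOutcome and runJobs are hypothetical and are not cass-operator APIs; the real logic lives in reconcileEveryPodTask above. The point illustrated is that the failure message is captured into a dedicated errMsg value returned alongside the counts, so a pod that succeeds later no longer erases the message reported by an earlier failed pod.

package main

import (
	"errors"
	"fmt"
)

// podOutcome models one pod's job result (hypothetical, for illustration only).
type podOutcome struct {
	err error // nil when the pod completed its job successfully
}

// runJobs mirrors the patched contract of reconcileEveryPodTask: it returns the
// failed/completed counts together with the retained error message, instead of
// deriving the message from a final err that a succeeding pod may have reset.
func runJobs(outcomes []podOutcome) (failed, completed int, errMsg string) {
	for _, out := range outcomes {
		if out.err != nil {
			// Retain the message; it is overwritten only by another failure,
			// never cleared by a later success.
			errMsg = out.err.Error()
			failed++
			continue
		}
		completed++
	}
	return failed, completed, errMsg
}

func main() {
	outcomes := []podOutcome{
		{err: errors.New("any error")}, // first pod fails
		{err: nil},                     // next pod succeeds
	}
	failed, completed, errMsg := runJobs(outcomes)
	// Prints: failed=1 completed=1 message="any error", matching the new test
	// expectation that the JobFailed condition keeps the "any error" message.
	fmt.Printf("failed=%d completed=%d message=%q\n", failed, completed, errMsg)
}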