fix: improve CRP requeue (#583)
Co-authored-by: Zhiying Lin <zhiyinglin@microsoft.com>
Co-authored-by: Ryan Zhang <yangzhangrice@hotmail.com>
3 people authored Oct 31, 2023
1 parent 712cb45 commit 6902bd8
Showing 3 changed files with 162 additions and 10 deletions.
41 changes: 32 additions & 9 deletions pkg/controllers/clusterresourceplacement/controller.go
@@ -185,7 +185,10 @@ func (r *Reconciler) handleUpdate(ctx context.Context, crp *fleetv1beta1.Cluster
if err != nil {
return ctrl.Result{}, err
}
if err := r.setPlacementStatus(ctx, crp, selectedResourceIDs, latestSchedulingPolicySnapshot, latestResourceSnapshot); err != nil {

// isClusterScheduled indicates whether the scheduler has scheduled any cluster, which determines how we requeue the CRP request to track the rollout status.
isClusterScheduled, err := r.setPlacementStatus(ctx, crp, selectedResourceIDs, latestSchedulingPolicySnapshot, latestResourceSnapshot)
if err != nil {
return ctrl.Result{}, err
}

@@ -216,9 +219,25 @@ func (r *Reconciler) handleUpdate(ctx context.Context, crp *fleetv1beta1.Cluster
return ctrl.Result{RequeueAfter: 5 * time.Minute}, nil
}

klog.V(2).InfoS("Placement rollout has not finished yet and requeue the request", "clusterResourcePlacement", crpKObj, "status", crp.Status)
if !isClusterScheduled {
// Note:
// If the scheduledCondition is false, it means the placement requirement cannot be fully satisfied. For example,
// a pickN placement may require 5 clusters while the scheduler can only place the resources on 3 clusters. The appliedCondition
// could still be true once the resources are applied successfully on those 3 clusters, and the detailed resourcePlacementStatuses
// need to be populated.
// So we cannot rely on the scheduledCondition being false to decide whether to requeue the request.

// When isClusterScheduled is false, either the scheduler has not finished scheduling or none of the clusters could be selected.
// Once the policy snapshot status changes, the policy snapshot watcher should enqueue the request.
// Here we requeue the request as a backup, in case the watcher misses the event.
klog.V(2).InfoS("Scheduler has not scheduled any cluster yet and requeue the request as a backup",
"clusterResourcePlacement", crpKObj, "scheduledCondition", crp.GetCondition(string(fleetv1beta1.ClusterResourcePlacementScheduledConditionType)), "generation", crp.Generation)
return ctrl.Result{RequeueAfter: 5 * time.Minute}, nil
}

klog.V(2).InfoS("Placement rollout has not finished yet and requeue the request", "clusterResourcePlacement", crpKObj, "status", crp.Status, "generation", crp.Generation)
// we need to requeue the request to update the status of the resources.
// TODO: adept the requeue time based on the rollout status.
// TODO: adjust the requeue time based on the rollout status.
return ctrl.Result{RequeueAfter: 5 * time.Second}, nil
}
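
For readers skimming the hunk above, a minimal sketch of the requeue policy it introduces. Only the ctrl.Result values and the 5-minute/5-second intervals come from the diff; the package name, function name, and bare bool parameter are simplified stand-ins for the real handleUpdate flow.

package sketch

import (
    "time"

    ctrl "sigs.k8s.io/controller-runtime"
)

// requeueFor mirrors the tail of handleUpdate once the rollout is known to be
// unfinished: back off for 5 minutes while no cluster has been scheduled (the
// policy snapshot watcher is the primary trigger, so this requeue is only a
// backup), and poll every 5 seconds while the rollout is progressing so the
// placement status stays fresh.
func requeueFor(isClusterScheduled bool) ctrl.Result {
    if !isClusterScheduled {
        return ctrl.Result{RequeueAfter: 5 * time.Minute}
    }
    return ctrl.Result{RequeueAfter: 5 * time.Second}
}

The asymmetry is deliberate: the long backup interval avoids busy-looping while scheduling is still pending, while the short interval keeps the status close to real time during rollout.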

@@ -769,8 +788,9 @@ func parseResourceGroupHashFromAnnotation(s *fleetv1beta1.ClusterResourceSnapsho
return v, nil
}

// setPlacementStatus returns whether there is a cluster scheduled by the scheduler.
func (r *Reconciler) setPlacementStatus(ctx context.Context, crp *fleetv1beta1.ClusterResourcePlacement, selectedResourceIDs []fleetv1beta1.ResourceIdentifier,
latestSchedulingPolicySnapshot *fleetv1beta1.ClusterSchedulingPolicySnapshot, latestResourceSnapshot *fleetv1beta1.ClusterResourceSnapshot) error {
latestSchedulingPolicySnapshot *fleetv1beta1.ClusterSchedulingPolicySnapshot, latestResourceSnapshot *fleetv1beta1.ClusterResourceSnapshot) (bool, error) {
crp.Status.SelectedResources = selectedResourceIDs
scheduledCondition := buildScheduledCondition(crp, latestSchedulingPolicySnapshot)
crp.SetConditions(scheduledCondition)
@@ -804,7 +824,7 @@ func (r *Reconciler) setPlacementStatus(ctx context.Context, crp *fleetv1beta1.C
// The undeleted resources on these old clusters could lead to failed synchronized or applied condition.
// Today, we only track the resources progress if the same cluster is selected again.
crp.Status.PlacementStatuses = []fleetv1beta1.ResourcePlacementStatus{}
return nil
return false, nil
}

return r.setResourcePlacementStatusAndResourceConditions(ctx, crp, latestSchedulingPolicySnapshot, latestResourceSnapshot)
@@ -898,7 +918,9 @@ func (r *Reconciler) buildClusterResourceBindingMap(ctx context.Context, crp *fl
return res, nil
}

func (r *Reconciler) setResourcePlacementStatusAndResourceConditions(ctx context.Context, crp *fleetv1beta1.ClusterResourcePlacement, latestSchedulingPolicySnapshot *fleetv1beta1.ClusterSchedulingPolicySnapshot, latestResourceSnapshot *fleetv1beta1.ClusterResourceSnapshot) error {
// setResourcePlacementStatusAndResourceConditions returns whether the scheduler has selected any cluster.
func (r *Reconciler) setResourcePlacementStatusAndResourceConditions(ctx context.Context, crp *fleetv1beta1.ClusterResourcePlacement,
latestSchedulingPolicySnapshot *fleetv1beta1.ClusterSchedulingPolicySnapshot, latestResourceSnapshot *fleetv1beta1.ClusterResourceSnapshot) (bool, error) {
placementStatuses := make([]fleetv1beta1.ResourcePlacementStatus, 0, len(latestSchedulingPolicySnapshot.Status.ClusterDecisions))
decisions := latestSchedulingPolicySnapshot.Status.ClusterDecisions
selected, unselected := classifyClusterDecisions(decisions)
@@ -927,7 +949,7 @@ func (r *Reconciler) setResourcePlacementStatusAndResourceConditions(ctx context
oldResourcePlacementStatusMap := buildResourcePlacementStatusMap(crp)
resourceBindingMap, err := r.buildClusterResourceBindingMap(ctx, crp, latestSchedulingPolicySnapshot, latestResourceSnapshot)
if err != nil {
return err
return false, err
}

for _, c := range selected {
@@ -951,7 +973,7 @@ func (r *Reconciler) setResourcePlacementStatusAndResourceConditions(ctx context
meta.SetStatusCondition(&rp.Conditions, scheduledCondition)
syncCondition, appliedCondition, err := r.setWorkStatusForResourcePlacementStatus(ctx, crp, latestResourceSnapshot, resourceBindingMap[c.ClusterName], &rp)
if err != nil {
return err
return false, err
}
if syncCondition == nil || syncCondition.Status != metav1.ConditionTrue {
syncPendingCount++
@@ -967,6 +989,7 @@ func (r *Reconciler) setResourcePlacementStatusAndResourceConditions(ctx context
}
placementStatuses = append(placementStatuses, rp)
}
isClusterScheduled := len(placementStatuses) > 0

for i := 0; i < unscheduledClusterCount && i < len(unselected); i++ {
// TODO: we could improve the message by summarizing the failure reasons from all of the unselected clusters.
@@ -988,7 +1011,7 @@ func (r *Reconciler) setResourcePlacementStatusAndResourceConditions(ctx context
crp.Status.PlacementStatuses = placementStatuses
crp.SetConditions(buildClusterResourcePlacementSyncCondition(crp, syncPendingCount, syncSucceededCount))
crp.SetConditions(buildClusterResourcePlacementApplyCondition(crp, syncPendingCount == 0, appliedPendingCount, appliedSucceededCount, appliedFailedCount))
return nil
return isClusterScheduled, nil
}

func isRolloutCompleted(crp *fleetv1beta1.ClusterResourcePlacement) bool {
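
Before the unit-test diff, a self-contained sketch of how the new boolean is derived. The trimmed-down types and function name below are hypothetical stand-ins for the fleet v1beta1 API; only the len(placementStatuses) > 0 rule is taken from the change to setResourcePlacementStatusAndResourceConditions.

package sketch

// Hypothetical, trimmed-down stand-ins for the fleet v1beta1 types.
type clusterDecision struct {
    ClusterName string
    Selected    bool
}

type resourcePlacementStatus struct {
    ClusterName string
}

// buildPlacementStatuses mirrors the shape of the real function: it builds one
// placement status per selected cluster and reports whether the scheduler has
// selected any cluster at all, which is the isClusterScheduled signal the
// reconciler uses to pick a requeue interval.
func buildPlacementStatuses(decisions []clusterDecision) ([]resourcePlacementStatus, bool) {
    statuses := make([]resourcePlacementStatus, 0, len(decisions))
    for _, d := range decisions {
        if !d.Selected {
            continue
        }
        statuses = append(statuses, resourcePlacementStatus{ClusterName: d.ClusterName})
    }
    return statuses, len(statuses) > 0
}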
20 changes: 19 additions & 1 deletion pkg/controllers/clusterresourceplacement/controller_test.go
@@ -69,6 +69,7 @@ func TestSetPlacementStatus(t *testing.T) {
latestResourceSnapshot *fleetv1beta1.ClusterResourceSnapshot
clusterResourceBindings []fleetv1beta1.ClusterResourceBinding
works []fleetv1beta1.Work
want bool
wantStatus *fleetv1beta1.ClusterResourcePlacementStatus
}{
{
@@ -118,6 +119,7 @@ func TestSetPlacementStatus(t *testing.T) {
},
},
},
want: false,
wantStatus: &fleetv1beta1.ClusterResourcePlacementStatus{
SelectedResources: selectedResources,
Conditions: []metav1.Condition{
@@ -207,6 +209,7 @@ func TestSetPlacementStatus(t *testing.T) {
},
},
},
want: false,
wantStatus: &fleetv1beta1.ClusterResourcePlacementStatus{
SelectedResources: []fleetv1beta1.ResourceIdentifier{
{
@@ -304,6 +307,7 @@ func TestSetPlacementStatus(t *testing.T) {
},
},
},
want: false,
wantStatus: &fleetv1beta1.ClusterResourcePlacementStatus{
SelectedResources: selectedResources,
Conditions: []metav1.Condition{
@@ -394,6 +398,7 @@ func TestSetPlacementStatus(t *testing.T) {
},
},
},
want: false,
wantStatus: &fleetv1beta1.ClusterResourcePlacementStatus{
SelectedResources: selectedResources,
Conditions: []metav1.Condition{
@@ -504,6 +509,7 @@ func TestSetPlacementStatus(t *testing.T) {
},
},
},
want: true,
wantStatus: &fleetv1beta1.ClusterResourcePlacementStatus{
SelectedResources: selectedResources,
Conditions: []metav1.Condition{
@@ -684,6 +690,7 @@ func TestSetPlacementStatus(t *testing.T) {
},
},
},
want: false,
wantStatus: &fleetv1beta1.ClusterResourcePlacementStatus{
SelectedResources: selectedResources,
Conditions: []metav1.Condition{
@@ -790,6 +797,7 @@ func TestSetPlacementStatus(t *testing.T) {
},
},
},
want: true,
wantStatus: &fleetv1beta1.ClusterResourcePlacementStatus{
SelectedResources: selectedResources,
Conditions: []metav1.Condition{
@@ -959,6 +967,7 @@ func TestSetPlacementStatus(t *testing.T) {
},
},
},
want: false,
wantStatus: &fleetv1beta1.ClusterResourcePlacementStatus{
SelectedResources: selectedResources,
Conditions: []metav1.Condition{
@@ -1105,6 +1114,7 @@ func TestSetPlacementStatus(t *testing.T) {
},
},
},
want: false,
wantStatus: &fleetv1beta1.ClusterResourcePlacementStatus{
SelectedResources: selectedResources,
Conditions: []metav1.Condition{
@@ -1251,6 +1261,7 @@ func TestSetPlacementStatus(t *testing.T) {
},
},
},
want: false,
wantStatus: &fleetv1beta1.ClusterResourcePlacementStatus{
SelectedResources: selectedResources,
Conditions: []metav1.Condition{
@@ -1452,6 +1463,7 @@ func TestSetPlacementStatus(t *testing.T) {
},
},
},
want: true,
wantStatus: &fleetv1beta1.ClusterResourcePlacementStatus{
SelectedResources: selectedResources,
Conditions: []metav1.Condition{
@@ -1717,6 +1729,7 @@ func TestSetPlacementStatus(t *testing.T) {
},
},
},
want: true,
wantStatus: &fleetv1beta1.ClusterResourcePlacementStatus{
SelectedResources: selectedResources,
Conditions: []metav1.Condition{
@@ -2092,6 +2105,7 @@ func TestSetPlacementStatus(t *testing.T) {
},
},
},
want: true,
wantStatus: &fleetv1beta1.ClusterResourcePlacementStatus{
SelectedResources: selectedResources,
Conditions: []metav1.Condition{
Expand Down Expand Up @@ -2250,9 +2264,13 @@ func TestSetPlacementStatus(t *testing.T) {
Recorder: record.NewFakeRecorder(10),
}
crp.Generation = crpGeneration
if err := r.setPlacementStatus(context.Background(), crp, selectedResources, tc.latestPolicySnapshot, tc.latestResourceSnapshot); err != nil {
got, err := r.setPlacementStatus(context.Background(), crp, selectedResources, tc.latestPolicySnapshot, tc.latestResourceSnapshot)
if err != nil {
t.Fatalf("setPlacementStatus() failed: %v", err)
}
if got != tc.want {
t.Errorf("setPlacementStatus() = %v, want %v", got, tc.want)
}

if diff := cmp.Diff(tc.wantStatus, &crp.Status, statusCmpOptions...); diff != "" {
t.Errorf("buildPlacementStatus() status mismatch (-want, +got):\n%s", diff)
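
The test changes above follow the standard Go table-driven pattern; below is a condensed, hypothetical illustration of just the new assertion (the real cases also build full CRP objects and compare wantStatus with cmp.Diff).

package sketch

import "testing"

// TestScheduledSignal shows the shape of the added check: each case carries a
// want bool for the scheduled-cluster signal, and the returned flag is
// compared directly against it.
func TestScheduledSignal(t *testing.T) {
    tests := []struct {
        name          string
        selectedCount int
        want          bool
    }{
        {name: "no cluster scheduled yet", selectedCount: 0, want: false},
        {name: "one cluster scheduled", selectedCount: 1, want: true},
    }
    for _, tc := range tests {
        t.Run(tc.name, func(t *testing.T) {
            got := tc.selectedCount > 0 // stand-in for r.setPlacementStatus(...)
            if got != tc.want {
                t.Errorf("setPlacementStatus() = %v, want %v", got, tc.want)
            }
        })
    }
}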
111 changes: 111 additions & 0 deletions test/e2e/placement_updating_members_test.go
@@ -0,0 +1,111 @@
/*
Copyright (c) Microsoft Corporation.
Licensed under the MIT license.
*/

package e2e

import (
"fmt"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/pointer"

clusterv1beta1 "go.goms.io/fleet/apis/cluster/v1beta1"
placementv1beta1 "go.goms.io/fleet/apis/placement/v1beta1"
)

// Note that this container won't run in parallel with other containers as it manipulates the member cluster information.
var _ = Describe("Updating member cluster", Serial, func() {
Context("Updating member cluster label", Ordered, func() {
crpName := fmt.Sprintf(crpNameTemplate, GinkgoParallelProcess())
clusterLabelKey := "updating-member-key"
clusterLabelValue := "updating-member-value"

BeforeAll(func() {
// Create the resources.
createWorkResources()

// Create the CRP.
crp := &placementv1beta1.ClusterResourcePlacement{
ObjectMeta: metav1.ObjectMeta{
Name: crpName,
// Add a custom finalizer; this would allow us to better observe
// the behavior of the controllers.
Finalizers: []string{customDeletionBlockerFinalizer},
},
Spec: placementv1beta1.ClusterResourcePlacementSpec{
ResourceSelectors: workResourceSelector(),
Policy: &placementv1beta1.PlacementPolicy{
PlacementType: placementv1beta1.PickNPlacementType,
Affinity: &placementv1beta1.Affinity{
ClusterAffinity: &placementv1beta1.ClusterAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: &placementv1beta1.ClusterSelector{
ClusterSelectorTerms: []placementv1beta1.ClusterSelectorTerm{
{
LabelSelector: metav1.LabelSelector{
MatchLabels: map[string]string{
clusterLabelKey: clusterLabelValue,
},
},
},
},
},
},
},
NumberOfClusters: pointer.Int32(1),
},
Strategy: placementv1beta1.RolloutStrategy{
Type: placementv1beta1.RollingUpdateRolloutStrategyType,
RollingUpdate: &placementv1beta1.RollingUpdateConfig{
UnavailablePeriodSeconds: pointer.Int(2),
},
},
},
}
Expect(hubClient.Create(ctx, crp)).To(Succeed(), "Failed to create CRP")
})

It("should update CRP status as expected", func() {
statusUpdatedActual := crpStatusUpdatedActual(workResourceIdentifiers(), nil, []string{memberCluster1Name})
Eventually(statusUpdatedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to update CRP status as expected")
})

It("update the member cluster label", func() {
Eventually(func() error {
mcObj := &clusterv1beta1.MemberCluster{}
if err := hubClient.Get(ctx, types.NamespacedName{Name: memberCluster1Name}, mcObj); err != nil {
return err
}
if mcObj.Labels == nil {
// Guard against a nil label map before adding the selector label.
mcObj.Labels = map[string]string{}
}
mcObj.Labels[clusterLabelKey] = clusterLabelValue
return hubClient.Update(ctx, mcObj)
}, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to update the member cluster")
})

It("should place resources on matching clusters", func() {
resourcePlacedActual := workNamespaceAndConfigMapPlacedOnClusterActual(memberCluster1)
Eventually(resourcePlacedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to place resources on matching clusters")
})

It("should update CRP status as expected", func() {
statusUpdatedActual := crpStatusUpdatedActual(workResourceIdentifiers(), []string{memberCluster1Name}, nil)
Eventually(statusUpdatedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to update CRP status as expected")
})

AfterAll(func() {
Eventually(func() error {
mcObj := &clusterv1beta1.MemberCluster{}
if err := hubClient.Get(ctx, types.NamespacedName{Name: memberCluster1Name}, mcObj); err != nil {
return err
}
delete(mcObj.Labels, clusterLabelKey)
return hubClient.Update(ctx, mcObj)
}, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to update the member cluster")

ensureCRPAndRelatedResourcesDeletion(crpName, nil)
})
})
})
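
The e2e case relies on the CRP's required cluster affinity being an ordinary label selector, so the member cluster only becomes a scheduling candidate once the label is added. Below is a small sketch of that matching rule using the upstream apimachinery helpers; the label key and value come from the test, while the function itself is illustrative and far simpler than the scheduler's real logic.

package sketch

import (
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/labels"
)

// clusterEligible reports whether a member cluster's labels satisfy the
// required affinity used in the test above. Adding
// updating-member-key=updating-member-value (the "update the member cluster
// label" step) is what flips this from false to true and triggers placement.
func clusterEligible(clusterLabels map[string]string) (bool, error) {
    selector, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{
        MatchLabels: map[string]string{
            "updating-member-key": "updating-member-value",
        },
    })
    if err != nil {
        return false, err
    }
    return selector.Matches(labels.Set(clusterLabels)), nil
}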
