Merge pull request #17208 from justinsb/reconcile_new_cluster
reconcile: wait for apiserver to respond before trying rolling-update
k8s-ci-robot authored Jan 15, 2025
2 parents e0edd66 + f2d4eeb commit dde7601
Showing 9 changed files with 125 additions and 51 deletions.
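The thread running through these files: validation.NewClusterValidator gains two optional predicate parameters, and ClusterValidator.Validate now takes a context. A minimal sketch of the updated call pattern, using only the signatures visible in this diff (variables such as cluster, cloud, list, restConfig, and k8sClient are assumed to be in scope, as at the existing call sites):

	// Existing callers keep the old validate-everything behavior by passing nil filters:
	clusterValidator, err := validation.NewClusterValidator(
		cluster, cloud, list,
		nil, // filterInstanceGroups: nil means every instance group is validated
		nil, // filterPodsForValidation: nil means every pod is validated
		restConfig, k8sClient)
	if err != nil {
		return fmt.Errorf("cannot create cluster validator: %v", err)
	}
	result, err := clusterValidator.Validate(ctx) // ctx is now required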
2 changes: 1 addition & 1 deletion cmd/kops/delete_instance.go
@@ -258,7 +258,7 @@ func RunDeleteInstance(ctx context.Context, f *util.Factory, out io.Writer, opti

var clusterValidator validation.ClusterValidator
if !options.CloudOnly {
- clusterValidator, err = validation.NewClusterValidator(cluster, cloud, list, restConfig, k8sClient)
+ clusterValidator, err = validation.NewClusterValidator(cluster, cloud, list, nil, nil, restConfig, k8sClient)
if err != nil {
return fmt.Errorf("cannot create cluster validator: %v", err)
}
26 changes: 26 additions & 0 deletions cmd/kops/reconcile_cluster.go
@@ -20,8 +20,10 @@ import (
"context"
"fmt"
"io"
"time"

"github.com/spf13/cobra"
v1 "k8s.io/api/core/v1"
"k8s.io/kops/cmd/kops/util"
"k8s.io/kops/pkg/apis/kops"
"k8s.io/kops/pkg/commands/commandutils"
@@ -134,6 +136,30 @@ func RunReconcileCluster(ctx context.Context, f *util.Factory, out io.Writer, c
}
}

+ // Particularly for a new cluster, we need to wait for the control plane to be answering requests
+ // before we can do a rolling update.
+ fmt.Fprintf(out, "Waiting for the kubernetes API to be served\n")
+ {
+ opt := &ValidateClusterOptions{}
+ opt.InitDefaults()
+ opt.ClusterName = c.ClusterName
+ opt.wait = 10 * time.Minute
+
+ // filter the instance group to only include the control plane
+ opt.filterInstanceGroups = func(ig *kops.InstanceGroup) bool {
+ return ig.Spec.Role == kops.InstanceGroupRoleAPIServer || ig.Spec.Role == kops.InstanceGroupRoleControlPlane
+ }
+
+ // Ignore all pods, we just want to check the control plane is responding
+ opt.filterPodsForValidation = func(pod *v1.Pod) bool {
+ return false
+ }
+
+ if _, err := RunValidateCluster(ctx, f, out, opt); err != nil {
+ return fmt.Errorf("waiting for kubernetes API to be served: %w", err)
+ }
+ }

fmt.Fprintf(out, "Doing rolling-update for control plane\n")
{
opt := &RollingUpdateOptions{}
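The new wait step composes the pieces above: validate only control-plane instance groups, ignore every pod, and let RunValidateCluster poll for up to opt.wait. A self-contained sketch of the same pattern written directly against the validation package; waitForAPIServer and its polling loop are illustrative, not code from this PR:

	package example // hypothetical package

	import (
		"context"
		"fmt"
		"time"

		v1 "k8s.io/api/core/v1"
		"k8s.io/client-go/kubernetes"
		"k8s.io/client-go/rest"
		"k8s.io/kops/pkg/apis/kops"
		"k8s.io/kops/pkg/validation"
		"k8s.io/kops/upup/pkg/fi"
	)

	// waitForAPIServer mirrors what reconcile does: validate only the control
	// plane, skip all pod checks, and retry until validation passes.
	func waitForAPIServer(ctx context.Context, cluster *kops.Cluster, cloud fi.Cloud, list *kops.InstanceGroupList, restConfig *rest.Config, k8sClient kubernetes.Interface) error {
		controlPlaneOnly := func(ig *kops.InstanceGroup) bool {
			return ig.Spec.Role == kops.InstanceGroupRoleAPIServer || ig.Spec.Role == kops.InstanceGroupRoleControlPlane
		}
		noPods := func(pod *v1.Pod) bool { return false } // ignore all pods

		validator, err := validation.NewClusterValidator(cluster, cloud, list, controlPlaneOnly, noPods, restConfig, k8sClient)
		if err != nil {
			return fmt.Errorf("cannot create cluster validator: %w", err)
		}

		deadline := time.Now().Add(10 * time.Minute)
		for {
			result, err := validator.Validate(ctx)
			if err == nil && len(result.Failures) == 0 {
				return nil // the API server is answering
			}
			if time.Now().After(deadline) {
				return fmt.Errorf("timed out waiting for the kubernetes API to be served")
			}
			time.Sleep(10 * time.Second)
		}
	}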
2 changes: 1 addition & 1 deletion cmd/kops/rolling-update_cluster.go
@@ -453,7 +453,7 @@ func RunRollingUpdateCluster(ctx context.Context, f *util.Factory, out io.Writer
return fmt.Errorf("getting rest config: %w", err)
}

- clusterValidator, err = validation.NewClusterValidator(cluster, cloud, list, restConfig, k8sClient)
+ clusterValidator, err = validation.NewClusterValidator(cluster, cloud, list, nil, nil, restConfig, k8sClient)
if err != nil {
return fmt.Errorf("cannot create cluster validator: %v", err)
}
24 changes: 16 additions & 8 deletions cmd/kops/validate_cluster.go
@@ -36,6 +36,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog/v2"
"k8s.io/kops/cmd/kops/util"
"k8s.io/kops/pkg/apis/kops"
kopsapi "k8s.io/kops/pkg/apis/kops"
"k8s.io/kops/pkg/validation"
"k8s.io/kops/util/pkg/tables"
@@ -61,12 +62,19 @@
)

type ValidateClusterOptions struct {
- ClusterName string
- output string
- wait time.Duration
- count int
- interval time.Duration
- kubeconfig string
+ ClusterName string
+ InstanceGroupRoles []kops.InstanceGroupRole
+ output string
+ wait time.Duration
+ count int
+ interval time.Duration
+ kubeconfig string
+
+ // filterInstanceGroups is a function that returns true if the instance group should be validated
+ filterInstanceGroups func(ig *kops.InstanceGroup) bool
+
+ // filterPodsForValidation is a function that returns true if the pod should be validated
+ filterPodsForValidation func(pod *v1.Pod) bool
}

func (o *ValidateClusterOptions) InitDefaults() {
@@ -164,7 +172,7 @@ func RunValidateCluster(ctx context.Context, f *util.Factory, out io.Writer, opt

timeout := time.Now().Add(options.wait)

- validator, err := validation.NewClusterValidator(cluster, cloud, list, restConfig, k8sClient)
+ validator, err := validation.NewClusterValidator(cluster, cloud, list, options.filterInstanceGroups, options.filterPodsForValidation, restConfig, k8sClient)
if err != nil {
return nil, fmt.Errorf("unexpected error creating validatior: %v", err)
}
@@ -175,7 +183,7 @@ func RunValidateCluster(ctx context.Context, f *util.Factory, out io.Writer, opt
return nil, fmt.Errorf("wait time exceeded during validation")
}

- result, err := validator.Validate()
+ result, err := validator.Validate(ctx)
if err != nil {
consecutive = 0
if options.wait > 0 {
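Because the new filter fields are unexported, only code inside cmd/kops (like reconcile above) can set them; external callers of NewClusterValidator pass predicates directly. A sketch of another plausible in-package use, narrowing pod validation to kube-system (illustrative, not part of this PR; clusterName, f, and out are assumed to be in scope):

	opt := &ValidateClusterOptions{}
	opt.InitDefaults()
	opt.ClusterName = clusterName
	// Only report unhealthy pods from kube-system; the priority-class check
	// in collectPodFailures still applies after this filter.
	opt.filterPodsForValidation = func(pod *v1.Pod) bool {
		return pod.Namespace == "kube-system"
	}
	if _, err := RunValidateCluster(ctx, f, out, opt); err != nil {
		return fmt.Errorf("validating cluster: %w", err)
	}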
2 changes: 1 addition & 1 deletion pkg/instancegroups/instancegroups.go
@@ -537,7 +537,7 @@ func (c *RollingUpdateCluster) validateClusterWithTimeout(validateCount int, gro

for {
// Note that we validate at least once before checking the timeout, in case the cluster is healthy with a short timeout
- result, err := c.ClusterValidator.Validate()
+ result, err := c.ClusterValidator.Validate(ctx)
if err == nil && !hasFailureRelevantToGroup(result.Failures, group) {
successCount++
if successCount >= validateCount {
22 changes: 10 additions & 12 deletions pkg/instancegroups/rollingupdate_test.go
@@ -84,13 +84,13 @@ func getTestSetup() (*RollingUpdateCluster, *awsup.MockAWSCloud) {

type successfulClusterValidator struct{}

- func (*successfulClusterValidator) Validate() (*validation.ValidationCluster, error) {
+ func (*successfulClusterValidator) Validate(ctx context.Context) (*validation.ValidationCluster, error) {
return &validation.ValidationCluster{}, nil
}

type failingClusterValidator struct{}

- func (*failingClusterValidator) Validate() (*validation.ValidationCluster, error) {
+ func (*failingClusterValidator) Validate(ctx context.Context) (*validation.ValidationCluster, error) {
return &validation.ValidationCluster{
Failures: []*validation.ValidationError{
{
@@ -104,7 +104,7 @@ func (*failingClusterValidator) Validate() (*validation.ValidationCluster, error

type erroringClusterValidator struct{}

- func (*erroringClusterValidator) Validate() (*validation.ValidationCluster, error) {
+ func (*erroringClusterValidator) Validate(ctx context.Context) (*validation.ValidationCluster, error) {
return nil, errors.New("testing validation error")
}

@@ -113,7 +113,7 @@ type instanceGroupNodeSpecificErrorClusterValidator struct {
InstanceGroup *kopsapi.InstanceGroup
}

- func (igErrorValidator *instanceGroupNodeSpecificErrorClusterValidator) Validate() (*validation.ValidationCluster, error) {
+ func (igErrorValidator *instanceGroupNodeSpecificErrorClusterValidator) Validate(ctx context.Context) (*validation.ValidationCluster, error) {
return &validation.ValidationCluster{
Failures: []*validation.ValidationError{
{
@@ -130,7 +130,7 @@ type assertNotCalledClusterValidator struct {
T *testing.T
}

- func (v *assertNotCalledClusterValidator) Validate() (*validation.ValidationCluster, error) {
+ func (v *assertNotCalledClusterValidator) Validate(ctx context.Context) (*validation.ValidationCluster, error) {
v.T.Fatal("validator called unexpectedly")
return nil, errors.New("validator called unexpectedly")
}
@@ -425,8 +425,7 @@ type failAfterOneNodeClusterValidator struct {
ReturnError bool
}

- func (v *failAfterOneNodeClusterValidator) Validate() (*validation.ValidationCluster, error) {
- ctx := context.TODO()
+ func (v *failAfterOneNodeClusterValidator) Validate(ctx context.Context) (*validation.ValidationCluster, error) {
asgGroups, _ := v.Cloud.Autoscaling().DescribeAutoScalingGroups(ctx, &autoscaling.DescribeAutoScalingGroupsInput{
AutoScalingGroupNames: []string{v.Group},
})
@@ -648,8 +647,7 @@ type flappingClusterValidator struct {
invocationCount int
}

- func (v *flappingClusterValidator) Validate() (*validation.ValidationCluster, error) {
- ctx := context.TODO()
+ func (v *flappingClusterValidator) Validate(ctx context.Context) (*validation.ValidationCluster, error) {
asgGroups, _ := v.Cloud.Autoscaling().DescribeAutoScalingGroups(ctx, &autoscaling.DescribeAutoScalingGroupsInput{
AutoScalingGroupNames: []string{"master-1"},
})
@@ -706,7 +704,7 @@ type failThreeTimesClusterValidator struct {
invocationCount int
}

- func (v *failThreeTimesClusterValidator) Validate() (*validation.ValidationCluster, error) {
+ func (v *failThreeTimesClusterValidator) Validate(ctx context.Context) (*validation.ValidationCluster, error) {
v.invocationCount++
if v.invocationCount <= 3 {
return &validation.ValidationCluster{
@@ -1060,7 +1058,7 @@ type concurrentTest struct {
detached map[string]bool
}

- func (c *concurrentTest) Validate() (*validation.ValidationCluster, error) {
+ func (c *concurrentTest) Validate(ctx context.Context) (*validation.ValidationCluster, error) {
c.mutex.Lock()
defer c.mutex.Unlock()

@@ -1441,7 +1439,7 @@ type alreadyDetachedTest struct {
detached map[string]bool
}

- func (t *alreadyDetachedTest) Validate() (*validation.ValidationCluster, error) {
+ func (t *alreadyDetachedTest) Validate(ctx context.Context) (*validation.ValidationCluster, error) {
t.mutex.Lock()
defer t.mutex.Unlock()

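The mechanical change across these test doubles is the added ctx parameter. Any stub satisfying the updated interface now takes the shape below (a sketch; staticValidator is not one of the fakes in this file):

	type staticValidator struct {
		result *validation.ValidationCluster
		err    error
	}

	// Validate accepts a context so fakes can honor cancellation; this stub ignores it.
	func (s *staticValidator) Validate(ctx context.Context) (*validation.ValidationCluster, error) {
		return s.result, s.err
	}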
2 changes: 1 addition & 1 deletion pkg/instancegroups/rollingupdate_warmpool_test.go
@@ -75,7 +75,7 @@ type countingValidator struct {
numValidations int
}

- func (c *countingValidator) Validate() (*validation.ValidationCluster, error) {
+ func (c *countingValidator) Validate(ctx context.Context) (*validation.ValidationCluster, error) {
c.numValidations++
return &validation.ValidationCluster{}, nil
}
83 changes: 60 additions & 23 deletions pkg/validation/validate_cluster.go
@@ -56,15 +56,23 @@ type ValidationError struct {

type ClusterValidator interface {
// Validate validates a k8s cluster
- Validate() (*ValidationCluster, error)
+ Validate(ctx context.Context) (*ValidationCluster, error)
}

type clusterValidatorImpl struct {
- cluster *kops.Cluster
- cloud fi.Cloud
- instanceGroups []*kops.InstanceGroup
- restConfig *rest.Config
- k8sClient kubernetes.Interface
+ cluster *kops.Cluster
+ cloud fi.Cloud
+ restConfig *rest.Config
+ k8sClient kubernetes.Interface
+
+ // allInstanceGroups is the list of all instance groups in the cluster
+ allInstanceGroups []*kops.InstanceGroup
+
+ // filterInstanceGroups is a function that returns true if the instance group should be validated
+ filterInstanceGroups func(ig *kops.InstanceGroup) bool
+
+ // filterPodsForValidation is a function that returns true if the pod should be validated
+ filterPodsForValidation func(pod *v1.Pod) bool
}

func (v *ValidationCluster) addError(failure *ValidationError) {
@@ -101,30 +109,44 @@ func hasPlaceHolderIP(host string) (string, error) {
return "", nil
}

- func NewClusterValidator(cluster *kops.Cluster, cloud fi.Cloud, instanceGroupList *kops.InstanceGroupList, restConfig *rest.Config, k8sClient kubernetes.Interface) (ClusterValidator, error) {
- var instanceGroups []*kops.InstanceGroup
+ func NewClusterValidator(cluster *kops.Cluster, cloud fi.Cloud, instanceGroupList *kops.InstanceGroupList, filterInstanceGroups func(ig *kops.InstanceGroup) bool, filterPodsForValidation func(pod *v1.Pod) bool, restConfig *rest.Config, k8sClient kubernetes.Interface) (ClusterValidator, error) {
+ var allInstanceGroups []*kops.InstanceGroup

for i := range instanceGroupList.Items {
ig := &instanceGroupList.Items[i]
- instanceGroups = append(instanceGroups, ig)
+ allInstanceGroups = append(allInstanceGroups, ig)
}

- if len(instanceGroups) == 0 {
+ if len(allInstanceGroups) == 0 {
return nil, fmt.Errorf("no InstanceGroup objects found")
}

+ // If no filter is provided, validate all instance groups
+ if filterInstanceGroups == nil {
+ filterInstanceGroups = func(ig *kops.InstanceGroup) bool {
+ return true
+ }
+ }

+ // If no filter is provided, validate all pods
+ if filterPodsForValidation == nil {
+ filterPodsForValidation = func(pod *v1.Pod) bool {
+ return true
+ }
+ }

return &clusterValidatorImpl{
- cluster: cluster,
- cloud: cloud,
- instanceGroups: instanceGroups,
- restConfig: restConfig,
- k8sClient: k8sClient,
+ cluster: cluster,
+ cloud: cloud,
+ allInstanceGroups: allInstanceGroups,
+ restConfig: restConfig,
+ k8sClient: k8sClient,
+ filterInstanceGroups: filterInstanceGroups,
+ filterPodsForValidation: filterPodsForValidation,
}, nil
}

- func (v *clusterValidatorImpl) Validate() (*ValidationCluster, error) {
- ctx := context.TODO()
+ func (v *clusterValidatorImpl) Validate(ctx context.Context) (*ValidationCluster, error) {
validation := &ValidationCluster{}

// Do not use if we are running gossip or without dns
@@ -161,13 +183,14 @@ func (v *clusterValidatorImpl) Validate() (*ValidationCluster, error) {
}

warnUnmatched := false
- cloudGroups, err := v.cloud.GetCloudGroups(v.cluster, v.instanceGroups, warnUnmatched, nodeList.Items)
+ cloudGroups, err := v.cloud.GetCloudGroups(v.cluster, v.allInstanceGroups, warnUnmatched, nodeList.Items)
if err != nil {
return nil, err
}
- readyNodes, nodeInstanceGroupMapping := validation.validateNodes(cloudGroups, v.instanceGroups)
+ readyNodes, nodeInstanceGroupMapping := validation.validateNodes(cloudGroups, v.allInstanceGroups, v.filterInstanceGroups)

- if err := validation.collectPodFailures(ctx, v.k8sClient, readyNodes, nodeInstanceGroupMapping); err != nil {
+ if err := validation.collectPodFailures(ctx, v.k8sClient, readyNodes, nodeInstanceGroupMapping, v.filterPodsForValidation); err != nil {
return nil, fmt.Errorf("cannot get pod health for %q: %v", v.cluster.Name, err)
}

@@ -181,7 +204,7 @@ var masterStaticPods = []string{
}

func (v *ValidationCluster) collectPodFailures(ctx context.Context, client kubernetes.Interface, nodes []v1.Node,
- nodeInstanceGroupMapping map[string]*kops.InstanceGroup,
+ nodeInstanceGroupMapping map[string]*kops.InstanceGroup, podValidationFilter func(pod *v1.Pod) bool,
) error {
masterWithoutPod := map[string]map[string]bool{}
nodeByAddress := map[string]string{}
@@ -210,10 +233,16 @@ func (v *ValidationCluster) collectPodFailures(ctx context.Context, client kuber
delete(masterWithoutPod[nodeByAddress[pod.Status.HostIP]], app)
}

+ // Ignore pods that we don't want to validate
+ if !podValidationFilter(pod) {
+ return nil
+ }

priority := pod.Spec.PriorityClassName
if priority != "system-cluster-critical" && priority != "system-node-critical" {
return nil
}

if pod.Status.Phase == v1.PodSucceeded {
return nil
}
@@ -275,12 +304,16 @@ func (v *ValidationCluster) collectPodFailures(ctx context.Context, client kuber
return nil
}

- func (v *ValidationCluster) validateNodes(cloudGroups map[string]*cloudinstances.CloudInstanceGroup, groups []*kops.InstanceGroup) ([]v1.Node, map[string]*kops.InstanceGroup) {
+ func (v *ValidationCluster) validateNodes(cloudGroups map[string]*cloudinstances.CloudInstanceGroup, groups []*kops.InstanceGroup, shouldValidateInstanceGroup func(ig *kops.InstanceGroup) bool) ([]v1.Node, map[string]*kops.InstanceGroup) {
var readyNodes []v1.Node
groupsSeen := map[string]bool{}
nodeInstanceGroupMapping := map[string]*kops.InstanceGroup{}

for _, cloudGroup := range cloudGroups {
+ if cloudGroup.InstanceGroup != nil && !shouldValidateInstanceGroup(cloudGroup.InstanceGroup) {
+ continue
+ }

var allMembers []*cloudinstances.CloudInstance
allMembers = append(allMembers, cloudGroup.Ready...)
allMembers = append(allMembers, cloudGroup.NeedUpdate...)
@@ -372,6 +405,10 @@ func (v *ValidationCluster) validateNodes(cloudGroups map[string]*cloudinstances
}

for _, ig := range groups {
+ if !shouldValidateInstanceGroup(ig) {
+ continue
+ }

if !groupsSeen[ig.Name] {
v.addError(&ValidationError{
Kind: "InstanceGroup",
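With ctx threaded through Validate, callers can bound a single validation pass instead of relying on the old internal context.TODO(). A sketch (the two-minute timeout is illustrative; validator is assumed to come from NewClusterValidator):

	ctx, cancel := context.WithTimeout(ctx, 2*time.Minute)
	defer cancel()
	// Kubernetes API and cloud calls made during validation can now observe ctx.
	result, err := validator.Validate(ctx)
	if err != nil {
		return err
	}
	for _, failure := range result.Failures {
		fmt.Printf("validation failure: %+v\n", failure)
	}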
(diff for the ninth changed file was not loaded)
