Handle upgrade pod already exists
HomayoonAlimohammadi committed Oct 31, 2024
1 parent 14d9900 commit c19ed72
Showing 3 changed files with 169 additions and 69 deletions.
88 changes: 62 additions & 26 deletions controllers/reconcile.go
@@ -142,7 +142,7 @@ func (r *MicroK8sControlPlaneReconciler) reconcileMachines(ctx context.Context,

if numMachines > 0 {
sort.Sort(SortByCreationTimestamp(machines))
oldVersion = semver.MajorMinor(*machines[0].Spec.Version)
oldVersion = getOldestVersion(machines)
newVersion = semver.MajorMinor(mcp.Spec.Version)
}

@@ -202,6 +202,9 @@ func (r *MicroK8sControlPlaneReconciler) reconcileMachines(ctx context.Context,

// For each machine, get the node and upgrade it
for _, machine := range machines {
if isMachineUpgraded(machine, newVersion) {
continue
}

// Get the node for the machine
node, err := kubeclient.CoreV1().Nodes().Get(ctx, machine.Status.NodeRef.Name, metav1.GetOptions{})
@@ -212,40 +215,33 @@ func (r *MicroK8sControlPlaneReconciler) reconcileMachines(ctx context.Context,
logger.Info(fmt.Sprintf("Creating upgrade pod on %s...", node.Name))
pod, err := createUpgradePod(ctx, kubeclient, node.Name, mcp.Spec.Version)
if err != nil {
logger.Error(err, "Error creating upgrade pod.")
return ctrl.Result{}, fmt.Errorf("failed to create upgrade pod: %w", err)
}

logger.Info("Waiting for upgrade node to be updated to the given version...")
err = waitForNodeUpgrade(ctx, kubeclient, node.Name, mcp.Spec.Version)
if err != nil {
logger.Error(err, "Error waiting for node upgrade.")
logger.Info("Waiting for node to be updated to the given version...", "node", node.Name)
if err := waitForNodeUpgrade(ctx, kubeclient, node.Name, mcp.Spec.Version); err != nil {
return ctrl.Result{}, fmt.Errorf("failed to wait for node upgrade: %w", err)
}

time.Sleep(10 * time.Second)

// Get the current machine
logger.Info("Node upgraded successfully.", "node", node.Name)
// Update the machine version
currentMachine := &clusterv1.Machine{}
currentMachineName := node.Annotations["cluster.x-k8s.io/machine"]
err = r.Client.Get(ctx, client.ObjectKey{Namespace: cluster.Namespace, Name: currentMachineName}, currentMachine)
if err != nil {
logger.Error(err, "Error getting machine.")
if err := r.Client.Get(ctx, client.ObjectKey{Namespace: cluster.Namespace, Name: currentMachineName}, currentMachine); err != nil {
return ctrl.Result{}, fmt.Errorf("failed to get machine: %w", err)
}

// Update the machine version
logger.Info("Updating machine version...", "machine", currentMachine.Name)
currentMachine.Spec.Version = &mcp.Spec.Version
logger.Info(fmt.Sprintf("Now updating machine %s version to %s...", currentMachine.Name, *currentMachine.Spec.Version))
err = r.Client.Update(ctx, currentMachine)
if err != nil {
logger.Error(err, "Could not update the machine version. We will retry.")
if err := r.Client.Update(ctx, currentMachine); err != nil {
return ctrl.Result{}, fmt.Errorf("failed to update machine: %w", err)
}

time.Sleep(10 * time.Second)

// wait until pod is deleted
logger.Info(fmt.Sprintf("Removing upgrade pod %s from %s...", pod.ObjectMeta.Name, node.Name))
err = waitForPodDeletion(ctx, kubeclient, pod.ObjectMeta.Name)
if err != nil {
logger.Error(err, "Error waiting for pod deletion.")
if err := waitForPodDeletion(ctx, kubeclient, pod.ObjectMeta.Name); err != nil {
return ctrl.Result{}, fmt.Errorf("failed to wait for pod deletion: %w", err)
}

logger.Info(fmt.Sprintf("Upgrade of node %s completed.\n", node.Name))
@@ -689,18 +685,28 @@ func (r *MicroK8sControlPlaneReconciler) removeNodeFromDqlite(ctx context.Contex
return nil
}

// createUpgradePod creates a pod that upgrades the node to the given version.
// if the upgrade pod already exists, it is deleted and a new one will be created.
func createUpgradePod(ctx context.Context, kubeclient *kubernetesClient, nodeName string, nodeVersion string) (*corev1.Pod, error) {
podName := "upgrade-pod"

// delete the pod if it exists
if err := waitForPodDeletion(ctx, kubeclient, podName); err != nil {
return nil, fmt.Errorf("failed to delete pod %s: %w", podName, err)
}

nodeVersion = strings.TrimPrefix(semver.MajorMinor(nodeVersion), "v")

uid := int64(0)
priv := true

pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "upgrade-pod",
Name: podName,
},
Spec: corev1.PodSpec{
NodeName: nodeName,
NodeName: nodeName,
RestartPolicy: corev1.RestartPolicyNever,
Containers: []corev1.Container{
{
Name: "upgrade",
@@ -736,7 +742,7 @@ func createUpgradePod(ctx context.Context, kubeclient *kubernetesClient, nodeNam

pod, err := kubeclient.CoreV1().Pods("default").Create(ctx, pod, metav1.CreateOptions{})
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to create pod %s: %w", podName, err)
}

return pod, nil
@@ -748,7 +754,7 @@ func waitForNodeUpgrade(ctx context.Context, kubeclient *kubernetesClient, nodeN
for attempts > 0 {
node, err := kubeclient.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
if err != nil {
return err
return fmt.Errorf("failed to get node %s: %w", nodeName, err)
}
currentVersion := semver.MajorMinor(node.Status.NodeInfo.KubeletVersion)
nodeVersion = semver.MajorMinor(nodeVersion)
@@ -761,6 +767,7 @@ func waitForNodeUpgrade(ctx context.Context, kubeclient *kubernetesClient, nodeN
return nil
}

// waitForPodDeletion waits for the pod to be deleted. If the pod doesn't exist, it returns nil.
func waitForPodDeletion(ctx context.Context, kubeclient *kubernetesClient, podName string) error {
for {
gracePeriod := int64(0)
@@ -773,10 +780,39 @@ func waitForPodDeletion(ctx context.Context, kubeclient *kubernetesClient, podNa
if apierrors.IsNotFound(err) {
break
}
return err
return fmt.Errorf("failed to delete pod %s: %w", podName, err)
} else {
break
}
}
return nil
}

// getOldestVersion returns the oldest version of the machines.
func getOldestVersion(machines []clusterv1.Machine) (v string) {
for _, m := range machines {
if m.Spec.Version == nil {
// weird!
continue
}

if v == "" {
v = semver.MajorMinor(*m.Spec.Version)
continue
}

if semver.Compare(v, *m.Spec.Version) > 0 {
v = semver.MajorMinor(*m.Spec.Version)
}
}

return
}

func isMachineUpgraded(m clusterv1.Machine, newVersion string) bool {
if m.Spec.Version == nil {
return false
}
machineVersion := semver.MajorMinor(*m.Spec.Version)
return semver.Compare(machineVersion, newVersion) == 0
}
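
The new getOldestVersion and isMachineUpgraded helpers compare versions with golang.org/x/mod/semver at major.minor granularity. A minimal, self-contained sketch of that comparison behavior (the sample versions below are illustrative, not taken from the commit):

package main

import (
	"fmt"

	"golang.org/x/mod/semver"
)

func main() {
	// MajorMinor truncates a full version to "vMAJOR.MINOR", the granularity
	// the controller compares machine versions at.
	fmt.Println(semver.MajorMinor("v1.28.3")) // v1.28

	// Compare returns -1, 0, or +1. A machine whose major.minor already equals
	// the target version is treated as upgraded and skipped.
	fmt.Println(semver.Compare("v1.27", "v1.28")) // -1
	fmt.Println(semver.Compare("v1.28", "v1.28")) // 0

	// Picking the oldest version across machines reduces to keeping the
	// smallest value under Compare, as getOldestVersion does.
	versions := []string{"v1.28.1", "v1.27.4", "v1.28.0"}
	oldest := ""
	for _, v := range versions {
		if mm := semver.MajorMinor(v); oldest == "" || semver.Compare(oldest, mm) > 0 {
			oldest = mm
		}
	}
	fmt.Println(oldest) // v1.27
}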
11 changes: 9 additions & 2 deletions pkg/clusteragent/clusteragent.go
@@ -147,6 +147,13 @@ func (c *Client) do(ctx context.Context, method, endpoint string, header http.He

// createPod creates a pod that runs a curl command.
func (c *Client) createPod(ctx context.Context, method, endpoint string, header http.Header, data map[string]any) (*corev1.Pod, error) {
podName := fmt.Sprintf(CallerPodNameFormat, c.nodeName)

// delete the pod if it exists
if err := c.deletePod(ctx, podName); err != nil {
return nil, fmt.Errorf("failed to delete pod: %w", err)
}

curl, err := c.createCURLString(method, endpoint, header, data)
if err != nil {
return nil, fmt.Errorf("failed to create curl string: %w", err)
@@ -156,7 +163,7 @@ func (c *Client) createPod(ctx context.Context, method, endpoint string, header

pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf(CallerPodNameFormat, c.nodeName),
Name: podName,
},
Spec: corev1.PodSpec{
NodeName: c.nodeName,
@@ -211,7 +218,7 @@ func (c *Client) createCURLString(method, endpoint string, header http.Header, d
return req, nil
}

// deletePod deletes a pod.
// deletePod deletes a pod. It will succeed if the pod doesn't exist.
func (c *Client) deletePod(ctx context.Context, podName string) error {
deleteOptions := metav1.DeleteOptions{
GracePeriodSeconds: ptr.To(int64(0)),
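
Both createUpgradePod in controllers/reconcile.go and createPod in pkg/clusteragent/clusteragent.go now delete any leftover pod of the same name before creating a fresh one, treating a NotFound error as a no-op. A simplified sketch of that delete-then-create pattern against client-go's fake clientset (pod and namespace names are illustrative, and the error handling is condensed compared to the controller):

package main

import (
	"context"
	"fmt"

	corev1 "k8s.io/api/core/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes/fake"
)

func main() {
	ctx := context.Background()

	// Seed the fake clientset with a leftover pod from an earlier, interrupted run.
	client := fake.NewSimpleClientset(&corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{Name: "upgrade-pod", Namespace: "default"},
	})

	// Delete the pod if it exists; NotFound just means there is nothing to clean up.
	grace := int64(0)
	err := client.CoreV1().Pods("default").Delete(ctx, "upgrade-pod",
		metav1.DeleteOptions{GracePeriodSeconds: &grace})
	if err != nil && !apierrors.IsNotFound(err) {
		panic(fmt.Errorf("failed to delete pod: %w", err))
	}

	// With the old pod gone, Create can no longer fail with AlreadyExists.
	pod, err := client.CoreV1().Pods("default").Create(ctx, &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{Name: "upgrade-pod"},
	}, metav1.CreateOptions{})
	if err != nil {
		panic(fmt.Errorf("failed to create pod: %w", err))
	}
	fmt.Println("created pod:", pod.Name)
}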
139 changes: 98 additions & 41 deletions pkg/clusteragent/clusteragent_test.go
@@ -125,57 +125,114 @@ func TestClient(t *testing.T) {
}

func TestDo(t *testing.T) {
g := NewWithT(t)

kubeclient := fake.NewSimpleClientset()
nodeName := "node"
nodeAddress := "5.6.7.8"
port := "1234"
method := "POST"
endpoint := "my/endpoint"
dataKey, dataValue := "dkey", "dvalue"
data := map[string]any{
dataKey: dataValue,
}
headerKey, headerValue := "hkey", "hvalue"
header := map[string][]string{
headerKey: {headerValue},
}
t.Run("Success", func(t *testing.T) {
g := NewWithT(t)

kubeclient := fake.NewSimpleClientset()
nodeName := "node"
nodeAddress := "5.6.7.8"
port := "1234"
method := "POST"
endpoint := "my/endpoint"
dataKey, dataValue := "dkey", "dvalue"
data := map[string]any{
dataKey: dataValue,
}
headerKey, headerValue := "hkey", "hvalue"
header := map[string][]string{
headerKey: {headerValue},
}

c, err := NewClient(kubeclient, newLogger(), []clusterv1.Machine{
{
Status: clusterv1.MachineStatus{
NodeRef: &corev1.ObjectReference{
Name: nodeName,
c, err := NewClient(kubeclient, newLogger(), []clusterv1.Machine{
{
Status: clusterv1.MachineStatus{
NodeRef: &corev1.ObjectReference{
Name: nodeName,
},
Addresses: clusterv1.MachineAddresses{
{
Address: nodeAddress,
},
},
},
Addresses: clusterv1.MachineAddresses{
{
Address: nodeAddress,
},
}, port, Options{SkipSucceededCheck: true, SkipPodCleanup: true})

g.Expect(err).ToNot(HaveOccurred())

g.Expect(c.do(context.Background(), method, endpoint, header, data)).To(Succeed())

pod, err := kubeclient.CoreV1().Pods(DefaultPodNameSpace).Get(context.Background(), fmt.Sprintf(CallerPodNameFormat, nodeName), v1.GetOptions{})
g.Expect(err).ToNot(HaveOccurred())

g.Expect(pod.Spec.NodeName).To(Equal(nodeName))
g.Expect(pod.Spec.Containers).To(HaveLen(1))

container := pod.Spec.Containers[0]
g.Expect(container.Image).To(Equal(images.CurlImage))
g.Expect(*container.SecurityContext.Privileged).To(BeTrue())
g.Expect(*container.SecurityContext.RunAsUser).To(Equal(int64(0)))
g.Expect(container.Command).To(HaveLen(3))
g.Expect(container.Command[2]).To(Equal(fmt.Sprintf(
"curl -k -X %s -H \"%s: %s\" -d '{\"%s\":\"%s\"}' https://%s:%s/%s",
method, headerKey, headerValue, dataKey, dataValue, nodeAddress, port, endpoint,
)))
})

t.Run("PodAlreadyExists", func(t *testing.T) {
g := NewWithT(t)

nodeName := "node"
podName := fmt.Sprintf(CallerPodNameFormat, nodeName)
kubeclient := fake.NewSimpleClientset(&corev1.Pod{ObjectMeta: v1.ObjectMeta{Name: podName, Namespace: "default"}})
nodeAddress := "5.6.7.8"
port := "1234"
method := "POST"
endpoint := "my/endpoint"
dataKey, dataValue := "dkey", "dvalue"
data := map[string]any{
dataKey: dataValue,
}
headerKey, headerValue := "hkey", "hvalue"
header := map[string][]string{
headerKey: {headerValue},
}

c, err := NewClient(kubeclient, newLogger(), []clusterv1.Machine{
{
Status: clusterv1.MachineStatus{
NodeRef: &corev1.ObjectReference{
Name: nodeName,
},
Addresses: clusterv1.MachineAddresses{
{
Address: nodeAddress,
},
},
},
},
},
}, port, Options{SkipSucceededCheck: true, SkipPodCleanup: true})
}, port, Options{SkipSucceededCheck: true, SkipPodCleanup: true})

g.Expect(err).ToNot(HaveOccurred())
g.Expect(err).ToNot(HaveOccurred())

g.Expect(c.do(context.Background(), method, endpoint, header, data)).To(Succeed())
g.Expect(c.do(context.Background(), method, endpoint, header, data)).To(Succeed())

pod, err := kubeclient.CoreV1().Pods(DefaultPodNameSpace).Get(context.Background(), fmt.Sprintf(CallerPodNameFormat, nodeName), v1.GetOptions{})
g.Expect(err).ToNot(HaveOccurred())
pod, err := kubeclient.CoreV1().Pods(DefaultPodNameSpace).Get(context.Background(), fmt.Sprintf(CallerPodNameFormat, nodeName), v1.GetOptions{})
g.Expect(err).ToNot(HaveOccurred())

g.Expect(pod.Spec.NodeName).To(Equal(nodeName))
g.Expect(pod.Spec.Containers).To(HaveLen(1))
g.Expect(pod.Spec.NodeName).To(Equal(nodeName))
g.Expect(pod.Spec.Containers).To(HaveLen(1))

container := pod.Spec.Containers[0]
g.Expect(container.Image).To(Equal(images.CurlImage))
g.Expect(*container.SecurityContext.Privileged).To(BeTrue())
g.Expect(*container.SecurityContext.RunAsUser).To(Equal(int64(0)))
g.Expect(container.Command).To(HaveLen(3))
g.Expect(container.Command[2]).To(Equal(fmt.Sprintf(
"curl -k -X %s -H \"%s: %s\" -d '{\"%s\":\"%s\"}' https://%s:%s/%s",
method, headerKey, headerValue, dataKey, dataValue, nodeAddress, port, endpoint,
)))
container := pod.Spec.Containers[0]
g.Expect(container.Image).To(Equal(images.CurlImage))
g.Expect(*container.SecurityContext.Privileged).To(BeTrue())
g.Expect(*container.SecurityContext.RunAsUser).To(Equal(int64(0)))
g.Expect(container.Command).To(HaveLen(3))
g.Expect(container.Command[2]).To(Equal(fmt.Sprintf(
"curl -k -X %s -H \"%s: %s\" -d '{\"%s\":\"%s\"}' https://%s:%s/%s",
method, headerKey, headerValue, dataKey, dataValue, nodeAddress, port, endpoint,
)))
})
}

func shuffleMachines(src []clusterv1.Machine) []clusterv1.Machine {
