Skip to content

Commit

Permalink
Call remove node endpoint on machine deletion
Browse files Browse the repository at this point in the history
  • Loading branch information
HomayoonAlimohammadi committed Sep 27, 2024
1 parent 4f3a678 commit 862748d
Show file tree
Hide file tree
Showing 5 changed files with 412 additions and 0 deletions.
57 changes: 57 additions & 0 deletions controllers/reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,21 @@ import (
"context"
"fmt"
"math/rand"
"net"
"sort"
"strings"
"time"

clusterv1beta1 "github.com/canonical/cluster-api-control-plane-provider-microk8s/api/v1beta1"
"github.com/canonical/cluster-api-control-plane-provider-microk8s/pkg/clusteragent"
"golang.org/x/mod/semver"

"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apiserver/pkg/storage/names"
clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
"sigs.k8s.io/cluster-api/controllers/external"
Expand Down Expand Up @@ -575,6 +578,16 @@ func (r *MicroK8sControlPlaneReconciler) scaleDownControlPlane(ctx context.Conte
return ctrl.Result{RequeueAfter: 20 * time.Second}, fmt.Errorf("%q machine does not have a nodeRef", deleteMachine.Name)
}

// get cluster agent client and delete node from dqlite
clusterAgentClient, err := getClusterAgentClient(machines, deleteMachine)
if err != nil {
return ctrl.Result{}, fmt.Errorf("failed to get cluster agent client: %w", err)
}

if err := r.removeNodeFromDqlite(ctx, clusterAgentClient, deleteMachine); err != nil {
return ctrl.Result{}, fmt.Errorf("failed to remove node %q from dqlite: %w", deleteMachine.Name, err)
}

node := deleteMachine.Status.NodeRef

logger = logger.WithValues("machineName", deleteMachine.Name, "nodeName", node.Name)
Expand All @@ -595,6 +608,50 @@ func (r *MicroK8sControlPlaneReconciler) scaleDownControlPlane(ctx context.Conte
return ctrl.Result{Requeue: true}, nil
}

func getClusterAgentClient(machines []clusterv1.Machine, delMachine clusterv1.Machine) (*clusteragent.Client, error) {
opts := clusteragent.Options{
IgnoreNodeIPs: sets.NewString(),
// NOTE(hue): Port is hard coded according to
// https://github.com/canonical/cluster-api-control-plane-provider-microk8s/blob/v0.6.10/control-plane-components.yaml#L96-L102
Port: "30000",
}
// NOTE(hue): We want to pick a random machine's IP to call POST /dqlite/remove on its cluster agent endpoint.
// This machine should preferably not be the <delMachine> itself but this is not forced by Microk8s.
for _, addr := range delMachine.Status.Addresses {
opts.IgnoreNodeIPs.Insert(addr.Address)
}

clusterAgentClient, err := clusteragent.NewClient(machines, opts)
if err != nil {
return nil, fmt.Errorf("failed to initialize cluster agent client: %w", err)
}

return clusterAgentClient, nil
}

// removeMicrok8sNode removes the node from
func (r *MicroK8sControlPlaneReconciler) removeNodeFromDqlite(ctx context.Context, clusterAgentClient *clusteragent.Client, delMachine clusterv1.Machine) error {
var removeEp string
for _, addr := range delMachine.Status.Addresses {
if ip := net.ParseIP(addr.Address); ip != nil {
// NOTE(hue): Port is hard coded according to
// https://github.com/canonical/cluster-api-control-plane-provider-microk8s/blob/v0.6.10/control-plane-components.yaml#L96-L102
removeEp = fmt.Sprintf("%s:2379", addr.Address)
break
}
}

if removeEp == "" {
return fmt.Errorf("failed to extract endpoint of the deleting machine %q", delMachine.Name)
}

if err := clusterAgentClient.RemoveNodeFromDqlite(ctx, removeEp); err != nil {
return fmt.Errorf("failed to remove node %q from dqlite: %w", removeEp, err)
}

return nil
}

func createUpgradePod(ctx context.Context, kubeclient *kubernetesClient, nodeName string, nodeVersion string) (*corev1.Pod, error) {
nodeVersion = strings.TrimPrefix(semver.MajorMinor(nodeVersion), "v")

Expand Down
117 changes: 117 additions & 0 deletions pkg/clusteragent/clusteragent.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package clusteragent

import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
"errors"
"fmt"
"net/http"
"time"

"k8s.io/apimachinery/pkg/util/sets"
clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
)

const defaultClusterAgentPort = "25000"

// Options should be used when initializing a new client.
type Options struct {
// IgnoreNodeIPs is a set of ignored IPs that we don't want to pick for the cluster agent endpoint.
IgnoreNodeIPs sets.String
// Port overwrites the default cluster agent port to connect.
Port string
// InsecureSkipVerify skips the verification of the server's certificate chain and host name.
InsecureSkipVerify bool
}

type Client struct {
ip, port string
client *http.Client
}

// NewClient picks an IP from one of the given machines and creates a new client for the cluster agent
// with that IP.
func NewClient(machines []clusterv1.Machine, opts Options) (*Client, error) {
var ip string
for _, m := range machines {
for _, addr := range m.Status.Addresses {
if !opts.IgnoreNodeIPs.Has(addr.Address) {
ip = addr.Address
break
}
}
}

if ip == "" {
return nil, errors.New("failed to find an IP for cluster agent")
}

port := defaultClusterAgentPort
if opts.Port != "" {
port = opts.Port
}

transport := &http.Transport{
TLSClientConfig: &tls.Config{
InsecureSkipVerify: opts.InsecureSkipVerify,
},
}

return &Client{
ip: ip,
port: port,
client: &http.Client{
Timeout: 30 * time.Second,
Transport: transport,
},
}, nil
}

func (c *Client) Endpoint() string {
return fmt.Sprintf("https://%s:%s", c.ip, c.port)
}

// Do makes a request to the given endpoint with the given method. It marshals the request and unmarshals
// server response body if the provided response is not nil.
// The endpoint should _not_ have a leading slash.
func (c *Client) Do(ctx context.Context, method, endpoint string, request any, response any) error {
url := fmt.Sprintf("https://%s:%s/%s", c.ip, c.port, endpoint)

requestBody, err := json.Marshal(request)
if err != nil {
return fmt.Errorf("failed to prepare worker info request: %w", err)
}

req, err := http.NewRequestWithContext(ctx, method, url, bytes.NewBuffer(requestBody))
if err != nil {
return fmt.Errorf("failed to create request: %w", err)
}

res, err := c.client.Do(req)
if err != nil {
return fmt.Errorf("failed to call cluster agent: %w", err)
}
defer res.Body.Close()

if res.StatusCode != http.StatusOK {
// NOTE(hue): Marshal and print any response that we got since it might contain valuable information
// on why the request failed.
// Ignore JSON errors to prevent unnecessarily complicated error handling.
anyResp := make(map[string]any)
_ = json.NewDecoder(res.Body).Decode(&anyResp)
b, _ := json.Marshal(anyResp)
resStr := string(b)

return fmt.Errorf("HTTP request to cluster agent failed with status code %d, got response: %q", res.StatusCode, resStr)
}

if response != nil {
if err := json.NewDecoder(res.Body).Decode(response); err != nil {
return fmt.Errorf("failed to decode response: %w", err)
}
}

return nil
}
184 changes: 184 additions & 0 deletions pkg/clusteragent/clusteragent_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
package clusteragent_test

import (
"context"
"encoding/json"
"fmt"
"net"
"net/http"
"net/http/httptest"
"strings"
"testing"

. "github.com/onsi/gomega"
"k8s.io/apimachinery/pkg/util/sets"

"github.com/canonical/cluster-api-control-plane-provider-microk8s/pkg/clusteragent"
clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
)

func TestClient(t *testing.T) {
t.Run("CanNotFindAddress", func(t *testing.T) {
g := NewWithT(t)

// Machines don't have any addresses.
machines := []clusterv1.Machine{{}, {}}
_, err := clusteragent.NewClient(machines, clusteragent.Options{})

g.Expect(err).To(HaveOccurred())

// The only machine is the ignored one.
addr := "1.1.1.1"
machines = []clusterv1.Machine{
{
Status: clusterv1.MachineStatus{
Addresses: clusterv1.MachineAddresses{
{
Address: addr,
},
},
},
},
}
_, err = clusteragent.NewClient(machines, clusteragent.Options{IgnoreNodeIPs: sets.NewString(addr)})

g.Expect(err).To(HaveOccurred())
})

t.Run("CorrectEndpoint", func(t *testing.T) {
g := NewWithT(t)

port := "30000"
firstAddr := "1.1.1.1"
secondAddr := "2.2.2.2"
thirdAddr := "3.3.3.3"
ignoreAddr := "8.8.8.8"
machines := []clusterv1.Machine{
{
Status: clusterv1.MachineStatus{
Addresses: clusterv1.MachineAddresses{
{
Address: firstAddr,
},
},
},
},
{
Status: clusterv1.MachineStatus{
Addresses: clusterv1.MachineAddresses{
{
Address: secondAddr,
},
},
},
},
{
Status: clusterv1.MachineStatus{
Addresses: clusterv1.MachineAddresses{
{
Address: thirdAddr,
},
},
},
},
}

opts := clusteragent.Options{
IgnoreNodeIPs: sets.NewString(ignoreAddr),
Port: port,
}

// NOTE(Hue): Repeat the test to make sure the IP is not picked by chance (reduce flakiness).
for i := 0; i < 30; i++ {
c, err := clusteragent.NewClient(machines, opts)

g.Expect(err).ToNot(HaveOccurred())

// Check if the endpoint is one of the expected ones and not the ignored one.
g.Expect([]string{fmt.Sprintf("https://%s:%s", firstAddr, port), fmt.Sprintf("https://%s:%s", secondAddr, port), fmt.Sprintf("https://%s:%s", thirdAddr, port)}).To(ContainElement(c.Endpoint()))
g.Expect(c.Endpoint()).ToNot(Equal(fmt.Sprintf("https://%s:%s", ignoreAddr, port)))
}

})
}

func TestDo(t *testing.T) {
g := NewWithT(t)

path := "/random/path"
method := http.MethodPost
resp := map[string]string{
"key": "value",
}
servM := NewServerMock(method, path, resp)
defer servM.ts.Close()

ip, port, err := net.SplitHostPort(strings.TrimPrefix(servM.ts.URL, "https://"))
g.Expect(err).ToNot(HaveOccurred())
c, err := clusteragent.NewClient([]clusterv1.Machine{
{
Status: clusterv1.MachineStatus{
Addresses: clusterv1.MachineAddresses{
{
Address: ip,
},
},
},
},
}, clusteragent.Options{Port: port, InsecureSkipVerify: true})

g.Expect(err).ToNot(HaveOccurred())

response := make(map[string]string)
req := map[string]string{"req": "value"}
path = strings.TrimPrefix(path, "/")
g.Expect(c.Do(context.Background(), method, path, req, &response)).To(Succeed())

g.Expect(response).To(Equal(resp))
}

type serverMock struct {
method string
path string
response any
request map[string]any
ts *httptest.Server
}

// NewServerMock creates a test server that responds with the given response when called with the given method and path.
// Make sure to close the server after the test is done.
// Server will try to decode the request body into a map[string]any.
func NewServerMock(method string, path string, response any) *serverMock {
req := make(map[string]any)
ts := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path != path {
http.NotFound(w, r)
return
}
if r.Method != method {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}

if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}

if response != nil {
if err := json.NewEncoder(w).Encode(response); err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
}
w.WriteHeader(http.StatusOK)
}))

return &serverMock{
method: method,
path: path,
response: response,
request: req,
ts: ts,
}
}
Loading

0 comments on commit 862748d

Please sign in to comment.