From 862748d6f788b49b9fffdeeedc3ed10f526e82c4 Mon Sep 17 00:00:00 2001
From: "Homayoon (Hue) Alimohammadi"
Date: Fri, 27 Sep 2024 14:56:10 +0400
Subject: [PATCH] Call remove node endpoint on machine deletion

---
 controllers/reconcile.go              |  57 ++++++++
 pkg/clusteragent/clusteragent.go      | 117 ++++++++++++++++
 pkg/clusteragent/clusteragent_test.go | 184 ++++++++++++++++++++++++++
 pkg/clusteragent/remove_node.go       |  13 ++
 pkg/clusteragent/remove_node_test.go  |  41 ++++++
 5 files changed, 412 insertions(+)
 create mode 100644 pkg/clusteragent/clusteragent.go
 create mode 100644 pkg/clusteragent/clusteragent_test.go
 create mode 100644 pkg/clusteragent/remove_node.go
 create mode 100644 pkg/clusteragent/remove_node_test.go

diff --git a/controllers/reconcile.go b/controllers/reconcile.go
index b6cb197..d359c70 100644
--- a/controllers/reconcile.go
+++ b/controllers/reconcile.go
@@ -4,11 +4,13 @@ import (
 	"context"
 	"fmt"
 	"math/rand"
+	"net"
 	"sort"
 	"strings"
 	"time"
 
 	clusterv1beta1 "github.com/canonical/cluster-api-control-plane-provider-microk8s/api/v1beta1"
+	"github.com/canonical/cluster-api-control-plane-provider-microk8s/pkg/clusteragent"
 	"golang.org/x/mod/semver"
 
 	"github.com/pkg/errors"
@@ -16,6 +18,7 @@ import (
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	kerrors "k8s.io/apimachinery/pkg/util/errors"
+	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/apiserver/pkg/storage/names"
 	clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
 	"sigs.k8s.io/cluster-api/controllers/external"
@@ -575,6 +578,16 @@ func (r *MicroK8sControlPlaneReconciler) scaleDownControlPlane(ctx context.Conte
 		return ctrl.Result{RequeueAfter: 20 * time.Second}, fmt.Errorf("%q machine does not have a nodeRef", deleteMachine.Name)
 	}
 
+	// Get a cluster agent client on a machine other than deleteMachine and use it to remove deleteMachine from dqlite.
+	clusterAgentClient, err := getClusterAgentClient(machines, deleteMachine)
+	if err != nil {
+		return ctrl.Result{}, fmt.Errorf("failed to get cluster agent client: %w", err)
+	}
+
+	if err := r.removeNodeFromDqlite(ctx, clusterAgentClient, deleteMachine); err != nil {
+		return ctrl.Result{}, fmt.Errorf("failed to remove node %q from dqlite: %w", deleteMachine.Name, err)
+	}
+
 	node := deleteMachine.Status.NodeRef
 
 	logger = logger.WithValues("machineName", deleteMachine.Name, "nodeName", node.Name)
@@ -595,6 +608,50 @@ func (r *MicroK8sControlPlaneReconciler) scaleDownControlPlane(ctx context.Conte
 	return ctrl.Result{Requeue: true}, nil
 }
 
+func getClusterAgentClient(machines []clusterv1.Machine, delMachine clusterv1.Machine) (*clusteragent.Client, error) {
+	opts := clusteragent.Options{
+		IgnoreNodeIPs: sets.NewString(),
+		// NOTE(hue): Port is hard coded according to
+		// https://github.com/canonical/cluster-api-control-plane-provider-microk8s/blob/v0.6.10/control-plane-components.yaml#L96-L102
+		Port: "30000",
+	}
+	// NOTE(hue): We want to pick another machine's IP (the pick is made by clusteragent.NewClient) to call POST /dqlite/remove on its cluster agent endpoint.
+	// This machine should preferably not be the one being deleted, so all of its addresses are ignored below; this is not forced by Microk8s.
+	for _, addr := range delMachine.Status.Addresses {
+		opts.IgnoreNodeIPs.Insert(addr.Address)
+	}
+
+	clusterAgentClient, err := clusteragent.NewClient(machines, opts)
+	if err != nil {
+		return nil, fmt.Errorf("failed to initialize cluster agent client: %w", err)
+	}
+
+	return clusterAgentClient, nil
+}
+
+// removeNodeFromDqlite asks the cluster agent to remove delMachine's first IP-parsable address, formatted as "<address>:2379", from dqlite.
+func (r *MicroK8sControlPlaneReconciler) removeNodeFromDqlite(ctx context.Context, clusterAgentClient *clusteragent.Client, delMachine clusterv1.Machine) error {
+	var removeEp string
+	for _, addr := range delMachine.Status.Addresses {
+		if ip := net.ParseIP(addr.Address); ip != nil {
+			// NOTE(hue): Port is hard coded according to the link below. NOTE(review): the same link is cited for agent port 30000 above; confirm it also covers 2379.
+			// https://github.com/canonical/cluster-api-control-plane-provider-microk8s/blob/v0.6.10/control-plane-components.yaml#L96-L102
+			removeEp = fmt.Sprintf("%s:2379", addr.Address)
+			break
+		}
+	}
+
+	if removeEp == "" {
+		return fmt.Errorf("failed to extract endpoint of the deleting machine %q", delMachine.Name)
+	}
+
+	if err := clusterAgentClient.RemoveNodeFromDqlite(ctx, removeEp); err != nil {
+		return fmt.Errorf("failed to remove node %q from dqlite: %w", removeEp, err)
+	}
+
+	return nil
+}
+
 func createUpgradePod(ctx context.Context, kubeclient *kubernetesClient, nodeName string, nodeVersion string) (*corev1.Pod, error) {
 	nodeVersion = strings.TrimPrefix(semver.MajorMinor(nodeVersion), "v")
 
diff --git a/pkg/clusteragent/clusteragent.go b/pkg/clusteragent/clusteragent.go
new file mode 100644
index 0000000..c9c84ec
--- /dev/null
+++ b/pkg/clusteragent/clusteragent.go
@@ -0,0 +1,117 @@
+package clusteragent
+
+import (
+	"bytes"
+	"context"
+	"crypto/tls"
+	"encoding/json"
+	"errors"
+	"fmt"
+	"net/http"
+	"time"
+
+	"k8s.io/apimachinery/pkg/util/sets"
+	clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
+)
+
+const defaultClusterAgentPort = "25000"
+
+// Options configures how NewClient picks an address and connects; use it when initializing a new client.
+type Options struct {
+	// IgnoreNodeIPs is a set of addresses that must never be picked as the cluster agent endpoint.
+	IgnoreNodeIPs sets.String
+	// Port overrides the default cluster agent port (defaultClusterAgentPort) to connect to.
+	Port string
+	// InsecureSkipVerify skips the verification of the server's certificate chain and host name.
+	InsecureSkipVerify bool
+}
+
+type Client struct {
+	ip, port string
+	client   *http.Client
+}
+
+// NewClient picks the first non-empty address, in machine order, that is not in opts.IgnoreNodeIPs and creates a
+// client for the cluster agent at that address. It returns an error when no machine has such an address.
+func NewClient(machines []clusterv1.Machine, opts Options) (*Client, error) {
+	var ip string
+	for _, m := range machines {
+		for _, addr := range m.Status.Addresses {
+			// Keep the first eligible address across all machines; later matches must not overwrite it.
+			if ip == "" && !opts.IgnoreNodeIPs.Has(addr.Address) {
+				ip = addr.Address
+			}
+		}
+	}
+
+	if ip == "" {
+		return nil, errors.New("failed to find an IP for cluster agent")
+	}
+
+	port := defaultClusterAgentPort
+	if opts.Port != "" {
+		port = opts.Port
+	}
+
+	transport := &http.Transport{
+		TLSClientConfig: &tls.Config{
+			InsecureSkipVerify: opts.InsecureSkipVerify,
+		},
+	}
+
+	return &Client{
+		ip:   ip,
+		port: port,
+		client: &http.Client{
+			Timeout:   30 * time.Second,
+			Transport: transport,
+		},
+	}, nil
+}
+
+func (c *Client) Endpoint() string {
+	return fmt.Sprintf("https://%s:%s", c.ip, c.port)
+}
+
+// Do makes a request to the given endpoint with the given method. It JSON-marshals the request and unmarshals the
+// server response body if the provided response is not nil. Any status other than 200 OK is returned as an error.
+// The endpoint should _not_ have a leading slash.
+func (c *Client) Do(ctx context.Context, method, endpoint string, request any, response any) error {
+	url := fmt.Sprintf("https://%s:%s/%s", c.ip, c.port, endpoint)
+
+	requestBody, err := json.Marshal(request)
+	if err != nil {
+		return fmt.Errorf("failed to prepare worker info request: %w", err)
+	}
+
+	req, err := http.NewRequestWithContext(ctx, method, url, bytes.NewBuffer(requestBody))
+	if err != nil {
+		return fmt.Errorf("failed to create request: %w", err)
+	}
+
+	res, err := c.client.Do(req)
+	if err != nil {
+		return fmt.Errorf("failed to call cluster agent: %w", err)
+	}
+	defer res.Body.Close()
+
+	if res.StatusCode != http.StatusOK {
+		// NOTE(hue): Decode the response body and include it in the error, since it might contain valuable
+		// information on why the request failed.
+		// JSON errors are deliberately ignored here: a body that is not a JSON object is reported as "{}".
+		anyResp := make(map[string]any)
+		_ = json.NewDecoder(res.Body).Decode(&anyResp)
+		b, _ := json.Marshal(anyResp)
+		resStr := string(b)
+
+		return fmt.Errorf("HTTP request to cluster agent failed with status code %d, got response: %q", res.StatusCode, resStr)
+	}
+
+	if response != nil {
+		if err := json.NewDecoder(res.Body).Decode(response); err != nil {
+			return fmt.Errorf("failed to decode response: %w", err)
+		}
+	}
+
+	return nil
+}
diff --git a/pkg/clusteragent/clusteragent_test.go b/pkg/clusteragent/clusteragent_test.go
new file mode 100644
index 0000000..d999e9d
--- /dev/null
+++ b/pkg/clusteragent/clusteragent_test.go
@@ -0,0 +1,184 @@
+package clusteragent_test
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"net"
+	"net/http"
+	"net/http/httptest"
+	"strings"
+	"testing"
+
+	. "github.com/onsi/gomega"
+	"k8s.io/apimachinery/pkg/util/sets"
+
+	"github.com/canonical/cluster-api-control-plane-provider-microk8s/pkg/clusteragent"
+	clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
+)
+
+func TestClient(t *testing.T) {
+	t.Run("CanNotFindAddress", func(t *testing.T) {
+		g := NewWithT(t)
+
+		// Machines don't have any addresses, so no endpoint can be picked.
+		machines := []clusterv1.Machine{{}, {}}
+		_, err := clusteragent.NewClient(machines, clusteragent.Options{})
+
+		g.Expect(err).To(HaveOccurred())
+
+		// The only machine's only address is in the ignore set.
+		addr := "1.1.1.1"
+		machines = []clusterv1.Machine{
+			{
+				Status: clusterv1.MachineStatus{
+					Addresses: clusterv1.MachineAddresses{
+						{
+							Address: addr,
+						},
+					},
+				},
+			},
+		}
+		_, err = clusteragent.NewClient(machines, clusteragent.Options{IgnoreNodeIPs: sets.NewString(addr)})
+
+		g.Expect(err).To(HaveOccurred())
+	})
+
+	t.Run("CorrectEndpoint", func(t *testing.T) {
+		g := NewWithT(t)
+
+		port := "30000"
+		firstAddr := "1.1.1.1"
+		secondAddr := "2.2.2.2"
+		thirdAddr := "3.3.3.3"
+		ignoreAddr := "8.8.8.8"
+		machines := []clusterv1.Machine{
+			{
+				Status: clusterv1.MachineStatus{
+					Addresses: clusterv1.MachineAddresses{
+						{
+							Address: firstAddr,
+						},
+					},
+				},
+			},
+			{
+				Status: clusterv1.MachineStatus{
+					Addresses: clusterv1.MachineAddresses{
+						{
+							Address: secondAddr,
+						},
+					},
+				},
+			},
+			{
+				Status: clusterv1.MachineStatus{
+					Addresses: clusterv1.MachineAddresses{
+						{
+							Address: thirdAddr,
+						},
+					},
+				},
+			},
+		}
+
+		opts := clusteragent.Options{
+			IgnoreNodeIPs: sets.NewString(ignoreAddr),
+			Port:          port,
+		}
+
+		// NOTE(Hue): Repeat the check so that a valid endpoint is not accepted merely by chance (reduce flakiness).
+		for i := 0; i < 30; i++ {
+			c, err := clusteragent.NewClient(machines, opts)
+
+			g.Expect(err).ToNot(HaveOccurred())
+
+			// Check that the endpoint is one of the three machine addresses and never the ignored one.
+			g.Expect([]string{fmt.Sprintf("https://%s:%s", firstAddr, port), fmt.Sprintf("https://%s:%s", secondAddr, port), fmt.Sprintf("https://%s:%s", thirdAddr, port)}).To(ContainElement(c.Endpoint()))
+			g.Expect(c.Endpoint()).ToNot(Equal(fmt.Sprintf("https://%s:%s", ignoreAddr, port)))
+		}
+
+	})
+}
+
+func TestDo(t *testing.T) {
+	g := NewWithT(t)
+
+	path := "/random/path"
+	method := http.MethodPost
+	resp := map[string]string{
+		"key": "value",
+	}
+	servM := NewServerMock(method, path, resp)
+	defer servM.ts.Close()
+
+	ip, port, err := net.SplitHostPort(strings.TrimPrefix(servM.ts.URL, "https://"))
+	g.Expect(err).ToNot(HaveOccurred())
+	c, err := clusteragent.NewClient([]clusterv1.Machine{
+		{
+			Status: clusterv1.MachineStatus{
+				Addresses: clusterv1.MachineAddresses{
+					{
+						Address: ip,
+					},
+				},
+			},
+		},
+	}, clusteragent.Options{Port: port, InsecureSkipVerify: true})
+
+	g.Expect(err).ToNot(HaveOccurred())
+
+	response := make(map[string]string)
+	req := map[string]string{"req": "value"}
+	path = strings.TrimPrefix(path, "/")
+	g.Expect(c.Do(context.Background(), method, path, req, &response)).To(Succeed())
+
+	g.Expect(response).To(Equal(resp))
+}
+
+type serverMock struct {
+	method   string
+	path     string
+	response any
+	request  map[string]any
+	ts       *httptest.Server
+}
+
+// NewServerMock creates a TLS test server that responds with the given response when called with the given method and path.
+// Make sure to close the server (ts.Close) after the test is done.
+// Server will try to decode the request body into a map[string]any, exposed as the mock's request field; an undecodable body yields 400.
+func NewServerMock(method string, path string, response any) *serverMock {
+	req := make(map[string]any)
+	ts := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		if r.URL.Path != path {
+			http.NotFound(w, r)
+			return
+		}
+		if r.Method != method {
+			w.WriteHeader(http.StatusMethodNotAllowed)
+			return
+		}
+
+		if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
+			w.WriteHeader(http.StatusBadRequest)
+			return
+		}
+
+		if response == nil {
+			w.WriteHeader(http.StatusOK)
+			return
+		}
+		if err := json.NewEncoder(w).Encode(response); err != nil {
+			w.WriteHeader(http.StatusInternalServerError)
+		}
+	}))
+
+	return &serverMock{
+		method:   method,
+		path:     path,
+		response: response,
+		request:  req,
+		ts:       ts,
+	}
+}
diff --git a/pkg/clusteragent/remove_node.go b/pkg/clusteragent/remove_node.go
new file mode 100644
index 0000000..20e62e8
--- /dev/null
+++ b/pkg/clusteragent/remove_node.go
@@ -0,0 +1,13 @@
+package clusteragent
+
+import (
+	"context"
+	"net/http"
+)
+
+// RemoveNodeFromDqlite POSTs to the cluster/api/v2.0/dqlite/remove endpoint on cluster agent to remove the given address from Dqlite.
+// removeEp should be in the format of "address:port"; it is sent as the "removeEndpoint" field of the JSON request body.
+func (c *Client) RemoveNodeFromDqlite(ctx context.Context, removeEp string) error {
+	request := map[string]string{"removeEndpoint": removeEp}
+	return c.Do(ctx, http.MethodPost, "cluster/api/v2.0/dqlite/remove", request, nil)
+}
diff --git a/pkg/clusteragent/remove_node_test.go b/pkg/clusteragent/remove_node_test.go
new file mode 100644
index 0000000..9658cdf
--- /dev/null
+++ b/pkg/clusteragent/remove_node_test.go
@@ -0,0 +1,41 @@
+package clusteragent_test
+
+import (
+	"context"
+	"net"
+	"net/http"
+	"strings"
+	"testing"
+
+	. "github.com/onsi/gomega"
+
+	"github.com/canonical/cluster-api-control-plane-provider-microk8s/pkg/clusteragent"
+	clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
+)
+
+func TestRemoveFromDqlite(t *testing.T) {
+	g := NewWithT(t)
+
+	path := "/cluster/api/v2.0/dqlite/remove"
+	method := http.MethodPost
+	servM := NewServerMock(method, path, nil)
+	defer servM.ts.Close()
+
+	ip, port, err := net.SplitHostPort(strings.TrimPrefix(servM.ts.URL, "https://"))
+	g.Expect(err).ToNot(HaveOccurred())
+	c, err := clusteragent.NewClient([]clusterv1.Machine{
+		{
+			Status: clusterv1.MachineStatus{
+				Addresses: clusterv1.MachineAddresses{
+					{
+						Address: ip,
+					},
+				},
+			},
+		},
+	}, clusteragent.Options{Port: port, InsecureSkipVerify: true})
+
+	g.Expect(err).ToNot(HaveOccurred())
+	g.Expect(c.RemoveNodeFromDqlite(context.Background(), "1.1.1.1:1234")).To(Succeed())
+	g.Expect(servM.request).To(HaveKeyWithValue("removeEndpoint", "1.1.1.1:1234"))
+}