Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add capi-auth-token header to /dqlite/remove request #68

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 15 additions & 6 deletions controllers/reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

clusterv1beta1 "github.com/canonical/cluster-api-control-plane-provider-microk8s/api/v1beta1"
"github.com/canonical/cluster-api-control-plane-provider-microk8s/pkg/clusteragent"
"github.com/canonical/cluster-api-control-plane-provider-microk8s/pkg/token"
"golang.org/x/mod/semver"

"github.com/pkg/errors"
Expand Down Expand Up @@ -475,7 +476,7 @@ func (r *MicroK8sControlPlaneReconciler) reconcileDelete(ctx context.Context, cl
}

// clean up MicroK8s cluster secrets
for _, secretName := range []string{"kubeconfig", "ca", "jointoken"} {
for _, secretName := range []string{"kubeconfig", "ca", "jointoken", token.AuthTokenNameSuffix} {
secret := &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Namespace: cluster.Namespace,
Expand Down Expand Up @@ -594,12 +595,14 @@ func (r *MicroK8sControlPlaneReconciler) scaleDownControlPlane(ctx context.Conte
// The issue is that we were not removing the endpoint from dqlite when we were deleting a machine.
// This would cause a situation where a joining node failed to join because the endpoint was already in the dqlite cluster.
// How? The IP assigned to the joining (new) node, previously belonged to a node that was deleted, but the IP is still there in dqlite.
// If we have 2 or more machines left, get cluster agent client and delete node from dqlite
if len(machines) > 1 {
// If we have 2 machines, deleting one is not safe because it can be the leader and we're not taking care of
// leadership transfers in the cluster-agent for now. Maybe something for later (TODO)
// If we have 3 or more machines left, get cluster agent client and delete node from dqlite.
if len(machines) > 2 {
portRemap := tcp != nil && tcp.Spec.ControlPlaneConfig.ClusterConfiguration != nil && tcp.Spec.ControlPlaneConfig.ClusterConfiguration.PortCompatibilityRemap

if clusterAgentClient, err := getClusterAgentClient(machines, deleteMachine, portRemap); err == nil {
if err := r.removeNodeFromDqlite(ctx, clusterAgentClient, deleteMachine, portRemap); err != nil {
if err := r.removeNodeFromDqlite(ctx, clusterAgentClient, cluster, deleteMachine, portRemap); err != nil {
logger.Error(err, "failed to remove node from dqlite: %w", "machineName", deleteMachine.Name, "nodeName", node.Name)
}
} else {
Expand Down Expand Up @@ -646,7 +649,8 @@ func getClusterAgentClient(machines []clusterv1.Machine, delMachine clusterv1.Ma
}

// removeNodeFromDqlite removes the endpoint of the machine being deleted from the dqlite cluster through the cluster agent.
func (r *MicroK8sControlPlaneReconciler) removeNodeFromDqlite(ctx context.Context, clusterAgentClient *clusteragent.Client, delMachine clusterv1.Machine, portRemap bool) error {
func (r *MicroK8sControlPlaneReconciler) removeNodeFromDqlite(ctx context.Context, clusterAgentClient *clusteragent.Client,
clusterKey client.ObjectKey, delMachine clusterv1.Machine, portRemap bool) error {
dqlitePort := defaultDqlitePort
if portRemap {
// https://github.com/canonical/cluster-api-control-plane-provider-microk8s/blob/v0.6.10/control-plane-components.yaml#L96-L102
Expand All @@ -665,7 +669,12 @@ func (r *MicroK8sControlPlaneReconciler) removeNodeFromDqlite(ctx context.Contex
return fmt.Errorf("failed to extract endpoint of the deleting machine %q", delMachine.Name)
}

if err := clusterAgentClient.RemoveNodeFromDqlite(ctx, removeEp); err != nil {
token, err := token.Lookup(ctx, r.Client, clusterKey)
if err != nil {
return fmt.Errorf("failed to lookup token: %w", err)
}

if err := clusterAgentClient.RemoveNodeFromDqlite(ctx, token, removeEp); err != nil {
return fmt.Errorf("failed to remove node %q from dqlite: %w", removeEp, err)
}

Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ require (
cloud.google.com/go/compute v1.10.0 // indirect
github.com/blang/semver v3.5.1+incompatible // indirect
github.com/emicklei/go-restful/v3 v3.9.0 // indirect
github.com/evanphx/json-patch v5.6.0+incompatible // indirect
github.com/evanphx/json-patch/v5 v5.6.0 // indirect
github.com/go-openapi/jsonpointer v0.19.5 // indirect
github.com/go-openapi/jsonreference v0.20.0 // indirect
Expand Down
6 changes: 4 additions & 2 deletions pkg/clusteragent/clusteragent.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,10 @@ func (c *Client) Endpoint() string {
return fmt.Sprintf("https://%s:%s", c.ip, c.port)
}

// Do makes a request to the given endpoint with the given method. It marshals the request and unmarshals
// do makes a request to the given endpoint with the given method. It marshals the request and unmarshals
// server response body if the provided response is not nil.
// The endpoint should _not_ have a leading slash.
func (c *Client) Do(ctx context.Context, method, endpoint string, request any, response any) error {
func (c *Client) do(ctx context.Context, method, endpoint string, request any, header map[string][]string, response any) error {
url := fmt.Sprintf("https://%s:%s/%s", c.ip, c.port, endpoint)

requestBody, err := json.Marshal(request)
Expand All @@ -85,6 +85,8 @@ func (c *Client) Do(ctx context.Context, method, endpoint string, request any, r
return fmt.Errorf("failed to create request: %w", err)
}

req.Header = http.Header(header)

res, err := c.client.Do(req)
if err != nil {
return fmt.Errorf("failed to call cluster agent: %w", err)
Expand Down
74 changes: 13 additions & 61 deletions pkg/clusteragent/clusteragent_test.go
Original file line number Diff line number Diff line change
@@ -1,23 +1,21 @@
package clusteragent_test
package clusteragent

import (
"context"
"encoding/json"
"fmt"
"math/rand"
"net"
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"

. "github.com/onsi/gomega"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"

"github.com/canonical/cluster-api-control-plane-provider-microk8s/pkg/clusteragent"
clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"

"github.com/canonical/cluster-api-control-plane-provider-microk8s/pkg/httptest"
)

func TestClient(t *testing.T) {
Expand All @@ -26,7 +24,7 @@ func TestClient(t *testing.T) {

// Machines don't have any addresses.
machines := []clusterv1.Machine{{}, {}}
_, err := clusteragent.NewClient(machines, "25000", time.Second, clusteragent.Options{})
_, err := NewClient(machines, "25000", time.Second, Options{})

g.Expect(err).To(HaveOccurred())

Expand All @@ -46,7 +44,7 @@ func TestClient(t *testing.T) {
},
},
}
_, err = clusteragent.NewClient(machines, "25000", time.Second, clusteragent.Options{IgnoreMachineNames: sets.NewString(ignoreName)})
_, err = NewClient(machines, "25000", time.Second, Options{IgnoreMachineNames: sets.NewString(ignoreName)})

g.Expect(err).To(HaveOccurred())
})
Expand Down Expand Up @@ -103,14 +101,14 @@ func TestClient(t *testing.T) {
},
}

opts := clusteragent.Options{
opts := Options{
IgnoreMachineNames: sets.NewString(ignoreName),
}

// NOTE(Hue): Repeat the test to make sure the ignored machine's IP is not picked by chance (reduce flakiness).
for i := 0; i < 100; i++ {
machines = shuffleMachines(machines)
c, err := clusteragent.NewClient(machines, port, time.Second, opts)
c, err := NewClient(machines, port, time.Second, opts)

g.Expect(err).ToNot(HaveOccurred())

Expand All @@ -130,12 +128,12 @@ func TestDo(t *testing.T) {
resp := map[string]string{
"key": "value",
}
servM := NewServerMock(method, path, resp)
defer servM.ts.Close()
servM := httptest.NewServerMock(method, path, resp)
defer servM.Srv.Close()

ip, port, err := net.SplitHostPort(strings.TrimPrefix(servM.ts.URL, "https://"))
ip, port, err := net.SplitHostPort(strings.TrimPrefix(servM.Srv.URL, "https://"))
g.Expect(err).ToNot(HaveOccurred())
c, err := clusteragent.NewClient([]clusterv1.Machine{
c, err := NewClient([]clusterv1.Machine{
{
Status: clusterv1.MachineStatus{
Addresses: clusterv1.MachineAddresses{
Expand All @@ -145,64 +143,18 @@ func TestDo(t *testing.T) {
},
},
},
}, port, time.Second, clusteragent.Options{})
}, port, time.Second, Options{})

g.Expect(err).ToNot(HaveOccurred())

response := make(map[string]string)
req := map[string]string{"req": "value"}
path = strings.TrimPrefix(path, "/")
g.Expect(c.Do(context.Background(), method, path, req, &response)).To(Succeed())
g.Expect(c.do(context.Background(), method, path, req, nil, &response)).To(Succeed())

g.Expect(response).To(Equal(resp))
}

// serverMock is a TLS test server that answers a single method/path pair and
// keeps what it decoded from the request body.
type serverMock struct {
	// method is the only HTTP method the server accepts; others get 405.
	method string
	// path is the only URL path the server answers; others get 404.
	path string
	// response is JSON-encoded into the body of accepted requests (nil: empty body).
	response any
	// request receives the JSON object decoded from the request body.
	request map[string]any
	// ts is the underlying TLS test server; the caller must Close it.
	ts *httptest.Server
}

// NewServerMock creates a test server that responds with the given response when called with the given method and path.
// Make sure to close the server after the test is done.
// Server will try to decode the request body into a map[string]any.
//
// Status codes: 404 for another path, 405 for another method, 400 when the
// body is not valid JSON, 500 when the response cannot be encoded, 200 otherwise.
func NewServerMock(method string, path string, response any) *serverMock {
	req := make(map[string]any)
	ts := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if r.URL.Path != path {
			http.NotFound(w, r)
			return
		}
		if r.Method != method {
			w.WriteHeader(http.StatusMethodNotAllowed)
			return
		}

		if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
			w.WriteHeader(http.StatusBadRequest)
			return
		}

		if response == nil {
			w.WriteHeader(http.StatusOK)
			return
		}

		// The first body write sends an implicit 200, so no explicit
		// WriteHeader follows a successful Encode (it would only trigger a
		// "superfluous WriteHeader" log line). Encode marshals before it
		// writes, so a marshalling failure can still be reported as a 500.
		if err := json.NewEncoder(w).Encode(response); err != nil {
			w.WriteHeader(http.StatusInternalServerError)
		}
	}))

	return &serverMock{
		method:   method,
		path:     path,
		response: response,
		request:  req,
		ts:       ts,
	}
}

func shuffleMachines(src []clusterv1.Machine) []clusterv1.Machine {
dest := make([]clusterv1.Machine, len(src))
perm := rand.Perm(len(src))
Expand Down
5 changes: 5 additions & 0 deletions pkg/clusteragent/const.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package clusteragent

// AuthTokenHeader is the name of the HTTP request header in which
// RemoveNodeFromDqlite sends the auth token to the cluster agent.
const AuthTokenHeader = "capi-auth-token"
9 changes: 6 additions & 3 deletions pkg/clusteragent/remove_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@ import (

// RemoveNodeFromDqlite calls the cluster/api/v2.0/dqlite/remove endpoint on cluster agent to remove the given address from Dqlite.
// The endpoint should be in the format of "address:port".
func (p *Client) RemoveNodeFromDqlite(ctx context.Context, removeEp string) error {
request := map[string]string{"removeEndpoint": removeEp}
return p.Do(ctx, http.MethodPost, "cluster/api/v2.0/dqlite/remove", request, nil)
func (p *Client) RemoveNodeFromDqlite(ctx context.Context, token string, removeEp string) error {
request := map[string]string{"remove_endpoint": removeEp}
header := map[string][]string{
AuthTokenHeader: {token},
}
return p.do(ctx, http.MethodPost, "cluster/api/v2.0/dqlite/remove", request, header, nil)
}
15 changes: 9 additions & 6 deletions pkg/clusteragent/remove_node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,22 @@ import (
"time"

. "github.com/onsi/gomega"
clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"

"github.com/canonical/cluster-api-control-plane-provider-microk8s/pkg/clusteragent"
clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
"github.com/canonical/cluster-api-control-plane-provider-microk8s/pkg/httptest"
)

func TestRemoveFromDqlite(t *testing.T) {
g := NewWithT(t)

path := "/cluster/api/v2.0/dqlite/remove"
token := "myRandomToken"
method := http.MethodPost
servM := NewServerMock(method, path, nil)
defer servM.ts.Close()
servM := httptest.NewServerMock(method, path, nil)
defer servM.Srv.Close()

ip, port, err := net.SplitHostPort(strings.TrimPrefix(servM.ts.URL, "https://"))
ip, port, err := net.SplitHostPort(strings.TrimPrefix(servM.Srv.URL, "https://"))
g.Expect(err).ToNot(HaveOccurred())
c, err := clusteragent.NewClient([]clusterv1.Machine{
{
Expand All @@ -37,6 +39,7 @@ func TestRemoveFromDqlite(t *testing.T) {
}, port, time.Second, clusteragent.Options{})

g.Expect(err).ToNot(HaveOccurred())
g.Expect(c.RemoveNodeFromDqlite(context.Background(), "1.1.1.1:1234")).To(Succeed())
g.Expect(servM.request).To(HaveKeyWithValue("removeEndpoint", "1.1.1.1:1234"))
g.Expect(c.RemoveNodeFromDqlite(context.Background(), token, "1.1.1.1:1234")).To(Succeed())
g.Expect(servM.Request).To(HaveKeyWithValue("remove_endpoint", "1.1.1.1:1234"))
g.Expect(servM.Header.Get(clusteragent.AuthTokenHeader)).To(Equal(token))
}
62 changes: 62 additions & 0 deletions pkg/httptest/httptest.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package httptest

import (
"encoding/json"
"fmt"
"net/http"
"net/http/httptest"
)

// serverMock is a TLS test server that answers a single method/path pair and
// records the headers and JSON body of the requests it accepted.
type serverMock struct {
	// Method is the only HTTP method the server accepts; others get 405.
	Method string
	// Path is the only URL path the server answers; others get 404.
	Path string
	// Response is JSON-encoded into the body of accepted requests (nil: empty body).
	Response any
	// Request receives the JSON object decoded from the request body.
	Request map[string]any
	// Header receives the headers of every request that matched Method and Path.
	Header http.Header
	// Srv is the underlying TLS test server; the caller must Close it.
	Srv *httptest.Server
}

// NewServerMock creates a test server that responds with the given response when called with the given method and path.
// Make sure to close the server after the test is done.
// Server will try to decode the request body into a map[string]any.
//
// Status codes: 404 for another path, 405 for another method, 400 (with the
// decode error as body) when the request body is not valid JSON, 500 when the
// response cannot be encoded, 200 otherwise.
func NewServerMock(method string, path string, response any) *serverMock {
	req := make(map[string]any)
	header := make(http.Header)
	ts := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if r.URL.Path != path {
			http.NotFound(w, r)
			return
		}
		if r.Method != method {
			w.WriteHeader(http.StatusMethodNotAllowed)
			return
		}

		// Headers are recorded before the body is decoded, so they are
		// available to the test even when the body turns out to be invalid.
		for k, vv := range r.Header {
			header[k] = vv
		}

		if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
			http.Error(w, fmt.Sprintf("decoding request body: %v", err), http.StatusBadRequest)
			return
		}

		if response == nil {
			w.WriteHeader(http.StatusOK)
			return
		}

		// The first body write sends an implicit 200, so no explicit
		// WriteHeader follows a successful Encode (it would only trigger a
		// "superfluous WriteHeader" log line). Encode marshals before it
		// writes, so a marshalling failure can still be reported as a 500.
		if err := json.NewEncoder(w).Encode(response); err != nil {
			w.WriteHeader(http.StatusInternalServerError)
		}
	}))

	return &serverMock{
		Method:   method,
		Path:     path,
		Response: response,
		Header:   header,
		Request:  req,
		Srv:      ts,
	}
}
Loading
Loading