Skip to content

Commit

Permalink
feat: add TLS config to enable xDS Auth (#13)
Browse files Browse the repository at this point in the history
* feat: xDS auth
  • Loading branch information
ppzqh authored Mar 29, 2023
1 parent 5cd7573 commit 2f5e010
Show file tree
Hide file tree
Showing 10 changed files with 246 additions and 26 deletions.
114 changes: 114 additions & 0 deletions core/manager/auth/jwt.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Copyright 2023 CloudWeGo Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package auth

import (
"context"
"fmt"
"os"

"github.com/cloudwego/kitex/pkg/klog"
"github.com/cloudwego/kitex/pkg/remote"
"github.com/cloudwego/kitex/pkg/remote/trans/nphttp2/metadata"
"github.com/cloudwego/kitex/pkg/rpcinfo"
"github.com/cloudwego/kitex/transport"
)

const (
jwtTokenPath = "/var/run/secrets/kubernetes.io/serviceaccount/token" // token path in the pod.

jwtTokenKey = "Authorization" // Istiod gets the jwt from meta using this key.

// If Istio is deployed in a different cluster, set this env as the cluster id of this service.
// Usually, we can get the value using "kubectl config get-clusters".
clusterIDEnvKey = "ISTIO_META_CLUSTER_ID"

clusterIDMetadataKey = "clusterid" // Istiod retrieves clusterid and use it for auth of JWT.
)

var (
clusterID string
jwtToken string
)

func init() {
// init clusterID
clusterID = os.Getenv(clusterIDEnvKey)
// watch jwtToken file
if token, err := getJWTToken(); err != nil {
klog.Warnf("[XDS] Auth, getJWTToken error=%s. Ignore this log if not deploying on multiple clusters.\n", err.Error())
} else {
jwtToken = token
}
}

// ClientHTTP2JwtHandler is used to set jwt token to http2 header
var ClientHTTP2JwtHandler = &clientHTTP2JwtHandler{}

type clientHTTP2JwtHandler struct{}

var (
_ remote.MetaHandler = ClientHTTP2JwtHandler
_ remote.StreamingMetaHandler = ClientHTTP2JwtHandler
)

func (*clientHTTP2JwtHandler) OnConnectStream(ctx context.Context) (context.Context, error) {
ri := rpcinfo.GetRPCInfo(ctx)
if !isGRPC(ri) {
return ctx, nil
}
var md metadata.MD
md, ok := metadata.FromOutgoingContext(ctx)
if !ok {
md = metadata.MD{}
}
// Set JWT and clusterID for auth
md.Set(jwtTokenKey, jwtTokenValueFmt(jwtToken))
md.Set(clusterIDMetadataKey, clusterID)
return metadata.NewOutgoingContext(ctx, md), nil
}

func (*clientHTTP2JwtHandler) OnReadStream(ctx context.Context) (context.Context, error) {
return ctx, nil
}

func (ch *clientHTTP2JwtHandler) WriteMeta(ctx context.Context, msg remote.Message) (context.Context, error) {
return ctx, nil
}

func (ch *clientHTTP2JwtHandler) ReadMeta(ctx context.Context, msg remote.Message) (context.Context, error) {
return ctx, nil
}

func isGRPC(ri rpcinfo.RPCInfo) bool {
return ri.Config().TransportProtocol()&transport.GRPC == transport.GRPC
}

var jwtTokenValueFmt = func(jwtToken string) string {
return fmt.Sprintf("Bearer %s", jwtToken)
}

func getJWTToken() (string, error) {
saToken := jwtTokenPath

token, err := os.ReadFile(saToken)
if err != nil {
return "", err
}

return string(token), nil
}
30 changes: 30 additions & 0 deletions core/manager/auth/tls.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright 2023 CloudWeGo Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package auth

import (
"crypto/tls"
)

// GetTLSConfig returns a tls.Config for xDS Client.
func GetTLSConfig(xdsServerName string) (*tls.Config, error) {
cfg := &tls.Config{
// Skip this and use JWT for auth in Istiod.
InsecureSkipVerify: true,
}
return cfg, nil
}
31 changes: 31 additions & 0 deletions core/manager/auth/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright 2023 CloudWeGo Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package auth

import (
"github.com/cloudwego/kitex/pkg/remote/trans/nphttp2/codes"
"github.com/cloudwego/kitex/pkg/remote/trans/nphttp2/status"
)

func IsAuthError(err error) bool {
if s, ok := status.FromError(err); ok {
if s.Code() == codes.Unauthenticated {
return true
}
}
return false
}
30 changes: 13 additions & 17 deletions core/manager/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,13 @@ import (
)

const (
PodNamespace = "POD_NAMESPACE"
PodName = "POD_NAME"
InstanceIP = "INSTANCE_IP"
IstiodAddr = "istiod.istio-system.svc:15010"
IstioVersion = "ISTIO_VERSION"
nodeIDSuffix = "svc.cluster.local"
PodNamespace = "POD_NAMESPACE"
PodName = "POD_NAME"
InstanceIP = "INSTANCE_IP"
IstiodAddr = "istiod.istio-system.svc:15010"
IstiodSvrName = "istiod.istio-system.svc"
IstioVersion = "ISTIO_VERSION"
nodeIDSuffix = "svc.cluster.local"
)

type BootstrapConfig struct {
Expand All @@ -39,12 +40,14 @@ type BootstrapConfig struct {
}

type XDSServerConfig struct {
SvrAddr string
NDSNotRequired bool // required by default for Istio
SvrName string // The name of the xDS server
SvrAddr string // The address of the xDS server
XDSAuth bool // If this xDS enable the authentication of xDS stream
NDSNotRequired bool // required by default for Istio
}

// newBootstrapConfig constructs the bootstrapConfig
func newBootstrapConfig(xdsSvrAddress string) (*BootstrapConfig, error) {
func newBootstrapConfig(config *XDSServerConfig) (*BootstrapConfig, error) {
// Get info from env
// Info needed for construct the nodeID
namespace := os.Getenv(PodNamespace)
Expand All @@ -62,11 +65,6 @@ func newBootstrapConfig(xdsSvrAddress string) (*BootstrapConfig, error) {
// specify the version of istio in case of the canary deployment of istiod
istioVersion := os.Getenv(IstioVersion)

// use default istiod address if not specified
if xdsSvrAddress == "" {
xdsSvrAddress = IstiodAddr
}

return &BootstrapConfig{
node: &v3core.Node{
//"sidecar~" + podIP + "~" + podName + "." + namespace + "~" + namespace + ".svc.cluster.local",
Expand All @@ -79,8 +77,6 @@ func newBootstrapConfig(xdsSvrAddress string) (*BootstrapConfig, error) {
},
},
},
xdsSvrCfg: &XDSServerConfig{
SvrAddr: xdsSvrAddress,
},
xdsSvrCfg: config,
}, nil
}
25 changes: 21 additions & 4 deletions core/manager/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/cenkalti/backoff/v4"

"github.com/kitex-contrib/xds/core/api/kitex_gen/envoy/service/discovery/v3/aggregateddiscoveryservice"
"github.com/kitex-contrib/xds/core/manager/auth"
"github.com/kitex-contrib/xds/core/xdsresource"

"github.com/cloudwego/kitex/client"
Expand All @@ -41,10 +42,20 @@ type (
)

// newADSClient constructs a new stream client that communicates with the xds server
func newADSClient(addr string) (ADSClient, error) {
cli, err := aggregateddiscoveryservice.NewClient("xds_servers",
client.WithHostPorts(addr),
)
func newADSClient(xdsSvrCfg *XDSServerConfig) (ADSClient, error) {
var opts []client.Option
opts = append(opts, client.WithHostPorts(xdsSvrCfg.SvrAddr))

if xdsSvrCfg.XDSAuth {
if tlsConfig, err := auth.GetTLSConfig(xdsSvrCfg.SvrAddr); err != nil {
return nil, err
} else {
opts = append(opts, client.WithGRPCTLSConfig(tlsConfig))
}
opts = append(opts, client.WithMetaHandler(auth.ClientHTTP2JwtHandler))
}

cli, err := aggregateddiscoveryservice.NewClient(xdsSvrCfg.SvrName, opts...)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -252,6 +263,12 @@ func (c *xdsClient) receiver(as ADSStream) {
if err != nil {
klog.Errorf("KITEX: [XDS] client, receive failed, error=%s", err)
currStream.Close()
if auth.IsAuthError(err) {
// if it is auth error, return directly.
klog.Errorf("KITEX: [XDS] client, authentication of the control plane failed, close the xDS client. Please check the error log in control plane for more details.")
c.close()
return
}
if s, e := c.reconnect(); e == nil {
currStream = s
}
Expand Down
2 changes: 1 addition & 1 deletion core/manager/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func Test_newXdsClient(t *testing.T) {

c, err := initXDSClient(&BootstrapConfig{
node: &v3core.Node{},
xdsSvrCfg: &XDSServerConfig{SvrAddr: address},
xdsSvrCfg: &XDSServerConfig{SvrAddr: address, SvrName: IstiodSvrName},
}, nil)
defer c.close()
assert.Nil(t, err)
Expand Down
4 changes: 2 additions & 2 deletions core/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func NewXDSResourceManager(bootstrapConfig *BootstrapConfig, opts ...Option) (*x
}
// Initial xds client
if bootstrapConfig == nil {
bootstrapConfig, err = newBootstrapConfig(m.opts.XDSSvrConfig.SvrAddr)
bootstrapConfig, err = newBootstrapConfig(m.opts.XDSSvrConfig)
if err != nil {
return nil, err
}
Expand All @@ -92,7 +92,7 @@ func NewXDSResourceManager(bootstrapConfig *BootstrapConfig, opts ...Option) (*x

func initXDSClient(bootstrapConfig *BootstrapConfig, m *xdsResourceManager) (*xdsClient, error) {
// build ads client that communicates with the xds server
ac, err := newADSClient(bootstrapConfig.xdsSvrCfg.SvrAddr)
ac, err := newADSClient(bootstrapConfig.xdsSvrCfg)
if err != nil {
return nil, fmt.Errorf("[XDS] client: construct ads client failed, %s", err.Error())
}
Expand Down
1 change: 1 addition & 0 deletions core/manager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ var (
}
XdsServerConfig = &XDSServerConfig{
SvrAddr: XdsServerAddress,
SvrName: IstiodSvrName,
}
XdsBootstrapConfig = &BootstrapConfig{
node: NodeProto,
Expand Down
23 changes: 21 additions & 2 deletions core/manager/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package manager

import "fmt"

type Options struct {
XDSSvrConfig *XDSServerConfig
DumpPath string
Expand All @@ -31,13 +33,30 @@ type Option struct {
F func(o *Options)
}

func NewOptions(opts []Option) *Options {
o := &Options{
func DefaultOptions() *Options {
return &Options{
XDSSvrConfig: &XDSServerConfig{
SvrAddr: IstiodAddr,
SvrName: IstiodSvrName,
XDSAuth: false,
},
DumpPath: defaultDumpPath,
}
}

func NewOptions(opts []Option) *Options {
o := DefaultOptions()
o.Apply(opts)
return o
}

func CheckXDSSvrConfig(cfg *XDSServerConfig) error {
if cfg.SvrAddr == "" {
return fmt.Errorf("[XDS] Option: xDS server address should be specified")
}
if cfg.SvrName == "" {
// set default server name
cfg.SvrName = IstiodSvrName
}
return nil
}
12 changes: 12 additions & 0 deletions option.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,15 @@ func WithXDSServerAddress(address string) manager.Option {
},
}
}

// WithXDSServerConfig set the xDS server config.
func WithXDSServerConfig(cfg *manager.XDSServerConfig) manager.Option {
return manager.Option{
F: func(o *manager.Options) {
if err := manager.CheckXDSSvrConfig(cfg); err != nil {
panic(err)
}
o.XDSSvrConfig = cfg
},
}
}

0 comments on commit 2f5e010

Please sign in to comment.