From 377a648ee387ac2fd31c3bea39f1ee9ec49913c0 Mon Sep 17 00:00:00 2001 From: Purnesh Dixit Date: Mon, 3 Feb 2025 14:24:30 +0530 Subject: [PATCH] grpc based transport --- .../clients/grpctransport/grpc_transport.go | 134 ++++++++++++++++++ .../grpctransport/grpc_transport_test.go | 105 ++++++++++++++ 2 files changed, 239 insertions(+) create mode 100644 xds/internal/clients/grpctransport/grpc_transport.go create mode 100644 xds/internal/clients/grpctransport/grpc_transport_test.go diff --git a/xds/internal/clients/grpctransport/grpc_transport.go b/xds/internal/clients/grpctransport/grpc_transport.go new file mode 100644 index 000000000000..d71398210324 --- /dev/null +++ b/xds/internal/clients/grpctransport/grpc_transport.go @@ -0,0 +1,134 @@ +/* + * + * Copyright 2025 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +// Package grpctransport provides an implementation of the +// [clients.TransportBuilder] interface using gRPC. +package grpctransport + +import ( + "context" + "fmt" + "time" + + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" + "google.golang.org/grpc/keepalive" + "google.golang.org/grpc/xds/internal/clients" +) + +// ServerConfigExtension extends the [clients.ServerConfig] for the gRPC-based +// transport builder. Any implementation of this must implement ServerConfig() +// method. +type ServerConfigExtension interface { + ServerConfig() *ServerConfig +} + +// ServerConfig holds the settings for connecting to an xDS management server +// using gRPC. +type ServerConfig struct { + // Credentials is the credential bundle containing the gRPC credentials for + // connecting to the xDS management server. + Credentials credentials.Bundle +} + +// ServerConfig returns the ServerConfig itself. This method is designed +// to satisfy [ServerConfigExtension] interface requirement. +func (s *ServerConfig) ServerConfig() *ServerConfig { + return s +} + +// Builder provides a way to build a gRPC-based transport to an xDS management +// server. +type Builder struct{} + +// Build creates a new gRPC-based transport to an xDS management server using +// the provided [clients.ServerConfig]. This involves creating a +// grpc.ClientConn to the server using the provided credentials and server URI. +// +// If any of ServerURI or Extensions of `sc` are not present, Build() will return +// an error. +func (b *Builder) Build(sc clients.ServerConfig) (clients.Transport, error) { + if sc.ServerURI == "" { + return nil, fmt.Errorf("xds: ServerConfig's ServerURI field cannot be empty") + } + if sc.Extensions == nil { + return nil, fmt.Errorf("xds: ServerConfig's Extensions field cannot be nil for gRPC transport") + } + gtsce, ok := sc.Extensions.(ServerConfigExtension) + if !ok { + return nil, fmt.Errorf("xds: ServerConfig's Extensions field cannot be anything other than grpctransport.ServerConfigExtension for gRPC transport") + } + gtsc := gtsce.ServerConfig() + if gtsc.Credentials == nil { + return nil, fmt.Errorf("xsd: ServerConfigExtensions's Credentials field cannot be nil for gRPC transport") + } + + // Dial the xDS management server with the provided credentials, server URI, + // and a static keepalive configuration that is common across gRPC language + // implementations. + kpCfg := grpc.WithKeepaliveParams(keepalive.ClientParameters{ + Time: 5 * time.Minute, + Timeout: 20 * time.Second, + }) + cc, err := grpc.NewClient(sc.ServerURI, kpCfg, grpc.WithCredentialsBundle(gtsc.Credentials)) + if err != nil { + return nil, fmt.Errorf("error creating grpc client for server uri %s, %v", sc.ServerURI, err) + } + cc.Connect() + + return &grpcTransport{cc: cc}, nil +} + +type grpcTransport struct { + cc *grpc.ClientConn +} + +// NewStream creates a new gRPC stream to the xDS management server for the +// specified method. The returned Stream interface can be used to send and +// receive messages on the stream. +func (g *grpcTransport) NewStream(ctx context.Context, method string) (clients.Stream, error) { + s, err := g.cc.NewStream(ctx, &grpc.StreamDesc{StreamName: method, ClientStreams: true, ServerStreams: true}, method) + if err != nil { + return nil, err + } + return &stream{stream: s}, nil +} + +type stream struct { + stream grpc.ClientStream +} + +// Send sends a message to the xDS management server. +func (s *stream) Send(msg []byte) error { + return s.stream.SendMsg(msg) +} + +// Recv receives a message from the xDS management server. +func (s *stream) Recv() ([]byte, error) { + var typedRes []byte + err := s.stream.RecvMsg(&typedRes) + if err != nil { + return typedRes, err + } + return typedRes, nil +} + +// Close closes the gRPC stream to the xDS management server. +func (g *grpcTransport) Close() error { + return g.cc.Close() +} diff --git a/xds/internal/clients/grpctransport/grpc_transport_test.go b/xds/internal/clients/grpctransport/grpc_transport_test.go new file mode 100644 index 000000000000..cd701c596f4f --- /dev/null +++ b/xds/internal/clients/grpctransport/grpc_transport_test.go @@ -0,0 +1,105 @@ +/* + * + * Copyright 2025 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package grpctransport + +import ( + "testing" + + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/internal/grpctest" + "google.golang.org/grpc/xds/internal/clients" +) + +type s struct { + grpctest.Tester +} + +func Test(t *testing.T) { + grpctest.RunSubTests(t, s{}) +} + +// TestBuild verifies that the grpctransport.Builder creates a new +// grpc.ClientConn every time Build() is called. +// +// It covers the following scenarios: +// - ServerURI is empty. +// - Extensions is nil. +// - Extensions is not ServerConfigExtension. +// - Credentials are nil. +// - Success cases. +func (s) TestBuild(t *testing.T) { + tests := []struct { + name string + serverCfg clients.ServerConfig + wantErr bool + }{ + { + name: "ServerURI_empty", + serverCfg: clients.ServerConfig{ + ServerURI: "", + Extensions: &ServerConfig{Credentials: insecure.NewBundle()}, + }, + wantErr: true, + }, + { + name: "Extensions_nil", + serverCfg: clients.ServerConfig{ServerURI: "server-address"}, + wantErr: true, + }, + { + name: "Extensions_not_ServerConfigExtension", + serverCfg: clients.ServerConfig{ + ServerURI: "server-address", + Extensions: 1, + }, + wantErr: true, + }, + { + name: "ServerConfigExtension_Credentials_nil", + serverCfg: clients.ServerConfig{ + ServerURI: "server-address", + Extensions: &ServerConfig{}, + }, + wantErr: true, + }, + { + name: "success", + serverCfg: clients.ServerConfig{ + ServerURI: "server-address", + Extensions: &ServerConfig{Credentials: insecure.NewBundle()}, + }, + wantErr: false, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + b := &Builder{} + tr, err := b.Build(test.serverCfg) + if (err != nil) != test.wantErr { + t.Fatalf("Build() error = %v, wantErr %v", err, test.wantErr) + } + if tr != nil { + defer tr.Close() + } + if !test.wantErr && tr == nil { + t.Fatalf("got non-nil transport from Build(), want nil") + } + }) + } +}