Skip to content

Commit

Permalink
* gRPC connection will be forcefully closed on DNS resolver errors fr…
Browse files Browse the repository at this point in the history
…om now on
  • Loading branch information
asmyasnikov committed Oct 24, 2024
1 parent 2e44601 commit 5eac1d8
Show file tree
Hide file tree
Showing 8 changed files with 164 additions and 148 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
* gRPC connection will be forcefully closed on DNS resolver errors from now on

## v3.88.0
* Removed UUID methods from ydb.ParamsBuilder()

Expand Down
2 changes: 1 addition & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (c *Config) ExcludeGRPCCodesForPessimization() []grpcCodes.Code {
// GrpcDialOptions reports about used grpc dialing options
func (c *Config) GrpcDialOptions() []grpc.DialOption {
return append(
defaultGrpcOptions(c.trace, c.secure, c.tlsConfig),
defaultGrpcOptions(c.secure, c.tlsConfig),
c.grpcOptions...,
)
}
Expand Down
12 changes: 1 addition & 11 deletions config/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/ydb-platform/ydb-go-sdk/v3/balancers"
"github.com/ydb-platform/ydb-go-sdk/v3/credentials"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/stack"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xresolver"
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
)

Expand All @@ -30,7 +29,7 @@ var (
}
)

func defaultGrpcOptions(t *trace.Driver, secure bool, tlsConfig *tls.Config) (opts []grpc.DialOption) {
func defaultGrpcOptions(secure bool, tlsConfig *tls.Config) (opts []grpc.DialOption) {
opts = append(opts,
// keep-aliving all connections
grpc.WithKeepaliveParams(
Expand All @@ -45,15 +44,6 @@ func defaultGrpcOptions(t *trace.Driver, secure bool, tlsConfig *tls.Config) (op
grpc.MaxCallRecvMsgSize(DefaultGRPCMsgSize),
grpc.MaxCallSendMsgSize(DefaultGRPCMsgSize),
),
// use proxy-resolvers
// 1) for interpret schemas `ydb`, `grpc` and `grpcs` in node URLs as for dns resolver
// 2) for observe resolving events
grpc.WithResolvers(
xresolver.New("", t),
xresolver.New("ydb", t),
xresolver.New("grpc", t),
xresolver.New("grpcs", t),
),
)
if secure {
opts = append(opts, grpc.WithTransportCredentials(
Expand Down
4 changes: 2 additions & 2 deletions driver_string_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func TestDriver_String(t *testing.T) {
config.WithDatabase("local"),
config.WithSecure(false),
)},
s: `Driver{Endpoint:"localhost",Database:"local",Secure:false,Credentials:Anonymous{From:"github.com/ydb-platform/ydb-go-sdk/v3/config.defaultConfig(defaults.go:90)"}}`, //nolint:lll
s: `Driver{Endpoint:"localhost",Database:"local",Secure:false,Credentials:Anonymous{From:"github.com/ydb-platform/ydb-go-sdk/v3/config.defaultConfig(defaults.go:80)"}}`, //nolint:lll
},
{
name: xtest.CurrentFileLine(),
Expand All @@ -32,7 +32,7 @@ func TestDriver_String(t *testing.T) {
config.WithDatabase("local"),
config.WithSecure(true),
)},
s: `Driver{Endpoint:"localhost",Database:"local",Secure:true,Credentials:Anonymous{From:"github.com/ydb-platform/ydb-go-sdk/v3/config.defaultConfig(defaults.go:90)"}}`, //nolint:lll
s: `Driver{Endpoint:"localhost",Database:"local",Secure:true,Credentials:Anonymous{From:"github.com/ydb-platform/ydb-go-sdk/v3/config.defaultConfig(defaults.go:80)"}}`, //nolint:lll
},
{
name: xtest.CurrentFileLine(),
Expand Down
126 changes: 72 additions & 54 deletions internal/balancer/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"strings"
"sync/atomic"
"time"

"google.golang.org/grpc"

Expand All @@ -28,24 +29,18 @@ import (

var ErrNoEndpoints = xerrors.Wrap(fmt.Errorf("no endpoints"))

type discoveryClient interface {
closer.Closer

Discover(ctx context.Context) ([]endpoint.Endpoint, error)
}

type Balancer struct {
driverConfig *config.Config
config balancerConfig.Config
pool *conn.Pool
discoveryClient discoveryClient
discoveryRepeater repeater.Repeater
localDCDetector func(ctx context.Context, endpoints []endpoint.Endpoint) (string, error)

connectionsState atomic.Pointer[connectionsState]

mu xsync.RWMutex
onApplyDiscoveredEndpoints []func(ctx context.Context, endpoints []endpoint.Info)
discoveryConfig *discoveryConfig.Config
}

func (b *Balancer) OnUpdate(onApplyDiscoveredEndpoints func(ctx context.Context, endpoints []endpoint.Info)) {
Expand Down Expand Up @@ -82,45 +77,75 @@ func (b *Balancer) clusterDiscovery(ctx context.Context) (err error) {
)
}

func (b *Balancer) clusterDiscoveryAttempt(ctx context.Context) (err error) {
var (
address = "ydb:///" + b.driverConfig.Endpoint()
onDone = trace.DriverOnBalancerClusterDiscoveryAttempt(
b.driverConfig.Trace(), &ctx,
stack.FunctionID(
"github.com/ydb-platform/ydb-go-sdk/v3/internal/balancer.(*Balancer).clusterDiscoveryAttempt",
),
address,
b.driverConfig.Database(),
)
endpoints []endpoint.Endpoint
localDC string
cancel context.CancelFunc
type clusterDiscoveryAttemptSettings struct {
address string
dialTimeout time.Duration
client interface {
closer.Closer

Discover(ctx context.Context) ([]endpoint.Endpoint, error)
}
}

func (b *Balancer) clusterDiscoveryAttempt(ctx context.Context, opts ...func(settings *clusterDiscoveryAttemptSettings)) (err error) {

Check failure on line 90 in internal/balancer/balancer.go

View workflow job for this annotation

GitHub Actions / golangci-lint

the line is 134 characters long, which exceeds the maximum of 120 characters. (lll)
settings := clusterDiscoveryAttemptSettings{
address: "dns:///" + b.driverConfig.Endpoint(),
dialTimeout: b.driverConfig.DialTimeout(),
}
for _, opt := range opts {
if opt != nil {
opt(&settings)
}
}

onDone := trace.DriverOnBalancerClusterDiscoveryAttempt(
b.driverConfig.Trace(), &ctx,
stack.FunctionID(
"github.com/ydb-platform/ydb-go-sdk/v3/internal/balancer.(*Balancer).clusterDiscoveryAttempt",
),
settings.address,
b.driverConfig.Database(),
)
defer func() {
onDone(err)
}()

if dialTimeout := b.driverConfig.DialTimeout(); dialTimeout > 0 {
ctx, cancel = xcontext.WithTimeout(ctx, dialTimeout)
} else {
ctx, cancel = xcontext.WithCancel(ctx)
if settings.dialTimeout > 0 {
var cancel context.CancelFunc
ctx, cancel = xcontext.WithTimeout(ctx, settings.dialTimeout)
defer cancel()
}

if settings.client == nil {
cc, err := grpc.DialContext(ctx, settings.address, b.driverConfig.GrpcDialOptions()...)
if err != nil {
return xerrors.WithStackTrace(err)
}
defer func() {
_ = cc.Close()
}()

settings.client = internalDiscovery.New(ctx, cc, b.discoveryConfig)
defer func() {
_ = settings.client.Close(ctx)
}()
}
defer cancel()

endpoints, err = b.discoveryClient.Discover(ctx)
endpoints, err := settings.client.Discover(ctx)
if err != nil {
return xerrors.WithStackTrace(err)
}

if b.config.DetectNearestDC {
localDC, err = b.localDCDetector(ctx, endpoints)
localDC, err := b.localDCDetector(ctx, endpoints)
if err != nil {
return xerrors.WithStackTrace(err)
}
}

b.applyDiscoveredEndpoints(ctx, endpoints, localDC)
b.applyDiscoveredEndpoints(ctx, endpoints, localDC)
} else {
b.applyDiscoveredEndpoints(ctx, endpoints, "")
}

return nil
}
Expand Down Expand Up @@ -184,10 +209,6 @@ func (b *Balancer) Close(ctx context.Context) (err error) {
b.discoveryRepeater.Stop()
}

if err = b.discoveryClient.Close(ctx); err != nil {
return xerrors.WithStackTrace(err)
}

return nil
}

Expand All @@ -197,30 +218,25 @@ func New(
pool *conn.Pool,
opts ...discoveryConfig.Option,
) (b *Balancer, finalErr error) {
var (
onDone = trace.DriverOnBalancerInit(
driverConfig.Trace(), &ctx,
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/balancer.New"),
driverConfig.Balancer().String(),
)
discoveryConfig = discoveryConfig.New(append(opts,
discoveryConfig.With(driverConfig.Common),
discoveryConfig.WithEndpoint(driverConfig.Endpoint()),
discoveryConfig.WithDatabase(driverConfig.Database()),
discoveryConfig.WithSecure(driverConfig.Secure()),
discoveryConfig.WithMeta(driverConfig.Meta()),
)...)
onDone := trace.DriverOnBalancerInit(
driverConfig.Trace(), &ctx,
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/balancer.New"),
driverConfig.Balancer().String(),
)
defer func() {
onDone(finalErr)
}()

b = &Balancer{
driverConfig: driverConfig,
pool: pool,
discoveryClient: internalDiscovery.New(ctx, pool.Get(
endpoint.New(driverConfig.Endpoint()),
), discoveryConfig),
discoveryConfig: discoveryConfig.New(append(opts,
discoveryConfig.With(driverConfig.Common),
discoveryConfig.WithEndpoint(driverConfig.Endpoint()),
discoveryConfig.WithDatabase(driverConfig.Database()),
discoveryConfig.WithSecure(driverConfig.Secure()),
discoveryConfig.WithMeta(driverConfig.Meta()),
)...),
pool: pool,
localDCDetector: detectLocalDC,
}

Expand All @@ -240,9 +256,11 @@ func New(
return nil, xerrors.WithStackTrace(err)
}
// run background discovering
if d := discoveryConfig.Interval(); d > 0 {
b.discoveryRepeater = repeater.New(xcontext.ValueOnly(ctx),
d, b.clusterDiscoveryAttempt,
if d := b.discoveryConfig.Interval(); d > 0 {
b.discoveryRepeater = repeater.New(xcontext.ValueOnly(ctx), d,
func(ctx context.Context) (err error) {
return b.clusterDiscoveryAttempt(ctx)
},
repeater.WithName("discovery"),
repeater.WithTrace(b.driverConfig.Trace()),
)
Expand Down
13 changes: 7 additions & 6 deletions internal/balancer/local_dc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,17 +137,18 @@ func TestLocalDCDiscovery(t *testing.T) {
driverConfig: cfg,
config: *cfg.Balancer(),
pool: conn.NewPool(context.Background(), cfg),
discoveryClient: discoveryMock{endpoints: []endpoint.Endpoint{
&mock.Endpoint{AddrField: "a:123", LocationField: "a"},
&mock.Endpoint{AddrField: "b:234", LocationField: "b"},
&mock.Endpoint{AddrField: "c:456", LocationField: "c"},
}},
localDCDetector: func(ctx context.Context, endpoints []endpoint.Endpoint) (string, error) {
return "b", nil
},
}

err := r.clusterDiscoveryAttempt(ctx)
err := r.clusterDiscoveryAttempt(ctx, func(settings *clusterDiscoveryAttemptSettings) {
settings.client = discoveryMock{endpoints: []endpoint.Endpoint{
&mock.Endpoint{AddrField: "a:123", LocationField: "a"},
&mock.Endpoint{AddrField: "b:234", LocationField: "b"},
&mock.Endpoint{AddrField: "c:456", LocationField: "c"},
}}
})
require.NoError(t, err)

for i := 0; i < 100; i++ {
Expand Down
10 changes: 1 addition & 9 deletions internal/conn/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,13 +216,9 @@ func (c *conn) realConn(ctx context.Context) (cc *grpc.ClientConn, err error) {
onDone(err)
}()

// prepend "ydb" scheme for grpc dns-resolver to find the proper scheme
// three slashes in "ydb:///" is ok. It needs for good parse scheme in grpc resolver.
address := "ydb:///" + c.endpoint.Address()

dialOption := makeDialOption(c.endpoint.OverrideHost())

cc, err = grpc.DialContext(ctx, address, append( //nolint:staticcheck,nolintlint
cc, err = grpc.DialContext(ctx, c.endpoint.Address(), append( //nolint:staticcheck,nolintlint
dialOption,
c.config.GrpcDialOptions()...,
)...)
Expand Down Expand Up @@ -604,10 +600,6 @@ func newConn(e endpoint.Endpoint, config Config, opts ...option) *conn {
return c
}

func New(e endpoint.Endpoint, config Config, opts ...option) Conn {
return newConn(e, config, opts...)
}

var _ stats.Handler = statsHandler{}

type statsHandler struct{}
Expand Down
Loading

0 comments on commit 5eac1d8

Please sign in to comment.