Skip to content

Commit

Permalink
chore: track opened grpc connections
Browse files Browse the repository at this point in the history
This is an alternative to #10283 which doesn't close the connections and instead allows
us to easily track the lost grpc connections using pprof.

Fix various grpc leaks while we are at it.

Signed-off-by: Dmitriy Matrenichev <dmitry.matrenichev@siderolabs.com>
  • Loading branch information
DmitriyMV committed Feb 4, 2025
1 parent 5e28c8e commit 8d36b55
Show file tree
Hide file tree
Showing 6 changed files with 114 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ func (suite *TimedSuite) TestTime() {
grpc.WithContextDialer(dialer.DialUnix()),
)
suite.Require().NoError(err)
suite.T().Cleanup(func() { conn.Close() }) //nolint:errcheck

nClient := timeapi.NewTimeServiceClient(conn)
reply, err := nClient.Time(context.Background(), &emptypb.Empty{})
Expand Down Expand Up @@ -108,6 +109,7 @@ func (suite *TimedSuite) TestTimeCheck() {
grpc.WithContextDialer(dialer.DialUnix()),
)
suite.Require().NoError(err)
suite.T().Cleanup(func() { conn.Close() }) //nolint:errcheck

nClient := timeapi.NewTimeServiceClient(conn)
reply, err := nClient.TimeCheck(context.Background(), &timeapi.TimeRequest{Server: testServer})
Expand Down
13 changes: 11 additions & 2 deletions internal/pkg/cri/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package cri

import (
"fmt"
"runtime/pprof"
"time"

"google.golang.org/grpc"
Expand All @@ -15,6 +16,8 @@ import (
"github.com/siderolabs/talos/pkg/grpc/dialer"
)

var newClientPprof = pprof.NewProfile("internal/pkg/cri.NewClient")

// Client is a lightweight implementation of CRI client.
type Client struct {
conn *grpc.ClientConn
Expand All @@ -38,14 +41,20 @@ func NewClient(endpoint string, _ time.Duration) (*Client, error) {
return nil, fmt.Errorf("error connecting to CRI: %w", err)
}

return &Client{
res := &Client{
conn: conn,
runtimeClient: runtimeapi.NewRuntimeServiceClient(conn),
imagesClient: runtimeapi.NewImageServiceClient(conn),
}, nil
}

newClientPprof.Add(res, 1)

return res, nil
}

// Close connection.
func (c *Client) Close() error {
newClientPprof.Remove(c)

return c.conn.Close()
}
4 changes: 4 additions & 0 deletions internal/pkg/encryption/keys/kms.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ func (h *KMSKeyHandler) NewKey(ctx context.Context) (*encryption.Key, token.Toke
return nil, nil, fmt.Errorf("error dialing KMS endpoint %q: %w", h.kmsEndpoint, err)
}

defer conn.Close() //nolint:errcheck

client := kms.NewKMSServiceClient(conn)

key := make([]byte, 32)
Expand Down Expand Up @@ -102,6 +104,8 @@ func (h *KMSKeyHandler) GetKey(ctx context.Context, t token.Token) (*encryption.
return nil, fmt.Errorf("error dialing KMS endpoint %q: %w", h.kmsEndpoint, err)
}

defer conn.Close() //nolint:errcheck

client := kms.NewKMSServiceClient(conn)

systemInformation, err := h.getSystemInfo(ctx)
Expand Down
7 changes: 7 additions & 0 deletions pkg/grpc/gen/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"context"
"errors"
"fmt"
"runtime/pprof"
"strings"
"time"

Expand All @@ -21,6 +22,8 @@ import (
"github.com/siderolabs/talos/pkg/machinery/constants"
)

var remoteGeneratorPprof = pprof.NewProfile("pkg/grpc/gen.RemoteGenerator")

// RemoteGenerator represents the OS identity generator.
type RemoteGenerator struct {
conn *grpc.ClientConn
Expand All @@ -37,6 +40,8 @@ func NewRemoteGenerator(token string, endpoints []string, acceptedCAs []*x509.PE

g = &RemoteGenerator{}

remoteGeneratorPprof.Add(g, 1)

conn, err := basic.NewConnection(fmt.Sprintf("%s:///%s", resolver.RoundRobinResolverScheme, strings.Join(endpoints, ",")), basic.NewTokenCredentials(token), acceptedCAs)
if err != nil {
return nil, err
Expand Down Expand Up @@ -87,5 +92,7 @@ func (g *RemoteGenerator) IdentityContext(ctx context.Context, csr *x509.Certifi

// Close closes the gRPC client connection.
func (g *RemoteGenerator) Close() error {
remoteGeneratorPprof.Remove(g)

return g.conn.Close()
}
75 changes: 75 additions & 0 deletions pkg/machinery/client/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

package client_test

import (
"context"
"errors"
"fmt"
"runtime"
"runtime/pprof"
"time"

"github.com/siderolabs/gen/ensure"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

"github.com/siderolabs/talos/pkg/machinery/client"
)

func ExampleNew() {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

connPprof := pprof.Lookup("machinery/client/grpc.grpcConn")
if connPprof == nil {
panic(errors.New("profile machinery/client/grpc.grpcConn not found"))
}

fmt.Println("before:", connPprof.Count())

c := ensure.Value(
client.New(
ctx,
client.WithUnixSocket("/path/to/socket"),
client.WithGRPCDialOptions(
grpc.WithTransportCredentials(insecure.NewCredentials()),
),
),
)

fmt.Println("after client.New :", connPprof.Count())

if err := c.Close(); err != nil {
panic(err)
}

fmt.Println("after client.Close:", connPprof.Count())

c2 := ensure.Value(
client.New(
ctx,
client.WithUnixSocket("/path/to/socket"),
client.WithGRPCDialOptions(
grpc.WithTransportCredentials(insecure.NewCredentials()),
),
),
)

fmt.Println("after client.New 2:", connPprof.Count())

_ = c2

runtime.GC()

fmt.Println("after gc:", connPprof.Count())

// Output:
// before: 0
// after client.New : 1
// after client.Close: 0
// after client.New 2: 1
// after gc: 1
}
16 changes: 15 additions & 1 deletion pkg/machinery/client/grpc_connection_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,29 @@ package client

import (
"context"
"runtime/pprof"

"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)

var grpcConnPprof = pprof.NewProfile("machinery/client/grpc.grpcConn")

type grpcConnectionWrapper struct {
*grpc.ClientConn

clusterName string
}

func newGRPCConnectionWrapper(clusterName string, conn *grpc.ClientConn) *grpcConnectionWrapper {
return &grpcConnectionWrapper{
res := &grpcConnectionWrapper{
ClientConn: conn,
clusterName: clusterName,
}

grpcConnPprof.Add(res, 1)

return res
}

// Invoke performs a unary RPC and returns after the response is received
Expand All @@ -35,6 +42,13 @@ func (c *grpcConnectionWrapper) NewStream(ctx context.Context, desc *grpc.Stream
return c.ClientConn.NewStream(c.appendMetadata(ctx), desc, method, opts...)
}

// Close closes the connection.
func (c *grpcConnectionWrapper) Close() error {
grpcConnPprof.Remove(c)

return c.ClientConn.Close()
}

func (c *grpcConnectionWrapper) appendMetadata(ctx context.Context) context.Context {
ctx = metadata.AppendToOutgoingContext(ctx, "runtime", "Talos")

Expand Down

0 comments on commit 8d36b55

Please sign in to comment.