Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: track opened grpc connections #10284

Merged
merged 1 commit into from
Feb 4, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ func (suite *TimedSuite) TestTime() {
grpc.WithContextDialer(dialer.DialUnix()),
)
suite.Require().NoError(err)
suite.T().Cleanup(func() { conn.Close() }) //nolint:errcheck

nClient := timeapi.NewTimeServiceClient(conn)
reply, err := nClient.Time(context.Background(), &emptypb.Empty{})
Expand Down Expand Up @@ -108,6 +109,7 @@ func (suite *TimedSuite) TestTimeCheck() {
grpc.WithContextDialer(dialer.DialUnix()),
)
suite.Require().NoError(err)
suite.T().Cleanup(func() { conn.Close() }) //nolint:errcheck

nClient := timeapi.NewTimeServiceClient(conn)
reply, err := nClient.TimeCheck(context.Background(), &timeapi.TimeRequest{Server: testServer})
Expand Down
13 changes: 11 additions & 2 deletions internal/pkg/cri/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package cri

import (
"fmt"
"runtime/pprof"
"time"

"google.golang.org/grpc"
Expand All @@ -15,6 +16,8 @@ import (
"github.com/siderolabs/talos/pkg/grpc/dialer"
)

var newClientPprof = pprof.NewProfile("internal/pkg/cri.NewClient")

// Client is a lightweight implementation of CRI client.
type Client struct {
conn *grpc.ClientConn
Expand All @@ -38,14 +41,20 @@ func NewClient(endpoint string, _ time.Duration) (*Client, error) {
return nil, fmt.Errorf("error connecting to CRI: %w", err)
}

return &Client{
res := &Client{
conn: conn,
runtimeClient: runtimeapi.NewRuntimeServiceClient(conn),
imagesClient: runtimeapi.NewImageServiceClient(conn),
}, nil
}

newClientPprof.Add(res, 1)

return res, nil
}

// Close connection.
func (c *Client) Close() error {
newClientPprof.Remove(c)

return c.conn.Close()
}
4 changes: 4 additions & 0 deletions internal/pkg/encryption/keys/kms.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ func (h *KMSKeyHandler) NewKey(ctx context.Context) (*encryption.Key, token.Toke
return nil, nil, fmt.Errorf("error dialing KMS endpoint %q: %w", h.kmsEndpoint, err)
}

defer conn.Close() //nolint:errcheck

client := kms.NewKMSServiceClient(conn)

key := make([]byte, 32)
Expand Down Expand Up @@ -102,6 +104,8 @@ func (h *KMSKeyHandler) GetKey(ctx context.Context, t token.Token) (*encryption.
return nil, fmt.Errorf("error dialing KMS endpoint %q: %w", h.kmsEndpoint, err)
}

defer conn.Close() //nolint:errcheck

client := kms.NewKMSServiceClient(conn)

systemInformation, err := h.getSystemInfo(ctx)
Expand Down
7 changes: 7 additions & 0 deletions pkg/grpc/gen/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"context"
"errors"
"fmt"
"runtime/pprof"
"strings"
"time"

Expand All @@ -21,6 +22,8 @@ import (
"github.com/siderolabs/talos/pkg/machinery/constants"
)

var remoteGeneratorPprof = pprof.NewProfile("pkg/grpc/gen.RemoteGenerator")

// RemoteGenerator represents the OS identity generator.
type RemoteGenerator struct {
conn *grpc.ClientConn
Expand All @@ -37,6 +40,8 @@ func NewRemoteGenerator(token string, endpoints []string, acceptedCAs []*x509.PE

g = &RemoteGenerator{}

remoteGeneratorPprof.Add(g, 1)

conn, err := basic.NewConnection(fmt.Sprintf("%s:///%s", resolver.RoundRobinResolverScheme, strings.Join(endpoints, ",")), basic.NewTokenCredentials(token), acceptedCAs)
if err != nil {
return nil, err
Expand Down Expand Up @@ -87,5 +92,7 @@ func (g *RemoteGenerator) IdentityContext(ctx context.Context, csr *x509.Certifi

// Close closes the gRPC client connection.
func (g *RemoteGenerator) Close() error {
remoteGeneratorPprof.Remove(g)

return g.conn.Close()
}
75 changes: 75 additions & 0 deletions pkg/machinery/client/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

package client_test

import (
"context"
"errors"
"fmt"
"runtime"
"runtime/pprof"
"time"

"github.com/siderolabs/gen/ensure"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

"github.com/siderolabs/talos/pkg/machinery/client"
)

func ExampleNew() {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

connPprof := pprof.Lookup("machinery/client/grpc.grpcConn")
if connPprof == nil {
panic(errors.New("profile machinery/client/grpc.grpcConn not found"))
}

fmt.Println("before:", connPprof.Count())

c := ensure.Value(
client.New(
ctx,
client.WithUnixSocket("/path/to/socket"),
client.WithGRPCDialOptions(
grpc.WithTransportCredentials(insecure.NewCredentials()),
),
),
)

fmt.Println("after client.New:", connPprof.Count())

if err := c.Close(); err != nil {
panic(err)
}

fmt.Println("after client.Close:", connPprof.Count())

c2 := ensure.Value(
client.New(
ctx,
client.WithUnixSocket("/path/to/socket"),
client.WithGRPCDialOptions(
grpc.WithTransportCredentials(insecure.NewCredentials()),
),
),
)

fmt.Println("after client.New 2:", connPprof.Count())

_ = c2

runtime.GC()

fmt.Println("after gc:", connPprof.Count())

// Output:
// before: 0
// after client.New: 1
// after client.Close: 0
// after client.New 2: 1
// after gc: 1
}
16 changes: 15 additions & 1 deletion pkg/machinery/client/grpc_connection_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,29 @@ package client

import (
"context"
"runtime/pprof"

"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)

var grpcConnPprof = pprof.NewProfile("machinery/client/grpc.grpcConn")

type grpcConnectionWrapper struct {
*grpc.ClientConn

clusterName string
}

func newGRPCConnectionWrapper(clusterName string, conn *grpc.ClientConn) *grpcConnectionWrapper {
return &grpcConnectionWrapper{
res := &grpcConnectionWrapper{
ClientConn: conn,
clusterName: clusterName,
}

grpcConnPprof.Add(res, 1)

return res
}

// Invoke performs a unary RPC and returns after the response is received
Expand All @@ -35,6 +42,13 @@ func (c *grpcConnectionWrapper) NewStream(ctx context.Context, desc *grpc.Stream
return c.ClientConn.NewStream(c.appendMetadata(ctx), desc, method, opts...)
}

// Close closes the connection.
func (c *grpcConnectionWrapper) Close() error {
grpcConnPprof.Remove(c)

return c.ClientConn.Close()
}

func (c *grpcConnectionWrapper) appendMetadata(ctx context.Context) context.Context {
ctx = metadata.AppendToOutgoingContext(ctx, "runtime", "Talos")

Expand Down