Skip to content

Commit

Permalink
transport: add GatedMaListener type
Browse files Browse the repository at this point in the history
This introduces a new GatedMaListener type which gates conns
accepted from a manet.Listener with a gater and creates the rcmgr
scope for it. Explicitly passing the scope allows for many guardrails
that the previous interface assertion didn't.

This breaks the previous responsibility of the upgradeListener method
into two, one gating the connection initially, and the other upgrading
the connection with a security and muxer selection.

This split makes it easy to gate the connection with the resource
manager as early as possible. This is especially true for websocket
because we want to gate the connection just after the TCP connection is
established, and not after the tls handshake + websocket upgrade is
completed.
  • Loading branch information
sukunrt committed Feb 16, 2025
1 parent 427ea4b commit f1b1ee9
Show file tree
Hide file tree
Showing 19 changed files with 614 additions and 314 deletions.
4 changes: 2 additions & 2 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,11 +292,11 @@ func (cfg *Config) addTransports() ([]fx.Option, error) {
fx.Provide(func() connmgr.ConnectionGater { return cfg.ConnectionGater }),
fx.Provide(func() pnet.PSK { return cfg.PSK }),
fx.Provide(func() network.ResourceManager { return cfg.ResourceManager }),
fx.Provide(func(gater connmgr.ConnectionGater, rcmgr network.ResourceManager) *tcpreuse.ConnMgr {
fx.Provide(func(upgrader transport.Upgrader) *tcpreuse.ConnMgr {
if !cfg.ShareTCPListener {
return nil
}
return tcpreuse.NewConnMgr(tcpreuse.EnvReuseportVal, gater, rcmgr)
return tcpreuse.NewConnMgr(tcpreuse.EnvReuseportVal, upgrader)
}),
fx.Provide(func(cm *quicreuse.ConnManager, sw *swarm.Swarm) libp2pwebrtc.ListenUDPFn {
hasQuicAddrPortFor := func(network string, laddr *net.UDPAddr) bool {
Expand Down
30 changes: 30 additions & 0 deletions core/transport/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,11 +129,41 @@ type TransportNetwork interface {
AddTransport(t Transport) error
}

// GatedMaListener is listener that listens for raw(unsecured and non-multiplexed) incoming connections,
// gates them with a `connmgr.ConnGater`and creates a resource management scope for them.
// It can be upgraded to a full libp2p transport listener by the Upgrader.
//
// Compared to manet.Listener, this listener creates the resource management scope for the accepted connection.
type GatedMaListener interface {
// Accept waits for and returns the next connection to the listener.
Accept() (manet.Conn, network.ConnManagementScope, error)

// Close closes the listener.
// Any blocked Accept operations will be unblocked and return errors.
Close() error

// Multiaddr returns the listener's (local) Multiaddr.
Multiaddr() ma.Multiaddr

// Addr returns the net.Listener's network address.
Addr() net.Addr
}

// Upgrader is a multistream upgrader that can upgrade an underlying connection
// to a full transport connection (secure and multiplexed).
type Upgrader interface {
// UpgradeListener upgrades the passed multiaddr-net listener into a full libp2p-transport listener.
//
// Deprecated: Use UpgradeGatedMaListener(upgrader.GateMaListener(manet.Listener)) instead.
UpgradeListener(Transport, manet.Listener) Listener

// GateMaListener creates a GatedMaListener from a manet.Listener. It gates the accepted connection
// and creates a resource scope for it.
GateMaListener(manet.Listener) GatedMaListener

// UpgradeGatedMaListener upgrades the passed GatedMaListener into a full libp2p-transport listener.
UpgradeGatedMaListener(Transport, GatedMaListener) Listener

// Upgrade upgrades the multiaddr/net connection into a full libp2p-transport connection.
Upgrade(ctx context.Context, t Transport, maconn manet.Conn, dir network.Direction, p peer.ID, scope network.ConnManagementScope) (CapableConn, error)
}
Expand Down
84 changes: 47 additions & 37 deletions p2p/net/upgrader/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"strings"
"sync"

"github.com/libp2p/go-libp2p/core/connmgr"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/transport"

Expand All @@ -17,7 +18,7 @@ import (
var log = logging.Logger("upgrader")

type listener struct {
manet.Listener
transport.GatedMaListener

transport transport.Transport
upgrader *upgrader
Expand All @@ -35,10 +36,12 @@ type listener struct {
cancel func()
}

var _ transport.Listener = (*listener)(nil)

// Close closes the listener.
func (l *listener) Close() error {
// Do this first to try to get any relevant errors.
err := l.Listener.Close()
err := l.GatedMaListener.Close()

l.cancel()
// Drain and wait.
Expand All @@ -61,7 +64,7 @@ func (l *listener) handleIncoming() {
var wg sync.WaitGroup
defer func() {
// make sure we're closed
l.Listener.Close()
l.GatedMaListener.Close()
if l.err == nil {
l.err = fmt.Errorf("listener closed")
}
Expand All @@ -72,7 +75,7 @@ func (l *listener) handleIncoming() {

var catcher tec.TempErrCatcher
for l.ctx.Err() == nil {
maconn, err := l.Listener.Accept()
maconn, connScope, err := l.GatedMaListener.Accept()
if err != nil {
// Note: function may pause the accept loop.
if catcher.IsTemporary(err) {
Expand All @@ -84,33 +87,10 @@ func (l *listener) handleIncoming() {
}
catcher.Reset()

// Check if we already have a connection scope. See the comment in tcpreuse/listener.go for an explanation.
var connScope network.ConnManagementScope
if sc, ok := maconn.(interface {
Scope() network.ConnManagementScope
}); ok {
connScope = sc.Scope()
}
if connScope == nil {
// gate the connection if applicable
if l.upgrader.connGater != nil && !l.upgrader.connGater.InterceptAccept(maconn) {
log.Debugf("gater blocked incoming connection on local addr %s from %s",
maconn.LocalMultiaddr(), maconn.RemoteMultiaddr())
if err := maconn.Close(); err != nil {
log.Warnf("failed to close incoming connection rejected by gater: %s", err)
}
continue
}

var err error
connScope, err = l.rcmgr.OpenConnection(network.DirInbound, true, maconn.RemoteMultiaddr())
if err != nil {
log.Debugw("resource manager blocked accept of new connection", "error", err)
if err := maconn.Close(); err != nil {
log.Warnf("failed to open incoming connection. Rejected by resource manager: %s", err)
}
continue
}
log.Errorf("BUG: got nil connScope for incoming connection from %s", maconn.RemoteMultiaddr())
maconn.Close()
continue
}

// The go routine below calls Release when the context is
Expand Down Expand Up @@ -154,14 +134,10 @@ func (l *listener) handleIncoming() {
select {
case l.incoming <- conn:
case <-ctx.Done():
// Listener not closed but the accept timeout expired.
if l.ctx.Err() == nil {
// Listener *not* closed but the accept timeout expired.
log.Warn("listener dropped connection due to slow accept")
log.Warnf("listener dropped connection due to slow accept. remote addr: %s peer: %s", maconn.RemoteMultiaddr(), conn.RemotePeer())
}
// Wait on the context with a timeout. This way,
// if we stop accepting connections for some reason,
// we'll eventually close all the open ones
// instead of hanging onto them.
conn.CloseWithError(network.ConnRateLimited)
}
}()
Expand Down Expand Up @@ -189,4 +165,38 @@ func (l *listener) String() string {
return fmt.Sprintf("<stream.Listener %s>", l.Multiaddr())
}

var _ transport.Listener = (*listener)(nil)
type gatedMaListener struct {
manet.Listener
rcmgr network.ResourceManager
connGater connmgr.ConnectionGater
}

var _ transport.GatedMaListener = &gatedMaListener{}

func (l *gatedMaListener) Accept() (manet.Conn, network.ConnManagementScope, error) {
for {
conn, err := l.Listener.Accept()
if err != nil {
return nil, nil, err
}
// gate the connection if applicable
if l.connGater != nil && !l.connGater.InterceptAccept(conn) {
log.Debugf("gater blocked incoming connection on local addr %s from %s",
conn.LocalMultiaddr(), conn.RemoteMultiaddr())
if err := conn.Close(); err != nil {
log.Warnf("failed to close incoming connection rejected by gater: %s", err)
}
continue
}

connScope, err := l.rcmgr.OpenConnection(network.DirInbound, true, conn.RemoteMultiaddr())
if err != nil {
log.Debugw("resource manager blocked accept of new connection", "error", err)
if err := conn.Close(); err != nil {
log.Warnf("failed to open incoming connection. Rejected by resource manager: %s", err)
}
continue
}
return conn, connScope, nil
}
}
2 changes: 1 addition & 1 deletion p2p/net/upgrader/listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func createListener(t *testing.T, u transport.Upgrader) transport.Listener {
require.NoError(t, err)
ln, err := manet.Listen(addr)
require.NoError(t, err)
return u.UpgradeListener(nil, ln)
return u.UpgradeGatedMaListener(nil, u.GateMaListener(ln))
}

func TestAcceptSingleConn(t *testing.T) {
Expand Down
41 changes: 33 additions & 8 deletions p2p/net/upgrader/upgrader.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,19 +107,44 @@ func New(security []sec.SecureTransport, muxers []StreamMuxer, psk ipnet.PSK, rc
func (u *upgrader) UpgradeListener(t transport.Transport, list manet.Listener) transport.Listener {
ctx, cancel := context.WithCancel(context.Background())
l := &listener{
Listener: list,
upgrader: u,
transport: t,
rcmgr: u.rcmgr,
threshold: newThreshold(AcceptQueueLength),
incoming: make(chan transport.CapableConn),
cancel: cancel,
ctx: ctx,
GatedMaListener: u.GateMaListener(list),
upgrader: u,
transport: t,
rcmgr: u.rcmgr,
threshold: newThreshold(AcceptQueueLength),
incoming: make(chan transport.CapableConn),
cancel: cancel,
ctx: ctx,
}
go l.handleIncoming()
return l
}

func (u *upgrader) GateMaListener(l manet.Listener) transport.GatedMaListener {
return &gatedMaListener{
Listener: l,
rcmgr: u.rcmgr,
connGater: u.connGater,
}
}

// UpgradeGatedMaListener upgrades the passed multiaddr-net listener into a full libp2p-transport listener.
func (u *upgrader) UpgradeGatedMaListener(t transport.Transport, l transport.GatedMaListener) transport.Listener {
ctx, cancel := context.WithCancel(context.Background())
list := &listener{
GatedMaListener: l,
upgrader: u,
transport: t,
rcmgr: u.rcmgr,
threshold: newThreshold(AcceptQueueLength),
incoming: make(chan transport.CapableConn),
cancel: cancel,
ctx: ctx,
}
go list.handleIncoming()
return list
}

// Upgrade upgrades the multiaddr/net connection into a full libp2p-transport connection.
func (u *upgrader) Upgrade(ctx context.Context, t transport.Transport, maconn manet.Conn, dir network.Direction, p peer.ID, connScope network.ConnManagementScope) (transport.CapableConn, error) {
c, err := u.upgrade(ctx, t, maconn, dir, p, connScope)
Expand Down
2 changes: 1 addition & 1 deletion p2p/protocol/circuitv2/client/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func (c *Client) Listen(addr ma.Multiaddr) (transport.Listener, error) {
return nil, err
}

return c.upgrader.UpgradeListener(c, c.Listener()), nil
return c.upgrader.UpgradeGatedMaListener(c, c.upgrader.GateMaListener(c.Listener())), nil
}

func (c *Client) Protocols() []int {
Expand Down
2 changes: 1 addition & 1 deletion p2p/test/transport/gating_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ func TestInterceptAccept(t *testing.T) {
connGater.EXPECT().InterceptAccept(gomock.Any()).Do(func(addrs network.ConnMultiaddrs) {
require.Equal(t, normalize(h2.Addrs()[0]), normalize(addrs.LocalMultiaddr()))
}).AnyTimes()
} else if strings.Contains(tc.Name, "WebSocket-Shared") || strings.Contains(tc.Name, "WebSocket-Secured-Shared") {
} else if strings.Contains(tc.Name, "WebSocket") {
connGater.EXPECT().InterceptAccept(gomock.Any()).Do(func(addrs network.ConnMultiaddrs) {
require.Equal(t, addrPort(h2.Addrs()[0]), addrPort(addrs.LocalMultiaddr()))
})
Expand Down
38 changes: 21 additions & 17 deletions p2p/transport/tcp/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/transport"
"github.com/marten-seemann/tcp"
"github.com/mikioh/tcpinfo"
manet "github.com/multiformats/go-multiaddr/net"
Expand Down Expand Up @@ -253,16 +254,6 @@ func (c *tracingConn) Close() error {
return c.closeErr
}

func (c *tracingConn) Scope() network.ConnManagementScope {
if cs, ok := c.Conn.(interface {
Scope() network.ConnManagementScope
}); ok {
return cs.Scope()
}
// upgrader is expected to handle this
return nil
}

func (c *tracingConn) getTCPInfo() (*tcpinfo.Info, error) {
var o tcpinfo.Info
var b [256]byte
Expand All @@ -275,19 +266,32 @@ func (c *tracingConn) getTCPInfo() (*tcpinfo.Info, error) {
}

type tracingListener struct {
manet.Listener
transport.GatedMaListener
collector *aggregatingCollector
}

// newTracingListener wraps a manet.Listener with a tracingListener. A nil collector will use the default collector.
func newTracingListener(l manet.Listener, collector *aggregatingCollector) *tracingListener {
return &tracingListener{Listener: l, collector: collector}
func newTracingListener(l transport.GatedMaListener, collector *aggregatingCollector) *tracingListener {
return &tracingListener{GatedMaListener: l, collector: collector}
}

func (l *tracingListener) Accept() (manet.Conn, error) {
conn, err := l.Listener.Accept()
func (l *tracingListener) Accept() (manet.Conn, network.ConnManagementScope, error) {
conn, scope, err := l.GatedMaListener.Accept()
if err != nil {
return nil, err
if scope != nil {
scope.Done()
log.Errorf("BUG: got non-nil scope but also an error: %s", err)
}
return nil, nil, err
}

// TODO(sukunrt): Should we log and ignore this error? We can proceed. We just won't have metrics for this connection.
tc, err := newTracingConn(conn, l.collector, false)
if err != nil {
log.Errorf("failed to create tracingConn: %s", err)
conn.Close()
scope.Done()
return nil, nil, err
}
return newTracingConn(conn, l.collector, false)
return tc, scope, nil
}
9 changes: 7 additions & 2 deletions p2p/transport/tcp/metrics_none.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,16 @@

package tcp

import manet "github.com/multiformats/go-multiaddr/net"
import (
"github.com/libp2p/go-libp2p/core/transport"
manet "github.com/multiformats/go-multiaddr/net"
)

type aggregatingCollector struct{}

func newTracingConn(c manet.Conn, collector *aggregatingCollector, isClient bool) (manet.Conn, error) {
return c, nil
}
func newTracingListener(l manet.Listener, collector *aggregatingCollector) manet.Listener { return l }
func newTracingListener(l transport.GatedMaListener, collector *aggregatingCollector) transport.GatedMaListener {
return l
}
6 changes: 4 additions & 2 deletions p2p/transport/tcp/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@ func TestTcpTransportCollectsMetricsWithSharedTcpSocket(t *testing.T) {
peerA, ia := makeInsecureMuxer(t)
_, ib := makeInsecureMuxer(t)

sharedTCPSocketA := tcpreuse.NewConnMgr(false, nil, nil)
sharedTCPSocketB := tcpreuse.NewConnMgr(false, nil, nil)
upg, err := tptu.New(ia, muxers, nil, nil, nil)
require.NoError(t, err)
sharedTCPSocketA := tcpreuse.NewConnMgr(false, upg)
sharedTCPSocketB := tcpreuse.NewConnMgr(false, upg)

ua, err := tptu.New(ia, muxers, nil, nil, nil)
require.NoError(t, err)
Expand Down
Loading

0 comments on commit f1b1ee9

Please sign in to comment.