Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

transport: add GatedMaListener type #3186

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,11 +292,11 @@ func (cfg *Config) addTransports() ([]fx.Option, error) {
fx.Provide(func() connmgr.ConnectionGater { return cfg.ConnectionGater }),
fx.Provide(func() pnet.PSK { return cfg.PSK }),
fx.Provide(func() network.ResourceManager { return cfg.ResourceManager }),
fx.Provide(func(gater connmgr.ConnectionGater, rcmgr network.ResourceManager) *tcpreuse.ConnMgr {
fx.Provide(func(upgrader transport.Upgrader) *tcpreuse.ConnMgr {
if !cfg.ShareTCPListener {
return nil
}
return tcpreuse.NewConnMgr(tcpreuse.EnvReuseportVal, gater, rcmgr)
return tcpreuse.NewConnMgr(tcpreuse.EnvReuseportVal, upgrader)
}),
fx.Provide(func(cm *quicreuse.ConnManager, sw *swarm.Swarm) libp2pwebrtc.ListenUDPFn {
hasQuicAddrPortFor := func(network string, laddr *net.UDPAddr) bool {
Expand Down
30 changes: 30 additions & 0 deletions core/transport/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,11 +129,41 @@ type TransportNetwork interface {
AddTransport(t Transport) error
}

// GatedMaListener is listener that listens for raw(unsecured and non-multiplexed) incoming connections,
// gates them with a `connmgr.ConnGater`and creates a resource management scope for them.
// It can be upgraded to a full libp2p transport listener by the Upgrader.
//
// Compared to manet.Listener, this listener creates the resource management scope for the accepted connection.
type GatedMaListener interface {
// Accept waits for and returns the next connection to the listener.
Accept() (manet.Conn, network.ConnManagementScope, error)

// Close closes the listener.
// Any blocked Accept operations will be unblocked and return errors.
Close() error

// Multiaddr returns the listener's (local) Multiaddr.
Multiaddr() ma.Multiaddr

// Addr returns the net.Listener's network address.
Addr() net.Addr
}

// Upgrader is a multistream upgrader that can upgrade an underlying connection
// to a full transport connection (secure and multiplexed).
type Upgrader interface {
// UpgradeListener upgrades the passed multiaddr-net listener into a full libp2p-transport listener.
//
// Deprecated: Use UpgradeGatedMaListener(upgrader.GateMaListener(manet.Listener)) instead.
UpgradeListener(Transport, manet.Listener) Listener
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is anyone besides us using this? Should we just remove it?


// GateMaListener creates a GatedMaListener from a manet.Listener. It gates the accepted connection
// and creates a resource scope for it.
GateMaListener(manet.Listener) GatedMaListener

// UpgradeGatedMaListener upgrades the passed GatedMaListener into a full libp2p-transport listener.
UpgradeGatedMaListener(Transport, GatedMaListener) Listener

// Upgrade upgrades the multiaddr/net connection into a full libp2p-transport connection.
Upgrade(ctx context.Context, t Transport, maconn manet.Conn, dir network.Direction, p peer.ID, scope network.ConnManagementScope) (CapableConn, error)
}
Expand Down
84 changes: 47 additions & 37 deletions p2p/net/upgrader/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"strings"
"sync"

"github.com/libp2p/go-libp2p/core/connmgr"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/transport"

Expand All @@ -17,7 +18,7 @@ import (
var log = logging.Logger("upgrader")

type listener struct {
manet.Listener
transport.GatedMaListener

transport transport.Transport
upgrader *upgrader
Expand All @@ -35,10 +36,12 @@ type listener struct {
cancel func()
}

var _ transport.Listener = (*listener)(nil)

// Close closes the listener.
func (l *listener) Close() error {
// Do this first to try to get any relevant errors.
err := l.Listener.Close()
err := l.GatedMaListener.Close()

l.cancel()
// Drain and wait.
Expand All @@ -61,7 +64,7 @@ func (l *listener) handleIncoming() {
var wg sync.WaitGroup
defer func() {
// make sure we're closed
l.Listener.Close()
l.GatedMaListener.Close()
if l.err == nil {
l.err = fmt.Errorf("listener closed")
}
Expand All @@ -72,7 +75,7 @@ func (l *listener) handleIncoming() {

var catcher tec.TempErrCatcher
for l.ctx.Err() == nil {
maconn, err := l.Listener.Accept()
maconn, connScope, err := l.GatedMaListener.Accept()
if err != nil {
// Note: function may pause the accept loop.
if catcher.IsTemporary(err) {
Expand All @@ -84,33 +87,10 @@ func (l *listener) handleIncoming() {
}
catcher.Reset()

// Check if we already have a connection scope. See the comment in tcpreuse/listener.go for an explanation.
var connScope network.ConnManagementScope
if sc, ok := maconn.(interface {
Scope() network.ConnManagementScope
}); ok {
connScope = sc.Scope()
}
if connScope == nil {
// gate the connection if applicable
if l.upgrader.connGater != nil && !l.upgrader.connGater.InterceptAccept(maconn) {
log.Debugf("gater blocked incoming connection on local addr %s from %s",
maconn.LocalMultiaddr(), maconn.RemoteMultiaddr())
if err := maconn.Close(); err != nil {
log.Warnf("failed to close incoming connection rejected by gater: %s", err)
}
continue
}

var err error
connScope, err = l.rcmgr.OpenConnection(network.DirInbound, true, maconn.RemoteMultiaddr())
if err != nil {
log.Debugw("resource manager blocked accept of new connection", "error", err)
if err := maconn.Close(); err != nil {
log.Warnf("failed to open incoming connection. Rejected by resource manager: %s", err)
}
continue
}
log.Errorf("BUG: got nil connScope for incoming connection from %s", maconn.RemoteMultiaddr())
maconn.Close()
continue
}

// The go routine below calls Release when the context is
Expand Down Expand Up @@ -154,14 +134,10 @@ func (l *listener) handleIncoming() {
select {
case l.incoming <- conn:
case <-ctx.Done():
// Listener not closed but the accept timeout expired.
if l.ctx.Err() == nil {
// Listener *not* closed but the accept timeout expired.
log.Warn("listener dropped connection due to slow accept")
log.Warnf("listener dropped connection due to slow accept. remote addr: %s peer: %s", maconn.RemoteMultiaddr(), conn.RemotePeer())
}
// Wait on the context with a timeout. This way,
// if we stop accepting connections for some reason,
// we'll eventually close all the open ones
// instead of hanging onto them.
conn.CloseWithError(network.ConnRateLimited)
}
}()
Expand Down Expand Up @@ -189,4 +165,38 @@ func (l *listener) String() string {
return fmt.Sprintf("<stream.Listener %s>", l.Multiaddr())
}

var _ transport.Listener = (*listener)(nil)
type gatedMaListener struct {
manet.Listener
rcmgr network.ResourceManager
connGater connmgr.ConnectionGater
}

var _ transport.GatedMaListener = &gatedMaListener{}

func (l *gatedMaListener) Accept() (manet.Conn, network.ConnManagementScope, error) {
for {
conn, err := l.Listener.Accept()
if err != nil {
return nil, nil, err
}
// gate the connection if applicable
if l.connGater != nil && !l.connGater.InterceptAccept(conn) {
log.Debugf("gater blocked incoming connection on local addr %s from %s",
conn.LocalMultiaddr(), conn.RemoteMultiaddr())
if err := conn.Close(); err != nil {
log.Warnf("failed to close incoming connection rejected by gater: %s", err)
}
continue
}

connScope, err := l.rcmgr.OpenConnection(network.DirInbound, true, conn.RemoteMultiaddr())
if err != nil {
log.Debugw("resource manager blocked accept of new connection", "error", err)
if err := conn.Close(); err != nil {
log.Warnf("failed to open incoming connection. Rejected by resource manager: %s", err)
}
continue
}
return conn, connScope, nil
}
}
2 changes: 1 addition & 1 deletion p2p/net/upgrader/listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func createListener(t *testing.T, u transport.Upgrader) transport.Listener {
require.NoError(t, err)
ln, err := manet.Listen(addr)
require.NoError(t, err)
return u.UpgradeListener(nil, ln)
return u.UpgradeGatedMaListener(nil, u.GateMaListener(ln))
}

func TestAcceptSingleConn(t *testing.T) {
Expand Down
41 changes: 33 additions & 8 deletions p2p/net/upgrader/upgrader.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,19 +107,44 @@ func New(security []sec.SecureTransport, muxers []StreamMuxer, psk ipnet.PSK, rc
func (u *upgrader) UpgradeListener(t transport.Transport, list manet.Listener) transport.Listener {
ctx, cancel := context.WithCancel(context.Background())
l := &listener{
Listener: list,
upgrader: u,
transport: t,
rcmgr: u.rcmgr,
threshold: newThreshold(AcceptQueueLength),
incoming: make(chan transport.CapableConn),
cancel: cancel,
ctx: ctx,
GatedMaListener: u.GateMaListener(list),
upgrader: u,
transport: t,
rcmgr: u.rcmgr,
threshold: newThreshold(AcceptQueueLength),
incoming: make(chan transport.CapableConn),
cancel: cancel,
ctx: ctx,
}
go l.handleIncoming()
return l
}

func (u *upgrader) GateMaListener(l manet.Listener) transport.GatedMaListener {
return &gatedMaListener{
Listener: l,
rcmgr: u.rcmgr,
connGater: u.connGater,
}
}

// UpgradeGatedMaListener upgrades the passed multiaddr-net listener into a full libp2p-transport listener.
func (u *upgrader) UpgradeGatedMaListener(t transport.Transport, l transport.GatedMaListener) transport.Listener {
ctx, cancel := context.WithCancel(context.Background())
list := &listener{
GatedMaListener: l,
upgrader: u,
transport: t,
rcmgr: u.rcmgr,
threshold: newThreshold(AcceptQueueLength),
incoming: make(chan transport.CapableConn),
cancel: cancel,
ctx: ctx,
}
go list.handleIncoming()
return list
}

// Upgrade upgrades the multiaddr/net connection into a full libp2p-transport connection.
func (u *upgrader) Upgrade(ctx context.Context, t transport.Transport, maconn manet.Conn, dir network.Direction, p peer.ID, connScope network.ConnManagementScope) (transport.CapableConn, error) {
c, err := u.upgrade(ctx, t, maconn, dir, p, connScope)
Expand Down
2 changes: 1 addition & 1 deletion p2p/protocol/circuitv2/client/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func (c *Client) Listen(addr ma.Multiaddr) (transport.Listener, error) {
return nil, err
}

return c.upgrader.UpgradeListener(c, c.Listener()), nil
return c.upgrader.UpgradeGatedMaListener(c, c.upgrader.GateMaListener(c.Listener())), nil
}

func (c *Client) Protocols() []int {
Expand Down
2 changes: 1 addition & 1 deletion p2p/test/transport/gating_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ func TestInterceptAccept(t *testing.T) {
connGater.EXPECT().InterceptAccept(gomock.Any()).Do(func(addrs network.ConnMultiaddrs) {
require.Equal(t, normalize(h2.Addrs()[0]), normalize(addrs.LocalMultiaddr()))
}).AnyTimes()
} else if strings.Contains(tc.Name, "WebSocket-Shared") || strings.Contains(tc.Name, "WebSocket-Secured-Shared") {
} else if strings.Contains(tc.Name, "WebSocket") {
connGater.EXPECT().InterceptAccept(gomock.Any()).Do(func(addrs network.ConnMultiaddrs) {
require.Equal(t, addrPort(h2.Addrs()[0]), addrPort(addrs.LocalMultiaddr()))
})
Expand Down
38 changes: 21 additions & 17 deletions p2p/transport/tcp/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/transport"
"github.com/marten-seemann/tcp"
"github.com/mikioh/tcpinfo"
manet "github.com/multiformats/go-multiaddr/net"
Expand Down Expand Up @@ -253,16 +254,6 @@ func (c *tracingConn) Close() error {
return c.closeErr
}

func (c *tracingConn) Scope() network.ConnManagementScope {
if cs, ok := c.Conn.(interface {
Scope() network.ConnManagementScope
}); ok {
return cs.Scope()
}
// upgrader is expected to handle this
return nil
}

func (c *tracingConn) getTCPInfo() (*tcpinfo.Info, error) {
var o tcpinfo.Info
var b [256]byte
Expand All @@ -275,19 +266,32 @@ func (c *tracingConn) getTCPInfo() (*tcpinfo.Info, error) {
}

type tracingListener struct {
manet.Listener
transport.GatedMaListener
collector *aggregatingCollector
}

// newTracingListener wraps a manet.Listener with a tracingListener. A nil collector will use the default collector.
func newTracingListener(l manet.Listener, collector *aggregatingCollector) *tracingListener {
return &tracingListener{Listener: l, collector: collector}
func newTracingListener(l transport.GatedMaListener, collector *aggregatingCollector) *tracingListener {
return &tracingListener{GatedMaListener: l, collector: collector}
}

func (l *tracingListener) Accept() (manet.Conn, error) {
conn, err := l.Listener.Accept()
func (l *tracingListener) Accept() (manet.Conn, network.ConnManagementScope, error) {
conn, scope, err := l.GatedMaListener.Accept()
if err != nil {
return nil, err
if scope != nil {
scope.Done()
log.Errorf("BUG: got non-nil scope but also an error: %s", err)
}
return nil, nil, err
}

// TODO(sukunrt): Should we log and ignore this error? We can proceed. We just won't have metrics for this connection.
tc, err := newTracingConn(conn, l.collector, false)
if err != nil {
log.Errorf("failed to create tracingConn: %s", err)
conn.Close()
scope.Done()
return nil, nil, err
}
return newTracingConn(conn, l.collector, false)
return tc, scope, nil
}
9 changes: 7 additions & 2 deletions p2p/transport/tcp/metrics_none.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,16 @@

package tcp

import manet "github.com/multiformats/go-multiaddr/net"
import (
"github.com/libp2p/go-libp2p/core/transport"
manet "github.com/multiformats/go-multiaddr/net"
)

type aggregatingCollector struct{}

func newTracingConn(c manet.Conn, collector *aggregatingCollector, isClient bool) (manet.Conn, error) {
return c, nil
}
func newTracingListener(l manet.Listener, collector *aggregatingCollector) manet.Listener { return l }
func newTracingListener(l transport.GatedMaListener, collector *aggregatingCollector) transport.GatedMaListener {
return l
}
6 changes: 4 additions & 2 deletions p2p/transport/tcp/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@ func TestTcpTransportCollectsMetricsWithSharedTcpSocket(t *testing.T) {
peerA, ia := makeInsecureMuxer(t)
_, ib := makeInsecureMuxer(t)

sharedTCPSocketA := tcpreuse.NewConnMgr(false, nil, nil)
sharedTCPSocketB := tcpreuse.NewConnMgr(false, nil, nil)
upg, err := tptu.New(ia, muxers, nil, nil, nil)
require.NoError(t, err)
sharedTCPSocketA := tcpreuse.NewConnMgr(false, upg)
sharedTCPSocketB := tcpreuse.NewConnMgr(false, upg)

ua, err := tptu.New(ia, muxers, nil, nil, nil)
require.NoError(t, err)
Expand Down
Loading
Loading