Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

autorelay: send addresses on eventbus; dont wrap address factory #3071

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,7 @@ func (cfg *Config) NewNode() (host.Host, error) {
}
fxopts = append(fxopts, transportOpts...)

// Configure routing and autorelay
// Configure routing
if cfg.Routing != nil {
fxopts = append(fxopts,
fx.Provide(cfg.Routing),
Expand Down
5 changes: 5 additions & 0 deletions core/event/addrs.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,3 +81,8 @@ type EvtLocalAddressesUpdated struct {
// wrapped in a record.Envelope and signed by the Host's private key.
SignedPeerRecord *record.Envelope
}

// EvtAutoRelayAddrsUpdated is sent by the autorelay when the node's relay addresses are updated
type EvtAutoRelayAddrsUpdated struct {
RelayAddrs []ma.Multiaddr
}
1 change: 0 additions & 1 deletion libp2p_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -466,7 +466,6 @@ func TestDialCircuitAddrWithWrappedResourceManager(t *testing.T) {
),
peerstore.TempAddrTTL,
)
require.NoError(t, err)

require.Eventually(t, func() bool {
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
Expand Down
15 changes: 4 additions & 11 deletions p2p/host/autorelay/addrsplosion.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ import (

// This function cleans up a relay's address set to remove private addresses and curtail
// addrsplosion.
// TODO: Remove this, we don't need this. The current method tries to select the
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we removing this for this PR?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should bound the number of addresses we re-advertise (which I'm assuming this
code does).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will remove this in a follow up PR. Not related to the current change.

// best address for the relay. Instead we should rely on the addresses provided by the
// relay in response to the reservation request.
func cleanupAddressSet(addrs []ma.Multiaddr) []ma.Multiaddr {
var public, private []ma.Multiaddr

Expand All @@ -17,7 +20,7 @@ func cleanupAddressSet(addrs []ma.Multiaddr) []ma.Multiaddr {
continue
}

if manet.IsPublicAddr(a) || isDNSAddr(a) {
if manet.IsPublicAddr(a) {
public = append(public, a)
continue
}
Expand Down Expand Up @@ -51,16 +54,6 @@ func isRelayAddr(a ma.Multiaddr) bool {
return isRelay
}

func isDNSAddr(a ma.Multiaddr) bool {
if first, _ := ma.SplitFirst(a); first != nil {
switch first.Protocol().Code {
case ma.P_DNS, ma.P_DNS4, ma.P_DNS6, ma.P_DNSADDR:
return true
}
}
return false
}

// we have addrsplosion if for some protocol we advertise multiple ports on
// the same base address.
func hasAddrsplosion(addrs []ma.Multiaddr) bool {
Expand Down
33 changes: 8 additions & 25 deletions p2p/host/autorelay/autorelay.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,15 @@ package autorelay
import (
"context"
"errors"
"fmt"
"sync"

"github.com/libp2p/go-libp2p/core/event"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
basic "github.com/libp2p/go-libp2p/p2p/host/basic"
"github.com/libp2p/go-libp2p/p2p/host/eventbus"

logging "github.com/ipfs/go-log/v2"
ma "github.com/multiformats/go-multiaddr"
)

var log = logging.Logger("autorelay")
Expand All @@ -22,8 +21,6 @@ type AutoRelay struct {
ctx context.Context
ctxCancel context.CancelFunc

conf *config

mx sync.Mutex
status network.Reachability

Expand All @@ -34,9 +31,9 @@ type AutoRelay struct {
metricsTracer MetricsTracer
}

func NewAutoRelay(bhost *basic.BasicHost, opts ...Option) (*AutoRelay, error) {
func NewAutoRelay(host host.Host, opts ...Option) (*AutoRelay, error) {
r := &AutoRelay{
host: bhost,
host: host,
status: network.ReachabilityUnknown,
}
conf := defaultConfig
Expand All @@ -46,25 +43,12 @@ func NewAutoRelay(bhost *basic.BasicHost, opts ...Option) (*AutoRelay, error) {
}
}
r.ctx, r.ctxCancel = context.WithCancel(context.Background())
r.conf = &conf
r.relayFinder = newRelayFinder(bhost, conf.peerSource, &conf)
r.metricsTracer = &wrappedMetricsTracer{conf.metricsTracer}

// Update the host address factory to use autorelay addresses if we're private
//
// TODO: Don't update host address factory. Instead send our relay addresses on the eventbus.
// The host can decide how to handle those.
addrF := bhost.AddrsFactory
bhost.AddrsFactory = func(addrs []ma.Multiaddr) []ma.Multiaddr {
addrs = addrF(addrs)
r.mx.Lock()
defer r.mx.Unlock()

if r.status != network.ReachabilityPrivate {
return addrs
}
return r.relayFinder.relayAddrs(addrs)
rf, err := newRelayFinder(host, &conf)
if err != nil {
return nil, fmt.Errorf("failed to create autorelay: %w", err)
}
r.relayFinder = rf
r.metricsTracer = &wrappedMetricsTracer{conf.metricsTracer}

return r, nil
}
Expand Down Expand Up @@ -93,7 +77,6 @@ func (r *AutoRelay) background() {
if !ok {
return
}
// TODO: push changed addresses
evt := ev.(event.EvtLocalReachabilityChanged)
switch evt.Reachability {
case network.ReachabilityPrivate, network.ReachabilityUnknown:
Expand Down
86 changes: 85 additions & 1 deletion p2p/host/autorelay/autorelay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ package autorelay_test
import (
"context"
"fmt"
"slices"
"strings"
"sync/atomic"
"testing"
"time"

"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/event"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
Expand All @@ -17,6 +19,7 @@ import (
circuitv2_proto "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/proto"

ma "github.com/multiformats/go-multiaddr"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -96,7 +99,10 @@ func newRelay(t *testing.T) host.Host {
saddr := addr.String()
if strings.HasPrefix(saddr, "/ip4/127.0.0.1/") {
addrNoIP := strings.TrimPrefix(saddr, "/ip4/127.0.0.1")
addrs[i] = ma.StringCast("/dns4/localhost" + addrNoIP)
// .internal is classified as a public address as users
// are free to map this dns to a public ip address for
// use within a LAN
addrs[i] = ma.StringCast("/dns/libp2p.internal" + addrNoIP)
}
}
return addrs
Expand Down Expand Up @@ -517,3 +523,81 @@ func TestNoBusyLoop0MinInterval(t *testing.T) {
val := atomic.LoadUint64(&calledTimes)
require.Less(t, val, uint64(2))
}
func TestAutoRelayAddrsEvent(t *testing.T) {
cl := newMockClock()
relays := []host.Host{newRelay(t), newRelay(t), newRelay(t), newRelay(t), newRelay(t)}
t.Cleanup(func() {
for _, r := range relays {
r.Close()
}
})

relayIDFromP2PAddr := func(a ma.Multiaddr) peer.ID {
r, c := ma.SplitLast(a)
if c.Protocol().Code != ma.P_CIRCUIT {
return ""
}
if id, err := peer.IDFromP2PAddr(r); err == nil {
return id
}
return ""
}

checkAddrsContainsPeersAsRelay := func(addrs []ma.Multiaddr, peers ...peer.ID) bool {
for _, p := range peers {
if !slices.ContainsFunc(addrs, func(a ma.Multiaddr) bool { return relayIDFromP2PAddr(a) == p }) {
return false
}
}
return true
}
peerChan := make(chan peer.AddrInfo, 5)
h := newPrivateNode(t,
func(context.Context, int) <-chan peer.AddrInfo {
return peerChan
},
autorelay.WithClock(cl),
autorelay.WithMinCandidates(1),
autorelay.WithMaxCandidates(10),
autorelay.WithNumRelays(5),
autorelay.WithBootDelay(1*time.Second),
autorelay.WithMinInterval(time.Hour),
)
defer h.Close()

sub, err := h.EventBus().Subscribe(new(event.EvtAutoRelayAddrsUpdated))
require.NoError(t, err)

peerChan <- peer.AddrInfo{ID: relays[0].ID(), Addrs: relays[0].Addrs()}
cl.AdvanceBy(time.Second)

require.EventuallyWithT(t, func(collect *assert.CollectT) {
e := <-sub.Out()
evt := e.(event.EvtAutoRelayAddrsUpdated)
if !checkAddrsContainsPeersAsRelay(evt.RelayAddrs, relays[0].ID()) {
collect.Errorf("expected %s to be in %v", relays[0].ID(), evt.RelayAddrs)
}
if checkAddrsContainsPeersAsRelay(evt.RelayAddrs, relays[1].ID()) {
collect.Errorf("expected %s to not be in %v", relays[1].ID(), evt.RelayAddrs)
}
}, 5*time.Second, 50*time.Millisecond)
for _, r := range relays[1:] {
peerChan <- peer.AddrInfo{ID: r.ID(), Addrs: r.Addrs()}
}
require.EventuallyWithT(t, func(c *assert.CollectT) {
e := <-sub.Out()
evt := e.(event.EvtAutoRelayAddrsUpdated)
relayIds := []peer.ID{}
for _, r := range relays[1:] {
relayIds = append(relayIds, r.ID())
}
if !checkAddrsContainsPeersAsRelay(evt.RelayAddrs, relayIds...) {
c.Errorf("expected %s to be in %v", relayIds, evt.RelayAddrs)
}
}, 5*time.Second, 50*time.Millisecond)
select {
case e := <-sub.Out():
t.Fatal("expected no more events after all reservations obtained; got: ", e.(event.EvtAutoRelayAddrsUpdated))
case <-time.After(1 * time.Second):
}
}
19 changes: 0 additions & 19 deletions p2p/host/autorelay/relay.go

This file was deleted.

Loading
Loading