diff --git a/system/p2p/dht/bootstrap.go b/system/p2p/dht/bootstrap.go index 8eb4c81b3..d32069db5 100644 --- a/system/p2p/dht/bootstrap.go +++ b/system/p2p/dht/bootstrap.go @@ -2,6 +2,7 @@ package dht import ( "context" + "time" "github.com/33cn/chain33/system/p2p/dht/extension" p2pty "github.com/33cn/chain33/system/p2p/dht/types" @@ -11,7 +12,7 @@ import ( "github.com/multiformats/go-multiaddr" ) -func initInnerPeers(host host.Host, peersInfo []peer.AddrInfo, cfg *p2pty.P2PSubConfig) { +func initInnerPeers(ctx context.Context, host host.Host, peersInfo []peer.AddrInfo, cfg *p2pty.P2PSubConfig) { for _, node := range cfg.Seeds { info := genAddrInfo(node) @@ -57,7 +58,7 @@ func initInnerPeers(host host.Host, peersInfo []peer.AddrInfo, cfg *p2pty.P2PSub //加保护 host.ConnManager().Protect(info.ID, "relayNode") //向中继节点申请一个通信插槽,以便通过中继节点连接到自己 - extension.MakeNodeRelayClient(host, info) + go extension.ReserveRelaySlot(ctx, host, *info, time.Minute) } } diff --git a/system/p2p/dht/bootstrap_test.go b/system/p2p/dht/bootstrap_test.go index 1b51d7b01..52156a974 100644 --- a/system/p2p/dht/bootstrap_test.go +++ b/system/p2p/dht/bootstrap_test.go @@ -48,7 +48,7 @@ func Test_initInnerPeers(t *testing.T) { subcfg.RelayEnable = true subcfg.RelayNodeAddr = []string{h0str} peerinfo := []peer.AddrInfo{{ID: h7.ID, Addrs: h7.Addrs}} - initInnerPeers(hosts[5], peerinfo, subcfg) + initInnerPeers(ctx, hosts[5], peerinfo, subcfg) hosts[5].Close() } diff --git a/system/p2p/dht/discovery.go b/system/p2p/dht/discovery.go index 703eedac4..07250e6c6 100644 --- a/system/p2p/dht/discovery.go +++ b/system/p2p/dht/discovery.go @@ -55,7 +55,7 @@ func InitDhtDiscovery(ctx context.Context, host host.Host, peersInfo []peer.Addr //Start the dht func (d *Discovery) Start() { //连接内置种子,以及addrbook存储的节点 - go initInnerPeers(d.host, d.bootstraps, d.subCfg) + go initInnerPeers(d.ctx, d.host, d.bootstraps, d.subCfg) // Bootstrap the DHT. In the default configuration, this spawns a Background // thread that will refresh the peer table every five minutes. if err := d.kademliaDHT.Bootstrap(d.ctx); err != nil { diff --git a/system/p2p/dht/extension/mdns.go b/system/p2p/dht/extension/mdns.go index a4a50db4c..5eae737f7 100644 --- a/system/p2p/dht/extension/mdns.go +++ b/system/p2p/dht/extension/mdns.go @@ -28,7 +28,7 @@ func (n *discoveryNotifee) HandlePeerFound(pi peer.AddrInfo) { func NewMDNS(ctx context.Context, peerhost host.Host, serviceTag string) (*MDNS, error) { //register with service so that we get notified about peer discovery notifee := &discoveryNotifee{} - notifee.PeerChan = make(chan peer.AddrInfo) + notifee.PeerChan = make(chan peer.AddrInfo, 1) ser := discovery.NewMdnsService(peerhost, serviceTag, notifee) mnds := new(MDNS) mnds.Service = ser diff --git a/system/p2p/dht/extension/mdns_test.go b/system/p2p/dht/extension/mdns_test.go index f106444ea..abca30bdb 100644 --- a/system/p2p/dht/extension/mdns_test.go +++ b/system/p2p/dht/extension/mdns_test.go @@ -2,6 +2,7 @@ package extension import ( "context" + "github.com/libp2p/go-libp2p/core/peer" "testing" "time" @@ -24,8 +25,9 @@ func Test_mdns(t *testing.T) { select { case peerinfo := <-tmdns.PeerChan(): t.Log("findMdns", peerinfo.ID) - case <-time.After(time.Second * 10): + case <-time.After(time.Second * 5): return } + tmdns.notifee.HandlePeerFound(peer.AddrInfo{}) } diff --git a/system/p2p/dht/extension/pubsub.go b/system/p2p/dht/extension/pubsub.go index bed8d1b0b..4ace97545 100644 --- a/system/p2p/dht/extension/pubsub.go +++ b/system/p2p/dht/extension/pubsub.go @@ -15,7 +15,7 @@ import ( pubsub "github.com/libp2p/go-libp2p-pubsub" ) -var log = log15.New("module", "pubsub") +var log = log15.New("module", "extension") var setOnce sync.Once diff --git a/system/p2p/dht/extension/relay.go b/system/p2p/dht/extension/relay.go index 57d3803a9..35c019fa8 100644 --- a/system/p2p/dht/extension/relay.go +++ b/system/p2p/dht/extension/relay.go @@ -49,26 +49,28 @@ func MakeNodeRelayService(host host.Host, opts []circuit.Option) *Relay { return r } -func MakeNodeRelayClient(host host.Host, relayInfo *peer.AddrInfo) { +// ReserveRelaySlot reserve relay slot +func ReserveRelaySlot(ctx context.Context, host host.Host, relayInfo peer.AddrInfo, timeout time.Duration) { //向中继节点申请一个通信插槽 - rs, err := circuitClient.Reserve(context.Background(), host, *relayInfo) - if err != nil { - return - } - go func(reserve *circuitClient.Reservation) { - var ticker = time.NewTicker(time.Second * 5) - for { - select { - case <-ticker.C: - if reserve.Expiration.Before(time.Now()) { - reserve, err = circuitClient.Reserve(context.Background(), host, *relayInfo) - if err != nil { - return - } + + var ticker = time.NewTicker(timeout) + var err error + var reserve *circuitClient.Reservation + for { + select { + case <-ctx.Done(): + ticker.Stop() + return + case <-ticker.C: + if reserve == nil || reserve.Expiration.Before(time.Now()) { + reserve, err = circuitClient.Reserve(context.Background(), host, relayInfo) + if err != nil { + log.Error("ReserveRelaySlot", "err", err) + return } } } - }(rs) + } } diff --git a/system/p2p/dht/extension/relay_test.go b/system/p2p/dht/extension/relay_test.go index e7e01547e..adeeb70e9 100644 --- a/system/p2p/dht/extension/relay_test.go +++ b/system/p2p/dht/extension/relay_test.go @@ -7,8 +7,6 @@ import ( ma "github.com/multiformats/go-multiaddr" - "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/client" - "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" @@ -103,12 +101,11 @@ func TestRelayV2(t *testing.T) { err = unreachable2.Connect(ctx, relayInfo) assert.Nil(t, err) + go ReserveRelaySlot(ctx, unreachable1, unreachable2info, time.Millisecond) + //unreachable2 向中继节点申请一个中继槽slot - reserve, err := client.Reserve(ctx, unreachable2, relayInfo) - assert.Nil(t, err) - if reserve.Expiration.Before(time.Now()) { - client.Reserve(ctx, unreachable2, relayInfo) - } + go ReserveRelaySlot(ctx, unreachable2, relayInfo, time.Millisecond*100) + time.Sleep(time.Second) relayaddr, err := MakeRelayAddrs(relayInfo.ID.String(), unreachable2.ID().String()) assert.Nil(t, err) t.Log("relayaddr:", relayaddr) diff --git a/system/p2p/dht/p2p.go b/system/p2p/dht/p2p.go index 3256a20b8..485a8a2f6 100644 --- a/system/p2p/dht/p2p.go +++ b/system/p2p/dht/p2p.go @@ -230,14 +230,12 @@ func (p *P2P) CloseP2P() { } func (p *P2P) reStart() { - atomic.StoreInt32(&p.restart, 1) - log.Info("reStart p2p") + if p.host == nil { - //说明p2p还没有开始启动,无需重启 - log.Info("p2p no need restart...") - atomic.StoreInt32(&p.restart, 0) return } + atomic.StoreInt32(&p.restart, 1) + log.Info("reStart p2p") p.CloseP2P() p.StartP2P() } diff --git a/system/p2p/dht/p2p_test.go b/system/p2p/dht/p2p_test.go index 39123ef71..6a48959b2 100644 --- a/system/p2p/dht/p2p_test.go +++ b/system/p2p/dht/p2p_test.go @@ -428,6 +428,7 @@ func Test_p2p(t *testing.T) { cfg := types.NewChain33Config(types.ReadFile("../../../cmd/chain33/chain33.test.toml")) q := queue.New("channel") datadir := util.ResetDatadir(cfg.GetModuleConfig(), "$TEMP/") + setLibp2pLog(cfg.GetModuleConfig().Log.LogFile, "") cfg.GetModuleConfig().Log.LogFile = "" cfg.GetModuleConfig().Address.DefaultDriver = "BTC" q.SetConfig(cfg) @@ -469,5 +470,6 @@ func Test_p2p(t *testing.T) { tcfg.DbCache = 4 tcfg.DbPath = filepath.Join(datadir, "addrbook") testAddrbook(t, &tcfg) + dhtp2p.reStart() p2p.CloseP2P() }