Skip to content

Commit

Permalink
dht:add more test
Browse files Browse the repository at this point in the history
  • Loading branch information
bysomeone committed Sep 7, 2023
1 parent c222ea3 commit ed21640
Show file tree
Hide file tree
Showing 10 changed files with 37 additions and 35 deletions.
5 changes: 3 additions & 2 deletions system/p2p/dht/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package dht

import (
"context"
"time"

"github.com/33cn/chain33/system/p2p/dht/extension"
p2pty "github.com/33cn/chain33/system/p2p/dht/types"
Expand All @@ -11,7 +12,7 @@ import (
"github.com/multiformats/go-multiaddr"
)

func initInnerPeers(host host.Host, peersInfo []peer.AddrInfo, cfg *p2pty.P2PSubConfig) {
func initInnerPeers(ctx context.Context, host host.Host, peersInfo []peer.AddrInfo, cfg *p2pty.P2PSubConfig) {

for _, node := range cfg.Seeds {
info := genAddrInfo(node)
Expand Down Expand Up @@ -57,7 +58,7 @@ func initInnerPeers(host host.Host, peersInfo []peer.AddrInfo, cfg *p2pty.P2PSub
//加保护
host.ConnManager().Protect(info.ID, "relayNode")
//向中继节点申请一个通信插槽,以便通过中继节点连接到自己
extension.MakeNodeRelayClient(host, info)
go extension.ReserveRelaySlot(ctx, host, *info, time.Minute)

}
}
Expand Down
2 changes: 1 addition & 1 deletion system/p2p/dht/bootstrap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func Test_initInnerPeers(t *testing.T) {
subcfg.RelayEnable = true
subcfg.RelayNodeAddr = []string{h0str}
peerinfo := []peer.AddrInfo{{ID: h7.ID, Addrs: h7.Addrs}}
initInnerPeers(hosts[5], peerinfo, subcfg)
initInnerPeers(ctx, hosts[5], peerinfo, subcfg)
hosts[5].Close()

}
2 changes: 1 addition & 1 deletion system/p2p/dht/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func InitDhtDiscovery(ctx context.Context, host host.Host, peersInfo []peer.Addr
//Start the dht
func (d *Discovery) Start() {
//连接内置种子,以及addrbook存储的节点
go initInnerPeers(d.host, d.bootstraps, d.subCfg)
go initInnerPeers(d.ctx, d.host, d.bootstraps, d.subCfg)
// Bootstrap the DHT. In the default configuration, this spawns a Background
// thread that will refresh the peer table every five minutes.
if err := d.kademliaDHT.Bootstrap(d.ctx); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion system/p2p/dht/extension/mdns.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func (n *discoveryNotifee) HandlePeerFound(pi peer.AddrInfo) {
func NewMDNS(ctx context.Context, peerhost host.Host, serviceTag string) (*MDNS, error) {
//register with service so that we get notified about peer discovery
notifee := &discoveryNotifee{}
notifee.PeerChan = make(chan peer.AddrInfo)
notifee.PeerChan = make(chan peer.AddrInfo, 1)
ser := discovery.NewMdnsService(peerhost, serviceTag, notifee)
mnds := new(MDNS)
mnds.Service = ser
Expand Down
4 changes: 3 additions & 1 deletion system/p2p/dht/extension/mdns_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package extension

import (
"context"
"github.com/libp2p/go-libp2p/core/peer"
"testing"
"time"

Expand All @@ -24,8 +25,9 @@ func Test_mdns(t *testing.T) {
select {
case peerinfo := <-tmdns.PeerChan():
t.Log("findMdns", peerinfo.ID)
case <-time.After(time.Second * 10):
case <-time.After(time.Second * 5):
return
}
tmdns.notifee.HandlePeerFound(peer.AddrInfo{})

}
2 changes: 1 addition & 1 deletion system/p2p/dht/extension/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
pubsub "github.com/libp2p/go-libp2p-pubsub"
)

var log = log15.New("module", "pubsub")
var log = log15.New("module", "extension")

var setOnce sync.Once

Expand Down
34 changes: 18 additions & 16 deletions system/p2p/dht/extension/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,26 +49,28 @@ func MakeNodeRelayService(host host.Host, opts []circuit.Option) *Relay {
return r
}

func MakeNodeRelayClient(host host.Host, relayInfo *peer.AddrInfo) {
// ReserveRelaySlot reserve relay slot
func ReserveRelaySlot(ctx context.Context, host host.Host, relayInfo peer.AddrInfo, timeout time.Duration) {
//向中继节点申请一个通信插槽
rs, err := circuitClient.Reserve(context.Background(), host, *relayInfo)
if err != nil {
return
}
go func(reserve *circuitClient.Reservation) {
var ticker = time.NewTicker(time.Second * 5)
for {
select {
case <-ticker.C:
if reserve.Expiration.Before(time.Now()) {
reserve, err = circuitClient.Reserve(context.Background(), host, *relayInfo)
if err != nil {
return
}

var ticker = time.NewTicker(timeout)
var err error
var reserve *circuitClient.Reservation
for {
select {
case <-ctx.Done():
ticker.Stop()
return
case <-ticker.C:
if reserve == nil || reserve.Expiration.Before(time.Now()) {
reserve, err = circuitClient.Reserve(context.Background(), host, relayInfo)
if err != nil {
log.Error("ReserveRelaySlot", "err", err)
return
}
}
}
}(rs)
}

}

Expand Down
11 changes: 4 additions & 7 deletions system/p2p/dht/extension/relay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ import (

ma "github.com/multiformats/go-multiaddr"

"github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/client"

"github.com/libp2p/go-libp2p/core/network"

"github.com/libp2p/go-libp2p/core/peer"
Expand Down Expand Up @@ -103,12 +101,11 @@ func TestRelayV2(t *testing.T) {
err = unreachable2.Connect(ctx, relayInfo)
assert.Nil(t, err)

go ReserveRelaySlot(ctx, unreachable1, unreachable2info, time.Millisecond)

//unreachable2 向中继节点申请一个中继槽slot
reserve, err := client.Reserve(ctx, unreachable2, relayInfo)
assert.Nil(t, err)
if reserve.Expiration.Before(time.Now()) {
client.Reserve(ctx, unreachable2, relayInfo)
}
go ReserveRelaySlot(ctx, unreachable2, relayInfo, time.Millisecond*100)
time.Sleep(time.Second)
relayaddr, err := MakeRelayAddrs(relayInfo.ID.String(), unreachable2.ID().String())
assert.Nil(t, err)
t.Log("relayaddr:", relayaddr)
Expand Down
8 changes: 3 additions & 5 deletions system/p2p/dht/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,14 +230,12 @@ func (p *P2P) CloseP2P() {
}

func (p *P2P) reStart() {
atomic.StoreInt32(&p.restart, 1)
log.Info("reStart p2p")

if p.host == nil {
//说明p2p还没有开始启动,无需重启
log.Info("p2p no need restart...")
atomic.StoreInt32(&p.restart, 0)
return
}
atomic.StoreInt32(&p.restart, 1)
log.Info("reStart p2p")
p.CloseP2P()
p.StartP2P()
}
Expand Down
2 changes: 2 additions & 0 deletions system/p2p/dht/p2p_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,7 @@ func Test_p2p(t *testing.T) {
cfg := types.NewChain33Config(types.ReadFile("../../../cmd/chain33/chain33.test.toml"))
q := queue.New("channel")
datadir := util.ResetDatadir(cfg.GetModuleConfig(), "$TEMP/")
setLibp2pLog(cfg.GetModuleConfig().Log.LogFile, "")
cfg.GetModuleConfig().Log.LogFile = ""
cfg.GetModuleConfig().Address.DefaultDriver = "BTC"
q.SetConfig(cfg)
Expand Down Expand Up @@ -469,5 +470,6 @@ func Test_p2p(t *testing.T) {
tcfg.DbCache = 4
tcfg.DbPath = filepath.Join(datadir, "addrbook")
testAddrbook(t, &tcfg)
dhtp2p.reStart()
p2p.CloseP2P()
}

0 comments on commit ed21640

Please sign in to comment.