Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

netsync: add peerSubscription #2294

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ require (
github.com/decred/dcrd/crypto/blake256 v1.0.0 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23 // indirect
github.com/lightninglabs/neutrino v0.16.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/stretchr/objx v0.5.0 // indirect
golang.org/x/net v0.24.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ github.com/jrick/logrotate v1.0.0 h1:lQ1bL/n9mBNeIXoTUoYRlK4dHuNJVofX9oWqBtPnSzI
github.com/jrick/logrotate v1.0.0/go.mod h1:LNinyqDIJnpAur+b8yyulnQw/wDuN1+BYKlTRt3OuAQ=
github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23 h1:FOOIBWrEkLgmlgGfMuZT83xIwfPDxEI2OHu6xUmJMFE=
github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23/go.mod h1:J+Gs4SYgM6CZQHDETBtE9HaSEkGmuNXF86RwHhHUvq4=
github.com/lightninglabs/neutrino v0.16.0 h1:YNTQG32fPR/Zg0vvJVI65OBH8l3U18LSXXtX91hx0q0=
github.com/lightninglabs/neutrino v0.16.0/go.mod h1:x3OmY2wsA18+Kc3TSV2QpSUewOCiscw2mKpXgZv2kZk=
github.com/nxadm/tail v1.4.4 h1:DQuhQpB1tVlglWS2hLQ5OV6B5r8aGxSrPc5Qo6uTN78=
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
Expand Down
41 changes: 41 additions & 0 deletions netsync/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/btcsuite/btcd/mempool"
peerpkg "github.com/btcsuite/btcd/peer"
"github.com/btcsuite/btcd/wire"
"github.com/lightninglabs/neutrino/query"
)

const (
Expand Down Expand Up @@ -203,6 +204,7 @@ type SyncManager struct {
headerList *list.List
startHeader *list.Element
nextCheckpoint *chaincfg.Checkpoint
peerSubscribers []*peerSubscription

// An optional fee estimator.
feeEstimator *mempool.FeeEstimator
Expand Down Expand Up @@ -452,6 +454,31 @@ func (sm *SyncManager) isSyncCandidate(peer *peerpkg.Peer) bool {
return true
}

// notifyPeerSubscribers notifies all the current peer subscribers of the peer
// that was passed in.
func (sm *SyncManager) notifyPeerSubscribers(peer *peerpkg.Peer) {
// Loop for alerting subscribers to the new peer that was connected to.
n := 0
for i, sub := range sm.peerSubscribers {
select {
// Quickly check whether this subscription has been canceled.
case <-sub.cancel:
// Avoid GC leak.
sm.peerSubscribers[i] = nil
continue
default:
}

// Keep non-canceled subscribers around.
sm.peerSubscribers[n] = sub
n++

sub.peers <- peer
}
// Re-align the slice to only active subscribers.
sm.peerSubscribers = sm.peerSubscribers[:n]
}

// handleNewPeerMsg deals with new peers that have signalled they may
// be considered as a sync peer (they have already successfully negotiated). It
// also starts syncing if needed. It is invoked from the syncHandler goroutine.
Expand All @@ -471,6 +498,13 @@ func (sm *SyncManager) handleNewPeerMsg(peer *peerpkg.Peer) {
requestedBlocks: make(map[chainhash.Hash]struct{}),
}

// Only pass the peer off to the subscribers if we're able to sync off of
// the peer.
bestHeight := sm.chain.BestSnapshot().Height
if isSyncCandidate && peer.LastBlock() > bestHeight {
sm.notifyPeerSubscribers(peer)
}

// Start syncing by choosing the best candidate if needed.
if isSyncCandidate && sm.syncPeer == nil {
sm.startSync()
Expand Down Expand Up @@ -1666,6 +1700,13 @@ func (sm *SyncManager) Pause() chan<- struct{} {
return c
}

// peerSubscription holds a peer subscription which we'll notify about any
// connected peers.
type peerSubscription struct {
peers chan<- query.Peer
cancel <-chan struct{}
}

// New constructs a new SyncManager. Use Start to begin processing asynchronous
// block, tx, and inv updates.
func New(config *Config) (*SyncManager, error) {
Expand Down
62 changes: 62 additions & 0 deletions peer/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,11 @@ type Peer struct {
queueQuit chan struct{}
outQuit chan struct{}
quit chan struct{}

// subscribers is a channel for relaying all messages that were received
// to this peer.
subscribers map[recvMsgsubscription]struct{}
subscriberLock sync.Mutex
}

// String returns the peer's address and directionality as a human-readable
Expand Down Expand Up @@ -1098,6 +1103,35 @@ func (p *Peer) readMessage(encoding wire.MessageEncoding) (wire.Message, []byte,
return msg, buf, nil
}

// recvMsgsubscription is two channels for a subscriber of recevied messages.
// msgChan for sending the messages and quit for cancelling the subscription.
type recvMsgsubscription struct {
msgChan chan wire.Message
quit chan struct{}
}

// SubscribeRecvMsg adds a OnRead subscription to the peer. All bitcoin
// messages received from this peer will be sent on the returned
// channel. A closure is also returned, that should be called to cancel
// the subscription.
func (p *Peer) SubscribeRecvMsg() (<-chan wire.Message, func()) {
p.subscriberLock.Lock()
defer p.subscriberLock.Unlock()

// No need to buffer this channel as we'll spin up a new goroutine for
// every send.
msgChan := make(chan wire.Message)
quit := make(chan struct{})
sub := recvMsgsubscription{
msgChan,
quit,
}

p.subscribers[sub] = struct{}{}

return msgChan, func() { close(quit) }
}

// writeMessage sends a bitcoin message to the peer with logging.
func (p *Peer) writeMessage(msg wire.Message, enc wire.MessageEncoding) error {
// Don't do anything if we're disconnecting.
Expand Down Expand Up @@ -1446,6 +1480,27 @@ out:
}
break out
}

// Send the received message to all the subscribers.
for sub := range p.subscribers {
select {
case <-sub.quit:
delete(p.subscribers, sub)
continue
default:
}

// Spin up a goroutine so that we don't block here.
go func(subscription chan wire.Message,
quit chan struct{}) {

select {
case subscription <- rmsg:
case <-p.quit:
}

}(sub.msgChan, p.quit)
}
atomic.StoreInt64(&p.lastRecv, time.Now().Unix())
p.stallControl <- stallControlMsg{sccReceiveMessage, rmsg}

Expand Down Expand Up @@ -1961,6 +2016,12 @@ func (p *Peer) Disconnect() {
close(p.quit)
}

// OnDisconnect returns a channel that will be closed when this peer is
// disconnected.
func (p *Peer) OnDisconnect() <-chan struct{} {
return p.quit
}

// readRemoteVersionMsg waits for the next message to arrive from the remote
// peer. If the next message is not a version message or the version is not
// acceptable then return an error.
Expand Down Expand Up @@ -2397,6 +2458,7 @@ func newPeerBase(origCfg *Config, inbound bool) *Peer {
queueQuit: make(chan struct{}),
outQuit: make(chan struct{}),
quit: make(chan struct{}),
subscribers: make(map[recvMsgsubscription]struct{}),
cfg: cfg, // Copy so caller can't mutate.
services: cfg.Services,
protocolVersion: cfg.ProtocolVersion,
Expand Down
Loading