diff --git a/blockchain/accept.go b/blockchain/accept.go index 4adc2f6127..372f4542cc 100644 --- a/blockchain/accept.go +++ b/blockchain/accept.go @@ -6,6 +6,7 @@ package blockchain import ( "fmt" + "sync" "github.com/btcsuite/btcd/btcutil" "github.com/btcsuite/btcd/database" @@ -44,20 +45,36 @@ func (b *BlockChain) maybeAcceptBlock(block *btcutil.Block, flags BehaviorFlags) return false, err } - // Insert the block into the database if it's not already there. Even - // though it is possible the block will ultimately fail to connect, it - // has already passed all proof-of-work and validity tests which means - // it would be prohibitively expensive for an attacker to fill up the - // disk with a bunch of blocks that fail to connect. This is necessary - // since it allows block download to be decoupled from the much more - // expensive connection logic. It also has some other nice properties - // such as making blocks that never become part of the main chain or - // blocks that fail to connect available for further analysis. - err = b.db.Update(func(dbTx database.Tx) error { - return dbStoreBlock(dbTx, block) - }) - if err != nil { - return false, err + // Store the block in parallel if we're in headers first mode. The + // headers were already checked and this block is under the checkpoint + // so it's safe to just add it to the database while the block + // validation is happening. + var wg sync.WaitGroup + var dbStoreError error + if flags&BFFastAdd == BFFastAdd { + go func() { + wg.Add(1) + defer wg.Done() + // Insert the block into the database if it's not already there. Even + // though it is possible the block will ultimately fail to connect, it + // has already passed all proof-of-work and validity tests which means + // it would be prohibitively expensive for an attacker to fill up the + // disk with a bunch of blocks that fail to connect. This is necessary + // since it allows block download to be decoupled from the much more + // expensive connection logic. It also has some other nice properties + // such as making blocks that never become part of the main chain or + // blocks that fail to connect available for further analysis. + dbStoreError = b.db.Update(func(dbTx database.Tx) error { + return dbTx.StoreBlock(block) + }) + }() + } else { + err = b.db.Update(func(dbTx database.Tx) error { + return dbStoreBlock(dbTx, block) + }) + if err != nil { + return false, err + } } // Create a new block node for the block and add it to the node index. Even @@ -90,5 +107,17 @@ func (b *BlockChain) maybeAcceptBlock(block *btcutil.Block, flags BehaviorFlags) b.sendNotification(NTBlockAccepted, block) }() + // Wait until the block is saved. If there was a db error, then unset + // the data stored flag and flush the block index. + wg.Wait() + if dbStoreError != nil { + b.index.UnsetStatusFlags(newNode, statusDataStored) + err = b.index.flushToDB() + if err != nil { + return false, fmt.Errorf("%v. %v", err, dbStoreError) + } + return false, dbStoreError + } + return isMainChain, nil } diff --git a/go.mod b/go.mod index 1f445d9065..86e8fbc8eb 100644 --- a/go.mod +++ b/go.mod @@ -9,12 +9,13 @@ require ( github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792 github.com/btcsuite/winsvc v1.0.0 github.com/davecgh/go-spew v1.1.1 - github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 - github.com/decred/dcrd/lru v1.0.0 + github.com/decred/dcrd/dcrec/secp256k1/v4 v4.3.0 + github.com/decred/dcrd/lru v1.1.2 github.com/gorilla/websocket v1.5.0 github.com/jessevdk/go-flags v1.4.0 github.com/jrick/logrotate v1.0.0 - github.com/stretchr/testify v1.8.4 + github.com/lightninglabs/neutrino v0.16.0 + github.com/stretchr/testify v1.9.0 github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 golang.org/x/crypto v0.22.0 golang.org/x/sys v0.19.0 @@ -22,11 +23,12 @@ require ( require ( github.com/aead/siphash v1.0.1 // indirect - github.com/decred/dcrd/crypto/blake256 v1.0.0 // indirect + github.com/decred/dcrd/crypto/blake256 v1.0.1 // indirect github.com/golang/snappy v0.0.4 // indirect - github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23 // indirect + github.com/kkdai/bstream v1.0.0 // indirect + github.com/kr/text v0.2.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect - github.com/stretchr/objx v0.5.0 // indirect + github.com/stretchr/objx v0.5.2 // indirect golang.org/x/net v0.24.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) @@ -64,3 +66,5 @@ retract ( ) go 1.17 + +replace github.com/lightninglabs/neutrino => /home/calvin/bitcoin-projects/btcd/neutrino diff --git a/go.sum b/go.sum index bb666c89de..218aeef788 100644 --- a/go.sum +++ b/go.sum @@ -1,14 +1,20 @@ github.com/aead/siphash v1.0.1 h1:FwHfE/T45KPKYuuSAKyyvE+oPWcaQ+CUmFW0bPlM+kg= github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII= +github.com/btcsuite/btcd v0.0.0-20190824003749-130ea5bddde3/go.mod h1:3J08xEfcugPacsc34/LKRU2yO7YmuT8yt28J8k2+rrI= github.com/btcsuite/btcd v0.20.1-beta/go.mod h1:wVuoA8VJLEcwgqHBwHmzLRazpKxTv13Px/pDuV7OomQ= github.com/btcsuite/btcd v0.22.0-beta.0.20220111032746-97732e52810c/go.mod h1:tjmYdS6MLJ5/s0Fj4DbLgSbDHbEqLJrtnHecBFkdz5M= +github.com/btcsuite/btcd v0.22.0-beta.0.20220204213055-eaf0459ff879/go.mod h1:osu7EoKiL36UThEgzYPqdRaxeo0NU8VoXqgcnwpey0g= +github.com/btcsuite/btcd v0.22.0-beta.0.20220316175102-8d5c75c28923/go.mod h1:taIcYprAW2g6Z9S0gGUxyR+zDwimyDMK5ePOX+iJ2ds= +github.com/btcsuite/btcd v0.23.5-0.20230711222809-7faa9b266231/go.mod h1:0QJIIN1wwIXF/3G/m87gIwGniDMDQqjVn4SZgnFpsYY= github.com/btcsuite/btcd v0.23.5-0.20231215221805-96c9fd8078fd/go.mod h1:nm3Bko6zh6bWP60UxwoT5LzdGJsQJaPo6HjduXq9p6A= github.com/btcsuite/btcd/btcec/v2 v2.1.0/go.mod h1:2VzYrv4Gm4apmbVVsSq5bqf1Ec8v56E48Vt0Y/umPgA= +github.com/btcsuite/btcd/btcec/v2 v2.1.1/go.mod h1:ctjw4H1kknNJmRN4iP1R7bTQ+v3GJkZBd6mui8ZsAZE= github.com/btcsuite/btcd/btcec/v2 v2.1.3/go.mod h1:ctjw4H1kknNJmRN4iP1R7bTQ+v3GJkZBd6mui8ZsAZE= github.com/btcsuite/btcd/btcec/v2 v2.3.4 h1:3EJjcN70HCu/mwqlUsGK8GcNVyLVxFDlWurTXGPFfiQ= github.com/btcsuite/btcd/btcec/v2 v2.3.4/go.mod h1:zYzJ8etWJQIv1Ogk7OzpWjowwOdXY1W/17j2MW85J04= github.com/btcsuite/btcd/btcutil v1.0.0/go.mod h1:Uoxwv0pqYWhD//tfTiipkxNfdhG9UrLwaeswfjfdF0A= github.com/btcsuite/btcd/btcutil v1.1.0/go.mod h1:5OapHB7A2hBBWLm48mmw4MOHNJCcUBTwmWH/0Jn8VHE= +github.com/btcsuite/btcd/btcutil v1.1.1/go.mod h1:nbKlBMNm9FGsdvKvu0essceubPiAcI57pYBNnsLAa34= github.com/btcsuite/btcd/btcutil v1.1.5 h1:+wER79R5670vs/ZusMTF1yTcRYE5GUsFbdjdisflzM8= github.com/btcsuite/btcd/btcutil v1.1.5/go.mod h1:PSZZ4UitpLBWzxGd5VGOrLnmOjtPP/a6HaFo12zMs00= github.com/btcsuite/btcd/chaincfg/chainhash v1.0.0/go.mod h1:7SFka0XMvUgj3hfZtydOrQY2mwhPclbT2snogU7SQQc= @@ -18,6 +24,11 @@ github.com/btcsuite/btcd/chaincfg/chainhash v1.1.0/go.mod h1:7SFka0XMvUgj3hfZtyd github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f h1:bAs4lUbRJpnnkd9VhRV3jjAVU7DJVjMaK+IsvSeZvFo= github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f/go.mod h1:TdznJufoqS23FtqVCzL0ZqgP5MqXbb4fg/WgDys70nA= github.com/btcsuite/btcutil v0.0.0-20190425235716-9e5f4b9a998d/go.mod h1:+5NJ2+qvTyV9exUAL/rxXi3DcLg2Ts+ymUAY5y4NvMg= +github.com/btcsuite/btcwallet/wallet/txauthor v1.2.3/go.mod h1:T2xSiKGpUkSLCh68aF+FMXmKK9mFqNdHl9VaqOr+JjU= +github.com/btcsuite/btcwallet/wallet/txrules v1.2.0/go.mod h1:AtkqiL7ccKWxuLYtZm8Bu8G6q82w4yIZdgq6riy60z0= +github.com/btcsuite/btcwallet/wallet/txsizes v1.1.0/go.mod h1:pauEU8UuMFiThe5PB3EO+gO5kx87Me5NvdQDsTuq6cs= +github.com/btcsuite/btcwallet/walletdb v1.3.5/go.mod h1:oJDxAEUHVtnmIIBaa22wSBPTVcs6hUp5NKWmI8xDwwU= +github.com/btcsuite/btcwallet/wtxmgr v1.5.0/go.mod h1:TQVDhFxseiGtZwEPvLgtfyxuNUDsIdaJdshvWzR0HJ4= github.com/btcsuite/go-socks v0.0.0-20170105172521-4720035b7bfd h1:R/opQEbFEy9JGkIguV40SvRY1uliPX8ifOvi6ICsFCw= github.com/btcsuite/go-socks v0.0.0-20170105172521-4720035b7bfd/go.mod h1:HHNXQzUsZCxOoE+CPiyCTO6x34Zs86zZUiwtpXoGdtg= github.com/btcsuite/goleveldb v0.0.0-20160330041536-7834afc9e8cd/go.mod h1:F+uVaaLLH7j4eDXPRvw78tMflu7Ie2bzYOH4Y8rRKBY= @@ -28,16 +39,20 @@ github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792 h1:R8vQdOQdZ9Y3 github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792/go.mod h1:ghJtEyQwv5/p4Mg4C0fgbePVuGr935/5ddU9Z3TmDRY= github.com/btcsuite/winsvc v1.0.0 h1:J9B4L7e3oqhXOcm+2IuNApwzQec85lE+QaikUcCs+dk= github.com/btcsuite/winsvc v1.0.0/go.mod h1:jsenWakMcC0zFBFurPLEAyrnc/teJEM1O46fmI40EZs= +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v0.0.0-20171005155431-ecdeabc65495/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/decred/dcrd/crypto/blake256 v1.0.0 h1:/8DMNYp9SGi5f0w7uCm6d6M4OU2rGFK09Y2A4Xv7EE0= github.com/decred/dcrd/crypto/blake256 v1.0.0/go.mod h1:sQl2p6Y26YV+ZOcSTP6thNdn47hh8kt6rqSlvmrXFAc= -github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 h1:YLtO71vCjJRCBcrPMtQ9nqBsqpA1m5sE92cU+pd5Mcc= +github.com/decred/dcrd/crypto/blake256 v1.0.1 h1:7PltbUIQB7u/FfZ39+DGa/ShuMyJ5ilcvdfma9wOH6Y= +github.com/decred/dcrd/crypto/blake256 v1.0.1/go.mod h1:2OfgNZ5wDpcsFmHmCK5gZTPcCXqlm2ArzUIkw9czNJo= github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1/go.mod h1:hyedUtir6IdtD/7lIxGeCxkaw7y45JueMRL4DIyJDKs= -github.com/decred/dcrd/lru v1.0.0 h1:Kbsb1SFDsIlaupWPwsPp+dkxiBY1frcS07PCPgotKz8= +github.com/decred/dcrd/dcrec/secp256k1/v4 v4.3.0 h1:rpfIENRNNilwHwZeG5+P150SMrnNEcHYvcCuK6dPZSg= +github.com/decred/dcrd/dcrec/secp256k1/v4 v4.3.0/go.mod h1:v57UDF4pDQJcEfFUCRop3lJL149eHGSe9Jvczhzjo/0= github.com/decred/dcrd/lru v1.0.0/go.mod h1:mxKOwFd7lFjN2GZYsiz/ecgqR6kkYAl+0pz0tEMk218= +github.com/decred/dcrd/lru v1.1.2 h1:KdCzlkxppuoIDGEvCGah1fZRicrDH36IipvlB1ROkFY= +github.com/decred/dcrd/lru v1.1.2/go.mod h1:gEdCVgXs1/YoBvFWt7Scgknbhwik3FgVSzlnCcXL2N8= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= @@ -61,8 +76,23 @@ github.com/jessevdk/go-flags v1.4.0 h1:4IU2WS7AumrZ/40jfhf4QVDMsQwqA7VEHozFRrGAR github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= github.com/jrick/logrotate v1.0.0 h1:lQ1bL/n9mBNeIXoTUoYRlK4dHuNJVofX9oWqBtPnSzI= github.com/jrick/logrotate v1.0.0/go.mod h1:LNinyqDIJnpAur+b8yyulnQw/wDuN1+BYKlTRt3OuAQ= -github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23 h1:FOOIBWrEkLgmlgGfMuZT83xIwfPDxEI2OHu6xUmJMFE= github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23/go.mod h1:J+Gs4SYgM6CZQHDETBtE9HaSEkGmuNXF86RwHhHUvq4= +github.com/kkdai/bstream v1.0.0 h1:Se5gHwgp2VT2uHfDrkbbgbgEvV9cimLELwrPJctSjg8= +github.com/kkdai/bstream v1.0.0/go.mod h1:FDnDOHt5Yx4p3FaHcioFT0QjDOtgUpvjeZqAs+NVZZA= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= +github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= +github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/lightninglabs/neutrino v0.16.0 h1:YNTQG32fPR/Zg0vvJVI65OBH8l3U18LSXXtX91hx0q0= +github.com/lightninglabs/neutrino v0.16.0/go.mod h1:x3OmY2wsA18+Kc3TSV2QpSUewOCiscw2mKpXgZv2kZk= +github.com/lightninglabs/neutrino/cache v1.1.2/go.mod h1:XJNcgdOw1LQnanGjw8Vj44CvguYA25IMKjWFZczwZuo= +github.com/lightningnetwork/lnd/clock v1.0.1/go.mod h1:KnQudQ6w0IAMZi1SgvecLZQZ43ra2vpDNj7H/aasemg= +github.com/lightningnetwork/lnd/queue v1.0.1/go.mod h1:vaQwexir73flPW43Mrm7JOgJHmcEFBWWSl9HlyASoms= +github.com/lightningnetwork/lnd/ticker v1.0.0/go.mod h1:iaLXJiVgI1sPANIF2qYYUJXjoksPNvGNYowB8aRbpX0= github.com/nxadm/tail v1.4.4 h1:DQuhQpB1tVlglWS2hLQ5OV6B5r8aGxSrPc5Qo6uTN78= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= @@ -77,19 +107,30 @@ github.com/onsi/gomega v1.10.1 h1:o0+MgICZLuZ7xjH7Vx6zS/zcu93/BEp1VwkIW1mEXCE= github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= +github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= +github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= -github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= +github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= +github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= -github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 h1:epCh84lMvA70Z7CTTCmYQn2CKbY8j86K7/FAIr141uY= github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7/go.mod h1:q4W45IWZaF22tdD+VEXcAWRA037jwmWEB5VWYORlTpc= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= +go.etcd.io/bbolt v1.3.5-0.20200615073812-232d8fc87f50/go.mod h1:G5EMThwa9y8QZGBClrRx5EY+Yw9kAhnjy3bSjsnlVTQ= +go.etcd.io/bbolt v1.3.7/go.mod h1:N9Mkw9X8x5fupy0IKsmuqVtoGDyxsaDlbk4Rd05IAQw= +go.etcd.io/gofail v0.1.0/go.mod h1:VZBCXYGZhHAinaBiiqYvuDynvahNsAyLFwB3kEHKz1M= golang.org/x/crypto v0.0.0-20170930174604-9419663f5a44/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= +golang.org/x/crypto v0.0.0-20190211182817-74369b46fc67/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= @@ -98,8 +139,10 @@ golang.org/x/crypto v0.22.0 h1:g1v0xeRhjcugydODzvb3mEM9SQ0HGp9s/nh3COQ/C30= golang.org/x/crypto v0.22.0/go.mod h1:vr6Su+7cTlO45qkww3VDJlzDn0ctJvRgYbC2NvXHt+M= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= +golang.org/x/mod v0.9.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/net v0.0.0-20180719180050-a680a1efc54d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190206173232-65e2d4e15006/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= @@ -112,6 +155,7 @@ golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44= golang.org/x/net v0.24.0 h1:1PcaxkF854Fu3+lvBIx5SYn9wRlBzzcnHZSiaFFAb0w= golang.org/x/net v0.24.0/go.mod h1:2Q7sJY5mzlzWjKtYUEXSlBWCdyaioyXzRB2RtU8KVE8= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -121,6 +165,7 @@ golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20190904154756-749cb33beabd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200519105757-fe76b779f299/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200814200057-3d37ad5750ed/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -128,6 +173,7 @@ golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.4.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= @@ -140,6 +186,7 @@ golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo= golang.org/x/term v0.17.0/go.mod h1:lLRBjIVuehSbZlaOtGMbcMncT+aqLLLmKrsjNrUguwk= golang.org/x/term v0.19.0/go.mod h1:2CuTdWZ7KHSQwUzKva0cbMg6q2DMI3Mmxp+gKJbskEk= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= @@ -161,12 +208,16 @@ google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQ google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE= google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= -gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU= gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/log.go b/log.go index 5707d7c23a..48cc536618 100644 --- a/log.go +++ b/log.go @@ -21,6 +21,7 @@ import ( "github.com/btcsuite/btcd/netsync" "github.com/btcsuite/btcd/peer" "github.com/btcsuite/btcd/txscript" + "github.com/lightninglabs/neutrino/query" "github.com/btcsuite/btclog" "github.com/jrick/logrotate/rotator" @@ -54,21 +55,22 @@ var ( // application shutdown. logRotator *rotator.Rotator - adxrLog = backendLog.Logger("ADXR") - amgrLog = backendLog.Logger("AMGR") - cmgrLog = backendLog.Logger("CMGR") - bcdbLog = backendLog.Logger("BCDB") - btcdLog = backendLog.Logger("BTCD") - chanLog = backendLog.Logger("CHAN") - discLog = backendLog.Logger("DISC") - indxLog = backendLog.Logger("INDX") - minrLog = backendLog.Logger("MINR") - peerLog = backendLog.Logger("PEER") - rpcsLog = backendLog.Logger("RPCS") - scrpLog = backendLog.Logger("SCRP") - srvrLog = backendLog.Logger("SRVR") - syncLog = backendLog.Logger("SYNC") - txmpLog = backendLog.Logger("TXMP") + adxrLog = backendLog.Logger("ADXR") + amgrLog = backendLog.Logger("AMGR") + cmgrLog = backendLog.Logger("CMGR") + bcdbLog = backendLog.Logger("BCDB") + btcdLog = backendLog.Logger("BTCD") + chanLog = backendLog.Logger("CHAN") + discLog = backendLog.Logger("DISC") + indxLog = backendLog.Logger("INDX") + minrLog = backendLog.Logger("MINR") + peerLog = backendLog.Logger("PEER") + rpcsLog = backendLog.Logger("RPCS") + scrpLog = backendLog.Logger("SCRP") + srvrLog = backendLog.Logger("SRVR") + syncLog = backendLog.Logger("SYNC") + txmpLog = backendLog.Logger("TXMP") + queryLog = backendLog.Logger("QURY") ) // Initialize package-global logger variables. @@ -84,6 +86,7 @@ func init() { txscript.UseLogger(scrpLog) netsync.UseLogger(syncLog) mempool.UseLogger(txmpLog) + query.UseLogger(queryLog) } // subsystemLoggers maps each subsystem identifier to its associated logger. @@ -103,6 +106,7 @@ var subsystemLoggers = map[string]btclog.Logger{ "SRVR": srvrLog, "SYNC": syncLog, "TXMP": txmpLog, + "QURY": queryLog, } // initLogRotator initializes the logging rotater to write logs to logFile and diff --git a/netsync/blocklogger.go b/netsync/blocklogger.go index 31a6a4c509..1625c30048 100644 --- a/netsync/blocklogger.go +++ b/netsync/blocklogger.go @@ -6,6 +6,7 @@ package netsync import ( "fmt" + "sort" "sync" "time" @@ -82,3 +83,77 @@ func (b *blockProgressLogger) LogBlockHeight(block *btcutil.Block, chain *blockc func (b *blockProgressLogger) SetLastLogTime(time time.Time) { b.lastBlockLogTime = time } + +// peerLogger logs the progress of blocks downloaded from different peers during +// headers-first download. +type peerLogger struct { + lastPeerLogTime time.Time + peers map[string]int + + subsystemLogger btclog.Logger + sync.Mutex +} + +// newPeerLogger returns a new peerLogger with fields initialized. +func newPeerLogger(logger btclog.Logger) *peerLogger { + return &peerLogger{ + lastPeerLogTime: time.Now(), + subsystemLogger: logger, + peers: make(map[string]int), + } +} + +// LogPeers logs how many blocks have been received from which peers in the last +// 10 seconds. +func (p *peerLogger) LogPeers(peer string) { + p.Lock() + defer p.Unlock() + + count, found := p.peers[peer] + if found { + count++ + p.peers[peer] = count + } else { + p.peers[peer] = 1 + } + + now := time.Now() + duration := now.Sub(p.lastPeerLogTime) + if duration < time.Second*10 { + return + } + // Truncate the duration to 10s of milliseconds. + durationMillis := int64(duration / time.Millisecond) + tDuration := 10 * time.Millisecond * time.Duration(durationMillis/10) + + type peerInfo struct { + name string + count int + } + + // Sort by blocks downloaded before printing. + var sortedPeers []peerInfo + for k, v := range p.peers { + sortedPeers = append(sortedPeers, peerInfo{k, v}) + } + sort.Slice(sortedPeers, func(i, j int) bool { + return sortedPeers[i].count > sortedPeers[j].count + }) + + totalBlocks := 0 + peerDownloadStr := "" + for _, sortedPeer := range sortedPeers { + peerDownloadStr += fmt.Sprintf("%d blocks from %v, ", + sortedPeer.count, sortedPeer.name) + totalBlocks += sortedPeer.count + } + + p.subsystemLogger.Infof("Peer download stats in the last %s. total: %v, %s", + tDuration, totalBlocks, peerDownloadStr) + + // Reset fields. + p.lastPeerLogTime = now + for k := range p.peers { + delete(p.peers, k) + } +} diff --git a/netsync/interface.go b/netsync/interface.go index 6a873bd888..cd848adb1a 100644 --- a/netsync/interface.go +++ b/netsync/interface.go @@ -38,4 +38,7 @@ type Config struct { MaxPeers int FeeEstimator *mempool.FeeEstimator + + // ConnectedPeers returns all the currently connected peers. + ConnectedPeers func() []*peer.Peer } diff --git a/netsync/manager.go b/netsync/manager.go index 3215a86ace..83b37af481 100644 --- a/netsync/manager.go +++ b/netsync/manager.go @@ -20,6 +20,7 @@ import ( "github.com/btcsuite/btcd/mempool" peerpkg "github.com/btcsuite/btcd/peer" "github.com/btcsuite/btcd/wire" + "github.com/lightninglabs/neutrino/query" ) const ( @@ -28,6 +29,16 @@ const ( // more. minInFlightBlocks = 10 + // maxInFlightBlocks is the maximum number of blocks per peer that + // should be in the request queue for headers-first mode before + // requesting more. + maxInFlightBlocksPerPeer = 32 + + // blockDownloadWindow is the maximum number of blocks that are allowed + // to be fetch from the current best tip. This number limits the memory + // usage a node will have. + blockDownloadWindow = 1024 + // maxRejectedTxns is the maximum number of rejected transactions // hashes to store in memory. maxRejectedTxns = 1000 @@ -138,6 +149,14 @@ type pauseMsg struct { unpause <-chan struct{} } +// peerDisconnectMsg is a message type to be sent across the message channel for +// the disconnection of the given peer. +type peerDisconnectMsg struct { + peer string + blocks map[chainhash.Hash]struct{} + reply chan struct{} +} + // headerNode is used as a node in a list of headers that are linked together // between checkpoints. type headerNode struct { @@ -173,6 +192,86 @@ func limitAdd(m map[chainhash.Hash]struct{}, hash chainhash.Hash, limit int) { m[hash] = struct{}{} } +// checkpointedBlocksQuery is a helper to construct query.Requests for GetData +// messages. +type checkpointedBlocksQuery struct { + msg *wire.MsgGetData + sm *SyncManager + blocks map[chainhash.Hash]struct{} +} + +// newCheckpointedBlocksQuery returns an initialized newCheckpointedBlocksQuery. +func newCheckpointedBlocksQuery(msg *wire.MsgGetData, + sm *SyncManager) checkpointedBlocksQuery { + + m := make(map[chainhash.Hash]struct{}, len(msg.InvList)) + for _, inv := range msg.InvList { + m[inv.Hash] = struct{}{} + } + return checkpointedBlocksQuery{msg, sm, m} +} + +// handleResponse returns that the progress is progressed and finished if the +// received wire.Message is a MsgBlock. +func (c *checkpointedBlocksQuery) handleResponse(req, resp wire.Message, + peerAddr string) query.Progress { + + block, ok := resp.(*wire.MsgBlock) + if !ok { + // We are only looking for block messages. + return query.Progress{ + Finished: false, + Progressed: false, + } + } + + // If we didn't find this block in the map of blocks we're expecting, + // we're neither finished nor progressed. + hash := block.BlockHash() + _, found := c.blocks[hash] + if !found { + log.Warnf("Got unrequested block %v from %s -- disconnecting", + hash, peerAddr) + + done := make(chan struct{}) + c.sm.queuePeerToBeDisconnected(peerAddr, c.blocks, done) + + // Block until we disconnect the peer. + <-done + + return query.Progress{ + Finished: false, + Progressed: false, + } + } + delete(c.blocks, hash) + + // If we have blocks we're expecting, we've progressed but not finished. + if len(c.blocks) > 0 { + return query.Progress{ + Finished: false, + Progressed: true, + } + } + + // We have no more blocks we're expecting from heres so we're finished. + return query.Progress{ + Finished: true, + Progressed: true, + } +} + +// requests returns a slice of query.Request that can be queued to +// query.WorkManager. +func (c *checkpointedBlocksQuery) requests() []*query.Request { + req := &query.Request{ + Req: c.msg, + HandleResp: c.handleResponse, + } + + return []*query.Request{req} +} + // SyncManager is used to communicate block related messages with peers. The // SyncManager is started as by executing Start() in a goroutine. Once started, // it selects peers to sync from and starts the initial block download. Once the @@ -186,6 +285,7 @@ type SyncManager struct { txMemPool *mempool.TxPool chainParams *chaincfg.Params progressLogger *blockProgressLogger + peerLogger *peerLogger msgChan chan interface{} wg sync.WaitGroup quit chan struct{} @@ -199,10 +299,15 @@ type SyncManager struct { lastProgressTime time.Time // The following fields are used for headers-first mode. - headersFirstMode bool - headerList *list.List - startHeader *list.Element - nextCheckpoint *chaincfg.Checkpoint + headersFirstMode bool + headerList *list.List + startHeader *list.Element + nextCheckpoint *chaincfg.Checkpoint + queuedBlocks map[chainhash.Hash]*blockMsg + queuedBlocksPrevHash map[chainhash.Hash]chainhash.Hash + peerSubscribers []*peerSubscription + connectedPeers func() []*peerpkg.Peer + fetchManager query.WorkManager // An optional fee estimator. feeEstimator *mempool.FeeEstimator @@ -325,11 +430,6 @@ func (sm *SyncManager) startSync() { // Start syncing from the best peer if one was selected. if bestPeer != nil { - // Clear the requestedBlocks if the sync peer changes, otherwise - // we may ignore blocks we need that the last sync peer failed - // to send. - sm.requestedBlocks = make(map[chainhash.Hash]struct{}) - locator, err := sm.chain.LatestBlockLocator() if err != nil { log.Errorf("Failed to get block locator for the "+ @@ -367,6 +467,10 @@ func (sm *SyncManager) startSync() { "%d from peer %s", best.Height+1, sm.nextCheckpoint.Height, bestPeer.Addr()) } else { + // Clear the requestedBlocks if the sync peer changes, otherwise + // we may ignore blocks we need that the last sync peer failed + // to send. + sm.requestedBlocks = make(map[chainhash.Hash]struct{}) bestPeer.PushGetBlocksMsg(locator, &zeroHash) } sm.syncPeer = bestPeer @@ -452,6 +556,31 @@ func (sm *SyncManager) isSyncCandidate(peer *peerpkg.Peer) bool { return true } +// notifyPeerSubscribers notifies all the current peer subscribers of the peer +// that was passed in. +func (sm *SyncManager) notifyPeerSubscribers(peer *peerpkg.Peer) { + // Loop for alerting subscribers to the new peer that was connected to. + n := 0 + for i, sub := range sm.peerSubscribers { + select { + // Quickly check whether this subscription has been canceled. + case <-sub.cancel: + // Avoid GC leak. + sm.peerSubscribers[i] = nil + continue + default: + } + + // Keep non-canceled subscribers around. + sm.peerSubscribers[n] = sub + n++ + + sub.peers <- peer + } + // Re-align the slice to only active subscribers. + sm.peerSubscribers = sm.peerSubscribers[:n] +} + // handleNewPeerMsg deals with new peers that have signalled they may // be considered as a sync peer (they have already successfully negotiated). It // also starts syncing if needed. It is invoked from the syncHandler goroutine. @@ -461,16 +590,30 @@ func (sm *SyncManager) handleNewPeerMsg(peer *peerpkg.Peer) { return } - log.Infof("New valid peer %s (%s)", peer, peer.UserAgent()) - // Initialize the peer state. isSyncCandidate := sm.isSyncCandidate(peer) + if !sm.current() && !isSyncCandidate { + log.Infof("New peer %s (%s) is not a valid sync candidate -- disconnecting", + peer, peer.UserAgent()) + peer.Disconnect() + return + } + + log.Infof("New valid peer %s (%s)", peer, peer.UserAgent()) + sm.peerStates[peer] = &peerSyncState{ syncCandidate: isSyncCandidate, requestedTxns: make(map[chainhash.Hash]struct{}), requestedBlocks: make(map[chainhash.Hash]struct{}), } + // Only pass the peer off to the subscribers if we're able to sync off of + // the peer. + bestHeight := sm.chain.BestSnapshot().Height + if isSyncCandidate && peer.LastBlock() > bestHeight { + sm.notifyPeerSubscribers(peer) + } + // Start syncing by choosing the best candidate if needed. if isSyncCandidate && sm.syncPeer == nil { sm.startSync() @@ -565,12 +708,14 @@ func (sm *SyncManager) clearRequestedState(state *peerSyncState) { delete(sm.requestedTxns, txHash) } - // Remove requested blocks from the global map so that they will be - // fetched from elsewhere next time we get an inv. - // TODO: we could possibly here check which peers have these blocks - // and request them now to speed things up a little. - for blockHash := range state.requestedBlocks { - delete(sm.requestedBlocks, blockHash) + if !sm.headersFirstMode { + // Remove requested blocks from the global map so that they will be + // fetched from elsewhere next time we get an inv. + // TODO: we could possibly here check which peers have these blocks + // and request them now to speed things up a little. + for blockHash := range state.requestedBlocks { + delete(sm.requestedBlocks, blockHash) + } } } @@ -685,30 +830,10 @@ func (sm *SyncManager) current() bool { return true } -// handleBlockMsg handles block messages from all peers. -func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) { +// processBlock checks if the block connects to the best chain. +func (sm *SyncManager) processBlock(bmsg *blockMsg) (bool, error) { peer := bmsg.peer - state, exists := sm.peerStates[peer] - if !exists { - log.Warnf("Received block message from unknown peer %s", peer) - return - } - - // If we didn't ask for this block then the peer is misbehaving. blockHash := bmsg.block.Hash() - if _, exists = state.requestedBlocks[*blockHash]; !exists { - // The regression test intentionally sends some blocks twice - // to test duplicate block insertion fails. Don't disconnect - // the peer or ignore the block when we're in regression test - // mode in this case so the chain code is actually fed the - // duplicate blocks. - if sm.chainParams != &chaincfg.RegressionNetParams { - log.Warnf("Got unrequested block %v from %s -- "+ - "disconnecting", blockHash, peer.Addr()) - peer.Disconnect() - return - } - } // When in headers-first mode, if the block matches the hash of the // first header in the list of headers that are being fetched, it's @@ -734,12 +859,6 @@ func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) { } } - // Remove block from request maps. Either chain will know about it and - // so we shouldn't have any more instances of trying to fetch it, or we - // will fail the insert and thus we'll retry next time we get an inv. - delete(state.requestedBlocks, *blockHash) - delete(sm.requestedBlocks, *blockHash) - // Process the block to include validation, best chain selection, orphan // handling, etc. _, isOrphan, err := sm.chain.ProcessBlock(bmsg.block, behaviorFlags) @@ -764,7 +883,7 @@ func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) { // send it. code, reason := mempool.ErrToRejectErr(err) peer.PushRejectMsg(wire.CmdBlock, code, reason, blockHash, false) - return + return false, err } // Meta-data about the new block this peer is reporting. We use this @@ -840,22 +959,135 @@ func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) { } } + return isCheckpointBlock, nil +} + +// handleBlockMsg handles block messages from all peers. +func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) { + peer := bmsg.peer + state, exists := sm.peerStates[peer] + if !exists { + log.Warnf("Received block message from unknown peer %s", peer) + return + } + + // If we didn't ask for this block then the peer is misbehaving. + blockHash := bmsg.block.Hash() + if _, exists := state.requestedBlocks[*blockHash]; !exists { + // The regression test intentionally sends some blocks twice + // to test duplicate block insertion fails. Don't disconnect + // the peer or ignore the block when we're in regression test + // mode in this case so the chain code is actually fed the + // duplicate blocks. + if sm.chainParams != &chaincfg.RegressionNetParams { + log.Warnf("Got unrequested block %v from %s."+ + "this peer may be a stalling peer -- disconnecting", + blockHash, peer.Addr()) + peer.Disconnect() + return + } + } + + // Remove block from request maps. Either chain will know about it and + // so we shouldn't have any more instances of trying to fetch it or we + // will fail the insert and thus we'll retry next time we get an inv. + delete(sm.requestedBlocks, *blockHash) + delete(state.requestedBlocks, *blockHash) + + _, err := sm.processBlock(bmsg) + if err != nil { + return + } + // If we are not in headers first mode, it's a good time to periodically // flush the blockchain cache because we don't expect new blocks immediately. // After that, there is nothing more to do. - if !sm.headersFirstMode { - if err := sm.chain.FlushUtxoCache(blockchain.FlushPeriodic); err != nil { - log.Errorf("Error while flushing the blockchain cache: %v", err) - } + if err := sm.chain.FlushUtxoCache(blockchain.FlushPeriodic); err != nil { + log.Errorf("Error while flushing the blockchain cache: %v", err) + } +} + +// handleBlockMsgInHeadersFirst handles block messages from all peers when the +// sync manager is in headers first mode. For blocks received out of order, it +// first keeps them in memory and sends them to be processed when the next block +// from the tip is available. +func (sm *SyncManager) handleBlockMsgInHeadersFirst(bmsg *blockMsg) { + blockHash := bmsg.block.Hash() + peer := bmsg.peer + _, exists := sm.peerStates[peer] + if !exists { + log.Warnf("Received block message from unknown peer %s", peer) return } + // If we didn't ask for this block then the peer may be stalling. + if _, exists := sm.requestedBlocks[*blockHash]; !exists { + // The regression test intentionally sends some blocks twice + // to test duplicate block insertion fails. Don't disconnect + // the peer or ignore the block when we're in regression test + // mode in this case so the chain code is actually fed the + // duplicate blocks. + if sm.chainParams != &chaincfg.RegressionNetParams { + log.Debugf("Got unrequested block %v from %s."+ + "this peer may be a stalling peer", + blockHash, peer.Addr()) + return + } + } + + // Add the block to the queue. + sm.queuedBlocks[*blockHash] = bmsg + sm.queuedBlocksPrevHash[bmsg.block.MsgBlock().Header.PrevBlock] = *blockHash + + // Remove block from the request map. Either chain will know about it + // and so we shouldn't have any more instances of trying to fetch it, we + // keep it in the queued blocks map, or we will fail the insert and thus + // we'll retry next time we get an inv. + delete(sm.requestedBlocks, *blockHash) + + // Since we may receive blocks out of order, attempt to find the next block + // and any other descendent blocks that connect to it. + processBlocks := make([]*blockMsg, 0, 1024) + + bestHash := sm.chain.BestSnapshot().Hash + for len(sm.queuedBlocks) > 0 { + hash, found := sm.queuedBlocksPrevHash[bestHash] + if !found { + break + } + + b, found := sm.queuedBlocks[hash] + if !found { + // Break when we're missing the next block in + // sequence. + break + } + + // Append the block to be processed and delete from the + // queue. + delete(sm.queuedBlocks, hash) + delete(sm.queuedBlocksPrevHash, bestHash) + processBlocks = append(processBlocks, b) + bestHash = hash + } + + var isCheckpointBlock bool + if len(processBlocks) > 0 { + for _, blockMsg := range processBlocks { + var err error + isCheckpointBlock, err = sm.processBlock(blockMsg) + if err != nil { + return + } + } + } + // This is headers-first mode, so if the block is not a checkpoint // request more blocks using the header list when the request queue is // getting short. if !isCheckpointBlock { if sm.startHeader != nil && - len(state.requestedBlocks) < minInFlightBlocks { + len(sm.requestedBlocks) < minInFlightBlocks { sm.fetchHeaderBlocks() } return @@ -870,7 +1102,7 @@ func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) { sm.nextCheckpoint = sm.findNextHeaderCheckpoint(prevHeight) if sm.nextCheckpoint != nil { locator := blockchain.BlockLocator([]*chainhash.Hash{prevHash}) - err := peer.PushGetHeadersMsg(locator, sm.nextCheckpoint.Hash) + err := sm.syncPeer.PushGetHeadersMsg(locator, sm.nextCheckpoint.Hash) if err != nil { log.Warnf("Failed to send getheaders message to "+ "peer %s: %v", peer.Addr(), err) @@ -889,7 +1121,7 @@ func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) { sm.headerList.Init() log.Infof("Reached the final checkpoint -- switching to normal mode") locator := blockchain.BlockLocator([]*chainhash.Hash{blockHash}) - err = peer.PushGetBlocksMsg(locator, &zeroHash) + err := peer.PushGetBlocksMsg(locator, &zeroHash) if err != nil { log.Warnf("Failed to send getblocks message to peer %s: %v", peer.Addr(), err) @@ -906,11 +1138,35 @@ func (sm *SyncManager) fetchHeaderBlocks() { return } + queueFetch := func(msg *wire.MsgGetData, quit chan struct{}) { + if msg == nil || len(msg.InvList) == 0 { + return + } + + // Keep fetching until we don't have an error. + blockMap, err := sm.queueFetchManager(msg) + if err != nil { + Loop: + for err != nil { + select { + case <-quit: + break Loop + default: + } + gdmsg := wire.NewMsgGetDataSizeHint(maxInFlightBlocksPerPeer) + for k := range blockMap { + iv := wire.NewInvVect(wire.InvTypeWitnessBlock, &k) + gdmsg.AddInvVect(iv) + } + blockMap, err = sm.queueFetchManager(gdmsg) + } + } + } + // Build up a getdata request for the list of blocks the headers // describe. The size hint will be limited to wire.MaxInvPerMsg by // the function, so no need to double check it here. - gdmsg := wire.NewMsgGetDataSizeHint(uint(sm.headerList.Len())) - numRequested := 0 + msgs := make([]*wire.MsgGetData, 0, blockDownloadWindow/maxInFlightBlocksPerPeer) for e := sm.startHeader; e != nil; e = e.Next() { node, ok := e.Value.(*headerNode) if !ok { @@ -918,18 +1174,40 @@ func (sm *SyncManager) fetchHeaderBlocks() { continue } + // Prevent the blocks from being too out of order. + if node.height-blockDownloadWindow > sm.chain.BestSnapshot().Height { + break + } + + // Check if the block is already requested. If it is just move + // to the next block. + _, requested := sm.requestedBlocks[*node.hash] + if requested { + sm.startHeader = e.Next() + continue + } + + // Check if the block is already queued. If it is just move to + // the next block. + _, queued := sm.queuedBlocks[*node.hash] + if queued { + sm.startHeader = e.Next() + continue + } + + // Check if we already have the block. iv := wire.NewInvVect(wire.InvTypeBlock, node.hash) haveInv, err := sm.haveInventory(iv) if err != nil { + // If we error out, fetch the block anyways. log.Warnf("Unexpected failure when checking for "+ "existing inventory during header block "+ "fetch: %v", err) } - if !haveInv { - syncPeerState := sm.peerStates[sm.syncPeer] + // We don't have this block so include it in the invs. + if !haveInv { sm.requestedBlocks[*node.hash] = struct{}{} - syncPeerState.requestedBlocks[*node.hash] = struct{}{} // If we're fetching from a witness enabled peer // post-fork, then ensure that we receive all the @@ -938,16 +1216,26 @@ func (sm *SyncManager) fetchHeaderBlocks() { iv.Type = wire.InvTypeWitnessBlock } - gdmsg.AddInvVect(iv) - numRequested++ + if len(msgs) == 0 { + gdmsg := wire.NewMsgGetDataSizeHint(maxInFlightBlocksPerPeer) + msgs = append(msgs, gdmsg) + } + + gdmsg := msgs[len(msgs)-1] + if gdmsg != nil && len(gdmsg.InvList) < maxInFlightBlocksPerPeer { + gdmsg.AddInvVect(iv) + } else { + gdmsg := wire.NewMsgGetDataSizeHint(maxInFlightBlocksPerPeer) + gdmsg.AddInvVect(iv) + msgs = append(msgs, gdmsg) + } } + sm.startHeader = e.Next() - if numRequested >= wire.MaxInvPerMsg { - break - } } - if len(gdmsg.InvList) > 0 { - sm.syncPeer.QueueMessage(gdmsg, nil) + + for _, m := range msgs { + go queueFetch(m, sm.quit) } } @@ -1362,8 +1650,15 @@ out: msg.reply <- struct{}{} case *blockMsg: - sm.handleBlockMsg(msg) - msg.reply <- struct{}{} + if sm.headersFirstMode { + // We're purposefully not sending back + // reply as that'll happen within this + // function. + sm.handleBlockMsgInHeadersFirst(msg) + } else { + sm.handleBlockMsg(msg) + msg.reply <- struct{}{} + } case *invMsg: sm.handleInvMsg(msg) @@ -1402,6 +1697,29 @@ out: case isCurrentMsg: msg.reply <- sm.current() + case peerDisconnectMsg: + found := false + for peer := range sm.peerStates { + if peer.Addr() == msg.peer { + peer.Disconnect() + found = true + // Remove requested blocks from the global map so that they will be + // fetched from elsewhere next time we get an inv. + // TODO: we could possibly here check which peers have these blocks + // and request them now to speed things up a little. + for blockHash := range msg.blocks { + delete(sm.requestedBlocks, blockHash) + } + break + } + } + if !found { + log.Debugf("Disconnect peer %v "+ + "failed. Peer not found "+ + "in peerStates", msg.peer) + } + msg.reply <- struct{}{} + case pauseMsg: // Wait until the sender unpauses the manager. <-msg.unpause @@ -1534,6 +1852,13 @@ func (sm *SyncManager) NewPeer(peer *peerpkg.Peer) { sm.msgChan <- &newPeerMsg{peer: peer} } +// queuePeerToBeDisconnected takes in a string of a peer and passes it to the +// message channel in order for the peer to be disconnected. It's done in this +// fashion to avoid hv +func (sm *SyncManager) queuePeerToBeDisconnected(disconnectPeer string, blocks map[chainhash.Hash]struct{}, done chan struct{}) { + sm.msgChan <- peerDisconnectMsg{disconnectPeer, blocks, done} +} + // QueueTx adds the passed transaction message and peer to the block handling // queue. Responds to the done channel argument after the tx message is // processed. @@ -1557,7 +1882,24 @@ func (sm *SyncManager) QueueBlock(block *btcutil.Block, peer *peerpkg.Peer, done return } - sm.msgChan <- &blockMsg{block: block, peer: peer, reply: done} + if sm.headersFirstMode { + // Since we're in headers first mode, we're downloading blocks + // from multiple peers and we're relying on a timeout to determine if + // the peer is lagging or not. The peer thread is blocked by this + // function here and wait for the block to be processed, then the + // block that the peer sent over is not being read and we may confuse + // the fetchManager that the peer is lagging so we send that we've + // processed the block to the peer so that it'll go download another + // block. + // + // During a headers-first download, it's much harder for a peer + // to be queuing up a bunch of bad blocks as we specifically ask + // for the blocks that had their proof of work verified. + done <- struct{}{} + sm.msgChan <- &blockMsg{block: block, peer: peer, reply: make(chan struct{})} + } else { + sm.msgChan <- &blockMsg{block: block, peer: peer, reply: done} + } } // QueueInv adds the passed inv message and peer to the block handling queue. @@ -1612,6 +1954,10 @@ func (sm *SyncManager) Start() { return } + if err := sm.fetchManager.Start(); err != nil { + log.Info(err) + } + log.Trace("Starting sync manager") sm.wg.Add(1) go sm.blockHandler() @@ -1627,6 +1973,8 @@ func (sm *SyncManager) Stop() error { } log.Infof("Sync manager shutting down") + + sm.fetchManager.Stop() close(sm.quit) sm.wg.Wait() return nil @@ -1666,24 +2014,93 @@ func (sm *SyncManager) Pause() chan<- struct{} { return c } +// peerSubscription holds a peer subscription which we'll notify about any +// connected peers. +type peerSubscription struct { + peers chan<- query.Peer + cancel <-chan struct{} +} + +// ConnectedPeers returns all the currently connected peers to the channel +// and then any additional new peers on connect. +func (sm *SyncManager) ConnectedPeers() (<-chan query.Peer, func(), error) { + peers := sm.connectedPeers() + peerChan := make(chan query.Peer, len(peers)) + + for _, peer := range peers { + if sm.isSyncCandidate(peer) { + peerChan <- peer + } + } + + cancelChan := make(chan struct{}) + sm.peerSubscribers = append(sm.peerSubscribers, &peerSubscription{ + peers: peerChan, + cancel: cancelChan, + }) + + return peerChan, func() { + close(cancelChan) + }, nil +} + +// queueFetchManager queues the given getdata to the fetch manager and waits for +// the resulting error from the channel and returns the value. +func (sm *SyncManager) queueFetchManager(msg *wire.MsgGetData) ( + map[chainhash.Hash]struct{}, error) { + + r := newCheckpointedBlocksQuery(msg, sm) + errChan := sm.fetchManager.Query( + r.requests(), + query.Cancel(sm.quit), + query.NumRetries(0), + ) + + var err error + select { + case err = <-errChan: + return r.blocks, err + case <-sm.quit: + } + + return nil, nil +} + // New constructs a new SyncManager. Use Start to begin processing asynchronous // block, tx, and inv updates. func New(config *Config) (*SyncManager, error) { sm := SyncManager{ - peerNotifier: config.PeerNotifier, - chain: config.Chain, - txMemPool: config.TxMemPool, - chainParams: config.ChainParams, - rejectedTxns: make(map[chainhash.Hash]struct{}), - requestedTxns: make(map[chainhash.Hash]struct{}), - requestedBlocks: make(map[chainhash.Hash]struct{}), - peerStates: make(map[*peerpkg.Peer]*peerSyncState), - progressLogger: newBlockProgressLogger("Processed", log), - msgChan: make(chan interface{}, config.MaxPeers*3), - headerList: list.New(), - quit: make(chan struct{}), - feeEstimator: config.FeeEstimator, - } + peerNotifier: config.PeerNotifier, + chain: config.Chain, + txMemPool: config.TxMemPool, + chainParams: config.ChainParams, + rejectedTxns: make(map[chainhash.Hash]struct{}), + requestedTxns: make(map[chainhash.Hash]struct{}), + requestedBlocks: make(map[chainhash.Hash]struct{}), + peerStates: make(map[*peerpkg.Peer]*peerSyncState), + progressLogger: newBlockProgressLogger("Processed", log), + msgChan: make(chan interface{}, config.MaxPeers*maxInFlightBlocksPerPeer), + peerLogger: newPeerLogger(log), + headerList: list.New(), + quit: make(chan struct{}), + queuedBlocks: make(map[chainhash.Hash]*blockMsg), + queuedBlocksPrevHash: make(map[chainhash.Hash]chainhash.Hash), + feeEstimator: config.FeeEstimator, + connectedPeers: config.ConnectedPeers, + } + sm.fetchManager = query.NewWorkManager( + &query.Config{ + ConnectedPeers: sm.ConnectedPeers, + NewWorker: query.NewWorker, + OnMaxTries: func(peerString string) { + log.Infof("queuing %v to be disconnected", peerString) + + done := make(chan struct{}, 1) + sm.queuePeerToBeDisconnected(peerString, nil, done) + }, + Ranking: query.NewPeerRanking(), + }, + ) best := sm.chain.BestSnapshot() if !config.DisableCheckpoints { diff --git a/peer/peer.go b/peer/peer.go index 5767cbbf66..bffc9f5b13 100644 --- a/peer/peer.go +++ b/peer/peer.go @@ -18,19 +18,18 @@ import ( "sync/atomic" "time" - "github.com/btcsuite/go-socks/socks" - "github.com/davecgh/go-spew/spew" - "github.com/decred/dcrd/lru" - "github.com/btcsuite/btcd/blockchain" "github.com/btcsuite/btcd/chaincfg" "github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcd/wire" + "github.com/btcsuite/go-socks/socks" + "github.com/davecgh/go-spew/spew" + "github.com/decred/dcrd/lru" ) const ( // MaxProtocolVersion is the max protocol version the peer supports. - MaxProtocolVersion = wire.SendAddrV2Version + MaxProtocolVersion = wire.AddrV2Version // DefaultTrickleInterval is the min time between attempts to send an // inv message to a peer. @@ -495,6 +494,10 @@ type Peer struct { queueQuit chan struct{} outQuit chan struct{} quit chan struct{} + + // subscribers is a channel for relaying all messages that were received + // to this peer. + subscribers []chan wire.Message } // String returns the peer's address and directionality as a human-readable @@ -878,8 +881,8 @@ func (p *Peer) PushAddrMsg(addresses []*wire.NetAddress) ([]*wire.NetAddress, er // // This function is safe for concurrent access. func (p *Peer) PushAddrV2Msg(addrs []*wire.NetAddressV2) ( - []*wire.NetAddressV2, error, -) { + []*wire.NetAddressV2, error) { + count := len(addrs) // Nothing to send. @@ -1099,6 +1102,24 @@ func (p *Peer) readMessage(encoding wire.MessageEncoding) (wire.Message, []byte, return msg, buf, nil } +// SubscribeRecvMsg adds a OnRead subscription to the peer. All bitcoin +// messages received from this peer will be sent on the returned +// channel. A closure is also returned, that should be called to cancel +// the subscription. +func (p *Peer) SubscribeRecvMsg() (<-chan wire.Message, func()) { + msgChan := make(chan wire.Message, 1) + p.subscribers = append(p.subscribers, msgChan) + + // Cancellation is just removing the channel from the subscribers list. + idx := len(p.subscribers) - 1 + cancel := func() { + p.subscribers = append(p.subscribers[:idx], + p.subscribers[idx+1:]...) + } + + return msgChan, cancel +} + // writeMessage sends a bitcoin message to the peer with logging. func (p *Peer) writeMessage(msg wire.Message, enc wire.MessageEncoding) error { // Don't do anything if we're disconnecting. @@ -1403,6 +1424,10 @@ out: // needed. rmsg, buf, err := p.readMessage(p.wireEncoding) idleTimer.Stop() + // Send the received message to all the subscribers. + for _, sub := range p.subscribers { + sub <- rmsg + } if err != nil { // In order to allow regression tests with malformed messages, don't // disconnect the peer when we're in regression test mode and the @@ -1447,6 +1472,7 @@ out: } break out } + atomic.StoreInt64(&p.lastRecv, time.Now().Unix()) p.stallControl <- stallControlMsg{sccReceiveMessage, rmsg} @@ -1901,8 +1927,8 @@ func (p *Peer) QueueMessage(msg wire.Message, doneChan chan<- struct{}) { // // This function is safe for concurrent access. func (p *Peer) QueueMessageWithEncoding(msg wire.Message, doneChan chan<- struct{}, - encoding wire.MessageEncoding, -) { + encoding wire.MessageEncoding) { + // Avoid risk of deadlock if goroutine already exited. The goroutine // we will be sending to hangs around until it knows for a fact that // it is marked as disconnected and *then* it drains the channels. @@ -1962,6 +1988,12 @@ func (p *Peer) Disconnect() { close(p.quit) } +// OnDisconnect returns a channel that will be closed when this peer is +// disconnected. +func (p *Peer) OnDisconnect() <-chan struct{} { + return p.quit +} + // readRemoteVersionMsg waits for the next message to arrive from the remote // peer. If the next message is not a version message or the version is not // acceptable then return an error. @@ -2153,7 +2185,7 @@ func (p *Peer) writeLocalVersionMsg() error { // writeSendAddrV2Msg writes our sendaddrv2 message to the remote peer if the // peer supports protocol version 70016 and above. func (p *Peer) writeSendAddrV2Msg(pver uint32) error { - if pver < wire.SendAddrV2Version { + if pver < wire.AddrV2Version { return nil } @@ -2181,7 +2213,7 @@ func (p *Peer) waitToFinishNegotiation(pver uint32) error { switch m := remoteMsg.(type) { case *wire.MsgSendAddrV2: - if pver >= wire.SendAddrV2Version { + if pver >= wire.AddrV2Version { p.flagsMtx.Lock() p.sendAddrV2 = true p.flagsMtx.Unlock() diff --git a/server.go b/server.go index 66794e4bb7..66bebc63c9 100644 --- a/server.go +++ b/server.go @@ -1982,6 +1982,34 @@ type removeNodeMsg struct { reply chan error } +// ConnectedPeers returns an array consisting of all connected peers. +func (s *server) ConnectedPeers() []*peer.Peer { + replyChan := make(chan []*serverPeer, 1) + + // Send a query for a subscription for the connected peers. + select { + case s.query <- getPeersMsg{ + reply: replyChan, + }: + + case <-s.quit: + return nil + } + + // Wait for the result here. + select { + case reply := <-replyChan: + peers := make([]*peer.Peer, 0, len(reply)) + for _, sp := range reply { + peers = append(peers, sp.Peer) + } + + return peers + case <-s.quit: + return nil + } +} + // handleQuery is the central handler for all queries and commands from other // goroutines related to peer state. func (s *server) handleQuery(state *peerState, querymsg interface{}) { @@ -2912,6 +2940,7 @@ func newServer(listenAddrs, agentBlacklist, agentWhitelist []string, DisableCheckpoints: cfg.DisableCheckpoints, MaxPeers: cfg.MaxPeers, FeeEstimator: s.feeEstimator, + ConnectedPeers: s.ConnectedPeers, }) if err != nil { return nil, err diff --git a/wire/message.go b/wire/message.go index a90fb62733..95f7013ebb 100644 --- a/wire/message.go +++ b/wire/message.go @@ -112,9 +112,6 @@ func makeEmptyMessage(command string) (Message, error) { case CmdSendAddrV2: msg = &MsgSendAddrV2{} - case CmdWTxIdRelay: - msg = &MsgWTxIdRelay{} - case CmdGetAddr: msg = &MsgGetAddr{} @@ -280,8 +277,8 @@ func WriteMessage(w io.Writer, msg Message, pver uint32, btcnet BitcoinNet) erro // to specify the message encoding format to be used when serializing wire // messages. func WriteMessageWithEncodingN(w io.Writer, msg Message, pver uint32, - btcnet BitcoinNet, encoding MessageEncoding, -) (int, error) { + btcnet BitcoinNet, encoding MessageEncoding) (int, error) { + totalBytes := 0 // Enforce max command size. @@ -357,8 +354,8 @@ func WriteMessageWithEncodingN(w io.Writer, msg Message, pver uint32, // allows the caller to specify which message encoding is to to consult when // decoding wire messages. func ReadMessageWithEncodingN(r io.Reader, pver uint32, btcnet BitcoinNet, - enc MessageEncoding, -) (int, Message, []byte, error) { + enc MessageEncoding) (int, Message, []byte, error) { + totalBytes := 0 n, hdr, err := readMessageHeader(r) totalBytes += n diff --git a/wire/msgsendaddrv2.go b/wire/msgsendaddrv2.go index 8348b815ba..7be30d118d 100644 --- a/wire/msgsendaddrv2.go +++ b/wire/msgsendaddrv2.go @@ -15,7 +15,7 @@ type MsgSendAddrV2 struct{} // BtcDecode decodes r using the bitcoin protocol encoding into the receiver. // This is part of the Message interface implementation. func (msg *MsgSendAddrV2) BtcDecode(r io.Reader, pver uint32, enc MessageEncoding) error { - if pver < SendAddrV2Version { + if pver < AddrV2Version { str := fmt.Sprintf("sendaddrv2 message invalid for protocol "+ "version %d", pver) return messageError("MsgSendAddrV2.BtcDecode", str) @@ -27,7 +27,7 @@ func (msg *MsgSendAddrV2) BtcDecode(r io.Reader, pver uint32, enc MessageEncodin // BtcEncode encodes the receiver to w using the bitcoin protocol encoding. // This is part of the Message interface implementation. func (msg *MsgSendAddrV2) BtcEncode(w io.Writer, pver uint32, enc MessageEncoding) error { - if pver < SendAddrV2Version { + if pver < AddrV2Version { str := fmt.Sprintf("sendaddrv2 message invalid for protocol "+ "version %d", pver) return messageError("MsgSendAddrV2.BtcEncode", str) diff --git a/wire/msgsendaddrv2_test.go b/wire/msgsendaddrv2_test.go index db6be1d21f..9161c08acc 100644 --- a/wire/msgsendaddrv2_test.go +++ b/wire/msgsendaddrv2_test.go @@ -45,7 +45,7 @@ func TestSendAddrV2(t *testing.T) { // Older protocol versions should fail encode since message didn't // exist yet. - oldPver := SendAddrV2Version - 1 + oldPver := AddrV2Version - 1 err = msg.BtcEncode(&buf, oldPver, enc) if err == nil { s := "encode of MsgSendAddrV2 passed for old protocol " + @@ -72,10 +72,10 @@ func TestSendAddrV2(t *testing.T) { } // TestSendAddrV2BIP0130 tests the MsgSendAddrV2 API against the protocol -// prior to version SendAddrV2Version. +// prior to version AddrV2Version. func TestSendAddrV2BIP0130(t *testing.T) { - // Use the protocol version just prior to SendAddrV2Version changes. - pver := SendAddrV2Version - 1 + // Use the protocol version just prior to AddrV2Version changes. + pver := AddrV2Version - 1 enc := BaseEncoding msg := NewMsgSendAddrV2() @@ -98,7 +98,7 @@ func TestSendAddrV2BIP0130(t *testing.T) { } // TestSendAddrV2CrossProtocol tests the MsgSendAddrV2 API when encoding with -// the latest protocol version and decoding with SendAddrV2Version. +// the latest protocol version and decoding with AddrV2Version. func TestSendAddrV2CrossProtocol(t *testing.T) { enc := BaseEncoding msg := NewMsgSendAddrV2() @@ -113,7 +113,7 @@ func TestSendAddrV2CrossProtocol(t *testing.T) { // Decode with old protocol version. readmsg := NewMsgSendAddrV2() - err = readmsg.BtcDecode(&buf, SendAddrV2Version, enc) + err = readmsg.BtcDecode(&buf, AddrV2Version, enc) if err != nil { t.Errorf("decode of MsgSendAddrV2 failed [%v] err <%v>", buf, err) @@ -142,21 +142,21 @@ func TestSendAddrV2Wire(t *testing.T) { BaseEncoding, }, - // Protocol version SendAddrV2Version+1 + // Protocol version AddrV2Version+1 { msgSendAddrV2, msgSendAddrV2, msgSendAddrV2Encoded, - SendAddrV2Version + 1, + AddrV2Version + 1, BaseEncoding, }, - // Protocol version SendAddrV2Version + // Protocol version AddrV2Version { msgSendAddrV2, msgSendAddrV2, msgSendAddrV2Encoded, - SendAddrV2Version, + AddrV2Version, BaseEncoding, }, } diff --git a/wire/msgwtxidrelay.go b/wire/msgwtxidrelay.go index 480c711dee..ab131bc5bd 100644 --- a/wire/msgwtxidrelay.go +++ b/wire/msgwtxidrelay.go @@ -19,7 +19,7 @@ type MsgWTxIdRelay struct{} // BtcDecode decodes r using the bitcoin protocol encoding into the receiver. // This is part of the Message interface implementation. func (msg *MsgWTxIdRelay) BtcDecode(r io.Reader, pver uint32, enc MessageEncoding) error { - if pver < WTxIdRelayVersion { + if pver < AddrV2Version { str := fmt.Sprintf("wtxidrelay message invalid for protocol "+ "version %d", pver) return messageError("MsgWTxIdRelay.BtcDecode", str) @@ -31,7 +31,7 @@ func (msg *MsgWTxIdRelay) BtcDecode(r io.Reader, pver uint32, enc MessageEncodin // BtcEncode encodes the receiver to w using the bitcoin protocol encoding. // This is part of the Message interface implementation. func (msg *MsgWTxIdRelay) BtcEncode(w io.Writer, pver uint32, enc MessageEncoding) error { - if pver < WTxIdRelayVersion { + if pver < AddrV2Version { str := fmt.Sprintf("wtxidrelay message invalid for protocol "+ "version %d", pver) return messageError("MsgWTxIdRelay.BtcEncode", str) diff --git a/wire/msgwtxidrelay_test.go b/wire/msgwtxidrelay_test.go index e7cde73a7b..7f519b4e93 100644 --- a/wire/msgwtxidrelay_test.go +++ b/wire/msgwtxidrelay_test.go @@ -45,7 +45,7 @@ func TestWTxIdRelay(t *testing.T) { // Older protocol versions should fail encode since message didn't // exist yet. - oldPver := WTxIdRelayVersion - 1 + oldPver := AddrV2Version - 1 err = msg.BtcEncode(&buf, oldPver, enc) if err == nil { s := "encode of MsgWTxIdRelay passed for old protocol " + @@ -72,10 +72,10 @@ func TestWTxIdRelay(t *testing.T) { } // TestWTxIdRelayBIP0130 tests the MsgWTxIdRelay API against the protocol -// prior to version WTxIdRelayVersion. +// prior to version AddrV2Version. func TestWTxIdRelayBIP0130(t *testing.T) { - // Use the protocol version just prior to WTxIdRelayVersion changes. - pver := WTxIdRelayVersion - 1 + // Use the protocol version just prior to AddrV2Version changes. + pver := AddrV2Version - 1 enc := BaseEncoding msg := NewMsgWTxIdRelay() @@ -98,7 +98,7 @@ func TestWTxIdRelayBIP0130(t *testing.T) { } // TestWTxIdRelayCrossProtocol tests the MsgWTxIdRelay API when encoding with -// the latest protocol version and decoding with WTxIdRelayVersion. +// the latest protocol version and decoding with AddrV2Version. func TestWTxIdRelayCrossProtocol(t *testing.T) { enc := BaseEncoding msg := NewMsgWTxIdRelay() @@ -113,7 +113,7 @@ func TestWTxIdRelayCrossProtocol(t *testing.T) { // Decode with old protocol version. readmsg := NewMsgWTxIdRelay() - err = readmsg.BtcDecode(&buf, WTxIdRelayVersion, enc) + err = readmsg.BtcDecode(&buf, AddrV2Version, enc) if err != nil { t.Errorf("decode of MsgWTxIdRelay failed [%v] err <%v>", buf, err) @@ -142,21 +142,21 @@ func TestWTxIdRelayWire(t *testing.T) { BaseEncoding, }, - // Protocol version WTxIdRelayVersion+1 + // Protocol version AddrV2Version+1 { msgWTxIdRelay, msgWTxIdRelay, msgWTxIdRelayEncoded, - WTxIdRelayVersion + 1, + AddrV2Version + 1, BaseEncoding, }, - // Protocol version WTxIdRelayVersion + // Protocol version AddrV2Version { msgWTxIdRelay, msgWTxIdRelay, msgWTxIdRelayEncoded, - WTxIdRelayVersion, + AddrV2Version, BaseEncoding, }, } diff --git a/wire/protocol.go b/wire/protocol.go index e1344f3aa7..4ed0922b3d 100644 --- a/wire/protocol.go +++ b/wire/protocol.go @@ -52,16 +52,12 @@ const ( // feefilter message. FeeFilterVersion uint32 = 70013 - // SendAddrV2Version is the protocol version which added two new - // messages. sendaddrv2 is sent during the version-verack handshake - // and signals support for sending and receiving the addrv2 message. In - // the future, new messages that occur during the version-verack - // handshake will not come with a protocol version bump. - // In addition, wtxidrelay was also added as an optional message in the - // same protocol version. - SendAddrV2Version uint32 = 70016 - WTxIdRelayVersion uint32 = SendAddrV2Version - AddrV2Version uint32 = SendAddrV2Version // Keep for upstream compatibility + // AddrV2Version is the protocol version which added two new messages. + // sendaddrv2 is sent during the version-verack handshake and signals + // support for sending and receiving the addrv2 message. In the future, + // new messages that occur during the version-verack handshake will not + // come with a protocol version bump. + AddrV2Version uint32 = 70016 ) const (