catchup: fetchAndWrite/fetchRound quit early on errNoBlockForRound #5809

Merged
40 changes: 25 additions & 15 deletions catchup/peerSelector.go
@@ -51,6 +51,10 @@ const (
peerRank4LowBlockTime = 801
peerRank4HighBlockTime = 999

// peerRankNoBlockForRound is used for responses that failed because no block was available for the requested round.
// This indicates that the peer is either behind, that the block has not been produced yet, or that the peer does not have a block that old.
peerRankNoBlockForRound = 2000

// peerRankDownloadFailed is used for responses which could be temporary, such as missing files, or such that we don't
// have clear resolution
peerRankDownloadFailed = 10000
@@ -143,7 +147,7 @@ func makeHistoricStatus(windowSize int, class peerClass) *historicStats {
// that will determine the rank of the peer.
hs := historicStats{
windowSize: windowSize,
rankSamples: make([]int, windowSize, windowSize),
rankSamples: make([]int, windowSize),
requestGaps: make([]uint64, 0, windowSize),
rankSum: uint64(class.initialRank) * uint64(windowSize),
gapSum: 0.0}
@@ -229,18 +233,24 @@ func (hs *historicStats) push(value int, counter uint64, class peerClass) (avera

// Download may fail for various reasons. Give it additional tries
// and see if it recovers/improves.
if value == peerRankDownloadFailed {
factor := float64(1.0)
switch value {
// - Set the rank to the class upper bound multiplied
// by the number of downloadFailures.
// - Each downloadFailure increments the counter, and
// each non-failure decrements it, until it gets to 0.
// - When the peer is consistently failing to
// download, the value added to rankSum will
// increase at an increasing rate to evict the peer
// from the class sooner.
case peerRankNoBlockForRound:
// for no-block errors apply a much gentler rank increase
factor = 0.1
fallthrough
case peerRankDownloadFailed:
hs.downloadFailures++
// - Set the rank to the class upper bound multiplied
// by the number of downloadFailures.
// - Each downloadFailure increments the counter, and
// each non-failure decrements it, until it gets to 0.
// - When the peer is consistently failing to
// download, the value added to rankSum will
// increase at an increasing rate to evict the peer
// from the class sooner.
value = upperBound(class) * int(math.Exp2(float64(hs.downloadFailures)))
} else {
value = upperBound(class) * int(math.Exp2(float64(hs.downloadFailures)*factor))
default:
if hs.downloadFailures > 0 {
hs.downloadFailures--
}
@@ -252,12 +262,12 @@
// The average performance of the peer
average := float64(hs.rankSum) / float64(len(hs.rankSamples))

if int(average) > upperBound(class) && initialRank == peerRankDownloadFailed {
if int(average) > upperBound(class) && (initialRank == peerRankDownloadFailed || initialRank == peerRankNoBlockForRound) {
// peerRankDownloadFailed will be delayed, to give the peer
// additional time to improve. If it does not improve over time,
// the average will exceed the class limit. At this point,
// it will be pushed down to download failed class.
return peerRankDownloadFailed
return initialRank
}

// A penalty is added relative to how frequently the peer is used
@@ -470,7 +480,7 @@ func (ps *peerSelector) refreshAvailablePeers() {
for peerIdx := len(pool.peers) - 1; peerIdx >= 0; peerIdx-- {
peer := pool.peers[peerIdx].peer
if peerAddress := peerAddress(peer); peerAddress != "" {
if toRemove, _ := existingPeers[pool.peers[peerIdx].class.peerClass][peerAddress]; toRemove {
if toRemove := existingPeers[pool.peers[peerIdx].class.peerClass][peerAddress]; toRemove {
// need to be removed.
pool.peers = append(pool.peers[:peerIdx], pool.peers[peerIdx+1:]...)
}
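For reference, the sketch below reproduces the penalty formula from `historicStats.push` in isolation to show the effect of the new dampening factor. It is a standalone illustration, not code from the package: `classUpperBound = 200` is a made-up value standing in for `upperBound(class)`. Because the result is truncated to an int, a factor of 0.1 keeps the no-block penalty essentially flat over the first several failures, while plain download failures double the penalty every time.

```go
package main

import (
	"fmt"
	"math"
)

// penalty mirrors the failure branch of historicStats.push:
// the class upper bound scaled by 2^(failures*factor), truncated to an int.
func penalty(classUpperBound, failures int, factor float64) int {
	return classUpperBound * int(math.Exp2(float64(failures)*factor))
}

func main() {
	const classUpperBound = 200 // hypothetical stand-in for upperBound(class)
	for failures := 1; failures <= 8; failures++ {
		fmt.Printf("failures=%d  downloadFailed=%d  noBlockForRound=%d\n",
			failures,
			penalty(classUpperBound, failures, 1.0), // peerRankDownloadFailed path
			penalty(classUpperBound, failures, 0.1), // peerRankNoBlockForRound path
		)
	}
}
```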
46 changes: 44 additions & 2 deletions catchup/service.go
@@ -243,6 +243,8 @@
return
}

const errNoBlockForRoundThreshold = 5

// fetchAndWrite fetches a block, checks the cert, and writes it to the ledger. Cert checking and ledger writing both wait for the ledger to advance if necessary.
// Returns false if we should stop trying to catch up. This may occur for several reasons:
// - If the context is canceled (e.g. if the node is shutting down)
@@ -254,6 +256,11 @@
if dontSyncRound := s.GetDisableSyncRound(); dontSyncRound != 0 && r >= basics.Round(dontSyncRound) {
return false
}

// peerErrors tracks occurrences of errNoBlockForRound in order to quit earlier without making
// repeated requests for a block that most likely does not exist yet
peerErrors := map[network.Peer]int{}

i := 0
for {
i++
@@ -302,8 +309,19 @@
s.log.Infof("fetchAndWrite(%d): the block is already in the ledger. The catchup is complete", r)
return false
}
failureRank := peerRankDownloadFailed
if err == errNoBlockForRound {
failureRank = peerRankNoBlockForRound
// remote peer doesn't have the block, try another peer
// quit if the same peer encountered errNoBlockForRound more than errNoBlockForRoundThreshold times
if count := peerErrors[peer]; count > errNoBlockForRoundThreshold {
s.log.Infof("fetchAndWrite(%d): remote peers do not have the block. Quitting", r)
return false
}
peerErrors[peer]++
}
s.log.Debugf("fetchAndWrite(%v): Could not fetch: %v (attempt %d)", r, err, i)
peerSelector.rankPeer(psp, peerRankDownloadFailed)
peerSelector.rankPeer(psp, failureRank)

// we've just failed to retrieve a block; wait until the previous block is fetched before trying again
// to avoid the usecase where the first block doesn't exist, and we're making many requests down the chain
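As a side note, the per-peer bookkeeping above boils down to the small pattern sketched below. This is a simplified, hypothetical helper (peers keyed by string rather than `network.Peer`), not the service's actual code: each peer carries its own counter, and the loop only gives up once a single peer has reported errNoBlockForRound more often than the threshold allows.

```go
// shouldQuit is an illustrative condensation of fetchAndWrite's early-quit
// check: record one more "no block for round" response from this peer and
// report whether the caller should stop retrying altogether.
func shouldQuit(peerErrors map[string]int, peer string, threshold int) bool {
	if peerErrors[peer] > threshold {
		// the same peer kept answering "no block"; the block most likely
		// does not exist yet, so stop instead of spinning on requests
		return true
	}
	peerErrors[peer]++
	return false
}
```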
Expand Down Expand Up @@ -689,6 +707,8 @@
return
}

peerErrors := map[network.Peer]int{}

blockHash := bookkeeping.BlockHash(cert.Proposal.BlockDigest) // semantic digest (i.e., hash of the block header), not byte-for-byte digest
peerSelector := createPeerSelector(s.net, s.cfg, false)
for s.ledger.LastRound() < cert.Round {
@@ -710,8 +730,30 @@
return
default:
}
failureRank := peerRankDownloadFailed
if err == errNoBlockForRound {
failureRank = peerRankNoBlockForRound

// If a peer does not have the block after a few attempts, it probably has not persisted the block yet.
// Give it some time to persist the block and try again.
// Note, there is no exit condition on too many retries, as per the function contract.
if count, ok := peerErrors[peer]; ok {
if count > errNoBlockForRoundThreshold {
time.Sleep(50 * time.Millisecond)

}
if count > errNoBlockForRoundThreshold*10 {

// with a low number of connected peers (like 2) the following scenario is possible:
// - neither peer has the block
// - the peer selector punishes one of the peers more than the other
// - the more-punished peer eventually gets the block, while the less-punished peer stays stuck without it.
// In this case, reset the peer selector to let it re-learn priorities.
peerSelector = createPeerSelector(s.net, s.cfg, false)

}
}
peerErrors[peer]++

}
// remote peer doesn't have the block, try another peer
logging.Base().Warnf("fetchRound could not acquire block, fetcher errored out: %v", err)
peerSelector.rankPeer(psp, peerRankDownloadFailed)
peerSelector.rankPeer(psp, failureRank)

continue
}

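Unlike `fetchAndWrite`, `fetchRound` must keep trying until the round is fetched, so the per-peer counter drives a short sleep past the threshold and a peer-selector reset at ten times the threshold instead of an exit. A condensed, hypothetical sketch of that decision (the helper name and standalone package are illustrative, not part of the catchup package):

```go
package catchupsketch

import "time"

// reactToNoBlock condenses fetchRound's handling of repeated
// errNoBlockForRound responses from a single peer. It never gives up; it only
// backs off briefly and, after sustained failures, rebuilds the peer selector.
func reactToNoBlock(count, threshold int, resetSelector func()) {
	if count > threshold {
		// the peer probably has not persisted the block yet; give it a moment
		time.Sleep(50 * time.Millisecond)
	}
	if count > threshold*10 {
		// with very few connected peers the selector can keep preferring a
		// peer that still lacks the block; re-learn priorities from scratch
		resetSelector()
	}
}
```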
81 changes: 76 additions & 5 deletions catchup/service_test.go
@@ -20,7 +20,9 @@ import (
"context"
"errors"
"math/rand"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

@@ -180,6 +182,27 @@ func (cl *periodicSyncLogger) Warnf(s string, args ...interface{}) {
cl.Logger.Warnf(s, args...)
}

type periodicSyncDebugLogger struct {
periodicSyncLogger
debugMsgFilter []string
debugMsgs atomic.Uint32
}

func (cl *periodicSyncDebugLogger) Debugf(s string, args ...interface{}) {
// save debug messages for later inspection.
if len(cl.debugMsgFilter) > 0 {
for _, filter := range cl.debugMsgFilter {
if strings.Contains(s, filter) {
cl.debugMsgs.Add(1)
break
}
}
} else {
cl.debugMsgs.Add(1)
}
cl.Logger.Debugf(s, args...)
}

func TestSyncRound(t *testing.T) {
partitiontest.PartitionTest(t)

@@ -208,7 +231,7 @@ func TestSyncRound(t *testing.T) {

auth := &mockedAuthenticator{fail: true}
initialLocalRound := local.LastRound()
require.True(t, 0 == initialLocalRound)
require.Zero(t, initialLocalRound)

// Make Service
localCfg := config.GetDefaultLocal()
@@ -253,7 +276,7 @@ func TestSyncRound(t *testing.T) {
s.UnsetDisableSyncRound()
// wait until the catchup is done
waitStart = time.Now()
for time.Now().Sub(waitStart) < 8*s.deadlineTimeout {
for time.Since(waitStart) < 8*s.deadlineTimeout {
if remote.LastRound() == local.LastRound() {
break
}
@@ -298,7 +321,7 @@ func TestPeriodicSync(t *testing.T) {

auth := &mockedAuthenticator{fail: true}
initialLocalRound := local.LastRound()
require.True(t, 0 == initialLocalRound)
require.Zero(t, initialLocalRound)

// Make Service
s := MakeService(logging.Base(), defaultConfig, net, local, auth, nil, nil)
@@ -315,7 +338,7 @@
// wait until the catchup is done. Since we've might have missed the sleep window, we need to wait
// until the synchronization is complete.
waitStart := time.Now()
for time.Now().Sub(waitStart) < 10*s.deadlineTimeout {
for time.Since(waitStart) < 10*s.deadlineTimeout {
if remote.LastRound() == local.LastRound() {
break
}
@@ -506,7 +529,6 @@ func TestServiceFetchBlocksMultiBlocks(t *testing.T) {
localBlock, err := local.Block(i)
require.NoError(t, err)
require.Equal(t, *blk, localBlock)
return
}
}

@@ -1184,3 +1206,52 @@ func TestServiceLedgerUnavailable(t *testing.T) {
require.Greater(t, local.LastRound(), basics.Round(0))
require.Less(t, local.LastRound(), remote.LastRound())
}

// TestServiceNoBlockForRound checks that fetchAndWrite does not repeat 500 times when a block is not available
func TestServiceNoBlockForRound(t *testing.T) {
partitiontest.PartitionTest(t)

// Make Ledger
local := new(mockedLedger)
local.blocks = append(local.blocks, bookkeeping.Block{})

remote, _, blk, err := buildTestLedger(t, bookkeeping.Block{})
if err != nil {
t.Fatal(err)
return
}
numBlocks := 10
addBlocks(t, remote, blk, numBlocks)

// Create a network and block service
blockServiceConfig := config.GetDefaultLocal()
net := &httpTestPeerSource{}
ls := rpcs.MakeBlockService(logging.Base(), blockServiceConfig, remote, net, "test genesisID")

nodeA := basicRPCNode{}
nodeA.RegisterHTTPHandler(rpcs.BlockServiceBlockPath, ls)
nodeA.start()
defer nodeA.stop()
rootURL := nodeA.rootURL()
net.addPeer(rootURL)

require.Equal(t, basics.Round(0), local.LastRound())
require.Equal(t, basics.Round(numBlocks+1), remote.LastRound())

// Make Service
auth := &mockedAuthenticator{fail: false}
cfg := config.GetDefaultLocal()
cfg.CatchupParallelBlocks = 8
s := MakeService(logging.Base(), cfg, net, local, auth, nil, nil)
pl := &periodicSyncDebugLogger{periodicSyncLogger: periodicSyncLogger{Logger: logging.Base()}}
s.log = pl
s.deadlineTimeout = 1 * time.Second

s.testStart()
defer s.Stop()
s.sync()

// without the fix there are about 2k messages (4x catchupRetryLimit)
// with the fix expect less than catchupRetryLimit
require.Less(t, int(pl.debugMsgs.Load()), catchupRetryLimit)
}
18 changes: 0 additions & 18 deletions test/e2e-go/features/catchup/basicCatchup_test.go
@@ -73,24 +73,6 @@ func TestBasicCatchup(t *testing.T) {
// Now, catch up
err = fixture.LibGoalFixture.ClientWaitForRoundWithTimeout(cloneClient, waitForRound)
a.NoError(err)

cloneNC := fixture.GetNodeControllerForDataDir(cloneDataDir)
cloneRestClient := fixture.GetAlgodClientForController(cloneNC)

// an immediate call for ready will error, for sync time != 0
a.Error(cloneRestClient.ReadyCheck())

for {
status, err := cloneRestClient.Status()
a.NoError(err)

if status.LastRound < 10 {
time.Sleep(250 * time.Millisecond)
continue
}
a.NoError(cloneRestClient.ReadyCheck())
break
}
}

// TestCatchupOverGossip tests catchup across network versions