From bfaae1ba462c91aaf149a285b8d2369807044f71 Mon Sep 17 00:00:00 2001 From: Tobias Schottdorf Date: Thu, 6 Dec 2018 11:09:46 +0100 Subject: [PATCH] raft: enter ProgressStateReplicate immediately after snapshot When a follower requires a snapshot and the snapshot is sent at the committed (and last) index in an otherwise idle Raft group, the follower would previously remain in ProgressStateProbe even though it had been caught up completely. In a busy Raft group this wouldn't be an issue since the next round of MsgApp would update the state, but in an idle group there's nothing that rectifies the status (since there's nothing to append or update). The reason this matters is that the state is exposed through `RaftStatus()`. Concretely, in CockroachDB, we use the Raft status to make sharding decisions (since it's dangerous to make rapid changes to a fragile Raft group), and had to work around this problem[1]. [1]: https://github.com/cockroachdb/cockroach/blob/91b11dae416f3d9b55fadd2a4b096e94d874176c/pkg/storage/split_delay_helper.go#L138-L141 --- raft/raft.go | 6 +++ raft/raft_snap_test.go | 119 ++++++++++++++++++++++++++++++++++++++++- 2 files changed, 123 insertions(+), 2 deletions(-) diff --git a/raft/raft.go b/raft/raft.go index 56dd20810ef..211a3b0e2e4 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -1078,7 +1078,13 @@ func stepLeader(r *raft, m pb.Message) error { pr.becomeReplicate() case pr.State == ProgressStateSnapshot && pr.needSnapshotAbort(): r.logger.Debugf("%x snapshot aborted, resumed sending replication messages to %x [%s]", r.id, m.From, pr) + // Transition back to replicating state via probing state + // (which takes the snapshot into account). If we didn't + // move to replicating state, that would only happen with + // the next round of appends (but there may not be a next + // round for a while, exposing an inconsistent RaftStatus). 
pr.becomeProbe() + pr.becomeReplicate() case pr.State == ProgressStateReplicate: pr.ins.freeTo(m.Index) } diff --git a/raft/raft_snap_test.go b/raft/raft_snap_test.go index a80ed4da8b5..14cedc23cdb 100644 --- a/raft/raft_snap_test.go +++ b/raft/raft_snap_test.go @@ -111,6 +111,114 @@ func TestSnapshotSucceed(t *testing.T) { } } +// TestSnapshotSucceedViaAppResp regression tests the situation in which a snap- +// shot is sent to a follower at the most recent index (i.e. the snapshot index +// is the leader's last index is the committed index). In that situation, a bug +// in the past left the follower in probing status until the next log entry was +// committed. +func TestSnapshotSucceedViaAppResp(t *testing.T) { + snap := pb.Snapshot{ + Metadata: pb.SnapshotMetadata{ + Index: 11, // magic number + Term: 11, // magic number + ConfState: pb.ConfState{Nodes: []uint64{1, 2, 3}}, + }, + } + + s1 := NewMemoryStorage() + n1 := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, s1) + + // Become follower because otherwise the way this test sets things up the + // leadership term will be 1 (which is stale). We want it to match the snap- + // shot term in this test. + n1.becomeFollower(snap.Metadata.Term-1, 2) + n1.becomeCandidate() + n1.becomeLeader() + + // Apply a snapshot on the leader. This is a workaround against the fact that + // the leader will always append an empty entry, but that empty entry works + // against what we're trying to assert in this test, namely that a snapshot + // at the latest committed index leaves the follower in probing state. + // With the snapshot, the empty entry is fully committed. 
+ n1.restore(snap) + + noMessage := pb.MessageType(-1) + mustSend := func(from, to *raft, typ pb.MessageType) pb.Message { + t.Helper() + for i, msg := range from.msgs { + if msg.From != from.id || msg.To != to.id || msg.Type != typ { + continue + } + t.Log(DescribeMessage(msg, func([]byte) string { return "" })) + if err := to.Step(msg); err != nil { + t.Fatalf("%v: %s", msg, err) + } + from.msgs = append(from.msgs[:i], from.msgs[i+1:]...) + return msg + } + if typ == noMessage { + if len(from.msgs) == 0 { + return pb.Message{} + } + t.Fatalf("expected no more messages, but got %d->%d %v", from.id, to.id, from.msgs) + } + t.Fatalf("message %d->%d %s not found in %v", from.id, to.id, typ, from.msgs) + return pb.Message{} // unreachable + } + + // Create the follower that will receive the snapshot. + s2 := NewMemoryStorage() + n2 := newTestRaft(2, []uint64{1, 2, 3}, 10, 1, s2) + + // Let the leader probe the follower. + if !n1.maybeSendAppend(2, true /* sendIfEmpty */) { + t.Fatalf("expected message to be sent") + } + if msg := mustSend(n1, n2, pb.MsgApp); len(msg.Entries) > 0 { + // For this test to work, the leader must not have anything to append + // to the follower right now. + t.Fatalf("unexpectedly appending entries %v", msg.Entries) + } + + // Follower rejects the append (because it doesn't have any log entries) + if msg := mustSend(n2, n1, pb.MsgAppResp); !msg.Reject { + t.Fatalf("expected a rejection with zero hint, got reject=%t hint=%d", msg.Reject, msg.RejectHint) + } + + expIdx := snap.Metadata.Index + // Leader sends snapshot due to RejectHint of zero (the storage we use here + // has index zero compacted). + if msg := mustSend(n1, n2, pb.MsgSnap); msg.Snapshot.Metadata.Index != expIdx { + t.Fatalf("expected snapshot at index %d, got %d", expIdx, msg.Snapshot.Metadata.Index) + } + + // n2 reacts to snapshot with MsgAppResp. 
+ if msg := mustSend(n2, n1, pb.MsgAppResp); msg.Index != expIdx { + t.Fatalf("expected AppResp at index %d, got %d", expIdx, msg.Index) + } + + // Leader sends MsgApp to communicate commit index. + if msg := mustSend(n1, n2, pb.MsgApp); msg.Commit != expIdx { + t.Fatalf("expected commit index %d, got %d", expIdx, msg.Commit) + } + + // Follower responds. + mustSend(n2, n1, pb.MsgAppResp) + + // Leader has correct state for follower. + pr := n1.prs[2] + if pr.State != ProgressStateReplicate { + t.Fatalf("unexpected state %v", pr) + } + if pr.Match != expIdx || pr.Next != expIdx+1 { + t.Fatalf("expected match = %d, next = %d; got match = %d and next = %d", expIdx, expIdx+1, pr.Match, pr.Next) + } + + // Leader and follower are done. + mustSend(n1, n2, noMessage) + mustSend(n2, n1, noMessage) +} + func TestSnapshotAbort(t *testing.T) { storage := NewMemoryStorage() sm := newTestRaft(1, []uint64{1, 2}, 10, 1, storage) @@ -128,7 +236,14 @@ func TestSnapshotAbort(t *testing.T) { if sm.prs[2].PendingSnapshot != 0 { t.Fatalf("PendingSnapshot = %d, want 0", sm.prs[2].PendingSnapshot) } - if sm.prs[2].Next != 12 { - t.Fatalf("Next = %d, want 12", sm.prs[2].Next) + // The follower entered ProgressStateReplicate and the leader sent an append + // and optimistically updated the progress (so we see 13 instead of 12). + // There is something to append because the leader appended an empty entry + // to the log at index 12 when it assumed leadership. + if sm.prs[2].Next != 13 { + t.Fatalf("Next = %d, want 13", sm.prs[2].Next) + } + if n := sm.prs[2].ins.count; n != 1 { + t.Fatalf("expected an inflight message, got %d", n) } }