From b8cb654be67b06b59c8e9930bdabdaa3dc5f0733 Mon Sep 17 00:00:00 2001
From: Joshua Zhang <joshuazh@microsoft.com>
Date: Thu, 7 Mar 2024 08:58:01 +0000
Subject: [PATCH] Use strict synchronization for revision getter to minimize
 flaky result caused by time racing.

Signed-off-by: Joshua Zhang <joshuazh@microsoft.com>

Addressed review comments

Co-authored-by: Abhishek Kr Srivastav <Abhishek.kr.srivastav@ibm.com>
Signed-off-by: Abhishek Kr Srivastav <Abhishek.kr.srivastav@ibm.com>
---
 client/pkg/testutil/recorder.go               |  5 ++-
 .../api/v3compactor/periodic_test.go          | 38 +++++++++++--------
 2 files changed, 26 insertions(+), 17 deletions(-)

diff --git a/client/pkg/testutil/recorder.go b/client/pkg/testutil/recorder.go
index cc99914f609..0863593ef7a 100644
--- a/client/pkg/testutil/recorder.go
+++ b/client/pkg/testutil/recorder.go
@@ -115,7 +115,10 @@ func (r *recorderStream) Chan() <-chan Action {
 
 func (r *recorderStream) Wait(n int) ([]Action, error) {
 	acts := make([]Action, n)
-	timeoutC := time.After(r.waitTimeout)
+	var timeoutC <-chan time.Time
+	if r.waitTimeout != 0 {
+		timeoutC = time.After(r.waitTimeout)
+	}
 	for i := 0; i < n; i++ {
 		select {
 		case acts[i] = <-r.ch:
diff --git a/server/etcdserver/api/v3compactor/periodic_test.go b/server/etcdserver/api/v3compactor/periodic_test.go
index 5053482a807..c3a3cb60688 100644
--- a/server/etcdserver/api/v3compactor/periodic_test.go
+++ b/server/etcdserver/api/v3compactor/periodic_test.go
@@ -33,7 +33,7 @@ func TestPeriodicHourly(t *testing.T) {
 
 	fc := clockwork.NewFakeClock()
 	// TODO: Do not depand or real time (Recorder.Wait) in unit tests.
-	rg := &fakeRevGetter{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond), 0}
+	rg := &fakeRevGetter{testutil.NewRecorderStreamWithWaitTimout(0), 0}
 	compactable := &fakeCompactable{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond)}
 	tb := newPeriodic(zaptest.NewLogger(t), fc, retentionDuration, rg, compactable)
 
@@ -43,8 +43,8 @@ func TestPeriodicHourly(t *testing.T) {
 	initialIntervals, intervalsPerPeriod := tb.getRetentions(), 10
 
 	// compaction doesn't happen til 2 hours elapse
-	for i := 0; i < initialIntervals; i++ {
-		rg.Wait(1)
+	for i := 0; i < initialIntervals-1; i++ {
+		waitOneAction(t, rg)
 		fc.Advance(tb.getRetryInterval())
 	}
 
@@ -63,7 +63,7 @@ func TestPeriodicHourly(t *testing.T) {
 	for i := 0; i < 3; i++ {
 		// advance one hour, one revision for each interval
 		for j := 0; j < intervalsPerPeriod; j++ {
-			rg.Wait(1)
+			waitOneAction(t, rg)
 			fc.Advance(tb.getRetryInterval())
 		}
 
@@ -84,7 +84,7 @@ func TestPeriodicMinutes(t *testing.T) {
 	retentionDuration := time.Duration(retentionMinutes) * time.Minute
 
 	fc := clockwork.NewFakeClock()
-	rg := &fakeRevGetter{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond), 0}
+	rg := &fakeRevGetter{testutil.NewRecorderStreamWithWaitTimout(0), 0}
 	compactable := &fakeCompactable{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond)}
 	tb := newPeriodic(zaptest.NewLogger(t), fc, retentionDuration, rg, compactable)
 
@@ -94,8 +94,8 @@ func TestPeriodicMinutes(t *testing.T) {
 	initialIntervals, intervalsPerPeriod := tb.getRetentions(), 10
 
 	// compaction doesn't happen til 5 minutes elapse
-	for i := 0; i < initialIntervals; i++ {
-		rg.Wait(1)
+	for i := 0; i < initialIntervals-1; i++ {
+		waitOneAction(t, rg)
 		fc.Advance(tb.getRetryInterval())
 	}
 
@@ -113,7 +113,7 @@ func TestPeriodicMinutes(t *testing.T) {
 	for i := 0; i < 5; i++ {
 		// advance 5-minute, one revision for each interval
 		for j := 0; j < intervalsPerPeriod; j++ {
-			rg.Wait(1)
+			waitOneAction(t, rg)
 			fc.Advance(tb.getRetryInterval())
 		}
 
@@ -132,7 +132,7 @@ func TestPeriodicMinutes(t *testing.T) {
 func TestPeriodicPause(t *testing.T) {
 	fc := clockwork.NewFakeClock()
 	retentionDuration := time.Hour
-	rg := &fakeRevGetter{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond), 0}
+	rg := &fakeRevGetter{testutil.NewRecorderStreamWithWaitTimout(0), 0}
 	compactable := &fakeCompactable{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond)}
 	tb := newPeriodic(zaptest.NewLogger(t), fc, retentionDuration, rg, compactable)
 
@@ -143,7 +143,7 @@ func TestPeriodicPause(t *testing.T) {
 
 	// tb will collect 3 hours of revisions but not compact since paused
 	for i := 0; i < n*3; i++ {
-		rg.Wait(1)
+		waitOneAction(t, rg)
 		fc.Advance(tb.getRetryInterval())
 	}
 	// t.revs = [21 22 23 24 25 26 27 28 29 30]
@@ -156,7 +156,7 @@ func TestPeriodicPause(t *testing.T) {
 
 	// tb resumes to being blocked on the clock
 	tb.Resume()
-	rg.Wait(1)
+	waitOneAction(t, rg)
 
 	// unblock clock, will kick off a compaction at T=3h6m by retry
 	fc.Advance(tb.getRetryInterval())
@@ -179,7 +179,7 @@ func TestPeriodicSkipRevNotChange(t *testing.T) {
 	retentionDuration := time.Duration(retentionMinutes) * time.Minute
 
 	fc := clockwork.NewFakeClock()
-	rg := &fakeRevGetter{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond), 0}
+	rg := &fakeRevGetter{testutil.NewRecorderStreamWithWaitTimout(0), 0}
 	compactable := &fakeCompactable{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond)}
 	tb := newPeriodic(zaptest.NewLogger(t), fc, retentionDuration, rg, compactable)
 
@@ -189,10 +189,10 @@ func TestPeriodicSkipRevNotChange(t *testing.T) {
 	initialIntervals, intervalsPerPeriod := tb.getRetentions(), 10
 
 	// first compaction happens til 5 minutes elapsed
-	for i := 0; i < initialIntervals; i++ {
+	for i := 0; i < initialIntervals-1; i++ {
 		// every time set the same revision with 100
 		rg.SetRev(int64(100))
-		rg.Wait(1)
+		waitOneAction(t, rg)
 		fc.Advance(tb.getRetryInterval())
 	}
 
@@ -212,7 +212,7 @@ func TestPeriodicSkipRevNotChange(t *testing.T) {
 	for i := 0; i < 5; i++ {
 		for j := 0; j < intervalsPerPeriod; j++ {
 			rg.SetRev(int64(100))
-			rg.Wait(1)
+			waitOneAction(t, rg)
 			fc.Advance(tb.getRetryInterval())
 		}
 
@@ -224,7 +224,7 @@ func TestPeriodicSkipRevNotChange(t *testing.T) {
 
 	// when revision changed, compaction is normally
 	for i := 0; i < initialIntervals; i++ {
-		rg.Wait(1)
+		waitOneAction(t, rg)
 		fc.Advance(tb.getRetryInterval())
 	}
 
@@ -238,3 +238,9 @@ func TestPeriodicSkipRevNotChange(t *testing.T) {
 		t.Errorf("compact request = %v, want %v", a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision})
 	}
 }
+
+func waitOneAction(t *testing.T, r testutil.Recorder) {
+	if actions, _ := r.Wait(1); len(actions) != 1 {
+		t.Errorf("expect 1 action, got %v instead", len(actions))
+	}
+}