From 3d3b14301764c7addb546ebd6fd914d708a53ff0 Mon Sep 17 00:00:00 2001
From: Kristoffer Johansson <kristoffer.johansson@gcore.com>
Date: Thu, 19 Dec 2024 11:24:45 +0100
Subject: [PATCH] grpcproxy: add test to reproduce issue with auth token

---
 tests/e2e/etcd_grpcproxy_test.go | 96 ++++++++++++++++++++++++++++++++
 tests/framework/e2e/cluster.go   |  4 ++
 2 files changed, 100 insertions(+)

diff --git a/tests/e2e/etcd_grpcproxy_test.go b/tests/e2e/etcd_grpcproxy_test.go
index 02174e89f626..0e109779dd29 100644
--- a/tests/e2e/etcd_grpcproxy_test.go
+++ b/tests/e2e/etcd_grpcproxy_test.go
@@ -17,6 +17,8 @@ package e2e
 import (
 	"context"
 	"strings"
+	"sync"
+	"sync/atomic"
 	"testing"
 	"time"
 
@@ -142,3 +144,97 @@ func waitForEndpointInLog(ctx context.Context, proxyProc *expect.ExpectProcess,
 
 	return err
 }
+
+func TestGRPCProxyWatchersAfterTokenExpiry(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	cluster, err := e2e.NewEtcdProcessCluster(ctx, t,
+		e2e.WithClusterSize(1),
+		e2e.WithAuthTokenOpts("simple"),
+		e2e.WithAuthTokenTTL(1),
+	)
+	require.NoError(t, err)
+	t.Cleanup(func() { require.NoError(t, cluster.Stop()) })
+
+	cli := cluster.Etcdctl()
+
+	createUsers(ctx, t, cli)
+
+	require.NoError(t, cli.AuthEnable(ctx))
+
+	var (
+		node1ClientURL = cluster.Procs[0].Config().ClientURL
+		proxyClientURL = "127.0.0.1:42379"
+	)
+
+	proxyProc, err := e2e.SpawnCmd([]string{
+		e2e.BinPath.Etcd, "grpc-proxy", "start",
+		"--advertise-client-url", proxyClientURL,
+		"--listen-addr", proxyClientURL,
+		"--endpoints", node1ClientURL,
+	}, nil)
+	require.NoError(t, err)
+	t.Cleanup(func() { require.NoError(t, proxyProc.Stop()) })
+
+	var totalEventsCount int64
+
+	handler := func(events clientv3.WatchChan) {
+		for {
+			select {
+			case ev, open := <-events:
+				if !open {
+					return
+				}
+				if ev.Err() != nil {
+					t.Logf("watch response error: %s", ev.Err())
+					continue
+				}
+				atomic.AddInt64(&totalEventsCount, 1)
+			case <-ctx.Done():
+				return
+			}
+		}
+	}
+
+	withAuth := e2e.WithAuth("root", "rootPassword")
+	withEndpoint := e2e.WithEndpoints([]string{proxyClientURL})
+
+	events := cluster.Etcdctl(withAuth, withEndpoint).Watch(ctx, "/test", config.WatchOptions{Prefix: true, Revision: 1})
+
+	wg := sync.WaitGroup{}
+
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		handler(events)
+	}()
+
+	clusterCli := cluster.Etcdctl(withAuth)
+	require.NoError(t, clusterCli.Put(ctx, "/test/1", "test", config.PutOptions{}))
+	require.NoError(t, err)
+
+	time.Sleep(time.Second * 2)
+
+	events2 := cluster.Etcdctl(withAuth, withEndpoint).Watch(ctx, "/test", config.WatchOptions{Prefix: true, Revision: 1})
+
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		handler(events2)
+	}()
+
+	events3 := cluster.Etcdctl(withAuth, withEndpoint).Watch(ctx, "/test", config.WatchOptions{Prefix: true, Revision: 1})
+
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		handler(events3)
+	}()
+
+	time.Sleep(time.Second)
+
+	cancel()
+	wg.Wait()
+
+	assert.Equal(t, int64(3), atomic.LoadInt64(&totalEventsCount))
+}
diff --git a/tests/framework/e2e/cluster.go b/tests/framework/e2e/cluster.go
index 4aff11b9d6f9..fe1d57b1751d 100644
--- a/tests/framework/e2e/cluster.go
+++ b/tests/framework/e2e/cluster.go
@@ -291,6 +291,10 @@ func WithAuthTokenOpts(token string) EPClusterOption {
 	return func(c *EtcdProcessClusterConfig) { c.ServerConfig.AuthToken = token }
 }
 
+func WithAuthTokenTTL(ttl uint) EPClusterOption {
+	return func(c *EtcdProcessClusterConfig) { c.ServerConfig.AuthTokenTTL = ttl }
+}
+
 func WithRollingStart(rolling bool) EPClusterOption {
 	return func(c *EtcdProcessClusterConfig) { c.RollingStart = rolling }
 }