diff --git a/go.mod b/go.mod index d1dca375f4..0dd8605831 100644 --- a/go.mod +++ b/go.mod @@ -37,6 +37,7 @@ require ( github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.13.0 github.com/robfig/cron/v3 v3.0.1 + github.com/samber/lo v1.33.0 github.com/smallnest/weighted v0.0.0-20200122032019-adf21c9b8bd1 github.com/smartystreets/goconvey v1.7.2 github.com/spf13/cast v1.5.0 @@ -139,7 +140,7 @@ require ( go.etcd.io/etcd/client/pkg/v3 v3.5.5 // indirect go.uber.org/atomic v1.9.0 // indirect golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d // indirect - golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6 // indirect + golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17 // indirect golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 // indirect golang.org/x/net v0.0.0-20220722155237-a158d28d115b // indirect golang.org/x/sys v0.0.0-20220919091848-fb04ddd9f9c8 // indirect diff --git a/go.sum b/go.sum index fc2de3c0b6..dc83cb533d 100644 --- a/go.sum +++ b/go.sum @@ -549,6 +549,8 @@ github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQD github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts= +github.com/samber/lo v1.33.0 h1:2aKucr+rQV6gHpY3bpeZu69uYoQOzVhGT3J22Op6Cjk= +github.com/samber/lo v1.33.0/go.mod h1:HLeWcJRRyLKp3+/XBJvOrerCQn9mhdKMHyd7IRlgeQ8= github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E= github.com/satori/go.uuid v1.2.0 h1:0uYX9dsZ2yD7q2RtLRtPSdGDWzjeM3TbMJP9utgA0ww= github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= @@ -596,6 +598,7 @@ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/thoas/go-funk v0.9.1 h1:O549iLZqPpTUQ10ykd26sZhzD+rmR5pWhuElrhbC20M= github.com/tidwall/gjson v1.13.0 h1:3TFY9yxOQShrvmjdM76K+jc66zJeT6D3/VFFYCGQf7M= github.com/tidwall/gjson v1.13.0/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA= @@ -714,8 +717,9 @@ golang.org/x/exp v0.0.0-20191129062945-2f5052295587/go.mod h1:2RIsYlXP63K8oxa1u0 golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4= golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4= golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM= -golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6 h1:QE6XYQK6naiK1EPAe1g/ILLxN5RBoH5xkJk3CqlMI/Y= golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU= +golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17 h1:3MTrJm4PyNL9NBqvYDSj3DHl46qQakyfqfWo4jgfaEM= +golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17/go.mod h1:lgLbSvA5ygNOMpwM/9anMpWVlVJ7Z+cHWq/eFuinpGE= golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js= golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= diff --git a/pkg/registry/etcdv3/registry.go b/pkg/registry/etcdv3/registry.go index 81bd463325..9b2cd185f9 100644 --- a/pkg/registry/etcdv3/registry.go +++ b/pkg/registry/etcdv3/registry.go @@ -52,7 +52,7 @@ const ( // defaultRetryTimes default retry times defaultRetryTimes = 3 // defaultKeepAliveTimeout is the default timeout for keepalive requests. - defaultKeepaliveTimeout = 5 * time.Second + defaultRegisterTimeout = 5 * time.Second // servicePrefix is the prefix of service key servicePrefix = "%s:%s:%s:%s/" // registerService is servicePrefix+host:port @@ -110,12 +110,15 @@ func (reg *etcdv3Registry) ListServices(ctx context.Context, name string, scheme } for _, kv := range getResp.Kvs { - var service server.ServiceInfo + var service Update if err := json.Unmarshal(kv.Value, &service); err != nil { reg.logger.Warn("invalid service", xlog.FieldErr(err)) continue } - services = append(services, &service) + + services = append(services, &server.ServiceInfo{ + Address: service.Addr, + }) } return @@ -234,7 +237,7 @@ func (reg *etcdv3Registry) registerKV(ctx context.Context, key, val string) erro // opOptions = append(opOptions, clientv3.WithSerializable()) if ttl := reg.Config.ServiceTTL.Seconds(); ttl > 0 { // 这里基于应用名为key做缓存,每个服务实例应该只需要创建一个lease,降低etcd的压力 - lease, err := reg.getLeaseID(ctx, int64(reg.ServiceTTL.Seconds())) + lease, err := reg.getLeaseID(ctx) if err != nil { reg.logger.Error("getSession", xlog.FieldErrKind(ecode.ErrKindRegisterErr), xlog.FieldErr(err), xlog.FieldKeyAny(key), xlog.FieldValueAny(val)) @@ -259,7 +262,7 @@ func (reg *etcdv3Registry) registerKV(ctx context.Context, key, val string) erro return nil } -func (reg *etcdv3Registry) getLeaseID(ctx context.Context, ttl int64) (clientv3.LeaseID, error) { +func (reg *etcdv3Registry) getLeaseID(ctx context.Context) (clientv3.LeaseID, error) { reg.rmu.Lock() defer reg.rmu.Unlock() @@ -267,7 +270,7 @@ func (reg *etcdv3Registry) getLeaseID(ctx context.Context, ttl int64) (clientv3. return reg.leaseID, nil } - grant, err := reg.client.Grant(ctx, ttl) + grant, err := reg.client.Grant(ctx, int64(reg.ServiceTTL.Seconds())) if err != nil { reg.logger.Error("reg.client.Grant failed", xlog.FieldErrKind(ecode.ErrKindRegisterErr), xlog.FieldErr(err)) return 0, err @@ -301,26 +304,17 @@ func (reg *etcdv3Registry) doKeepalive(ctx context.Context) { // do register again, and retry 3 times err := reg.registerAllKvs(cancelCtx) if err != nil { + cancel() return } - // try do keepalive again - // when error or timeout happens, just exit the goroutine - kac, err = reg.client.KeepAlive(cancelCtx, reg.leaseID) - if err != nil { - reg.logger.Error("reg.client.KeepAlive failed", xlog.FieldErrKind(ecode.ErrKindRegisterErr), xlog.FieldErr(err)) - return - } - - reg.logger.Debug("reg.client.KeepAlive finished", xlog.String("leaseid", fmt.Sprintf("%x", reg.leaseID))) - done <- struct{}{} }() // wait keepalive success select { - case <-time.After(defaultKeepaliveTimeout): - // when timeout or error happens + case <-time.After(defaultRegisterTimeout): + // when timeout happens // we should cancel the context and retry again cancel() // mark leaseID as 0 to retry register @@ -331,6 +325,17 @@ func (reg *etcdv3Registry) doKeepalive(ctx context.Context) { // when done happens, we just receive the kac channel // or wait the registry context done } + + // try do keepalive again + // when error or timeout happens, just continue + kac, err = reg.client.KeepAlive(ctx, reg.leaseID) + if err != nil { + reg.logger.Error("reg.client.KeepAlive failed", xlog.FieldErrKind(ecode.ErrKindRegisterErr), xlog.FieldErr(err)) + + continue + } + + reg.logger.Debug("reg.client.KeepAlive finished", xlog.String("leaseid", fmt.Sprintf("%x", reg.leaseID))) } select { diff --git a/pkg/registry/etcdv3/registry_test.go b/pkg/registry/etcdv3/registry_test.go index a56c4e9dc4..0afbcea479 100644 --- a/pkg/registry/etcdv3/registry_test.go +++ b/pkg/registry/etcdv3/registry_test.go @@ -17,37 +17,21 @@ package etcdv3 import ( "context" "fmt" - "log" "testing" "time" + "github.com/samber/lo" "github.com/douyu/jupiter/pkg/client/etcdv3" "github.com/douyu/jupiter/pkg/core/constant" "github.com/douyu/jupiter/pkg/registry" "github.com/douyu/jupiter/pkg/server" "github.com/douyu/jupiter/pkg/xlog" "github.com/stretchr/testify/assert" - "go.etcd.io/etcd/client/v3/mock/mockserver" ) -func startMockServer() { - ms, err := mockserver.StartMockServers(1) - if err != nil { - log.Fatal(err) - } - - if err := ms.StartAt(0); err != nil { - log.Fatal(err) - } -} - -func TestMain(m *testing.M) { - go startMockServer() -} - func Test_etcdv3Registry(t *testing.T) { etcdConfig := etcdv3.DefaultConfig() - etcdConfig.Endpoints = []string{"localhost:0"} + etcdConfig.Endpoints = []string{"localhost:2379"} registry, err := newETCDRegistry(&Config{ Config: etcdConfig, ReadTimeout: time.Second * 10, @@ -112,7 +96,7 @@ func Test_etcdv3Registry(t *testing.T) { func Test_etcdv3registry_UpdateAddressList(t *testing.T) { etcdConfig := etcdv3.DefaultConfig() - etcdConfig.Endpoints = []string{"localhost:0"} + etcdConfig.Endpoints = []string{"localhost:2379"} reg, err := newETCDRegistry(&Config{ Config: etcdConfig, ReadTimeout: time.Second * 10, @@ -151,3 +135,64 @@ func Test_etcdv3registry_UpdateAddressList(t *testing.T) { _ = reg.Close() time.Sleep(time.Second * 1) } + +func TestKeepalive(t *testing.T) { + etcdConfig := etcdv3.DefaultConfig() + etcdConfig.Endpoints = []string{"localhost:2379"} + reg, err := newETCDRegistry(&Config{ + Config: etcdConfig, + ReadTimeout: time.Second * 10, + Prefix: "jupiter", + logger: xlog.Jupiter(), + ServiceTTL: time.Second, + }) + assert.Nil(t, err) + assert.Nil(t, reg.RegisterService(context.Background(), &server.ServiceInfo{ + Name: "service_2", + AppID: "", + Scheme: "grpc", + Address: "10.10.10.1:9091", + Weight: 0, + Enable: true, + Healthy: true, + Metadata: map[string]string{}, + Region: "default", + Zone: "default", + Kind: constant.ServiceProvider, + Deployment: "default", + Group: "", + })) + assert.Nil(t, reg.RegisterService(context.Background(), &server.ServiceInfo{ + Name: "service_2", + AppID: "", + Scheme: "grpc", + Address: "10.10.10.1:9092", + Weight: 0, + Enable: true, + Healthy: true, + Metadata: map[string]string{}, + Region: "default", + Zone: "default", + Kind: constant.ServiceProvider, + Deployment: "default", + Group: "", + })) + + lease := reg.leaseID + reg.client.Revoke(reg.ctx, lo.Must(reg.getLeaseID(reg.ctx))) + + time.Sleep(1 * time.Second) + assert.NotZero(t,lo.Must(reg.getLeaseID(reg.ctx))) + assert.True(t, lease != lo.Must(reg.getLeaseID(reg.ctx))) + + ttl, err := reg.client.TimeToLive(reg.ctx, lease) + assert.Nil(t, err) + assert.Equal(t, int64(-1), ttl.TTL) + + ttl, err = reg.client.TimeToLive(reg.ctx, lo.Must(reg.getLeaseID(reg.ctx))) + assert.Nil(t, err) + assert.Equal(t, int64(1), ttl.TTL) + + _ = reg.Close() + time.Sleep(time.Second * 1) +}