diff --git a/client/clients/tso/client.go b/client/clients/tso/client.go index 7bc768ee21b..1a101cf6b17 100644 --- a/client/clients/tso/client.go +++ b/client/clients/tso/client.go @@ -116,8 +116,7 @@ func NewClient( }, } - eventSrc := svcDiscovery.(sd.TSOEventSource) - eventSrc.SetTSOLeaderURLUpdatedCallback(c.updateTSOLeaderURL) + c.svcDiscovery.ExecuteAndAddServingURLSwitchedCallback(c.updateTSOLeaderURL) c.svcDiscovery.AddServiceURLsSwitchedCallback(c.scheduleUpdateTSOConnectionCtxs) return c diff --git a/client/inner_client.go b/client/inner_client.go index 8379b6a51a9..4fc63bdef3a 100644 --- a/client/inner_client.go +++ b/client/inner_client.go @@ -129,11 +129,12 @@ func (c *innerClient) resetTSOClientLocked(mode pdpb.ServiceMode) { } } -func (c *innerClient) scheduleUpdateTokenConnection() { +func (c *innerClient) scheduleUpdateTokenConnection(string) error { select { case c.updateTokenConnectionCh <- struct{}{}: default: } + return nil } func (c *innerClient) getServiceMode() pdpb.ServiceMode { diff --git a/client/servicediscovery/callbacks.go b/client/servicediscovery/callbacks.go new file mode 100644 index 00000000000..f3076ab4b52 --- /dev/null +++ b/client/servicediscovery/callbacks.go @@ -0,0 +1,102 @@ +// Copyright 2025 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package servicediscovery + +import ( + "sync" + + "github.com/pingcap/kvproto/pkg/pdpb" +) + +type leaderSwitchedCallbackFunc func(string) error + +// serviceCallbacks contains all the callback functions for service discovery events +type serviceCallbacks struct { + sync.RWMutex + // serviceModeUpdateCb will be called when the service mode gets updated + serviceModeUpdateCb func(pdpb.ServiceMode) + // leaderSwitchedCbs will be called after the leader switched + leaderSwitchedCbs []leaderSwitchedCallbackFunc + // membersChangedCbs will be called after there is any membership change in the + // leader and followers + membersChangedCbs []func() +} + +func newServiceCallbacks() *serviceCallbacks { + return &serviceCallbacks{ + leaderSwitchedCbs: make([]leaderSwitchedCallbackFunc, 0), + membersChangedCbs: make([]func(), 0), + } +} + +func (c *serviceCallbacks) setServiceModeUpdateCallback(cb func(pdpb.ServiceMode)) { + c.Lock() + defer c.Unlock() + c.serviceModeUpdateCb = cb +} + +func (c *serviceCallbacks) addServingURLSwitchedCallback(cb leaderSwitchedCallbackFunc) { + c.Lock() + defer c.Unlock() + c.leaderSwitchedCbs = append(c.leaderSwitchedCbs, cb) +} + +func (c *serviceCallbacks) addServiceURLsSwitchedCallback(cb func()) { + c.Lock() + defer c.Unlock() + c.membersChangedCbs = append(c.membersChangedCbs, cb) +} + +func (c *serviceCallbacks) callServiceModeUpdateCallback(mode pdpb.ServiceMode) { + c.RLock() + cb := c.serviceModeUpdateCb + c.RUnlock() + + if cb == nil { + return + } + cb(mode) +} + +func (c *serviceCallbacks) callLeaderSwitchedCallback(leader string) error { + c.RLock() + cbs := make([]leaderSwitchedCallbackFunc, len(c.leaderSwitchedCbs)) + copy(cbs, c.leaderSwitchedCbs) + c.RUnlock() + + for _, cb := range cbs { + if cb == nil { + continue + } + if err := cb(leader); err != nil { + return err + } + } + return nil +} + +func (c *serviceCallbacks) callMembersChangedCallback() { + c.RLock() + cbs := make([]func(), len(c.membersChangedCbs)) + copy(cbs, c.membersChangedCbs) + c.RUnlock() + + for _, cb := range cbs { + if cb == nil { + continue + } + cb() + } +} diff --git a/client/servicediscovery/mock_service_discovery.go b/client/servicediscovery/mock_service_discovery.go index 6ca649f4575..5e4e2654412 100644 --- a/client/servicediscovery/mock_service_discovery.go +++ b/client/servicediscovery/mock_service_discovery.go @@ -100,8 +100,11 @@ func (*mockServiceDiscovery) ScheduleCheckMemberChanged() {} // CheckMemberChanged implements the ServiceDiscovery interface. func (*mockServiceDiscovery) CheckMemberChanged() error { return nil } +// ExecuteAndAddServingURLSwitchedCallback implements the ServiceDiscovery interface. +func (*mockServiceDiscovery) ExecuteAndAddServingURLSwitchedCallback(leaderSwitchedCallbackFunc) {} + // AddServingURLSwitchedCallback implements the ServiceDiscovery interface. -func (*mockServiceDiscovery) AddServingURLSwitchedCallback(...func()) {} +func (*mockServiceDiscovery) AddServingURLSwitchedCallback(leaderSwitchedCallbackFunc) {} // AddServiceURLsSwitchedCallback implements the ServiceDiscovery interface. -func (*mockServiceDiscovery) AddServiceURLsSwitchedCallback(...func()) {} +func (*mockServiceDiscovery) AddServiceURLsSwitchedCallback(func()) {} diff --git a/client/servicediscovery/service_discovery.go b/client/servicediscovery/service_discovery.go index f5ac665b7cd..8574ceadf42 100644 --- a/client/servicediscovery/service_discovery.go +++ b/client/servicediscovery/service_discovery.go @@ -126,14 +126,16 @@ type ServiceDiscovery interface { // CheckMemberChanged immediately check if there is any membership change among the leader/followers // in a quorum-based cluster or among the primary/secondaries in a primary/secondary configured cluster. CheckMemberChanged() error + // ExecuteAndAddServingURLSwitchedCallback executes the callback once and adds it to the callback list then. + ExecuteAndAddServingURLSwitchedCallback(cb leaderSwitchedCallbackFunc) // AddServingURLSwitchedCallback adds callbacks which will be called when the leader // in a quorum-based cluster or the primary in a primary/secondary configured cluster // is switched. - AddServingURLSwitchedCallback(callbacks ...func()) + AddServingURLSwitchedCallback(cb leaderSwitchedCallbackFunc) // AddServiceURLsSwitchedCallback adds callbacks which will be called when any leader/follower // in a quorum-based cluster or any primary/secondary in a primary/secondary configured cluster // is changed. - AddServiceURLsSwitchedCallback(callbacks ...func()) + AddServiceURLsSwitchedCallback(cb func()) } // ServiceClient is an interface that defines a set of operations for a raw PD gRPC client to specific PD server. @@ -394,18 +396,8 @@ func (c *serviceBalancer) get() (ret ServiceClient) { // UpdateKeyspaceIDFunc is the function type for updating the keyspace ID. type UpdateKeyspaceIDFunc func() error -type tsoLeaderURLUpdatedFunc func(string) error -// TSOEventSource subscribes to events related to changes in the TSO leader/primary from the service discovery. -type TSOEventSource interface { - // SetTSOLeaderURLUpdatedCallback adds a callback which will be called when the TSO leader/primary is updated. - SetTSOLeaderURLUpdatedCallback(callback tsoLeaderURLUpdatedFunc) -} - -var ( - _ ServiceDiscovery = (*serviceDiscovery)(nil) - _ TSOEventSource = (*serviceDiscovery)(nil) -) +var _ ServiceDiscovery = (*serviceDiscovery)(nil) // serviceDiscovery is the service discovery client of PD/PD service which is quorum based type serviceDiscovery struct { @@ -426,15 +418,7 @@ type serviceDiscovery struct { // url -> a gRPC connection clientConns sync.Map // Store as map[string]*grpc.ClientConn - // serviceModeUpdateCb will be called when the service mode gets updated - serviceModeUpdateCb func(pdpb.ServiceMode) - // leaderSwitchedCbs will be called after the leader switched - leaderSwitchedCbs []func() - // membersChangedCbs will be called after there is any membership change in the - // leader and followers - membersChangedCbs []func() - // tsoLeaderUpdatedCb will be called when the TSO leader is updated. - tsoLeaderUpdatedCb tsoLeaderURLUpdatedFunc + callbacks *serviceCallbacks checkMembershipCh chan struct{} @@ -474,12 +458,13 @@ func NewServiceDiscovery( cancel: cancel, wg: wg, apiCandidateNodes: [apiKindCount]*serviceBalancer{newServiceBalancer(emptyErrorFn), newServiceBalancer(regionAPIErrorFn)}, - serviceModeUpdateCb: serviceModeUpdateCb, + callbacks: newServiceCallbacks(), updateKeyspaceIDFunc: updateKeyspaceIDFunc, keyspaceID: keyspaceID, tlsCfg: tlsCfg, option: option, } + pdsd.callbacks.setServiceModeUpdateCallback(serviceModeUpdateCb) urls = tlsutil.AddrsToURLs(urls, tlsCfg) pdsd.urls.Store(urls) return pdsd @@ -570,7 +555,7 @@ func (c *serviceDiscovery) updateServiceModeLoop() { failpoint.Return() }) failpoint.Inject("usePDServiceMode", func() { - c.serviceModeUpdateCb(pdpb.ServiceMode_PD_SVC_MODE) + c.callbacks.callServiceModeUpdateCallback(pdpb.ServiceMode_PD_SVC_MODE) failpoint.Return() }) @@ -791,27 +776,29 @@ func (c *serviceDiscovery) CheckMemberChanged() error { return c.updateMember() } -// AddServingURLSwitchedCallback adds callbacks which will be called -// when the leader is switched. -func (c *serviceDiscovery) AddServingURLSwitchedCallback(callbacks ...func()) { - c.leaderSwitchedCbs = append(c.leaderSwitchedCbs, callbacks...) -} - -// AddServiceURLsSwitchedCallback adds callbacks which will be called when -// any leader/follower is changed. -func (c *serviceDiscovery) AddServiceURLsSwitchedCallback(callbacks ...func()) { - c.membersChangedCbs = append(c.membersChangedCbs, callbacks...) -} - -// SetTSOLeaderURLUpdatedCallback adds a callback which will be called when the TSO leader is updated. -func (c *serviceDiscovery) SetTSOLeaderURLUpdatedCallback(callback tsoLeaderURLUpdatedFunc) { +// ExecuteAndAddServingURLSwitchedCallback executes the callback once and adds it to the callback list then. +func (c *serviceDiscovery) ExecuteAndAddServingURLSwitchedCallback(callback leaderSwitchedCallbackFunc) { url := c.getLeaderURL() if len(url) > 0 { if err := callback(url); err != nil { - log.Error("[tso] failed to call back when tso leader url update", zap.String("url", url), errs.ZapError(err)) + log.Error("[pd] failed to run a callback with the current leader url", + zap.String("url", url), errs.ZapError(err)) } } - c.tsoLeaderUpdatedCb = callback + c.AddServingURLSwitchedCallback(callback) +} + +// AddServingURLSwitchedCallback adds callbacks which will be called when the leader +// in a quorum-based cluster or the primary in a primary/secondary configured cluster +// is switched. +func (c *serviceDiscovery) AddServingURLSwitchedCallback(callback leaderSwitchedCallbackFunc) { + c.callbacks.addServingURLSwitchedCallback(callback) +} + +// AddServiceURLsSwitchedCallback adds callbacks which will be called when any primary/secondary +// in a primary/secondary configured cluster is changed. +func (c *serviceDiscovery) AddServiceURLsSwitchedCallback(callback func()) { + c.callbacks.addServiceURLsSwitchedCallback(callback) } // getLeaderURL returns the leader URL. @@ -867,9 +854,7 @@ func (c *serviceDiscovery) checkServiceModeChanged() error { // If the method is not supported, we set it to pd mode. // TODO: it's a hack way to solve the compatibility issue. // we need to remove this after all maintained version supports the method. - if c.serviceModeUpdateCb != nil { - c.serviceModeUpdateCb(pdpb.ServiceMode_PD_SVC_MODE) - } + c.callbacks.callServiceModeUpdateCallback(pdpb.ServiceMode_PD_SVC_MODE) return nil } return err @@ -877,9 +862,7 @@ func (c *serviceDiscovery) checkServiceModeChanged() error { if clusterInfo == nil || len(clusterInfo.ServiceModes) == 0 { return errors.WithStack(errs.ErrNoServiceModeReturned) } - if c.serviceModeUpdateCb != nil { - c.serviceModeUpdateCb(clusterInfo.ServiceModes[0]) - } + c.callbacks.callServiceModeUpdateCallback(clusterInfo.ServiceModes[0]) return nil } @@ -968,9 +951,7 @@ func (c *serviceDiscovery) updateURLs(members []*pdpb.Member) { } c.urls.Store(urls) // Run callbacks to reflect the membership changes in the leader and followers. - for _, cb := range c.membersChangedCbs { - cb() - } + c.callbacks.callMembersChangedCallback() log.Info("[pd] update member urls", zap.Strings("old-urls", oldURLs), zap.Strings("new-urls", urls)) } @@ -980,23 +961,18 @@ func (c *serviceDiscovery) switchLeader(url string) (bool, error) { return false, nil } - newConn, err := c.GetOrCreateGRPCConn(url) + newConn, _ := c.GetOrCreateGRPCConn(url) // If gRPC connect is created successfully or leader is new, still saves. if url != oldLeader.GetURL() || newConn != nil { leaderClient := newPDServiceClient(url, url, newConn, true) c.leader.Store(leaderClient) } // Run callbacks - if c.tsoLeaderUpdatedCb != nil { - if err := c.tsoLeaderUpdatedCb(url); err != nil { - return true, err - } - } - for _, cb := range c.leaderSwitchedCbs { - cb() + if err := c.callbacks.callLeaderSwitchedCallback(url); err != nil { + return true, err } log.Info("[pd] switch leader", zap.String("new-leader", url), zap.String("old-leader", oldLeader.GetURL())) - return true, err + return true, nil } func (c *serviceDiscovery) updateFollowers(members []*pdpb.Member, leaderID uint64, leaderURL string) (changed bool) { diff --git a/client/servicediscovery/tso_service_discovery.go b/client/servicediscovery/tso_service_discovery.go index 7734fd23107..d3f9a6b307a 100644 --- a/client/servicediscovery/tso_service_discovery.go +++ b/client/servicediscovery/tso_service_discovery.go @@ -55,10 +55,7 @@ const ( tsoQueryRetryInterval = 500 * time.Millisecond ) -var ( - _ ServiceDiscovery = (*tsoServiceDiscovery)(nil) - _ TSOEventSource = (*tsoServiceDiscovery)(nil) -) +var _ ServiceDiscovery = (*tsoServiceDiscovery)(nil) // keyspaceGroupSvcDiscovery is used for discovering the serving endpoints of the keyspace // group to which the keyspace belongs @@ -144,7 +141,7 @@ type tsoServiceDiscovery struct { clientConns sync.Map // Store as map[string]*grpc.ClientConn // tsoLeaderUpdatedCb will be called when the TSO leader is updated. - tsoLeaderUpdatedCb tsoLeaderURLUpdatedFunc + tsoLeaderUpdatedCb leaderSwitchedCallbackFunc checkMembershipCh chan struct{} @@ -361,16 +358,8 @@ func (c *tsoServiceDiscovery) CheckMemberChanged() error { return nil } -// AddServingURLSwitchedCallback adds callbacks which will be called when the primary in -// a primary/secondary configured cluster is switched. -func (*tsoServiceDiscovery) AddServingURLSwitchedCallback(...func()) {} - -// AddServiceURLsSwitchedCallback adds callbacks which will be called when any primary/secondary -// in a primary/secondary configured cluster is changed. -func (*tsoServiceDiscovery) AddServiceURLsSwitchedCallback(...func()) {} - -// SetTSOLeaderURLUpdatedCallback adds a callback which will be called when the TSO leader is updated. -func (c *tsoServiceDiscovery) SetTSOLeaderURLUpdatedCallback(callback tsoLeaderURLUpdatedFunc) { +// ExecuteAndAddServingURLSwitchedCallback executes the callback once and adds it to the callback list then. +func (c *tsoServiceDiscovery) ExecuteAndAddServingURLSwitchedCallback(callback leaderSwitchedCallbackFunc) { url := c.getPrimaryURL() if len(url) > 0 { if err := callback(url); err != nil { @@ -380,6 +369,14 @@ func (c *tsoServiceDiscovery) SetTSOLeaderURLUpdatedCallback(callback tsoLeaderU c.tsoLeaderUpdatedCb = callback } +// AddServingURLSwitchedCallback adds callbacks which will be called when the primary in +// a primary/secondary configured cluster is switched. +func (*tsoServiceDiscovery) AddServingURLSwitchedCallback(leaderSwitchedCallbackFunc) {} + +// AddServiceURLsSwitchedCallback adds callbacks which will be called when any primary/secondary +// in a primary/secondary configured cluster is changed. +func (*tsoServiceDiscovery) AddServiceURLsSwitchedCallback(func()) {} + // GetServiceClient implements ServiceDiscovery func (c *tsoServiceDiscovery) GetServiceClient() ServiceClient { return c.serviceDiscovery.GetServiceClient() diff --git a/tests/integrations/client/client_test.go b/tests/integrations/client/client_test.go index 0e0bf25d74d..7351f8ee30c 100644 --- a/tests/integrations/client/client_test.go +++ b/tests/integrations/client/client_test.go @@ -238,8 +238,9 @@ func TestGetTSAfterTransferLeader(t *testing.T) { defer cli.Close() var leaderSwitched atomic.Bool - cli.GetServiceDiscovery().AddServingURLSwitchedCallback(func() { + cli.GetServiceDiscovery().AddServingURLSwitchedCallback(func(string) error { leaderSwitched.Store(true) + return nil }) err = cluster.GetServer(leader).ResignLeader() re.NoError(err)