Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support to integrate with the PD HTTP client #1049

Merged
merged 5 commits into from
Nov 9, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -2,6 +2,8 @@ module github.com/tikv/client-go/v2

go 1.21

replace github.com/tikv/pd/client v0.0.0-20230724080549-de985b8e0afc => github.com/JmPotato/pd/client v0.0.0-20231107024609-216b5b763864

require (
github.com/cznic/mathutil v0.0.0-20181122101859-297441e03548
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2
5 changes: 3 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/JmPotato/pd/client v0.0.0-20231107024609-216b5b763864 h1:q7k2boDgGSdnX/gXaIhk59V33J6GcxhpI53eoyVvMM0=
github.com/JmPotato/pd/client v0.0.0-20231107024609-216b5b763864/go.mod h1:cd6zBqRM9aogxf26K8NnFRPVtq9BnRE59tKEpX8IaWQ=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
@@ -111,8 +114,6 @@ github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ
github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a h1:J/YdBZ46WKpXsxsW93SG+q0F8KI+yFrcIDT4c/RNoc4=
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a/go.mod h1:h4xBhSNtOeEosLJ4P7JyKXX7Cabg7AVkWCK5gV2vOrM=
github.com/tikv/pd/client v0.0.0-20230724080549-de985b8e0afc h1:IUg0j2nWoGYj3FQ3vA3vg97fPSpJEZQrDpgF8RkMLEU=
github.com/tikv/pd/client v0.0.0-20230724080549-de985b8e0afc/go.mod h1:wfHRc4iYaqJiOQZCHcrF+r4hYnkGDaYWDfcicee//pc=
github.com/twmb/murmur3 v1.1.3 h1:D83U0XYKcHRYwYIpBKf3Pks91Z0Byda/9SJ8B6EMRcA=
github.com/twmb/murmur3 v1.1.3/go.mod h1:Qq/R7NUyOfr65zD+6Q5IHKsJLwP7exErjN6lyyq3OSQ=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
2 changes: 2 additions & 0 deletions integration_tests/go.mod
Original file line number Diff line number Diff line change
@@ -2,6 +2,8 @@ module integration_tests

go 1.21

replace github.com/tikv/pd/client v0.0.0-20230912103610-2f57a9f050eb => github.com/JmPotato/pd/client v0.0.0-20231107024609-216b5b763864

require (
github.com/ninedraft/israce v0.0.3
github.com/pingcap/errors v0.11.5-0.20221009092201-b66cddb77c32
4 changes: 2 additions & 2 deletions integration_tests/go.sum
Original file line number Diff line number Diff line change
@@ -31,6 +31,8 @@ github.com/DataDog/zstd v1.4.5 h1:EndNeuB0l9syBZhut0wns3gV1hL8zX8LIu6ZiVHWLIQ=
github.com/DataDog/zstd v1.4.5/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo=
github.com/HdrHistogram/hdrhistogram-go v1.1.2 h1:5IcZpTvzydCQeHzK4Ef/D5rrSqwxob0t8PQPMybUNFM=
github.com/HdrHistogram/hdrhistogram-go v1.1.2/go.mod h1:yDgFjdqOqDEKOvasDdhWNXYg9BVp4O+o5f6V/ehm6Oo=
github.com/JmPotato/pd/client v0.0.0-20231107024609-216b5b763864 h1:q7k2boDgGSdnX/gXaIhk59V33J6GcxhpI53eoyVvMM0=
github.com/JmPotato/pd/client v0.0.0-20231107024609-216b5b763864/go.mod h1:cd6zBqRM9aogxf26K8NnFRPVtq9BnRE59tKEpX8IaWQ=
github.com/Joker/hpp v1.0.0/go.mod h1:8x5n+M1Hp5hC0g8okX3sR3vFQwynaX/UgSOM9MeBKzY=
github.com/Joker/jade v1.0.1-0.20190614124447-d475f43051e7/go.mod h1:6E6s8o2AE4KhCrqr6GRJjdC/gNfTdxkIXvuGZZda2VM=
github.com/Masterminds/semver v1.5.0 h1:H65muMkzWKEuNDnfl9d70GUjFniHKHRbFPGBuZ3QEww=
@@ -507,8 +509,6 @@ github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA=
github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM=
github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs=
github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
github.com/tikv/pd/client v0.0.0-20230912103610-2f57a9f050eb h1:hAcH9tFjQzQ3+ofrAHm4ajOTLliYCOfXpj3+boKOtac=
github.com/tikv/pd/client v0.0.0-20230912103610-2f57a9f050eb/go.mod h1:E+6qtPu8fJm5kNjvKWPVFqSgNAFPk07y2EjD03GWzuI=
github.com/tklauser/go-sysconf v0.3.9/go.mod h1:11DU/5sG7UexIrp/O6g35hrWzu0JxlwQ3LSFUzyeuhs=
github.com/tklauser/go-sysconf v0.3.11 h1:89WgdJhk5SNwJfu+GKyYveZ4IaJ7xAkecBo+KdJV0CM=
github.com/tklauser/go-sysconf v0.3.11/go.mod h1:GqXfhXY3kiPa0nAXPDIQIWzJbMCB7AmcWpGR8lSZfqI=
94 changes: 29 additions & 65 deletions integration_tests/pd_api_test.go
Original file line number Diff line number Diff line change
@@ -48,14 +48,15 @@ type apiTestSuite struct {
}

func (s *apiTestSuite) SetupTest() {
require := s.Require()
addrs := strings.Split(*pdAddrs, ",")
pdClient, err := pd.NewClient(addrs, pd.SecurityOption{})
s.Require().NoError(err)
require.NoError(err)
rpcClient := tikv.NewRPCClient()
s.Require().NoError(failpoint.Enable("tikvclient/mockFastSafeTSUpdater", `return()`))
require.NoError(failpoint.Enable("tikvclient/mockFastSafeTSUpdater", `return()`))
// Set PD HTTP client.
store, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0, tikv.WithPDHTTPClient(nil, addrs))
s.store = store
s.store, err = tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0, tikv.WithPDHTTPClient(addrs, nil))
require.NoError(err)
storeID := uint64(1)
s.store.GetRegionCache().SetRegionCacheStore(storeID, s.storeAddr(storeID), s.storeAddr(storeID), tikvrpc.TiKV, 1, nil)
}
@@ -122,18 +123,17 @@ func (s *apiTestSuite) TestGetStoresMinResolvedTS() {
storeID := uint64(1)
s.store.GetRegionCache().SetRegionCacheStore(storeID, s.storeAddr(storeID), s.storeAddr(storeID), tikvrpc.TiKV, 1, labels)
// Try to get the minimum resolved timestamp of the stores from PD.
require.NoError(failpoint.Enable("tikvclient/InjectMinResolvedTS", `return(100)`))
var retryCount int
for s.store.GetMinSafeTS(dcLabel) != 100 {
time.Sleep(100 * time.Millisecond)
if retryCount > 5 {
break
}
retryCount++
}
require.NoError(failpoint.Enable("tikvclient/InjectPDMinResolvedTS", `return(100)`))
s.waitForMinSafeTS(dcLabel, 100)
require.Equal(int32(0), atomic.LoadInt32(&mockClient.requestCount))
require.Equal(uint64(100), s.store.GetMinSafeTS(dcLabel))
require.NoError(failpoint.Disable("tikvclient/InjectMinResolvedTS"))
require.NoError(failpoint.Disable("tikvclient/InjectPDMinResolvedTS"))
}

func (s *apiTestSuite) waitForMinSafeTS(txnScope string, ts uint64) {
s.Eventually(func() bool {
return s.store.GetMinSafeTS(txnScope) == ts
}, time.Second, 200*time.Millisecond)
}

func (s *apiTestSuite) TestDCLabelClusterMinResolvedTS() {
@@ -142,22 +142,15 @@ func (s *apiTestSuite) TestDCLabelClusterMinResolvedTS() {
mockClient := newStoreSafeTsMockClient(s.store.GetTiKVClient())
s.store.SetTiKVClient(&mockClient)
// Try to get the minimum resolved timestamp of the cluster from PD.
require.NoError(failpoint.Enable("tikvclient/InjectMinResolvedTS", `return(100)`))
var retryCount int
for s.store.GetMinSafeTS(oracle.GlobalTxnScope) != 100 {
time.Sleep(100 * time.Millisecond)
if retryCount > 5 {
break
}
retryCount++
}
require.NoError(failpoint.Enable("tikvclient/InjectPDMinResolvedTS", `return(100)`))
s.waitForMinSafeTS(oracle.GlobalTxnScope, 100)
require.Equal(atomic.LoadInt32(&mockClient.requestCount), int32(0))
require.Equal(uint64(100), s.store.GetMinSafeTS(oracle.GlobalTxnScope))
require.NoError(failpoint.Disable("tikvclient/InjectMinResolvedTS"))
require.NoError(failpoint.Disable("tikvclient/InjectPDMinResolvedTS"))

// Set DC label for store 1.
// Mock PD server not support get min resolved ts by stores.
require.NoError(failpoint.Enable("tikvclient/InjectMinResolvedTS", `return(0)`))
require.NoError(failpoint.Enable("tikvclient/InjectPDMinResolvedTS", `return(0)`))
dcLabel := "testDC"
restore := config.UpdateGlobal(func(conf *config.Config) {
conf.TxnScope = dcLabel
@@ -173,18 +166,10 @@ func (s *apiTestSuite) TestDCLabelClusterMinResolvedTS() {
storeID := uint64(1)
s.store.GetRegionCache().SetRegionCacheStore(storeID, s.storeAddr(storeID), s.storeAddr(storeID), tikvrpc.TiKV, 1, labels)
// Try to get the minimum resolved timestamp of the store from TiKV.
retryCount = 0
for s.store.GetMinSafeTS(dcLabel) != 150 {
time.Sleep(100 * time.Millisecond)
if retryCount > 5 {
break
}
retryCount++
}

s.waitForMinSafeTS(dcLabel, 150)
require.GreaterOrEqual(atomic.LoadInt32(&mockClient.requestCount), int32(1))
require.Equal(uint64(150), s.store.GetMinSafeTS(dcLabel))
require.NoError(failpoint.Disable("tikvclient/InjectMinResolvedTS"))
require.NoError(failpoint.Disable("tikvclient/InjectPDMinResolvedTS"))
}

func (s *apiTestSuite) TestInitClusterMinResolvedTSZero() {
@@ -196,47 +181,26 @@ func (s *apiTestSuite) TestInitClusterMinResolvedTSZero() {
// Make sure the store's min resolved ts is not initialized.
mockClient.SetKVSafeTS(0)
// Try to get the minimum resolved timestamp of the cluster from TiKV.
require.NoError(failpoint.Enable("tikvclient/InjectMinResolvedTS", `return(0)`))
var retryCount int
for s.store.GetMinSafeTS(oracle.GlobalTxnScope) != math.MaxUint64 {
time.Sleep(100 * time.Millisecond)
if retryCount > 5 {
break
}
retryCount++
}
require.NoError(failpoint.Enable("tikvclient/InjectPDMinResolvedTS", `return(0)`))
// Make sure the store's min resolved ts is not initialized.
s.waitForMinSafeTS(oracle.GlobalTxnScope, math.MaxUint64)
require.Equal(uint64(math.MaxUint64), s.store.GetMinSafeTS(oracle.GlobalTxnScope))
require.NoError(failpoint.Disable("tikvclient/InjectMinResolvedTS"))
require.NoError(failpoint.Disable("tikvclient/InjectPDMinResolvedTS"))

// Try to get the minimum resolved timestamp of the cluster from PD.
require.NoError(failpoint.Enable("tikvclient/InjectMinResolvedTS", `return(100)`))
retryCount = 0
for s.store.GetMinSafeTS(oracle.GlobalTxnScope) == math.MaxUint64 {
time.Sleep(100 * time.Millisecond)
if retryCount > 5 {
break
}
retryCount++
}
require.NoError(failpoint.Enable("tikvclient/InjectPDMinResolvedTS", `return(100)`))
// Make sure the store's min resolved ts is not regarded as MaxUint64.
s.waitForMinSafeTS(oracle.GlobalTxnScope, 100)
require.Equal(uint64(100), s.store.GetMinSafeTS(oracle.GlobalTxnScope))
require.NoError(failpoint.Disable("tikvclient/InjectMinResolvedTS"))
require.NoError(failpoint.Disable("tikvclient/InjectPDMinResolvedTS"))

// Fallback to KV Request when PD server not support get min resolved ts.
require.NoError(failpoint.Enable("tikvclient/InjectMinResolvedTS", `return(0)`))
require.NoError(failpoint.Enable("tikvclient/InjectPDMinResolvedTS", `return(0)`))
mockClient.SetKVSafeTS(150)
retryCount = 0
for s.store.GetMinSafeTS(oracle.GlobalTxnScope) != 150 {
time.Sleep(100 * time.Millisecond)
if retryCount > 5 {
break
}
retryCount++
}
// Make sure the minSafeTS can advance.
s.waitForMinSafeTS(oracle.GlobalTxnScope, 150)
require.Equal(uint64(150), s.store.GetMinSafeTS(oracle.GlobalTxnScope))
require.NoError(failpoint.Disable("tikvclient/InjectMinResolvedTS"))
require.NoError(failpoint.Disable("tikvclient/InjectPDMinResolvedTS"))
}

func (s *apiTestSuite) TearDownTest() {
4 changes: 4 additions & 0 deletions tikv/interface.go
Original file line number Diff line number Diff line change
@@ -41,6 +41,7 @@ import (
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/tikvrpc"
"github.com/tikv/client-go/v2/txnkv/txnlock"
pdhttp "github.com/tikv/pd/client/http"
)

// Storage represent the kv.Storage runs on TiKV.
@@ -69,6 +70,9 @@ type Storage interface {
// GetTiKVClient gets the TiKV client.
GetTiKVClient() Client

// GetPDHTTPClient gets the PD HTTP client.
GetPDHTTPClient() pdhttp.Client

// Closed returns the closed channel.
Closed() <-chan struct{}

61 changes: 50 additions & 11 deletions tikv/kv.go
Original file line number Diff line number Diff line change
@@ -66,6 +66,7 @@ import (
"github.com/tikv/client-go/v2/txnkv/txnsnapshot"
"github.com/tikv/client-go/v2/util"
pd "github.com/tikv/pd/client"
pdhttp "github.com/tikv/pd/client/http"
resourceControlClient "github.com/tikv/pd/client/resource_group/controller"
clientv3 "go.etcd.io/etcd/client/v3"
atomicutil "go.uber.org/atomic"
@@ -115,7 +116,7 @@ type KVStore struct {
client Client
}
pdClient pd.Client
pdHttpClient *util.PDHTTPClient
pdHttpClient pdhttp.Client
regionCache *locate.RegionCache
lockResolver *txnlock.LockResolver
txnLatches *latch.LatchesScheduler
@@ -147,6 +148,8 @@ type KVStore struct {
gP Pool
}

var _ Storage = (*KVStore)(nil)

// Go run the function in a separate goroutine.
func (s *KVStore) Go(f func()) error {
return s.gP.Run(f)
@@ -195,9 +198,9 @@ func WithPool(gp Pool) Option {
}

// WithPDHTTPClient set the PD HTTP client with the given address and TLS config.
func WithPDHTTPClient(tlsConf *tls.Config, pdaddrs []string) Option {
func WithPDHTTPClient(pdAddrs []string, tlsConf *tls.Config) Option {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it make a compatibility error to TiDB?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When upgrading client-go, we will naturally perceive this modification before and after compilation. I don't think it will cause compatibility problems.

return func(o *KVStore) {
o.pdHttpClient = util.NewPDHTTPClient(tlsConf, pdaddrs)
o.pdHttpClient = pdhttp.NewClient(pdAddrs, pdhttp.WithTLSConfig(tlsConf))
}
}

@@ -444,6 +447,11 @@ func (s *KVStore) GetPDClient() pd.Client {
return s.pdClient
}

// GetPDHTTPClient returns the PD HTTP client.
func (s *KVStore) GetPDHTTPClient() pdhttp.Client {
return s.pdHttpClient
}

// SupportDeleteRange gets the storage support delete range or not.
func (s *KVStore) SupportDeleteRange() (supported bool) {
return !s.mock
@@ -598,28 +606,31 @@ func (s *KVStore) updateSafeTS(ctx context.Context) {
err error
storeMinResolvedTSs map[uint64]uint64
)
storeIDs := make([]string, len(stores))
storeIDs := make([]uint64, len(stores))
if s.pdHttpClient != nil {
for i, store := range stores {
storeIDs[i] = strconv.FormatUint(store.StoreID(), 10)
storeIDs[i] = store.StoreID()
}
_, storeMinResolvedTSs, err = s.pdHttpClient.GetMinResolvedTSByStoresIDs(ctx, storeIDs)
_, storeMinResolvedTSs, err = s.getMinResolvedTSByStoresIDs(ctx, storeIDs)
if err != nil {
// If getting the minimum resolved timestamp from PD failed, log the error and need to get it from TiKV.
logutil.BgLogger().Debug("get resolved TS from PD failed", zap.Error(err), zap.Any("stores", storeIDs))
}
}

for i, store := range stores {
for _, store := range stores {
storeID := store.StoreID()
storeAddr := store.GetAddr()
if store.IsTiFlash() {
storeAddr = store.GetPeerAddr()
}
go func(ctx context.Context, wg *sync.WaitGroup, storeID uint64, storeAddr string, storeIDStr string) {
go func(ctx context.Context, wg *sync.WaitGroup, storeID uint64, storeAddr string) {
defer wg.Done()

var safeTS uint64
var (
safeTS uint64
storeIDStr = strconv.FormatUint(storeID, 10)
)
// If getting the minimum resolved timestamp from PD failed or returned 0, try to get it from TiKV.
if storeMinResolvedTSs == nil || storeMinResolvedTSs[storeID] == 0 || err != nil {
resp, err := tikvClient.SendRequest(
@@ -655,7 +666,7 @@ func (s *KVStore) updateSafeTS(ctx context.Context) {
metrics.TiKVSafeTSUpdateCounter.WithLabelValues("success", storeIDStr).Inc()
safeTSTime := oracle.GetTimeFromTS(safeTS)
metrics.TiKVMinSafeTSGapSeconds.WithLabelValues(storeIDStr).Set(time.Since(safeTSTime).Seconds())
}(ctx, wg, storeID, storeAddr, storeIDs[i])
}(ctx, wg, storeID, storeAddr)
}

txnScopeMap := make(map[string][]uint64)
@@ -672,6 +683,34 @@ func (s *KVStore) updateSafeTS(ctx context.Context) {
wg.Wait()
}

func (s *KVStore) getMinResolvedTSByStoresIDs(ctx context.Context, storeIDs []uint64) (uint64, map[uint64]uint64, error) {
var (
minResolvedTS uint64
storeMinResolvedTSs map[uint64]uint64
err error
)
minResolvedTS, storeMinResolvedTSs, err = s.pdHttpClient.GetMinResolvedTSByStoresIDs(ctx, storeIDs)
if err != nil {
return 0, nil, err
}
if val, e := util.EvalFailpoint("InjectPDMinResolvedTS"); e == nil {
injectedTS, ok := val.(int)
if !ok {
return minResolvedTS, storeMinResolvedTSs, err
}
minResolvedTS = uint64(injectedTS)
logutil.BgLogger().Info("inject min resolved ts", zap.Uint64("ts", uint64(injectedTS)))
// Currently we only have a store 1 in the test, so it's OK to inject the same min resolved TS for all stores here.
for storeID, v := range storeMinResolvedTSs {
if v != 0 && v != math.MaxUint64 {
storeMinResolvedTSs[storeID] = uint64(injectedTS)
logutil.BgLogger().Info("inject store min resolved ts", zap.Uint64("storeID", storeID), zap.Uint64("ts", uint64(injectedTS)))
}
}
}
return minResolvedTS, storeMinResolvedTSs, err
}

var (
skipSafeTSUpdateCounter = metrics.TiKVSafeTSUpdateCounter.WithLabelValues("skip", "cluster")
successSafeTSUpdateCounter = metrics.TiKVSafeTSUpdateCounter.WithLabelValues("success", "cluster")
@@ -684,7 +723,7 @@ func (s *KVStore) updateGlobalTxnScopeTSFromPD(ctx context.Context) bool {
isGlobal := config.GetTxnScopeFromConfig() == oracle.GlobalTxnScope
// Try to get the minimum resolved timestamp of the cluster from PD.
if s.pdHttpClient != nil && isGlobal {
clusterMinSafeTS, _, err := s.pdHttpClient.GetMinResolvedTSByStoresIDs(ctx, nil)
clusterMinSafeTS, _, err := s.getMinResolvedTSByStoresIDs(ctx, nil)
if err != nil {
logutil.BgLogger().Debug("get resolved TS from PD failed", zap.Error(err))
} else if clusterMinSafeTS != 0 {
Loading