Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] *: GC API Refactorying #8989

Draft
wants to merge 13 commits into
base: master
Choose a base branch
from
2 changes: 2 additions & 0 deletions client/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,5 @@ require (
gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

replace github.com/pingcap/kvproto => github.com/MyonKeminta/kvproto v0.0.0-20250108081236-f8be07eac6e5
4 changes: 2 additions & 2 deletions client/go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/MyonKeminta/kvproto v0.0.0-20250108081236-f8be07eac6e5 h1:aBpjrLJlsSIkxh0pg6QuIAzMbajgPLvoELAnNNyFn2E=
github.com/MyonKeminta/kvproto v0.0.0-20250108081236-f8be07eac6e5/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A=
github.com/benbjohnson/clock v1.3.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
Expand Down Expand Up @@ -49,8 +51,6 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c h1:xpW9bvK+HuuTm
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg=
github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86 h1:tdMsjOqUR7YXHoBitzdebTvOjs/swniBTOLy5XiMtuE=
github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86/go.mod h1:exzhVYca3WRtd6gclGNErRWb1qEgff3LYta0LvRmON4=
github.com/pingcap/kvproto v0.0.0-20241120071417-b5b7843d9037 h1:xYNSJjYNur4Dr5bV+9BXK9n5E0T1zlcAN25XX68+mOg=
github.com/pingcap/kvproto v0.0.0-20241120071417-b5b7843d9037/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down
10 changes: 10 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,11 @@ error = '''
internal etcd transaction error occurred
'''

["PD:etcd:ErrEtcdTxnResponse"]
error = '''
etcd transaction returned invalid response: %v
'''

["PD:etcd:ErrEtcdURLMap"]
error = '''
etcd url map error
Expand All @@ -446,6 +451,11 @@ error = '''
failed to convert a path to absolute path
'''

["PD:gc:ErrGCOnInvalidKeyspace"]
error = '''
trying to manage GC in keyspace %v where keyspace level GC is not enabled
'''

["PD:gin:ErrBindJSON"]
error = '''
bind JSON error
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -219,3 +219,5 @@ require (
moul.io/zapgorm2 v1.1.0 // indirect
sigs.k8s.io/yaml v1.4.0 // indirect
)

replace github.com/pingcap/kvproto => github.com/MyonKeminta/kvproto v0.0.0-20250108081236-f8be07eac6e5
1,716 changes: 1,707 additions & 9 deletions go.sum

Large diffs are not rendered by default.

8 changes: 8 additions & 0 deletions pkg/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,7 @@ var (
ErrEtcdGrantLease = errors.Normalize("etcd lease failed", errors.RFCCodeText("PD:etcd:ErrEtcdGrantLease"))
ErrEtcdTxnInternal = errors.Normalize("internal etcd transaction error occurred", errors.RFCCodeText("PD:etcd:ErrEtcdTxnInternal"))
ErrEtcdTxnConflict = errors.Normalize("etcd transaction failed, conflicted and rolled back", errors.RFCCodeText("PD:etcd:ErrEtcdTxnConflict"))
ErrEtcdTxnResponse = errors.Normalize("etcd transaction returned invalid response: %v", errors.RFCCodeText("PD:etcd:ErrEtcdTxnResponse"))
ErrEtcdKVPut = errors.Normalize("etcd KV put failed", errors.RFCCodeText("PD:etcd:ErrEtcdKVPut"))
ErrEtcdKVDelete = errors.Normalize("etcd KV delete failed", errors.RFCCodeText("PD:etcd:ErrEtcdKVDelete"))
ErrEtcdKVGet = errors.Normalize("etcd KV get failed", errors.RFCCodeText("PD:etcd:ErrEtcdKVGet"))
Expand Down Expand Up @@ -547,3 +548,10 @@ var (
ErrNotFoundSchedulingPrimary = errors.Normalize("cannot find scheduling primary", errors.RFCCodeText("PD:mcs:ErrNotFoundSchedulingPrimary"))
ErrSchedulingServer = errors.Normalize("scheduling server meets %v", errors.RFCCodeText("PD:mcs:ErrSchedulingServer"))
)

// GC errors
var (
ErrGCOnInvalidKeyspace = errors.Normalize("trying to manage GC in keyspace %v where keyspace level GC is not enabled", errors.RFCCodeText("PD:gc:ErrGCOnInvalidKeyspace"))
ErrDecreasingGCSafePoint = errors.Normalize("trying to update GC safe point to a smaller value, current value: %v, given: %v", errors.RFCCodeText("PD:gc:ErrDecreasingGCSafePoint"))
ErrGCSafePointExceedsTxnSafePoint = errors.Normalize("trying to update GC safe point to a too large value that exceeds the txn safe point, current value: %v, given: %v, current txn safe point: %v", errors.RFCCodeText("PD:gc:ErrGCSafePointExceedsTxnSafePoint"))
)
181 changes: 147 additions & 34 deletions pkg/gc/safepoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,12 @@ import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"go.uber.org/zap"

"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/keyspace"
"github.com/tikv/pd/pkg/mcs/utils/constant"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/syncutil"
"github.com/tikv/pd/server/config"
Expand All @@ -28,57 +33,158 @@ import (
var blockGCSafePointErrmsg = "don't allow update gc safe point v1."
var blockServiceSafepointErrmsg = "don't allow update service safe point v1."

// SafePointManager is the manager for safePoint of GC and services.
type SafePointManager struct {
gcLock syncutil.Mutex
serviceGCLock syncutil.Mutex
store endpoint.GCSafePointStorage
cfg config.PDServerConfig
// GCStateManager is the manager for safePoint of GC and services.
type GCStateManager struct {
lock syncutil.RWMutex
gcMetaStorage endpoint.GCStateStorage
cfg config.PDServerConfig
keyspaceManager keyspace.Manager
}

// NewSafePointManager creates a SafePointManager of GC and services.
func NewSafePointManager(store endpoint.GCSafePointStorage, cfg config.PDServerConfig) *SafePointManager {
return &SafePointManager{store: store, cfg: cfg}
// NewGCStateManager creates a GCStateManager of GC and services.
func NewGCStateManager(store endpoint.GCStateStorage, cfg config.PDServerConfig) *GCStateManager {
return &GCStateManager{gcMetaStorage: store, cfg: cfg}
}

// LoadGCSafePoint loads current GC safe point from storage.
func (manager *SafePointManager) LoadGCSafePoint() (uint64, error) {
return manager.store.LoadGCSafePoint()
func (m *GCStateManager) redirectKeyspace(keyspaceID uint32, isUserAPI bool) (uint32, error) {
// Regard it as NullKeyspaceID if the given one is invalid (exceeds the valid range of keyspace id), no matter
// whether it exactly matches the NullKeyspaceID.
if keyspaceID & ^constant.ValidKeyspaceIDMask != 0 {
return constant.NullKeyspaceID, nil
}

keyspaceMeta, err := m.keyspaceManager.LoadKeyspaceByID(keyspaceID)
if err != nil {
return 0, err
}
if keyspaceMeta.Config[keyspace.GCManagementType] != keyspace.KeyspaceLevelGC {
if isUserAPI {
// The user API is expected to always work. Operate on the state of global GC instead.
return constant.NullKeyspaceID, nil
}
// Internal API should never be called on keyspaces without keyspace level GC. They won't perform any active
// GC operation and will be managed by the global GC.
return 0, errs.ErrGCOnInvalidKeyspace.GenWithStackByArgs(keyspaceID)
}

return keyspaceID, nil
}

// UpdateGCSafePoint updates the safepoint if it is greater than the previous one
// it returns the old safepoint in the storage.
func (manager *SafePointManager) UpdateGCSafePoint(newSafePoint uint64) (oldSafePoint uint64, err error) {
manager.gcLock.Lock()
defer manager.gcLock.Unlock()
// TODO: cache the safepoint in the storage.
oldSafePoint, err = manager.store.LoadGCSafePoint()
// CompatibleLoadGCSafePoint loads current GC safe point from storage.
func (m *GCStateManager) CompatibleLoadGCSafePoint(keyspaceID uint32) (uint64, error) {
keyspaceID, err := m.redirectKeyspace(keyspaceID, false)
if err != nil {
return
return 0, err
}
if manager.cfg.BlockSafePointV1 {
err = errors.New(blockGCSafePointErrmsg)

// No need to acquire the lock as a single-key read operation is atomic.
return m.gcMetaStorage.LoadGCSafePoint(keyspaceID)
}

// AdvanceGCSafePoint tries to advance the GC safe point to the given target. If the target is less than the current
// value or greater than the txn safe point, it returns an error.
func (m *GCStateManager) AdvanceGCSafePoint(keyspaceID uint32, target uint64) (oldGCSafePoint uint64, newGCSafePoint uint64, err error) {
return m.advanceGCSafePointImpl(keyspaceID, target, false)
}

// CompatibleUpdateGCSafePoint tries to advance the GC safe point to the given target. If the target is less than the
// current value, it returns the current value without updating it.
// This is provided for compatibility purpose, making the existing uses of the deprecated API `UpdateGCSafePoint`
// still work.
func (m *GCStateManager) CompatibleUpdateGCSafePoint(target uint64) (oldGCSafePoint uint64, newGCSafePoint uint64, err error) {
return m.advanceGCSafePointImpl(constant.NullKeyspaceID, target, true)
}

func (m *GCStateManager) advanceGCSafePointImpl(keyspaceID uint32, target uint64, compatible bool) (oldGCSafePoint uint64, newGCSafePoint uint64, err error) {
keyspaceID, err = m.redirectKeyspace(keyspaceID, false)
if err != nil {
return
}

if oldSafePoint >= newSafePoint {
return
m.lock.Lock()
defer m.lock.Unlock()

newGCSafePoint = target

err = m.gcMetaStorage.RunInGCMetaTransaction(func(wb *endpoint.GCStateWriteBatch) error {
var err1 error
oldGCSafePoint, err1 = m.gcMetaStorage.LoadGCSafePoint(keyspaceID)
if err1 != nil {
return err1
}
if target < oldGCSafePoint {
if compatible {
// When in compatible mode, trying to update the safe point to a smaller value fails silently, returning
// the actual value. There exist some use cases that fetches the current value by passing zero.
log.Warn("deprecated API `UpdateGCSafePoint` is called with invalid argument",
zap.Uint64("currentGCSafePoint", oldGCSafePoint), zap.Uint64("attemptedGCSafePoint", target))
newGCSafePoint = oldGCSafePoint
return nil
}
// Otherwise, return error to reject the operation explicitly.
return errs.ErrDecreasingGCSafePoint.GenWithStackByArgs(oldGCSafePoint, target)
}
txnSafePoint, err1 := m.gcMetaStorage.LoadTxnSafePoint(keyspaceID)
if err1 != nil {
return err1
}
if target > txnSafePoint {
return errs.ErrGCSafePointExceedsTxnSafePoint.GenWithStackByArgs(oldGCSafePoint, target, txnSafePoint)
}

err1 = wb.SetGCSafePoint(keyspaceID, target)
if err1 != nil {
return err1
}

return nil
})
if err != nil {
return 0, 0, err
}

if keyspaceID == constant.NullKeyspaceID {
gcSafePointGauge.WithLabelValues("gc_safepoint").Set(float64(target))
}

return
}

func (m *GCStateManager) AdvanceTxnSafePoint(keyspaceID uint32, target uint64) (AdvanceTxnSafePointResult, error) {
keyspaceID, err := m.redirectKeyspace(keyspaceID, false)
if err != nil {
return AdvanceTxnSafePointResult{}, err
}
err = manager.store.SaveGCSafePoint(newSafePoint)
if err == nil {
gcSafePointGauge.WithLabelValues("gc_safepoint").Set(float64(newSafePoint))

m.lock.Lock()
m.lock.Unlock()

newTxnSafePoint = target
err = m.gcMetaStorage.RunInGCMetaTransaction(func(wb *endpoint.GCStateWriteBatch) error {
var err1 error
oldTxnSafePoint, err1 = m.gcMetaStorage.LoadTxnSafePoint(keyspaceID)
if err1 != nil {
return err1
}

return nil
})
if err != nil {
return AdvanceTxnSafePointResult{}, err
}

return
}

// UpdateServiceGCSafePoint update the safepoint for a specific service.
func (manager *SafePointManager) UpdateServiceGCSafePoint(serviceID string, newSafePoint uint64, ttl int64, now time.Time) (minServiceSafePoint *endpoint.ServiceSafePoint, updated bool, err error) {
if manager.cfg.BlockSafePointV1 {
func (m *GCStateManager) UpdateServiceGCSafePoint(serviceID string, newSafePoint uint64, ttl int64, now time.Time) (minServiceSafePoint *endpoint.ServiceSafePoint, updated bool, err error) {
if m.cfg.BlockSafePointV1 {
return nil, false, errors.New(blockServiceSafepointErrmsg)
}
manager.serviceGCLock.Lock()
defer manager.serviceGCLock.Unlock()
minServiceSafePoint, err = manager.store.LoadMinServiceGCSafePoint(now)
// This function won't support keyspace as it's being deprecated.
m.lock.Lock(constant.NullKeyspaceID)
defer m.lock.Unlock(constant.NullKeyspaceID)
minServiceSafePoint, err = m.gcMetaStorage.LoadMinServiceGCSafePoint(now)
if err != nil || ttl <= 0 || newSafePoint < minServiceSafePoint.SafePoint {
return minServiceSafePoint, false, err
}
Expand All @@ -91,13 +197,20 @@ func (manager *SafePointManager) UpdateServiceGCSafePoint(serviceID string, newS
if math.MaxInt64-now.Unix() <= ttl {
ssp.ExpiredAt = math.MaxInt64
}
if err := manager.store.SaveServiceGCSafePoint(ssp); err != nil {
if err := m.gcMetaStorage.SaveServiceGCSafePoint(ssp); err != nil {
return nil, false, err
}

// If the min safePoint is updated, load the next one.
if serviceID == minServiceSafePoint.ServiceID {
minServiceSafePoint, err = manager.store.LoadMinServiceGCSafePoint(now)
minServiceSafePoint, err = m.gcMetaStorage.LoadMinServiceGCSafePoint(now)
}
return minServiceSafePoint, true, err
}

type AdvanceTxnSafePointResult struct {
OldTxnSafePoint uint64
Target uint64
NewTxnSafePoint uint64
BlockerMessage bool
}
26 changes: 13 additions & 13 deletions pkg/gc/safepoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,41 +27,41 @@ import (
"github.com/tikv/pd/server/config"
)

func newGCStorage() endpoint.GCSafePointStorage {
func newGCStorage() endpoint.GCStateStorage {
return endpoint.NewStorageEndpoint(kv.NewMemoryKV(), nil)
}

func TestGCSafePointUpdateSequentially(t *testing.T) {
gcSafePointManager := NewSafePointManager(newGCStorage(), config.PDServerConfig{})
gcSafePointManager := NewGCStateManager(newGCStorage(), config.PDServerConfig{})
re := require.New(t)
curSafePoint := uint64(0)
// update gc safePoint with asc value.
for id := 10; id < 20; id++ {
safePoint, err := gcSafePointManager.LoadGCSafePoint()
safePoint, err := gcSafePointManager.CompatibleLoadGCSafePoint()
re.NoError(err)
re.Equal(curSafePoint, safePoint)
previousSafePoint := curSafePoint
curSafePoint = uint64(id)
oldSafePoint, err := gcSafePointManager.UpdateGCSafePoint(curSafePoint)
oldSafePoint, err := gcSafePointManager.AdvanceGCSafePoint(curSafePoint)
re.NoError(err)
re.Equal(previousSafePoint, oldSafePoint)
}

safePoint, err := gcSafePointManager.LoadGCSafePoint()
safePoint, err := gcSafePointManager.CompatibleLoadGCSafePoint()
re.NoError(err)
re.Equal(curSafePoint, safePoint)
// update with smaller value should be failed.
oldSafePoint, err := gcSafePointManager.UpdateGCSafePoint(safePoint - 5)
oldSafePoint, err := gcSafePointManager.AdvanceGCSafePoint(safePoint - 5)
re.NoError(err)
re.Equal(safePoint, oldSafePoint)
curSafePoint, err = gcSafePointManager.LoadGCSafePoint()
curSafePoint, err = gcSafePointManager.CompatibleLoadGCSafePoint()
re.NoError(err)
// current safePoint should not change since the update value was smaller
re.Equal(safePoint, curSafePoint)
}

func TestGCSafePointUpdateCurrently(t *testing.T) {
gcSafePointManager := NewSafePointManager(newGCStorage(), config.PDServerConfig{})
gcSafePointManager := NewGCStateManager(newGCStorage(), config.PDServerConfig{})
maxSafePoint := uint64(1000)
wg := sync.WaitGroup{}
re := require.New(t)
Expand All @@ -71,21 +71,21 @@ func TestGCSafePointUpdateCurrently(t *testing.T) {
wg.Add(1)
go func(step uint64) {
for safePoint := step; safePoint <= maxSafePoint; safePoint += step {
_, err := gcSafePointManager.UpdateGCSafePoint(safePoint)
_, err := gcSafePointManager.AdvanceGCSafePoint(safePoint)
re.NoError(err)
}
wg.Done()
}(uint64(id + 1))
}
wg.Wait()
safePoint, err := gcSafePointManager.LoadGCSafePoint()
safePoint, err := gcSafePointManager.CompatibleLoadGCSafePoint()
re.NoError(err)
re.Equal(maxSafePoint, safePoint)
}

func TestServiceGCSafePointUpdate(t *testing.T) {
re := require.New(t)
manager := NewSafePointManager(newGCStorage(), config.PDServerConfig{})
manager := NewGCStateManager(newGCStorage(), config.PDServerConfig{})
gcWorkerServiceID := "gc_worker"
cdcServiceID := "cdc"
brServiceID := "br"
Expand Down Expand Up @@ -167,7 +167,7 @@ func TestServiceGCSafePointUpdate(t *testing.T) {

func TestBlockUpdateSafePointV1(t *testing.T) {
re := require.New(t)
manager := NewSafePointManager(newGCStorage(), config.PDServerConfig{BlockSafePointV1: true})
manager := NewGCStateManager(newGCStorage(), config.PDServerConfig{BlockSafePointV1: true})
gcworkerServiceID := "gc_worker"
gcWorkerSafePoint := uint64(8)

Expand All @@ -177,7 +177,7 @@ func TestBlockUpdateSafePointV1(t *testing.T) {
re.False(updated)
re.Nil(min)

oldSafePoint, err := manager.UpdateGCSafePoint(gcWorkerSafePoint)
oldSafePoint, err := manager.AdvanceGCSafePoint(gcWorkerSafePoint)
re.Error(err)
re.Equal(err.Error(), blockGCSafePointErrmsg)

Expand Down
Loading