Commit

This is an automated cherry-pick of #9009
close #9004

Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
rleungx authored and ti-chi-bot committed Jan 26, 2025
1 parent 3e55a4f commit d4f1c33
Showing 2 changed files with 307 additions and 0 deletions.
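Per the #9009 subject quoted in the conflict markers below ("pkg: avoid make channel every time"), the incoming side of this diff switches the rate-limit options from operating on a *Limiter directly to looking up a per-label limiter in the Controller's sync.Map, so newLimiter() is only called the first time a label is seen, which is presumably how the PR avoids re-making channels on every update. A minimal, self-contained sketch of that Load-then-LoadOrStore pattern, with simplified stand-in types rather than the package's real ones:

// Sketch only: hypothetical, simplified types illustrating the pattern used in the diff below.
package main

import (
	"fmt"
	"sync"
)

type limiter struct{ ch chan struct{} }

// newLimiter allocates the channel once per label, not on every update call.
func newLimiter() *limiter { return &limiter{ch: make(chan struct{}, 1)} }

type controller struct{ limiters sync.Map }

func (c *controller) getOrCreate(label string) *limiter {
	// Fast path: reuse the limiter that already exists for this label.
	if lim, ok := c.limiters.Load(label); ok {
		return lim.(*limiter)
	}
	// Slow path: if two goroutines race here, only the first stored limiter is kept.
	lim, _ := c.limiters.LoadOrStore(label, newLimiter())
	return lim.(*limiter)
}

func main() {
	c := &controller{}
	// The second lookup returns the same limiter instance, so no second channel is made.
	fmt.Println(c.getOrCreate("grpc.GetMembers") == c.getOrCreate("grpc.GetMembers")) // true
}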
56 changes: 56 additions & 0 deletions pkg/ratelimit/option.go
@@ -45,9 +45,15 @@ type Option func(string, *Limiter) UpdateStatus
// AddLabelAllowList adds a label into allow list.
// It means the given label will not be limited
func AddLabelAllowList() Option {
<<<<<<< HEAD
	return func(label string, l *Limiter) UpdateStatus {
		l.labelAllowList[label] = struct{}{}
		return 0
=======
	return func(label string, c *Controller) UpdateStatus {
		c.labelAllowList[label] = struct{}{}
		return InAllowList
>>>>>>> 604b0d6d4 (pkg: avoid make channel every time (#9009))
	}
}
[Check failure annotations, GitHub Actions / tso-function-test, file lines 48-56: syntax error: unexpected <<, expecting }; method has multiple receivers; syntax error: unexpected {, expecting (; syntax error: unexpected = in parameter list; possibly missing comma or ); invalid character U+0023 '#'.]

@@ -85,32 +91,82 @@ func updateQPSConfig(l *Limiter, label string, limit float64, burst int) UpdateStatus

// UpdateConcurrencyLimiter creates a concurrency limiter for a given label if it doesn't exist.
func UpdateConcurrencyLimiter(limit uint64) Option {
<<<<<<< HEAD
	return func(label string, l *Limiter) UpdateStatus {
		if _, allow := l.labelAllowList[label]; allow {
			return InAllowList
		}
		return updateConcurrencyConfig(l, label, limit)
=======
	return func(label string, c *Controller) UpdateStatus {
		if _, allow := c.labelAllowList[label]; allow {
			return InAllowList
		}
		lim, loaded := c.limiters.Load(label)
		if !loaded {
			lim, _ = c.limiters.LoadOrStore(label, newLimiter())
		}
		return lim.(*limiter).updateConcurrencyConfig(limit)
>>>>>>> 604b0d6d4 (pkg: avoid make channel every time (#9009))
	}
}
[Check failure annotations, GitHub Actions / tso-function-test, file lines 94-95: syntax error: unexpected <<, expecting }; method has multiple receivers.]

// UpdateQPSLimiter creates a QPS limiter for a given label if it doesn't exist.
func UpdateQPSLimiter(limit float64, burst int) Option {
<<<<<<< HEAD
	return func(label string, l *Limiter) UpdateStatus {
		if _, allow := l.labelAllowList[label]; allow {
			return InAllowList
		}
		return updateQPSConfig(l, label, limit, burst)
=======
	return func(label string, c *Controller) UpdateStatus {
		if _, allow := c.labelAllowList[label]; allow {
			return InAllowList
		}
		lim, loaded := c.limiters.Load(label)
		if !loaded {
			lim, _ = c.limiters.LoadOrStore(label, newLimiter())
		}
		return lim.(*limiter).updateQPSConfig(limit, burst)
>>>>>>> 604b0d6d4 (pkg: avoid make channel every time (#9009))
	}
}

// UpdateDimensionConfig creates QPS limiter and concurrency limiter for a given label by config if it doesn't exist.
func UpdateDimensionConfig(cfg *DimensionConfig) Option {
<<<<<<< HEAD
	return func(label string, l *Limiter) UpdateStatus {
		if _, allow := l.labelAllowList[label]; allow {
			return InAllowList
		}
		status := updateQPSConfig(l, label, cfg.QPS, cfg.QPSBurst)
		status |= updateConcurrencyConfig(l, label, cfg.ConcurrencyLimit)
		return status
=======
	return func(label string, c *Controller) UpdateStatus {
		if _, allow := c.labelAllowList[label]; allow {
			return InAllowList
		}
		lim, loaded := c.limiters.Load(label)
		if !loaded {
			lim, _ = c.limiters.LoadOrStore(label, newLimiter())
		}
		return lim.(*limiter).updateDimensionConfig(cfg)
	}
}

// InitLimiter creates empty concurrency limiter for a given label by config if it doesn't exist.
func InitLimiter() Option {
	return func(label string, c *Controller) UpdateStatus {
		if _, allow := c.labelAllowList[label]; allow {
			return InAllowList
		}
		_, loaded := c.limiters.Load(label)
		if !loaded {
			c.limiters.LoadOrStore(label, newLimiter())
		}
		return LimiterNotChanged
>>>>>>> 604b0d6d4 (pkg: avoid make channel every time (#9009))
	}
}
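Each Option above is a plain closure (func(string, *Controller) UpdateStatus on the incoming side), so a caller applies it by invoking the returned function with a label and the controller. A self-contained sketch of that application, mirroring the incoming AddLabelAllowList; the flag value and the map initialization are illustrative assumptions, not the package's actual definitions:

// Sketch only: stand-in declarations; Option, UpdateStatus, Controller, InAllowList are
// names from the diff above, but their definitions here are simplified assumptions.
package main

import "fmt"

type Controller struct{ labelAllowList map[string]struct{} }

type UpdateStatus uint32

const InAllowList UpdateStatus = 1 << 3 // illustrative flag value only

type Option func(string, *Controller) UpdateStatus

// AddLabelAllowList mirrors the incoming side of the first hunk: record the label so
// later options short-circuit with InAllowList.
func AddLabelAllowList() Option {
	return func(label string, c *Controller) UpdateStatus {
		c.labelAllowList[label] = struct{}{}
		return InAllowList
	}
}

func main() {
	c := &Controller{labelAllowList: map[string]struct{}{}}
	status := AddLabelAllowList()("grpc.GetMembers", c) // applying an Option is just calling the closure
	fmt.Println(status == InAllowList)                  // true
}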
251 changes: 251 additions & 0 deletions pkg/utils/tsoutil/tso_dispatcher.go
@@ -0,0 +1,251 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package tsoutil

import (
	"context"
	"strings"
	"sync"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"go.uber.org/zap"
	"google.golang.org/grpc"

	"github.com/pingcap/kvproto/pkg/pdpb"
	"github.com/pingcap/log"

	"github.com/tikv/pd/pkg/errs"
	"github.com/tikv/pd/pkg/utils/etcdutil"
	"github.com/tikv/pd/pkg/utils/logutil"
	"github.com/tikv/pd/pkg/utils/timerutil"
)

const (
	maxMergeRequests = 10000
	// DefaultTSOProxyTimeout defines the default timeout value of TSO Proxying
	DefaultTSOProxyTimeout = 3 * time.Second
)

type tsoResp interface {
	GetTimestamp() *pdpb.Timestamp
}

// TSODispatcher dispatches the TSO requests to the corresponding forwarding TSO channels.
type TSODispatcher struct {
	tsoProxyHandleDuration prometheus.Histogram
	tsoProxyBatchSize      prometheus.Histogram

	// dispatchChs is used to dispatch different TSO requests to the corresponding forwarding TSO channels.
	dispatchChs sync.Map // Store as map[string]chan Request
}

// NewTSODispatcher creates and returns a TSODispatcher
func NewTSODispatcher(tsoProxyHandleDuration, tsoProxyBatchSize prometheus.Histogram) *TSODispatcher {
	tsoDispatcher := &TSODispatcher{
		tsoProxyHandleDuration: tsoProxyHandleDuration,
		tsoProxyBatchSize:      tsoProxyBatchSize,
	}
	return tsoDispatcher
}

// DispatchRequest is the entry point for dispatching/forwarding a tso request to the destination host
func (s *TSODispatcher) DispatchRequest(
	ctx context.Context,
	req Request,
	tsoProtoFactory ProtoFactory,
	doneCh <-chan struct{},
	errCh chan<- error,
	tsoPrimaryWatchers ...*etcdutil.LoopWatcher) {
	key := req.getForwardedHost()
	val, loaded := s.dispatchChs.Load(key)
	if !loaded {
		val = make(chan Request, maxMergeRequests)
		val, loaded = s.dispatchChs.LoadOrStore(key, val)
	}
	reqCh := val.(chan Request)
	if !loaded {
		tsDeadlineCh := make(chan *TSDeadline, 1)
		go s.dispatch(ctx, tsoProtoFactory, req.getForwardedHost(), req.getClientConn(), reqCh, tsDeadlineCh, doneCh, errCh, tsoPrimaryWatchers...)
		go WatchTSDeadline(ctx, tsDeadlineCh)
	}
	reqCh <- req
}

func (s *TSODispatcher) dispatch(
	ctx context.Context,
	tsoProtoFactory ProtoFactory,
	forwardedHost string,
	clientConn *grpc.ClientConn,
	tsoRequestCh <-chan Request,
	tsDeadlineCh chan<- *TSDeadline,
	doneCh <-chan struct{},
	errCh chan<- error,
	tsoPrimaryWatchers ...*etcdutil.LoopWatcher) {
	defer logutil.LogPanic()
	dispatcherCtx, ctxCancel := context.WithCancel(ctx)
	defer ctxCancel()
	defer s.dispatchChs.Delete(forwardedHost)

	forwardStream, cancel, err := tsoProtoFactory.createForwardStream(ctx, clientConn)
	if err != nil || forwardStream == nil {
		log.Error("create tso forwarding stream error",
			zap.String("forwarded-host", forwardedHost),
			errs.ZapError(errs.ErrGRPCCreateStream, err))
		select {
		case <-dispatcherCtx.Done():
			return
		case _, ok := <-doneCh:
			if !ok {
				return
			}
		case errCh <- err:
			close(errCh)
			return
		}
	}
	defer cancel()

	requests := make([]Request, maxMergeRequests+1)
	needUpdateServicePrimaryAddr := len(tsoPrimaryWatchers) > 0 && tsoPrimaryWatchers[0] != nil
	for {
		select {
		case first := <-tsoRequestCh:
			pendingTSOReqCount := len(tsoRequestCh) + 1
			requests[0] = first
			for i := 1; i < pendingTSOReqCount; i++ {
				requests[i] = <-tsoRequestCh
			}
			done := make(chan struct{})
			dl := NewTSDeadline(DefaultTSOProxyTimeout, done, cancel)
			select {
			case tsDeadlineCh <- dl:
			case <-dispatcherCtx.Done():
				return
			}
			err = s.processRequests(forwardStream, requests[:pendingTSOReqCount])
			close(done)
			if err != nil {
				log.Error("proxy forward tso error",
					zap.String("forwarded-host", forwardedHost),
					errs.ZapError(errs.ErrGRPCSend, err))
				if needUpdateServicePrimaryAddr && strings.Contains(err.Error(), errs.NotLeaderErr) {
					tsoPrimaryWatchers[0].ForceLoad()
				}
				select {
				case <-dispatcherCtx.Done():
					return
				case _, ok := <-doneCh:
					if !ok {
						return
					}
				case errCh <- err:
					close(errCh)
					return
				}
			}
		case <-dispatcherCtx.Done():
			return
		}
	}
}

func (s *TSODispatcher) processRequests(forwardStream stream, requests []Request) error {
	// Merge the requests
	count := uint32(0)
	for _, request := range requests {
		count += request.getCount()
	}

	start := time.Now()
	resp, err := requests[0].process(forwardStream, count)
	if err != nil {
		return err
	}
	s.tsoProxyHandleDuration.Observe(time.Since(start).Seconds())
	s.tsoProxyBatchSize.Observe(float64(count))
	// Split the response
	ts := resp.GetTimestamp()
	physical, logical := ts.GetPhysical(), ts.GetLogical()
	// `logical` is the largest ts's logical part here, we need to do the subtracting before we finish each TSO request.
	// This is different from the logic of client batch, for example, if we have a largest ts whose logical part is 10,
	// count is 5, then the splitting results should be 5 and 10.
	firstLogical := addLogical(logical, -int64(count))
	return s.finishRequest(requests, physical, firstLogical)
}

// Because of the suffix, we need to shift the count before we add it to the logical part.
func addLogical(logical, count int64) int64 {
	return logical + count
}

func (*TSODispatcher) finishRequest(requests []Request, physical, firstLogical int64) error {
	countSum := int64(0)
	for i := range requests {
		newCountSum, err := requests[i].postProcess(countSum, physical, firstLogical)
		if err != nil {
			return err
		}
		countSum = newCountSum
	}
	return nil
}

// TSDeadline is used to watch the deadline of each tso request.
type TSDeadline struct {
	timer  *time.Timer
	done   chan struct{}
	cancel context.CancelFunc
}

// NewTSDeadline creates a new TSDeadline.
func NewTSDeadline(
	timeout time.Duration,
	done chan struct{},
	cancel context.CancelFunc,
) *TSDeadline {
	timer := timerutil.GlobalTimerPool.Get(timeout)
	return &TSDeadline{
		timer:  timer,
		done:   done,
		cancel: cancel,
	}
}

// WatchTSDeadline watches the deadline of each tso request.
func WatchTSDeadline(ctx context.Context, tsDeadlineCh <-chan *TSDeadline) {
	defer logutil.LogPanic()
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	for {
		select {
		case d := <-tsDeadlineCh:
			select {
			case <-d.timer.C:
				log.Error("tso proxy request processing is canceled due to timeout",
					errs.ZapError(errs.ErrProxyTSOTimeout))
				d.cancel()
				timerutil.GlobalTimerPool.Put(d.timer)
			case <-d.done:
				timerutil.GlobalTimerPool.Put(d.timer)
			case <-ctx.Done():
				timerutil.GlobalTimerPool.Put(d.timer)
				return
			}
		case <-ctx.Done():
			return
		}
	}
}
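The splitting comment in processRequests above can be made concrete: with a largest logical part of 10 and a merged count of 5, firstLogical comes out as 5 and the five merged requests receive logical values 6 through 10. A self-contained sketch of that arithmetic; the per-request assignment follows the countSum bookkeeping in finishRequest, but postProcess itself is not part of this diff, so treat the assignment as an assumption:

// Sketch only: the response-splitting arithmetic described in the processRequests comment.
package main

import "fmt"

// Same shape as addLogical above (no suffix shifting in this branch).
func addLogical(logical, count int64) int64 { return logical + count }

func main() {
	largestLogical := int64(10)      // logical part of the single forwarded response
	counts := []int64{1, 1, 1, 1, 1} // per-request tso counts of the 5 merged requests
	var total int64
	for _, c := range counts {
		total += c
	}
	firstLogical := addLogical(largestLogical, -total)
	fmt.Println("firstLogical:", firstLogical) // 5
	countSum := int64(0)
	for i, c := range counts {
		countSum += c
		// Assumed per-request result, mirroring the countSum walk in finishRequest.
		fmt.Printf("request %d gets logical %d\n", i, firstLogical+countSum) // 6, 7, 8, 9, 10
	}
}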
