Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

resource group: support more mode for burstable #9044

Draft
wants to merge 5 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions pkg/mcs/resourcemanager/server/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ const (

reservedDefaultGroupName = "default"
middlePriority = 8
unlimitedRate = math.MaxInt32
unlimitedBurstLimit = -1
)

// Manager is the manager of resource group.
Expand Down Expand Up @@ -168,8 +170,8 @@ func (m *Manager) Init(ctx context.Context) error {
RUSettings: &RequestUnitSettings{
RU: &GroupTokenBucket{
Settings: &rmpb.TokenLimitSettings{
FillRate: math.MaxInt32,
BurstLimit: -1,
FillRate: unlimitedRate,
BurstLimit: unlimitedBurstLimit,
},
},
},
Expand Down
61 changes: 48 additions & 13 deletions pkg/mcs/resourcemanager/server/token_buckets.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,11 @@ const (
)

const (
defaultReserveRatio = 0.5
defaultLoanCoefficient = 2
maxAssignTokens = math.MaxFloat64 / 1024 // assume max client connect is 1024
slotExpireTimeout = 10 * time.Minute
defaultBurstLimitFactor = 2.0
defaultReserveRatio = 0.5
defaultLoanCoefficient = 2
maxAssignTokens = math.MaxFloat64 / 1024 // assume max client connect is 1024
slotExpireTimeout = 10 * time.Minute
)

// GroupTokenBucket is a token bucket for a resource group.
Expand All @@ -43,7 +44,8 @@ type GroupTokenBucket struct {
// Settings is the setting of TokenBucket.
// BurstLimit is used as below:
// - If b == 0, that means the limiter is unlimited capacity. default use in resource controller (burst with a rate within an unlimited capacity).
// - If b < 0, that means the limiter is unlimited capacity and fillrate(r) is ignored, can be seen as r == Inf (burst within an unlimited capacity).
// - If b == -1, that means the limiter is unlimited capacity and fillrate(r) is ignored, can be seen as r == Inf (burst within an unlimited capacity).
// - If b == -2, that means the limiter is limited capacity and fillrate(r) is ignored, can be seen as r == defaultBurstLimitFactor * fillrate (burst within a limited capacity).
// - If b > 0, that means the limiter is limited capacity.
// MaxTokens limits the number of tokens that can be accumulated
Settings *rmpb.TokenLimitSettings `json:"settings,omitempty"`
Expand All @@ -70,6 +72,7 @@ func (gtb *GroupTokenBucket) setState(state *GroupTokenBucketState) {
gtb.Tokens = state.Tokens
gtb.LastUpdate = state.LastUpdate
gtb.Initialized = state.Initialized
gtb.BurstLimitFactor = state.BurstLimitFactor
}

// TokenSlot is used to split a token bucket into multiple slots to
Expand All @@ -83,6 +86,7 @@ type TokenSlot struct {
tokenCapacity float64
lastTokenCapacity float64
lastReqTime time.Time
burstLimitFactor uint64
}

// GroupTokenBucketState is the running state of TokenBucket.
Expand All @@ -93,8 +97,9 @@ type GroupTokenBucketState struct {
clientConsumptionTokensSum float64
lastBurstTokens float64

LastUpdate *time.Time `json:"last_update,omitempty"`
Initialized bool `json:"initialized"`
LastUpdate *time.Time `json:"last_update,omitempty"`
Initialized bool `json:"initialized"`
BurstLimitFactor uint64 `json:"burst_limit_factor,omitempty"`
// settingChanged is used to avoid that the number of tokens returned is jitter because of changing fill rate.
settingChanged bool
lastCheckExpireSlot time.Time
Expand Down Expand Up @@ -122,6 +127,7 @@ func (gts *GroupTokenBucketState) Clone() *GroupTokenBucketState {
tokenSlots: tokenSlots,
clientConsumptionTokensSum: gts.clientConsumptionTokensSum,
lastCheckExpireSlot: gts.lastCheckExpireSlot,
BurstLimitFactor: gts.BurstLimitFactor,
}
}

Expand Down Expand Up @@ -152,7 +158,10 @@ func (gts *GroupTokenBucketState) balanceSlotTokens(
// Only slots that require a positive number will be considered alive,
// but still need to allocate the elapsed tokens as well.
if requiredToken != 0 {
slot = &TokenSlot{lastReqTime: now}
slot = &TokenSlot{
lastReqTime: now,
burstLimitFactor: gts.BurstLimitFactor,
}
gts.tokenSlots[clientUniqueID] = slot
gts.clientConsumptionTokensSum = 0
}
Expand Down Expand Up @@ -182,7 +191,7 @@ func (gts *GroupTokenBucketState) balanceSlotTokens(
return
}
evenRatio := 1 / float64(len(gts.tokenSlots))
if settings.GetBurstLimit() <= 0 {
if settings.GetBurstLimit() == 0 {
for _, slot := range gts.tokenSlots {
slot.settings = &rmpb.TokenLimitSettings{
FillRate: uint64(float64(settings.GetFillRate()) * evenRatio),
Expand All @@ -191,8 +200,32 @@ func (gts *GroupTokenBucketState) balanceSlotTokens(
}
return
}
if settings.GetBurstLimit() == unlimitedBurstLimit || settings.FillRate == unlimitedRate {
for _, slot := range gts.tokenSlots {
slot.settings = &rmpb.TokenLimitSettings{
FillRate: unlimitedRate,
BurstLimit: unlimitedBurstLimit,
}
}
return
}

for _, slot := range gts.tokenSlots {
if settings.GetBurstLimit() == -2 {
// Need to make each slot even.
slot.tokenCapacity = evenRatio * gts.Tokens
slot.lastTokenCapacity = evenRatio * gts.Tokens
slot.requireTokensSum = 0
gts.clientConsumptionTokensSum = 0
fillRate := float64(settings.GetFillRate()) * evenRatio * float64(gts.BurstLimitFactor)

slot.settings = &rmpb.TokenLimitSettings{
FillRate: uint64(fillRate),
BurstLimit: int64(fillRate),
}
continue
}

if gts.clientConsumptionTokensSum == 0 || len(gts.tokenSlots) == 1 {
// Need to make each slot even.
slot.tokenCapacity = evenRatio * gts.Tokens
Expand Down Expand Up @@ -257,8 +290,9 @@ func NewGroupTokenBucket(tokenBucket *rmpb.TokenBucket) *GroupTokenBucket {
return &GroupTokenBucket{
Settings: tokenBucket.GetSettings(),
GroupTokenBucketState: GroupTokenBucketState{
Tokens: tokenBucket.GetTokens(),
tokenSlots: make(map[uint64]*TokenSlot),
Tokens: tokenBucket.GetTokens(),
tokenSlots: make(map[uint64]*TokenSlot),
BurstLimitFactor: defaultBurstLimitFactor,
},
}
}
Expand Down Expand Up @@ -301,6 +335,7 @@ func (gtb *GroupTokenBucket) init(now time.Time, clientID uint64) {
settings: gtb.Settings,
tokenCapacity: gtb.Tokens,
lastTokenCapacity: gtb.Tokens,
burstLimitFactor: gtb.BurstLimitFactor,
}
gtb.LastUpdate = &now
gtb.lastCheckExpireSlot = now
Expand Down Expand Up @@ -356,8 +391,8 @@ func (ts *TokenSlot) assignSlotTokens(requiredToken float64, targetPeriodMs uint
var res rmpb.TokenBucket
burstLimit := ts.settings.GetBurstLimit()
res.Settings = &rmpb.TokenLimitSettings{BurstLimit: burstLimit}
// If BurstLimit < 0, just return.
if burstLimit < 0 {
// If BurstLimit == -1, just return.
if burstLimit == unlimitedBurstLimit {
res.Tokens = requiredToken
return &res, 0
}
Expand Down
84 changes: 84 additions & 0 deletions pkg/mcs/resourcemanager/server/token_buckets_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,90 @@ func TestGroupTokenBucketRequest(t *testing.T) {
re.Equal(int64(time.Second)*10/int64(time.Millisecond), trickle)
}

func TestGroupTokenBucketRequestBurstLimit(t *testing.T) {
re := require.New(t)
testGroupSetting := func(tbSetting *rmpb.TokenBucket, expectedFillRate, expectedBurstLimit int64) {
gtb := NewGroupTokenBucket(tbSetting)
time1 := time.Now()
clientUniqueID := uint64(0)
gtb.request(time1, 190000, uint64(time.Second)*10/uint64(time.Millisecond), clientUniqueID)
re.Contains(gtb.tokenSlots, clientUniqueID)
// it should not be able to change group settings
groupSetting := gtb.tokenSlots[clientUniqueID]
re.Equal(expectedBurstLimit, groupSetting.settings.BurstLimit)
re.Equal(uint64(expectedFillRate), groupSetting.settings.FillRate)
// it should not be able to change gtb settings
re.Equal(tbSetting.GetSettings().BurstLimit, gtb.Settings.BurstLimit)
re.Equal(tbSetting.GetSettings().FillRate, gtb.Settings.FillRate)
}

// case 1: fillrate = 2000, burstLimit = 2000,0,-1,-2
testGroupSetting(&rmpb.TokenBucket{
Tokens: 200000,
Settings: &rmpb.TokenLimitSettings{
FillRate: 2000,
BurstLimit: 2000,
},
}, 2000, 2000)

testGroupSetting(&rmpb.TokenBucket{
Tokens: 200000,
Settings: &rmpb.TokenLimitSettings{
FillRate: 2000,
BurstLimit: 0,
},
}, 2000, 0)

testGroupSetting(&rmpb.TokenBucket{
Tokens: 200000,
Settings: &rmpb.TokenLimitSettings{
FillRate: 2000,
BurstLimit: unlimitedBurstLimit,
},
}, unlimitedRate, unlimitedBurstLimit)

testGroupSetting(&rmpb.TokenBucket{
Tokens: 200000,
Settings: &rmpb.TokenLimitSettings{
FillRate: 2000,
BurstLimit: -2,
},
}, 2000*defaultBurstLimitFactor, 2000*defaultBurstLimitFactor)

// case 2: fillrate = unlimited, burstLimit = 2000,0,-1,-2
testGroupSetting(&rmpb.TokenBucket{
Tokens: 200000,
Settings: &rmpb.TokenLimitSettings{
FillRate: unlimitedRate,
BurstLimit: 2000,
},
}, unlimitedRate, unlimitedBurstLimit)

testGroupSetting(&rmpb.TokenBucket{
Tokens: 200000,
Settings: &rmpb.TokenLimitSettings{
FillRate: unlimitedRate,
BurstLimit: 0,
},
}, unlimitedRate, 0) // burstLimit = 0 is a special case

testGroupSetting(&rmpb.TokenBucket{
Tokens: 200000,
Settings: &rmpb.TokenLimitSettings{
FillRate: unlimitedRate,
BurstLimit: unlimitedBurstLimit,
},
}, unlimitedRate, unlimitedBurstLimit)

testGroupSetting(&rmpb.TokenBucket{
Tokens: 200000,
Settings: &rmpb.TokenLimitSettings{
FillRate: unlimitedRate,
BurstLimit: -2,
},
}, unlimitedRate, unlimitedBurstLimit)
}

func TestGroupTokenBucketRequestLoop(t *testing.T) {
re := require.New(t)
tbSetting := &rmpb.TokenBucket{
Expand Down
Loading