Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

client/http: implement more rule and batch related interfaces #7430

Merged
merged 5 commits into from
Nov 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 22 additions & 13 deletions client/http/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,20 @@ import (
// The following constants are the paths of PD HTTP APIs.
const (
// Metadata
HotRead = "/pd/api/v1/hotspot/regions/read"
HotWrite = "/pd/api/v1/hotspot/regions/write"
HotHistory = "/pd/api/v1/hotspot/regions/history"
RegionByIDPrefix = "/pd/api/v1/region/id"
regionByKey = "/pd/api/v1/region/key"
Regions = "/pd/api/v1/regions"
regionsByKey = "/pd/api/v1/regions/key"
RegionsByStoreIDPrefix = "/pd/api/v1/regions/store"
EmptyRegions = "/pd/api/v1/regions/check/empty-region"
AccelerateSchedule = "/pd/api/v1/regions/accelerate-schedule"
store = "/pd/api/v1/store"
Stores = "/pd/api/v1/stores"
StatsRegion = "/pd/api/v1/stats/region"
HotRead = "/pd/api/v1/hotspot/regions/read"
HotWrite = "/pd/api/v1/hotspot/regions/write"
HotHistory = "/pd/api/v1/hotspot/regions/history"
RegionByIDPrefix = "/pd/api/v1/region/id"
regionByKey = "/pd/api/v1/region/key"
Regions = "/pd/api/v1/regions"
regionsByKey = "/pd/api/v1/regions/key"
RegionsByStoreIDPrefix = "/pd/api/v1/regions/store"
EmptyRegions = "/pd/api/v1/regions/check/empty-region"
AccelerateSchedule = "/pd/api/v1/regions/accelerate-schedule"
AccelerateScheduleInBatch = "/pd/api/v1/regions/accelerate-schedule/batch"
store = "/pd/api/v1/store"
Stores = "/pd/api/v1/stores"
StatsRegion = "/pd/api/v1/stats/region"
// Config
Config = "/pd/api/v1/config"
ClusterVersion = "/pd/api/v1/config/cluster-version"
Expand All @@ -44,8 +45,11 @@ const (
// Rule
PlacementRule = "/pd/api/v1/config/rule"
PlacementRules = "/pd/api/v1/config/rules"
PlacementRulesInBatch = "/pd/api/v1/config/rules/batch"
placementRulesByGroup = "/pd/api/v1/config/rules/group"
PlacementRuleBundle = "/pd/api/v1/config/placement-rule"
placementRuleGroup = "/pd/api/v1/config/rule_group"
placementRuleGroups = "/pd/api/v1/config/rule_groups"
RegionLabelRule = "/pd/api/v1/config/region-label/rule"
RegionLabelRules = "/pd/api/v1/config/region-label/rules"
RegionLabelRulesByIDs = "/pd/api/v1/config/region-label/rules/ids"
Expand Down Expand Up @@ -136,6 +140,11 @@ func PlacementRuleBundleWithPartialParameter(partial bool) string {
return fmt.Sprintf("%s?partial=%t", PlacementRuleBundle, partial)
}

// PlacementRuleGroupByID returns the path of PD HTTP API to get placement rule group by ID.
func PlacementRuleGroupByID(id string) string {
return fmt.Sprintf("%s/%s", placementRuleGroup, id)
}

// SchedulerByName returns the scheduler API with the given scheduler name.
func SchedulerByName(name string) string {
return fmt.Sprintf("%s/%s", Schedulers, name)
Expand Down
101 changes: 89 additions & 12 deletions client/http/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"bytes"
"context"
"crypto/tls"
"encoding/hex"
"encoding/json"
"fmt"
"io"
Expand Down Expand Up @@ -46,25 +47,31 @@ type Client interface {
GetRegionByID(context.Context, uint64) (*RegionInfo, error)
GetRegionByKey(context.Context, []byte) (*RegionInfo, error)
GetRegions(context.Context) (*RegionsInfo, error)
GetRegionsByKeyRange(context.Context, []byte, []byte, int) (*RegionsInfo, error)
GetRegionsByKeyRange(context.Context, *KeyRange, int) (*RegionsInfo, error)
GetRegionsByStoreID(context.Context, uint64) (*RegionsInfo, error)
GetHotReadRegions(context.Context) (*StoreHotPeersInfos, error)
GetHotWriteRegions(context.Context) (*StoreHotPeersInfos, error)
GetRegionStatusByKeyRange(context.Context, []byte, []byte) (*RegionStats, error)
GetRegionStatusByKeyRange(context.Context, *KeyRange) (*RegionStats, error)
GetStores(context.Context) (*StoresInfo, error)
/* Rule-related interfaces */
GetAllPlacementRuleBundles(context.Context) ([]*GroupBundle, error)
GetPlacementRuleBundleByGroup(context.Context, string) (*GroupBundle, error)
GetPlacementRulesByGroup(context.Context, string) ([]*Rule, error)
SetPlacementRule(context.Context, *Rule) error
SetPlacementRuleInBatch(context.Context, []*RuleOp) error
SetPlacementRuleBundles(context.Context, []*GroupBundle, bool) error
DeletePlacementRule(context.Context, string, string) error
GetAllPlacementRuleGroups(context.Context) ([]*RuleGroup, error)
GetPlacementRuleGroupByID(context.Context, string) (*RuleGroup, error)
SetPlacementRuleGroup(context.Context, *RuleGroup) error
DeletePlacementRuleGroupByID(context.Context, string) error
GetAllRegionLabelRules(context.Context) ([]*LabelRule, error)
GetRegionLabelRulesByIDs(context.Context, []string) ([]*LabelRule, error)
SetRegionLabelRule(context.Context, *LabelRule) error
PatchRegionLabelRules(context.Context, *LabelRulePatch) error
/* Scheduling-related interfaces */
AccelerateSchedule(context.Context, []byte, []byte) error
AccelerateSchedule(context.Context, *KeyRange) error
AccelerateScheduleInBatch(context.Context, []*KeyRange) error
/* Other interfaces */
GetMinResolvedTSByStoresIDs(context.Context, []uint64) (uint64, map[uint64]uint64, error)

Expand Down Expand Up @@ -308,10 +315,10 @@ func (c *client) GetRegions(ctx context.Context) (*RegionsInfo, error) {
}

// GetRegionsByKeyRange gets the regions info by key range. If the limit is -1, it will return all regions within the range.
func (c *client) GetRegionsByKeyRange(ctx context.Context, startKey, endKey []byte, limit int) (*RegionsInfo, error) {
func (c *client) GetRegionsByKeyRange(ctx context.Context, keyRange *KeyRange, limit int) (*RegionsInfo, error) {
var regions RegionsInfo
err := c.requestWithRetry(ctx,
"GetRegionsByKeyRange", RegionsByKey(startKey, endKey, limit),
"GetRegionsByKeyRange", RegionsByKey(keyRange.StartKey, keyRange.EndKey, limit),
http.MethodGet, http.NoBody, &regions)
if err != nil {
return nil, err
Expand Down Expand Up @@ -356,10 +363,10 @@ func (c *client) GetHotWriteRegions(ctx context.Context) (*StoreHotPeersInfos, e
}

// GetRegionStatusByKeyRange gets the region status by key range.
func (c *client) GetRegionStatusByKeyRange(ctx context.Context, startKey, endKey []byte) (*RegionStats, error) {
func (c *client) GetRegionStatusByKeyRange(ctx context.Context, keyRange *KeyRange) (*RegionStats, error) {
var regionStats RegionStats
err := c.requestWithRetry(ctx,
"GetRegionStatusByKeyRange", RegionStatsByKeyRange(startKey, endKey),
"GetRegionStatusByKeyRange", RegionStatsByKeyRange(keyRange.StartKey, keyRange.StartKey),
http.MethodGet, http.NoBody, &regionStats,
)
if err != nil {
Expand Down Expand Up @@ -427,6 +434,17 @@ func (c *client) SetPlacementRule(ctx context.Context, rule *Rule) error {
http.MethodPost, bytes.NewBuffer(ruleJSON), nil)
}

// SetPlacementRuleInBatch sets the placement rules in batch.
func (c *client) SetPlacementRuleInBatch(ctx context.Context, ruleOps []*RuleOp) error {
ruleOpsJSON, err := json.Marshal(ruleOps)
if err != nil {
return errors.Trace(err)
}
return c.requestWithRetry(ctx,
"SetPlacementRuleInBatch", PlacementRulesInBatch,
http.MethodPost, bytes.NewBuffer(ruleOpsJSON), nil)
}

// SetPlacementRuleBundles sets the placement rule bundles.
// If `partial` is false, all old configurations will be over-written and dropped.
func (c *client) SetPlacementRuleBundles(ctx context.Context, bundles []*GroupBundle, partial bool) error {
Expand All @@ -446,6 +464,48 @@ func (c *client) DeletePlacementRule(ctx context.Context, group, id string) erro
http.MethodDelete, http.NoBody, nil)
}

// GetAllPlacementRuleGroups gets all placement rule groups.
func (c *client) GetAllPlacementRuleGroups(ctx context.Context) ([]*RuleGroup, error) {
var ruleGroups []*RuleGroup
err := c.requestWithRetry(ctx,
"GetAllPlacementRuleGroups", placementRuleGroups,
http.MethodGet, http.NoBody, &ruleGroups)
if err != nil {
return nil, err
}
return ruleGroups, nil
}

// GetPlacementRuleGroupByID gets the placement rule group by ID.
func (c *client) GetPlacementRuleGroupByID(ctx context.Context, id string) (*RuleGroup, error) {
var ruleGroup RuleGroup
err := c.requestWithRetry(ctx,
"GetPlacementRuleGroupByID", PlacementRuleGroupByID(id),
http.MethodGet, http.NoBody, &ruleGroup)
if err != nil {
return nil, err
}
return &ruleGroup, nil
}

// SetPlacementRuleGroup sets the placement rule group.
func (c *client) SetPlacementRuleGroup(ctx context.Context, ruleGroup *RuleGroup) error {
ruleGroupJSON, err := json.Marshal(ruleGroup)
if err != nil {
return errors.Trace(err)
}
return c.requestWithRetry(ctx,
"SetPlacementRuleGroup", placementRuleGroup,
http.MethodPost, bytes.NewBuffer(ruleGroupJSON), nil)
}

// DeletePlacementRuleGroupByID deletes the placement rule group by ID.
func (c *client) DeletePlacementRuleGroupByID(ctx context.Context, id string) error {
return c.requestWithRetry(ctx,
"DeletePlacementRuleGroupByID", PlacementRuleGroupByID(id),
http.MethodDelete, http.NoBody, nil)
}

// GetAllRegionLabelRules gets all region label rules.
func (c *client) GetAllRegionLabelRules(ctx context.Context) ([]*LabelRule, error) {
var labelRules []*LabelRule
Expand Down Expand Up @@ -497,17 +557,34 @@ func (c *client) PatchRegionLabelRules(ctx context.Context, labelRulePatch *Labe
}

// AccelerateSchedule accelerates the scheduling of the regions within the given key range.
func (c *client) AccelerateSchedule(ctx context.Context, startKey, endKey []byte) error {
input := map[string]string{
"start_key": url.QueryEscape(string(startKey)),
"end_key": url.QueryEscape(string(endKey)),
func (c *client) AccelerateSchedule(ctx context.Context, keyRange *KeyRange) error {
inputJSON, err := json.Marshal(map[string]string{
"start_key": url.QueryEscape(hex.EncodeToString(keyRange.StartKey)),
"end_key": url.QueryEscape(hex.EncodeToString(keyRange.EndKey)),
})
if err != nil {
return errors.Trace(err)
}
return c.requestWithRetry(ctx,
"AccelerateSchedule", AccelerateSchedule,
http.MethodPost, bytes.NewBuffer(inputJSON), nil)
}

// AccelerateScheduleInBatch accelerates the scheduling of the regions within the given key ranges in batch.
func (c *client) AccelerateScheduleInBatch(ctx context.Context, keyRanges []*KeyRange) error {
input := make([]map[string]string, 0, len(keyRanges))
for _, keyRange := range keyRanges {
input = append(input, map[string]string{
"start_key": url.QueryEscape(hex.EncodeToString(keyRange.StartKey)),
"end_key": url.QueryEscape(hex.EncodeToString(keyRange.EndKey)),
})
}
inputJSON, err := json.Marshal(input)
if err != nil {
return errors.Trace(err)
}
return c.requestWithRetry(ctx,
"AccelerateSchedule", AccelerateSchedule,
"AccelerateScheduleInBatch", AccelerateScheduleInBatch,
http.MethodPost, bytes.NewBuffer(inputJSON), nil)
}

Expand Down
61 changes: 60 additions & 1 deletion client/http/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,16 @@

package http

import "time"
import (
"encoding/json"
"time"
)

// KeyRange defines a range of keys.
type KeyRange struct {
StartKey []byte `json:"start_key"`
EndKey []byte `json:"end_key"`
}

// NOTICE: the structures below are copied from the PD API definitions.
// Please make sure the consistency if any change happens to the PD API.
Expand Down Expand Up @@ -247,6 +256,56 @@ type Rule struct {
CreateTimestamp uint64 `json:"create_timestamp,omitempty"` // only set at runtime, recorded rule create timestamp
}

// String returns the string representation of this rule.
func (r *Rule) String() string {
b, _ := json.Marshal(r)
return string(b)
}

// Clone returns a copy of Rule.
func (r *Rule) Clone() *Rule {
var clone Rule
json.Unmarshal([]byte(r.String()), &clone)
clone.StartKey = append(r.StartKey[:0:0], r.StartKey...)
clone.EndKey = append(r.EndKey[:0:0], r.EndKey...)
return &clone
}

// RuleOpType indicates the operation type
type RuleOpType string

const (
// RuleOpAdd a placement rule, only need to specify the field *Rule
RuleOpAdd RuleOpType = "add"
// RuleOpDel a placement rule, only need to specify the field `GroupID`, `ID`, `MatchID`
RuleOpDel RuleOpType = "del"
)

// RuleOp is for batching placement rule actions.
// The action type is distinguished by the field `Action`.
type RuleOp struct {
*Rule // information of the placement rule to add/delete the operation type
Action RuleOpType `json:"action"`
DeleteByIDPrefix bool `json:"delete_by_id_prefix"` // if action == delete, delete by the prefix of id
}

func (r RuleOp) String() string {
b, _ := json.Marshal(r)
return string(b)
}

// RuleGroup defines properties of a rule group.
type RuleGroup struct {
ID string `json:"id,omitempty"`
Index int `json:"index,omitempty"`
Override bool `json:"override,omitempty"`
}

func (g *RuleGroup) String() string {
b, _ := json.Marshal(g)
return string(b)
}

// GroupBundle represents a rule group and all rules belong to the group.
type GroupBundle struct {
ID string `json:"group_id"`
Expand Down
5 changes: 5 additions & 0 deletions pkg/schedule/checker/checker_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,11 @@ func (c *Controller) ClearSuspectKeyRanges() {
c.suspectKeyRanges.Clear()
}

// ClearSuspectRegions clears the suspect regions, only for unit test
func (c *Controller) ClearSuspectRegions() {
c.suspectRegions.Clear()
}

// IsPendingRegion returns true if the given region is in the pending list.
func (c *Controller) IsPendingRegion(regionID uint64) bool {
_, exist := c.ruleChecker.pendingList.Get(regionID)
Expand Down
5 changes: 5 additions & 0 deletions pkg/utils/tsoutil/tsoutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ const (
logicalBits = (1 << physicalShiftBits) - 1
)

// TimeToTS converts a `time.Time` to an `uint64` TS.
func TimeToTS(t time.Time) uint64 {
return ComposeTS(t.UnixNano()/int64(time.Millisecond), 0)
}

// ParseTS parses the ts to (physical,logical).
func ParseTS(ts uint64) (time.Time, uint64) {
physical, logical := ParseTSUint64(ts)
Expand Down
6 changes: 4 additions & 2 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -2241,7 +2241,9 @@ func (c *RaftCluster) SetMinResolvedTS(storeID, minResolvedTS uint64) error {
return nil
}

func (c *RaftCluster) checkAndUpdateMinResolvedTS() (uint64, bool) {
// CheckAndUpdateMinResolvedTS checks and updates the min resolved ts of the cluster.
// This is exported for testing purpose.
func (c *RaftCluster) CheckAndUpdateMinResolvedTS() (uint64, bool) {
c.Lock()
defer c.Unlock()

Expand Down Expand Up @@ -2284,7 +2286,7 @@ func (c *RaftCluster) runMinResolvedTSJob() {
case <-ticker.C:
interval = c.opt.GetMinResolvedTSPersistenceInterval()
if interval != 0 {
if current, needPersist := c.checkAndUpdateMinResolvedTS(); needPersist {
if current, needPersist := c.CheckAndUpdateMinResolvedTS(); needPersist {
c.storage.SaveMinResolvedTS(current)
}
} else {
Expand Down
7 changes: 7 additions & 0 deletions server/cluster/scheduling_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,13 @@ func (sc *schedulingController) ClearSuspectKeyRanges() {
sc.coordinator.GetCheckerController().ClearSuspectKeyRanges()
}

// ClearSuspectRegions clears the suspect regions, only for unit test
func (sc *schedulingController) ClearSuspectRegions() {
sc.mu.RLock()
defer sc.mu.RUnlock()
sc.coordinator.GetCheckerController().ClearSuspectRegions()
}

// AddSuspectKeyRange adds the key range with the its ruleID as the key
// The instance of each keyRange is like following format:
// [2][]byte: start key/end key
Expand Down
Loading