Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

client/http: implement more HTTP APIs #7371

Merged
merged 4 commits into from
Nov 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 101 additions & 11 deletions client/http/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,58 @@ package http
import (
"fmt"
"net/url"
"time"
)

// The following constants are the paths of PD HTTP APIs.
const (
HotRead = "/pd/api/v1/hotspot/regions/read"
HotWrite = "/pd/api/v1/hotspot/regions/write"
Regions = "/pd/api/v1/regions"
regionByID = "/pd/api/v1/region/id"
regionByKey = "/pd/api/v1/region/key"
regionsByKey = "/pd/api/v1/regions/key"
regionsByStoreID = "/pd/api/v1/regions/store"
Stores = "/pd/api/v1/stores"
// Metadata
HotRead = "/pd/api/v1/hotspot/regions/read"
HotWrite = "/pd/api/v1/hotspot/regions/write"
HotHistory = "/pd/api/v1/hotspot/regions/history"
RegionByIDPrefix = "/pd/api/v1/region/id"
regionByKey = "/pd/api/v1/region/key"
Regions = "/pd/api/v1/regions"
regionsByKey = "/pd/api/v1/regions/key"
RegionsByStoreIDPrefix = "/pd/api/v1/regions/store"
EmptyRegions = "/pd/api/v1/regions/check/empty-region"
accelerateSchedule = "/pd/api/v1/regions/accelerate-schedule"
store = "/pd/api/v1/store"
Stores = "/pd/api/v1/stores"
StatsRegion = "/pd/api/v1/stats/region"
// Config
Config = "/pd/api/v1/config"
ClusterVersion = "/pd/api/v1/config/cluster-version"
ScheduleConfig = "/pd/api/v1/config/schedule"
ReplicateConfig = "/pd/api/v1/config/replicate"
// Rule
PlacementRule = "/pd/api/v1/config/rule"
PlacementRules = "/pd/api/v1/config/rules"
placementRulesByGroup = "/pd/api/v1/config/rules/group"
RegionLabelRule = "/pd/api/v1/config/region-label/rule"
// Scheduler
Schedulers = "/pd/api/v1/schedulers"
scatterRangeScheduler = "/pd/api/v1/schedulers/scatter-range-"
// Admin
ResetTS = "/pd/api/v1/admin/reset-ts"
BaseAllocID = "/pd/api/v1/admin/base-alloc-id"
SnapshotRecoveringMark = "/pd/api/v1/admin/cluster/markers/snapshot-recovering"
// Debug
PProfProfile = "/pd/api/v1/debug/pprof/profile"
PProfHeap = "/pd/api/v1/debug/pprof/heap"
PProfMutex = "/pd/api/v1/debug/pprof/mutex"
PProfAllocs = "/pd/api/v1/debug/pprof/allocs"
PProfBlock = "/pd/api/v1/debug/pprof/block"
PProfGoroutine = "/pd/api/v1/debug/pprof/goroutine"
// Others
MinResolvedTSPrefix = "/pd/api/v1/min-resolved-ts"
Status = "/pd/api/v1/status"
Version = "/pd/api/v1/version"
)

// RegionByID returns the path of PD HTTP API to get region by ID.
func RegionByID(regionID uint64) string {
return fmt.Sprintf("%s/%d", regionByID, regionID)
return fmt.Sprintf("%s/%d", RegionByIDPrefix, regionID)
}

// RegionByKey returns the path of PD HTTP API to get region by key.
Expand All @@ -45,10 +79,66 @@ func RegionByKey(key []byte) string {
// RegionsByKey returns the path of PD HTTP API to scan regions with given start key, end key and limit parameters.
func RegionsByKey(startKey, endKey []byte, limit int) string {
return fmt.Sprintf("%s?start_key=%s&end_key=%s&limit=%d",
regionsByKey, url.QueryEscape(string(startKey)), url.QueryEscape(string(endKey)), limit)
regionsByKey,
url.QueryEscape(string(startKey)),
url.QueryEscape(string(endKey)),
limit)
}

// RegionsByStoreID returns the path of PD HTTP API to get regions by store ID.
func RegionsByStoreID(storeID uint64) string {
return fmt.Sprintf("%s/%d", regionsByStoreID, storeID)
return fmt.Sprintf("%s/%d", RegionsByStoreIDPrefix, storeID)
}

// RegionStatsByKeyRange returns the path of PD HTTP API to get region stats by start key and end key.
func RegionStatsByKeyRange(startKey, endKey []byte) string {
return fmt.Sprintf("%s?start_key=%s&end_key=%s",
StatsRegion,
url.QueryEscape(string(startKey)),
url.QueryEscape(string(endKey)))
}

// StoreByID returns the store API with store ID parameter.
func StoreByID(id uint64) string {
return fmt.Sprintf("%s/%d", store, id)
}

// StoreLabelByID returns the store label API with store ID parameter.
func StoreLabelByID(id uint64) string {
return fmt.Sprintf("%s/%d/label", store, id)
}

// ConfigWithTTLSeconds returns the config API with the TTL seconds parameter.
func ConfigWithTTLSeconds(ttlSeconds float64) string {
return fmt.Sprintf("%s?ttlSecond=%.0f", Config, ttlSeconds)
}

// PlacementRulesByGroup returns the path of PD HTTP API to get placement rules by group.
func PlacementRulesByGroup(group string) string {
return fmt.Sprintf("%s/%s", placementRulesByGroup, group)
}

// PlacementRuleByGroupAndID returns the path of PD HTTP API to get placement rule by group and ID.
func PlacementRuleByGroupAndID(group, id string) string {
return fmt.Sprintf("%s/%s/%s", PlacementRule, group, id)
}

// SchedulerByName returns the scheduler API with the given scheduler name.
func SchedulerByName(name string) string {
return fmt.Sprintf("%s/%s", Schedulers, name)
}

// ScatterRangeSchedulerWithName returns the scatter range scheduler API with name parameter.
func ScatterRangeSchedulerWithName(name string) string {
return fmt.Sprintf("%s%s", scatterRangeScheduler, name)
}

// PProfProfileAPIWithInterval returns the pprof profile API with interval parameter.
func PProfProfileAPIWithInterval(interval time.Duration) string {
return fmt.Sprintf("%s?seconds=%d", PProfProfile, interval/time.Second)
}

// PProfGoroutineWithDebugLevel returns the pprof goroutine API with debug level parameter.
func PProfGoroutineWithDebugLevel(level int) string {
return fmt.Sprintf("%s?debug=%d", PProfGoroutine, level)
}
126 changes: 106 additions & 20 deletions client/http/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@
package http

import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"strings"
"time"

Expand All @@ -43,12 +45,17 @@ type Client interface {
GetRegionByID(context.Context, uint64) (*RegionInfo, error)
GetRegionByKey(context.Context, []byte) (*RegionInfo, error)
GetRegions(context.Context) (*RegionsInfo, error)
GetRegionsByKey(context.Context, []byte, []byte, int) (*RegionsInfo, error)
GetRegionsByKeyRange(context.Context, []byte, []byte, int) (*RegionsInfo, error)
GetRegionsByStoreID(context.Context, uint64) (*RegionsInfo, error)
GetHotReadRegions(context.Context) (*StoreHotPeersInfos, error)
GetHotWriteRegions(context.Context) (*StoreHotPeersInfos, error)
GetRegionStatusByKeyRange(context.Context, []byte, []byte) (*RegionStats, error)
GetStores(context.Context) (*StoresInfo, error)
GetPlacementRulesByGroup(context.Context, string) ([]*Rule, error)
SetPlacementRule(context.Context, *Rule) error
DeletePlacementRule(context.Context, string, string) error
GetMinResolvedTSByStoresIDs(context.Context, []uint64) (uint64, map[uint64]uint64, error)
AccelerateSchedule(context.Context, []byte, []byte) error
Close()
}

Expand Down Expand Up @@ -154,16 +161,16 @@ func (c *client) execDuration(name string, duration time.Duration) {
// it consistent with the current implementation of some clients (e.g. TiDB).
func (c *client) requestWithRetry(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to customize the retry?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We will provide the function of custom backoffer in the future, but at present, the interface we are about to replace in TiDB uses this simple retry method.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still prefer to leave it to the caller even if the retry method is simple.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make sense to me though I'd like to implement it in an independent PR later.

ctx context.Context,
name, uri string,
res interface{},
name, uri, method string,
body io.Reader, res interface{},
) error {
var (
err error
addr string
)
for idx := 0; idx < len(c.pdAddrs); idx++ {
addr = c.pdAddrs[idx]
err = c.request(ctx, name, addr, uri, res)
err = c.request(ctx, name, fmt.Sprintf("%s%s", addr, uri), method, body, res)
if err == nil {
break
}
Expand All @@ -175,16 +182,15 @@ func (c *client) requestWithRetry(

func (c *client) request(
ctx context.Context,
name, addr, uri string,
res interface{},
name, url, method string,
body io.Reader, res interface{},
) error {
reqURL := fmt.Sprintf("%s%s", addr, uri)
logFields := []zap.Field{
zap.String("name", name),
zap.String("url", reqURL),
zap.String("url", url),
}
log.Debug("[pd] request the http url", logFields...)
req, err := http.NewRequestWithContext(ctx, http.MethodGet, reqURL, nil)
req, err := http.NewRequestWithContext(ctx, method, url, body)
if err != nil {
log.Error("[pd] create http request failed", append(logFields, zap.Error(err))...)
return errors.Trace(err)
Expand Down Expand Up @@ -219,6 +225,10 @@ func (c *client) request(
return errors.Errorf("request pd http api failed with status: '%s'", resp.Status)
}

if res == nil {
return nil
}

err = json.NewDecoder(resp.Body).Decode(res)
if err != nil {
return errors.Trace(err)
Expand All @@ -229,7 +239,9 @@ func (c *client) request(
// GetRegionByID gets the region info by ID.
func (c *client) GetRegionByID(ctx context.Context, regionID uint64) (*RegionInfo, error) {
var region RegionInfo
err := c.requestWithRetry(ctx, "GetRegionByID", RegionByID(regionID), &region)
err := c.requestWithRetry(ctx,
"GetRegionByID", RegionByID(regionID),
http.MethodGet, nil, &region)
if err != nil {
return nil, err
}
Expand All @@ -239,7 +251,9 @@ func (c *client) GetRegionByID(ctx context.Context, regionID uint64) (*RegionInf
// GetRegionByKey gets the region info by key.
func (c *client) GetRegionByKey(ctx context.Context, key []byte) (*RegionInfo, error) {
var region RegionInfo
err := c.requestWithRetry(ctx, "GetRegionByKey", RegionByKey(key), &region)
err := c.requestWithRetry(ctx,
"GetRegionByKey", RegionByKey(key),
http.MethodGet, nil, &region)
if err != nil {
return nil, err
}
Expand All @@ -249,17 +263,21 @@ func (c *client) GetRegionByKey(ctx context.Context, key []byte) (*RegionInfo, e
// GetRegions gets the regions info.
func (c *client) GetRegions(ctx context.Context) (*RegionsInfo, error) {
var regions RegionsInfo
err := c.requestWithRetry(ctx, "GetRegions", Regions, &regions)
err := c.requestWithRetry(ctx,
"GetRegions", Regions,
http.MethodGet, nil, &regions)
if err != nil {
return nil, err
}
return &regions, nil
}

// GetRegionsByKey gets the regions info by key range. If the limit is -1, it will return all regions within the range.
func (c *client) GetRegionsByKey(ctx context.Context, startKey, endKey []byte, limit int) (*RegionsInfo, error) {
// GetRegionsByKeyRange gets the regions info by key range. If the limit is -1, it will return all regions within the range.
func (c *client) GetRegionsByKeyRange(ctx context.Context, startKey, endKey []byte, limit int) (*RegionsInfo, error) {
var regions RegionsInfo
err := c.requestWithRetry(ctx, "GetRegionsByKey", RegionsByKey(startKey, endKey, limit), &regions)
err := c.requestWithRetry(ctx,
"GetRegionsByKeyRange", RegionsByKey(startKey, endKey, limit),
http.MethodGet, nil, &regions)
if err != nil {
return nil, err
}
Expand All @@ -269,7 +287,9 @@ func (c *client) GetRegionsByKey(ctx context.Context, startKey, endKey []byte, l
// GetRegionsByStoreID gets the regions info by store ID.
func (c *client) GetRegionsByStoreID(ctx context.Context, storeID uint64) (*RegionsInfo, error) {
var regions RegionsInfo
err := c.requestWithRetry(ctx, "GetRegionsByStoreID", RegionsByStoreID(storeID), &regions)
err := c.requestWithRetry(ctx,
"GetRegionsByStoreID", RegionsByStoreID(storeID),
http.MethodGet, nil, &regions)
if err != nil {
return nil, err
}
Expand All @@ -279,7 +299,9 @@ func (c *client) GetRegionsByStoreID(ctx context.Context, storeID uint64) (*Regi
// GetHotReadRegions gets the hot read region statistics info.
func (c *client) GetHotReadRegions(ctx context.Context) (*StoreHotPeersInfos, error) {
var hotReadRegions StoreHotPeersInfos
err := c.requestWithRetry(ctx, "GetHotReadRegions", HotRead, &hotReadRegions)
err := c.requestWithRetry(ctx,
"GetHotReadRegions", HotRead,
http.MethodGet, nil, &hotReadRegions)
if err != nil {
return nil, err
}
Expand All @@ -289,23 +311,70 @@ func (c *client) GetHotReadRegions(ctx context.Context) (*StoreHotPeersInfos, er
// GetHotWriteRegions gets the hot write region statistics info.
func (c *client) GetHotWriteRegions(ctx context.Context) (*StoreHotPeersInfos, error) {
var hotWriteRegions StoreHotPeersInfos
err := c.requestWithRetry(ctx, "GetHotWriteRegions", HotWrite, &hotWriteRegions)
err := c.requestWithRetry(ctx,
"GetHotWriteRegions", HotWrite,
http.MethodGet, nil, &hotWriteRegions)
if err != nil {
return nil, err
}
return &hotWriteRegions, nil
}

// GetRegionStatusByKeyRange gets the region status by key range.
func (c *client) GetRegionStatusByKeyRange(ctx context.Context, startKey, endKey []byte) (*RegionStats, error) {
var regionStats RegionStats
err := c.requestWithRetry(ctx,
"GetRegionStatusByKeyRange", RegionStatsByKeyRange(startKey, endKey),
http.MethodGet, nil, &regionStats,
)
if err != nil {
return nil, err
}
return &regionStats, nil
}

// GetStores gets the stores info.
func (c *client) GetStores(ctx context.Context) (*StoresInfo, error) {
var stores StoresInfo
err := c.requestWithRetry(ctx, "GetStores", Stores, &stores)
err := c.requestWithRetry(ctx,
"GetStores", Stores,
http.MethodGet, nil, &stores)
if err != nil {
return nil, err
}
return &stores, nil
}

// GetPlacementRulesByGroup gets the placement rules by group.
func (c *client) GetPlacementRulesByGroup(ctx context.Context, group string) ([]*Rule, error) {
var rules []*Rule
err := c.requestWithRetry(ctx,
"GetPlacementRulesByGroup", PlacementRulesByGroup(group),
http.MethodGet, nil, &rules)
if err != nil {
return nil, err
}
return rules, nil
}

// SetPlacementRule sets the placement rule.
func (c *client) SetPlacementRule(ctx context.Context, rule *Rule) error {
ruleJSON, err := json.Marshal(rule)
if err != nil {
return errors.Trace(err)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure why needs errors.Trace?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One is to be consistent with the original HTTP call code of TiDB, and the other is that because it is a general library, adding trace information can help us better locate where the call is wrong I think.

}
return c.requestWithRetry(ctx,
"SetPlacementRule", PlacementRule,
http.MethodPost, bytes.NewBuffer(ruleJSON), nil)
}

// DeletePlacementRule deletes the placement rule.
func (c *client) DeletePlacementRule(ctx context.Context, group, id string) error {
return c.requestWithRetry(ctx,
"DeletePlacementRule", PlacementRuleByGroupAndID(group, id),
http.MethodDelete, nil, nil)
}

// GetMinResolvedTSByStoresIDs get min-resolved-ts by stores IDs.
func (c *client) GetMinResolvedTSByStoresIDs(ctx context.Context, storeIDs []uint64) (uint64, map[uint64]uint64, error) {
uri := MinResolvedTSPrefix
Expand All @@ -326,7 +395,9 @@ func (c *client) GetMinResolvedTSByStoresIDs(ctx context.Context, storeIDs []uin
IsRealTime bool `json:"is_real_time,omitempty"`
StoresMinResolvedTS map[uint64]uint64 `json:"stores_min_resolved_ts"`
}{}
err := c.requestWithRetry(ctx, "GetMinResolvedTSByStoresIDs", uri, &resp)
err := c.requestWithRetry(ctx,
"GetMinResolvedTSByStoresIDs", uri,
http.MethodGet, nil, &resp)
if err != nil {
return 0, nil, err
}
Expand All @@ -335,3 +406,18 @@ func (c *client) GetMinResolvedTSByStoresIDs(ctx context.Context, storeIDs []uin
}
return resp.MinResolvedTS, resp.StoresMinResolvedTS, nil
}

// AccelerateSchedule accelerates the scheduling of the regions within the given key range.
func (c *client) AccelerateSchedule(ctx context.Context, startKey, endKey []byte) error {
JmPotato marked this conversation as resolved.
Show resolved Hide resolved
input := map[string]string{
"start_key": url.QueryEscape(string(startKey)),
"end_key": url.QueryEscape(string(endKey)),
}
inputJSON, err := json.Marshal(input)
if err != nil {
return errors.Trace(err)
}
return c.requestWithRetry(ctx,
"AccelerateSchedule", accelerateSchedule,
http.MethodPost, bytes.NewBuffer(inputJSON), nil)
}
Loading