Skip to content

Commit

Permalink
resolve the conflicts
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <rleungx@gmail.com>
  • Loading branch information
rleungx committed Apr 22, 2024
1 parent a0beefa commit 5eea1b7
Show file tree
Hide file tree
Showing 13 changed files with 53 additions and 210 deletions.
4 changes: 2 additions & 2 deletions pkg/mcs/scheduling/server/apis/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ func deleteAllRegionCache(c *gin.Context) {
c.String(http.StatusInternalServerError, errs.ErrNotBootstrapped.GenWithStackByArgs().Error())
return
}
cluster.DropCacheAllRegion()
cluster.ResetRegionCache()
c.String(http.StatusOK, "All regions are removed from server cache.")
}

Expand All @@ -297,7 +297,7 @@ func deleteRegionCacheByID(c *gin.Context) {
c.String(http.StatusBadRequest, err.Error())
return
}
cluster.DropCacheRegion(regionID)
cluster.RemoveRegionIfExist(regionID)
c.String(http.StatusOK, "The region is removed from server cache.")
}

Expand Down
10 changes: 0 additions & 10 deletions pkg/mcs/scheduling/server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -672,13 +672,3 @@ func (c *Cluster) IsPrepared() bool {
func (c *Cluster) SetPrepared() {
c.coordinator.GetPrepareChecker().SetPrepared()
}

// DropCacheAllRegion removes all cached regions.
func (c *Cluster) DropCacheAllRegion() {
c.ResetRegionCache()
}

// DropCacheRegion removes a region from the cache.
func (c *Cluster) DropCacheRegion(id uint64) {
c.RemoveRegionIfExist(id)
}
2 changes: 1 addition & 1 deletion pkg/schedule/checker/rule_checker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1980,7 +1980,7 @@ func makeStores() placement.StoreSet {
if zone == 1 && host == 1 {
labels["type"] = "read"
}
stores.SetStore(core.NewStoreInfoWithLabel(id, labels).Clone(core.SetLastHeartbeatTS(now), core.SetStoreState(metapb.StoreState_Up)))
stores.PutStore(core.NewStoreInfoWithLabel(id, labels).Clone(core.SetLastHeartbeatTS(now), core.SetStoreState(metapb.StoreState_Up)))
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/schedule/placement/fit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func makeStores() StoreSet {
if id == 1111 || id == 2111 || id == 3111 {
labels["disk"] = "ssd"
}
stores.SetStore(core.NewStoreInfoWithLabel(id, labels).Clone(core.SetLastHeartbeatTS(now)))
stores.PutStore(core.NewStoreInfoWithLabel(id, labels).Clone(core.SetLastHeartbeatTS(now)))
}
}
}
Expand Down
Empty file modified pkg/storage/leveldb_backend.go
100644 → 100755
Empty file.
6 changes: 3 additions & 3 deletions server/api/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (h *adminHandler) DeleteRegionCache(w http.ResponseWriter, r *http.Request)
h.rd.JSON(w, http.StatusBadRequest, err.Error())
return
}
rc.DropCacheRegion(regionID)
rc.RemoveRegionIfExist(regionID)
if h.svr.IsServiceIndependent(utils.SchedulingServiceName) {
err = h.DeleteRegionCacheInSchedulingServer(regionID)
}
Expand Down Expand Up @@ -100,7 +100,7 @@ func (h *adminHandler) DeleteRegionStorage(w http.ResponseWriter, r *http.Reques
return
}
// Remove region from cache.
rc.DropCacheRegion(regionID)
rc.RemoveRegionIfExist(regionID)

Check warning on line 103 in server/api/admin.go

View check run for this annotation

Codecov / codecov/patch

server/api/admin.go#L103

Added line #L103 was not covered by tests
if h.svr.IsServiceIndependent(utils.SchedulingServiceName) {
err = h.DeleteRegionCacheInSchedulingServer(regionID)
}
Expand All @@ -116,7 +116,7 @@ func (h *adminHandler) DeleteRegionStorage(w http.ResponseWriter, r *http.Reques
func (h *adminHandler) DeleteAllRegionCache(w http.ResponseWriter, r *http.Request) {
var err error
rc := getCluster(r)
rc.DropCacheAllRegion()
rc.ResetRegionCache()
if h.svr.IsServiceIndependent(utils.SchedulingServiceName) {
err = h.DeleteRegionCacheInSchedulingServer()
}
Expand Down
167 changes: 10 additions & 157 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,6 @@ type RaftCluster struct {
// This below fields are all read-only, we cannot update itself after the raft cluster starts.
clusterID uint64
id id.Allocator
core *core.BasicCluster // cached cluster info
opt *config.PersistOptions
limiter *StoreLimiter
*schedulingController
Expand Down Expand Up @@ -197,7 +196,7 @@ func NewRaftCluster(ctx context.Context, clusterID uint64, basicCluster *core.Ba
regionSyncer: regionSyncer,
httpClient: httpClient,
etcdClient: etcdClient,
core: basicCluster,
BasicCluster: basicCluster,
storage: storage,
taskRunner: ratelimit.NewConcurrentRunner(hbConcurrentRunner, time.Minute),
hbConcurrencyLimiter: ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU() * 2)),
Expand Down Expand Up @@ -290,7 +289,7 @@ func (c *RaftCluster) InitCluster(
return err
}
}
c.schedulingController = newSchedulingController(c.ctx, c.core, c.opt, c.ruleManager)
c.schedulingController = newSchedulingController(c.ctx, c.BasicCluster, c.opt, c.ruleManager)
return nil
}

Expand Down Expand Up @@ -650,7 +649,7 @@ func (c *RaftCluster) LoadClusterInfo() (*RaftCluster, error) {
start = time.Now()

// used to load region from kv storage to cache storage.
if err = storage.TryLoadRegionsOnce(c.ctx, c.storage, c.core.CheckAndPutRegion); err != nil {
if err = storage.TryLoadRegionsOnce(c.ctx, c.storage, c.CheckAndPutRegion); err != nil {
return nil, err
}
log.Info("load regions",
Expand Down Expand Up @@ -722,7 +721,7 @@ func (c *RaftCluster) runUpdateStoreStats() {
case <-ticker.C:
// Update related stores.
start := time.Now()
c.core.UpdateAllStoreStatus()
c.UpdateAllStoreStatus()
updateStoreStatsGauge.Set(time.Since(start).Seconds())
}
}
Expand Down Expand Up @@ -901,7 +900,7 @@ func (c *RaftCluster) HandleStoreHeartbeat(heartbeat *pdpb.StoreHeartbeatRequest
if store := c.GetStore(storeID); store != nil {
statistics.UpdateStoreHeartbeatMetrics(store)
}
c.core.PutStore(newStore)
c.PutStore(newStore)
var (
regions map[uint64]*core.RegionInfo
interval uint64
Expand Down Expand Up @@ -1004,7 +1003,7 @@ var syncRunner = ratelimit.NewSyncRunner()
// processRegionHeartbeat updates the region information.
func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *core.RegionInfo) error {
tracer := ctx.Tracer
origin, _, err := c.core.PreCheckPutRegion(region)
origin, _, err := c.PreCheckPutRegion(region)
tracer.OnPreCheckFinished()
if err != nil {
return err
Expand Down Expand Up @@ -1065,7 +1064,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio
// check its validation again here.
//
// However, it can't solve the race condition of concurrent heartbeats from the same region.
if overlaps, err = c.core.AtomicCheckAndPutRegion(ctx, region); err != nil {
if overlaps, err = c.AtomicCheckAndPutRegion(ctx, region); err != nil {
tracer.OnSaveCacheFinished()
return err
}
Expand Down Expand Up @@ -1155,153 +1154,7 @@ func (c *RaftCluster) putMetaLocked(meta *metapb.Cluster) error {

// GetBasicCluster returns the basic cluster.
func (c *RaftCluster) GetBasicCluster() *core.BasicCluster {
return c.core
}

// GetRegionByKey gets regionInfo by region key from cluster.
func (c *RaftCluster) GetRegionByKey(regionKey []byte) *core.RegionInfo {
return c.core.GetRegionByKey(regionKey)
}

// GetPrevRegionByKey gets previous region and leader peer by the region key from cluster.
func (c *RaftCluster) GetPrevRegionByKey(regionKey []byte) *core.RegionInfo {
return c.core.GetPrevRegionByKey(regionKey)
}

// ScanRegions scans region with start key, until the region contains endKey, or
// total number greater than limit.
func (c *RaftCluster) ScanRegions(startKey, endKey []byte, limit int) []*core.RegionInfo {
return c.core.ScanRegions(startKey, endKey, limit)
}

// GetRegion searches for a region by ID.
func (c *RaftCluster) GetRegion(regionID uint64) *core.RegionInfo {
return c.core.GetRegion(regionID)
}

// GetMetaRegions gets regions from cluster.
func (c *RaftCluster) GetMetaRegions() []*metapb.Region {
return c.core.GetMetaRegions()
}

// GetRegions returns all regions' information in detail.
func (c *RaftCluster) GetRegions() []*core.RegionInfo {
return c.core.GetRegions()
}

// ValidRegion is used to decide if the region is valid.
func (c *RaftCluster) ValidRegion(region *metapb.Region) error {
return c.core.ValidRegion(region)
}

// GetTotalRegionCount returns total count of regions
func (c *RaftCluster) GetTotalRegionCount() int {
return c.core.GetTotalRegionCount()
}

// GetStoreRegions returns all regions' information with a given storeID.
func (c *RaftCluster) GetStoreRegions(storeID uint64) []*core.RegionInfo {
return c.core.GetStoreRegions(storeID)
}

// RandLeaderRegions returns some random regions that has leader on the store.
func (c *RaftCluster) RandLeaderRegions(storeID uint64, ranges []core.KeyRange) []*core.RegionInfo {
return c.core.RandLeaderRegions(storeID, ranges)
}

// RandFollowerRegions returns some random regions that has a follower on the store.
func (c *RaftCluster) RandFollowerRegions(storeID uint64, ranges []core.KeyRange) []*core.RegionInfo {
return c.core.RandFollowerRegions(storeID, ranges)
}

// RandPendingRegions returns some random regions that has a pending peer on the store.
func (c *RaftCluster) RandPendingRegions(storeID uint64, ranges []core.KeyRange) []*core.RegionInfo {
return c.core.RandPendingRegions(storeID, ranges)
}

// RandLearnerRegions returns some random regions that has a learner peer on the store.
func (c *RaftCluster) RandLearnerRegions(storeID uint64, ranges []core.KeyRange) []*core.RegionInfo {
return c.core.RandLearnerRegions(storeID, ranges)
}

// RandWitnessRegions returns some random regions that has a witness peer on the store.
func (c *RaftCluster) RandWitnessRegions(storeID uint64, ranges []core.KeyRange) []*core.RegionInfo {
return c.core.RandWitnessRegions(storeID, ranges)
}

// GetLeaderStore returns all stores that contains the region's leader peer.
func (c *RaftCluster) GetLeaderStore(region *core.RegionInfo) *core.StoreInfo {
return c.core.GetLeaderStore(region)
}

// GetNonWitnessVoterStores returns all stores that contains the region's non-witness voter peer.
func (c *RaftCluster) GetNonWitnessVoterStores(region *core.RegionInfo) []*core.StoreInfo {
return c.core.GetNonWitnessVoterStores(region)
}

// GetFollowerStores returns all stores that contains the region's follower peer.
func (c *RaftCluster) GetFollowerStores(region *core.RegionInfo) []*core.StoreInfo {
return c.core.GetFollowerStores(region)
}

// GetRegionStores returns all stores that contains the region's peer.
func (c *RaftCluster) GetRegionStores(region *core.RegionInfo) []*core.StoreInfo {
return c.core.GetRegionStores(region)
}

// GetStoreCount returns the count of stores.
func (c *RaftCluster) GetStoreCount() int {
return c.core.GetStoreCount()
}

// GetStoreRegionCount returns the number of regions for a given store.
func (c *RaftCluster) GetStoreRegionCount(storeID uint64) int {
return c.core.GetStoreRegionCount(storeID)
}

// GetAverageRegionSize returns the average region approximate size.
func (c *RaftCluster) GetAverageRegionSize() int64 {
return c.core.GetAverageRegionSize()
}

// DropCacheRegion removes a region from the cache.
func (c *RaftCluster) DropCacheRegion(id uint64) {
c.core.RemoveRegionIfExist(id)
}

// DropCacheAllRegion removes all regions from the cache.
func (c *RaftCluster) DropCacheAllRegion() {
c.core.ResetRegionCache()
}

// GetMetaStores gets stores from cluster.
func (c *RaftCluster) GetMetaStores() []*metapb.Store {
return c.core.GetMetaStores()
}

// GetStores returns all stores in the cluster.
func (c *RaftCluster) GetStores() []*core.StoreInfo {
return c.core.GetStores()
}

// GetLeaderStoreByRegionID returns the leader store of the given region.
func (c *RaftCluster) GetLeaderStoreByRegionID(regionID uint64) *core.StoreInfo {
return c.core.GetLeaderStoreByRegionID(regionID)
}

// GetStore gets store from cluster.
func (c *RaftCluster) GetStore(storeID uint64) *core.StoreInfo {
return c.core.GetStore(storeID)
}

// GetAdjacentRegions returns regions' information that are adjacent with the specific region ID.
func (c *RaftCluster) GetAdjacentRegions(region *core.RegionInfo) (*core.RegionInfo, *core.RegionInfo) {
return c.core.GetAdjacentRegions(region)
}

// GetRangeHoles returns all range holes, i.e the key ranges without any region info.
func (c *RaftCluster) GetRangeHoles() [][]string {
return c.core.GetRangeHoles()
return c.BasicCluster
}

// UpdateStoreLabels updates a store's location labels
Expand Down Expand Up @@ -1695,7 +1548,7 @@ func (c *RaftCluster) setStore(store *core.StoreInfo) error {
return err
}
}
c.core.PutStore(store)
c.PutStore(store)
if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) {
c.updateStoreStatistics(store.GetID(), store.IsSlow())
}
Expand Down Expand Up @@ -2138,7 +1991,7 @@ func (c *RaftCluster) PutMetaCluster(meta *metapb.Cluster) error {

// GetRegionStatsByRange returns region statistics from cluster.
func (c *RaftCluster) GetRegionStatsByRange(startKey, endKey []byte) *statistics.RegionStats {
return statistics.GetRegionStats(c.core.ScanRegions(startKey, endKey, -1))
return statistics.GetRegionStats(c.ScanRegions(startKey, endKey, -1))
}

// GetRegionStatsCount returns the number of regions in the range.
Expand Down
Loading

0 comments on commit 5eea1b7

Please sign in to comment.