*: optimize memory usage #8164

Merged
merged 8 commits on Jun 13, 2024
Changes from 6 commits
22 changes: 19 additions & 3 deletions pkg/cluster/cluster.go
@@ -33,9 +33,25 @@ type Cluster interface {

// HandleStatsAsync handles the flow asynchronously.
func HandleStatsAsync(c Cluster, region *core.RegionInfo) {
c.GetHotStat().CheckWriteAsync(statistics.NewCheckExpiredItemTask(region))
c.GetHotStat().CheckReadAsync(statistics.NewCheckExpiredItemTask(region))
c.GetHotStat().CheckWriteAsync(statistics.NewCheckWritePeerTask(region))
checkWritePeerTask := func(cache *statistics.HotPeerCache) {
reportInterval := region.GetInterval()
interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp()
stats := cache.CheckPeerFlow(region, region.GetPeers(), region.GetWriteLoads(), interval)
for _, stat := range stats {
cache.UpdateStat(stat)
}
}

checkExpiredTask := func(cache *statistics.HotPeerCache) {
expiredStats := cache.CollectExpiredItems(region)
for _, stat := range expiredStats {
cache.UpdateStat(stat)
}
}

c.GetHotStat().CheckWriteAsync(checkExpiredTask)
c.GetHotStat().CheckReadAsync(checkExpiredTask)
c.GetHotStat().CheckWriteAsync(checkWritePeerTask)
c.GetCoordinator().GetSchedulersController().CheckTransferWitnessLeader(region)
}

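HandleStatsAsync now hands CheckWriteAsync/CheckReadAsync plain closures over *statistics.HotPeerCache instead of allocating the dedicated task values (NewCheckExpiredItemTask, NewCheckWritePeerTask) on every heartbeat. Below is a minimal sketch of the underlying pattern with illustrative names rather than the PD types, assuming the Check*Async calls are backed by a task queue as their names suggest: a single goroutine owns the cache and drains a channel of func-typed tasks, so each closure can mutate the cache without extra locking.

package main

import "fmt"

// hotCache stands in for statistics.HotPeerCache; it is owned by exactly
// one goroutine, so the closures below can mutate it without locking.
type hotCache struct {
	stats map[uint64]int
}

// task is the single closure type that replaces the per-kind task structs.
type task func(*hotCache)

// checkAsync mimics CheckWriteAsync/CheckReadAsync: it only enqueues work.
func checkAsync(queue chan<- task, t task) bool {
	select {
	case queue <- t:
		return true
	default:
		return false // queue full, drop the task
	}
}

func main() {
	queue := make(chan task, 16)
	done := make(chan struct{})

	// Single consumer goroutine owning the cache.
	go func() {
		cache := &hotCache{stats: map[uint64]int{}}
		for t := range queue {
			t(cache)
		}
		fmt.Println("final stats:", cache.stats)
		close(done)
	}()

	regionID, writtenBytes := uint64(100), 42

	// Each closure captures exactly the values it needs; no task struct is built.
	checkWritePeer := func(c *hotCache) {
		c.stats[regionID] += writtenBytes
	}
	checkExpired := func(c *hotCache) {
		delete(c.stats, 999) // pretend region 999 expired
	}

	checkAsync(queue, checkExpired)
	checkAsync(queue, checkWritePeer)

	close(queue)
	<-done
}
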
91 changes: 37 additions & 54 deletions pkg/core/region.go
Expand Up @@ -56,14 +56,14 @@
// the properties are Read-Only once created except buckets.
// the `buckets` could be modified by the request `report buckets` with greater version.
type RegionInfo struct {
term uint64
meta *metapb.Region
learners []*metapb.Peer
witnesses []*metapb.Peer
voters []*metapb.Peer
leader *metapb.Peer
downPeers []*pdpb.PeerStats
pendingPeers []*metapb.Peer
term uint64
cpuUsage uint64
writtenBytes uint64
writtenKeys uint64
@@ -137,26 +137,22 @@

// classifyVoterAndLearner sorts out voter and learner from peers into different slice.
func classifyVoterAndLearner(region *RegionInfo) {
learners := make([]*metapb.Peer, 0, 1)
voters := make([]*metapb.Peer, 0, len(region.meta.Peers))
witnesses := make([]*metapb.Peer, 0, 1)
region.learners = make([]*metapb.Peer, 0, 1)
region.voters = make([]*metapb.Peer, 0, len(region.meta.Peers))
region.witnesses = make([]*metapb.Peer, 0, 1)
for _, p := range region.meta.Peers {
if IsLearner(p) {
learners = append(learners, p)
region.learners = append(region.learners, p)
} else {
voters = append(voters, p)
region.voters = append(region.voters, p)
}
// Whichever peer role can be a witness
if IsWitness(p) {
witnesses = append(witnesses, p)
region.witnesses = append(region.witnesses, p)
}
}
sort.Sort(peerSlice(learners))
sort.Sort(peerSlice(voters))
sort.Sort(peerSlice(witnesses))
region.learners = learners
region.voters = voters
region.witnesses = witnesses
sort.Sort(peerSlice(region.learners))
sort.Sort(peerSlice(region.voters))
sort.Sort(peerSlice(region.witnesses))
}

// peersEqualTo returns true when the peers are not changed, which may caused by: the region leader not changed,
@@ -214,7 +210,7 @@
}

// RegionFromHeartbeat constructs a Region from region heartbeat.
func RegionFromHeartbeat(heartbeat RegionHeartbeatRequest, opts ...RegionCreateOption) *RegionInfo {
func RegionFromHeartbeat(heartbeat RegionHeartbeatRequest, flowRoundDivisor int) *RegionInfo {
// Convert unit to MB.
// If region isn't empty and less than 1MB, use 1MB instead.
// The size of empty region will be correct by the previous RegionInfo.
@@ -224,20 +220,21 @@
}

region := &RegionInfo{
term: heartbeat.GetTerm(),
meta: heartbeat.GetRegion(),
leader: heartbeat.GetLeader(),
downPeers: heartbeat.GetDownPeers(),
pendingPeers: heartbeat.GetPendingPeers(),
writtenBytes: heartbeat.GetBytesWritten(),
writtenKeys: heartbeat.GetKeysWritten(),
readBytes: heartbeat.GetBytesRead(),
readKeys: heartbeat.GetKeysRead(),
approximateSize: int64(regionSize),
approximateKeys: int64(heartbeat.GetApproximateKeys()),
interval: heartbeat.GetInterval(),
queryStats: heartbeat.GetQueryStats(),
source: Heartbeat,
term: heartbeat.GetTerm(),
meta: heartbeat.GetRegion(),
leader: heartbeat.GetLeader(),
downPeers: heartbeat.GetDownPeers(),
pendingPeers: heartbeat.GetPendingPeers(),
writtenBytes: heartbeat.GetBytesWritten(),
writtenKeys: heartbeat.GetKeysWritten(),
readBytes: heartbeat.GetBytesRead(),
readKeys: heartbeat.GetKeysRead(),
approximateSize: int64(regionSize),
approximateKeys: int64(heartbeat.GetApproximateKeys()),
interval: heartbeat.GetInterval(),
queryStats: heartbeat.GetQueryStats(),
source: Heartbeat,
flowRoundDivisor: uint64(flowRoundDivisor),
}

// scheduling service doesn't need the following fields.
@@ -247,10 +244,6 @@
region.cpuUsage = h.GetCpuUsage()
}

for _, opt := range opts {
opt(region)
}

if region.writtenKeys >= ImpossibleFlowSize || region.writtenBytes >= ImpossibleFlowSize {
region.writtenKeys = 0
region.writtenBytes = 0
@@ -957,11 +950,11 @@
func (r *RegionsInfo) CheckAndPutRegion(region *RegionInfo) []*RegionInfo {
r.t.Lock()
origin := r.getRegionLocked(region.GetID())
var ols []*regionItem
var ols []*RegionInfo
if origin == nil || !bytes.Equal(origin.GetStartKey(), region.GetStartKey()) || !bytes.Equal(origin.GetEndKey(), region.GetEndKey()) {
ols = r.tree.overlaps(&regionItem{RegionInfo: region})
}
err := check(region, origin, convertItemsToRegions(ols))
err := check(region, origin, ols)
if err != nil {
log.Debug("region is stale", zap.Stringer("origin", origin.GetMeta()), errs.ZapError(err))
// return the state region to delete.
@@ -988,25 +981,17 @@
return origin, overlaps, err
}

func convertItemsToRegions(items []*regionItem) []*RegionInfo {
regions := make([]*RegionInfo, 0, len(items))
for _, item := range items {
regions = append(regions, item.RegionInfo)
}
return regions
}

// AtomicCheckAndPutRegion checks if the region is valid to put, if valid then put.
func (r *RegionsInfo) AtomicCheckAndPutRegion(ctx *MetaProcessContext, region *RegionInfo) ([]*RegionInfo, error) {
tracer := ctx.Tracer
r.t.Lock()
var ols []*regionItem
var ols []*RegionInfo
origin := r.getRegionLocked(region.GetID())
if origin == nil || !bytes.Equal(origin.GetStartKey(), region.GetStartKey()) || !bytes.Equal(origin.GetEndKey(), region.GetEndKey()) {
ols = r.tree.overlaps(&regionItem{RegionInfo: region})
}
tracer.OnCheckOverlapsFinished()
err := check(region, origin, convertItemsToRegions(ols))
err := check(region, origin, ols)
if err != nil {
r.t.Unlock()
tracer.OnValidateRegionFinished()
@@ -1026,13 +1011,13 @@
func (r *RegionsInfo) CheckAndPutRootTree(ctx *MetaProcessContext, region *RegionInfo) ([]*RegionInfo, error) {
tracer := ctx.Tracer
r.t.Lock()
var ols []*regionItem
var ols []*RegionInfo
origin := r.getRegionLocked(region.GetID())
if origin == nil || !bytes.Equal(origin.GetStartKey(), region.GetStartKey()) || !bytes.Equal(origin.GetEndKey(), region.GetEndKey()) {
ols = r.tree.overlaps(&regionItem{RegionInfo: region})
}
tracer.OnCheckOverlapsFinished()
err := check(region, origin, convertItemsToRegions(ols))
err := check(region, origin, ols)
if err != nil {
r.t.Unlock()
tracer.OnValidateRegionFinished()
@@ -1123,7 +1108,7 @@
if len(overlaps) == 0 {
// If the range has changed but the overlapped regions are not provided, collect them by `[]*regionItem`.
for _, item := range r.getOverlapRegionFromOverlapTreeLocked(region) {
r.removeRegionFromSubTreeLocked(item.RegionInfo)
r.removeRegionFromSubTreeLocked(item)
}
} else {
// Remove all provided overlapped regions from the subtrees.
@@ -1164,7 +1149,7 @@
setPeers(r.pendingPeers, region.GetPendingPeers())
}

func (r *RegionsInfo) getOverlapRegionFromOverlapTreeLocked(region *RegionInfo) []*regionItem {
func (r *RegionsInfo) getOverlapRegionFromOverlapTreeLocked(region *RegionInfo) []*RegionInfo {
return r.overlapTree.overlaps(&regionItem{RegionInfo: region})
}

@@ -1174,9 +1159,7 @@
defer r.t.RUnlock()
origin = r.getRegionLocked(region.GetID())
if origin == nil || !bytes.Equal(origin.GetStartKey(), region.GetStartKey()) || !bytes.Equal(origin.GetEndKey(), region.GetEndKey()) {
for _, item := range r.tree.overlaps(&regionItem{RegionInfo: region}) {
overlaps = append(overlaps, item.RegionInfo)
}
return origin, r.tree.overlaps(&regionItem{RegionInfo: region})
}
return
}
@@ -1211,7 +1194,7 @@
return r.setRegionLocked(region, false)
}

func (r *RegionsInfo) setRegionLocked(region *RegionInfo, withOverlaps bool, ol ...*regionItem) (*RegionInfo, []*RegionInfo, bool) {
func (r *RegionsInfo) setRegionLocked(region *RegionInfo, withOverlaps bool, ol ...*RegionInfo) (*RegionInfo, []*RegionInfo, bool) {
var (
item *regionItem // Pointer to the *RegionInfo of this ID.
origin *RegionInfo
@@ -1311,7 +1294,7 @@
}

// GetOverlaps returns the regions which are overlapped with the specified region range.
func (r *RegionsInfo) GetOverlaps(region *RegionInfo) []*regionItem {
func (r *RegionsInfo) GetOverlaps(region *RegionInfo) []*RegionInfo {

r.t.RLock()
defer r.t.RUnlock()
return r.tree.overlaps(&regionItem{RegionInfo: region})
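Two of the region.go changes target per-heartbeat allocations: RegionFromHeartbeat now takes flowRoundDivisor as a plain parameter instead of variadic RegionCreateOption closures, and the overlap helpers return []*RegionInfo directly so convertItemsToRegions and its extra slice disappear. The following self-contained sketch (toy types, not the PD ones) shows how such a constructor-shape difference could be measured with testing.Benchmark and AllocsPerOp:

package main

import (
	"fmt"
	"testing"
)

// Toy stand-ins for core.RegionInfo and core.RegionCreateOption.
type region struct {
	flowRoundDivisor uint64
	writtenBytes     uint64
}

type option func(*region)

func withFlowRoundDivisor(d uint64) option {
	return func(r *region) { r.flowRoundDivisor = d }
}

// Old shape: variadic functional options; each call typically pays for the
// option closure and the opts slice unless the compiler proves they don't escape.
func newRegionWithOpts(bytes uint64, opts ...option) *region {
	r := &region{writtenBytes: bytes}
	for _, opt := range opts {
		opt(r)
	}
	return r
}

// New shape: the one hot-path knob is passed directly.
func newRegionPlain(bytes, flowRoundDivisor uint64) *region {
	return &region{writtenBytes: bytes, flowRoundDivisor: flowRoundDivisor}
}

func main() {
	withOpts := testing.Benchmark(func(b *testing.B) {
		b.ReportAllocs()
		for i := 0; i < b.N; i++ {
			_ = newRegionWithOpts(uint64(i), withFlowRoundDivisor(3))
		}
	})
	plain := testing.Benchmark(func(b *testing.B) {
		b.ReportAllocs()
		for i := 0; i < b.N; i++ {
			_ = newRegionPlain(uint64(i), 3)
		}
	})
	fmt.Printf("with options: %d allocs/op\nplain param:  %d allocs/op\n",
		withOpts.AllocsPerOp(), plain.AllocsPerOp())
}
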
8 changes: 5 additions & 3 deletions pkg/core/region_test.go
@@ -156,18 +156,19 @@ func TestSortedEqual(t *testing.T) {
re.Equal(testCase.isEqual, SortedPeersEqual(regionA.GetVoters(), regionB.GetVoters()))
}

flowRoundDivisor := 3
// test RegionFromHeartbeat
for _, testCase := range testCases {
regionA := RegionFromHeartbeat(&pdpb.RegionHeartbeatRequest{
Region: &metapb.Region{Id: 100, Peers: pickPeers(testCase.idsA)},
DownPeers: pickPeerStats(testCase.idsA),
PendingPeers: pickPeers(testCase.idsA),
})
}, flowRoundDivisor)
regionB := RegionFromHeartbeat(&pdpb.RegionHeartbeatRequest{
Region: &metapb.Region{Id: 100, Peers: pickPeers(testCase.idsB)},
DownPeers: pickPeerStats(testCase.idsB),
PendingPeers: pickPeers(testCase.idsB),
})
}, flowRoundDivisor)
re.Equal(testCase.isEqual, SortedPeersEqual(regionA.GetVoters(), regionB.GetVoters()))
re.Equal(testCase.isEqual, SortedPeersEqual(regionA.GetVoters(), regionB.GetVoters()))
re.Equal(testCase.isEqual, SortedPeersEqual(regionA.GetPendingPeers(), regionB.GetPendingPeers()))
@@ -950,9 +951,10 @@ func BenchmarkRegionFromHeartbeat(b *testing.B) {
PendingPeers: []*metapb.Peer{peers[1]},
DownPeers: []*pdpb.PeerStats{{Peer: peers[2], DownSeconds: 100}},
}
flowRoundDivisor := 3
b.ResetTimer()
for i := 0; i < b.N; i++ {
RegionFromHeartbeat(regionReq)
RegionFromHeartbeat(regionReq, flowRoundDivisor)
}
}

14 changes: 7 additions & 7 deletions pkg/core/region_tree.go
@@ -104,7 +104,7 @@ func (t *regionTree) notFromStorageRegionsCount() int {
}

// GetOverlaps returns the range items that has some intersections with the given items.
func (t *regionTree) overlaps(item *regionItem) []*regionItem {
func (t *regionTree) overlaps(item *regionItem) []*RegionInfo {
// note that Find() gets the last item that is less or equal than the item.
// in the case: |_______a_______|_____b_____|___c___|
// new item is |______d______|
@@ -116,12 +116,12 @@ func (t *regionTree) overlaps(item *regionItem) []*regionItem {
result = item
}
endKey := item.GetEndKey()
var overlaps []*regionItem
var overlaps []*RegionInfo
t.tree.AscendGreaterOrEqual(result, func(i *regionItem) bool {
if len(endKey) > 0 && bytes.Compare(endKey, i.GetStartKey()) <= 0 {
return false
}
overlaps = append(overlaps, i)
overlaps = append(overlaps, i.RegionInfo)
return true
})
return overlaps
@@ -130,7 +130,7 @@ func (t *regionTree) overlaps(item *regionItem) []*regionItem {
// update updates the tree with the region.
// It finds and deletes all the overlapped regions first, and then
// insert the region.
func (t *regionTree) update(item *regionItem, withOverlaps bool, overlaps ...*regionItem) []*RegionInfo {
func (t *regionTree) update(item *regionItem, withOverlaps bool, overlaps ...*RegionInfo) []*RegionInfo {
region := item.RegionInfo
t.totalSize += region.approximateSize
regionWriteBytesRate, regionWriteKeysRate := region.GetWriteRate()
@@ -145,15 +145,15 @@ func (t *regionTree) update(item *regionItem, withOverlaps bool, overlaps ...*re
}

for _, old := range overlaps {
t.tree.Delete(old)
t.tree.Delete(&regionItem{RegionInfo: old})
}
t.tree.ReplaceOrInsert(item)
if t.countRef {
item.RegionInfo.IncRef()
}
result := make([]*RegionInfo, len(overlaps))
for i, overlap := range overlaps {
old := overlap.RegionInfo
old := overlap
result[i] = old
log.Debug("overlapping region",
zap.Uint64("region-id", old.GetID()),
@@ -174,7 +174,7 @@ func (t *regionTree) update(item *regionItem, withOverlaps bool, overlaps ...*re
return result
}

// updateStat is used to update statistics when regionItem.RegionInfo is directly replaced.
// updateStat is used to update statistics when RegionInfo is directly replaced.
func (t *regionTree) updateStat(origin *RegionInfo, region *RegionInfo) {
t.totalSize += region.approximateSize
regionWriteBytesRate, regionWriteKeysRate := region.GetWriteRate()
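regionTree.overlaps now unwraps each regionItem into its RegionInfo while walking the tree, which is what lets the region.go callers drop convertItemsToRegions and the second slice it built on every check. A toy sketch of the before/after shape, with illustrative types rather than the PD implementation:

package main

import "fmt"

type regionInfo struct{ id uint64 }

// item wraps regionInfo the way regionItem wraps RegionInfo in PD.
type item struct{ *regionInfo }

var tree = []*item{{&regionInfo{1}}, {&regionInfo{2}}, {&regionInfo{3}}}

// Before: return the wrapper items, forcing callers to copy into a new slice.
func overlapsItems() []*item { return tree }

func convertItemsToRegions(items []*item) []*regionInfo {
	regions := make([]*regionInfo, 0, len(items)) // extra allocation per call
	for _, it := range items {
		regions = append(regions, it.regionInfo)
	}
	return regions
}

// After: unwrap during the traversal itself, producing the slice callers want.
func overlapsRegions() []*regionInfo {
	out := make([]*regionInfo, 0, len(tree))
	for _, it := range tree {
		out = append(out, it.regionInfo)
	}
	return out
}

func main() {
	fmt.Println(len(convertItemsToRegions(overlapsItems()))) // two slices built
	fmt.Println(len(overlapsRegions()))                      // one slice built
}
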
4 changes: 2 additions & 2 deletions pkg/core/region_tree_test.go
@@ -159,8 +159,8 @@ func TestRegionTree(t *testing.T) {
updateNewItem(tree, regionA)
updateNewItem(tree, regionC)
re.Nil(tree.overlaps(newRegionItem([]byte("b"), []byte("c"))))
re.Equal(regionC, tree.overlaps(newRegionItem([]byte("c"), []byte("d")))[0].RegionInfo)
re.Equal(regionC, tree.overlaps(newRegionItem([]byte("a"), []byte("cc")))[1].RegionInfo)
re.Equal(regionC, tree.overlaps(newRegionItem([]byte("c"), []byte("d")))[0])
re.Equal(regionC, tree.overlaps(newRegionItem([]byte("a"), []byte("cc")))[1])
re.Nil(tree.search([]byte{}))
re.Equal(regionA, tree.search([]byte("a")))
re.Nil(tree.search([]byte("b")))
16 changes: 14 additions & 2 deletions pkg/mcs/scheduling/server/cluster.go
@@ -443,11 +443,23 @@
utils.RegionWriteKeys: 0,
utils.RegionWriteQueryNum: 0,
}
c.hotStat.CheckReadAsync(statistics.NewCheckReadPeerTask(region, []*metapb.Peer{peer}, loads, interval))
checkReadPeerTask := func(cache *statistics.HotPeerCache) {
stats := cache.CheckPeerFlow(region, []*metapb.Peer{peer}, loads, interval)
for _, stat := range stats {
cache.UpdateStat(stat)

}
}
c.hotStat.CheckReadAsync(checkReadPeerTask)

}

// Here we will compare the reported regions with the previous hot peers to decide if it is still hot.
c.hotStat.CheckReadAsync(statistics.NewCollectUnReportedPeerTask(storeID, regions, interval))
collectUnReportedPeerTask := func(cache *statistics.HotPeerCache) {
stats := cache.CheckColdPeer(storeID, regions, interval)
for _, stat := range stats {
cache.UpdateStat(stat)

}
}
c.hotStat.CheckReadAsync(collectUnReportedPeerTask)
return nil
}

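As in pkg/cluster/cluster.go, the scheduling server's read-flow and cold-peer checks are now expressed as closures over *statistics.HotPeerCache rather than dedicated task values (NewCheckReadPeerTask, NewCollectUnReportedPeerTask). A rough sketch of the shape of that refactor — an interface plus one struct per task kind collapsing into a single func type — using invented names, not the actual statistics package API; whether it saves allocations depends on the sizes involved, but it removes the task types and their interface boxing entirely:

package main

import "fmt"

type hotPeerCache struct{ updates int }

func (c *hotPeerCache) updateStat(delta int) { c.updates += delta }

// Before (roughly): an interface plus a struct per task kind; every check
// allocates a task value whose only job is to carry parameters to runTask.
type flowItemTask interface{ runTask(cache *hotPeerCache) }

type checkReadPeerTask struct{ loads int }

func (t *checkReadPeerTask) runTask(cache *hotPeerCache) { cache.updateStat(t.loads) }

// After: a single func type; parameters live in the closure that captures them.
type task func(cache *hotPeerCache)

func main() {
	cache := &hotPeerCache{}

	var before flowItemTask = &checkReadPeerTask{loads: 5}
	before.runTask(cache)

	loads := 5
	var after task = func(c *hotPeerCache) { c.updateStat(loads) }
	after(cache)

	fmt.Println(cache.updates) // 10
}
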
3 changes: 2 additions & 1 deletion pkg/mcs/scheduling/server/grpc_service.go
@@ -158,7 +158,8 @@ func (s *Service) RegionHeartbeat(stream schedulingpb.Scheduling_RegionHeartbeat
s.hbStreams.BindStream(storeID, server)
lastBind = time.Now()
}
region := core.RegionFromHeartbeat(request)
// scheduling service doesn't sync the pd server config, so we use 0 here
region := core.RegionFromHeartbeat(request, 0)
err = c.HandleRegionHeartbeat(region)
if err != nil {
// TODO: if we need to send the error back to API server.