diff --git a/conf/cluster.toml b/conf/cluster.toml index 079efc8d..80efeab9 100644 --- a/conf/cluster.toml +++ b/conf/cluster.toml @@ -32,7 +32,7 @@ db="file://.?cap=10000000&checkSeconds=10" deliverySeconds=5 maxDeliverWorkers=8000 - recoverSeconds=10 + recoverSeconds=1 recievePermitsPerSecond=20000 [clusters.mysql_dev] diff --git a/handler/deliver_pre.go b/handler/deliver_pre.go index ca26290a..6a68ff61 100644 --- a/handler/deliver_pre.go +++ b/handler/deliver_pre.go @@ -9,6 +9,7 @@ import ( "github.com/blackbeans/kiteq-common/store" packet "github.com/blackbeans/turbo/packet" p "github.com/blackbeans/turbo/pipe" + "sync/atomic" "time" ) @@ -17,7 +18,8 @@ type DeliverPreHandler struct { p.BaseForwardHandler kitestore store.IKiteStore exchanger *exchange.BindExchanger - maxDeliverNum chan byte + maxDeliverNum int32 + conditions int32 deliverTimeout time.Duration flowstat *stat.FlowStat deliveryRegistry *stat.DeliveryRegistry @@ -31,9 +33,11 @@ func NewDeliverPreHandler(name string, kitestore store.IKiteStore, phandler.BaseForwardHandler = p.NewBaseForwardHandler(name, phandler) phandler.kitestore = kitestore phandler.exchanger = exchanger - phandler.maxDeliverNum = make(chan byte, maxDeliverWorker) + phandler.maxDeliverNum = (int32)(maxDeliverWorker) + phandler.conditions = 0 phandler.flowstat = flowstat phandler.deliveryRegistry = deliveryRegistry + return phandler } @@ -62,17 +66,30 @@ func (self *DeliverPreHandler) Process(ctx *p.DefaultPipelineContext, event p.IE return nil } - self.maxDeliverNum <- 1 - self.flowstat.DeliverGo.Incr(1) - go func() { - defer func() { - <-self.maxDeliverNum - self.flowstat.DeliverGo.Incr(-1) - }() - //启动投递 - self.send0(ctx, pevent) - self.flowstat.DeliverFlow.Incr(1) - }() + /** + * 尝试三次进行spinlock + **/ + for i := 0; i < 3; i++ { + old := atomic.LoadInt32(&self.conditions) + if (old + 1) > self.maxDeliverNum { + continue + } + if atomic.CompareAndSwapInt32(&self.conditions, old, old+1) { + self.flowstat.DeliverGo.Incr(1) + go func() { + defer func() { + atomic.AddInt32(&self.conditions, -1) + self.flowstat.DeliverGo.Incr(-1) + }() + //启动投递 + self.send0(ctx, pevent) + self.flowstat.DeliverFlow.Incr(1) + }() + break + } else { + + } + } return nil } diff --git a/server/kiteq_server.go b/server/kiteq_server.go index 43a15863..45a1dc16 100644 --- a/server/kiteq_server.go +++ b/server/kiteq_server.go @@ -61,13 +61,13 @@ func NewKiteQServer(kc KiteQConfig) *KiteQServer { //重投策略 rw := make([]handler.RedeliveryWindow, 0, 10) - rw = append(rw, handler.NewRedeliveryWindow(0, 3, 10)) - rw = append(rw, handler.NewRedeliveryWindow(4, 10, 30)) - rw = append(rw, handler.NewRedeliveryWindow(10, 20, 2*30)) - rw = append(rw, handler.NewRedeliveryWindow(20, 30, 4*60)) - rw = append(rw, handler.NewRedeliveryWindow(30, 40, 8*60)) - rw = append(rw, handler.NewRedeliveryWindow(40, 50, 16*60)) - rw = append(rw, handler.NewRedeliveryWindow(50, -1, 32*60)) + rw = append(rw, handler.NewRedeliveryWindow(0, 3, 0)) + rw = append(rw, handler.NewRedeliveryWindow(4, 10, 5)) + rw = append(rw, handler.NewRedeliveryWindow(10, 20, 10)) + rw = append(rw, handler.NewRedeliveryWindow(20, 30, 2*10)) + rw = append(rw, handler.NewRedeliveryWindow(30, 40, 4*10)) + rw = append(rw, handler.NewRedeliveryWindow(40, 50, 8*10)) + rw = append(rw, handler.NewRedeliveryWindow(50, -1, 16*10)) //创建KiteqServer的流控 limiter, _ := turbo.NewBurstyLimiter(kc.so.recievePermitsPerSecond/2, kc.so.recievePermitsPerSecond) diff --git a/server/recover_manager.go b/server/recover_manager.go index 3a3782e9..687ab911 100644 --- a/server/recover_manager.go +++ b/server/recover_manager.go @@ -68,12 +68,11 @@ func (self *RecoverManager) Stop() { } func (self *RecoverManager) redeliverMsg(hashKey string, now time.Time) int { - var hasMore bool = true startIdx := 0 - preTimestamp := time.Now().Unix() + preTimestamp := now.Unix() //开始分页查询未过期的消息实体 - for !self.isClose && hasMore { - more, entities := self.kitestore.PageQueryEntity(hashKey, self.serverName, + for !self.isClose { + _, entities := self.kitestore.PageQueryEntity(hashKey, self.serverName, preTimestamp, startIdx, 200) if len(entities) <= 0 { @@ -95,7 +94,7 @@ func (self *RecoverManager) redeliverMsg(hashKey string, now time.Time) int { } } startIdx += len(entities) - hasMore = more + // hasMore = more preTimestamp = entities[len(entities)-1].NextDeliverTime } return startIdx