diff --git a/README.md b/README.md index e204312..5a11a1c 100644 --- a/README.md +++ b/README.md @@ -2,24 +2,20 @@ > 基于前缀树的发布/订阅 -Topic抄的MQTT的方法用路径来匹配 性能....就那样.... GoTest: ```raw -[clark@ArchLinux npubo]$ go test -benchmem -bench . -sub &{0xc00004c4d0 0xc000026040 sub_one/one QwQ true} message QwQ -sub &{0xc00004c4e0 0xc000026040 sub_one/timeout QwQ true} error subscriber timeout -sub &{0xc00004c4f0 0xc000026040 sub_one/error QwQ true} error a error +[clark@ArchOwO npubo]$ go test -benchmem -bench . goos: linux goarch: amd64 pkg: npubo -cpu: Intel(R) Core(TM) i5-7300HQ CPU @ 2.50GHz -BenchmarkSub-4 754231 1427 ns/op 527 B/op 8 allocs/op -BenchmarkPub-4 1 1780671142 ns/op 332174032 B/op 6788814 allocs/op +cpu: AMD Ryzen 7 5800H with Radeon Graphics +BenchmarkSub-16 8911771 138.6 ns/op 100 B/op 2 allocs/op +BenchmarkPub-16 4401676 242.6 ns/op 88 B/op 4 allocs/op PASS -ok npubo 3.190s +ok npubo 2.726s ``` 示例: @@ -27,19 +23,15 @@ ok npubo 3.190s ```go // 初始化 - pub := npubo.NewPublisher(500) + n := npubo.New() - // 订阅 - pub.Subscribe("sub/1", "QwQ", func(sub *Subscriber, val interface{}) error { - fmt.Println(sub, val) + n.Subscribe("/user/*", func(c *npubo.Context) error { + fmt.Printf("topic: %s, data: %s\n", + c.Topic(), c.String()) return nil }) - - // 发布 topic 支持星号通配 ("sub/*", "*") - pub.Publish("sub/1", "消息", func(sub *Subscriber, e error) { - fmt.Println(sub, e) - }) - pub.Close() + n.Publish("/user/1231/dwdw", "qaq") + n.Publish("/user/123", n) ``` \ No newline at end of file diff --git a/context.go b/context.go new file mode 100644 index 0000000..4c518f5 --- /dev/null +++ b/context.go @@ -0,0 +1,46 @@ +package npubo + +import ( + "fmt" + "reflect" +) + +type Context struct { + topic string + data interface{} +} + +func newContext(topic string, data interface{}) *Context { + return &Context{ + topic: topic, + data: data, + } +} + +func (c *Context) Topic() string { + return c.topic +} + +func (c *Context) Data() interface{} { + return c.data +} + +func (c *Context) Int64() int64 { + if v, ok := c.data.(int64); ok { + return v + } + + return 0 +} + +func (c *Context) String() string { + if v, ok := c.data.(string); ok { + return v + } + + return fmt.Sprint(c.data) +} + +func (c *Context) Type() string { + return reflect.TypeOf(c.data).Name() +} diff --git a/examples/main.go b/examples/main.go new file mode 100644 index 0000000..c1ccd81 --- /dev/null +++ b/examples/main.go @@ -0,0 +1,19 @@ +package main + +import ( + "fmt" + "npubo" +) + +func main() { + n := npubo.New() + + n.Subscribe("/user/*", func(c *npubo.Context) error { + fmt.Printf("topic: %s, data: %s\n", + c.Topic(), c.String()) + return nil + }) + + n.Publish("/user/1231/dwdw", "qaq") + n.Publish("/user/123", n) +} diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..e69de29 diff --git a/npubo.go b/npubo.go index 00da64e..8debbe3 100644 --- a/npubo.go +++ b/npubo.go @@ -1,143 +1,89 @@ package npubo import ( - "errors" "fmt" - "runtime" "sync" ) -var ( - ErrTopicNotFound = errors.New("topic not found") - ErrSubscriberTimeout = errors.New("subscriber timeout") - ErrInvaildTopic = errors.New("invaild topic") - ErrNilNode = errors.New("nil node ") -) - -type ( - // 订阅消息回调 - Call func(sub *Subscriber, val interface{}) error +type Npubo struct { + node *Node + rwLock *sync.RWMutex +} - // 推送错误返回回调 - ErrCall func(sub *Subscriber, e error) +type Subscriber struct { + npubo *Npubo + n *Node + id int +} - ChanCall struct { - Subscriber *Subscriber - Content interface{} +func New() *Npubo { + return &Npubo{ + node: newNode(nil), + rwLock: &sync.RWMutex{}, } +} - // 前缀树节点 - Node struct { - Calls map[string]*Call - NextNode map[string]*Node - } +func (n *Npubo) Subscribe(topic string, h HandlerFunc) *Subscriber { + n.rwLock.Lock() + defer n.rwLock.Unlock() - // 推送结构 - Publisher struct { - Node *Node - Root *Node - timeout int - openChan bool - rwLock *sync.RWMutex - workerLock *sync.Mutex - } + node, id := n.node.addNode(topic, h) + return &Subscriber{n, node, id} +} - // 订阅结构 - Subscriber struct { - Node *Node // 所在节点 - Publisher *Publisher // 所在推送 - Topic string // 订阅路径 - CallTopic string // 推送订阅路径 - CId string // 客户端Id - C chan (ChanCall) // 通道订阅 - isWorker bool - } -) +func (n *Subscriber) Unsubscribe() { + n.npubo.rwLock.Lock() + defer n.npubo.rwLock.Unlock() -// 初始化节点 -func newNode() *Node { - return &Node{ - NextNode: make(map[string]*Node), - Calls: make(map[string]*Call), - } + n.n.removeNode(n.id) } -func NewPublisher(timeout int, openChan bool) *Publisher { - return &Publisher{ - Node: newNode(), - Root: &Node{ - Calls: make(map[string]*Call), - NextNode: nil, - }, - openChan: openChan, - timeout: timeout, - rwLock: &sync.RWMutex{}, - workerLock: &sync.Mutex{}, - } -} +func (n *Npubo) Publish(topic string, data interface{}) error { + n.rwLock.RLock() + defer n.rwLock.RUnlock() -func (that *Subscriber) RootEvict(c_id string, call Call) error { - if that.isWorker { - that.Publisher.workerLock.Lock() - defer that.Publisher.workerLock.Unlock() - } else { - that.Publisher.rwLock.Lock() - defer that.Publisher.rwLock.Unlock() - } - if that.Node == nil { - return nil + h := n.node.getRoute(topic) + if h == nil { + return fmt.Errorf("no handler for topic: %s", topic) } - delete(that.Node.Calls, that.CId) - that.Node = nil - return nil -} + c := newContext(topic, data) -// 取消订阅 -func (that *Subscriber) Evict() error { - if that.isWorker { - that.Publisher.workerLock.Lock() - defer that.Publisher.workerLock.Unlock() - } else { - that.Publisher.rwLock.Lock() - defer that.Publisher.rwLock.Unlock() - } - if that.Node == nil { - return nil + for i := 0; i < len(h); i++ { + if e := func() (e error) { + defer func() { + if err := recover(); err != nil { + e = fmt.Errorf("%v", err) + } + }() + + return h[i](c) + }(); e != nil { + return e + } } - delete(that.Node.Calls, that.CId) - defer func() { recover() }() - close(that.C) - that.Node = nil return nil } -// 重写订阅函数 -func (that *Subscriber) RewriteCall(call Call) error { - if that.isWorker { - that.Publisher.workerLock.Lock() - defer that.Publisher.workerLock.Unlock() - } else { - that.Publisher.rwLock.Lock() - defer that.Publisher.rwLock.Unlock() - } - if that.Node == nil { - return ErrNilNode - } +func (n *Npubo) NepoPublish(topic string, data interface{}) { + n.rwLock.RLock() + defer n.rwLock.RUnlock() - that.Node.Calls[that.CId] = &call - return nil -} + h := n.node.getRoute(topic) + if h == nil { + return + } -// 关闭实例 -func (that *Publisher) Close() { - that.rwLock.RLock() - defer that.rwLock.RUnlock() - that.Node, that.Root = nil, nil + c := newContext(topic, data) - runtime.GC() + for i := 0; i < len(h); i++ { + func() { + defer func() { + recover() + }() - that.Node, that.Root = newNode(), newNode() - fmt.Println(that) + h[i](c) + }() + } } diff --git a/npubo_test.go b/npubo_test.go index 1456288..481a7a7 100644 --- a/npubo_test.go +++ b/npubo_test.go @@ -1,69 +1,30 @@ package npubo_test import ( - "errors" "fmt" "npubo" "testing" - "time" ) -var pub *npubo.Publisher = npubo.NewPublisher(500, true) - -func TestSub(t *testing.T) { - - sub, _ := pub.Subscribe("sub_one/one", "QwQ", true, func(sub *npubo.Subscriber, val interface{}) error { - fmt.Println("sub", sub, " message", val) - return nil - }) - - go func(sub *npubo.Subscriber) { - for val := range sub.C { - fmt.Println("chan:", val.Subscriber.CallTopic) - } - }(sub) - - //sub.Evict() - - pub.Subscribe("sub_one/timeout", "QwQ", false, func(sub *npubo.Subscriber, val interface{}) error { - time.Sleep(time.Second) - return nil - }) - - pub.Subscribe("sub_one/error", "QwQ", false, func(sub *npubo.Subscriber, val interface{}) error { - return errors.New("a error") - }) - //pub.Close() -} - -func TestPub(t *testing.T) { - pub.Publish("sub_one/one", "QwQ", func(sub *npubo.Subscriber, e error) { - fmt.Println("sub", sub, " error", e) - }) - - pub.Publish("sub_one/timeout", "Message", func(sub *npubo.Subscriber, e error) { - fmt.Println("sub", sub, " error", e) - }) - - pub.Publish("sub_one/error", "Message", func(sub *npubo.Subscriber, e error) { - fmt.Println("sub", sub, " error", e) - }) - - pub.Publish("*", "Call All Subscriber", func(sub *npubo.Subscriber, e error) { - fmt.Println("sub", sub, " error", e) - }) -} - func BenchmarkSub(b *testing.B) { + n := npubo.New() + for i := 0; i < b.N; i++ { - pub.Subscribe(fmt.Sprintf("sub_more/%v", i), "QwQ", false, func(sub *npubo.Subscriber, val interface{}) error { + n.Subscribe("sub_more/*id", func(c *npubo.Context) error { return nil }) } + } func BenchmarkPub(b *testing.B) { - pub.Publish("sub_more/*", "Message", func(sub *npubo.Subscriber, e error) { - return + n := npubo.New() + + n.Subscribe("sub_more/*id", func(c *npubo.Context) error { + return nil }) + + for i := 0; i < b.N; i++ { + n.NepoPublish(fmt.Sprintf("sub_more/%d", i), "1") + } } diff --git a/pub.go b/pub.go deleted file mode 100644 index b048456..0000000 --- a/pub.go +++ /dev/null @@ -1,115 +0,0 @@ -package npubo - -import ( - "errors" - "fmt" - "strings" - "sync" - "time" -) - -// 发布消息 -func (that *Publisher) Publish(topic string, val interface{}, errBack ErrCall) { - that.rwLock.RLock() - defer that.rwLock.RUnlock() - - // 觉得慢可以加协程 - that.callNode(that.Root, "*", topic, errBack, val) - - nowNode := that.Node.NextNode - cals := strings.Split(topic, "/") - - topicRecode := []string{} - for i, v := range cals { - if _, ok := nowNode[v]; ok { // 正常匹配路径 - topicRecode = append(topicRecode, v) - if i == len(cals)-1 { - that.callNode(nowNode[v], strings.Join(topicRecode, "/"), topic, errBack, val) - } - nowNode = nowNode[v].NextNode - } else if v == "*" { // 匹配通配符 - that.callAllNode(nowNode, errBack, topicRecode, topic, val) - break - } else { - sub := &Subscriber{ - Topic: topic, - CallTopic: topic, - Publisher: that, - isWorker: true, - } - - that.callErrBack(errBack, sub, ErrTopicNotFound) - break - } - } -} - -func (that *Publisher) callNode(node *Node, topic, callTopic string, errBack ErrCall, val interface{}) { - for c_id, call := range node.Calls { - sub := &Subscriber{ - Topic: topic, - CallTopic: callTopic, - CId: c_id, - Node: node, - Publisher: that, - isWorker: true, - } - e := that.callSubscriber(call, sub, val) - if e != nil && errBack != nil { - that.callErrBack(errBack, sub, e) - } - } -} - -func (that *Publisher) callAllNode(nowNode map[string]*Node, errBack ErrCall, topicInit []string, callTopic string, val interface{}) { - var wg sync.WaitGroup - for t, n := range nowNode { - topic := topicInit - topic = append(topic, t) - - that.callNode(n, strings.Join(topic, "/"), callTopic, errBack, val) - - wg.Add(1) - go func(nextNode map[string]*Node, topic []string, val interface{}) { - defer wg.Done() - that.callAllNode(nextNode, errBack, topic, callTopic, val) - }(n.NextNode, topic, val) - } - wg.Wait() -} - -func (that *Publisher) callSubscriber(call *Call, sub *Subscriber, val interface{}) (e error) { - done := make(chan byte, 0) - go func() { - defer func() { - if r := recover(); r != nil { - e = errors.New(fmt.Sprint(r)) - } - }() - e = (*call)(sub, val) - done <- 0 - }() - select { - case <-done: - return e - case <-time.After(time.Microsecond * time.Duration(that.timeout)): - close(done) - return ErrSubscriberTimeout - } -} - -func (that *Publisher) callErrBack(errBack ErrCall, sub *Subscriber, e error) { - done := make(chan byte, 0) - go func() { - defer func() { recover() }() - errBack(sub, e) - done <- 0 - }() - select { - case <-done: - return - case <-time.After(time.Microsecond * time.Duration(that.timeout)): - close(done) - return - } -} diff --git a/router.go b/router.go new file mode 100644 index 0000000..8e91764 --- /dev/null +++ b/router.go @@ -0,0 +1,77 @@ +package npubo + +import ( + "strings" +) + +type Node struct { + field *Node // 上级节点 + part string // 当前节点路径 + handler []HandlerFunc + node map[string]*Node +} + +type HandlerFunc func(*Context) error + +func newNode(field *Node) *Node { + return &Node{ + field: field, + part: "", + handler: []HandlerFunc{}, + node: make(map[string]*Node), + } +} + +func (p *Node) addNode(topic string, h HandlerFunc) (*Node, int) { + parts := strings.Split(topic, "/") + + for i := 0; i < len(parts); i++ { + t := parts[i] + + // 路由结构是否存在 + if _, ok := p.node[t]; !ok { + p.node[t] = newNode(p) + } + + // 指向 + p = p.node[t] + p.part = parts[i] + + if t == "*" { + break + } + } + + // 添加请求 + p.handler = append(p.handler, h) + return p, len(p.handler) - 1 +} + +func (p *Node) getRoute(topic string) []HandlerFunc { + parts := strings.Split(topic, "/") + handler := []HandlerFunc{} + + for i := 0; i < len(parts); i++ { + t := parts[i] + + // 查找以及通配 + if v, ok := p.node["*"]; ok { + handler = append(handler, v.handler...) + } + + if _, ok := p.node[t]; !ok { + break + } + + p = p.node[t] + handler = append(handler, p.handler...) + } + + return handler +} + +func (p *Node) removeNode(id int) { + if len(p.handler) > id { + p.handler = append(p.handler[:id], p.handler[id+1:]...) + } +} diff --git a/sub.go b/sub.go deleted file mode 100644 index c3703b4..0000000 --- a/sub.go +++ /dev/null @@ -1,59 +0,0 @@ -package npubo - -import "strings" - -// 订阅 -func (that *Publisher) Subscribe(topic string, c_id string, openChan bool, call Call) (*Subscriber, error) { - that.rwLock.Lock() - defer that.rwLock.Unlock() - - nowNode := that.Node.NextNode - cals := strings.Split(topic, "/") - sub := &Subscriber{ - Topic: topic, - Publisher: that, - CId: c_id, - Node: nil, - C: make(chan ChanCall), - } - - if strings.Contains(topic, "*") || cals[0] == "" { - return sub, ErrInvaildTopic - } - - for i, v := range cals { - if _, ok := nowNode[v]; !ok { - nowNode[v] = newNode() - } - if i == len(cals)-1 { - nowNode[v].Calls[c_id] = &call - - if that.openChan && openChan { - chanCallBack := func(subData *Subscriber, val interface{}) error { - sub.C <- ChanCall{ - Subscriber: subData, - Content: val, - } - return nil - } - nowNode[v].Calls["__chan__"+c_id] = (*Call)(&chanCallBack) - } - - sub.Node = nowNode[v] - } - nowNode = nowNode[v].NextNode - } - return sub, nil -} - -func (that *Publisher) RootSubscribe(c_id string, call Call) *Subscriber { - that.rwLock.Lock() - defer that.rwLock.Unlock() - - that.Root.Calls[c_id] = &call - return &Subscriber{ - Topic: "*", - CId: c_id, - Node: that.Root, - } -}