Skip to content

Commit

Permalink
✨ feat(重构): 完整重构速度提升
Browse files Browse the repository at this point in the history
  • Loading branch information
ClarkQAQ committed May 18, 2022
1 parent 4ade9d6 commit fbf8e3b
Show file tree
Hide file tree
Showing 9 changed files with 224 additions and 357 deletions.
30 changes: 11 additions & 19 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,44 +2,36 @@

> 基于前缀树的发布/订阅
Topic抄的MQTT的方法用路径来匹配
性能....就那样....

GoTest:

```raw
[clark@ArchLinux npubo]$ go test -benchmem -bench .
sub &{0xc00004c4d0 0xc000026040 sub_one/one QwQ true} message QwQ
sub &{0xc00004c4e0 0xc000026040 sub_one/timeout QwQ true} error subscriber timeout
sub &{0xc00004c4f0 0xc000026040 sub_one/error QwQ true} error a error
[clark@ArchOwO npubo]$ go test -benchmem -bench .
goos: linux
goarch: amd64
pkg: npubo
cpu: Intel(R) Core(TM) i5-7300HQ CPU @ 2.50GHz
BenchmarkSub-4 754231 1427 ns/op 527 B/op 8 allocs/op
BenchmarkPub-4 1 1780671142 ns/op 332174032 B/op 6788814 allocs/op
cpu: AMD Ryzen 7 5800H with Radeon Graphics
BenchmarkSub-16 8911771 138.6 ns/op 100 B/op 2 allocs/op
BenchmarkPub-16 4401676 242.6 ns/op 88 B/op 4 allocs/op
PASS
ok npubo 3.190s
ok npubo 2.726s
```

示例:

```go

// 初始化
pub := npubo.NewPublisher(500)
n := npubo.New()

// 订阅
pub.Subscribe("sub/1", "QwQ", func(sub *Subscriber, val interface{}) error {
fmt.Println(sub, val)
n.Subscribe("/user/*", func(c *npubo.Context) error {
fmt.Printf("topic: %s, data: %s\n",
c.Topic(), c.String())
return nil
})

// 发布 topic 支持星号通配 ("sub/*", "*")
pub.Publish("sub/1", "消息", func(sub *Subscriber, e error) {
fmt.Println(sub, e)
})

pub.Close()
n.Publish("/user/1231/dwdw", "qaq")
n.Publish("/user/123", n)

```
46 changes: 46 additions & 0 deletions context.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package npubo

import (
"fmt"
"reflect"
)

type Context struct {
topic string
data interface{}
}

func newContext(topic string, data interface{}) *Context {
return &Context{
topic: topic,
data: data,
}
}

func (c *Context) Topic() string {
return c.topic
}

func (c *Context) Data() interface{} {
return c.data
}

func (c *Context) Int64() int64 {
if v, ok := c.data.(int64); ok {
return v
}

return 0
}

func (c *Context) String() string {
if v, ok := c.data.(string); ok {
return v
}

return fmt.Sprint(c.data)
}

func (c *Context) Type() string {
return reflect.TypeOf(c.data).Name()
}
19 changes: 19 additions & 0 deletions examples/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package main

import (
"fmt"
"npubo"
)

func main() {
n := npubo.New()

n.Subscribe("/user/*", func(c *npubo.Context) error {
fmt.Printf("topic: %s, data: %s\n",
c.Topic(), c.String())
return nil
})

n.Publish("/user/1231/dwdw", "qaq")
n.Publish("/user/123", n)
}
Empty file added go.sum
Empty file.
172 changes: 59 additions & 113 deletions npubo.go
Original file line number Diff line number Diff line change
@@ -1,143 +1,89 @@
package npubo

import (
"errors"
"fmt"
"runtime"
"sync"
)

var (
ErrTopicNotFound = errors.New("topic not found")
ErrSubscriberTimeout = errors.New("subscriber timeout")
ErrInvaildTopic = errors.New("invaild topic")
ErrNilNode = errors.New("nil node ")
)

type (
// 订阅消息回调
Call func(sub *Subscriber, val interface{}) error
type Npubo struct {
node *Node
rwLock *sync.RWMutex
}

// 推送错误返回回调
ErrCall func(sub *Subscriber, e error)
type Subscriber struct {
npubo *Npubo
n *Node
id int
}

ChanCall struct {
Subscriber *Subscriber
Content interface{}
func New() *Npubo {
return &Npubo{
node: newNode(nil),
rwLock: &sync.RWMutex{},
}
}

// 前缀树节点
Node struct {
Calls map[string]*Call
NextNode map[string]*Node
}
func (n *Npubo) Subscribe(topic string, h HandlerFunc) *Subscriber {
n.rwLock.Lock()
defer n.rwLock.Unlock()

// 推送结构
Publisher struct {
Node *Node
Root *Node
timeout int
openChan bool
rwLock *sync.RWMutex
workerLock *sync.Mutex
}
node, id := n.node.addNode(topic, h)
return &Subscriber{n, node, id}
}

// 订阅结构
Subscriber struct {
Node *Node // 所在节点
Publisher *Publisher // 所在推送
Topic string // 订阅路径
CallTopic string // 推送订阅路径
CId string // 客户端Id
C chan (ChanCall) // 通道订阅
isWorker bool
}
)
func (n *Subscriber) Unsubscribe() {
n.npubo.rwLock.Lock()
defer n.npubo.rwLock.Unlock()

// 初始化节点
func newNode() *Node {
return &Node{
NextNode: make(map[string]*Node),
Calls: make(map[string]*Call),
}
n.n.removeNode(n.id)
}

func NewPublisher(timeout int, openChan bool) *Publisher {
return &Publisher{
Node: newNode(),
Root: &Node{
Calls: make(map[string]*Call),
NextNode: nil,
},
openChan: openChan,
timeout: timeout,
rwLock: &sync.RWMutex{},
workerLock: &sync.Mutex{},
}
}
func (n *Npubo) Publish(topic string, data interface{}) error {
n.rwLock.RLock()
defer n.rwLock.RUnlock()

func (that *Subscriber) RootEvict(c_id string, call Call) error {
if that.isWorker {
that.Publisher.workerLock.Lock()
defer that.Publisher.workerLock.Unlock()
} else {
that.Publisher.rwLock.Lock()
defer that.Publisher.rwLock.Unlock()
}
if that.Node == nil {
return nil
h := n.node.getRoute(topic)
if h == nil {
return fmt.Errorf("no handler for topic: %s", topic)
}

delete(that.Node.Calls, that.CId)
that.Node = nil
return nil
}
c := newContext(topic, data)

// 取消订阅
func (that *Subscriber) Evict() error {
if that.isWorker {
that.Publisher.workerLock.Lock()
defer that.Publisher.workerLock.Unlock()
} else {
that.Publisher.rwLock.Lock()
defer that.Publisher.rwLock.Unlock()
}
if that.Node == nil {
return nil
for i := 0; i < len(h); i++ {
if e := func() (e error) {
defer func() {
if err := recover(); err != nil {
e = fmt.Errorf("%v", err)
}
}()

return h[i](c)
}(); e != nil {
return e
}
}

delete(that.Node.Calls, that.CId)
defer func() { recover() }()
close(that.C)
that.Node = nil
return nil
}

// 重写订阅函数
func (that *Subscriber) RewriteCall(call Call) error {
if that.isWorker {
that.Publisher.workerLock.Lock()
defer that.Publisher.workerLock.Unlock()
} else {
that.Publisher.rwLock.Lock()
defer that.Publisher.rwLock.Unlock()
}
if that.Node == nil {
return ErrNilNode
}
func (n *Npubo) NepoPublish(topic string, data interface{}) {
n.rwLock.RLock()
defer n.rwLock.RUnlock()

that.Node.Calls[that.CId] = &call
return nil
}
h := n.node.getRoute(topic)
if h == nil {
return
}

// 关闭实例
func (that *Publisher) Close() {
that.rwLock.RLock()
defer that.rwLock.RUnlock()
that.Node, that.Root = nil, nil
c := newContext(topic, data)

runtime.GC()
for i := 0; i < len(h); i++ {
func() {
defer func() {
recover()
}()

that.Node, that.Root = newNode(), newNode()
fmt.Println(that)
h[i](c)
}()
}
}
Loading

0 comments on commit fbf8e3b

Please sign in to comment.