diff --git a/npubo_test.go b/npubo_test.go index 84a3b15..1456288 100644 --- a/npubo_test.go +++ b/npubo_test.go @@ -12,7 +12,7 @@ var pub *npubo.Publisher = npubo.NewPublisher(500, true) func TestSub(t *testing.T) { - sub, _ := pub.Subscribe("sub_one/one", "QwQ", func(sub *npubo.Subscriber, val interface{}) error { + sub, _ := pub.Subscribe("sub_one/one", "QwQ", true, func(sub *npubo.Subscriber, val interface{}) error { fmt.Println("sub", sub, " message", val) return nil }) @@ -25,12 +25,12 @@ func TestSub(t *testing.T) { //sub.Evict() - pub.Subscribe("sub_one/timeout", "QwQ", func(sub *npubo.Subscriber, val interface{}) error { + pub.Subscribe("sub_one/timeout", "QwQ", false, func(sub *npubo.Subscriber, val interface{}) error { time.Sleep(time.Second) return nil }) - pub.Subscribe("sub_one/error", "QwQ", func(sub *npubo.Subscriber, val interface{}) error { + pub.Subscribe("sub_one/error", "QwQ", false, func(sub *npubo.Subscriber, val interface{}) error { return errors.New("a error") }) //pub.Close() @@ -56,7 +56,7 @@ func TestPub(t *testing.T) { func BenchmarkSub(b *testing.B) { for i := 0; i < b.N; i++ { - pub.Subscribe(fmt.Sprintf("sub_more/%v", i), "QwQ", func(sub *npubo.Subscriber, val interface{}) error { + pub.Subscribe(fmt.Sprintf("sub_more/%v", i), "QwQ", false, func(sub *npubo.Subscriber, val interface{}) error { return nil }) } diff --git a/sub.go b/sub.go index 7660c49..c3703b4 100644 --- a/sub.go +++ b/sub.go @@ -3,7 +3,7 @@ package npubo import "strings" // 订阅 -func (that *Publisher) Subscribe(topic string, c_id string, call Call) (*Subscriber, error) { +func (that *Publisher) Subscribe(topic string, c_id string, openChan bool, call Call) (*Subscriber, error) { that.rwLock.Lock() defer that.rwLock.Unlock() @@ -28,7 +28,7 @@ func (that *Publisher) Subscribe(topic string, c_id string, call Call) (*Subscri if i == len(cals)-1 { nowNode[v].Calls[c_id] = &call - if that.openChan { + if that.openChan && openChan { chanCallBack := func(subData *Subscriber, val interface{}) error { sub.C <- ChanCall{ Subscriber: subData,