Skip to content

Commit

Permalink
fix 自动订阅chan带来的超时问题
Browse files Browse the repository at this point in the history
  • Loading branch information
ClarkQAQ committed May 7, 2021
1 parent d57183f commit 4ade9d6
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 6 deletions.
8 changes: 4 additions & 4 deletions npubo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ var pub *npubo.Publisher = npubo.NewPublisher(500, true)

func TestSub(t *testing.T) {

sub, _ := pub.Subscribe("sub_one/one", "QwQ", func(sub *npubo.Subscriber, val interface{}) error {
sub, _ := pub.Subscribe("sub_one/one", "QwQ", true, func(sub *npubo.Subscriber, val interface{}) error {
fmt.Println("sub", sub, " message", val)
return nil
})
Expand All @@ -25,12 +25,12 @@ func TestSub(t *testing.T) {

//sub.Evict()

pub.Subscribe("sub_one/timeout", "QwQ", func(sub *npubo.Subscriber, val interface{}) error {
pub.Subscribe("sub_one/timeout", "QwQ", false, func(sub *npubo.Subscriber, val interface{}) error {
time.Sleep(time.Second)
return nil
})

pub.Subscribe("sub_one/error", "QwQ", func(sub *npubo.Subscriber, val interface{}) error {
pub.Subscribe("sub_one/error", "QwQ", false, func(sub *npubo.Subscriber, val interface{}) error {
return errors.New("a error")
})
//pub.Close()
Expand All @@ -56,7 +56,7 @@ func TestPub(t *testing.T) {

func BenchmarkSub(b *testing.B) {
for i := 0; i < b.N; i++ {
pub.Subscribe(fmt.Sprintf("sub_more/%v", i), "QwQ", func(sub *npubo.Subscriber, val interface{}) error {
pub.Subscribe(fmt.Sprintf("sub_more/%v", i), "QwQ", false, func(sub *npubo.Subscriber, val interface{}) error {
return nil
})
}
Expand Down
4 changes: 2 additions & 2 deletions sub.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package npubo
import "strings"

// 订阅
func (that *Publisher) Subscribe(topic string, c_id string, call Call) (*Subscriber, error) {
func (that *Publisher) Subscribe(topic string, c_id string, openChan bool, call Call) (*Subscriber, error) {
that.rwLock.Lock()
defer that.rwLock.Unlock()

Expand All @@ -28,7 +28,7 @@ func (that *Publisher) Subscribe(topic string, c_id string, call Call) (*Subscri
if i == len(cals)-1 {
nowNode[v].Calls[c_id] = &call

if that.openChan {
if that.openChan && openChan {
chanCallBack := func(subData *Subscriber, val interface{}) error {
sub.C <- ChanCall{
Subscriber: subData,
Expand Down

0 comments on commit 4ade9d6

Please sign in to comment.