From 1b0c077cf915fb735022f62011861290d73849a0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=ABl=20G=C3=A4hwiler?= Date: Fri, 14 Sep 2018 11:24:11 +0200 Subject: [PATCH] finished resubscribe tests --- client/service_test.go | 156 +++-------------------------------------- 1 file changed, 10 insertions(+), 146 deletions(-) diff --git a/client/service_test.go b/client/service_test.go index 8e6ff6e..6873070 100644 --- a/client/service_test.go +++ b/client/service_test.go @@ -1,8 +1,6 @@ package client import ( - "fmt" - "sync/atomic" "testing" "time" @@ -368,7 +366,7 @@ func TestServiceResubscribe(t *testing.T) { s := NewService() s.MinReconnectDelay = 50 * time.Millisecond - i := uint32(0) + i := 0 s.OnlineCallback = func(_ bool) { i++ @@ -410,10 +408,10 @@ func TestServiceResubscribe(t *testing.T) { safeReceive(offline) safeReceive(done) - assert.Equal(t, uint32(2), i) + assert.Equal(t, 2, i) } -func TestServiceReconnectResubscribeTimeout(t *testing.T) { +func TestServiceResubscribeTimeout(t *testing.T) { subscribe1 := packet.NewSubscribe() subscribe1.Subscriptions = []packet.Subscription{{Topic: "test", QOS: 0}} subscribe1.ID = 1 @@ -451,25 +449,21 @@ func TestServiceReconnectResubscribeTimeout(t *testing.T) { s := NewService() s.MinReconnectDelay = 50 * time.Millisecond - s.ConnectTimeout = 50 * time.Millisecond - s.ResubscribeTimeout = 55 * time.Millisecond - s.Logger = func(msg string) { - fmt.Println(msg) - } + s.ResubscribeTimeout = 50 * time.Millisecond - i := uint32(0) + i := 0 s.OnlineCallback = func(_ bool) { - j := atomic.AddUint32(&i, 1) - if j == 1 { + i++ + if i == 1 { close(online1) - } else if j == 3 { + } else if i == 2 { close(online2) } } s.OfflineCallback = func() { - if atomic.LoadUint32(&i) == 3 { + if i == 2 { close(offline) } } @@ -487,137 +481,7 @@ func TestServiceReconnectResubscribeTimeout(t *testing.T) { safeReceive(offline) safeReceive(done) - assert.Equal(t, uint32(3), i) -} - -func TestServiceReconnectResubscribeClosed(t *testing.T) { - subscribe1 := packet.NewSubscribe() - subscribe1.Subscriptions = []packet.Subscription{{Topic: "test", QOS: 0}} - subscribe1.ID = 1 - - suback1 := packet.NewSuback() - suback1.ReturnCodes = []uint8{0} - suback1.ID = 1 - - publish := packet.NewPublish() - publish.Message.Topic = "test" - publish.Message.Payload = []byte("test") - - broker1 := flow.New(). - Receive(connectPacket()). - Send(connackPacket()). - Receive(subscribe1). - Send(suback1). - Close() - - broker2 := flow.New(). - Receive(connectPacket()). - Send(connackPacket()). - Send(publish). - Receive(disconnectPacket()). - End() - - done, port := fakeBroker(t, broker1, broker2) - - online1 := make(chan struct{}) - message := make(chan struct{}) - offline := make(chan struct{}) - - s := NewService() - s.MinReconnectDelay = 50 * time.Millisecond - s.ConnectTimeout = 50 * time.Millisecond - s.ResubscribeTimeout = 55 * time.Millisecond - s.ResubscribeAllSubscriptions = false - - s.MessageCallback = func(msg *packet.Message) error { - assert.Equal(t, "test", msg.Topic) - assert.Equal(t, []byte("test"), msg.Payload) - assert.Equal(t, uint8(0), msg.QOS) - assert.False(t, msg.Retain) - close(message) - return nil - } - - i := uint32(0) - - s.OnlineCallback = func(_ bool) { - if atomic.AddUint32(&i, 1) == 1 { - close(online1) - } - } - - s.OfflineCallback = func() { - if atomic.LoadUint32(&i) == 2 { - close(offline) - } - } - - s.Start(NewConfig("tcp://localhost:" + port)) - - safeReceive(online1) - - assert.NoError(t, s.SubscribeMultiple(subscribe1.Subscriptions).Wait(time.Second)) - - safeReceive(message) - - s.Stop(true) - - safeReceive(offline) - safeReceive(done) - - assert.Equal(t, uint32(2), i) -} - -func TestServiceReconnectResubscribeError(t *testing.T) { - subscribe := packet.NewSubscribe() - subscribe.Subscriptions = []packet.Subscription{{Topic: "test", QOS: 3}} - subscribe.ID = 1 - - suback := packet.NewSuback() - suback.ReturnCodes = []uint8{packet.QOSFailure} - suback.ID = 1 - - broker1 := flow.New(). - Receive(connectPacket()). - Send(connackPacket()). - Receive(subscribe). - Send(suback). - Receive(disconnectPacket()). - End() - - done, port := fakeBroker(t, broker1) - - online := make(chan struct{}) - - s := NewService() - s.MinReconnectDelay = 50 * time.Millisecond - s.ConnectTimeout = 50 * time.Millisecond - s.ResubscribeTimeout = 55 * time.Millisecond - - i := uint32(0) - - s.OnlineCallback = func(_ bool) { - if atomic.AddUint32(&i, 1) == 1 { - close(online) - } - } - - s.ErrorCallback = func(err error) { - assert.EqualError(t, err, "failed subscription") - } - - s.Start(NewConfig("tcp://localhost:" + port)) - - safeReceive(online) - - err := s.SubscribeMultiple(subscribe.Subscriptions).Wait(time.Second) - assert.EqualError(t, err, "future canceled") - - s.Stop(true) - - safeReceive(done) - - assert.Equal(t, uint32(1), i) + assert.Equal(t, 2, i) } func TestServiceFutureSurvival(t *testing.T) {