Skip to content

Commit

Permalink
finished resubscribe tests
Browse files Browse the repository at this point in the history
  • Loading branch information
256dpi committed Sep 14, 2018
1 parent 011de7d commit 1b0c077
Showing 1 changed file with 10 additions and 146 deletions.
156 changes: 10 additions & 146 deletions client/service_test.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package client

import (
"fmt"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -368,7 +366,7 @@ func TestServiceResubscribe(t *testing.T) {
s := NewService()
s.MinReconnectDelay = 50 * time.Millisecond

i := uint32(0)
i := 0

s.OnlineCallback = func(_ bool) {
i++
Expand Down Expand Up @@ -410,10 +408,10 @@ func TestServiceResubscribe(t *testing.T) {
safeReceive(offline)
safeReceive(done)

assert.Equal(t, uint32(2), i)
assert.Equal(t, 2, i)
}

func TestServiceReconnectResubscribeTimeout(t *testing.T) {
func TestServiceResubscribeTimeout(t *testing.T) {
subscribe1 := packet.NewSubscribe()
subscribe1.Subscriptions = []packet.Subscription{{Topic: "test", QOS: 0}}
subscribe1.ID = 1
Expand Down Expand Up @@ -451,25 +449,21 @@ func TestServiceReconnectResubscribeTimeout(t *testing.T) {

s := NewService()
s.MinReconnectDelay = 50 * time.Millisecond
s.ConnectTimeout = 50 * time.Millisecond
s.ResubscribeTimeout = 55 * time.Millisecond
s.Logger = func(msg string) {
fmt.Println(msg)
}
s.ResubscribeTimeout = 50 * time.Millisecond

i := uint32(0)
i := 0

s.OnlineCallback = func(_ bool) {
j := atomic.AddUint32(&i, 1)
if j == 1 {
i++
if i == 1 {
close(online1)
} else if j == 3 {
} else if i == 2 {
close(online2)
}
}

s.OfflineCallback = func() {
if atomic.LoadUint32(&i) == 3 {
if i == 2 {
close(offline)
}
}
Expand All @@ -487,137 +481,7 @@ func TestServiceReconnectResubscribeTimeout(t *testing.T) {
safeReceive(offline)
safeReceive(done)

assert.Equal(t, uint32(3), i)
}

func TestServiceReconnectResubscribeClosed(t *testing.T) {
subscribe1 := packet.NewSubscribe()
subscribe1.Subscriptions = []packet.Subscription{{Topic: "test", QOS: 0}}
subscribe1.ID = 1

suback1 := packet.NewSuback()
suback1.ReturnCodes = []uint8{0}
suback1.ID = 1

publish := packet.NewPublish()
publish.Message.Topic = "test"
publish.Message.Payload = []byte("test")

broker1 := flow.New().
Receive(connectPacket()).
Send(connackPacket()).
Receive(subscribe1).
Send(suback1).
Close()

broker2 := flow.New().
Receive(connectPacket()).
Send(connackPacket()).
Send(publish).
Receive(disconnectPacket()).
End()

done, port := fakeBroker(t, broker1, broker2)

online1 := make(chan struct{})
message := make(chan struct{})
offline := make(chan struct{})

s := NewService()
s.MinReconnectDelay = 50 * time.Millisecond
s.ConnectTimeout = 50 * time.Millisecond
s.ResubscribeTimeout = 55 * time.Millisecond
s.ResubscribeAllSubscriptions = false

s.MessageCallback = func(msg *packet.Message) error {
assert.Equal(t, "test", msg.Topic)
assert.Equal(t, []byte("test"), msg.Payload)
assert.Equal(t, uint8(0), msg.QOS)
assert.False(t, msg.Retain)
close(message)
return nil
}

i := uint32(0)

s.OnlineCallback = func(_ bool) {
if atomic.AddUint32(&i, 1) == 1 {
close(online1)
}
}

s.OfflineCallback = func() {
if atomic.LoadUint32(&i) == 2 {
close(offline)
}
}

s.Start(NewConfig("tcp://localhost:" + port))

safeReceive(online1)

assert.NoError(t, s.SubscribeMultiple(subscribe1.Subscriptions).Wait(time.Second))

safeReceive(message)

s.Stop(true)

safeReceive(offline)
safeReceive(done)

assert.Equal(t, uint32(2), i)
}

func TestServiceReconnectResubscribeError(t *testing.T) {
subscribe := packet.NewSubscribe()
subscribe.Subscriptions = []packet.Subscription{{Topic: "test", QOS: 3}}
subscribe.ID = 1

suback := packet.NewSuback()
suback.ReturnCodes = []uint8{packet.QOSFailure}
suback.ID = 1

broker1 := flow.New().
Receive(connectPacket()).
Send(connackPacket()).
Receive(subscribe).
Send(suback).
Receive(disconnectPacket()).
End()

done, port := fakeBroker(t, broker1)

online := make(chan struct{})

s := NewService()
s.MinReconnectDelay = 50 * time.Millisecond
s.ConnectTimeout = 50 * time.Millisecond
s.ResubscribeTimeout = 55 * time.Millisecond

i := uint32(0)

s.OnlineCallback = func(_ bool) {
if atomic.AddUint32(&i, 1) == 1 {
close(online)
}
}

s.ErrorCallback = func(err error) {
assert.EqualError(t, err, "failed subscription")
}

s.Start(NewConfig("tcp://localhost:" + port))

safeReceive(online)

err := s.SubscribeMultiple(subscribe.Subscriptions).Wait(time.Second)
assert.EqualError(t, err, "future canceled")

s.Stop(true)

safeReceive(done)

assert.Equal(t, uint32(1), i)
assert.Equal(t, 2, i)
}

func TestServiceFutureSurvival(t *testing.T) {
Expand Down

0 comments on commit 1b0c077

Please sign in to comment.