Skip to content

Commit

Permalink
gomqtt is a very good libs and used in our projects, thank all gomqtt…
Browse files Browse the repository at this point in the history
… contributors. But because some logics of our projects are different from gomqtt's, we cannot use some gomqtt's public structs directly. For example, we need a mqtt client with auto-reconnect feature and without any message cache in memory, so we must implement it by myself and in order to avoid to copy code we hope gomqtt/client/tracker can be marked as a public struct.
  • Loading branch information
ludanfeng authored and 256dpi committed Sep 5, 2018
1 parent f8a9b07 commit 09c2e15
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 32 deletions.
14 changes: 7 additions & 7 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ type Client struct {
clean bool

keepAlive time.Duration
tracker *tracker
tracker *Tracker
futureStore *future.Store
connectFuture *future.Future

Expand Down Expand Up @@ -177,7 +177,7 @@ func (c *Client) Connect(config *Config) (ConnectFuture, error) {

// allocate and initialize tracker
c.keepAlive = keepAlive
c.tracker = newTracker(keepAlive)
c.tracker = NewTracker(keepAlive)

// dial broker (with custom dialer if present)
if config.Dialer != nil {
Expand Down Expand Up @@ -482,7 +482,7 @@ func (c *Client) processor() error {
case *packet.Unsuback:
err = c.processUnsuback(typedPkt)
case *packet.Pingresp:
c.tracker.pong()
c.tracker.Pong()
case *packet.Publish:
err = c.processPublish(typedPkt)
case *packet.Puback:
Expand Down Expand Up @@ -749,12 +749,12 @@ func (c *Client) processPubrel(id packet.ID) error {
func (c *Client) pinger() error {
for {
// get current window
window := c.tracker.window()
window := c.tracker.Window()

// check if ping is due
if window < 0 {
// check if a pong has already been sent
if c.tracker.pending() {
if c.tracker.Pending() {
return c.die(ErrClientMissingPong, true, false)
}

Expand All @@ -765,7 +765,7 @@ func (c *Client) pinger() error {
}

// save ping attempt
c.tracker.ping()
c.tracker.Ping()
} else {
// log keep alive delay
if c.Logger != nil {
Expand All @@ -787,7 +787,7 @@ func (c *Client) pinger() error {
// sends packet and updates lastSend
func (c *Client) send(pkt packet.Generic, async bool) error {
// reset keep alive tracker
c.tracker.reset()
c.tracker.Reset()

// send packet
err := c.conn.Send(pkt, async)
Expand Down
30 changes: 15 additions & 15 deletions client/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,57 +5,57 @@ import (
"time"
)

// a tracker keeps track of keep alive intervals
type tracker struct {
// Tracker a tracker keeps track of keep alive intervals
type Tracker struct {
sync.RWMutex

last time.Time
pings uint8
timeout time.Duration
}

// returns a new tracker
func newTracker(timeout time.Duration) *tracker {
return &tracker{
// NewTracker returns a new tracker
func NewTracker(timeout time.Duration) *Tracker {
return &Tracker{
last: time.Now(),
timeout: timeout,
}
}

// updates the tracker
func (t *tracker) reset() {
// Reset updates the tracker
func (t *Tracker) Reset() {
t.Lock()
defer t.Unlock()

t.last = time.Now()
}

// returns the current time window
func (t *tracker) window() time.Duration {
// Window returns the current time window
func (t *Tracker) Window() time.Duration {
t.RLock()
defer t.RUnlock()

return t.timeout - time.Since(t.last)
}

// mark ping
func (t *tracker) ping() {
// Ping mark ping
func (t *Tracker) Ping() {
t.Lock()
defer t.Unlock()

t.pings++
}

// mark pong
func (t *tracker) pong() {
// Pong mark pong
func (t *Tracker) Pong() {
t.Lock()
defer t.Unlock()

t.pings--
}

// returns if pings are pending
func (t *tracker) pending() bool {
// Pending returns if pings are pending
func (t *Tracker) Pending() bool {
t.RLock()
defer t.RUnlock()

Expand Down
20 changes: 10 additions & 10 deletions client/tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,19 @@ import (
)

func TestTracker(t *testing.T) {
tracker := newTracker(10 * time.Millisecond)
assert.False(t, tracker.pending())
assert.True(t, tracker.window() > 0)
tracker := NewTracker(10 * time.Millisecond)
assert.False(t, tracker.Pending())
assert.True(t, tracker.Window() > 0)

time.Sleep(10 * time.Millisecond)
assert.True(t, tracker.window() <= 0)
assert.True(t, tracker.Window() <= 0)

tracker.reset()
assert.True(t, tracker.window() > 0)
tracker.Reset()
assert.True(t, tracker.Window() > 0)

tracker.ping()
assert.True(t, tracker.pending())
tracker.Ping()
assert.True(t, tracker.Pending())

tracker.pong()
assert.False(t, tracker.pending())
tracker.Pong()
assert.False(t, tracker.Pending())
}

0 comments on commit 09c2e15

Please sign in to comment.