-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathreader.go
70 lines (62 loc) · 1.64 KB
/
reader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
package mqtt
import (
"errors"
"fmt"
"git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git/packets"
"io"
"net"
"reflect"
"time"
)
// reader is the inbound loop of a client: it reads control packets from the
// connection and forwards each one on this.in until a read fails or
// this.Dying() fires.
//
// Every read is bounded by one and a half times this.keepAlive. A keep-alive
// that is zero (or negative) means the keep-alive mechanism is switched off
// (MQTT 3.1.1, section 3.1.2.10), so in that case the read is performed
// without a deadline instead of expiring immediately.
//
// The returned error is:
//   - nil when the loop stops because this.Dying() fired,
//   - "keepalive timeout" when a read hit its deadline,
//   - "reader panic ..." when a panic was recovered and no error was pending,
//   - the read error itself otherwise (io.EOF is returned but not logged).
//
// On exit, this.close() is always started in its own goroutine.
func (this *client) reader() (err error) {
	defer func() {
		// Convert a panic into an error, but never mask an earlier failure.
		if r := recover(); r != nil && err == nil {
			err = fmt.Errorf("reader panic %v", r)
		}
		log.Debugf("reader(%v) stopped, %v, %v", this.id, err, this.Err())
		go this.close()
	}()

	var cp packets.ControlPacket
	for {
		if this.keepAlive > 0 {
			// Allow half a keep-alive interval of grace before giving up.
			cp, err = this.readPacket(this.keepAlive + this.keepAlive/2)
		} else {
			// Keep-alive disabled: readPacket(0) would arm a deadline of
			// "now" and fail at once, so read directly with no deadline set
			// by this loop.
			cp, err = packets.ReadPacket(this.conn)
		}
		if err != nil {
			switch e := err.(type) {
			case net.Error:
				if e.Timeout() {
					log.Debugf("reader(%v) client keepalive timeout, ", this.id)
					err = errors.New("keepalive timeout")
				}
			default:
				// io.EOF is the peer closing the stream; not worth a warning.
				if err != io.EOF {
					log.Warnf("reader(%v) error(%v) reading from connection", this.id, err)
				}
			}
			return err
		}

		log.Debugf("reader(%v) new packet received, %v, queue len:%v", this.id, reflect.TypeOf(cp), len(this.in))
		// Hand the packet over, unless the client starts dying first.
		select {
		case <-this.Dying():
			return nil
		case this.in <- cp:
		}
	}
}
// readPacket reads one control packet from the connection, giving the read at
// most timeout to complete.
//
// The read deadline is armed before the read and cleared again afterwards. If
// the deadline cannot be armed, that error is returned as-is (unwrapped, so
// callers can still inspect its concrete type) and no read is attempted:
// reading anyway would block without any bound. Otherwise the packet and error
// produced by packets.ReadPacket are returned.
func (this *client) readPacket(timeout time.Duration) (cp packets.ControlPacket, err error) {
	if err = this.conn.SetReadDeadline(time.Now().Add(timeout)); err != nil {
		return nil, err
	}
	cp, err = packets.ReadPacket(this.conn)
	// Best effort: a failure to clear the deadline must not discard a packet
	// that was read successfully; the next readPacket call re-arms it anyway.
	_ = this.conn.SetReadDeadline(time.Time{})
	return cp, err
}
// ReadConnectPacket reads a single packet from the connection, waiting at most
// this.opts.ConnectTimeout, and returns it as a connect packet.
//
// A read failure is returned unchanged. If a packet arrives but is not a
// *packets.ConnectPacket, the result is nil together with the error
// "connect message expected".
func (this *client) ReadConnectPacket() (p *packets.ConnectPacket, err error) {
	first, readErr := this.readPacket(this.opts.ConnectTimeout)
	if readErr != nil {
		return nil, readErr
	}

	connect, isConnect := first.(*packets.ConnectPacket)
	if !isConnect {
		return nil, errors.New("connect message expected")
	}
	return connect, nil
}