Skip to content

Commit

Permalink
Fix QoS2 message receive procedure (#19)
Browse files Browse the repository at this point in the history
  • Loading branch information
at-wat authored Dec 23, 2019
1 parent 2d271ca commit 58be7b1
Showing 1 changed file with 18 additions and 3 deletions.
21 changes: 18 additions & 3 deletions serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ func (c *BaseClient) serve() error {
close(c.connClosed)
}()
r := c.Transport
subBuffer := make(map[uint16]*Message)
for {
pktTypeBytes := make([]byte, 1)
if _, err := io.ReadFull(r, pktTypeBytes); err != nil {
Expand Down Expand Up @@ -45,18 +46,23 @@ func (c *BaseClient) serve() error {
}
case packetPublish:
publish := (&pktPublish{}).parse(pktFlag, contents)
if c.Handler != nil {
c.Handler.Serve(&publish.Message)
}
switch publish.Message.QoS {
case QoS0:
if c.Handler != nil {
c.Handler.Serve(&publish.Message)
}
case QoS1:
// Ownership of the message is now transferred to the receiver.
pktPubAck := pack(
packetPubAck.b()|packetFromClient.b(),
packUint16(publish.Message.ID),
)
if err := c.write(pktPubAck); err != nil {
return err
}
if c.Handler != nil {
c.Handler.Serve(&publish.Message)
}
case QoS2:
pktPubRec := pack(
packetPubRec.b()|packetFromClient.b(),
Expand All @@ -65,6 +71,7 @@ func (c *BaseClient) serve() error {
if err := c.write(pktPubRec); err != nil {
return err
}
subBuffer[publish.Message.ID] = &publish.Message
}
case packetPubAck:
pubAck := (&pktPubAck{}).parse(pktFlag, contents)
Expand All @@ -84,6 +91,14 @@ func (c *BaseClient) serve() error {
}
case packetPubRel:
pubRel := (&pktPubRel{}).parse(pktFlag, contents)
if msg, ok := subBuffer[pubRel.ID]; ok {
// Ownership of the message is now transferred to the receiver.
if c.Handler != nil {
c.Handler.Serve(msg)
}
delete(subBuffer, pubRel.ID)
}

pktPubComp := pack(
packetPubComp.b()|packetFromClient.b(),
packUint16(pubRel.ID),
Expand Down

0 comments on commit 58be7b1

Please sign in to comment.