diff --git a/serve.go b/serve.go index 2c93a72..58c64d1 100644 --- a/serve.go +++ b/serve.go @@ -13,6 +13,7 @@ func (c *BaseClient) serve() error { close(c.connClosed) }() r := c.Transport + subBuffer := make(map[uint16]*Message) for { pktTypeBytes := make([]byte, 1) if _, err := io.ReadFull(r, pktTypeBytes); err != nil { @@ -45,11 +46,13 @@ func (c *BaseClient) serve() error { } case packetPublish: publish := (&pktPublish{}).parse(pktFlag, contents) - if c.Handler != nil { - c.Handler.Serve(&publish.Message) - } switch publish.Message.QoS { + case QoS0: + if c.Handler != nil { + c.Handler.Serve(&publish.Message) + } case QoS1: + // Ownership of the message is now transferred to the receiver. pktPubAck := pack( packetPubAck.b()|packetFromClient.b(), packUint16(publish.Message.ID), @@ -57,6 +60,9 @@ func (c *BaseClient) serve() error { if err := c.write(pktPubAck); err != nil { return err } + if c.Handler != nil { + c.Handler.Serve(&publish.Message) + } case QoS2: pktPubRec := pack( packetPubRec.b()|packetFromClient.b(), @@ -65,6 +71,7 @@ func (c *BaseClient) serve() error { if err := c.write(pktPubRec); err != nil { return err } + subBuffer[publish.Message.ID] = &publish.Message } case packetPubAck: pubAck := (&pktPubAck{}).parse(pktFlag, contents) @@ -84,6 +91,14 @@ func (c *BaseClient) serve() error { } case packetPubRel: pubRel := (&pktPubRel{}).parse(pktFlag, contents) + if msg, ok := subBuffer[pubRel.ID]; ok { + // Ownership of the message is now transferred to the receiver. + if c.Handler != nil { + c.Handler.Serve(msg) + } + delete(subBuffer, pubRel.ID) + } + pktPubComp := pack( packetPubComp.b()|packetFromClient.b(), packUint16(pubRel.ID),