Skip to content

Commit

Permalink
conditionally announce message on Publish (enabling server to retry) …
Browse files Browse the repository at this point in the history
…or hold until Pubrel (having certainty the broker won't redeliver).

Add tests for both behaviours.
  • Loading branch information
SimonT90poe authored and 256dpi committed Sep 27, 2021
1 parent ad88e5d commit 4bc5fa2
Show file tree
Hide file tree
Showing 3 changed files with 344 additions and 3 deletions.
8 changes: 5 additions & 3 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ type Client struct {
config *Config
conn transport.Conn
clean bool
earlyCallback bool
keepAlive time.Duration
tracker *Tracker
futureStore *future.Store
Expand Down Expand Up @@ -206,6 +207,8 @@ func (c *Client) Connect(config *Config) (ConnectFuture, error) {
}
}

c.earlyCallback = config.AlwaysAnnounceOnPublish

// allocate packet
connect := packet.NewConnect()
connect.ClientID = config.ClientID
Expand Down Expand Up @@ -611,15 +614,14 @@ func (c *Client) processUnsuback(unsuback *packet.Unsuback) error {
// handle an incoming Publish packet
func (c *Client) processPublish(publish *packet.Publish) error {
// call callback for unacknowledged and directly acknowledged messages
if publish.Message.QOS <= 1 {
if publish.Message.QOS <= 1 || c.earlyCallback {
if c.Callback != nil {
err := c.Callback(&publish.Message, nil)
if err != nil {
return c.die(err, true)
}
}
}

// handle qos 1 flow
if publish.Message.QOS == 1 {
// prepare puback packet
Expand Down Expand Up @@ -714,7 +716,7 @@ func (c *Client) processPubrel(id packet.ID) error {
}

// call callback
if c.Callback != nil {
if c.Callback != nil && !c.earlyCallback {
err = c.Callback(&publish.Message, nil)
if err != nil {
return c.die(err, true)
Expand Down
8 changes: 8 additions & 0 deletions client/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,14 @@ type Config struct {
// MaxWriteDelay defines the maximum allowed delay when flushing the
// underlying buffered writer.
MaxWriteDelay time.Duration

// AlwaysAnnounceOnPublish defines when the message callback handler is called.
// QOS 0,1: callback always occurs on reception of Publish
// QOS 2 and AlwaysAnnounceOnPublish == false: callback occurs on reception of PubRel and returning and error in
// the callback will close the connection before PubComp is sent.
// QOS 2 and AlwaysAnnounceOnPublish == true: callback occurs on reception of Publish and returning an error in
// the callback will close the connection, preventing PubRec being sent, thus ensuring redelivery of publish.
AlwaysAnnounceOnPublish bool
}

// NewConfig creates a new Config using the specified URL.
Expand Down
Loading

0 comments on commit 4bc5fa2

Please sign in to comment.