Skip to content

Commit

Permalink
Avoid double-closing message channel (#27)
Browse files Browse the repository at this point in the history
* Avoid double-closing message channel

Only call done() if the message hasn't been settled.  This showed up
after recent changes to mode first to correctly settle messages.

* Don't add messages with empty delivery tags to the unsettled map

Messages that are automatically settled by the sender won't contain a
delivery tag, so skip adding them to the map (they were already settled
anyways).

* track message completion for unsettled messages only

* improve test

* move creating doneSignal to trackCompletion
  • Loading branch information
jhendrixMSFT authored Mar 4, 2021
1 parent 0bbd840 commit 58ec794
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 40 deletions.
3 changes: 3 additions & 0 deletions link.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,9 @@ func attachLink(s *Session, r *Receiver, opts []LinkOption) (*link, error) {
}

func (l *link) addUnsettled(msg *Message) {
if len(msg.DeliveryTag) == 0 {
return
}
l.unsettledMessagesLock.Lock()
l.unsettledMessages[string(msg.DeliveryTag)] = struct{}{}
l.unsettledMessagesLock.Unlock()
Expand Down
15 changes: 6 additions & 9 deletions receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ func (r *Receiver) HandleMessage(ctx context.Context, handle func(*Message) erro
debug(3, "Entering link %s Receive()", r.link.key.name)

trackCompletion := func(msg *Message) {
if msg.doneSignal == nil {
msg.doneSignal = make(chan struct{})
}
<-msg.doneSignal
r.link.deleteUnsettled(msg)
debug(3, "Receive() deleted unsettled %d", msg.deliveryID)
Expand All @@ -45,16 +48,10 @@ func (r *Receiver) HandleMessage(ctx context.Context, handle func(*Message) erro
callHandler := func(msg *Message) error {
debug(3, "Receive() blocking %d", msg.deliveryID)
msg.receiver = r
if msg.doneSignal == nil {
msg.doneSignal = make(chan struct{})
}
go trackCompletion(msg)
// accept spontaneously when receiver is in ModeFirst
// we only need to track message disposition for mode second
// spec : http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-transport-v1.0-os.html#type-receiver-settle-mode
if r.link.receiverSettleMode.value() == ModeFirst {
if err := msg.Accept(ctx); err != nil {
return err
}
if r.link.receiverSettleMode.value() == ModeSecond {
go trackCompletion(msg)
}
// tracks messages until exiting handler
if err := handle(msg); err != nil {
Expand Down
62 changes: 36 additions & 26 deletions receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,17 @@ func accept(msg *Message) error {
return msg.Accept(context.TODO())
}

func makeMessage() Message {
func makeMessage(mode ReceiverSettleMode) Message {
var tag []byte
var done chan struct{}
if mode == ModeSecond {
tag = []byte("one")
done = make(chan struct{})
}
return Message{
deliveryID: uint32(1),
DeliveryTag: []byte("one"),
doneSignal: make(chan struct{}),
DeliveryTag: tag,
doneSignal: done,
}
}

Expand All @@ -39,33 +45,19 @@ func TestReceiver_HandleMessageModeFirst_AutoAccept(t *testing.T) {
batching: true, // allows to avoid making the outgoing call on dispostion
dispositions: make(chan messageDisposition, 2),
}
msg := makeMessage()
msg := makeMessage(ModeFirst)
r.link.messages <- msg
r.link.addUnsettled(&msg)
if r.link.countUnsettled() != 0 {
// mode first messages have no delivery tag, thus there should be no unsettled message
t.Fatal("expected zero unsettled count")
}
if err := r.HandleMessage(context.TODO(), doNothing); err != nil {
t.Errorf("HandleMessage() error = %v", err)
}

if len(r.dispositions) == 0 {
t.Errorf("the message should have triggered a disposition")
}

// handle the race because the mao is purged in a background goroutine
check := true
success := true
for check {
select {
case <-time.After(10 * time.Millisecond):
success = false
break
default:
if r.link.countUnsettled() == 0 {
check = false
}
}
}
if !success {
t.Errorf("the message was not removed from the unsettled map")
if len(r.dispositions) != 0 {
t.Errorf("the message should not have triggered a disposition")
}
}

Expand All @@ -75,7 +67,7 @@ func TestReceiver_HandleMessageModeSecond_DontDispose(t *testing.T) {
batching: true, // allows to avoid making the outgoing call on dispostion
dispositions: make(chan messageDisposition, 2),
}
msg := makeMessage()
msg := makeMessage(ModeSecond)
r.link.messages <- msg
r.link.addUnsettled(&msg)
if err := r.HandleMessage(context.TODO(), doNothing); err != nil {
Expand All @@ -87,6 +79,15 @@ func TestReceiver_HandleMessageModeSecond_DontDispose(t *testing.T) {
if r.link.countUnsettled() == 0 {
t.Errorf("the message should still be tracked until settled")
}
// ensure channel wasn't closed
select {
case _, ok := <-msg.doneSignal:
if !ok {
t.Fatal("unexpected closing of doneSignal")
}
default:
// channel wasn't closed
}
}

func TestReceiver_HandleMessageModeSecond_removeFromUnsettledMapOnDisposition(t *testing.T) {
Expand All @@ -95,7 +96,7 @@ func TestReceiver_HandleMessageModeSecond_removeFromUnsettledMapOnDisposition(t
batching: true, // allows to avoid making the outgoing call on dispostion
dispositions: make(chan messageDisposition, 1),
}
msg := makeMessage()
msg := makeMessage(ModeSecond)
r.link.messages <- msg
r.link.addUnsettled(&msg)
// unblock the accept waiting on inflight disposition for modeSecond
Expand Down Expand Up @@ -127,4 +128,13 @@ func TestReceiver_HandleMessageModeSecond_removeFromUnsettledMapOnDisposition(t
t.Errorf("the message should be removed from unsettled map")
}
loop = false
// ensure channel was closed
select {
case _, ok := <-msg.doneSignal:
if !ok {
// channel was closed
}
default:
t.Fatal("expected closed of doneSignal")
}
}
13 changes: 8 additions & 5 deletions types.go
Original file line number Diff line number Diff line change
Expand Up @@ -1674,6 +1674,7 @@ type Message struct {
Format uint32

// The DeliveryTag can be up to 32 octets of binary data.
// Note that when mode one is enabled there will be no delivery tag.
DeliveryTag []byte

// The header section carries standard delivery details about the transfer
Expand Down Expand Up @@ -1798,31 +1799,31 @@ func (m *Message) GetData() []byte {
// Accept notifies the server that the message has been
// accepted and does not require redelivery.
func (m *Message) Accept(ctx context.Context) error {
defer m.done()
if !m.shouldSendDisposition() {
return nil
}
defer m.done()
return m.receiver.messageDisposition(ctx, m.deliveryID, &stateAccepted{})
}

// Reject notifies the server that the message is invalid.
//
// Rejection error is optional.
func (m *Message) Reject(ctx context.Context, e *Error) error {
defer m.done()
if !m.shouldSendDisposition() {
return nil
}
defer m.done()
return m.receiver.messageDisposition(ctx, m.deliveryID, &stateRejected{Error: e})
}

// Release releases the message back to the server. The message
// may be redelivered to this or another consumer.
func (m *Message) Release(ctx context.Context) error {
defer m.done()
if !m.shouldSendDisposition() {
return nil
}
defer m.done()
return m.receiver.messageDisposition(ctx, m.deliveryID, &stateReleased{})
}

Expand All @@ -1839,10 +1840,10 @@ func (m *Message) Release(ctx context.Context) error {
// with the existing message annotations, overwriting existing keys
// if necessary.
func (m *Message) Modify(ctx context.Context, deliveryFailed, undeliverableHere bool, messageAnnotations Annotations) error {
defer m.done()
if !m.shouldSendDisposition() {
return nil
}
defer m.done()
return m.receiver.messageDisposition(ctx,
m.deliveryID, &stateModified{
DeliveryFailed: deliveryFailed,
Expand All @@ -1855,7 +1856,9 @@ func (m *Message) Modify(ctx context.Context, deliveryFailed, undeliverableHere
// without any disposition. It frees the amqp receiver to get the next message
// this is implicitly done after calling message dispositions (Accept/Release/Reject/Modify)
func (m *Message) Ignore() {
m.done()
if m.shouldSendDisposition() {
m.done()
}
}

// MarshalBinary encodes the message into binary form.
Expand Down

0 comments on commit 58ec794

Please sign in to comment.