Skip to content

Commit

Permalink
Adding in AMQP sequence support. #121
Browse files Browse the repository at this point in the history
Adding in AMQP sequence support.

This makes it so we can now encode and decode all the body types from the AMQP spec - data, value and now sequence. There can be multiple sequences encoded into a single body.

Fixes #94
  • Loading branch information
richardpark-msft authored Feb 4, 2022
1 parent df6b6c4 commit 9181141
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 5 deletions.
35 changes: 30 additions & 5 deletions message.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,15 +82,16 @@ type Message struct {
// simple types only, that is, excluding map, list, and array types.

// Data payloads.
Data [][]byte
// A data section contains opaque binary data.
// TODO: this could be data(s), amqp-sequence(s), amqp-value rather than single data:
// "The body consists of one of the following three choices: one or more data
// sections, one or more amqp-sequence sections, or a single amqp-value section."
Data [][]byte

// Value payload.
Value interface{}
// An amqp-value section contains a single AMQP value.
Value interface{}

// Sequence will contain AMQP sequence sections from the body of the message.
// An amqp-sequence section contains an AMQP sequence.
Sequence [][]interface{}

// The footer section is used for details about the message or delivery which
// can only be calculated or evaluated once the whole bare message has been
Expand Down Expand Up @@ -202,6 +203,18 @@ func (m *Message) Marshal(wr *buffer.Buffer) error {
}
}

if m.Sequence != nil {
// the body can basically be one of three different types (value, data or sequence).
// When it's sequence it's actually _several_ sequence sections, one for each sub-array.
for _, v := range m.Sequence {
encoding.WriteDescriptor(wr, encoding.TypeCodeAMQPSequence)
err := encoding.Marshal(wr, v)
if err != nil {
return err
}
}
}

if m.Footer != nil {
encoding.WriteDescriptor(wr, encoding.TypeCodeFooter)
err := encoding.Marshal(wr, m.Footer)
Expand Down Expand Up @@ -265,6 +278,18 @@ func (m *Message) Unmarshal(r *buffer.Buffer) error {
m.Data = append(m.Data, data)
continue

case encoding.TypeCodeAMQPSequence:
r.Skip(int(headerLength))

var data []interface{}
err = encoding.Unmarshal(r, &data)
if err != nil {
return err
}

m.Sequence = append(m.Sequence, data)
continue

case encoding.TypeCodeFooter:
section = &m.Footer

Expand Down
22 changes: 22 additions & 0 deletions message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/stretchr/testify/require"
)

var helperTo = "ActiveMQ.DLQ"
Expand Down Expand Up @@ -59,3 +60,24 @@ func TestMessageUnmarshaling(t *testing.T) {
})
}
}

func TestMessageWithSequence(t *testing.T) {
m := &Message{
Sequence: [][]interface{}{
{"hello1", "world1", 11, 12, 13},
{"hello2", "world2", 21, 22, 23},
},
}

bytes, err := m.MarshalBinary()
require.NoError(t, err)

newM := &Message{}
err = newM.UnmarshalBinary(bytes)
require.NoError(t, err)

require.EqualValues(t, [][]interface{}{
{"hello1", "world1", int64(11), int64(12), int64(13)},
{"hello2", "world2", int64(21), int64(22), int64(23)},
}, newM.Sequence)
}

0 comments on commit 9181141

Please sign in to comment.