From 918114143b48896622c78346e902e897e1005a90 Mon Sep 17 00:00:00 2001 From: Richard Park <51494936+richardpark-msft@users.noreply.github.com> Date: Thu, 3 Feb 2022 17:58:00 -0800 Subject: [PATCH] Adding in AMQP sequence support. #121 Adding in AMQP sequence support. This makes it so we can now encode and decode all the body types from the AMQP spec - data, value and now sequence. There can be multiple sequences encoded into a single body. Fixes #94 --- message.go | 35 ++++++++++++++++++++++++++++++----- message_test.go | 22 ++++++++++++++++++++++ 2 files changed, 52 insertions(+), 5 deletions(-) diff --git a/message.go b/message.go index b56fdd73..f7f6297c 100644 --- a/message.go +++ b/message.go @@ -82,15 +82,16 @@ type Message struct { // simple types only, that is, excluding map, list, and array types. // Data payloads. - Data [][]byte // A data section contains opaque binary data. - // TODO: this could be data(s), amqp-sequence(s), amqp-value rather than single data: - // "The body consists of one of the following three choices: one or more data - // sections, one or more amqp-sequence sections, or a single amqp-value section." + Data [][]byte // Value payload. - Value interface{} // An amqp-value section contains a single AMQP value. + Value interface{} + + // Sequence will contain AMQP sequence sections from the body of the message. + // An amqp-sequence section contains an AMQP sequence. + Sequence [][]interface{} // The footer section is used for details about the message or delivery which // can only be calculated or evaluated once the whole bare message has been @@ -202,6 +203,18 @@ func (m *Message) Marshal(wr *buffer.Buffer) error { } } + if m.Sequence != nil { + // the body can basically be one of three different types (value, data or sequence). + // When it's sequence it's actually _several_ sequence sections, one for each sub-array. + for _, v := range m.Sequence { + encoding.WriteDescriptor(wr, encoding.TypeCodeAMQPSequence) + err := encoding.Marshal(wr, v) + if err != nil { + return err + } + } + } + if m.Footer != nil { encoding.WriteDescriptor(wr, encoding.TypeCodeFooter) err := encoding.Marshal(wr, m.Footer) @@ -265,6 +278,18 @@ func (m *Message) Unmarshal(r *buffer.Buffer) error { m.Data = append(m.Data, data) continue + case encoding.TypeCodeAMQPSequence: + r.Skip(int(headerLength)) + + var data []interface{} + err = encoding.Unmarshal(r, &data) + if err != nil { + return err + } + + m.Sequence = append(m.Sequence, data) + continue + case encoding.TypeCodeFooter: section = &m.Footer diff --git a/message_test.go b/message_test.go index daeb9af0..83dcb0e4 100644 --- a/message_test.go +++ b/message_test.go @@ -5,6 +5,7 @@ import ( "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" + "github.com/stretchr/testify/require" ) var helperTo = "ActiveMQ.DLQ" @@ -59,3 +60,24 @@ func TestMessageUnmarshaling(t *testing.T) { }) } } + +func TestMessageWithSequence(t *testing.T) { + m := &Message{ + Sequence: [][]interface{}{ + {"hello1", "world1", 11, 12, 13}, + {"hello2", "world2", 21, 22, 23}, + }, + } + + bytes, err := m.MarshalBinary() + require.NoError(t, err) + + newM := &Message{} + err = newM.UnmarshalBinary(bytes) + require.NoError(t, err) + + require.EqualValues(t, [][]interface{}{ + {"hello1", "world1", int64(11), int64(12), int64(13)}, + {"hello2", "world2", int64(21), int64(22), int64(23)}, + }, newM.Sequence) +}