diff --git a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/MessageSendTest.java b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/MessageSendTest.java index 3746ea087..2a3c8291f 100644 --- a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/MessageSendTest.java +++ b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/MessageSendTest.java @@ -1287,4 +1287,95 @@ public void testSendMessageWithMultipleDataSections() throws Exception { peer.waitForScriptToComplete(5, TimeUnit.SECONDS); } } + + @Test + public void testSendMessageWithNullStringValuePassedToCreate() throws Exception { + try (ProtonTestServer peer = new ProtonTestServer()) { + peer.expectSASLAnonymousConnect(); + peer.expectOpen().respond(); + peer.expectBegin().respond(); + peer.expectAttach().ofSender().respond(); + peer.remoteFlow().withLinkCredit(10).queue(); + peer.expectAttach().respond(); // Open a receiver to ensure sender link has processed + peer.expectFlow(); // the inbound flow frame we sent previously before send. + peer.start(); + + URI remoteURI = peer.getServerURI(); + + LOG.info("Sender test started, peer listening on: {}", remoteURI); + + Client container = Client.create(); + Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort()).openFuture().get(); + + Session session = connection.openSession().openFuture().get(); + SenderOptions options = new SenderOptions().deliveryMode(DeliveryMode.AT_MOST_ONCE); + Sender sender = session.openSender("test-qos", options); + + // Gates send on remote flow having been sent and received + session.openReceiver("dummy").openFuture().get(); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.expectTransfer().withMessage().withMessageFormat(0).withValue(null); + peer.expectDetach().respond(); + peer.expectClose().respond(); + + final Message message = Message.create((String) null); + final Tracker tracker = sender.send(message); + + assertNotNull(tracker); + assertNotNull(tracker.settlementFuture().isDone()); + assertNotNull(tracker.settlementFuture().get().settled()); + + sender.closeAsync().get(10, TimeUnit.SECONDS); + + connection.closeAsync().get(10, TimeUnit.SECONDS); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + } + } + + @Test + public void testSendMessageWithNullByteArrayPassedToCreate() throws Exception { + try (ProtonTestServer peer = new ProtonTestServer()) { + peer.expectSASLAnonymousConnect(); + peer.expectOpen().respond(); + peer.expectBegin().respond(); + peer.expectAttach().ofSender().respond(); + peer.remoteFlow().withLinkCredit(10).queue(); + peer.expectAttach().respond(); // Open a receiver to ensure sender link has processed + peer.expectFlow(); // the inbound flow frame we sent previously before send. + peer.start(); + + URI remoteURI = peer.getServerURI(); + + LOG.info("Sender test started, peer listening on: {}", remoteURI); + + Client container = Client.create(); + Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort()).openFuture().get(); + + Session session = connection.openSession().openFuture().get(); + SenderOptions options = new SenderOptions().deliveryMode(DeliveryMode.AT_MOST_ONCE); + Sender sender = session.openSender("test-qos", options); + + // Gates send on remote flow having been sent and received + session.openReceiver("dummy").openFuture().get(); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.expectTransfer().withMessage().withMessageFormat(0).withData(null); + peer.expectDetach().respond(); + peer.expectClose().respond(); + + final Message message = Message.create((byte[]) null); + final Tracker tracker = sender.send(message); + + assertNotNull(tracker); + assertNotNull(tracker.settlementFuture().isDone()); + assertNotNull(tracker.settlementFuture().get().settled()); + + sender.closeAsync().get(10, TimeUnit.SECONDS); + connection.closeAsync().get(10, TimeUnit.SECONDS); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + } + } } diff --git a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReceiverTest.java b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReceiverTest.java index d4085ffa7..3bcc3651e 100644 --- a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReceiverTest.java +++ b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReceiverTest.java @@ -3152,4 +3152,102 @@ public void doTestReceiveAcceptsNegtiveValuesAsInfiniteTimeouts(long timeout, Ti peer.waitForScriptToComplete(5, TimeUnit.SECONDS); } } + + @Test + public void testReceiveMessageWithNullWrappedInAmqpValue() throws Exception { + try (ProtonTestServer peer = new ProtonTestServer()) { + peer.expectSASLAnonymousConnect(); + peer.expectOpen().respond(); + peer.expectBegin().respond(); + peer.expectAttach().ofReceiver().respond(); + peer.expectFlow(); + peer.remoteTransfer().withHandle(0) + .withDeliveryId(0) + .withDeliveryTag(new byte[] { 1 }) + .withMore(false) + .withMessageFormat(0) + .withMessage().withBody().withValue((String) null) + .also() + .splitWrite(true) + .afterDelay(25) + .queue(); + peer.expectDisposition().withFirst(0) + .withSettled(true) + .withState().accepted(); + peer.start(); + + URI remoteURI = peer.getServerURI(); + + LOG.info("Test started, peer listening on: {}", remoteURI); + + final Client container = Client.create(); + final ConnectionOptions options = new ConnectionOptions(); + final Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort(), options); + final Receiver receiver = connection.openReceiver("test-queue"); + final Delivery delivery = receiver.receive(); + final Message message = delivery.message(); + + assertNull(message.body()); + + peer.waitForScriptToComplete(); + peer.expectDetach().respond(); + peer.expectClose().respond(); + + assertNotNull(delivery); + + receiver.close(); + connection.close(); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + } + } + + @Test + public void testReceiveMessageWithNullWrappedInDataSection() throws Exception { + try (ProtonTestServer peer = new ProtonTestServer()) { + peer.expectSASLAnonymousConnect(); + peer.expectOpen().respond(); + peer.expectBegin().respond(); + peer.expectAttach().ofReceiver().respond(); + peer.expectFlow(); + peer.remoteTransfer().withHandle(0) + .withDeliveryId(0) + .withDeliveryTag(new byte[] { 1 }) + .withMore(false) + .withMessageFormat(0) + .withMessage().withBody().withData((byte[]) null) + .also() + .splitWrite(true) + .afterDelay(25) + .queue(); + peer.expectDisposition().withFirst(0) + .withSettled(true) + .withState().accepted(); + peer.start(); + + URI remoteURI = peer.getServerURI(); + + LOG.info("Test started, peer listening on: {}", remoteURI); + + final Client container = Client.create(); + final ConnectionOptions options = new ConnectionOptions(); + final Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort(), options); + final Receiver receiver = connection.openReceiver("test-queue"); + final Delivery delivery = receiver.receive(); + final Message message = delivery.message(); + + assertNull(message.body()); + + peer.waitForScriptToComplete(); + peer.expectDetach().respond(); + peer.expectClose().respond(); + + assertNotNull(delivery); + + receiver.close(); + connection.close(); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + } + } } diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/TransferInjectAction.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/TransferInjectAction.java index e434a486f..f4d561a50 100644 --- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/TransferInjectAction.java +++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/TransferInjectAction.java @@ -517,7 +517,7 @@ public BodySectionBuilder withValue(Object body) { } public BodySectionBuilder withValue(byte[] body) { - TransferInjectAction.this.body = new AmqpValue(new Binary(body)); + TransferInjectAction.this.body = new AmqpValue(body == null ? null : new Binary(body)); return this; } @@ -527,7 +527,7 @@ public BodySectionBuilder withValue(Binary body) { } public BodySectionBuilder withData(byte[] body) { - TransferInjectAction.this.body = new Data(new Binary(body)); + TransferInjectAction.this.body = new Data(body == null ? null : new Binary(body)); return this; } diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/codec/messaging/AmqpValue.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/codec/messaging/AmqpValue.java index 8cfe25137..0f5f24f30 100644 --- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/codec/messaging/AmqpValue.java +++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/codec/messaging/AmqpValue.java @@ -25,7 +25,11 @@ public class AmqpValue implements DescribedType { public static final UnsignedLong DESCRIPTOR_CODE = UnsignedLong.valueOf(0x0000000000000077L); public static final Symbol DESCRIPTOR_SYMBOL = Symbol.valueOf("amqp:amqp-value:*"); - private Object described; + private final Object described; + + public AmqpValue() { + this.described = null; + } public AmqpValue(Object described) { this.described = described; diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/codec/messaging/Data.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/codec/messaging/Data.java index f33f9af76..4368e99ba 100644 --- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/codec/messaging/Data.java +++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/codec/messaging/Data.java @@ -26,13 +26,13 @@ public class Data implements DescribedType { public static final UnsignedLong DESCRIPTOR_CODE = UnsignedLong.valueOf(0x0000000000000075L); public static final Symbol DESCRIPTOR_SYMBOL = Symbol.valueOf("amqp:data:binary"); - private Binary described; + private final Binary described; - public Data(Binary described) { - if (described == null) { - throw new IllegalArgumentException("provided Binary must not be null"); - } + public Data() { + this.described = null; + } + public Data(Binary described) { this.described = described; } diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/codec/primitives/Binary.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/codec/primitives/Binary.java index f75e783e1..15737c852 100644 --- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/codec/primitives/Binary.java +++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/codec/primitives/Binary.java @@ -29,11 +29,11 @@ public Binary() { } public Binary(final byte[] data) { - this(data, 0, data.length); + this(data, 0, data != null ? data.length : 0); } public Binary(final byte[] data, final int offset, final int length) { - this.buffer = Arrays.copyOfRange(data, offset, offset + length); + this.buffer = data != null ? Arrays.copyOfRange(data, offset, offset + length) : null; } public Binary copy() { diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/matchers/types/EncodedDataMatcher.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/matchers/types/EncodedDataMatcher.java index 0a4861f17..70ac3b293 100644 --- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/matchers/types/EncodedDataMatcher.java +++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/matchers/types/EncodedDataMatcher.java @@ -18,6 +18,8 @@ */ package org.apache.qpid.protonj2.test.driver.matchers.types; +import java.nio.ByteBuffer; + import org.apache.qpid.protonj2.test.driver.codec.messaging.AmqpValue; import org.apache.qpid.protonj2.test.driver.codec.messaging.Data; import org.apache.qpid.protonj2.test.driver.codec.primitives.Binary; @@ -35,7 +37,7 @@ public class EncodedDataMatcher extends EncodedAmqpTypeMatcher { * the value that is expected to be IN the received {@link Data} */ public EncodedDataMatcher(byte[] expectedValue) { - this(new Binary(expectedValue), false); + this(expectedValue != null ? new Binary(expectedValue) : null, false); } /** @@ -54,7 +56,7 @@ public EncodedDataMatcher(Binary expectedValue) { * consuming the {@link AmqpValue} */ public EncodedDataMatcher(byte[] expectedValue, boolean permitTrailingBytes) { - this(new Binary(expectedValue), permitTrailingBytes); + this(expectedValue != null ? new Binary(expectedValue) : null, permitTrailingBytes); } /** @@ -68,6 +70,11 @@ public EncodedDataMatcher(Binary expectedValue, boolean permitTrailingBytes) { super(DESCRIPTOR_SYMBOL, DESCRIPTOR_CODE, expectedValue, permitTrailingBytes); } + @Override + protected boolean matchesSafely(ByteBuffer receivedBinary) { + return super.matchesSafely(receivedBinary); + } + @Override public void describeTo(Description description) { description.appendText("a Binary encoding of a Data that wraps a Binary containing: ").appendValue(getExpectedValue()); diff --git a/protonj2/src/main/java/org/apache/qpid/protonj2/buffer/impl/ProtonCompositeBufferImpl.java b/protonj2/src/main/java/org/apache/qpid/protonj2/buffer/impl/ProtonCompositeBufferImpl.java index 0f567fb0d..8035e7af5 100644 --- a/protonj2/src/main/java/org/apache/qpid/protonj2/buffer/impl/ProtonCompositeBufferImpl.java +++ b/protonj2/src/main/java/org/apache/qpid/protonj2/buffer/impl/ProtonCompositeBufferImpl.java @@ -351,7 +351,7 @@ public void copyInto(int offset, ByteBuffer destination, int destOffset, int len throw generateIndexOutOfBounds(offset + length, false); } - int lastAccessedChunk = findChunkWithIndex(offset); + int lastAccessedChunk = length > 0 ? findChunkWithIndex(offset) : 0; while (length > 0) { final ProtonBuffer buffer = buffers[lastAccessedChunk]; @@ -384,7 +384,7 @@ public void copyInto(int offset, ProtonBuffer destination, int destOffset, int l throw generateIndexOutOfBounds(offset + length, false); } - int lastAccessedChunk = findChunkWithIndex(offset); + int lastAccessedChunk = length > 0 ? findChunkWithIndex(offset) : 0; while (length > 0) { final ProtonBuffer buffer = buffers[lastAccessedChunk]; diff --git a/protonj2/src/main/java/org/apache/qpid/protonj2/codec/encoders/messaging/DataTypeEncoder.java b/protonj2/src/main/java/org/apache/qpid/protonj2/codec/encoders/messaging/DataTypeEncoder.java index fe064a278..5af13db93 100644 --- a/protonj2/src/main/java/org/apache/qpid/protonj2/codec/encoders/messaging/DataTypeEncoder.java +++ b/protonj2/src/main/java/org/apache/qpid/protonj2/codec/encoders/messaging/DataTypeEncoder.java @@ -52,19 +52,24 @@ public Symbol getDescriptorSymbol() { public void writeType(ProtonBuffer buffer, EncoderState state, Data value) { buffer.writeBytes(DATA_PREAMBLE); - final int dataLength = value.getDataLength(); - - if (dataLength > 255) { - buffer.ensureWritable(dataLength + Long.BYTES); - buffer.writeByte(EncodingCodes.VBIN32); - buffer.writeInt(dataLength); + if (value.hasBinary()) { + final int dataLength = value.getDataLength(); + + if (dataLength > 255) { + buffer.ensureWritable(dataLength + Long.BYTES); + buffer.writeByte(EncodingCodes.VBIN32); + buffer.writeInt(dataLength); + } else { + buffer.ensureWritable(dataLength + Short.BYTES); + buffer.writeByte(EncodingCodes.VBIN8); + buffer.writeByte((byte) dataLength); + } + + value.copyTo(buffer); } else { - buffer.ensureWritable(dataLength + Short.BYTES); - buffer.writeByte(EncodingCodes.VBIN8); - buffer.writeByte((byte) dataLength); + buffer.ensureWritable(1); + buffer.writeByte(EncodingCodes.NULL); } - - value.copyTo(buffer); } @Override diff --git a/protonj2/src/main/java/org/apache/qpid/protonj2/types/messaging/Data.java b/protonj2/src/main/java/org/apache/qpid/protonj2/types/messaging/Data.java index 81b7f0986..19e7b8fa7 100644 --- a/protonj2/src/main/java/org/apache/qpid/protonj2/types/messaging/Data.java +++ b/protonj2/src/main/java/org/apache/qpid/protonj2/types/messaging/Data.java @@ -54,6 +54,10 @@ public Data copy() { return new Data(buffer == null ? null : buffer.copy(true)); } + public boolean hasBinary() { + return buffer != null; + } + public Binary getBinary() { if (cachedBinary != null || buffer == null) { return cachedBinary; diff --git a/protonj2/src/test/java/org/apache/qpid/protonj2/buffer/ProtonAbstractBufferTest.java b/protonj2/src/test/java/org/apache/qpid/protonj2/buffer/ProtonAbstractBufferTest.java index 50764d725..c390123ea 100644 --- a/protonj2/src/test/java/org/apache/qpid/protonj2/buffer/ProtonAbstractBufferTest.java +++ b/protonj2/src/test/java/org/apache/qpid/protonj2/buffer/ProtonAbstractBufferTest.java @@ -21,6 +21,7 @@ import static java.nio.file.StandardOpenOption.READ; import static java.nio.file.StandardOpenOption.WRITE; import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotEquals; @@ -474,6 +475,7 @@ public void testWriteBytesHeapByteBufferMustExpandCapacityIfBufferIsTooSmall() { } } + @SuppressWarnings("resource") @Test public void testWriteBytesHeapByteBufferMustThrowIfCannotBeExpanded() { // With zero offsets @@ -528,6 +530,7 @@ public void testWiteBytesDirectByteBufferMustExpandCapacityIfBufferIsTooSmall() } } + @SuppressWarnings("resource") @Test public void testWriteBytesDirectByteBufferMustThrowIfCannotBeExpanded() { // With zero offsets @@ -580,6 +583,7 @@ public void testWriteBytesByteArrayMustExpandCapacityIfTooSmall() { } } + @SuppressWarnings("resource") @Test public void testWriteBytesByteArrayMustThrowIfCannotBeExpanded() { // Starting at offsets zero. @@ -630,6 +634,7 @@ public void testWriteBytesByteArrayWithOffsetMustExpandCapacityIfTooSmall() { } } + @SuppressWarnings("resource") @Test public void testWriteBytesByteArrayWithOffsetMustThrowIfCannotBeExpanded() { // Starting at offsets zero. @@ -682,6 +687,7 @@ public void testWriteBytesBufferMustExpandCapacityIfTooSmall() { } } + @SuppressWarnings("resource") @Test public void testWriteBytesBufferMustThrowIfCannotBeExpanded() { // Starting at offsets zero. @@ -2062,6 +2068,7 @@ public void testSplitDoesNotReduceImplicitCapacityLimit() { } } + @SuppressWarnings("resource") @Test public void testEnsureWritableCanGrowBeyondImplicitCapacityLimit() { try (ProtonBufferAllocator allocator = createTestCaseAllocator(); ProtonBuffer buf = allocator.allocate(8).implicitGrowthLimit(8)) { @@ -2077,6 +2084,7 @@ public void testEnsureWritableCanGrowBeyondImplicitCapacityLimit() { } } + @SuppressWarnings("resource") @Test public void testWritesMustThrowIfSizeWouldGoBeyondImplicitCapacityLimit() { try (ProtonBufferAllocator allocator = createTestCaseAllocator()) { @@ -2537,6 +2545,7 @@ public void testSplitOfReadOnlyBufferMustBeReadOnly() { } } + @SuppressWarnings("resource") @Test public void testAllocatingOnClosedAllocatorMustThrow() { ProtonBufferAllocator allocator = createTestCaseAllocator(); @@ -2982,9 +2991,39 @@ public void testCopyIntoOnReadOnlyBufferMustThrow() { } } + @Test + public void testCopyIntoOnEmptyBufferFromFullyReadBuffer() { + try (ProtonBufferAllocator allocator = createTestCaseAllocator(); + ProtonBuffer source = allocator.allocate(8); + ProtonBuffer target = allocator.allocate(0)) { + + source.writeLong(0xFFFFFFFFL); + source.readLong(); + + assertDoesNotThrow(() -> source.copyInto(source.getReadOffset(), target, 0, 0)); + } + } + + @Test + public void testCopyIntoByteBufferOnEmptyBufferFromFullyReadBuffer() { + try (ProtonBufferAllocator allocator = createTestCaseAllocator(); + ProtonBuffer source = allocator.allocate(8)) { + + final ByteBuffer target = ByteBuffer.allocate(0); + + source.writeLong(0xFFFFFFFFL); + source.readLong(); + + assertDoesNotThrow(() -> source.copyInto(source.getReadOffset(), target, 0, 0)); + } + } + + @SuppressWarnings("resource") @Test public void testReadOnlyBuffersCannotChangeWriteOffset() { - try (ProtonBufferAllocator allocator = createTestCaseAllocator(); ProtonBuffer buf = allocator.allocate(8).convertToReadOnly()) { + try (ProtonBufferAllocator allocator = createTestCaseAllocator(); + ProtonBuffer buf = allocator.allocate(8).convertToReadOnly()) { + assertThrows(ProtonBufferReadOnlyException.class, () -> buf.setWriteOffset(0)); assertThrows(ProtonBufferReadOnlyException.class, () -> buf.setWriteOffset(4)); @@ -4895,6 +4934,7 @@ public void testCompositeBufferComponentCountsUpdateWithChangeAfterFlattening() } } + @SuppressWarnings("resource") @Test public void testIteratingComponentOnClosedBufferMustThrow() { try (ProtonBufferAllocator allocator = createTestCaseAllocator()) { @@ -5518,6 +5558,7 @@ public void testIndexOfByteOnEmptyBufferReturnsNoResult() { } } + @SuppressWarnings("resource") protected static void verifyInaccessible(ProtonBuffer buf) { verifyReadInaccessible(buf); diff --git a/protonj2/src/test/java/org/apache/qpid/protonj2/engine/impl/ProtonFrameDecodingHandlerTest.java b/protonj2/src/test/java/org/apache/qpid/protonj2/engine/impl/ProtonFrameDecodingHandlerTest.java index a6c607403..c4650b096 100644 --- a/protonj2/src/test/java/org/apache/qpid/protonj2/engine/impl/ProtonFrameDecodingHandlerTest.java +++ b/protonj2/src/test/java/org/apache/qpid/protonj2/engine/impl/ProtonFrameDecodingHandlerTest.java @@ -30,6 +30,9 @@ import org.apache.qpid.protonj2.buffer.ProtonBuffer; import org.apache.qpid.protonj2.buffer.ProtonBufferAllocator; +import org.apache.qpid.protonj2.codec.CodecFactory; +import org.apache.qpid.protonj2.codec.Decoder; +import org.apache.qpid.protonj2.codec.DecoderState; import org.apache.qpid.protonj2.engine.EmptyEnvelope; import org.apache.qpid.protonj2.engine.Engine; import org.apache.qpid.protonj2.engine.EngineHandlerContext; @@ -39,6 +42,11 @@ import org.apache.qpid.protonj2.engine.util.FrameReadSinkTransportHandler; import org.apache.qpid.protonj2.engine.util.FrameRecordingTransportHandler; import org.apache.qpid.protonj2.engine.util.FrameWriteSinkTransportHandler; +import org.apache.qpid.protonj2.types.messaging.DeliveryAnnotations; +import org.apache.qpid.protonj2.types.messaging.Header; +import org.apache.qpid.protonj2.types.messaging.MessageAnnotations; +import org.apache.qpid.protonj2.types.messaging.Properties; +import org.apache.qpid.protonj2.types.messaging.Section; import org.apache.qpid.protonj2.types.transport.AMQPHeader; import org.apache.qpid.protonj2.types.transport.Open; import org.apache.qpid.protonj2.types.transport.Transfer; @@ -833,6 +841,88 @@ public void testDecodeTransferFrameWithAttachedPayloadSplitAcrossBuffersAsContin assertFalse(open.hasProperties()); } + @Test + public void testDecodeOfSplitFramedMessage() throws Exception { + final String capture1 = "00 00 01 4c 02 00 00 00 00 53 14 c0 1d 0b 43 43 " + + "a0 10 cf 8c f4 f8 93 5d b6 47 a2 19 3f 87 34 6f " + + "03 97 43 40 42 40 40 40 40 41"; + final String capture2 = "00 53 70 c0 0b 05 40 40 70 48 19 08 00 40 52 06 " + + "00 53 71 c1 24 02 a3 10 78 2d 6f 70 74 2d 6c 6f " + + "63 6b 2d 74 6f 6b 65 6e 98 f8 f4 8c cf 5d 93 47 " + + "b6 a2 19 3f 87 34 6f 03 97 00 53 72 c1 9b 0a a3 " + + "13 78 2d 6f 70 74 2d 65 6e 71 75 65 75 65 64 2d " + + "74 69 6d 65 83 00 00 01 8e 42 cd 59 63 a3 15 78 " + + "2d 6f 70 74 2d 73 65 71 75 65 6e 63 65 2d 6e 75 " + + "6d 62 65 72 81 00 00 00 00 00 00 07 2d a3 12 78 " + + "2d 6f 70 74 2d 6c 6f 63 6b 65 64 2d 75 6e 74 69 " + + "6c 83 00 00 01 8e 43 db 33 f3 a3 1d 78 2d 6f 70 " + + "74 2d 65 6e 71 75 65 75 65 2d 73 65 71 75 65 6e " + + "63 65 2d 6e 75 6d 62 65 72 81 00 00 00 00 00 00 " + + "07 2d a3 13 78 2d 6f 70 74 2d 6d 65 73 73 61 67 " + + "65 2d 73 74 61 74 65 54 00 00 53 73 c0 3f 0d a1 " + + "20 39 34 39 37 36 34 61 61 38 30 37 62 34 30 37 " + + "38 39 39 64 32 35 66 61 65 61 63 65 36 61 38 34 " + + "65 40 40 40 40 40 40 40 83 00 00 01 8e 8a e6 61 " + + "63 83 00 00 01 8e 42 cd 59 63 40 40 40 00 53 75 " + + "a0 00"; + + final byte[] packet1 = convertCaptureToByteArray(capture1); + final byte[] packet2 = convertCaptureToByteArray(capture2); + + ArgumentCaptor argument = ArgumentCaptor.forClass(IncomingAMQPEnvelope.class); + + ProtonFrameDecodingHandler handler = createFrameDecoder(); + ProtonEngineHandlerContext context = Mockito.mock(ProtonEngineHandlerContext.class); + + handler.handleRead(context, AMQPHeader.getAMQPHeader().getBuffer()); + + final ProtonBuffer buffer1 = ProtonBufferAllocator.defaultAllocator().copy(packet1); + final ProtonBuffer buffer2 = ProtonBufferAllocator.defaultAllocator().copy(packet2); + + handler.handleRead(context, buffer1); + handler.handleRead(context, buffer2); + + Mockito.verify(context).fireRead(Mockito.any(HeaderEnvelope.class)); + Mockito.verify(context).interestMask(ProtonEngineHandlerContext.HANDLER_READS); + Mockito.verify(context, times(1)).fireRead(argument.capture()); + Mockito.verifyNoMoreInteractions(context); + + List arguments = argument.getAllValues(); + + assertNotNull(arguments.get(0)); + assertTrue(arguments.get(0).getBody() instanceof Transfer); + assertNotNull(arguments.get(0).getPayload()); + assertTrue(arguments.get(0).getPayload().isReadable()); + assertEquals(290, arguments.get(0).getPayload().getReadableBytes()); + + ProtonBuffer buffer = arguments.get(0).getPayload(); + Decoder decoder = CodecFactory.getDefaultDecoder(); + DecoderState decoderState = decoder.getCachedDecoderState(); + + final Header header = (Header) decoder.readObject(buffer, decoderState); + final DeliveryAnnotations deliveryAnnotations = (DeliveryAnnotations) decoder.readObject(buffer, decoderState); + final MessageAnnotations messageAnnotations = (MessageAnnotations) decoder.readObject(buffer, decoderState); + final Properties properties = (Properties) decoder.readObject(buffer, decoderState); + final Section body = (Section) decoder.readObject(buffer, decoderState); + + assertNotNull(header); + assertNotNull(deliveryAnnotations); + assertNotNull(messageAnnotations); + assertNotNull(properties); + assertNotNull(body); + } + + private byte[] convertCaptureToByteArray(String capture) throws Exception { + final String[] hexStrings = capture.split(" "); + final byte[] capturedBytes = new byte[hexStrings.length]; + + for(int i = 0; i < hexStrings.length; ++i) { + capturedBytes[i] = (byte) Integer.parseUnsignedInt(hexStrings[i], 16); + } + + return capturedBytes; + } + private ProtonFrameDecodingHandler createFrameDecoder() { ProtonEngineConfiguration configuration = Mockito.mock(ProtonEngineConfiguration.class); Mockito.when(configuration.getInboundMaxFrameSize()).thenReturn(Long.valueOf(65535));