Skip to content

Commit

Permalink
PROTON-2809 Ensure codec and buffers handle null Binary data payloads
Browse files Browse the repository at this point in the history
Ensure that an null Data section encoding is encoded and decoded
consistently and fix issue in composite buffer decoding empty Binary
bodies when split at end of buffer.
  • Loading branch information
tabish121 committed Mar 18, 2024
1 parent 7ded26e commit b5bc7fe
Show file tree
Hide file tree
Showing 12 changed files with 366 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1287,4 +1287,95 @@ public void testSendMessageWithMultipleDataSections() throws Exception {
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}

@Test
public void testSendMessageWithNullStringValuePassedToCreate() throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().ofSender().respond();
peer.remoteFlow().withLinkCredit(10).queue();
peer.expectAttach().respond(); // Open a receiver to ensure sender link has processed
peer.expectFlow(); // the inbound flow frame we sent previously before send.
peer.start();

URI remoteURI = peer.getServerURI();

LOG.info("Sender test started, peer listening on: {}", remoteURI);

Client container = Client.create();
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort()).openFuture().get();

Session session = connection.openSession().openFuture().get();
SenderOptions options = new SenderOptions().deliveryMode(DeliveryMode.AT_MOST_ONCE);
Sender sender = session.openSender("test-qos", options);

// Gates send on remote flow having been sent and received
session.openReceiver("dummy").openFuture().get();

peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectTransfer().withMessage().withMessageFormat(0).withValue(null);
peer.expectDetach().respond();
peer.expectClose().respond();

final Message<String> message = Message.create((String) null);
final Tracker tracker = sender.send(message);

assertNotNull(tracker);
assertNotNull(tracker.settlementFuture().isDone());
assertNotNull(tracker.settlementFuture().get().settled());

sender.closeAsync().get(10, TimeUnit.SECONDS);

connection.closeAsync().get(10, TimeUnit.SECONDS);

peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}

@Test
public void testSendMessageWithNullByteArrayPassedToCreate() throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().ofSender().respond();
peer.remoteFlow().withLinkCredit(10).queue();
peer.expectAttach().respond(); // Open a receiver to ensure sender link has processed
peer.expectFlow(); // the inbound flow frame we sent previously before send.
peer.start();

URI remoteURI = peer.getServerURI();

LOG.info("Sender test started, peer listening on: {}", remoteURI);

Client container = Client.create();
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort()).openFuture().get();

Session session = connection.openSession().openFuture().get();
SenderOptions options = new SenderOptions().deliveryMode(DeliveryMode.AT_MOST_ONCE);
Sender sender = session.openSender("test-qos", options);

// Gates send on remote flow having been sent and received
session.openReceiver("dummy").openFuture().get();

peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectTransfer().withMessage().withMessageFormat(0).withData(null);
peer.expectDetach().respond();
peer.expectClose().respond();

final Message<byte[]> message = Message.create((byte[]) null);
final Tracker tracker = sender.send(message);

assertNotNull(tracker);
assertNotNull(tracker.settlementFuture().isDone());
assertNotNull(tracker.settlementFuture().get().settled());

sender.closeAsync().get(10, TimeUnit.SECONDS);
connection.closeAsync().get(10, TimeUnit.SECONDS);

peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3152,4 +3152,102 @@ public void doTestReceiveAcceptsNegtiveValuesAsInfiniteTimeouts(long timeout, Ti
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}

@Test
public void testReceiveMessageWithNullWrappedInAmqpValue() throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().ofReceiver().respond();
peer.expectFlow();
peer.remoteTransfer().withHandle(0)
.withDeliveryId(0)
.withDeliveryTag(new byte[] { 1 })
.withMore(false)
.withMessageFormat(0)
.withMessage().withBody().withValue((String) null)
.also()
.splitWrite(true)
.afterDelay(25)
.queue();
peer.expectDisposition().withFirst(0)
.withSettled(true)
.withState().accepted();
peer.start();

URI remoteURI = peer.getServerURI();

LOG.info("Test started, peer listening on: {}", remoteURI);

final Client container = Client.create();
final ConnectionOptions options = new ConnectionOptions();
final Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort(), options);
final Receiver receiver = connection.openReceiver("test-queue");
final Delivery delivery = receiver.receive();
final Message<String> message = delivery.message();

assertNull(message.body());

peer.waitForScriptToComplete();
peer.expectDetach().respond();
peer.expectClose().respond();

assertNotNull(delivery);

receiver.close();
connection.close();

peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}

@Test
public void testReceiveMessageWithNullWrappedInDataSection() throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().ofReceiver().respond();
peer.expectFlow();
peer.remoteTransfer().withHandle(0)
.withDeliveryId(0)
.withDeliveryTag(new byte[] { 1 })
.withMore(false)
.withMessageFormat(0)
.withMessage().withBody().withData((byte[]) null)
.also()
.splitWrite(true)
.afterDelay(25)
.queue();
peer.expectDisposition().withFirst(0)
.withSettled(true)
.withState().accepted();
peer.start();

URI remoteURI = peer.getServerURI();

LOG.info("Test started, peer listening on: {}", remoteURI);

final Client container = Client.create();
final ConnectionOptions options = new ConnectionOptions();
final Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort(), options);
final Receiver receiver = connection.openReceiver("test-queue");
final Delivery delivery = receiver.receive();
final Message<byte[]> message = delivery.message();

assertNull(message.body());

peer.waitForScriptToComplete();
peer.expectDetach().respond();
peer.expectClose().respond();

assertNotNull(delivery);

receiver.close();
connection.close();

peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -517,7 +517,7 @@ public BodySectionBuilder withValue(Object body) {
}

public BodySectionBuilder withValue(byte[] body) {
TransferInjectAction.this.body = new AmqpValue(new Binary(body));
TransferInjectAction.this.body = new AmqpValue(body == null ? null : new Binary(body));
return this;
}

Expand All @@ -527,7 +527,7 @@ public BodySectionBuilder withValue(Binary body) {
}

public BodySectionBuilder withData(byte[] body) {
TransferInjectAction.this.body = new Data(new Binary(body));
TransferInjectAction.this.body = new Data(body == null ? null : new Binary(body));
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,11 @@ public class AmqpValue implements DescribedType {
public static final UnsignedLong DESCRIPTOR_CODE = UnsignedLong.valueOf(0x0000000000000077L);
public static final Symbol DESCRIPTOR_SYMBOL = Symbol.valueOf("amqp:amqp-value:*");

private Object described;
private final Object described;

public AmqpValue() {
this.described = null;
}

public AmqpValue(Object described) {
this.described = described;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@ public class Data implements DescribedType {
public static final UnsignedLong DESCRIPTOR_CODE = UnsignedLong.valueOf(0x0000000000000075L);
public static final Symbol DESCRIPTOR_SYMBOL = Symbol.valueOf("amqp:data:binary");

private Binary described;
private final Binary described;

public Data(Binary described) {
if (described == null) {
throw new IllegalArgumentException("provided Binary must not be null");
}
public Data() {
this.described = null;
}

public Data(Binary described) {
this.described = described;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@ public Binary() {
}

public Binary(final byte[] data) {
this(data, 0, data.length);
this(data, 0, data != null ? data.length : 0);
}

public Binary(final byte[] data, final int offset, final int length) {
this.buffer = Arrays.copyOfRange(data, offset, offset + length);
this.buffer = data != null ? Arrays.copyOfRange(data, offset, offset + length) : null;
}

public Binary copy() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.apache.qpid.protonj2.test.driver.matchers.types;

import java.nio.ByteBuffer;

import org.apache.qpid.protonj2.test.driver.codec.messaging.AmqpValue;
import org.apache.qpid.protonj2.test.driver.codec.messaging.Data;
import org.apache.qpid.protonj2.test.driver.codec.primitives.Binary;
Expand All @@ -35,7 +37,7 @@ public class EncodedDataMatcher extends EncodedAmqpTypeMatcher {
* the value that is expected to be IN the received {@link Data}
*/
public EncodedDataMatcher(byte[] expectedValue) {
this(new Binary(expectedValue), false);
this(expectedValue != null ? new Binary(expectedValue) : null, false);
}

/**
Expand All @@ -54,7 +56,7 @@ public EncodedDataMatcher(Binary expectedValue) {
* consuming the {@link AmqpValue}
*/
public EncodedDataMatcher(byte[] expectedValue, boolean permitTrailingBytes) {
this(new Binary(expectedValue), permitTrailingBytes);
this(expectedValue != null ? new Binary(expectedValue) : null, permitTrailingBytes);
}

/**
Expand All @@ -68,6 +70,11 @@ public EncodedDataMatcher(Binary expectedValue, boolean permitTrailingBytes) {
super(DESCRIPTOR_SYMBOL, DESCRIPTOR_CODE, expectedValue, permitTrailingBytes);
}

@Override
protected boolean matchesSafely(ByteBuffer receivedBinary) {
return super.matchesSafely(receivedBinary);
}

@Override
public void describeTo(Description description) {
description.appendText("a Binary encoding of a Data that wraps a Binary containing: ").appendValue(getExpectedValue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ public void copyInto(int offset, ByteBuffer destination, int destOffset, int len
throw generateIndexOutOfBounds(offset + length, false);
}

int lastAccessedChunk = findChunkWithIndex(offset);
int lastAccessedChunk = length > 0 ? findChunkWithIndex(offset) : 0;

while (length > 0) {
final ProtonBuffer buffer = buffers[lastAccessedChunk];
Expand Down Expand Up @@ -384,7 +384,7 @@ public void copyInto(int offset, ProtonBuffer destination, int destOffset, int l
throw generateIndexOutOfBounds(offset + length, false);
}

int lastAccessedChunk = findChunkWithIndex(offset);
int lastAccessedChunk = length > 0 ? findChunkWithIndex(offset) : 0;

while (length > 0) {
final ProtonBuffer buffer = buffers[lastAccessedChunk];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,19 +52,24 @@ public Symbol getDescriptorSymbol() {
public void writeType(ProtonBuffer buffer, EncoderState state, Data value) {
buffer.writeBytes(DATA_PREAMBLE);

final int dataLength = value.getDataLength();

if (dataLength > 255) {
buffer.ensureWritable(dataLength + Long.BYTES);
buffer.writeByte(EncodingCodes.VBIN32);
buffer.writeInt(dataLength);
if (value.hasBinary()) {
final int dataLength = value.getDataLength();

if (dataLength > 255) {
buffer.ensureWritable(dataLength + Long.BYTES);
buffer.writeByte(EncodingCodes.VBIN32);
buffer.writeInt(dataLength);
} else {
buffer.ensureWritable(dataLength + Short.BYTES);
buffer.writeByte(EncodingCodes.VBIN8);
buffer.writeByte((byte) dataLength);
}

value.copyTo(buffer);
} else {
buffer.ensureWritable(dataLength + Short.BYTES);
buffer.writeByte(EncodingCodes.VBIN8);
buffer.writeByte((byte) dataLength);
buffer.ensureWritable(1);
buffer.writeByte(EncodingCodes.NULL);
}

value.copyTo(buffer);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ public Data copy() {
return new Data(buffer == null ? null : buffer.copy(true));
}

public boolean hasBinary() {
return buffer != null;
}

public Binary getBinary() {
if (cachedBinary != null || buffer == null) {
return cachedBinary;
Expand Down
Loading

0 comments on commit b5bc7fe

Please sign in to comment.