Skip to content

Commit

Permalink
PROTON-2826 Remove timed out sends from blocked queue
Browse files Browse the repository at this point in the history
Ensure that sends that time out waiting for credit are removed from the
blocked queue and also abort any partial send that was blocked waiting
for credit to ensure no leak of deliveries from the sender link.
  • Loading branch information
tabish121 committed Jun 7, 2024
1 parent 2c49b28 commit 1ead4de
Show file tree
Hide file tree
Showing 6 changed files with 194 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ private ClientReceiver selectNextAvailable() {
return result != null ? result : selectFirstAvailable();
}

@SuppressWarnings("resource")
private ClientReceiver selectFirstAvailable() {
return session.getProtonSession().receivers().stream()
.filter((r) -> r.getLinkedResource() instanceof ClientReceiver &&
Expand All @@ -171,6 +172,7 @@ private ClientReceiver selectFirstAvailable() {
.orElse(null);
}

@SuppressWarnings("resource")
private ClientReceiver selectLargestBacklog() {
return session.getProtonSession().receivers().stream()
.filter((r) -> r.getLinkedResource() instanceof ClientReceiver &&
Expand All @@ -180,6 +182,7 @@ private ClientReceiver selectLargestBacklog() {
.orElse(null);
}

@SuppressWarnings("resource")
private ClientReceiver selectSmallestBacklog() {
return session.getProtonSession().receivers().stream()
.filter((r) -> r.getLinkedResource() instanceof ClientReceiver &&
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,23 +131,23 @@ protected Sender self() {
}

private void addToTailOfBlockedQueue(ClientOutgoingEnvelope send) {
blocked.addLast(send);
if (options.sendTimeout() > 0 && send.sendTimeout() == null) {
send.sendTimeout(executor.schedule(() -> {
blocked.remove(send);
send.failed(send.createSendTimedOutException());
}, options.sendTimeout(), TimeUnit.MILLISECONDS));
}

blocked.addLast(send);
}

private void addToHeadOfBlockedQueue(ClientOutgoingEnvelope send) {
blocked.addFirst(send);
if (options.sendTimeout() > 0 && send.sendTimeout() == null) {
send.sendTimeout(executor.schedule(() -> {
blocked.remove(send);
send.failed(send.createSendTimedOutException());
}, options.sendTimeout(), TimeUnit.MILLISECONDS));
}

blocked.addFirst(send);
}

private Tracker sendMessage(AdvancedMessage<?> message, Map<String, Object> deliveryAnnotations, boolean waitForCredit) throws ClientException {
Expand Down Expand Up @@ -311,6 +311,14 @@ public ClientOutgoingEnvelope failed(ClientException exception) {
sendTimeout.cancel(true);
}

if (delivery != null) {
try {
delivery.abort();
} catch (Exception ex) {
// Attempted abort could fail if offline so we ignore it.
}
}

payload.close();

request.failed(exception);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,23 +131,23 @@ protected StreamSender self() {
}

private void addToTailOfBlockedQueue(ClientOutgoingEnvelope send) {
blocked.addLast(send);
if (options.sendTimeout() > 0 && send.sendTimeout() == null) {
send.sendTimeout(executor.schedule(() -> {
blocked.remove(send);
send.failed(send.createSendTimedOutException());
}, options.sendTimeout(), TimeUnit.MILLISECONDS));
}

blocked.addLast(send);
}

private void addToHeadOfBlockedQueue(ClientOutgoingEnvelope send) {
blocked.addFirst(send);
if (options.sendTimeout() > 0 && send.sendTimeout() == null) {
send.sendTimeout(executor.schedule(() -> {
blocked.remove(send);
send.failed(send.createSendTimedOutException());
}, options.sendTimeout(), TimeUnit.MILLISECONDS));
}

blocked.addFirst(send);
}

private StreamTracker sendMessage(AdvancedMessage<?> message, Map<String, Object> deliveryAnnotations, boolean waitForCredit) throws ClientException {
Expand Down Expand Up @@ -499,6 +499,14 @@ public ClientOutgoingEnvelope failed(ClientException exception) {
sendTimeout.cancel(true);
}

if (delivery != null) {
try {
delivery.abort();
} catch (Exception ex) {
// Attempted abort could fail if offline so we ignore it.
}
}

if (payload != null) {
payload.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ public void operationComplete(Future<? extends Void> future) throws Exception {
super.channelActive(context);
}

@SuppressWarnings("resource")
@Override
protected void messageReceived(ChannelHandlerContext ctx, Object message) throws Exception {
LOG.trace("New data read: incoming: {}", message);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -561,6 +561,60 @@ public void testSendTimesOutWhenNoCreditIssued() throws Exception {
}
}

@Test
public void testSendTimesOutWhenNoCreditIssuedAndThenIssueCredit() throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().ofSender().respond();
peer.start();

URI remoteURI = peer.getServerURI();

LOG.info("Sender test started, peer listening on: {}", remoteURI);

Client container = Client.create();
ConnectionOptions options = new ConnectionOptions();
options.sendTimeout(10);
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort(), options);
Session session = connection.openSession();
Sender sender = session.openSender("test-queue");
sender.openFuture().get(10, TimeUnit.SECONDS);

Message<String> message = Message.create("Hello World");
try {
sender.send(message);
fail("Should throw a send timed out exception");
} catch (ClientSendTimedOutException ex) {
// Expected error, ignore
}

peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.remoteFlow().withLinkCredit(1).now();
peer.expectAttach().ofSender().respond();
peer.expectTransfer().withMessage().withValue("Hello World 2");
peer.expectDetach().respond();
peer.expectClose().respond();

// Ensure the send happens after the remote has sent a flow with credit
session.openSender("test-queue-2").openFuture().get();

try {
sender.send(Message.create("Hello World 2"));
} catch (ClientException ex) {
LOG.trace("Error on second send", ex);
fail("Should not throw an exception");
}

sender.closeAsync().get(10, TimeUnit.SECONDS);

connection.closeAsync().get(10, TimeUnit.SECONDS);

peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}

@Test
public void testSendCompletesWhenCreditEventuallyOffered() throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
Expand Down Expand Up @@ -1999,6 +2053,62 @@ void testAutoFlushDuringWriteWithRollingIncomingWindowUpdates() throws Exception
}
}

@Test
public void testSendTimesOutIfNotAllMessageFramesCanBeSent() throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().withNextOutgoingId(0).respond();
peer.expectAttach().ofSender().respond();
peer.remoteFlow().withIncomingWindow(2).withNextIncomingId(0).withLinkCredit(1).queue();
peer.expectTransfer().withDeliveryId(0).withNonNullPayload().withMore(true);
peer.expectTransfer().withNonNullPayload().withMore(true);
peer.expectTransfer().withNullPayload().withAborted(true);
peer.start();

URI remoteURI = peer.getServerURI();

LOG.info("Test started, peer listening on: {}", remoteURI);

Client container = Client.create();
ConnectionOptions options = new ConnectionOptions().maxFrameSize(1024);
options.sendTimeout(25);
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort(), options);
Sender sender = connection.openSender("test-queue").openFuture().get();

final byte[] payload = new byte[4800];
Arrays.fill(payload, (byte) 1);

try {
sender.send(Message.create(payload));
} catch (ClientSendTimedOutException e) {
LOG.trace("send failed with expected error: ", e);
}

peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.remoteFlow().withIncomingWindow(1).withNextIncomingId(4).withLinkCredit(1).now();
peer.expectAttach().ofSender().respond();
peer.expectTransfer().withDeliveryId(1).withMessage().withValue("Hello World 2");
peer.expectDetach().respond();
peer.expectClose().respond();

// Ensure the send happens after the remote has sent a flow with credit
connection.openSender("test-queue-2").openFuture().get();

try {
sender.send(Message.create("Hello World 2"));
} catch (ClientException ex) {
LOG.trace("Error on second send", ex);
fail("Should not throw an exception");
}

sender.closeAsync().get();
connection.closeAsync().get();

peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}

@Test
void testConcurrentSendOnlyBlocksForInitialSendInProgress() throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.apache.qpid.protonj2.client.StreamTracker;
import org.apache.qpid.protonj2.client.exceptions.ClientException;
import org.apache.qpid.protonj2.client.exceptions.ClientIllegalStateException;
import org.apache.qpid.protonj2.client.exceptions.ClientSendTimedOutException;
import org.apache.qpid.protonj2.client.exceptions.ClientUnsupportedOperationException;
import org.apache.qpid.protonj2.client.test.ImperativeClientTestCase;
import org.apache.qpid.protonj2.client.test.Wait;
Expand Down Expand Up @@ -2918,4 +2919,59 @@ public void testCannotCreateSenderWhenTagGeneratorReturnsNull() throws Exception
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}

@Test
public void testSendTimesOutWhenNoCreditIssuedAndThenIssueCredit() throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().ofSender().respond();
peer.start();

URI remoteURI = peer.getServerURI();

LOG.info("Sender test started, peer listening on: {}", remoteURI);

Client container = Client.create();
ConnectionOptions options = new ConnectionOptions();
options.sendTimeout(10);
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort(), options);
StreamSender sender = connection.openStreamSender("test-queue");
sender.openFuture().get(10, TimeUnit.SECONDS);

Message<String> message = Message.create("Hello World");
try {
sender.send(message);
fail("Should throw a send timed out exception");
} catch (ClientSendTimedOutException ex) {
// Expected error, ignore
}

peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.remoteFlow().withLinkCredit(1).now();
peer.expectBegin().respond();
peer.expectAttach().ofSender().respond();
peer.expectTransfer().withMessage().withValue("Hello World 2");
peer.expectDetach().respond();
peer.expectEnd().respond();
peer.expectClose().respond();

// Ensure the send happens after the remote has sent a flow with credit
connection.openSender("test-queue-2").openFuture().get();

try {
sender.send(Message.create("Hello World 2"));
} catch (ClientException ex) {
LOG.trace("Error on second send", ex);
fail("Should not throw an exception");
}

sender.closeAsync().get(10, TimeUnit.SECONDS);

connection.closeAsync().get(10, TimeUnit.SECONDS);

peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
}

0 comments on commit 1ead4de

Please sign in to comment.