diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientNextReceiverSelector.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientNextReceiverSelector.java index a6f04b5c..8d8e8619 100644 --- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientNextReceiverSelector.java +++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientNextReceiverSelector.java @@ -162,6 +162,7 @@ private ClientReceiver selectNextAvailable() { return result != null ? result : selectFirstAvailable(); } + @SuppressWarnings("resource") private ClientReceiver selectFirstAvailable() { return session.getProtonSession().receivers().stream() .filter((r) -> r.getLinkedResource() instanceof ClientReceiver && @@ -171,6 +172,7 @@ private ClientReceiver selectFirstAvailable() { .orElse(null); } + @SuppressWarnings("resource") private ClientReceiver selectLargestBacklog() { return session.getProtonSession().receivers().stream() .filter((r) -> r.getLinkedResource() instanceof ClientReceiver && @@ -180,6 +182,7 @@ private ClientReceiver selectLargestBacklog() { .orElse(null); } + @SuppressWarnings("resource") private ClientReceiver selectSmallestBacklog() { return session.getProtonSession().receivers().stream() .filter((r) -> r.getLinkedResource() instanceof ClientReceiver && diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSender.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSender.java index a2df99d6..ae78c60e 100644 --- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSender.java +++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSender.java @@ -131,23 +131,23 @@ protected Sender self() { } private void addToTailOfBlockedQueue(ClientOutgoingEnvelope send) { + blocked.addLast(send); if (options.sendTimeout() > 0 && send.sendTimeout() == null) { send.sendTimeout(executor.schedule(() -> { + blocked.remove(send); send.failed(send.createSendTimedOutException()); }, options.sendTimeout(), TimeUnit.MILLISECONDS)); } - - blocked.addLast(send); } private void addToHeadOfBlockedQueue(ClientOutgoingEnvelope send) { + blocked.addFirst(send); if (options.sendTimeout() > 0 && send.sendTimeout() == null) { send.sendTimeout(executor.schedule(() -> { + blocked.remove(send); send.failed(send.createSendTimedOutException()); }, options.sendTimeout(), TimeUnit.MILLISECONDS)); } - - blocked.addFirst(send); } private Tracker sendMessage(AdvancedMessage message, Map deliveryAnnotations, boolean waitForCredit) throws ClientException { @@ -311,6 +311,14 @@ public ClientOutgoingEnvelope failed(ClientException exception) { sendTimeout.cancel(true); } + if (delivery != null) { + try { + delivery.abort(); + } catch (Exception ex) { + // Attempted abort could fail if offline so we ignore it. + } + } + payload.close(); request.failed(exception); diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamSender.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamSender.java index b158e072..58d21891 100644 --- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamSender.java +++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamSender.java @@ -131,23 +131,23 @@ protected StreamSender self() { } private void addToTailOfBlockedQueue(ClientOutgoingEnvelope send) { + blocked.addLast(send); if (options.sendTimeout() > 0 && send.sendTimeout() == null) { send.sendTimeout(executor.schedule(() -> { + blocked.remove(send); send.failed(send.createSendTimedOutException()); }, options.sendTimeout(), TimeUnit.MILLISECONDS)); } - - blocked.addLast(send); } private void addToHeadOfBlockedQueue(ClientOutgoingEnvelope send) { + blocked.addFirst(send); if (options.sendTimeout() > 0 && send.sendTimeout() == null) { send.sendTimeout(executor.schedule(() -> { + blocked.remove(send); send.failed(send.createSendTimedOutException()); }, options.sendTimeout(), TimeUnit.MILLISECONDS)); } - - blocked.addFirst(send); } private StreamTracker sendMessage(AdvancedMessage message, Map deliveryAnnotations, boolean waitForCredit) throws ClientException { @@ -499,6 +499,14 @@ public ClientOutgoingEnvelope failed(ClientException exception) { sendTimeout.cancel(true); } + if (delivery != null) { + try { + delivery.abort(); + } catch (Exception ex) { + // Attempted abort could fail if offline so we ignore it. + } + } + if (payload != null) { payload.close(); } diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/netty5/WebSocketTransport.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/netty5/WebSocketTransport.java index 59337461..0c7fb599 100644 --- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/netty5/WebSocketTransport.java +++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/netty5/WebSocketTransport.java @@ -169,6 +169,7 @@ public void operationComplete(Future future) throws Exception { super.channelActive(context); } + @SuppressWarnings("resource") @Override protected void messageReceived(ChannelHandlerContext ctx, Object message) throws Exception { LOG.trace("New data read: incoming: {}", message); diff --git a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/SenderTest.java b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/SenderTest.java index 7a576ef7..a16b360c 100644 --- a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/SenderTest.java +++ b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/SenderTest.java @@ -561,6 +561,60 @@ public void testSendTimesOutWhenNoCreditIssued() throws Exception { } } + @Test + public void testSendTimesOutWhenNoCreditIssuedAndThenIssueCredit() throws Exception { + try (ProtonTestServer peer = new ProtonTestServer()) { + peer.expectSASLAnonymousConnect(); + peer.expectOpen().respond(); + peer.expectBegin().respond(); + peer.expectAttach().ofSender().respond(); + peer.start(); + + URI remoteURI = peer.getServerURI(); + + LOG.info("Sender test started, peer listening on: {}", remoteURI); + + Client container = Client.create(); + ConnectionOptions options = new ConnectionOptions(); + options.sendTimeout(10); + Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort(), options); + Session session = connection.openSession(); + Sender sender = session.openSender("test-queue"); + sender.openFuture().get(10, TimeUnit.SECONDS); + + Message message = Message.create("Hello World"); + try { + sender.send(message); + fail("Should throw a send timed out exception"); + } catch (ClientSendTimedOutException ex) { + // Expected error, ignore + } + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.remoteFlow().withLinkCredit(1).now(); + peer.expectAttach().ofSender().respond(); + peer.expectTransfer().withMessage().withValue("Hello World 2"); + peer.expectDetach().respond(); + peer.expectClose().respond(); + + // Ensure the send happens after the remote has sent a flow with credit + session.openSender("test-queue-2").openFuture().get(); + + try { + sender.send(Message.create("Hello World 2")); + } catch (ClientException ex) { + LOG.trace("Error on second send", ex); + fail("Should not throw an exception"); + } + + sender.closeAsync().get(10, TimeUnit.SECONDS); + + connection.closeAsync().get(10, TimeUnit.SECONDS); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + } + } + @Test public void testSendCompletesWhenCreditEventuallyOffered() throws Exception { try (ProtonTestServer peer = new ProtonTestServer()) { @@ -1999,6 +2053,62 @@ void testAutoFlushDuringWriteWithRollingIncomingWindowUpdates() throws Exception } } + @Test + public void testSendTimesOutIfNotAllMessageFramesCanBeSent() throws Exception { + try (ProtonTestServer peer = new ProtonTestServer()) { + peer.expectSASLAnonymousConnect(); + peer.expectOpen().respond(); + peer.expectBegin().withNextOutgoingId(0).respond(); + peer.expectAttach().ofSender().respond(); + peer.remoteFlow().withIncomingWindow(2).withNextIncomingId(0).withLinkCredit(1).queue(); + peer.expectTransfer().withDeliveryId(0).withNonNullPayload().withMore(true); + peer.expectTransfer().withNonNullPayload().withMore(true); + peer.expectTransfer().withNullPayload().withAborted(true); + peer.start(); + + URI remoteURI = peer.getServerURI(); + + LOG.info("Test started, peer listening on: {}", remoteURI); + + Client container = Client.create(); + ConnectionOptions options = new ConnectionOptions().maxFrameSize(1024); + options.sendTimeout(25); + Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort(), options); + Sender sender = connection.openSender("test-queue").openFuture().get(); + + final byte[] payload = new byte[4800]; + Arrays.fill(payload, (byte) 1); + + try { + sender.send(Message.create(payload)); + } catch (ClientSendTimedOutException e) { + LOG.trace("send failed with expected error: ", e); + } + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.remoteFlow().withIncomingWindow(1).withNextIncomingId(4).withLinkCredit(1).now(); + peer.expectAttach().ofSender().respond(); + peer.expectTransfer().withDeliveryId(1).withMessage().withValue("Hello World 2"); + peer.expectDetach().respond(); + peer.expectClose().respond(); + + // Ensure the send happens after the remote has sent a flow with credit + connection.openSender("test-queue-2").openFuture().get(); + + try { + sender.send(Message.create("Hello World 2")); + } catch (ClientException ex) { + LOG.trace("Error on second send", ex); + fail("Should not throw an exception"); + } + + sender.closeAsync().get(); + connection.closeAsync().get(); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + } + } + @Test void testConcurrentSendOnlyBlocksForInitialSendInProgress() throws Exception { try (ProtonTestServer peer = new ProtonTestServer()) { diff --git a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/StreamSenderTest.java b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/StreamSenderTest.java index 07d6d91f..ab69063c 100644 --- a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/StreamSenderTest.java +++ b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/StreamSenderTest.java @@ -59,6 +59,7 @@ import org.apache.qpid.protonj2.client.StreamTracker; import org.apache.qpid.protonj2.client.exceptions.ClientException; import org.apache.qpid.protonj2.client.exceptions.ClientIllegalStateException; +import org.apache.qpid.protonj2.client.exceptions.ClientSendTimedOutException; import org.apache.qpid.protonj2.client.exceptions.ClientUnsupportedOperationException; import org.apache.qpid.protonj2.client.test.ImperativeClientTestCase; import org.apache.qpid.protonj2.client.test.Wait; @@ -2918,4 +2919,59 @@ public void testCannotCreateSenderWhenTagGeneratorReturnsNull() throws Exception peer.waitForScriptToComplete(5, TimeUnit.SECONDS); } } + + @Test + public void testSendTimesOutWhenNoCreditIssuedAndThenIssueCredit() throws Exception { + try (ProtonTestServer peer = new ProtonTestServer()) { + peer.expectSASLAnonymousConnect(); + peer.expectOpen().respond(); + peer.expectBegin().respond(); + peer.expectAttach().ofSender().respond(); + peer.start(); + + URI remoteURI = peer.getServerURI(); + + LOG.info("Sender test started, peer listening on: {}", remoteURI); + + Client container = Client.create(); + ConnectionOptions options = new ConnectionOptions(); + options.sendTimeout(10); + Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort(), options); + StreamSender sender = connection.openStreamSender("test-queue"); + sender.openFuture().get(10, TimeUnit.SECONDS); + + Message message = Message.create("Hello World"); + try { + sender.send(message); + fail("Should throw a send timed out exception"); + } catch (ClientSendTimedOutException ex) { + // Expected error, ignore + } + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.remoteFlow().withLinkCredit(1).now(); + peer.expectBegin().respond(); + peer.expectAttach().ofSender().respond(); + peer.expectTransfer().withMessage().withValue("Hello World 2"); + peer.expectDetach().respond(); + peer.expectEnd().respond(); + peer.expectClose().respond(); + + // Ensure the send happens after the remote has sent a flow with credit + connection.openSender("test-queue-2").openFuture().get(); + + try { + sender.send(Message.create("Hello World 2")); + } catch (ClientException ex) { + LOG.trace("Error on second send", ex); + fail("Should not throw an exception"); + } + + sender.closeAsync().get(10, TimeUnit.SECONDS); + + connection.closeAsync().get(10, TimeUnit.SECONDS); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + } + } }