From 80e4d8a2d4683142246d005191328ad391044025 Mon Sep 17 00:00:00 2001 From: Mike Jensen Date: Tue, 18 Jun 2019 12:15:29 -0600 Subject: [PATCH] HTTPClient: Fix condition after timeout where client may remain queued or in-progress (#31) This adds unit tests which originally failed due to the client remaining in-progress or in queue. The solution is to add a failure listener which will attempt to remove from queue or in-progress if the client is still active or queued. --- .../litesockets/client/http/HTTPClient.java | 54 ++++++++++++++----- .../client/http/HTTPClientTests.java | 40 +++++++++++--- gradle.properties | 6 +-- 3 files changed, 77 insertions(+), 23 deletions(-) diff --git a/client/src/main/java/org/threadly/litesockets/client/http/HTTPClient.java b/client/src/main/java/org/threadly/litesockets/client/http/HTTPClient.java index 57c9b6e..dd99666 100644 --- a/client/src/main/java/org/threadly/litesockets/client/http/HTTPClient.java +++ b/client/src/main/java/org/threadly/litesockets/client/http/HTTPClient.java @@ -322,7 +322,7 @@ public ListenableFuture requestAsync(final ClientHTTPRequest r return hrw.slf; } - private void processQueue() { + protected void processQueue() { //This should be done after we do a .select on the ntse to check for more jobs before it exits. HTTPRequestWrapper hrw; while(maxConcurrent > inProcess.size() && (hrw = queue.poll()) != null) { @@ -337,8 +337,16 @@ private void process(HTTPRequestWrapper hrw) { } try { hrw.requestStarting(); - hrw.client = getTCPClient(hrw.chr.getHTTPAddress()); - inProcess.put(hrw.client, hrw); + TCPClient freshClient = getTCPClient(hrw.chr.getHTTPAddress()); + hrw.client = freshClient; + inProcess.put(freshClient, hrw); + if (hrw.slf.isDone()) { // check if completed early, likely a timeout + // since timeout logic may have missed cleanup we must ensure cleanup is done now + // TODO - see if we can improve this by avoiding the extra check + inProcess.remove(freshClient, hrw); + addBackTCPClient(hrw.chr.getHTTPAddress(), freshClient); // if client is cleaned up this will ignore + return; + } SimpleMergedByteBuffers writeBuffer; if (hrw.chr.getBodyBuffer() == null) { writeBuffer = new SimpleMergedByteBuffers(false, @@ -487,20 +495,18 @@ public void run() { private class MainClientProcessor implements Reader, ClientCloseListener { @Override public void onClose(Client client) { - HTTPRequestWrapper hrw = inProcess.get(client); + HTTPRequestWrapper hrw = inProcess.remove(client); client.close(); if(hrw != null) { boolean wasProcessing = hrw.hrp.isProcessing(); hrw.hrp.connectionClosed(); if(! hrw.slf.isDone() && ! wasProcessing) { - hrw.client = null; - process(hrw); + process(hrw); } else { hrw.slf.setFailure(new HTTPParsingException("Did not get complete body!")); } } - inProcess.remove(client); tcpClients.remove(client); } @@ -544,6 +550,18 @@ private HTTPRequestWrapper(ClientHTTPRequest chr) { this.chr = chr; sei.watchFuture(slf, chr.getTimeoutMS()); + slf.failureCallback((t) -> { + if (queue.remove(this)) { + // was likely a timeout, avoid leaving the request suck in the queue + } else { + // since it was not in the queue, we need to release the client from being in-process + TCPClient client = this.client; + this.client = null; + if (client != null) { // may have already been cleaned up + client.close(); + } + } + }); } public void requestStarting() { @@ -573,6 +591,7 @@ public void headersFinished(HTTPResponse hr) { public void bodyData(ByteBuffer bb) { responseMBB.add(bb); if(responseMBB.remaining() > maxResponseSize) { + TCPClient client = this.client; // will be `null` after error slf.setFailure(new HTTPParsingException("Response Body to large!")); client.close(); } @@ -584,17 +603,26 @@ public void finished() { slf.setResult(new HTTPResponseData(HTTPClient.this, chr.getHTTPRequest(), response, responseMBB.duplicateAndClean())); hrp.removeHTTPResponseCallback(this); - inProcess.remove(client); - addBackTCPClient(chr.getHTTPAddress(), client); + TCPClient client = this.client; + this.client = null; + if (client != null && inProcess.remove(client, this)) { + addBackTCPClient(chr.getHTTPAddress(), client); + } processQueue(); } @Override public void hasError(Throwable t) { - if (hrp.isProcessing()) { - slf.setFailure(t); - } // if not processing we likely got a close that can work after a retry - client.close(); + // since it was not in the queue, we need to release the client from being in-process + TCPClient client = this.client; + if (client != null) { // may have already been cleaned up + this.client = null; + + if (hrp.isProcessing()) { + slf.setFailure(t); + } // if not processing we likely got a close that can work after a retry + client.close(); + } } @Override diff --git a/client/src/test/java/org/threadly/litesockets/client/http/HTTPClientTests.java b/client/src/test/java/org/threadly/litesockets/client/http/HTTPClientTests.java index ca76b3b..7ac5736 100644 --- a/client/src/test/java/org/threadly/litesockets/client/http/HTTPClientTests.java +++ b/client/src/test/java/org/threadly/litesockets/client/http/HTTPClientTests.java @@ -165,8 +165,6 @@ public void handleFailure(Throwable t) { for(int i=0; i httpClient.getRequestQueueSize() == 0).blockTillTrue(1_000); + new TestCondition(() -> httpClient.getInProgressSize() == 0).blockTillTrue(1_000); + } + } + + @Test + public void timeoutQueuedRequest() throws IOException, HTTPParsingException { + int port = PortUtils.findTCPPort(); + TCPServer server = SEI.createTCPServer("localhost", port); + server.start(); + final HTTPRequestBuilder hrb = new HTTPRequestBuilder(new URL("http://localhost:"+port)); + hrb.setBody(IOUtils.EMPTY_BYTEBUFFER); + hrb.setTimeout(200, TimeUnit.MILLISECONDS); + final HTTPClient httpClient = new HTTPClient() { + @Override + protected void processQueue() { + // queue is never processed + } + }; + httpClient.start(); + try{ + httpClient.request(hrb.buildClientHTTPRequest()); + fail(); + } catch(HTTPParsingException e) { + assertEquals("HTTP Timeout!", e.getMessage()); + // below conditions may be slightly async due to future getting a result before listeners are invoked + new TestCondition(() -> httpClient.getRequestQueueSize() == 0).blockTillTrue(1_000); + new TestCondition(() -> httpClient.getInProgressSize() == 0).blockTillTrue(1_000); } } @@ -412,7 +439,7 @@ public void timeOut() throws IOException, InterruptedException, ExecutionExcepti server.setClientAcceptor(new ClientAcceptor() { @Override public void accept(Client c) { - System.out.println("new Client!"); + //System.out.println("new Client!"); }}); server.start(); final HTTPRequestBuilder hrb = new HTTPRequestBuilder(new URL("http://localhost:"+port)); @@ -427,7 +454,6 @@ public void accept(Client c) { try{ httpClient.request(chr); } catch(Exception e) { - System.out.println(System.currentTimeMillis() - start); assertTrue(System.currentTimeMillis() - start < 700); return; } diff --git a/gradle.properties b/gradle.properties index fadc804..424dc34 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,7 +1,7 @@ group = org.threadly -version = 0.23-SNAPSHOT -threadlyVersion = 5.34 -litesocketsVersion = 4.9 +version = 0.23 +threadlyVersion = 5.37 +litesocketsVersion = 4.10 org.gradle.parallel=false junitVersion = 4.12