Skip to content

Commit

Permalink
HTTPClient: Fix condition after timeout where client may remain queue…
Browse files Browse the repository at this point in the history
…d or in-progress (#31)

This adds unit tests which originally failed due to the client remaining in-progress or in queue.
The solution is to add a failure listener which will attempt to remove from queue or in-progress if the client is still active or queued.
  • Loading branch information
jentfoo authored Jun 18, 2019
1 parent 4728cc3 commit 80e4d8a
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ public ListenableFuture<HTTPResponseData> requestAsync(final ClientHTTPRequest r
return hrw.slf;
}

private void processQueue() {
protected void processQueue() {
//This should be done after we do a .select on the ntse to check for more jobs before it exits.
HTTPRequestWrapper hrw;
while(maxConcurrent > inProcess.size() && (hrw = queue.poll()) != null) {
Expand All @@ -337,8 +337,16 @@ private void process(HTTPRequestWrapper hrw) {
}
try {
hrw.requestStarting();
hrw.client = getTCPClient(hrw.chr.getHTTPAddress());
inProcess.put(hrw.client, hrw);
TCPClient freshClient = getTCPClient(hrw.chr.getHTTPAddress());
hrw.client = freshClient;
inProcess.put(freshClient, hrw);
if (hrw.slf.isDone()) { // check if completed early, likely a timeout
// since timeout logic may have missed cleanup we must ensure cleanup is done now
// TODO - see if we can improve this by avoiding the extra check
inProcess.remove(freshClient, hrw);
addBackTCPClient(hrw.chr.getHTTPAddress(), freshClient); // if client is cleaned up this will ignore
return;
}
SimpleMergedByteBuffers writeBuffer;
if (hrw.chr.getBodyBuffer() == null) {
writeBuffer = new SimpleMergedByteBuffers(false,
Expand Down Expand Up @@ -487,20 +495,18 @@ public void run() {
private class MainClientProcessor implements Reader, ClientCloseListener {
@Override
public void onClose(Client client) {
HTTPRequestWrapper hrw = inProcess.get(client);
HTTPRequestWrapper hrw = inProcess.remove(client);

client.close();
if(hrw != null) {
boolean wasProcessing = hrw.hrp.isProcessing();
hrw.hrp.connectionClosed();
if(! hrw.slf.isDone() && ! wasProcessing) {
hrw.client = null;
process(hrw);
process(hrw);
} else {
hrw.slf.setFailure(new HTTPParsingException("Did not get complete body!"));
}
}
inProcess.remove(client);
tcpClients.remove(client);
}

Expand Down Expand Up @@ -544,6 +550,18 @@ private HTTPRequestWrapper(ClientHTTPRequest chr) {
this.chr = chr;

sei.watchFuture(slf, chr.getTimeoutMS());
slf.failureCallback((t) -> {
if (queue.remove(this)) {
// was likely a timeout, avoid leaving the request suck in the queue
} else {
// since it was not in the queue, we need to release the client from being in-process
TCPClient client = this.client;
this.client = null;
if (client != null) { // may have already been cleaned up
client.close();
}
}
});
}

public void requestStarting() {
Expand Down Expand Up @@ -573,6 +591,7 @@ public void headersFinished(HTTPResponse hr) {
public void bodyData(ByteBuffer bb) {
responseMBB.add(bb);
if(responseMBB.remaining() > maxResponseSize) {
TCPClient client = this.client; // will be `null` after error
slf.setFailure(new HTTPParsingException("Response Body to large!"));
client.close();
}
Expand All @@ -584,17 +603,26 @@ public void finished() {
slf.setResult(new HTTPResponseData(HTTPClient.this, chr.getHTTPRequest(), response,
responseMBB.duplicateAndClean()));
hrp.removeHTTPResponseCallback(this);
inProcess.remove(client);
addBackTCPClient(chr.getHTTPAddress(), client);
TCPClient client = this.client;
this.client = null;
if (client != null && inProcess.remove(client, this)) {
addBackTCPClient(chr.getHTTPAddress(), client);
}
processQueue();
}

@Override
public void hasError(Throwable t) {
if (hrp.isProcessing()) {
slf.setFailure(t);
} // if not processing we likely got a close that can work after a retry
client.close();
// since it was not in the queue, we need to release the client from being in-process
TCPClient client = this.client;
if (client != null) { // may have already been cleaned up
this.client = null;

if (hrp.isProcessing()) {
slf.setFailure(t);
} // if not processing we likely got a close that can work after a retry
client.close();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,6 @@ public void handleFailure(Throwable t) {
for(int i=0; i<number; i++) {
CLIENT_PS.execute(run);
}
Thread.sleep(1000);
System.out.println(count.get());
new TestCondition(){
@Override
public boolean get() {
Expand Down Expand Up @@ -261,7 +259,7 @@ public void noContentLengthWithBody() throws IOException, HTTPParsingException {
final HTTPClient httpClient = new HTTPClient();
httpClient.start();
HTTPResponseData hrs = httpClient.request(hrb.buildClientHTTPRequest());
System.out.println(hrs.getResponse());
//System.out.println(hrs.getResponse());
assertEquals("TEST123", hrs.getBodyAsString());
}

Expand All @@ -275,7 +273,7 @@ public void contentLengthOnHeadRequest() throws IOException, HTTPParsingExceptio
final HTTPClient httpClient = new HTTPClient();
httpClient.start();
HTTPResponseData hrs = httpClient.request(hrb.buildClientHTTPRequest());
System.out.println(hrs.getResponse());
//System.out.println(hrs.getResponse());
assertEquals(CONTENT.length(), hrs.getContentLength());
assertEquals("", hrs.getBodyAsString());
}
Expand Down Expand Up @@ -305,14 +303,43 @@ public void timeoutRequest() throws IOException, HTTPParsingException {
server.start();
final HTTPRequestBuilder hrb = new HTTPRequestBuilder(new URL("http://localhost:"+port));
hrb.setBody(IOUtils.EMPTY_BYTEBUFFER);
hrb.setTimeout(500, TimeUnit.MILLISECONDS);
hrb.setTimeout(200, TimeUnit.MILLISECONDS);
final HTTPClient httpClient = new HTTPClient();
httpClient.start();
try{
httpClient.request(hrb.buildClientHTTPRequest());
fail();
} catch(HTTPParsingException e) {
assertEquals("HTTP Timeout!", e.getMessage());
// below conditions may be slightly async due to future getting a result before listeners are invoked
new TestCondition(() -> httpClient.getRequestQueueSize() == 0).blockTillTrue(1_000);
new TestCondition(() -> httpClient.getInProgressSize() == 0).blockTillTrue(1_000);
}
}

@Test
public void timeoutQueuedRequest() throws IOException, HTTPParsingException {
int port = PortUtils.findTCPPort();
TCPServer server = SEI.createTCPServer("localhost", port);
server.start();
final HTTPRequestBuilder hrb = new HTTPRequestBuilder(new URL("http://localhost:"+port));
hrb.setBody(IOUtils.EMPTY_BYTEBUFFER);
hrb.setTimeout(200, TimeUnit.MILLISECONDS);
final HTTPClient httpClient = new HTTPClient() {
@Override
protected void processQueue() {
// queue is never processed
}
};
httpClient.start();
try{
httpClient.request(hrb.buildClientHTTPRequest());
fail();
} catch(HTTPParsingException e) {
assertEquals("HTTP Timeout!", e.getMessage());
// below conditions may be slightly async due to future getting a result before listeners are invoked
new TestCondition(() -> httpClient.getRequestQueueSize() == 0).blockTillTrue(1_000);
new TestCondition(() -> httpClient.getInProgressSize() == 0).blockTillTrue(1_000);
}
}

Expand Down Expand Up @@ -412,7 +439,7 @@ public void timeOut() throws IOException, InterruptedException, ExecutionExcepti
server.setClientAcceptor(new ClientAcceptor() {
@Override
public void accept(Client c) {
System.out.println("new Client!");
//System.out.println("new Client!");
}});
server.start();
final HTTPRequestBuilder hrb = new HTTPRequestBuilder(new URL("http://localhost:"+port));
Expand All @@ -427,7 +454,6 @@ public void accept(Client c) {
try{
httpClient.request(chr);
} catch(Exception e) {
System.out.println(System.currentTimeMillis() - start);
assertTrue(System.currentTimeMillis() - start < 700);
return;
}
Expand Down
6 changes: 3 additions & 3 deletions gradle.properties
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
group = org.threadly
version = 0.23-SNAPSHOT
threadlyVersion = 5.34
litesocketsVersion = 4.9
version = 0.23
threadlyVersion = 5.37
litesocketsVersion = 4.10
org.gradle.parallel=false
junitVersion = 4.12

0 comments on commit 80e4d8a

Please sign in to comment.