Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improved AbstractTask execution to avoid Http request timeout due to unexpected exception #100

Merged
merged 8 commits into from
Nov 22, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -338,8 +338,7 @@ private void notFoundHandler(final @NotNull RoutingContext routingContext) {
*
* @param routingContext The routing context.
*/
private void onHeadersEnd(final @NotNull RoutingContext routingContext) {
}
private void onHeadersEnd(final @NotNull RoutingContext routingContext) {}

/**
* An end handler for the response. This will be called when the response is disposed to allow consistent cleanup of the response.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,12 @@ protected AbstractApiTask(
this.routingContext = routingContext;
}

protected @NotNull XyzResponse errorResponse(@NotNull Throwable throwable) {
logger.warn("The task failed with an exception. ", throwable);
return verticle.sendErrorResponse(
routingContext, XyzError.EXCEPTION, "Task failed processing! " + throwable.getMessage());
}

public @NotNull XyzResponse executeUnsupported() {
return verticle.sendErrorResponse(routingContext, XyzError.NOT_IMPLEMENTED, "Unsupported operation!");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,35 +57,47 @@ public NakshaTestWebClient() {
retryRegistry = configureRetryRegistry();
}

public HttpResponse<String> get(String subPath, String streamId) throws URISyntaxException {
public HttpResponse<String> get(String subPath, String streamId)
throws URISyntaxException, IOException, InterruptedException {
HttpRequest getRequest = requestBuilder()
.uri(nakshaPath(subPath))
.GET()
.header(HDR_STREAM_ID, streamId)
.build();
return send(getRequest);
return sendOnce(getRequest);
}

public HttpResponse<String> post(String subPath, String jsonBody, String streamId) throws URISyntaxException {
public HttpResponse<String> post(String subPath, String jsonBody, String streamId)
throws URISyntaxException, IOException, InterruptedException {
HttpRequest postRequest = requestBuilder()
.uri(nakshaPath(subPath))
.POST(BodyPublishers.ofString(jsonBody))
.header("Content-Type", "application/json")
.header(HDR_STREAM_ID, streamId)
.build();
return send(postRequest);
return sendOnce(postRequest);
}

public HttpResponse<String> put(String subPath, String jsonBody, String streamId) throws URISyntaxException {
public HttpResponse<String> put(String subPath, String jsonBody, String streamId)
throws URISyntaxException, IOException, InterruptedException {
HttpRequest putRequest = requestBuilder()
.uri(nakshaPath(subPath))
.PUT(BodyPublishers.ofString(jsonBody))
.header("Content-Type", "application/json")
.header(HDR_STREAM_ID, streamId)
.build();
return send(putRequest);
return sendOnce(putRequest);
}

// TODO : Remove this function once JUnit pipeline has got multiple stable executions
/**
* This Http retry function was temporarily introduced as a workaround to resolve JUnit test hanging
* issue, which is resolved now. This function will be removed soon.
*
* @param request http request to be submitted
* @return actual http response
* @deprecated use sendOnce() instead
*/
private HttpResponse<String> send(HttpRequest request) {
String retryId = retryIdForRequest(request);
CheckedFunction<HttpRequest, HttpResponse<String>> responseSupplier =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
*/
package com.here.naksha.lib.core;

import static com.here.naksha.lib.core.exceptions.UncheckedException.unchecked;

import com.here.naksha.lib.core.exceptions.TooManyTasks;
import com.here.naksha.lib.core.util.NanoTime;
import java.lang.Thread.UncaughtExceptionHandler;
Expand Down Expand Up @@ -295,10 +293,8 @@ protected final void unlock() {
}
if (AbstractTask.threadCount.compareAndSet(threadCount, threadCount + 1)) {
try {
final Future<RESULT> future = threadPool.submit(this::init_and_execute);
// TODO HP_QUERY : Wouldn't setting this flag, after submitting task, have concurrency failure
// risk?
state.set(State.START);
final Future<RESULT> future = threadPool.submit(this::init_and_execute);
return future;
} catch (RejectedExecutionException e) {
throw new TooManyTasks();
Expand All @@ -321,21 +317,13 @@ protected final void unlock() {
private static final AtomicLong threadCount = new AtomicLong();

private @NotNull RESULT init_and_execute() {
assert state.get() == State.START;
state.set(State.EXECUTE);
attachToCurrentThread();
@NotNull RESULT RESULT;
try {
@NotNull RESULT RESULT;
try {
init();
RESULT = execute();
} catch (final Throwable t) {
try {
RESULT = errorResponse(t);
} catch (final Throwable ignore) {
throw t;
}
}
state.set(State.EXECUTE);
attachToCurrentThread();
init();
RESULT = execute();

state.set(State.CALLING_LISTENER);
for (final @NotNull Consumer<@NotNull RESULT> listener : listeners) {
try {
Expand All @@ -347,35 +335,32 @@ protected final void unlock() {
.log();
}
}
return RESULT;
} catch (Throwable t) {
RESULT = errorResponse(t);
} finally {
state.set(State.DONE);
final long newValue = AbstractTask.threadCount.decrementAndGet();
assert newValue >= 0L;
detachFromCurrentThread();
try {
state.set(State.DONE);
final long newValue = AbstractTask.threadCount.decrementAndGet();
assert newValue >= 0L;
detachFromCurrentThread();
} catch (Throwable t) {
RESULT = errorResponse(t);
}
}
/* TODO HP_QUERY : As this function doesn't return response in case of exception, it gets suppressed under
* thread.submit() function, and API client endlessly waits for response.
* How do we return errorResponse from here? (return type doesn't match)
*/
return RESULT;
}

/**
* A method that creates an error-response from the given exception, being thrown by either {@link #init()} or {@link #execute()}. The
* default implementation will simply throw the exception again.
* Function should be overridden to return custom response when an exception is encountered during
* execution of task functions init() / execute()
*
* @param throwable The exception caught.
* @return The error-response.
* @param throwable an actual error that has been encountered
* @return RESULT should represent error response
*/
protected @NotNull RESULT errorResponse(@NotNull Throwable throwable) throws Exception {
log.atWarn()
.setMessage("The task failed with an exception")
.setCause(throwable)
.log();
if (throwable instanceof Exception e) {
throw e;
}
throw unchecked(throwable);
protected @NotNull RESULT errorResponse(@NotNull Throwable throwable) {
RESULT result = null;
log.warn("The task failed with an exception. ", throwable);
return result;
}

/**
Expand Down