Skip to content

Commit

Permalink
Improved exception handling for AbstractTask execution
Browse files Browse the repository at this point in the history
  • Loading branch information
hirenkp2000 committed Nov 22, 2023
1 parent 5bd8874 commit 5417876
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,12 @@ protected AbstractApiTask(
this.routingContext = routingContext;
}

protected @NotNull XyzResponse errorResponse(@NotNull Throwable throwable) {
logger.warn("The task failed with an exception. ", throwable);
return verticle.sendErrorResponse(
routingContext, XyzError.EXCEPTION, "Task failed processing! " + throwable.getMessage());
}

public @NotNull XyzResponse executeUnsupported() {
return verticle.sendErrorResponse(routingContext, XyzError.NOT_IMPLEMENTED, "Unsupported operation!");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ public NakshaTestWebClient() {
retryRegistry = configureRetryRegistry();
}

public HttpResponse<String> get(String subPath, String streamId) throws URISyntaxException {
public HttpResponse<String> get(String subPath, String streamId)
throws URISyntaxException, IOException, InterruptedException {
HttpRequest getRequest = requestBuilder()
.uri(nakshaPath(subPath))
.GET()
Expand All @@ -65,7 +66,8 @@ public HttpResponse<String> get(String subPath, String streamId) throws URISynta
return send(getRequest);
}

public HttpResponse<String> post(String subPath, String jsonBody, String streamId) throws URISyntaxException {
public HttpResponse<String> post(String subPath, String jsonBody, String streamId)
throws URISyntaxException, IOException, InterruptedException {
HttpRequest postRequest = requestBuilder()
.uri(nakshaPath(subPath))
.POST(BodyPublishers.ofString(jsonBody))
Expand All @@ -75,7 +77,8 @@ public HttpResponse<String> post(String subPath, String jsonBody, String streamI
return send(postRequest);
}

public HttpResponse<String> put(String subPath, String jsonBody, String streamId) throws URISyntaxException {
public HttpResponse<String> put(String subPath, String jsonBody, String streamId)
throws URISyntaxException, IOException, InterruptedException {
HttpRequest putRequest = requestBuilder()
.uri(nakshaPath(subPath))
.PUT(BodyPublishers.ofString(jsonBody))
Expand All @@ -85,12 +88,8 @@ public HttpResponse<String> put(String subPath, String jsonBody, String streamId
return send(putRequest);
}

private HttpResponse<String> send(HttpRequest request) {
try {
return this.sendOnce(request);
} catch (Exception ex) {
throw new RuntimeException("Http request submission failed", ex);
}
private HttpResponse<String> send(HttpRequest request) throws IOException, InterruptedException {
return this.sendOnce(request);
/*
String retryId = retryIdForRequest(request);
CheckedFunction<HttpRequest, HttpResponse<String>> responseSupplier =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
*/
package com.here.naksha.lib.core;

import static com.here.naksha.lib.core.exceptions.UncheckedException.unchecked;

import com.here.naksha.lib.core.exceptions.TooManyTasks;
import com.here.naksha.lib.core.util.NanoTime;
import java.lang.Thread.UncaughtExceptionHandler;
Expand Down Expand Up @@ -319,21 +317,13 @@ protected final void unlock() {
private static final AtomicLong threadCount = new AtomicLong();

private @NotNull RESULT init_and_execute() {
assert state.get() == State.START;
state.set(State.EXECUTE);
attachToCurrentThread();
@NotNull RESULT RESULT;
try {
@NotNull RESULT RESULT;
try {
init();
RESULT = execute();
} catch (final Throwable t) {
try {
RESULT = errorResponse(t);
} catch (final Throwable ignore) {
throw t;
}
}
state.set(State.EXECUTE);
attachToCurrentThread();
init();
RESULT = execute();

state.set(State.CALLING_LISTENER);
for (final @NotNull Consumer<@NotNull RESULT> listener : listeners) {
try {
Expand All @@ -345,36 +335,34 @@ protected final void unlock() {
.log();
}
}
return RESULT;
} catch (Throwable t) {
RESULT = errorResponse(t);
} finally {
state.set(State.DONE);
final long newValue = AbstractTask.threadCount.decrementAndGet();
assert newValue >= 0L;
detachFromCurrentThread();
try {
state.set(State.DONE);
final long newValue = AbstractTask.threadCount.decrementAndGet();
assert newValue >= 0L;
detachFromCurrentThread();
} catch (Throwable t) {
RESULT = errorResponse(t);
}
}
/* TODO HP_QUERY : As this function doesn't return response in case of exception, it gets suppressed under
* thread.submit() function, and API client endlessly waits for response.
* How do we return errorResponse from here? (return type doesn't match)
*/
return RESULT;
}

/**
* A method that creates an error-response from the given exception, being thrown by either {@link #init()} or {@link #execute()}. The
* default implementation will simply throw the exception again.
* Function should be overridden to return custom response when an exception is encountered during
* execution of task functions init() / execute()
*
* @param throwable The exception caught.
* @return The error-response.
* @param throwable an actual error that has been encountered
* @return RESULT should represent error response
*/
protected @NotNull RESULT errorResponse(@NotNull Throwable throwable) throws Exception {
log.atWarn()
.setMessage("The task failed with an exception")
.setCause(throwable)
.log();
if (throwable instanceof Exception e) {
throw e;
}
throw unchecked(throwable);
protected @NotNull RESULT errorResponse(@NotNull Throwable throwable) {
RESULT result = null;
log.warn("The task failed with an exception. ", throwable);
return result;
}
;

/**
* Initializes this task.
Expand Down

0 comments on commit 5417876

Please sign in to comment.