Skip to content

Commit

Permalink
Merge pull request #560 from hiddenalpha/FixBadHabits-QueueProcessor-…
Browse files Browse the repository at this point in the history
…20240221

[SDCISA-14715] Fix sub-optimal code in QueueProcessor
  • Loading branch information
hiddenalpha authored Feb 22, 2024
2 parents 9808cb9 + c482178 commit f4de6da
Showing 1 changed file with 18 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public class QueueProcessor {
private HttpClient httpClient;
private MonitoringHandler monitoringHandler;
private QueueCircuitBreaker queueCircuitBreaker;
private static final Handler<Buffer> DEV_NULL = buf -> {};
private MessageConsumer<JsonObject> consumer;

private Logger log = LoggerFactory.getLogger(QueueProcessor.class);
Expand Down Expand Up @@ -243,32 +244,35 @@ private void executeQueuedRequest(Message<JsonObject> message, Logger logger, Ht
});

Handler<AsyncResult<HttpClientResponse>> httpAsyncHandler = asyncResult -> {
HttpClientResponse response = asyncResult.result();
if (logger.isTraceEnabled()) {
logger.trace("response: " + response.statusCode());
if (asyncResult.failed()) {
logger.error("TODO error handling", new Exception("stacktrace", asyncResult.cause()));
return;
}
if (response.statusCode() >= 200 && response.statusCode() < 300 || response.statusCode() == 409) {
if (response.statusCode() != StatusCode.CONFLICT.getStatusCode()) {
logger.debug("Successful request to " + queuedRequest.getUri());
HttpClientResponse response = asyncResult.result();
int statusCode = response.statusCode();
logger.trace("response: {}", statusCode);
if (statusCode >= 200 && statusCode < 300 || statusCode == 409) {
if (statusCode != StatusCode.CONFLICT.getStatusCode()) {
logger.debug("Successful request to {}", queuedRequest.getUri());
} else {
logger.warn("Ignoring request conflict to " + queuedRequest.getUri() + ": " + response.statusCode() + " " + response.statusMessage());
logger.warn("Ignoring request conflict to {}: {} {}", queuedRequest.getUri(), statusCode, response.statusMessage());
}
message.reply(new JsonObject().put(STATUS, OK));
performCircuitBreakerActions(queueName, queuedRequest, SUCCESS, state);
monitoringHandler.updateDequeue();
} else if (QueueRetryUtil.retryQueueItem(queuedRequest.getHeaders(), response.statusCode(), logger)) {
logger.info("Failed queued request to " + queuedRequest.getUri() + ": " + response.statusCode() + " " + response.statusMessage());
message.reply(new JsonObject().put(STATUS, ERROR).put(MESSAGE, response.statusCode() + " " + response.statusMessage()));
} else if (QueueRetryUtil.retryQueueItem(queuedRequest.getHeaders(), statusCode, logger)) {
logger.info("Failed queued request to {}: {} {}", queuedRequest.getUri(), statusCode, response.statusMessage());
message.reply(new JsonObject().put(STATUS, ERROR).put(MESSAGE, statusCode + " " + response.statusMessage()));
performCircuitBreakerActions(queueName, queuedRequest, FAILURE, state);
} else {
logger.info("Reply success, because no more retries left for failed queued request to {}: {} {}", queuedRequest.getUri(), response.statusCode(), response.statusMessage());
logger.info("Reply success, because no more retries left for failed queued request to {}: {} {}", queuedRequest.getUri(), statusCode, response.statusMessage());
message.reply(new JsonObject().put(STATUS, OK));
performCircuitBreakerActions(queueName, queuedRequest, SUCCESS, state);
}
response.bodyHandler(event -> logger.debug("Discarding backend body"));
response.endHandler(event -> logger.debug("Backend response end"));
response.handler(DEV_NULL);
response.endHandler(nothing -> logger.debug("Backend response end"));
response.exceptionHandler(exception -> {
logger.warn("Exception on response from " + queuedRequest.getUri() + ": " + exception.getMessage());
logger.warn("Exception on response from {}: {}", queuedRequest.getUri(), exception.getMessage());
message.reply(new JsonObject().put(STATUS, ERROR).put(MESSAGE, exception.getMessage()));
performCircuitBreakerActions(queueName, queuedRequest, FAILURE, state);
});
Expand Down

0 comments on commit f4de6da

Please sign in to comment.