Uploader failure handling improvements - DeadLetterQueueHandler implementation, logging improvements #15

Merged
merged 12 commits into from
Dec 18, 2024

@jeffxiang jeffxiang commented Dec 13, 2024

This PR simplifies the manual work needed to recover failed uploads in case of issues such as transient S3 slowness / failures. The PR includes the following major changes:

  1. Implement DeadLetterQueueHandler abstract class: this allows for pluggable implementations of handlers for processing upload tasks that failed after all retries are exhausted. A simple example is LocalFileDeadLetterQueueHandler which writes the failures to a local file.
  2. Swap out the synchronous S3Client in MultiThreadedS3FileUploader for an asynchronous S3AsyncClient, so that the putObject() call directly returns a future and the upload is aborted if a timeout occurs. This fixes the issue where a timeout did not necessarily indicate a failed upload: the wrapped future could time out while the underlying request was still in flight.
  3. Logging improvements: emit a clear log message when all retries have been exhausted, to make it unambiguous when a file actually missed upload.
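
As an illustration of item 1, here is a minimal sketch of a pluggable handler with a local-file implementation. It is not the code merged in this PR: the class names, the `handle` method, and the `FailedUploadSketch` type are all hypothetical, chosen only to show the shape of the idea (an abstract handler, and one implementation that appends each failed upload to a file).

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.List;

// Hypothetical stand-in for the PR's upload task; only the two fields used below.
final class FailedUploadSketch {
    final String localPath;
    final String destination;

    FailedUploadSketch(String localPath, String destination) {
        this.localPath = localPath;
        this.destination = destination;
    }
}

// Hypothetical shape of a pluggable handler; the method name is an assumption.
abstract class DlqHandlerSketch {
    abstract void handle(FailedUploadSketch failed);
}

// Appends one comma-separated line per failed upload to a local file.
final class LocalFileDlqHandlerSketch extends DlqHandlerSketch {
    private final Path file;

    LocalFileDlqHandlerSketch(Path file) {
        this.file = file;
    }

    @Override
    synchronized void handle(FailedUploadSketch failed) {
        String line = System.currentTimeMillis() + "," + failed.localPath + ","
                + failed.destination + System.lineSeparator();
        try {
            Files.write(file, line.getBytes(StandardCharsets.UTF_8),
                    StandardOpenOption.CREATE, StandardOpenOption.APPEND);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}

final class DlqSketchDemo {
    // Records one made-up failure in a temp file and returns the file's lines.
    static List<String> run() {
        try {
            Path file = Files.createTempFile("dlq", ".log");
            new LocalFileDlqHandlerSketch(file).handle(
                    new FailedUploadSketch("/data/topic-0/segment.log", "s3://bucket/topic-0/segment.log"));
            return Files.readAllLines(file);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Because each record is a single appended line, an operator could replay the file later to retry the uploads; the record format here is an assumption, not the format used by LocalFileDeadLetterQueueHandler.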

Tested the functionality in unit tests and in real Kafka clusters.

@jeffxiang jeffxiang marked this pull request as ready for review December 17, 2024 22:31
@jeffxiang jeffxiang requested a review from a team as a code owner December 17, 2024 22:31
@jeffxiang jeffxiang changed the title WIP uploader failure handling improvements Uploader failure handling improvements - DeadLetterQueueHandler implementation, logging improvements Dec 17, 2024
@@ -15,7 +15,7 @@
</modules>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-<maven.surefire.version>3.5.1</maven.surefire.version>
+<maven.surefire.version>3.0.0-M5</maven.surefire.version>

Why downgrade this dependency?

Contributor Author

It's causing issues with Maven versions below 3.6.5; we are mostly using 3.6.0.

@jeffxiang jeffxiang merged commit ac785d8 into main Dec 18, 2024
1 check passed
@jeffxiang jeffxiang deleted the uploader_improvements branch December 18, 2024 00:09
@@ -97,7 +112,7 @@ private int getErrorCode(Throwable throwable, PutObjectResponse putObjectRespons
if (throwable == null) {
return putObjectResponse == null ? UPLOAD_GENERAL_ERROR_CODE : putObjectResponse.sdkHttpResponse().statusCode();
}
-if (throwable instanceof TimeoutException) {
+if (throwable instanceof ApiCallTimeoutException || throwable instanceof TimeoutException) {

Just curious: TimeoutException is from java.util.concurrent while ApiCallTimeoutException is from software.amazon.awssdk.core.exception. Can both of the exceptions appear here?
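
One way to explore this question is a small sketch using only `java.util.concurrent`. It assumes two possible sources of a timeout: a caller-side timer on the future (which yields `TimeoutException`) and the client failing the future itself (represented here by `SdkTimeoutStandIn`, a hypothetical stand-in for `ApiCallTimeoutException` so the sketch runs without the AWS SDK). Whether both actually reach `getErrorCode` depends on how the uploader configures its timeouts, which this thread does not show.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for software.amazon.awssdk.core.exception.ApiCallTimeoutException.
final class SdkTimeoutStandIn extends RuntimeException {
    SdkTimeoutStandIn(String message) {
        super(message);
    }
}

final class TimeoutClassifierSketch {
    // Strips the wrappers that CompletableFuture adds, then checks both timeout types.
    static boolean isTimeout(Throwable t) {
        while ((t instanceof CompletionException || t instanceof ExecutionException)
                && t.getCause() != null) {
            t = t.getCause();
        }
        return t instanceof TimeoutException || t instanceof SdkTimeoutStandIn;
    }

    public static void main(String[] args) {
        // Case 1: a caller-side timer fires on a future that never completes.
        CompletableFuture<String> timedOut =
                new CompletableFuture<String>().orTimeout(50, TimeUnit.MILLISECONDS);

        // Case 2: the client gives up and fails the future with its own exception type.
        CompletableFuture<String> clientFailed = new CompletableFuture<>();
        clientFailed.completeExceptionally(new SdkTimeoutStandIn("api call timed out"));

        for (CompletableFuture<String> future : List.of(timedOut, clientFailed)) {
            try {
                future.join();
            } catch (CompletionException e) {
                // join() wraps the original failure in a CompletionException.
                System.out.println(e.getCause().getClass().getSimpleName()
                        + " classified as timeout: " + isTimeout(e));
            }
        }
    }
}
```

The unwrapping loop is a design choice of this sketch: dependent stages of a `CompletableFuture` see failures wrapped in `CompletionException`, so a plain `instanceof` check can miss a timeout unless the cause is inspected.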

"broker=" + environmentProvider.brokerId(),
"offset=" + uploadTask.getOffset()
);
if (deadLetterQueueHandler != null) {

If deadLetterQueueHandler is null, shall we add another log saying that the DLQ handler is missing, so we cannot do anything more than logging?

Comment on lines +359 to +360
throw new RuntimeException(String.format("Failed to persist failed upload %s to %s to dead-letter queue." +
" This was a best-effort attempt.", uploadTask.getAbsolutePath(), uploadTask.getUploadDestinationPathString()), e);

If the DLQ call fails, I don't think we need to fail the main process. The DLQ process is separate from the main logic. An error log with plenty of detail should be good enough.
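
A minimal sketch of what this suggestion, together with the earlier one about a null handler, might look like. It is an assumption-laden illustration rather than the PR's code: the handler is modeled as a plain `Consumer<String>`, `sendToDlq` is a hypothetical helper, and log lines are collected in a list instead of going to a real logger.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class BestEffortDlqSketch {
    // Collected in memory instead of a real logger so the sketch has no dependencies.
    static final List<String> LOG = new ArrayList<>();

    // Returns true only if the handler accepted the task; never throws.
    static boolean sendToDlq(Consumer<String> handler, String localPath, String destination) {
        if (handler == null) {
            LOG.add("WARN no DLQ handler configured; failed upload " + localPath
                    + " is only logged");
            return false;
        }
        try {
            handler.accept(localPath);
            return true;
        } catch (RuntimeException e) {
            // Best effort: record the details and let the main upload loop continue.
            LOG.add("ERROR could not persist failed upload " + localPath + " to "
                    + destination + " in DLQ: " + e);
            return false;
        }
    }

    public static void main(String[] args) {
        sendToDlq(null, "/data/a.log", "s3://bucket/a.log");
        sendToDlq(p -> { throw new IllegalStateException("disk full"); },
                "/data/b.log", "s3://bucket/b.log");
        LOG.forEach(System.out::println);
    }
}
```

Returning a boolean instead of throwing keeps the DLQ path from affecting the uploader thread, at the cost of relying on the log line (or a metric) to surface the failure.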

*
* Note that the handler must have a public constructor so that it can be instantiated by reflection.
*/
public abstract class DeadLetterQueueHandler {

I like the abstract design. Besides a local-file DLQ and an S3 DLQ, I like a Kafka one the most.
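
The Javadoc note quoted above says the handler needs a public constructor so that it can be instantiated by reflection. A minimal sketch of that loading step follows, assuming a no-argument constructor and a class name supplied by configuration; the actual constructor signature and loader in the PR are not shown here, and all names below are hypothetical.

```java
// Hypothetical base type standing in for the abstract handler class.
abstract class PluggableHandlerSketch {
    abstract String describe();
}

// A trivial implementation with the public constructor that reflection needs.
final class NoOpHandlerSketch extends PluggableHandlerSketch {
    public NoOpHandlerSketch() {
    }

    @Override
    String describe() {
        return "no-op handler";
    }
}

final class ReflectiveLoaderSketch {
    // Loads a handler by class name, assuming a public no-argument constructor.
    static PluggableHandlerSketch load(String className) {
        try {
            Class<? extends PluggableHandlerSketch> clazz =
                    Class.forName(className).asSubclass(PluggableHandlerSketch.class);
            return clazz.getConstructor().newInstance();
        } catch (ReflectiveOperationException | ClassCastException e) {
            throw new IllegalArgumentException("Cannot instantiate handler " + className, e);
        }
    }

    public static void main(String[] args) {
        PluggableHandlerSketch handler = load(NoOpHandlerSketch.class.getName());
        System.out.println(handler.describe());
    }
}
```

With a loader of this shape, a Kafka-backed handler could be added as another subclass and selected by class name, without changes to the uploader itself.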

3 participants