Uploader failure handling improvements - DeadLetterQueueHandler implementation, logging improvements #15
Update test_topic_b generated segments to conform to the GitHub file-size limit
@@ -15,7 +15,7 @@
     </modules>
     <properties>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-        <maven.surefire.version>3.5.1</maven.surefire.version>
+        <maven.surefire.version>3.0.0-M5</maven.surefire.version>
Why downgrade this dependency?
It's causing issues with Maven versions below 3.6.5; we are mostly using 3.6.0.
@@ -97,7 +112,7 @@ private int getErrorCode(Throwable throwable, PutObjectResponse putObjectResponse
     if (throwable == null) {
         return putObjectResponse == null ? UPLOAD_GENERAL_ERROR_CODE : putObjectResponse.sdkHttpResponse().statusCode();
     }
-    if (throwable instanceof TimeoutException) {
+    if (throwable instanceof ApiCallTimeoutException || throwable instanceof TimeoutException) {
Just curious: TimeoutException is from java.util.concurrent, while ApiCallTimeoutException is from software.amazon.awssdk.core.exception. Can both exceptions appear here?
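For context on the question above: both exception types can plausibly reach a single catch site, depending on which layer enforces the deadline. A minimal sketch using only java.util.concurrent — the ApiCallTimeoutException below is a hypothetical stub standing in for the AWS SDK class, so the example runs without the SDK on the classpath:

```java
import java.util.concurrent.*;

public class TimeoutPaths {
    // Hypothetical stub for software.amazon.awssdk.core.exception.ApiCallTimeoutException.
    static class ApiCallTimeoutException extends RuntimeException {}

    // Path 1: a wrapper's own deadline fires while the call is still running ->
    // java.util.concurrent.TimeoutException from Future.get(timeout).
    static Throwable wrapperTimeout() {
        CompletableFuture<String> neverDone = new CompletableFuture<>();
        try {
            neverDone.get(50, TimeUnit.MILLISECONDS);
            return null;
        } catch (Exception e) {
            return e;
        }
    }

    // Path 2: the client itself enforces its call timeout and completes the
    // returned future exceptionally, which is how an SDK-level timeout surfaces.
    static Throwable clientTimeout() {
        CompletableFuture<String> sdkFuture = new CompletableFuture<>();
        sdkFuture.completeExceptionally(new ApiCallTimeoutException());
        try {
            sdkFuture.join();
            return null;
        } catch (CompletionException e) {
            return e.getCause();
        }
    }

    public static void main(String[] args) {
        System.out.println(wrapperTimeout() instanceof TimeoutException);       // true
        System.out.println(clientTimeout() instanceof ApiCallTimeoutException); // true
    }
}
```

Checking for both types, as the diff does, covers whichever layer's timeout fires first.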
"broker=" + environmentProvider.brokerId(), | ||
"offset=" + uploadTask.getOffset() | ||
); | ||
if (deadLetterQueueHandler != null) { |
If deadLetterQueueHandler is null, shall we add another log saying the DLQ handler is missing, so we cannot do anything more than logging?
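A minimal sketch of this suggestion — the Handler interface and dispatch method here are hypothetical stand-ins for the PR's actual API, not its real signatures:

```java
import java.util.logging.Logger;

public class DlqDispatch {
    private static final Logger LOG = Logger.getLogger(DlqDispatch.class.getName());

    // Hypothetical stand-in for DeadLetterQueueHandler.
    interface Handler { void persist(String path); }

    // Returns true if the failed upload was handed to a DLQ handler;
    // otherwise logs explicitly that no handler is configured, so operators
    // know nothing beyond this log line will be done.
    static boolean dispatch(Handler handler, String path) {
        if (handler != null) {
            handler.persist(path);
            return true;
        }
        LOG.warning("No DeadLetterQueueHandler configured; failed upload " + path
                + " is only logged and cannot be recovered automatically.");
        return false;
    }

    public static void main(String[] args) {
        System.out.println(dispatch(null, "/tmp/segment-0001.log"));  // false
        System.out.println(dispatch(p -> {}, "/tmp/segment-0001.log")); // true
    }
}
```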
    throw new RuntimeException(String.format("Failed to persist failed upload %s to %s to dead-letter queue." +
            " This was a best-effort attempt.", uploadTask.getAbsolutePath(), uploadTask.getUploadDestinationPathString()), e);
If the DLQ call fails, I don't think we need to fail the main process. The DLQ process is separate from the main logic. An error log with plenty of details should be good enough.
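A sketch of the change this comment suggests: treat DLQ persistence as best-effort, logging and swallowing its failures instead of rethrowing. Names here (Handler, persistQuietly) are hypothetical, not the PR's code:

```java
import java.util.logging.Logger;

public class BestEffortDlq {
    private static final Logger LOG = Logger.getLogger(BestEffortDlq.class.getName());

    // Hypothetical stand-in for DeadLetterQueueHandler.
    interface Handler { void persist(String path); }

    // Returns true if the DLQ write succeeded; on failure, logs and returns
    // false rather than throwing, so the caller's upload pipeline is unaffected.
    static boolean persistQuietly(Handler handler, String path) {
        try {
            handler.persist(path);
            return true;
        } catch (Exception e) {
            LOG.severe("Best-effort DLQ write failed for " + path + ": " + e);
            return false;
        }
    }

    public static void main(String[] args) {
        boolean ok = persistQuietly(
                p -> { throw new IllegalStateException("dlq down"); },
                "/data/segment-0001.log");
        System.out.println(ok); // false, but no exception escaped to the caller
    }
}
```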
     *
     * Note that the handler must have a public constructor so that it can be instantiated by reflection.
     */
    public abstract class DeadLetterQueueHandler {
I like the abstract design. Besides a local-file DLQ and an S3 DLQ, I'd like a Kafka one the most.
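To illustrate the Kafka idea, a hypothetical sketch: the abstract class below is a simplified stand-in for the PR's DeadLetterQueueHandler, and the producer is modeled as a plain BiConsumer so the example runs without the Kafka client library (a real version would wrap KafkaProducer and send a ProducerRecord):

```java
import java.util.function.BiConsumer;

public class KafkaDlqSketch {
    // Simplified stand-in for the PR's abstract DeadLetterQueueHandler.
    abstract static class DeadLetterQueueHandler {
        abstract void persist(String key, String payload);
    }

    static class KafkaDeadLetterQueueHandler extends DeadLetterQueueHandler {
        private final String topic;
        // Stand-in for KafkaProducer::send; injected so the sketch is testable.
        private final BiConsumer<String, String> producer;

        KafkaDeadLetterQueueHandler(String topic, BiConsumer<String, String> producer) {
            this.topic = topic;
            this.producer = producer;
        }

        @Override
        void persist(String key, String payload) {
            // Real code: producer.send(new ProducerRecord<>(topic, key, payload))
            producer.accept(topic + "/" + key, payload);
        }
    }

    public static void main(String[] args) {
        StringBuilder sent = new StringBuilder();
        new KafkaDeadLetterQueueHandler("uploader-dlq",
                (k, v) -> sent.append(k).append('=').append(v))
                .persist("broker-1", "/data/segment-0001.log");
        System.out.println(sent); // uploader-dlq/broker-1=/data/segment-0001.log
    }
}
```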
This PR simplifies the manual work needed to recover failed uploads in case of issues such as transient S3 slowness / failures. The PR includes the following major changes:

- Added a DeadLetterQueueHandler abstract class: this allows for pluggable implementations of handlers for processing upload tasks that failed after all retries are exhausted. A simple example is LocalFileDeadLetterQueueHandler, which writes the failures to a local file.
- Swapped the synchronous S3Client in MultiThreadedS3FileUploader for an async S3AsyncClient, so that the putObject() call directly returns a future and aborts the upload if a timeout occurs. This fixes the issue where a timeout may not indicate an actual failed upload, which happens when the request is still ongoing but the wrapped future is timed out.

Tested the functionality in unit tests and in real Kafka clusters.
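The key property of the async-client change — the timeout and the request share one future, so a timeout can actually abort the in-flight upload — can be sketched with plain CompletableFuture. All names below are hypothetical; the real code uses the AWS SDK's S3AsyncClient:

```java
import java.util.concurrent.*;

public class AsyncUploadSketch {
    // Simulates an async putObject(): returns a future that completes after workMillis.
    static CompletableFuture<String> putObjectAsync(ScheduledExecutorService pool, long workMillis) {
        CompletableFuture<String> result = new CompletableFuture<>();
        pool.schedule(() -> result.complete("etag-123"), workMillis, TimeUnit.MILLISECONDS);
        return result;
    }

    static String uploadWithTimeout(ScheduledExecutorService pool, long workMillis, long timeoutMillis) {
        CompletableFuture<String> upload = putObjectAsync(pool, workMillis);
        try {
            // orTimeout completes this same future exceptionally if the deadline
            // passes first, so there is no separate wrapper future that can time
            // out while the request quietly keeps running.
            return upload.orTimeout(timeoutMillis, TimeUnit.MILLISECONDS).join();
        } catch (CompletionException e) {
            upload.cancel(true); // abort the in-flight upload instead of leaving it running
            return "aborted: " + e.getCause().getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);
        System.out.println(uploadWithTimeout(pool, 10, 1000)); // etag-123
        System.out.println(uploadWithTimeout(pool, 1000, 50)); // aborted: TimeoutException
        pool.shutdown();
    }
}
```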