Skip to content

Commit

Permalink
Uploader failure handling improvements - DeadLetterQueueHandler imple…
Browse files Browse the repository at this point in the history
…mentation, logging improvements (#15)
  • Loading branch information
jeffxiang authored Dec 18, 2024
1 parent 6ec5032 commit ac785d8
Show file tree
Hide file tree
Showing 31 changed files with 733 additions and 76 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
</modules>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.surefire.version>3.5.1</maven.surefire.version>
<maven.surefire.version>3.0.0-M5</maven.surefire.version>
</properties>
<dependencyManagement>
<dependencies>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.google.common.annotations.VisibleForTesting;
import com.pinterest.kafka.tieredstorage.common.metrics.MetricRegistryManager;
import com.pinterest.kafka.tieredstorage.uploader.dlq.DeadLetterQueueHandler;
import com.pinterest.kafka.tieredstorage.uploader.leadership.LeadershipWatcher;
import org.apache.kafka.common.TopicPartition;
import org.apache.log4j.LogManager;
Expand Down Expand Up @@ -72,6 +73,7 @@ public class DirectoryTreeWatcher implements Runnable {
private final Heartbeat heartbeat;
private final SegmentUploaderConfiguration config;
private final KafkaEnvironmentProvider environmentProvider;
private DeadLetterQueueHandler deadLetterQueueHandler;
private final Object watchKeyMapLock = new Object();
private Thread thread;
private boolean cancelled = false;
Expand All @@ -97,6 +99,7 @@ public DirectoryTreeWatcher(S3FileUploader s3FileUploader, SegmentUploaderConfig
this.s3FileDownloader = new S3FileDownloader(s3FileUploader.getStorageServiceEndpointProvider(), config);
heartbeat = new Heartbeat("watcher.logs", config, environmentProvider);
this.config = config;
this.deadLetterQueueHandler = DeadLetterQueueHandler.createHandler(config);
}

/**
Expand Down Expand Up @@ -142,7 +145,8 @@ public void initialize() throws Exception {
LOG.info("Submitting s3UploadHandler loop");
}

private void handleUploadCallback(UploadTask uploadTask, long totalTimeMs, Throwable throwable, int statusCode) {
@VisibleForTesting
protected void handleUploadCallback(UploadTask uploadTask, long totalTimeMs, Throwable throwable, int statusCode) {
TopicPartition topicPartition = uploadTask.getTopicPartition();
MetricRegistryManager.getInstance(config.getMetricsConfiguration()).incrementCounter(
topicPartition.topic(),
Expand Down Expand Up @@ -315,21 +319,50 @@ private void handleUploadException(UploadTask uploadTask, Throwable throwable, T
"broker=" + environmentProvider.brokerId(),
"file=" + uploadTask.getFullFilename()
);
handleFailedUploadAfterAllRetries(uploadTask, throwable, topicPartition);
}
} else if (uploadTask.getTries() < config.getUploadMaxRetries()){
// retry all other errors
retryUpload(uploadTask.retry(), throwable, topicPartition);
} else {
// retry limit reached, upload is still erroring - send a metric
MetricRegistryManager.getInstance(config.getMetricsConfiguration()).incrementCounter(
topicPartition.topic(),
topicPartition.partition(),
UploaderMetrics.UPLOAD_ERROR_METRIC,
"exception=" + throwable.getClass().getName(),
"cluster=" + environmentProvider.clusterId(),
"broker=" + environmentProvider.brokerId(),
"offset=" + uploadTask.getOffset()
);
// retry limit reached, upload still errors
handleFailedUploadAfterAllRetries(uploadTask, throwable, topicPartition);
}
}

/**
* Handle a failed upload after all retries have been exhausted, including sending
* the failed upload to the dead-letter queue if configured.
*
* @param uploadTask the upload task that failed
* @param throwable the exception that caused the failure
* @param topicPartition the topic partition of the upload task
*/
private void handleFailedUploadAfterAllRetries(UploadTask uploadTask, Throwable throwable, TopicPartition topicPartition) {
LOG.error(String.format("Max retries exhausted (%s) for upload: %s --> %s",
uploadTask.getTries(), uploadTask.getAbsolutePath(), uploadTask.getUploadDestinationPathString()));
MetricRegistryManager.getInstance(config.getMetricsConfiguration()).incrementCounter(
topicPartition.topic(),
topicPartition.partition(),
UploaderMetrics.UPLOAD_ERROR_METRIC,
"exception=" + throwable.getClass().getName(),
"cluster=" + environmentProvider.clusterId(),
"broker=" + environmentProvider.brokerId(),
"offset=" + uploadTask.getOffset()
);
if (deadLetterQueueHandler != null) {
Future<Boolean> result = deadLetterQueueHandler.send(uploadTask, throwable);
boolean success;
try {
success = result.get(deadLetterQueueHandler.getSendTimeoutMs(), TimeUnit.MILLISECONDS);
} catch (Exception e) {
throw new RuntimeException(String.format("Failed to persist failed upload %s to %s to dead-letter queue." +
" This was a best-effort attempt.", uploadTask.getAbsolutePath(), uploadTask.getUploadDestinationPathString()), e);
}
if (success)
LOG.info(String.format("Sent failed upload %s to %s to dead letter queue", uploadTask.getAbsolutePath(), uploadTask.getUploadDestinationPathString()));
else
LOG.error(String.format("Failed to send failed upload %s to %s to dead letter queue", uploadTask.getAbsolutePath(), uploadTask.getUploadDestinationPathString()));
}
}

Expand Down Expand Up @@ -941,6 +974,11 @@ protected Map<Path, WatchKey> getWatchKeyMap() {
return this.watchKeyMap;
}

@VisibleForTesting
protected void setDeadLetterQueueHandler(DeadLetterQueueHandler deadLetterQueueHandler) {
this.deadLetterQueueHandler = deadLetterQueueHandler;
}

public static class UploadTask {
public static final int DEFAULT_BACKOFF_FACTOR = 150; // 2 ^ max_tries * 150 ms is the max backoff time
private final TopicPartition topicPartition;
Expand All @@ -952,6 +990,7 @@ public static class UploadTask {
private final long sizeBytes;
private int tries = 0;
private long nextRetryNotBeforeTimestamp = -1;
private String uploadDestinationPathString;

public UploadTask(TopicPartition topicPartition, String offset, String fullFilename, Path absolutePath) {
this.topicPartition = topicPartition;
Expand Down Expand Up @@ -1012,6 +1051,14 @@ public long getNextRetryNotBeforeTimestamp() {
return nextRetryNotBeforeTimestamp;
}

public String getUploadDestinationPathString() {
return uploadDestinationPathString;
}

public void setUploadDestinationPathString(String uploadDestinationPathString) {
this.uploadDestinationPathString = uploadDestinationPathString;
}

public boolean isReadyForUpload() {
return tries == 0 || System.currentTimeMillis() > nextRetryNotBeforeTimestamp;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,17 @@
import com.pinterest.kafka.tieredstorage.common.discovery.s3.S3StorageServiceEndpoint;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.core.exception.ApiCallTimeoutException;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.PutObjectResponse;

import java.nio.file.NoSuchFileException;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
Expand All @@ -22,24 +24,27 @@
*/
public class MultiThreadedS3FileUploader implements S3FileUploader {
private static final Logger LOG = LogManager.getLogger(MultiThreadedS3FileUploader.class);
private static final int UPLOAD_TIMEOUT_ERROR_CODE = 601;
private static final int UPLOAD_FILE_NOT_FOUND_ERROR_CODE = 602;
private static final int UPLOAD_GENERAL_ERROR_CODE = 603;
protected static final int UPLOAD_TIMEOUT_ERROR_CODE = 601;
protected static final int UPLOAD_FILE_NOT_FOUND_ERROR_CODE = 602;
protected static final int UPLOAD_GENERAL_ERROR_CODE = 603;
private final ExecutorService executorService;
private final StorageServiceEndpointProvider endpointProvider;
private final Heartbeat heartbeat;
private static S3Client s3Client;
private static S3AsyncClient s3AsyncClient;
private final SegmentUploaderConfiguration config;

public MultiThreadedS3FileUploader(StorageServiceEndpointProvider endpointProvider, SegmentUploaderConfiguration config, KafkaEnvironmentProvider environmentProvider) {
this.endpointProvider = endpointProvider;
this.config = config;
if (s3Client == null) {
s3Client = S3Client.builder().build();
ClientOverrideConfiguration overrideConfiguration = ClientOverrideConfiguration.builder()
.apiCallTimeout(Duration.ofMillis(config.getUploadTimeoutMs()))
.build();
if (s3AsyncClient == null) {
s3AsyncClient = S3AsyncClient.builder().overrideConfiguration(overrideConfiguration).build();
}
executorService = Executors.newFixedThreadPool(config.getUploadThreadCount());
heartbeat = new Heartbeat("uploader", config, environmentProvider);
LOG.info("Started MultiThreadedS3FileUploader with threadpool size=" + config.getUploadThreadCount());
LOG.info("Started MultiThreadedS3FileUploader with threadpool size=" + config.getUploadThreadCount() + " and timeout=" + config.getUploadTimeoutMs() + "ms");
}

@Override
Expand All @@ -58,25 +63,35 @@ public void uploadFile(DirectoryTreeWatcher.UploadTask uploadTask, S3UploadCallb

String s3Key = String.format("%s/%s", s3Prefix, subpath);
long queueTime = System.currentTimeMillis();
CompletableFuture<PutObjectResponse> future =
CompletableFuture.supplyAsync(() -> {
PutObjectRequest putObjectRequest = PutObjectRequest.builder()
.bucket(s3Bucket)
.key(s3Key)
// Changing checksum algorithm does not seem to
// have any impact regarding seeing CPU intensive
// sun/security/provider/MD5.implCompress
// that is observed in the flame graph.
//.checksumAlgorithm(ChecksumAlgorithm.CRC32_C)
.build();
return s3Client.putObject(putObjectRequest, uploadTask.getAbsolutePath());
}, executorService).orTimeout(config.getUploadTimeoutMs(), TimeUnit.MILLISECONDS);

LOG.info(String.format("Submitted upload of s3://%s/%s", s3Bucket, s3Key));
PutObjectRequest putObjectRequest = PutObjectRequest.builder()
.bucket(s3Bucket)
.key(s3Key)
// Changing checksum algorithm does not seem to
// have any impact regarding seeing CPU intensive
// sun/security/provider/MD5.implCompress
// that is observed in the flame graph.
//.checksumAlgorithm(ChecksumAlgorithm.CRC32_C)
.build();
CompletableFuture<PutObjectResponse> future;
String uploadPathString = String.format("s3://%s/%s", s3Bucket, s3Key);
uploadTask.setUploadDestinationPathString(uploadPathString); // set the upload destination path so that it can be used in the callback
try {
LOG.info(String.format("Submitting upload of %s --> %s", uploadTask.getAbsolutePath(), uploadPathString));
future = s3AsyncClient.putObject(putObjectRequest, uploadTask.getAbsolutePath());
} catch (Exception e) {
long timeSpentMs = System.currentTimeMillis() - queueTime;
LOG.warn(String.format("Caught exception during putObject for %s --> %s in %dms", uploadTask.getAbsolutePath(), uploadPathString, timeSpentMs), e);
int errorCode = UPLOAD_GENERAL_ERROR_CODE;
if (Utils.isAssignableFromRecursive(e, NoSuchFileException.class)) {
errorCode = UPLOAD_FILE_NOT_FOUND_ERROR_CODE;
}
s3UploadCallback.onCompletion(uploadTask, timeSpentMs, e, errorCode);
return;
}
future.whenComplete((putObjectResponse, throwable) -> {
long timeSpentMs = System.currentTimeMillis() - queueTime;
if (throwable != null) {
LOG.error(String.format("Failed upload of s3://%s/%s in %d ms.", s3Bucket, s3Key, timeSpentMs), throwable);
LOG.warn(String.format("PutObject failed for %s --> %s in %d ms.", uploadTask.getAbsolutePath(), uploadPathString, timeSpentMs), throwable);

int errorCode = getErrorCode(throwable, putObjectResponse);

Expand All @@ -87,7 +102,7 @@ public void uploadFile(DirectoryTreeWatcher.UploadTask uploadTask, S3UploadCallb
errorCode
);
} else {
LOG.info(String.format("Completed upload of s3://%s/%s in %d ms.", s3Bucket, s3Key, timeSpentMs));
LOG.info(String.format("Completed upload of %s in %d ms.", uploadPathString, timeSpentMs));
s3UploadCallback.onCompletion(uploadTask, timeSpentMs,null, putObjectResponse.sdkHttpResponse().statusCode());
}
});
Expand All @@ -97,7 +112,7 @@ private int getErrorCode(Throwable throwable, PutObjectResponse putObjectRespons
if (throwable == null) {
return putObjectResponse == null ? UPLOAD_GENERAL_ERROR_CODE : putObjectResponse.sdkHttpResponse().statusCode();
}
if (throwable instanceof TimeoutException) {
if (throwable instanceof ApiCallTimeoutException || throwable instanceof TimeoutException) {
return UPLOAD_TIMEOUT_ERROR_CODE;
}
if (throwable instanceof NoSuchFileException) {
Expand All @@ -108,7 +123,7 @@ private int getErrorCode(Throwable throwable, PutObjectResponse putObjectRespons
}

public void stop() {
s3Client.close();
s3AsyncClient.close();
executorService.shutdown();
heartbeat.stop();
}
Expand All @@ -119,7 +134,7 @@ public StorageServiceEndpointProvider getStorageServiceEndpointProvider() {
}

@VisibleForTesting
protected static void overrideS3Client(S3Client newS3Client) {
s3Client = newS3Client;
protected static void overrideS3Client(S3AsyncClient newS3Client) {
s3AsyncClient = newS3Client;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
public class SegmentUploaderConfiguration {

private static final Logger LOG = LogManager.getLogger(SegmentUploaderConfiguration.class);
private static final String TS_SEGMENT_UPLOADER_PREFIX = "ts.segment.uploader";
public static final String TS_SEGMENT_UPLOADER_PREFIX = "ts.segment.uploader";
private static final String KAFKA_PREFIX = TS_SEGMENT_UPLOADER_PREFIX + "." + "kafka";

/**
Expand Down Expand Up @@ -210,6 +210,23 @@ public int getUploadMaxRetries() {
return Integer.parseInt(properties.getProperty(UPLOAD_MAX_RETRIES, String.valueOf(Defaults.DEFAULT_UPLOAD_MAX_RETRIES)));
}

public String getProperty(String key) {
return properties.getProperty(key);
}

public String getProperty(String key, String defaultValue) {
return properties.getProperty(key, defaultValue);
}

public Properties getProperties() {
return properties;
}

@VisibleForTesting
protected void setProperty(String key, String value) {
properties.setProperty(key, value);
}

@VisibleForTesting
protected boolean isInInclusionCache(String topicName) {
return includeTopicsCache.contains(topicName);
Expand Down
Loading

0 comments on commit ac785d8

Please sign in to comment.