Skip to content

Commit

Permalink
Fail UploadDirectory if xform throws (#5756)
Browse files Browse the repository at this point in the history
* Fail UploadDirectory if xform throws

This fixes an issue where if the request transformation function given
to UploadDirectoryRequest throws an error when it is invoked, the error
would be silently swallowed. Now, the completion future will be
completed exceptionally if the function throws.

* Synchronize cancel

* Update
  • Loading branch information
dagnir authored Dec 23, 2024
1 parent f37d56f commit a23d6ad
Show file tree
Hide file tree
Showing 6 changed files with 124 additions and 5 deletions.
6 changes: 6 additions & 0 deletions .changes/next-release/bugfix-S3TransferManager-07e5b6b.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"type": "bugfix",
"category": "S3 Transfer Manager",
"contributor": "",
"description": "Fix an issue where if the request transformation function given to UploadDirectoryRequest throws an error when it is invoked, the error would be silently swallowed. Now, the completion future will be completed exceptionally if the function throws."
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public class AsyncBufferingSubscriber<T> implements Subscriber<T> {
private final int maxConcurrentExecutions;
private final AtomicInteger numRequestsInFlight;
private volatile boolean upstreamDone;
private Subscription subscription;
private volatile Subscription subscription;

private final Set<CompletableFuture<?>> requestsInFlight;

Expand Down Expand Up @@ -75,7 +75,18 @@ public void onSubscribe(Subscription subscription) {
@Override
public void onNext(T item) {
numRequestsInFlight.incrementAndGet();
CompletableFuture<?> currentRequest = consumer.apply(item);
CompletableFuture<?> currentRequest;

try {
currentRequest = consumer.apply(item);
} catch (Throwable t) {
synchronized (this) {
subscription.cancel();
}
onError(t);
return;
}

requestsInFlight.add(currentRequest);
currentRequest.whenComplete((r, t) -> {
checkForCompletion(numRequestsInFlight.decrementAndGet());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,15 @@ private void doUploadDirectory(CompletableFuture<CompletedDirectoryUpload> retur
iterablePublisher.subscribe(bufferingSubscriber);
CompletableFutureUtils.forwardExceptionTo(returnFuture, allOfFutures);

allOfFutures.whenComplete((r, t) -> returnFuture.complete(CompletedDirectoryUpload.builder()
.failedTransfers(failedFileUploads)
.build()));
allOfFutures.whenComplete((r, t) -> {
if (t != null) {
returnFuture.completeExceptionally(SdkClientException.create("Failed to send request", t));
return;
}
returnFuture.complete(CompletedDirectoryUpload.builder()
.failedTransfers(failedFileUploads)
.build());
});
}

private void validateDirectory(UploadDirectoryRequest uploadDirectoryRequest) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@
package software.amazon.awssdk.transfer.s3.internal;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import io.reactivex.Flowable;
import io.reactivex.Observable;
Expand All @@ -36,6 +40,7 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.reactivestreams.Subscription;
import software.amazon.awssdk.utils.async.SimplePublisher;

class AsyncBufferingSubscriberTest {
Expand Down Expand Up @@ -108,4 +113,21 @@ void onErrorInvoked_shouldCompleteFutureExceptionallyAndCancelRequestsFuture() {
assertThat(futures.get(0)).isCancelled();
assertThat(futures.get(1)).isCancelled();
}

@Test
public void consumerFunctionThrows_shouldCancelSubscriptionAndCompleteFutureExceptionally() {
RuntimeException exception = new RuntimeException("test");
CompletableFuture<Void> future = new CompletableFuture<>();
AsyncBufferingSubscriber<String> subscriber = new AsyncBufferingSubscriber<>(s -> {
throw exception;
}, future, 1);

Subscription mockSubscription = mock(Subscription.class);

subscriber.onSubscribe(mockSubscription);
subscriber.onNext("item");

verify(mockSubscription, times(1)).cancel();
assertThatThrownBy(future::join).hasCause(exception);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.assertj.core.util.Sets;
Expand Down Expand Up @@ -471,6 +472,56 @@ void downloadDirectory_notDirectory_shouldCompleteFutureExceptionally(FileSystem
.hasMessageContaining("is not a directory").hasCauseInstanceOf(IllegalArgumentException.class);
}

@Test
void downloadDirectory_withDownloadRequestTransformer_transformerThrows_failsDownload() {
stubSuccessfulListObjects(listObjectsHelper, "key1", "key2");

FileDownload fileDownload = newSuccessfulDownload();
FileDownload fileDownload2 = newSuccessfulDownload();

when(singleDownloadFunction.apply(any(DownloadFileRequest.class))).thenReturn(fileDownload, fileDownload2);


RuntimeException exception = new RuntimeException("boom");
Consumer<DownloadFileRequest.Builder> downloadFileRequestTransformer = b -> {
throw exception;
};

DirectoryDownload downloadDirectory =
downloadDirectoryHelper.downloadDirectory(DownloadDirectoryRequest.builder()
.destination(directory)
.bucket("bucket")
.downloadFileRequestTransformer(downloadFileRequestTransformer)
.build());

assertThatThrownBy(downloadDirectory.completionFuture()::join).getCause().hasCause(exception);
}

@Test
void downloadDirectory_withListObjectsRequestTransformer_transformerThrows_failsDownload() {
stubSuccessfulListObjects(listObjectsHelper, "key1", "key2");

FileDownload fileDownload = newSuccessfulDownload();
FileDownload fileDownload2 = newSuccessfulDownload();

when(singleDownloadFunction.apply(any(DownloadFileRequest.class))).thenReturn(fileDownload, fileDownload2);


RuntimeException exception = new RuntimeException("boom");
Consumer<ListObjectsV2Request.Builder> downloadFileRequestTransformer = b -> {
throw exception;
};

DirectoryDownload downloadDirectory =
downloadDirectoryHelper.downloadDirectory(DownloadDirectoryRequest.builder()
.destination(directory)
.bucket("bucket")
.listObjectsV2RequestTransformer(downloadFileRequestTransformer)
.build());

assertThatThrownBy(downloadDirectory.completionFuture()::join).hasCause(exception);
}

private static DefaultFileDownload completedDownload() {
return new DefaultFileDownload(CompletableFuture.completedFuture(CompletedFileDownload.builder()
.response(GetObjectResponse.builder().build())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,29 @@ void uploadDirectory_notDirectoryFollowSymlinkTrue_shouldCompleteSuccessfully()
assertThat(keys).containsOnly("2.txt");
}

@Test
public void uploadDirectory_requestTransformFunctionThrows_failsUpload() {
when(singleUploadFunction.apply(any())).thenReturn(null);

RuntimeException exception = new RuntimeException("boom");

Consumer<UploadFileRequest.Builder> uploadFileRequestTransformer = r -> {
throw exception;
};

CompletableFuture<CompletedDirectoryUpload> uploadFuture =
uploadDirectoryHelper.uploadDirectory(
UploadDirectoryRequest.builder()
.source(directory)
.bucket("bucket")
.uploadFileRequestTransformer(uploadFileRequestTransformer)
.build())
.completionFuture();

assertThatThrownBy(uploadFuture::join).getCause().hasCause(exception);
}


private DefaultFileUpload completedUpload() {
return new DefaultFileUpload(CompletableFuture.completedFuture(CompletedFileUpload.builder()
.response(PutObjectResponse.builder().build())
Expand Down

0 comments on commit a23d6ad

Please sign in to comment.