diff --git a/.changes/next-release/bugfix-S3TransferManager-07e5b6b.json b/.changes/next-release/bugfix-S3TransferManager-07e5b6b.json new file mode 100644 index 000000000000..5ce8e98cfb62 --- /dev/null +++ b/.changes/next-release/bugfix-S3TransferManager-07e5b6b.json @@ -0,0 +1,6 @@ +{ + "type": "bugfix", + "category": "S3 Transfer Manager", + "contributor": "", + "description": "Fix an issue where if the request transformation function given to UploadDirectoryRequest throws an error when it is invoked, the error would be silently swallowed. Now, the completion future will be completed exceptionally if the function throws." +} diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/AsyncBufferingSubscriber.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/AsyncBufferingSubscriber.java index 0641ac402fb2..75231bd262d5 100644 --- a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/AsyncBufferingSubscriber.java +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/AsyncBufferingSubscriber.java @@ -40,7 +40,7 @@ public class AsyncBufferingSubscriber implements Subscriber { private final int maxConcurrentExecutions; private final AtomicInteger numRequestsInFlight; private volatile boolean upstreamDone; - private Subscription subscription; + private volatile Subscription subscription; private final Set> requestsInFlight; @@ -75,7 +75,18 @@ public void onSubscribe(Subscription subscription) { @Override public void onNext(T item) { numRequestsInFlight.incrementAndGet(); - CompletableFuture currentRequest = consumer.apply(item); + CompletableFuture currentRequest; + + try { + currentRequest = consumer.apply(item); + } catch (Throwable t) { + synchronized (this) { + subscription.cancel(); + } + onError(t); + return; + } + requestsInFlight.add(currentRequest); currentRequest.whenComplete((r, t) -> { checkForCompletion(numRequestsInFlight.decrementAndGet()); diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/UploadDirectoryHelper.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/UploadDirectoryHelper.java index d92e0b3b6450..a6d9c6f1a270 100644 --- a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/UploadDirectoryHelper.java +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/UploadDirectoryHelper.java @@ -107,9 +107,15 @@ private void doUploadDirectory(CompletableFuture retur iterablePublisher.subscribe(bufferingSubscriber); CompletableFutureUtils.forwardExceptionTo(returnFuture, allOfFutures); - allOfFutures.whenComplete((r, t) -> returnFuture.complete(CompletedDirectoryUpload.builder() - .failedTransfers(failedFileUploads) - .build())); + allOfFutures.whenComplete((r, t) -> { + if (t != null) { + returnFuture.completeExceptionally(SdkClientException.create("Failed to send request", t)); + return; + } + returnFuture.complete(CompletedDirectoryUpload.builder() + .failedTransfers(failedFileUploads) + .build()); + }); } private void validateDirectory(UploadDirectoryRequest uploadDirectoryRequest) { diff --git a/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/AsyncBufferingSubscriberTest.java b/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/AsyncBufferingSubscriberTest.java index b8649045850e..a4b7b68e75b4 100644 --- a/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/AsyncBufferingSubscriberTest.java +++ b/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/AsyncBufferingSubscriberTest.java @@ -16,6 +16,10 @@ package software.amazon.awssdk.transfer.s3.internal; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import io.reactivex.Flowable; import io.reactivex.Observable; @@ -36,6 +40,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; +import org.reactivestreams.Subscription; import software.amazon.awssdk.utils.async.SimplePublisher; class AsyncBufferingSubscriberTest { @@ -108,4 +113,21 @@ void onErrorInvoked_shouldCompleteFutureExceptionallyAndCancelRequestsFuture() { assertThat(futures.get(0)).isCancelled(); assertThat(futures.get(1)).isCancelled(); } + + @Test + public void consumerFunctionThrows_shouldCancelSubscriptionAndCompleteFutureExceptionally() { + RuntimeException exception = new RuntimeException("test"); + CompletableFuture future = new CompletableFuture<>(); + AsyncBufferingSubscriber subscriber = new AsyncBufferingSubscriber<>(s -> { + throw exception; + }, future, 1); + + Subscription mockSubscription = mock(Subscription.class); + + subscriber.onSubscribe(mockSubscription); + subscriber.onNext("item"); + + verify(mockSubscription, times(1)).cancel(); + assertThatThrownBy(future::join).hasCause(exception); + } } diff --git a/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/DownloadDirectoryHelperTest.java b/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/DownloadDirectoryHelperTest.java index 700f65fb56d8..7d78b482ee69 100644 --- a/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/DownloadDirectoryHelperTest.java +++ b/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/DownloadDirectoryHelperTest.java @@ -39,6 +39,7 @@ import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; import java.util.function.Function; import java.util.stream.Collectors; import org.assertj.core.util.Sets; @@ -471,6 +472,56 @@ void downloadDirectory_notDirectory_shouldCompleteFutureExceptionally(FileSystem .hasMessageContaining("is not a directory").hasCauseInstanceOf(IllegalArgumentException.class); } + @Test + void downloadDirectory_withDownloadRequestTransformer_transformerThrows_failsDownload() { + stubSuccessfulListObjects(listObjectsHelper, "key1", "key2"); + + FileDownload fileDownload = newSuccessfulDownload(); + FileDownload fileDownload2 = newSuccessfulDownload(); + + when(singleDownloadFunction.apply(any(DownloadFileRequest.class))).thenReturn(fileDownload, fileDownload2); + + + RuntimeException exception = new RuntimeException("boom"); + Consumer downloadFileRequestTransformer = b -> { + throw exception; + }; + + DirectoryDownload downloadDirectory = + downloadDirectoryHelper.downloadDirectory(DownloadDirectoryRequest.builder() + .destination(directory) + .bucket("bucket") + .downloadFileRequestTransformer(downloadFileRequestTransformer) + .build()); + + assertThatThrownBy(downloadDirectory.completionFuture()::join).getCause().hasCause(exception); + } + + @Test + void downloadDirectory_withListObjectsRequestTransformer_transformerThrows_failsDownload() { + stubSuccessfulListObjects(listObjectsHelper, "key1", "key2"); + + FileDownload fileDownload = newSuccessfulDownload(); + FileDownload fileDownload2 = newSuccessfulDownload(); + + when(singleDownloadFunction.apply(any(DownloadFileRequest.class))).thenReturn(fileDownload, fileDownload2); + + + RuntimeException exception = new RuntimeException("boom"); + Consumer downloadFileRequestTransformer = b -> { + throw exception; + }; + + DirectoryDownload downloadDirectory = + downloadDirectoryHelper.downloadDirectory(DownloadDirectoryRequest.builder() + .destination(directory) + .bucket("bucket") + .listObjectsV2RequestTransformer(downloadFileRequestTransformer) + .build()); + + assertThatThrownBy(downloadDirectory.completionFuture()::join).hasCause(exception); + } + private static DefaultFileDownload completedDownload() { return new DefaultFileDownload(CompletableFuture.completedFuture(CompletedFileDownload.builder() .response(GetObjectResponse.builder().build()) diff --git a/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/UploadDirectoryHelperTest.java b/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/UploadDirectoryHelperTest.java index 9e975f09a357..735f5b735653 100644 --- a/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/UploadDirectoryHelperTest.java +++ b/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/UploadDirectoryHelperTest.java @@ -436,6 +436,29 @@ void uploadDirectory_notDirectoryFollowSymlinkTrue_shouldCompleteSuccessfully() assertThat(keys).containsOnly("2.txt"); } + @Test + public void uploadDirectory_requestTransformFunctionThrows_failsUpload() { + when(singleUploadFunction.apply(any())).thenReturn(null); + + RuntimeException exception = new RuntimeException("boom"); + + Consumer uploadFileRequestTransformer = r -> { + throw exception; + }; + + CompletableFuture uploadFuture = + uploadDirectoryHelper.uploadDirectory( + UploadDirectoryRequest.builder() + .source(directory) + .bucket("bucket") + .uploadFileRequestTransformer(uploadFileRequestTransformer) + .build()) + .completionFuture(); + + assertThatThrownBy(uploadFuture::join).getCause().hasCause(exception); + } + + private DefaultFileUpload completedUpload() { return new DefaultFileUpload(CompletableFuture.completedFuture(CompletedFileUpload.builder() .response(PutObjectResponse.builder().build())