Skip to content

Commit

Permalink
fix: unbounded streams are not supported (#422)
Browse files Browse the repository at this point in the history
* fix: unbounded streams are not supported

* fix indents
  • Loading branch information
kessplas authored Dec 11, 2024
1 parent 44e9886 commit 034bb89
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 15 deletions.
10 changes: 5 additions & 5 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>bom</artifactId>
<version>2.28.28</version>
<version>2.29.29</version>
<optional>true</optional>
<type>pom</type>
<scope>import</scope>
Expand All @@ -68,21 +68,21 @@
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>s3</artifactId>
<version>2.28.28</version>
<version>2.29.29</version>
</dependency>

<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>kms</artifactId>
<version>2.28.28</version>
<version>2.29.29</version>
</dependency>

<!-- Used when enableMultipartPutObject is configured -->
<dependency>
<groupId>software.amazon.awssdk.crt</groupId>
<artifactId>aws-crt</artifactId>
<optional>true</optional>
<version>0.31.3</version>
<version>0.33.5</version>
</dependency>

<dependency>
Expand Down Expand Up @@ -163,7 +163,7 @@
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>sts</artifactId>
<version>2.28.28</version>
<version>2.29.29</version>
<optional>true</optional>
<scope>test</scope>
</dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import org.reactivestreams.Subscriber;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.encryption.s3.S3EncryptionClientException;
import software.amazon.encryption.s3.materials.CryptographicMaterials;

import java.nio.ByteBuffer;
Expand Down Expand Up @@ -35,7 +36,9 @@ public CipherAsyncRequestBody(final AsyncRequestBody wrappedAsyncRequestBody, fi

@Override
public void subscribe(Subscriber<? super ByteBuffer> subscriber) {
wrappedAsyncRequestBody.subscribe(new CipherSubscriber(subscriber, contentLength().orElse(-1L), materials, iv));
wrappedAsyncRequestBody.subscribe(new CipherSubscriber(subscriber,
contentLength().orElseThrow(() -> new S3EncryptionClientException("Unbounded streams are currently not supported.")),
materials, iv));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public CompletableFuture<PutObjectResponse> putObject(PutObjectRequest request,
contentLength = request.contentLength();
}
} else {
contentLength = requestBody.contentLength().orElse(-1L);
contentLength = requestBody.contentLength().orElseThrow(() -> new S3EncryptionClientException("Unbounded streams are currently not supported."));
}

if (contentLength > AlgorithmSuite.ALG_AES_256_GCM_IV12_TAG16_NO_KDF.cipherMaxContentLengthBytes()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,16 @@
import software.amazon.awssdk.core.ResponseInputStream;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.core.async.BlockingInputStreamAsyncRequestBody;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
import software.amazon.awssdk.utils.IoUtils;
import software.amazon.encryption.s3.utils.BoundedStreamBufferer;
import software.amazon.encryption.s3.utils.BoundedInputStream;
import software.amazon.encryption.s3.utils.BoundedStreamBufferer;
import software.amazon.encryption.s3.utils.MarkResetBoundedZerosInputStream;
import software.amazon.encryption.s3.utils.S3EncryptionClientTestResources;

Expand Down Expand Up @@ -135,6 +136,63 @@ public void ordinaryInputStreamV3Encrypt() throws IOException {
v3Client.close();
}

@Test
public void ordinaryInputStreamV3UnboundedAsync() {
try (S3AsyncClient s3AsyncEncryptionClient = S3AsyncEncryptionClient.builder().aesKey(AES_KEY).build()) {
final String objectKey = appendTestSuffix("ordinaryInputStreamV3UnboundedAsync");
BlockingInputStreamAsyncRequestBody body =
AsyncRequestBody.forBlockingInputStream(null);
try {
s3AsyncEncryptionClient.putObject(r -> r.bucket(BUCKET).key(objectKey), body);
fail("Expected exception!");
} catch (S3EncryptionClientException exception) {
// expected
assertTrue(exception.getMessage().contains("Unbounded streams are currently not supported"));
}
}
}

@Test
public void ordinaryInputStreamV3UnboundedMultipartAsync() {
try (S3AsyncClient s3AsyncEncryptionClient = S3AsyncEncryptionClient.builder()
.aesKey(AES_KEY)
.enableMultipartPutObject(true)
.build()) {
final String objectKey = appendTestSuffix("ordinaryInputStreamV3UnboundedAsync");
BlockingInputStreamAsyncRequestBody body =
AsyncRequestBody.forBlockingInputStream(null);
try {
s3AsyncEncryptionClient.putObject(r -> r.bucket(BUCKET).key(objectKey), body);
fail("Expected exception!");
} catch (S3EncryptionClientException exception) {
// expected
assertTrue(exception.getMessage().contains("Unbounded streams are currently not supported"));
}
}
}

@Test
public void ordinaryInputStreamV3UnboundedCrt() {
try (S3AsyncClient s3CrtAsyncClient = S3AsyncClient.crtCreate()) {
try (S3AsyncClient s3AsyncEncryptionClient = S3AsyncEncryptionClient.builder()
.aesKey(AES_KEY)
.enableMultipartPutObject(true)
.wrappedClient(s3CrtAsyncClient)
.build()) {
final String objectKey = appendTestSuffix("ordinaryInputStreamV3UnboundedCrt");
BlockingInputStreamAsyncRequestBody body =
AsyncRequestBody.forBlockingInputStream(null);
try {
s3AsyncEncryptionClient.putObject(r -> r.bucket(BUCKET).key(objectKey), body);
fail("Expected exception!");
} catch (S3EncryptionClientException exception) {
// expected
assertTrue(exception.getMessage().contains("Unbounded streams are currently not supported"));
}
}
}
}

@Test
public void ordinaryInputStreamV3Decrypt() throws IOException {
final String objectKey = appendTestSuffix("ordinaryInputStreamV3Decrypt");
Expand Down Expand Up @@ -274,9 +332,9 @@ public void customSetBufferSizeWithLargeObject() throws IOException {
final long fileSizeExceedingDefaultLimit = 1024 * 1024 * 32 + 1;
final InputStream largeObjectStream = new BoundedInputStream(fileSizeExceedingDefaultLimit);
v3ClientWithBuffer32MiB.putObject(PutObjectRequest.builder()
.bucket(BUCKET)
.key(objectKey)
.build(), RequestBody.fromInputStream(largeObjectStream, fileSizeExceedingDefaultLimit));
.bucket(BUCKET)
.key(objectKey)
.build(), RequestBody.fromInputStream(largeObjectStream, fileSizeExceedingDefaultLimit));

largeObjectStream.close();

Expand Down Expand Up @@ -327,9 +385,9 @@ public void customSetBufferSizeWithLargeObjectAsyncClient() throws IOException {
final InputStream largeObjectStream = new BoundedInputStream(fileSizeExceedingDefaultLimit);
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
CompletableFuture<PutObjectResponse> futurePut = v3ClientWithBuffer32MiB.putObject(PutObjectRequest.builder()
.bucket(BUCKET)
.key(objectKey)
.build(), AsyncRequestBody.fromInputStream(largeObjectStream, fileSizeExceedingDefaultLimit, singleThreadExecutor));
.bucket(BUCKET)
.key(objectKey)
.build(), AsyncRequestBody.fromInputStream(largeObjectStream, fileSizeExceedingDefaultLimit, singleThreadExecutor));

futurePut.join();
largeObjectStream.close();
Expand Down Expand Up @@ -387,7 +445,7 @@ public void delayedAuthModeWithLargeObject() throws IOException {
assertThrows(S3EncryptionClientException.class, () -> v3Client.getObjectAsBytes(builder -> builder
.bucket(BUCKET)
.key(objectKey)));

S3Client v3ClientWithDelayedAuth = S3EncryptionClient.builder()
.aesKey(AES_KEY)
.enableDelayedAuthenticationMode(true)
Expand Down

0 comments on commit 034bb89

Please sign in to comment.