diff --git a/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java b/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java
index 5f87bd171..dcb9ca361 100644
--- a/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java
+++ b/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java
@@ -50,13 +50,21 @@ public void onNext(ByteBuffer byteBuffer) {
         if (amountToReadFromByteBuffer > 0) {
             byte[] buf = BinaryUtils.copyBytesFrom(byteBuffer, amountToReadFromByteBuffer);
             outputBuffer = cipher.update(buf, 0, amountToReadFromByteBuffer);
-            if (outputBuffer == null && amountToReadFromByteBuffer < cipher.getBlockSize()) {
-                // The underlying data is too short to fill in the block cipher
-                // This is true at the end of the file, so complete to get the final
-                // bytes
-                this.onComplete();
+            if (outputBuffer == null || outputBuffer.length == 0) {
+                // The underlying data is too short to fill in the block cipher.
+                // Note that while the JCE Javadoc specifies that the outputBuffer is null in this case,
+                // in practice SunJCE and ACCP return an empty buffer instead, hence checks for
+                // null OR length == 0.
+                if (contentRead.get() == contentLength) {
+                    // All content has been read, so complete to get the final bytes
+                    this.onComplete();
+                }
+                // Otherwise, wait for more bytes. To avoid blocking,
+                // send an empty buffer to the wrapped subscriber.
+                wrappedSubscriber.onNext(ByteBuffer.allocate(0));
+            } else {
+                wrappedSubscriber.onNext(ByteBuffer.wrap(outputBuffer));
             }
-            wrappedSubscriber.onNext(ByteBuffer.wrap(outputBuffer));
         } else {
             // Do nothing
             wrappedSubscriber.onNext(byteBuffer);
diff --git a/src/main/java/software/amazon/encryption/s3/legacy/internal/AdjustedRangeSubscriber.java b/src/main/java/software/amazon/encryption/s3/legacy/internal/AdjustedRangeSubscriber.java
index 57f94d831..8e5b2535d 100644
--- a/src/main/java/software/amazon/encryption/s3/legacy/internal/AdjustedRangeSubscriber.java
+++ b/src/main/java/software/amazon/encryption/s3/legacy/internal/AdjustedRangeSubscriber.java
@@ -39,16 +39,22 @@ private void initializeForRead(long rangeBeginning, long rangeEnd) {
         this.virtualAvailable = (rangeEnd - rangeBeginning) + 1;
     }
 
-
     @Override
     public void onSubscribe(Subscription s) {
+        // In edge cases where the beginning index exceeds the offset,
+        // there is never valid data to read, so signal completion immediately.
+        // Otherwise, the CipherSubscriber tries and fails to read the last block.
+        // This probably should be an exception, but previous implementations
+        // return an empty string; signalling onComplete accomplishes this result
+        // and thus maintains compatibility.
+        if (virtualAvailable <= 0) {
+            wrappedSubscriber.onComplete();
+        }
         wrappedSubscriber.onSubscribe(s);
     }
 
     @Override
     public void onNext(ByteBuffer byteBuffer) {
-        // In edge cases where the beginning index exceeds the offset,
-        // there is never valid data to read, so signal completion immediately.
         if (virtualAvailable <= 0) {
             wrappedSubscriber.onComplete();
         }
diff --git a/src/test/java/software/amazon/encryption/s3/S3AsyncEncryptionClientTest.java b/src/test/java/software/amazon/encryption/s3/S3AsyncEncryptionClientTest.java
index ddc2fec37..5c6a487f4 100644
--- a/src/test/java/software/amazon/encryption/s3/S3AsyncEncryptionClientTest.java
+++ b/src/test/java/software/amazon/encryption/s3/S3AsyncEncryptionClientTest.java
@@ -13,6 +13,8 @@
 import com.amazonaws.services.s3.model.EncryptionMaterials;
 import com.amazonaws.services.s3.model.EncryptionMaterialsProvider;
 import com.amazonaws.services.s3.model.StaticEncryptionMaterialsProvider;
+import org.apache.commons.io.IOUtils;
+import org.bouncycastle.jce.provider.BouncyCastleProvider;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import software.amazon.awssdk.core.ResponseBytes;
@@ -29,18 +31,27 @@
 import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
 import software.amazon.awssdk.services.s3.model.PutObjectResponse;
 import software.amazon.awssdk.services.s3.model.S3Exception;
+import software.amazon.encryption.s3.utils.BoundedInputStream;
+import software.amazon.encryption.s3.utils.TinyBufferAsyncRequestBody;
 
 import javax.crypto.KeyGenerator;
 import javax.crypto.SecretKey;
+import java.io.IOException;
+import java.io.InputStream;
 import java.security.NoSuchAlgorithmException;
+import java.security.Provider;
+import java.security.Security;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 import static software.amazon.encryption.s3.utils.S3EncryptionClientTestResources.BUCKET;
 import static software.amazon.encryption.s3.utils.S3EncryptionClientTestResources.appendTestSuffix;
 import static software.amazon.encryption.s3.utils.S3EncryptionClientTestResources.deleteObject;
@@ -407,4 +418,50 @@ public void copyObjectTransparentlyAsync() {
         v3AsyncClient.close();
     }
 
+    /**
+     * Test which artificially limits the size of buffers using {@link TinyBufferAsyncRequestBody}.
+     * This tests edge cases where network conditions result in buffers with length shorter than
+     * the cipher's block size.
+     * @throws IOException
+     */
+    @Test
+    public void tinyBufferTest() throws IOException {
+        // BouncyCastle actually returns null buffers, unlike ACCP and SunJCE, which return empty buffers
+        Security.addProvider(new BouncyCastleProvider());
+        Provider provider = Security.getProvider("BC");
+        final String objectKey = appendTestSuffix("tiny-buffer-async");
+
+        S3AsyncClient v3AsyncClient = S3AsyncEncryptionClient.builder()
+                .aesKey(AES_KEY)
+                .cryptoProvider(provider)
+                .build();
+
+        // need enough data to split up
+        final long inputLength = 1024;
+        final InputStream input = new BoundedInputStream(inputLength);
+        final InputStream inputClean = new BoundedInputStream(inputLength);
+
+        final ExecutorService exec = Executors.newSingleThreadExecutor();
+
+        // Use this request body to limit the buffer size
+        TinyBufferAsyncRequestBody tinyBufferAsyncRequestBody = new TinyBufferAsyncRequestBody(AsyncRequestBody.fromInputStream(input, inputLength, exec));
+        CompletableFuture<PutObjectResponse> futurePut = v3AsyncClient.putObject(builder -> builder
+                .bucket(BUCKET)
+                .key(objectKey)
+                .build(), tinyBufferAsyncRequestBody);
+        futurePut.join();
+
+        CompletableFuture<ResponseBytes<GetObjectResponse>> futureGet = v3AsyncClient.getObject(builder -> builder
+                .bucket(BUCKET)
+                .key(objectKey)
+                .build(), AsyncResponseTransformer.toBytes());
+        ResponseBytes<GetObjectResponse> getResponse = futureGet.join();
+        assertTrue(IOUtils.contentEquals(inputClean, getResponse.asInputStream()));
+
+        // Cleanup
+        deleteObject(BUCKET, objectKey, v3AsyncClient);
+        v3AsyncClient.close();
+        exec.shutdown();
+    }
+
 }
diff --git a/src/test/java/software/amazon/encryption/s3/utils/TinyBufferAsyncRequestBody.java b/src/test/java/software/amazon/encryption/s3/utils/TinyBufferAsyncRequestBody.java
new file mode 100644
index 000000000..50c4ba6e0
--- /dev/null
+++ b/src/test/java/software/amazon/encryption/s3/utils/TinyBufferAsyncRequestBody.java
@@ -0,0 +1,31 @@
+package software.amazon.encryption.s3.utils;
+
+import org.reactivestreams.Subscriber;
+import software.amazon.awssdk.core.async.AsyncRequestBody;
+
+import java.nio.ByteBuffer;
+import java.util.Optional;
+
+/**
+ * AsyncRequestBody which wraps another AsyncRequestBody with a {@link TinyBufferSubscriber}.
+ * This is useful for testing poor network conditions where buffers may not be larger than
+ * the cipher's block size.
+ */
+public class TinyBufferAsyncRequestBody implements AsyncRequestBody {
+
+    private final AsyncRequestBody wrappedAsyncRequestBody;
+
+    public TinyBufferAsyncRequestBody(final AsyncRequestBody wrappedRequestBody) {
+        wrappedAsyncRequestBody = wrappedRequestBody;
+    }
+
+    @Override
+    public Optional<Long> contentLength() {
+        return wrappedAsyncRequestBody.contentLength();
+    }
+
+    @Override
+    public void subscribe(Subscriber<? super ByteBuffer> s) {
+        wrappedAsyncRequestBody.subscribe(new TinyBufferSubscriber(s));
+    }
+}
diff --git a/src/test/java/software/amazon/encryption/s3/utils/TinyBufferSubscriber.java b/src/test/java/software/amazon/encryption/s3/utils/TinyBufferSubscriber.java
new file mode 100644
index 000000000..de073d6f4
--- /dev/null
+++ b/src/test/java/software/amazon/encryption/s3/utils/TinyBufferSubscriber.java
@@ -0,0 +1,55 @@
+package software.amazon.encryption.s3.utils;
+
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
+import software.amazon.awssdk.utils.BinaryUtils;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Subscriber which purposefully limits the size of buffers sent to
+ * the wrapped subscriber. This is useful for simulating adverse network conditions.
+ */
+public class TinyBufferSubscriber implements Subscriber<ByteBuffer> {
+
+    private final Subscriber<? super ByteBuffer> wrappedSubscriber;
+
+    public TinyBufferSubscriber(final Subscriber<? super ByteBuffer> wrappedSubscriber) {
+        this.wrappedSubscriber = wrappedSubscriber;
+    }
+
+    @Override
+    public void onSubscribe(Subscription s) {
+        wrappedSubscriber.onSubscribe(s);
+    }
+
+    @Override
+    public void onNext(ByteBuffer b) {
+        int i = 0;
+        // any value below GCM block size works
+        int chunkSize = 5;
+        while (b.remaining() > chunkSize) {
+            ByteBuffer tb = b.slice();
+            tb.limit(chunkSize);
+            byte[] intermediateBuf = BinaryUtils.copyBytesFrom(tb, chunkSize);
+            b.position(i + chunkSize);
+            i += chunkSize;
+            wrappedSubscriber.onNext(ByteBuffer.wrap(intermediateBuf));
+        }
+        // send the rest of the bytes
+        ByteBuffer sb = b.slice();
+        sb.limit(b.remaining());
+        byte[] intermedBuf = BinaryUtils.copyBytesFrom(sb, chunkSize);
+        wrappedSubscriber.onNext(ByteBuffer.wrap(intermedBuf));
+    }
+
+    @Override
+    public void onError(Throwable t) {
+        wrappedSubscriber.onError(t);
+    }
+
+    @Override
+    public void onComplete() {
+        wrappedSubscriber.onComplete();
+    }
+}