Skip to content

Commit

Permalink
fix: do not signal onComplete when the incoming buffer length is less…
Browse files Browse the repository at this point in the history
… than the cipher block (#209)
  • Loading branch information
kessplas authored Mar 19, 2024
1 parent 5ee8b08 commit 8b1a686
Show file tree
Hide file tree
Showing 5 changed files with 166 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,21 @@ public void onNext(ByteBuffer byteBuffer) {
if (amountToReadFromByteBuffer > 0) {
byte[] buf = BinaryUtils.copyBytesFrom(byteBuffer, amountToReadFromByteBuffer);
outputBuffer = cipher.update(buf, 0, amountToReadFromByteBuffer);
if (outputBuffer == null && amountToReadFromByteBuffer < cipher.getBlockSize()) {
// The underlying data is too short to fill in the block cipher
// This is true at the end of the file, so complete to get the final
// bytes
this.onComplete();
if (outputBuffer == null || outputBuffer.length == 0) {
// The underlying data is too short to fill in the block cipher.
// Note that while the JCE Javadoc specifies that the outputBuffer is null in this case,
// in practice SunJCE and ACCP return an empty buffer instead, hence checks for
// null OR length == 0.
if (contentRead.get() == contentLength) {
// All content has been read, so complete to get the final bytes
this.onComplete();
}
// Otherwise, wait for more bytes. To avoid blocking,
// send an empty buffer to the wrapped subscriber.
wrappedSubscriber.onNext(ByteBuffer.allocate(0));
} else {
wrappedSubscriber.onNext(ByteBuffer.wrap(outputBuffer));
}
wrappedSubscriber.onNext(ByteBuffer.wrap(outputBuffer));
} else {
// Do nothing
wrappedSubscriber.onNext(byteBuffer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,22 @@ private void initializeForRead(long rangeBeginning, long rangeEnd) {
this.virtualAvailable = (rangeEnd - rangeBeginning) + 1;
}


@Override
public void onSubscribe(Subscription s) {
// In edge cases where the beginning index exceeds the offset,
// there is never valid data to read, so signal completion immediately.
// Otherwise, the CipherSubscriber tries and fails to read the last block.
// This probably should be an exception, but previous implementations
// return an empty string; signalling onComplete accomplishes this result
// and thus maintains compatibility.
if (virtualAvailable <= 0) {
wrappedSubscriber.onComplete();
}
wrappedSubscriber.onSubscribe(s);
}

@Override
public void onNext(ByteBuffer byteBuffer) {
// In edge cases where the beginning index exceeds the offset,
// there is never valid data to read, so signal completion immediately.
if (virtualAvailable <= 0) {
wrappedSubscriber.onComplete();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import com.amazonaws.services.s3.model.EncryptionMaterials;
import com.amazonaws.services.s3.model.EncryptionMaterialsProvider;
import com.amazonaws.services.s3.model.StaticEncryptionMaterialsProvider;
import org.apache.commons.io.IOUtils;
import org.bouncycastle.jce.provider.BouncyCastleProvider;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.core.ResponseBytes;
Expand All @@ -29,18 +31,27 @@
import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
import software.amazon.awssdk.services.s3.model.S3Exception;
import software.amazon.encryption.s3.utils.BoundedInputStream;
import software.amazon.encryption.s3.utils.TinyBufferAsyncRequestBody;

import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import java.io.IOException;
import java.io.InputStream;
import java.security.NoSuchAlgorithmException;
import java.security.Provider;
import java.security.Security;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static software.amazon.encryption.s3.utils.S3EncryptionClientTestResources.BUCKET;
import static software.amazon.encryption.s3.utils.S3EncryptionClientTestResources.appendTestSuffix;
import static software.amazon.encryption.s3.utils.S3EncryptionClientTestResources.deleteObject;
Expand Down Expand Up @@ -407,4 +418,50 @@ public void copyObjectTransparentlyAsync() {
v3AsyncClient.close();
}

/**
* Test which artificially limits the size of buffers using {@link TinyBufferAsyncRequestBody}.
* This tests edge cases where network conditions result in buffers with length shorter than
* the cipher's block size.
* @throws IOException
*/
@Test
public void tinyBufferTest() throws IOException {
// BouncyCastle actually returns null buffers, unlike ACCP and SunJCE, which return empty buffers
Security.addProvider(new BouncyCastleProvider());
Provider provider = Security.getProvider("BC");
final String objectKey = appendTestSuffix("tiny-buffer-async");

S3AsyncClient v3AsyncClient = S3AsyncEncryptionClient.builder()
.aesKey(AES_KEY)
.cryptoProvider(provider)
.build();

// need enough data to split up
final long inputLength = 1024;
final InputStream input = new BoundedInputStream(inputLength);
final InputStream inputClean = new BoundedInputStream(inputLength);

final ExecutorService exec = Executors.newSingleThreadExecutor();

// Use this request body to limit the buffer size
TinyBufferAsyncRequestBody tinyBufferAsyncRequestBody = new TinyBufferAsyncRequestBody(AsyncRequestBody.fromInputStream(input, inputLength, exec));
CompletableFuture<PutObjectResponse> futurePut = v3AsyncClient.putObject(builder -> builder
.bucket(BUCKET)
.key(objectKey)
.build(), tinyBufferAsyncRequestBody);
futurePut.join();

CompletableFuture<ResponseBytes<GetObjectResponse>> futureGet = v3AsyncClient.getObject(builder -> builder
.bucket(BUCKET)
.key(objectKey)
.build(), AsyncResponseTransformer.toBytes());
ResponseBytes<GetObjectResponse> getResponse = futureGet.join();
assertTrue(IOUtils.contentEquals(inputClean, getResponse.asInputStream()));

// Cleanup
deleteObject(BUCKET, objectKey, v3AsyncClient);
v3AsyncClient.close();
exec.shutdown();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package software.amazon.encryption.s3.utils;

import org.reactivestreams.Subscriber;
import software.amazon.awssdk.core.async.AsyncRequestBody;

import java.nio.ByteBuffer;
import java.util.Optional;

/**
* AsyncRequestBody which wraps another AsyncRequestBody with a {@link TinyBufferSubscriber}.
* This is useful for testing poor network conditions where buffers may not be larger than
* the cipher's block size.
*/
public class TinyBufferAsyncRequestBody implements AsyncRequestBody {

private final AsyncRequestBody wrappedAsyncRequestBody;

public TinyBufferAsyncRequestBody(final AsyncRequestBody wrappedRequestBody) {
wrappedAsyncRequestBody = wrappedRequestBody;
}

@Override
public Optional<Long> contentLength() {
return wrappedAsyncRequestBody.contentLength();
}

@Override
public void subscribe(Subscriber<? super ByteBuffer> s) {
wrappedAsyncRequestBody.subscribe(new TinyBufferSubscriber(s));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package software.amazon.encryption.s3.utils;

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import software.amazon.awssdk.utils.BinaryUtils;

import java.nio.ByteBuffer;

/**
* Subscriber which purposefully limits the size of buffers sent to
* the wrapped subscriber. This is useful for simulating adverse network conditions.
*/
public class TinyBufferSubscriber implements Subscriber<ByteBuffer> {

private final Subscriber<? super ByteBuffer> wrappedSubscriber;

public TinyBufferSubscriber(final Subscriber wrappedSubscriber){
this.wrappedSubscriber = wrappedSubscriber;
}

@Override
public void onSubscribe(Subscription s) {
wrappedSubscriber.onSubscribe(s);
}

@Override
public void onNext(ByteBuffer b) {
int i = 0;
// any value below GCM block size works
int chunkSize = 5;
while (b.remaining() > chunkSize) {
ByteBuffer tb = b.slice();
tb.limit(chunkSize);
byte[] intermediateBuf = BinaryUtils.copyBytesFrom(tb, chunkSize);
b.position(i + chunkSize);
i += chunkSize;
wrappedSubscriber.onNext(ByteBuffer.wrap(intermediateBuf));
}
// send the rest of the bytes
ByteBuffer sb = b.slice();
sb.limit(b.remaining());
byte[] intermedBuf = BinaryUtils.copyBytesFrom(sb, chunkSize);
wrappedSubscriber.onNext(ByteBuffer.wrap(intermedBuf));
}

@Override
public void onError(Throwable t) {
wrappedSubscriber.onError(t);
}

@Override
public void onComplete() {
wrappedSubscriber.onComplete();
}
}

0 comments on commit 8b1a686

Please sign in to comment.