ByteBufferStoringSubscriber.blockingTransferTo hanging indefinitely #5755

Open
liuchang0520 opened this issue Dec 17, 2024 · 1 comment
Labels
bug This issue is a bug. needs-triage This issue or PR still needs to be triaged.

Comments


liuchang0520 commented Dec 17, 2024

Describe the bug

Our application uses the SDK v2 S3AsyncClient to send getObject requests and chains futures to process the response.
We noticed that the SDK client thread sdk-async-response-* is stuck when reading the InputStream returned by AsyncResponseTransformer.toBlockingInputStream().

Implementation:

final CompletableFuture<InputStream> future = new CompletableFuture<>();
asyncClient.getObject(requestBuilder.build(), AsyncResponseTransformer.toBlockingInputStream())
        .handle((responseInputStream, e) -> {
            if (e != null) {
                // exception handling step
            } else {
                future.complete(responseInputStream);
            }
            return null;
        });
return future;
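
For reference, an equivalent and slightly simpler way to express this (an illustrative sketch, not something the SDK requires) is to derive the returned future directly from the SDK future; exceptions then propagate through the returned future instead of being handled in handle(...):

// Sketch: ResponseInputStream is an InputStream, so the SDK future can be adapted directly.
return asyncClient.getObject(requestBuilder.build(), AsyncResponseTransformer.toBlockingInputStream())
        .thenApply(responseInputStream -> (InputStream) responseInputStream);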

Futures that read from the InputStream are chained on this future (a runnable sketch of this pattern follows the snippet below):

future.thenApply(inputStream -> {
    ByteBuf buf = ....
    try {
        int expectedLen = .....;
        buf.writeBytes(inputStream, expectedLen);
        return buf;
    } catch (Exception e) {
        ......
    } finally {
        ......
    }
});
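
For context, here is a minimal sketch of the chained read above with the blocking read moved onto a dedicated executor via thenApplyAsync, so that it does not run on an SDK sdk-async-response-* thread. The executor and the expected length are placeholders rather than SDK APIs, and this is only a sketch of the pattern, not a confirmed fix for the hang:

// Assumed imports: java.io.*, java.util.concurrent.*
ExecutorService streamReaderPool = Executors.newFixedThreadPool(4); // placeholder pool, sized arbitrarily

CompletableFuture<byte[]> bytesFuture = future.thenApplyAsync(inputStream -> {
    try (InputStream in = inputStream) {
        // The blocking read now happens on streamReaderPool, not on an SDK response thread.
        return in.readNBytes(expectedLen); // expectedLen is a placeholder
    } catch (IOException e) {
        throw new UncheckedIOException(e);
    }
}, streamReaderPool);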

Stack trace:

"sdk-async-response-1-883" #17464 [313453] daemon prio=5 os_prio=0 cpu=8547.18ms elapsed=168400.23s tid=0x0000ffff4c039ba0 nid=313453 waiting on condition  [0x0000fff939a53000]
   java.lang.Thread.State: WAITING (parking)
	at jdk.internal.misc.Unsafe.park(java.base@21.0.4/Native Method)
	- parking to wait for  <0x000000071ad09a68> (a java.util.concurrent.Phaser$QNode)
	at java.util.concurrent.locks.LockSupport.park(java.base@21.0.4/LockSupport.java:221)
	at java.util.concurrent.Phaser$QNode.block(java.base@21.0.4/Phaser.java:1133)
	at java.util.concurrent.ForkJoinPool.unmanagedBlock(java.base@21.0.4/ForkJoinPool.java:3780)
	at java.util.concurrent.ForkJoinPool.managedBlock(java.base@21.0.4/ForkJoinPool.java:3725)
	at java.util.concurrent.Phaser.internalAwaitAdvance(java.base@21.0.4/Phaser.java:1063)
	at java.util.concurrent.Phaser.awaitAdvanceInterruptibly(java.base@21.0.4/Phaser.java:753)
	at software.amazon.awssdk.utils.async.ByteBufferStoringSubscriber.blockingTransferTo(ByteBufferStoringSubscriber.java:148)
	at software.amazon.awssdk.utils.async.InputStreamSubscriber.read(InputStreamSubscriber.java:134)
	at software.amazon.awssdk.http.async.AbortableInputStreamSubscriber.read(AbortableInputStreamSubscriber.java:67)
	at software.amazon.awssdk.core.io.SdkFilterInputStream.read(SdkFilterInputStream.java:66)
       ....
	at io.netty.buffer.UnsafeByteBufUtil.setBytes(UnsafeByteBufUtil.java:472)
	at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:211)
	at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1122)

        **....... proprietary frames that invoke asyncClient.getObject and chain the future to process the response .......**

	at java.util.concurrent.CompletableFuture.uniHandle(java.base@21.0.4/CompletableFuture.java:934)
	at java.util.concurrent.CompletableFuture$UniHandle.tryFire(java.base@21.0.4/CompletableFuture.java:911)
	at java.util.concurrent.CompletableFuture.postComplete(java.base@21.0.4/CompletableFuture.java:510)
	at java.util.concurrent.CompletableFuture.complete(java.base@21.0.4/CompletableFuture.java:2179)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncApiCallMetricCollectionStage.lambda$execute$0(AsyncApiCallMetricCollectionStage.java:58)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncApiCallMetricCollectionStage$$Lambda/0x00000018027ad898.accept(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniWhenComplete(java.base@21.0.4/CompletableFuture.java:863)
	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(java.base@21.0.4/CompletableFuture.java:841)
	at java.util.concurrent.CompletableFuture.postComplete(java.base@21.0.4/CompletableFuture.java:510)
	at java.util.concurrent.CompletableFuture.complete(java.base@21.0.4/CompletableFuture.java:2179)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncApiCallTimeoutTrackingStage.lambda$execute$2(AsyncApiCallTimeoutTrackingStage.java:69)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncApiCallTimeoutTrackingStage$$Lambda/0x00000018027ad668.accept(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniWhenComplete(java.base@21.0.4/CompletableFuture.java:863)
	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(java.base@21.0.4/CompletableFuture.java:841)
	at java.util.concurrent.CompletableFuture.postComplete(java.base@21.0.4/CompletableFuture.java:510)
	at java.util.concurrent.CompletableFuture.complete(java.base@21.0.4/CompletableFuture.java:2179)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage2$RetryingExecutor.lambda$attemptExecute$1(AsyncRetryableStage2.java:128)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage2$RetryingExecutor$$Lambda/0x00000018027aef08.accept(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniWhenComplete(java.base@21.0.4/CompletableFuture.java:863)
	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(java.base@21.0.4/CompletableFuture.java:841)
	at java.util.concurrent.CompletableFuture.postComplete(java.base@21.0.4/CompletableFuture.java:510)
	at java.util.concurrent.CompletableFuture.complete(java.base@21.0.4/CompletableFuture.java:2179)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage.lambda$execute$0(MakeAsyncHttpRequestStage.java:110)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage$$Lambda/0x00000018027ae8a8.accept(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniWhenComplete(java.base@21.0.4/CompletableFuture.java:863)
	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(java.base@21.0.4/CompletableFuture.java:841)
	at java.util.concurrent.CompletableFuture.postComplete(java.base@21.0.4/CompletableFuture.java:510)
	at java.util.concurrent.CompletableFuture.complete(java.base@21.0.4/CompletableFuture.java:2179)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage.completeResponseFuture(MakeAsyncHttpRequestStage.java:253)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage.lambda$executeHttpRequest$3(MakeAsyncHttpRequestStage.java:167)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage$$Lambda/0x00000018027ae448.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniHandle(java.base@21.0.4/CompletableFuture.java:934)
	at java.util.concurrent.CompletableFuture$UniHandle.tryFire(java.base@21.0.4/CompletableFuture.java:911)
	at java.util.concurrent.CompletableFuture$Completion.run(java.base@21.0.4/CompletableFuture.java:482)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@21.0.4/ThreadPoolExecutor.java:1144)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@21.0.4/ThreadPoolExecutor.java:642)
	at java.lang.Thread.runWith(java.base@21.0.4/Thread.java:1596)
	at java.lang.Thread.run(java.base@21.0.4/Thread.java:1583)

A couple of questions:

  1. What could cause the indefinite blocking when reading from the InputStream?
  2. How can we avoid the indefinite blocking when processing the InputStream response from s3AsyncClient.getObject?
     E.g., is there a way to specify a timeout when using the S3AsyncClient to avoid indefinite blocking? (See the sketch after the trace excerpt below.)
     Could an exception be surfaced so we can handle it on the client side instead of blocking indefinitely?
  3. In the stack trace we also noticed some timeout and retry stages - what could be the cause of the timeout?
software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncApiCallTimeoutTrackingStage.lambda$execute$2(AsyncApiCallTimeoutTrackingStage.java:69)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncApiCallTimeoutTrackingStage$$Lambda/0x00000018027ad668.accept(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniWhenComplete(java.base@21.0.4/CompletableFuture.java:863)
	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(java.base@21.0.4/CompletableFuture.java:841)
	at java.util.concurrent.CompletableFuture.postComplete(java.base@21.0.4/CompletableFuture.java:510)
	at java.util.concurrent.CompletableFuture.complete(java.base@21.0.4/CompletableFuture.java:2179)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage2$RetryingExecutor.lambda$attemptExecute$1(AsyncRetryableStage2.java:128)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage2$RetryingExecutor$$Lambda/0x00000018027aef08.accept(Unknown Source)
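
Regarding question 2, below is a hedged sketch of two ways to bound the wait, assuming they apply to this use case: the SDK's apiCallTimeout / apiCallAttemptTimeout settings (on ClientOverrideConfiguration or per request), and a caller-side CompletableFuture.orTimeout on the stage that performs the blocking read. Whether apiCallTimeout also bounds the time spent reading the blocking stream body is an assumption that would need to be verified:

// Client-level timeouts (AWS SDK v2 ClientOverrideConfiguration):
S3AsyncClient asyncClient = S3AsyncClient.builder()
        .overrideConfiguration(ClientOverrideConfiguration.builder()
                .apiCallTimeout(Duration.ofMinutes(5))         // bound on the whole API call
                .apiCallAttemptTimeout(Duration.ofSeconds(30)) // bound on each attempt
                .build())
        .build();

// Per-request override is also possible:
GetObjectRequest request = GetObjectRequest.builder()
        .bucket("example-bucket") // placeholder values
        .key("example-key")
        .overrideConfiguration(o -> o.apiCallTimeout(Duration.ofMinutes(5)))
        .build();

// Caller-side bound on the blocking-read stage from the earlier sketch (Java 9+):
CompletableFuture<byte[]> bounded = bytesFuture.orTimeout(60, TimeUnit.SECONDS);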

Regression Issue

  • Select this option if this issue appears to be a regression.

Expected Behavior

We expect the read from the InputStream into the destination buffer to succeed.

Current Behavior

Reading from the InputStream blocks indefinitely (in software.amazon.awssdk.utils.async.ByteBufferStoringSubscriber.blockingTransferTo).

Reproduction Steps

Same as the code snippet above.

Possible Solution

No response

Additional Information/Context

No response

AWS Java SDK version used

2.27.9

JDK version used

Java 21

Operating System and version

Linux Wolfi, aarch64

liuchang0520 added the bug and needs-triage labels on Dec 17, 2024
liuchang0520 (Author) commented:

Note that AsyncResponseTransformer.toBytes() doesn't apply to our use case due to a known memory issue in SDK v2:

  1. https://github.com/aws/aws-sdk-java-v2/issues/4392
  2. S3 getObject combined with AsyncResponseTransformer.toBytes() copies too much data #3193
