-
Notifications
You must be signed in to change notification settings - Fork 202
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Enable event stream operations with RPC bound protocols in client SDKs #4036
Enable event stream operations with RPC bound protocols in client SDKs #4036
Conversation
…otocols (#4004) ## Motivation and Context Handles [initial-response](https://smithy.io/2.0/spec/streaming.html#initial-response) for event stream operations with an RPC-bound protocol in client SDKs. This makes event stream operations like [SubscribeToShard](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SubscribeToShard.html) in Kinesis and [StartLiveTrail](https://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_StartLiveTail.html) in CloudWatchLogs available in the Rust SDK, both of which are currently unavailable ([RemoveEventStreamOperations.kt](https://github.com/smithy-lang/smithy-rs/pull/4004/files#diff-b94081365137754f8674df177705a472ede6d3c21da608fff68a450fec5abd26) removes them via model transformation as unsupported). Note that this PR is the first in a series and merged into a feature branch, not into the main branch. It intentionally does not address `TODO(EventStream)` or does not disable stalled stream protection for newly enabled event stream operations. It just focuses on implementing handling `initial-response` messages. The next PR will most likely focus on handling `initial-request` message, and the last PR will focus on code cleanup (hence if code review feedback is about code cleanup, I happily accept it but might defer it to the last PR). ## Description At its core, handling initial-response for event stream operations is boiled down to using [try_recv_initial](https://github.com/smithy-lang/smithy-rs/blob/ec4292005728cec42ca2f9f8ba40f6b5bb563b05/rust-runtime/aws-smithy-http/src/event_stream/receiver.rs#L208-L232) during deserialization phase within the orchestrator. Here is the issue, though. `try_recv_initial` is an async method but functions/methods in the deserializer in client SDKs are all synchronous, causing an impedance mismatch between async and sync. This means we need to find an async context somewhere in which to be able to call the `try_recv_initial` method. To sidestep this design limitation, we customize the `send` method for event stream operations with RPC-bound protocols whose operation output contains an event stream member. The customization makes the epilogue of the `send` method look like the following (using `unwrap` for simplicity): ``` pub async fn send( self, ) -> Result<crate::operation::some_op::SomeOp, SdkError<...>> { let input = ...; let runtime_plugins = ...; // new epilogue of the send method let output = crate::operation::some_op::SomeOp::orchestrate(&runtime_plugins, input).await?; let message = output.response_stream.try_recv_initial().await.map_err(response_error).unwrap(); match message { Some(message) => { // If operation output contains non-event stream members, we will populate them here let mut builder = output.into_builder(); // codegen needs to render this new `pub(crate)` conversion method on the output struct builder = crate::protocol_serde::shape_some_op::de_some_op(message.payload(), output).unwrap(); Ok(builder.build().unwrap()) }, // In this `None` branch, `try_recv_initial` will set aside the initial frame it has read from event stream if it's not initial-response. // The next time the user calls `recv` on the `EventReceiver`, the user correctly receives the frame that was read off. None => Ok(output), } } ``` This provides us with an async context where we can call `try_recv_initial` on an event stream member of an operation output struct and populate the non-event stream members of that struct. The downside though is that the operation output struct will be half-baked until the end of `send`, with non-event stream members uninitialized, so interceptors after deserialization won't have access to complete the output struct. (it's todo to document somewhere for operations in question). The rest of the changes are for testing. Specifically - added `cloudwatchlogs` as a new service integration test (to test `start_live_trail` event stream operation) - as part of this, the local helpers in `aws/sdk/integration-tests/transcribestreaming/tests/test.rs` have been moved to `aws_smithy_eventstream` and gated behind the `test-util` cargo feature. - added client SDK codegen test for event stream to verify the functionality in the cases a) an `initial-request` message is present and b) it is not - [The event stream test model](https://github.com/smithy-lang/smithy-rs/blob/ec4292005728cec42ca2f9f8ba40f6b5bb563b05/codegen-core/src/main/kotlin/software/amazon/smithy/rust/codegen/core/testutil/EventStreamTestModels.kt#L20) has been modified for the client SDK to include non-event stream member in the operation output struct ## Testing - Existing CI - Added a service integration test for `cloudwatchlogs` - Added client SDK codegen tests for verifying initial-response message handling ---- _By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice._
…tocols (#4015) ## Motivation and Context Continuation of #4004, this PR is part 2 of the series. The PR focus on the request side, specifically handling the serialization of an initial request message for event stream operations using RPC-bound protocols. ## Description Unlike `initial-response`, handling `initial-request` for event stream operations does _not_ require calling an async function, allowing serialization to be completed within a synchronous context. Here’s an updated snippet from the input serializer, rendered from a newly added codegen test (since we currently don’t have AWS services with RPC protocols where the operation input contains event streams, this test represents such a service): ``` let body = ::aws_smithy_types::body::SdkBody::from({ let error_marshaller = crate::event_stream_serde::TestStreamErrorMarshaller::new(); let marshaller = crate::event_stream_serde::TestStreamMarshaller::new(); let (signer, signer_sender) = ::aws_smithy_eventstream::frame::DeferredSigner::new(); _cfg.interceptor_state().store_put(signer_sender); ::aws_smithy_types::body::SdkBody::from_body_0_4(::hyper::Body::wrap_stream({ use ::futures_util::StreamExt; let body = crate::protocol_serde::shape_test_stream_op::ser_test_stream_op_input(&input)?; let initial_message = crate::event_stream_serde::initial_message_from_body(body); let mut buffer = Vec::new(); ::aws_smithy_eventstream::frame::write_message_to(&initial_message, &mut buffer)?; let initial_message_stream = futures_util::stream::iter(vec![Ok(buffer.into())]); let adapter = input.value.into_body_stream(marshaller, error_marshaller, signer); initial_message_stream.chain(adapter) })) }); ``` whereas previously ``` let body = ::aws_smithy_types::body::SdkBody::from({ let error_marshaller = crate::event_stream_serde::TestStreamErrorMarshaller::new(); let marshaller = crate::event_stream_serde::TestStreamMarshaller::new(); let (signer, signer_sender) = ::aws_smithy_eventstream::frame::DeferredSigner::new(); _cfg.interceptor_state().store_put(signer_sender); ::aws_smithy_types::body::SdkBody::from_body_0_4(::hyper::Body::wrap_stream( input .value .into_body_stream(marshaller, error_marshaller, signer), )) }); ``` This "prepending initial-request message" only occurs when the operation input contains at least one non-event stream member, other than an event stream member. The PR also refactors `ClientEventStreamUnmarshallerGeneratorTest.kt` (touched in the previous PR) while adding new tests in `ClientEventStreamMarshallerGeneratorTest`. ## Testing - Existing CI - Added client SDK codegen tests in `ClientEventStreamMarshallerGeneratorTest` to verify initial-request message handling ---- _By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice._
## Motivation and Context This is the final PR in the series adding support for event streaming with RPC-bound protocols. It primarily focuses on addressing code cleanup tasks that were intentionally left in the previous two PRs. ## Description The code clean-up tasks include - adding `DisableStalledStreamProtection` to disable stalled stream protection from at the top level of `ClientCodegenVisitor` across the models, eliminating the need for individual service-specific codegen decorators just to disable stalled stream protection. - removing `eventStreamAllowList`. - addressing `TODO(EventStream)` for RPC protocols. - adding a custom doc to fluent builders whose output operations are event stream and contain more than one structure members (i.e., need to populate non-event stream members at the end of `send`). ## Testing - Existing CI - Added `DisableStalledStreamProtectionTest` ---- _By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice._
A new generated diff is ready to view.
A new doc preview is ready to view. |
Are there public AWS SDK APIs we can test these against? |
Yes, You can try out this PR against the service models from the |
A new generated diff is ready to view.
A new doc preview is ready to view. |
Due to its current implementation, the non-event stream fields are not fully deserialized | ||
until the [`send`](Self::send) method completes. As a result, accessing these fields of the operation | ||
output struct within an interceptor may return uninitialized values. | ||
""", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This warning message has me thinking about the design. While the current implementation is workable for this PR, I have a few minor questions:
- Does this partial deserialization issue affect only interceptors, or could it also impact user-written code?
- Could we consider an alternative design in future where we use an
enum
to represent streaming messages? For example:
. One variant for the initial message (containing only fields that can be part of the initial message)
. Another variant for the complete message
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- Does this partial deserialization issue affect only interceptors
Yes it does, AFAIK.
- Could we consider an alternative design in future
The current implementation is two-way-door, so we should be able to switch to a different design in the future for sure. Not clear about whether using an enum
to represent streaming messages helps address the warning message in question though, since that sounds like exposing the concept of initial message to the users via event receiver when in fact that concept should be hidden as implementation details during deserialzation of the event stream.
...oftware/amazon/smithy/rust/codegen/client/smithy/generators/client/FluentBuilderGenerator.kt
Show resolved
Hide resolved
...oftware/amazon/smithy/rust/codegen/client/smithy/generators/client/FluentBuilderGenerator.kt
Show resolved
Hide resolved
...ware/amazon/smithy/rust/codegen/client/smithy/generators/protocol/ProtocolParserGenerator.kt
Show resolved
Hide resolved
...in/software/amazon/smithy/rust/codegen/client/smithy/protocols/HttpBoundProtocolGenerator.kt
Outdated
Show resolved
Hide resolved
...st/codegen/client/smithy/protocols/eventstream/ClientEventStreamUnmarshallerGeneratorTest.kt
Show resolved
Hide resolved
...amazon/smithy/rust/codegen/core/smithy/protocols/serialize/EventStreamMarshallerGenerator.kt
Outdated
Show resolved
Hide resolved
...amazon/smithy/rust/codegen/core/smithy/protocols/serialize/EventStreamMarshallerGenerator.kt
Outdated
Show resolved
Hide resolved
...tware/amazon/smithy/rust/codegen/core/smithy/protocols/serialize/QuerySerializerGenerator.kt
Show resolved
Hide resolved
This commit responds to #4036 (comment)
This commit responds to #4036 (comment)
This commit addresses the following: #4036 (comment) #4036 (comment) #4036 (comment)
This commit addresses #4036 (comment)
A new generated diff is ready to view.
A new doc preview is ready to view. |
I am not sure if the PR addresses the case of a union variant with a target marked with an error trait. Reference: Modeled errors in event streams. |
I believe the main branch's event stream implementation today (the one with limited support for REST protocols) already supports the modeled error, as shown in the test model. This gets rendered in
In #121, the relevant task ( |
A new generated diff is ready to view.
A new doc preview is ready to view. |
A new generated diff is ready to view.
A new doc preview is ready to view. |
Motivation and Context
Addresses aws-sdk-rust#213 and aws-sdk-rust#1188.
Description
This PR consolidates the changes from the previous sub-PRs:
Together, this PR enables event stream operations with RPC bound protocols, including
SubscribeToShard
in Kinesis andStartLiveTail
in CloudWatchLogs in the Rust SDK.Testing
(consolidated bullets from the previous sub-PRs)
ClientEventStreamUnmarshallerGeneratorTest
to reverify initial-response message handlingClientEventStreamMarshallerGeneratorTest
to verify initial-request message handlingDisableStalledStreamProtectionTest
Checklist
.changelog
directory, specifying "client," "server," or both in theapplies_to
key..changelog
directory, specifying "aws-sdk-rust" in theapplies_to
key.By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.