Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable event stream operations with RPC bound protocols in client SDKs #4036

Merged
merged 14 commits into from
Feb 28, 2025

Conversation

ysaito1001
Copy link
Contributor

Motivation and Context

Addresses aws-sdk-rust#213 and aws-sdk-rust#1188.

Description

This PR consolidates the changes from the previous sub-PRs:

Together, this PR enables event stream operations with RPC bound protocols, including SubscribeToShard in Kinesis and StartLiveTail in CloudWatchLogs in the Rust SDK.

Testing

(consolidated bullets from the previous sub-PRs)

  • Added a service integration test for cloudwatchlogs
  • Added client SDK codegen tests in ClientEventStreamUnmarshallerGeneratorTest to reverify initial-response message handling
  • Added client SDK codegen tests in ClientEventStreamMarshallerGeneratorTest to verify initial-request message handling
  • Added DisableStalledStreamProtectionTest
  • Confirmed a successful run in the release pipeline

Checklist

  • For changes to the smithy-rs codegen or runtime crates, I have created a changelog entry Markdown file in the .changelog directory, specifying "client," "server," or both in the applies_to key.
  • For changes to the AWS SDK, generated SDK code, or SDK runtime crates, I have created a changelog entry Markdown file in the .changelog directory, specifying "aws-sdk-rust" in the applies_to key.

By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.

…otocols (#4004)

## Motivation and Context
Handles
[initial-response](https://smithy.io/2.0/spec/streaming.html#initial-response)
for event stream operations with an RPC-bound protocol in client SDKs.
This makes event stream operations like
[SubscribeToShard](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SubscribeToShard.html)
in Kinesis and
[StartLiveTrail](https://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_StartLiveTail.html)
in CloudWatchLogs available in the Rust SDK, both of which are currently
unavailable
([RemoveEventStreamOperations.kt](https://github.com/smithy-lang/smithy-rs/pull/4004/files#diff-b94081365137754f8674df177705a472ede6d3c21da608fff68a450fec5abd26)
removes them via model transformation as unsupported).

Note that this PR is the first in a series and merged into a feature
branch, not into the main branch. It intentionally does not address
`TODO(EventStream)` or does not disable stalled stream protection for
newly enabled event stream operations. It just focuses on implementing
handling `initial-response` messages. The next PR will most likely focus
on handling `initial-request` message, and the last PR will focus on
code cleanup (hence if code review feedback is about code cleanup, I
happily accept it but might defer it to the last PR).

## Description
At its core, handling initial-response for event stream operations is
boiled down to using
[try_recv_initial](https://github.com/smithy-lang/smithy-rs/blob/ec4292005728cec42ca2f9f8ba40f6b5bb563b05/rust-runtime/aws-smithy-http/src/event_stream/receiver.rs#L208-L232)
during deserialization phase within the orchestrator. Here is the issue,
though. `try_recv_initial` is an async method but functions/methods in
the deserializer in client SDKs are all synchronous, causing an
impedance mismatch between async and sync. This means we need to find an
async context somewhere in which to be able to call the
`try_recv_initial` method.

To sidestep this design limitation, we customize the `send` method for
event stream operations with RPC-bound protocols whose operation output
contains an event stream member. The customization makes the epilogue of
the `send` method look like the following (using `unwrap` for
simplicity):
```
    pub async fn send(
        self,
    ) -> Result<crate::operation::some_op::SomeOp, SdkError<...>> {
        let input = ...;
        let runtime_plugins = ...;

        // new epilogue of the send method
        let output = crate::operation::some_op::SomeOp::orchestrate(&runtime_plugins, input).await?;

        let message = output.response_stream.try_recv_initial().await.map_err(response_error).unwrap();

        match message {
            Some(message) => {
                // If operation output contains non-event stream members, we will populate them here
                let mut builder = output.into_builder(); // codegen needs to render this new `pub(crate)` conversion method on the output struct
                builder = crate::protocol_serde::shape_some_op::de_some_op(message.payload(), output).unwrap();
                Ok(builder.build().unwrap())
            },
            // In this `None` branch, `try_recv_initial` will set aside the initial frame it has read from event stream if it's not initial-response.
            // The next time the user calls `recv` on the `EventReceiver`, the user correctly receives the frame that was read off.
            None => Ok(output),
        }
    }
```
This provides us with an async context where we can call
`try_recv_initial` on an event stream member of an operation output
struct and populate the non-event stream members of that struct. The
downside though is that the operation output struct will be half-baked
until the end of `send`, with non-event stream members uninitialized, so
interceptors after deserialization won't have access to complete the
output struct. (it's todo to document somewhere for operations in
question).

The rest of the changes are for testing. Specifically
- added `cloudwatchlogs` as a new service integration test (to test
`start_live_trail` event stream operation)
- as part of this, the local helpers in
`aws/sdk/integration-tests/transcribestreaming/tests/test.rs` have been
moved to `aws_smithy_eventstream` and gated behind the `test-util` cargo
feature.
- added client SDK codegen test for event stream to verify the
functionality in the cases a) an `initial-request` message is present
and b) it is not
- [The event stream test
model](https://github.com/smithy-lang/smithy-rs/blob/ec4292005728cec42ca2f9f8ba40f6b5bb563b05/codegen-core/src/main/kotlin/software/amazon/smithy/rust/codegen/core/testutil/EventStreamTestModels.kt#L20)
has been modified for the client SDK to include non-event stream member
in the operation output struct

## Testing
- Existing CI
- Added a service integration test for `cloudwatchlogs`
- Added client SDK codegen tests for verifying initial-response message
handling

----

_By submitting this pull request, I confirm that you can use, modify,
copy, and redistribute this contribution, under the terms of your
choice._
…tocols (#4015)

## Motivation and Context
Continuation of #4004, this PR is part 2 of the series. The PR focus on
the request side, specifically handling the serialization of an initial
request message for event stream operations using RPC-bound protocols.

## Description
Unlike `initial-response`, handling `initial-request` for event stream
operations does _not_ require calling an async function, allowing
serialization to be completed within a synchronous context.

Here’s an updated snippet from the input serializer, rendered from a
newly added codegen test (since we currently don’t have AWS services
with RPC protocols where the operation input contains event streams,
this test represents such a service):
```
        let body = ::aws_smithy_types::body::SdkBody::from({
            let error_marshaller = crate::event_stream_serde::TestStreamErrorMarshaller::new();
            let marshaller = crate::event_stream_serde::TestStreamMarshaller::new();
            let (signer, signer_sender) = ::aws_smithy_eventstream::frame::DeferredSigner::new();
            _cfg.interceptor_state().store_put(signer_sender);
            ::aws_smithy_types::body::SdkBody::from_body_0_4(::hyper::Body::wrap_stream({
                use ::futures_util::StreamExt;
                let body = crate::protocol_serde::shape_test_stream_op::ser_test_stream_op_input(&input)?;
                let initial_message = crate::event_stream_serde::initial_message_from_body(body);
                let mut buffer = Vec::new();
                ::aws_smithy_eventstream::frame::write_message_to(&initial_message, &mut buffer)?;
                let initial_message_stream = futures_util::stream::iter(vec![Ok(buffer.into())]);
                let adapter = input.value.into_body_stream(marshaller, error_marshaller, signer);
                initial_message_stream.chain(adapter)
            }))
        });
```
whereas previously
```
        let body = ::aws_smithy_types::body::SdkBody::from({
            let error_marshaller = crate::event_stream_serde::TestStreamErrorMarshaller::new();
            let marshaller = crate::event_stream_serde::TestStreamMarshaller::new();
            let (signer, signer_sender) = ::aws_smithy_eventstream::frame::DeferredSigner::new();
            _cfg.interceptor_state().store_put(signer_sender);
            ::aws_smithy_types::body::SdkBody::from_body_0_4(::hyper::Body::wrap_stream(
                input
                    .value
                    .into_body_stream(marshaller, error_marshaller, signer),
            ))
        });
```
This "prepending initial-request message" only occurs when the operation
input contains at least one non-event stream member, other than an event
stream member.

The PR also refactors `ClientEventStreamUnmarshallerGeneratorTest.kt`
(touched in the previous PR) while adding new tests in
`ClientEventStreamMarshallerGeneratorTest`.

## Testing
- Existing CI
- Added client SDK codegen tests in
`ClientEventStreamMarshallerGeneratorTest` to verify initial-request
message handling

----

_By submitting this pull request, I confirm that you can use, modify,
copy, and redistribute this contribution, under the terms of your
choice._
## Motivation and Context
This is the final PR in the series adding support for event streaming
with RPC-bound protocols. It primarily focuses on addressing code
cleanup tasks that were intentionally left in the previous two PRs.

## Description
The code clean-up tasks include
- adding `DisableStalledStreamProtection` to disable stalled stream
protection from at the top level of `ClientCodegenVisitor` across the
models, eliminating the need for individual service-specific codegen
decorators just to disable stalled stream protection.
- removing `eventStreamAllowList`.
- addressing `TODO(EventStream)` for RPC protocols.
- adding a custom doc to fluent builders whose output operations are
event stream and contain more than one structure members (i.e., need to
populate non-event stream members at the end of `send`).

## Testing
- Existing CI
- Added `DisableStalledStreamProtectionTest`

----

_By submitting this pull request, I confirm that you can use, modify,
copy, and redistribute this contribution, under the terms of your
choice._
Copy link

A new generated diff is ready to view.

A new doc preview is ready to view.

@ysaito1001 ysaito1001 marked this pull request as ready for review February 24, 2025 19:16
@ysaito1001 ysaito1001 requested review from a team as code owners February 24, 2025 19:16
@rcoh
Copy link
Collaborator

rcoh commented Feb 24, 2025

Are there public AWS SDK APIs we can test these against?

@ysaito1001
Copy link
Contributor Author

Are there public AWS SDK APIs we can test these against?

Yes, SubscribeToShard in Kinesis and StartLiveTail in CloudWatchLogs. I have tested against both, and this PR has added a connection recording test for the latter.

You can try out this PR against the service models from the aws-sdk-rust repo for kinesis.json and cloudwatch-logs.json, build their SDKs ./gradlew :aws:sdk:assemble, and call the APIs in question.

Copy link

A new generated diff is ready to view.

A new doc preview is ready to view.

Due to its current implementation, the non-event stream fields are not fully deserialized
until the [`send`](Self::send) method completes. As a result, accessing these fields of the operation
output struct within an interceptor may return uninitialized values.
""",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This warning message has me thinking about the design. While the current implementation is workable for this PR, I have a few minor questions:

  1. Does this partial deserialization issue affect only interceptors, or could it also impact user-written code?
  2. Could we consider an alternative design in future where we use an enum to represent streaming messages? For example:
    . One variant for the initial message (containing only fields that can be part of the initial message)
    . Another variant for the complete message

Copy link
Contributor Author

@ysaito1001 ysaito1001 Feb 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. Does this partial deserialization issue affect only interceptors

Yes it does, AFAIK.

  1. Could we consider an alternative design in future

The current implementation is two-way-door, so we should be able to switch to a different design in the future for sure. Not clear about whether using an enum to represent streaming messages helps address the warning message in question though, since that sounds like exposing the concept of initial message to the users via event receiver when in fact that concept should be hidden as implementation details during deserialzation of the event stream.

Copy link

A new generated diff is ready to view.

A new doc preview is ready to view.

@drganjoo
Copy link
Contributor

I am not sure if the PR addresses the case of a union variant with a target marked with an error trait. Reference: Modeled errors in event streams.

@ysaito1001
Copy link
Contributor Author

I am not sure if the PR addresses the case of a union variant with a target marked with an error trait. Reference: Modeled errors in event streams.

I believe the main branch's event stream implementation today (the one with limited support for REST protocols) already supports the modeled error, as shown in the test model. This gets rendered in TestStreamUnmarshaller::unmarshall as something like

impl UnmarshallMessage for TestStreamUnmarshaller {
    // --snip--
    fn unmarshall(&self, message: &::aws_smithy_types::event_stream::Message) -> ... {
        let response_headers = ::aws_smithy_eventstream::smithy::parse_response_headers(message)?;
        match response_headers.message_type.as_str() {
            // --snip--
            "exception" => {
                if response_headers.smithy_type.as_str() == "SomeError" {
                    let mut builder = crate::types::error::builders::SomeErrorBuilder::default();
                    // --snip--
                    return Ok(::aws_smithy_eventstream::frame::UnmarshalledMessage::Error(
                        crate::types::error::TestStreamError::SomeError(builder.build()),
                    ));
                }
            }
            // --snip--
        }
    }

In #121, the relevant task (Events marked with @error are terminal...) is marked ✅.

Copy link

A new generated diff is ready to view.

A new doc preview is ready to view.

@ysaito1001 ysaito1001 added this pull request to the merge queue Feb 28, 2025
@ysaito1001 ysaito1001 removed this pull request from the merge queue due to a manual request Feb 28, 2025
@ysaito1001 ysaito1001 enabled auto-merge February 28, 2025 20:40
Copy link

A new generated diff is ready to view.

A new doc preview is ready to view.

@ysaito1001 ysaito1001 added this pull request to the merge queue Feb 28, 2025
Merged via the queue into main with commit 64144f4 Feb 28, 2025
45 checks passed
@ysaito1001 ysaito1001 deleted the ysaito/support-event-stream-for-rpc-bound-protocols branch February 28, 2025 21:41
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants