Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable event stream operations with RPC bound protocols in client SDKs #4036

Merged
merged 14 commits into from
Feb 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions .changelog/1740420128.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
---
applies_to:
- aws-sdk-rust
authors:
- ysaito1001
references:
- aws-sdk-rust#213
- aws-sdk-rust#1188
breaking: false
new_feature: true
bug_fix: false
---
Adds support for event stream operations with non-REST protocols such as AWS JSON. This update enables operations, including `SubscribeToShard` in Kinesis and `StartLiveTail` in CloudWatchLogs in the Rust SDK.
12 changes: 12 additions & 0 deletions .changelog/1740421036.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
---
applies_to:
- client
authors:
- ysaito1001
references:
- smithy-rs#121
breaking: false
new_feature: true
bug_fix: false
---
Adds support for event stream operations with non-REST protocols such as RPC v2 CBOR.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import software.amazon.smithy.rust.codegen.client.smithy.customizations.DocsRsMe
import software.amazon.smithy.rust.codegen.client.smithy.customizations.DocsRsMetadataSettings
import software.amazon.smithy.rust.codegen.client.smithy.customize.ClientCodegenDecorator
import software.amazon.smithy.rust.codegen.client.smithy.customize.CombinedClientCodegenDecorator
import software.amazon.smithy.rustsdk.customize.AwsDisableStalledStreamProtection
import software.amazon.smithy.rustsdk.customize.DisabledAuthDecorator
import software.amazon.smithy.rustsdk.customize.IsTruncatedPaginatorDecorator
import software.amazon.smithy.rustsdk.customize.RemoveDefaultsDecorator
Expand All @@ -18,7 +19,6 @@ import software.amazon.smithy.rustsdk.customize.applyExceptFor
import software.amazon.smithy.rustsdk.customize.dsql.DsqlDecorator
import software.amazon.smithy.rustsdk.customize.ec2.Ec2Decorator
import software.amazon.smithy.rustsdk.customize.glacier.GlacierDecorator
import software.amazon.smithy.rustsdk.customize.lambda.LambdaDecorator
import software.amazon.smithy.rustsdk.customize.onlyApplyTo
import software.amazon.smithy.rustsdk.customize.rds.RdsDecorator
import software.amazon.smithy.rustsdk.customize.route53.Route53Decorator
Expand All @@ -30,7 +30,6 @@ import software.amazon.smithy.rustsdk.customize.s3control.S3ControlDecorator
import software.amazon.smithy.rustsdk.customize.sso.SSODecorator
import software.amazon.smithy.rustsdk.customize.sts.STSDecorator
import software.amazon.smithy.rustsdk.customize.timestream.TimestreamDecorator
import software.amazon.smithy.rustsdk.customize.transcribestreaming.TranscribeStreamingDecorator
import software.amazon.smithy.rustsdk.endpoints.AwsEndpointsStdLib
import software.amazon.smithy.rustsdk.endpoints.OperationInputTestDecorator
import software.amazon.smithy.rustsdk.endpoints.RequireEndpointRules
Expand Down Expand Up @@ -67,6 +66,7 @@ val DECORATORS: List<ClientCodegenDecorator> =
ServiceEnvConfigDecorator(),
HttpRequestCompressionDecorator(),
DisablePayloadSigningDecorator(),
AwsDisableStalledStreamProtection(),
// TODO(https://github.com/smithy-lang/smithy-rs/issues/3863): Comment in once the issue has been resolved
// SmokeTestsDecorator(),
),
Expand All @@ -80,7 +80,6 @@ val DECORATORS: List<ClientCodegenDecorator> =
DsqlDecorator().onlyApplyTo("com.amazonaws.dsql#DSQL"),
Ec2Decorator().onlyApplyTo("com.amazonaws.ec2#AmazonEC2"),
GlacierDecorator().onlyApplyTo("com.amazonaws.glacier#Glacier"),
LambdaDecorator().onlyApplyTo("com.amazonaws.lambda#AWSGirApiService"),
RdsDecorator().onlyApplyTo("com.amazonaws.rds#AmazonRDSv19"),
Route53Decorator().onlyApplyTo("com.amazonaws.route53#AWSDnsV20130401"),
"com.amazonaws.s3#AmazonS3".applyDecorators(
Expand All @@ -95,7 +94,6 @@ val DECORATORS: List<ClientCodegenDecorator> =
SSODecorator().onlyApplyTo("com.amazonaws.sso#SWBPortalService"),
TimestreamDecorator().onlyApplyTo("com.amazonaws.timestreamwrite#Timestream_20181101"),
TimestreamDecorator().onlyApplyTo("com.amazonaws.timestreamquery#Timestream_20181101"),
TranscribeStreamingDecorator().onlyApplyTo("com.amazonaws.transcribestreaming#Transcribe"),
// Only build docs-rs for linux to reduce load on docs.rs
listOf(
DocsRsMetadataDecorator(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,11 @@ import software.amazon.smithy.rust.codegen.core.rustlang.CargoDependency.Compani
import software.amazon.smithy.rust.codegen.core.rustlang.DependencyScope
import software.amazon.smithy.rust.codegen.core.rustlang.Writable
import software.amazon.smithy.rust.codegen.core.rustlang.writable
import software.amazon.smithy.rust.codegen.core.smithy.RuntimeConfig
import software.amazon.smithy.rust.codegen.core.smithy.generators.LibRsCustomization
import software.amazon.smithy.rust.codegen.core.smithy.generators.LibRsSection
import software.amazon.smithy.rust.codegen.core.testutil.testDependenciesOnly
import software.amazon.smithy.rust.codegen.core.util.hasEventStreamOperations
import software.amazon.smithy.rustsdk.AwsCargoDependency.awsConfig
import software.amazon.smithy.rustsdk.AwsCargoDependency.awsRuntime
import java.nio.file.Files
Expand Down Expand Up @@ -76,29 +78,31 @@ class IntegrationTestDecorator : ClientCodegenDecorator {
}

class IntegrationTestDependencies(
private val codegenContext: ClientCodegenContext,
codegenContext: ClientCodegenContext,
private val moduleName: String,
private val hasTests: Boolean,
private val hasBenches: Boolean,
) : LibRsCustomization() {
private val runtimeConfig = codegenContext.runtimeConfig
private val serviceShape = codegenContext.serviceShape
private val model = codegenContext.model

override fun section(section: LibRsSection) =
when (section) {
is LibRsSection.Body ->
testDependenciesOnly {
if (hasTests) {
val smithyAsync =
CargoDependency.smithyAsync(codegenContext.runtimeConfig)
CargoDependency.smithyAsync(runtimeConfig)
.copy(features = setOf("test-util"), scope = DependencyScope.Dev)
val smithyTypes =
CargoDependency.smithyTypes(codegenContext.runtimeConfig)
CargoDependency.smithyTypes(runtimeConfig)
.copy(features = setOf("test-util"), scope = DependencyScope.Dev)
addDependency(awsRuntime(runtimeConfig).toDevDependency().withFeature("test-util"))
addDependency(FuturesUtil)
addDependency(FuturesUtil.toDevDependency())
addDependency(SerdeJson)
addDependency(smithyAsync)
addDependency(smithyProtocolTestHelpers(codegenContext.runtimeConfig))
addDependency(smithyProtocolTestHelpers(runtimeConfig))
addDependency(smithyRuntime(runtimeConfig).copy(features = setOf("test-util", "wire-mock"), scope = DependencyScope.Dev))
addDependency(smithyRuntimeApiTestUtil(runtimeConfig))
addDependency(smithyTypes)
Expand All @@ -109,6 +113,12 @@ class IntegrationTestDependencies(
if (hasBenches) {
addDependency(Criterion)
}
if (serviceShape.hasEventStreamOperations(model)) {
addDependency(
CargoDependency.smithyEventStream(runtimeConfig)
.copy(features = setOf("test-util"), scope = DependencyScope.Dev),
)
}
for (serviceSpecific in serviceSpecificCustomizations()) {
serviceSpecific.section(section)(this)
}
Expand All @@ -120,7 +130,7 @@ class IntegrationTestDependencies(
private fun serviceSpecificCustomizations(): List<LibRsCustomization> =
when (moduleName) {
"transcribestreaming" -> listOf(TranscribeTestDependencies())
"s3" -> listOf(S3TestDependencies(codegenContext))
"s3" -> listOf(S3TestDependencies(runtimeConfig))
"dynamodb" -> listOf(DynamoDbTestDependencies())
else -> emptyList()
}
Expand All @@ -142,11 +152,11 @@ class DynamoDbTestDependencies : LibRsCustomization() {
}
}

class S3TestDependencies(private val codegenContext: ClientCodegenContext) : LibRsCustomization() {
class S3TestDependencies(private val runtimeConfig: RuntimeConfig) : LibRsCustomization() {
override fun section(section: LibRsSection): Writable =
writable {
addDependency(awsConfig(codegenContext.runtimeConfig).toDevDependency().withFeature("behavior-version-latest"))
addDependency(smithyExperimental(codegenContext.runtimeConfig).toDevDependency())
addDependency(awsConfig(runtimeConfig).toDevDependency().withFeature("behavior-version-latest"))
addDependency(smithyExperimental(runtimeConfig).toDevDependency())
addDependency(AsyncStd)
addDependency(BytesUtils.toDevDependency())
addDependency(FastRand.toDevDependency())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package software.amazon.smithy.rustsdk.customize.lambda
package software.amazon.smithy.rustsdk.customize

import software.amazon.smithy.model.Model
import software.amazon.smithy.model.shapes.OperationShape
Expand All @@ -12,22 +12,31 @@ import software.amazon.smithy.model.shapes.ShapeId
import software.amazon.smithy.model.transform.ModelTransformer
import software.amazon.smithy.rust.codegen.client.smithy.ClientRustSettings
import software.amazon.smithy.rust.codegen.client.smithy.customize.ClientCodegenDecorator
import software.amazon.smithy.rust.codegen.client.smithy.traits.IncompatibleWithStalledStreamProtectionTrait
import software.amazon.smithy.rust.codegen.client.smithy.transformers.DisableStalledStreamProtection
import software.amazon.smithy.rust.codegen.core.util.letIf
import java.util.logging.Logger

/**
* Top level decorator for Lambda
* Disables stalled stream protection for specific operations.
*
* While a generic client-level decorator, `DisableStalledStreamProtection`, exists to handle this
* at the model level, certain cases require operation-specific removal criteria that cannot be
* generalized. (If we can fully generate the criteria, this class can be removed.)
*
* This class serves as a centralized solution for disabling stalled stream protection in such cases,
* preventing the need for service-specific decorators solely for this purpose.
*/
class LambdaDecorator : ClientCodegenDecorator {
class AwsDisableStalledStreamProtection : ClientCodegenDecorator {
// These long-running operations may have times with no data transfer,
// violating stalled stream protection.
private val operationsIncompatibleWithStalledStreamProtection =
setOf(
ShapeId.from("com.amazonaws.lambda#Invoke"),
ShapeId.from("com.amazonaws.lambda#InvokeAsync"),
ShapeId.from("com.amazonaws.lambda#InvokeWithResponseStream"),
ShapeId.from("com.amazonaws.s3#CopyObject"),
)

override val name: String = "Lambda"
override val name: String = "AwsDisableStalledStreamProtection"
override val order: Byte = 0
private val logger = Logger.getLogger(javaClass.name)

Expand All @@ -37,9 +46,9 @@ class LambdaDecorator : ClientCodegenDecorator {
settings: ClientRustSettings,
): Model =
ModelTransformer.create().mapShapes(model) { shape ->
shape.letIf(shape.id in operationsIncompatibleWithStalledStreamProtection) {
shape.letIf(operationsIncompatibleWithStalledStreamProtection.contains(shape.id)) {
logger.info("Adding IncompatibleWithStalledStreamProtection trait to $it")
(it as OperationShape).toBuilder().addTrait(IncompatibleWithStalledStreamProtectionTrait()).build()
(DisableStalledStreamProtection::transformOperation)((it as OperationShape))
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import software.amazon.smithy.rust.codegen.client.smithy.generators.OperationCus
import software.amazon.smithy.rust.codegen.client.smithy.generators.OperationGenerator
import software.amazon.smithy.rust.codegen.client.smithy.generators.OperationSection
import software.amazon.smithy.rust.codegen.client.smithy.protocols.ClientRestXmlFactory
import software.amazon.smithy.rust.codegen.client.smithy.traits.IncompatibleWithStalledStreamProtectionTrait
import software.amazon.smithy.rust.codegen.core.rustlang.Writable
import software.amazon.smithy.rust.codegen.core.rustlang.rustBlockTemplate
import software.amazon.smithy.rust.codegen.core.rustlang.rustTemplate
Expand Down Expand Up @@ -61,10 +60,6 @@ class S3Decorator : ClientCodegenDecorator {
// API returns ListAllMyDirectoryBucketsResult instead of ListDirectoryBucketsOutput
ShapeId.from("com.amazonaws.s3#ListDirectoryBucketsOutput"),
)
private val operationsIncompatibleWithStalledStreamProtection =
setOf(
ShapeId.from("com.amazonaws.s3#CopyObject"),
)

override fun protocols(
serviceId: ShapeId,
Expand All @@ -87,9 +82,6 @@ class S3Decorator : ClientCodegenDecorator {
shape.letIf(isInInvalidXmlRootAllowList(shape)) {
logger.info("Adding AllowInvalidXmlRoot trait to $it")
(it as StructureShape).toBuilder().addTrait(AllowInvalidXmlRoot()).build()
}.letIf(operationsIncompatibleWithStalledStreamProtection.contains(shape.id)) {
logger.info("Adding IncompatibleWithStalledStreamProtection trait to $it")
(it as OperationShape).toBuilder().addTrait(IncompatibleWithStalledStreamProtectionTrait()).build()
}
}
// the model has the bucket in the path
Expand Down

This file was deleted.

Loading