From 07207076dd77a5ce7af0799579aa56ed4a2b0440 Mon Sep 17 00:00:00 2001
From: Michael Law <1365977+lawmicha@users.noreply.github.com>
Date: Mon, 4 Mar 2024 16:24:31 -0500
Subject: [PATCH 01/23] WIP
---
.../xcschemes/AWSPluginsSDKCore.xcscheme | 66 +++++++++++++++++++
.../xcschemes/Amplify-Package.xcscheme | 23 +++++++
.../AWSAuthService.swift | 0
.../AWSAuthServiceBehavior.swift | 0
.../AWSAuthSessionBehavior.swift | 0
.../AWSPluginExtension.swift | 0
.../AmplifyAWSCredentialsProvider.swift | 0
.../AmplifyAWSServiceConfiguration.swift | 0
.../AmplifyAWSSignatureV4Signer.swift | 0
.../AuthAWSCredentialsProvider.swift | 0
.../AuthCognitoIdentityProvider.swift | 0
.../AuthCognitoTokensProvider.swift | 0
.../AuthTokenProvider.swift | 0
.../PluginClientEngine.swift | 0
.../SdkHttpRequest+updatingUserAgent.swift | 0
.../UserAgentSettingClientEngine.swift | 0
.../UserAgentSuffixAppender.swift | 0
.../IAMCredentialProvider.swift | 0
.../Resources/PrivacyInfo.xcprivacy | 8 +++
...lifyAWSServiceConfiguration+Platform.swift | 0
Package.swift | 26 +++++++-
21 files changed, 122 insertions(+), 1 deletion(-)
create mode 100644 .swiftpm/xcode/xcshareddata/xcschemes/AWSPluginsSDKCore.xcscheme
rename AmplifyPlugins/Core/{AWSPluginsCore/Auth => AWSPluginsSDKCore}/AWSAuthService.swift (100%)
rename AmplifyPlugins/Core/{AWSPluginsCore/Auth => AWSPluginsSDKCore}/AWSAuthServiceBehavior.swift (100%)
rename AmplifyPlugins/Core/{AWSPluginsCore/Auth => AWSPluginsSDKCore}/AWSAuthSessionBehavior.swift (100%)
rename AmplifyPlugins/Core/{AWSPluginsCore/Utils => AWSPluginsSDKCore}/AWSPluginExtension.swift (100%)
rename AmplifyPlugins/Core/{AWSPluginsCore/Auth/Provider => AWSPluginsSDKCore}/AmplifyAWSCredentialsProvider.swift (100%)
rename AmplifyPlugins/Core/{AWSPluginsCore/ServiceConfiguration => AWSPluginsSDKCore}/AmplifyAWSServiceConfiguration.swift (100%)
rename AmplifyPlugins/Core/{AWSPluginsCore/Auth/Provider => AWSPluginsSDKCore}/AmplifyAWSSignatureV4Signer.swift (100%)
rename AmplifyPlugins/Core/{AWSPluginsCore/Auth/Provider => AWSPluginsSDKCore}/AuthAWSCredentialsProvider.swift (100%)
rename AmplifyPlugins/Core/{AWSPluginsCore/Auth/Provider => AWSPluginsSDKCore}/AuthCognitoIdentityProvider.swift (100%)
rename AmplifyPlugins/Core/{AWSPluginsCore/Auth/Provider => AWSPluginsSDKCore}/AuthCognitoTokensProvider.swift (100%)
rename AmplifyPlugins/Core/{AWSPluginsCore/Auth/Provider => AWSPluginsSDKCore}/AuthTokenProvider.swift (100%)
rename AmplifyPlugins/Core/{AWSPluginsCore/Utils => AWSPluginsSDKCore}/CustomHttpClientEngine/PluginClientEngine.swift (100%)
rename AmplifyPlugins/Core/{AWSPluginsCore/Utils => AWSPluginsSDKCore}/CustomHttpClientEngine/SdkHttpRequest+updatingUserAgent.swift (100%)
rename AmplifyPlugins/Core/{AWSPluginsCore/Utils => AWSPluginsSDKCore}/CustomHttpClientEngine/UserAgentSettingClientEngine.swift (100%)
rename AmplifyPlugins/Core/{AWSPluginsCore/Utils => AWSPluginsSDKCore}/CustomHttpClientEngine/UserAgentSuffixAppender.swift (100%)
rename AmplifyPlugins/Core/{AWSPluginsCore/Auth/Provider => AWSPluginsSDKCore}/IAMCredentialProvider.swift (100%)
create mode 100644 AmplifyPlugins/Core/AWSPluginsSDKCore/Resources/PrivacyInfo.xcprivacy
rename AmplifyPlugins/Core/{AWSPluginsCore => AWSPluginsSDKCore}/ServiceConfiguration/AmplifyAWSServiceConfiguration+Platform.swift (100%)
diff --git a/.swiftpm/xcode/xcshareddata/xcschemes/AWSPluginsSDKCore.xcscheme b/.swiftpm/xcode/xcshareddata/xcschemes/AWSPluginsSDKCore.xcscheme
new file mode 100644
index 0000000000..595e4a2023
--- /dev/null
+++ b/.swiftpm/xcode/xcshareddata/xcschemes/AWSPluginsSDKCore.xcscheme
@@ -0,0 +1,66 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/.swiftpm/xcode/xcshareddata/xcschemes/Amplify-Package.xcscheme b/.swiftpm/xcode/xcshareddata/xcschemes/Amplify-Package.xcscheme
index 6970be959c..7b90cae3b1 100644
--- a/.swiftpm/xcode/xcshareddata/xcschemes/Amplify-Package.xcscheme
+++ b/.swiftpm/xcode/xcshareddata/xcschemes/Amplify-Package.xcscheme
@@ -482,6 +482,20 @@
ReferencedContainer = "container:">
+
+
+
+
+
+
+
+
diff --git a/AmplifyPlugins/Core/AWSPluginsCore/Auth/AWSAuthService.swift b/AmplifyPlugins/Core/AWSPluginsSDKCore/AWSAuthService.swift
similarity index 100%
rename from AmplifyPlugins/Core/AWSPluginsCore/Auth/AWSAuthService.swift
rename to AmplifyPlugins/Core/AWSPluginsSDKCore/AWSAuthService.swift
diff --git a/AmplifyPlugins/Core/AWSPluginsCore/Auth/AWSAuthServiceBehavior.swift b/AmplifyPlugins/Core/AWSPluginsSDKCore/AWSAuthServiceBehavior.swift
similarity index 100%
rename from AmplifyPlugins/Core/AWSPluginsCore/Auth/AWSAuthServiceBehavior.swift
rename to AmplifyPlugins/Core/AWSPluginsSDKCore/AWSAuthServiceBehavior.swift
diff --git a/AmplifyPlugins/Core/AWSPluginsCore/Auth/AWSAuthSessionBehavior.swift b/AmplifyPlugins/Core/AWSPluginsSDKCore/AWSAuthSessionBehavior.swift
similarity index 100%
rename from AmplifyPlugins/Core/AWSPluginsCore/Auth/AWSAuthSessionBehavior.swift
rename to AmplifyPlugins/Core/AWSPluginsSDKCore/AWSAuthSessionBehavior.swift
diff --git a/AmplifyPlugins/Core/AWSPluginsCore/Utils/AWSPluginExtension.swift b/AmplifyPlugins/Core/AWSPluginsSDKCore/AWSPluginExtension.swift
similarity index 100%
rename from AmplifyPlugins/Core/AWSPluginsCore/Utils/AWSPluginExtension.swift
rename to AmplifyPlugins/Core/AWSPluginsSDKCore/AWSPluginExtension.swift
diff --git a/AmplifyPlugins/Core/AWSPluginsCore/Auth/Provider/AmplifyAWSCredentialsProvider.swift b/AmplifyPlugins/Core/AWSPluginsSDKCore/AmplifyAWSCredentialsProvider.swift
similarity index 100%
rename from AmplifyPlugins/Core/AWSPluginsCore/Auth/Provider/AmplifyAWSCredentialsProvider.swift
rename to AmplifyPlugins/Core/AWSPluginsSDKCore/AmplifyAWSCredentialsProvider.swift
diff --git a/AmplifyPlugins/Core/AWSPluginsCore/ServiceConfiguration/AmplifyAWSServiceConfiguration.swift b/AmplifyPlugins/Core/AWSPluginsSDKCore/AmplifyAWSServiceConfiguration.swift
similarity index 100%
rename from AmplifyPlugins/Core/AWSPluginsCore/ServiceConfiguration/AmplifyAWSServiceConfiguration.swift
rename to AmplifyPlugins/Core/AWSPluginsSDKCore/AmplifyAWSServiceConfiguration.swift
diff --git a/AmplifyPlugins/Core/AWSPluginsCore/Auth/Provider/AmplifyAWSSignatureV4Signer.swift b/AmplifyPlugins/Core/AWSPluginsSDKCore/AmplifyAWSSignatureV4Signer.swift
similarity index 100%
rename from AmplifyPlugins/Core/AWSPluginsCore/Auth/Provider/AmplifyAWSSignatureV4Signer.swift
rename to AmplifyPlugins/Core/AWSPluginsSDKCore/AmplifyAWSSignatureV4Signer.swift
diff --git a/AmplifyPlugins/Core/AWSPluginsCore/Auth/Provider/AuthAWSCredentialsProvider.swift b/AmplifyPlugins/Core/AWSPluginsSDKCore/AuthAWSCredentialsProvider.swift
similarity index 100%
rename from AmplifyPlugins/Core/AWSPluginsCore/Auth/Provider/AuthAWSCredentialsProvider.swift
rename to AmplifyPlugins/Core/AWSPluginsSDKCore/AuthAWSCredentialsProvider.swift
diff --git a/AmplifyPlugins/Core/AWSPluginsCore/Auth/Provider/AuthCognitoIdentityProvider.swift b/AmplifyPlugins/Core/AWSPluginsSDKCore/AuthCognitoIdentityProvider.swift
similarity index 100%
rename from AmplifyPlugins/Core/AWSPluginsCore/Auth/Provider/AuthCognitoIdentityProvider.swift
rename to AmplifyPlugins/Core/AWSPluginsSDKCore/AuthCognitoIdentityProvider.swift
diff --git a/AmplifyPlugins/Core/AWSPluginsCore/Auth/Provider/AuthCognitoTokensProvider.swift b/AmplifyPlugins/Core/AWSPluginsSDKCore/AuthCognitoTokensProvider.swift
similarity index 100%
rename from AmplifyPlugins/Core/AWSPluginsCore/Auth/Provider/AuthCognitoTokensProvider.swift
rename to AmplifyPlugins/Core/AWSPluginsSDKCore/AuthCognitoTokensProvider.swift
diff --git a/AmplifyPlugins/Core/AWSPluginsCore/Auth/Provider/AuthTokenProvider.swift b/AmplifyPlugins/Core/AWSPluginsSDKCore/AuthTokenProvider.swift
similarity index 100%
rename from AmplifyPlugins/Core/AWSPluginsCore/Auth/Provider/AuthTokenProvider.swift
rename to AmplifyPlugins/Core/AWSPluginsSDKCore/AuthTokenProvider.swift
diff --git a/AmplifyPlugins/Core/AWSPluginsCore/Utils/CustomHttpClientEngine/PluginClientEngine.swift b/AmplifyPlugins/Core/AWSPluginsSDKCore/CustomHttpClientEngine/PluginClientEngine.swift
similarity index 100%
rename from AmplifyPlugins/Core/AWSPluginsCore/Utils/CustomHttpClientEngine/PluginClientEngine.swift
rename to AmplifyPlugins/Core/AWSPluginsSDKCore/CustomHttpClientEngine/PluginClientEngine.swift
diff --git a/AmplifyPlugins/Core/AWSPluginsCore/Utils/CustomHttpClientEngine/SdkHttpRequest+updatingUserAgent.swift b/AmplifyPlugins/Core/AWSPluginsSDKCore/CustomHttpClientEngine/SdkHttpRequest+updatingUserAgent.swift
similarity index 100%
rename from AmplifyPlugins/Core/AWSPluginsCore/Utils/CustomHttpClientEngine/SdkHttpRequest+updatingUserAgent.swift
rename to AmplifyPlugins/Core/AWSPluginsSDKCore/CustomHttpClientEngine/SdkHttpRequest+updatingUserAgent.swift
diff --git a/AmplifyPlugins/Core/AWSPluginsCore/Utils/CustomHttpClientEngine/UserAgentSettingClientEngine.swift b/AmplifyPlugins/Core/AWSPluginsSDKCore/CustomHttpClientEngine/UserAgentSettingClientEngine.swift
similarity index 100%
rename from AmplifyPlugins/Core/AWSPluginsCore/Utils/CustomHttpClientEngine/UserAgentSettingClientEngine.swift
rename to AmplifyPlugins/Core/AWSPluginsSDKCore/CustomHttpClientEngine/UserAgentSettingClientEngine.swift
diff --git a/AmplifyPlugins/Core/AWSPluginsCore/Utils/CustomHttpClientEngine/UserAgentSuffixAppender.swift b/AmplifyPlugins/Core/AWSPluginsSDKCore/CustomHttpClientEngine/UserAgentSuffixAppender.swift
similarity index 100%
rename from AmplifyPlugins/Core/AWSPluginsCore/Utils/CustomHttpClientEngine/UserAgentSuffixAppender.swift
rename to AmplifyPlugins/Core/AWSPluginsSDKCore/CustomHttpClientEngine/UserAgentSuffixAppender.swift
diff --git a/AmplifyPlugins/Core/AWSPluginsCore/Auth/Provider/IAMCredentialProvider.swift b/AmplifyPlugins/Core/AWSPluginsSDKCore/IAMCredentialProvider.swift
similarity index 100%
rename from AmplifyPlugins/Core/AWSPluginsCore/Auth/Provider/IAMCredentialProvider.swift
rename to AmplifyPlugins/Core/AWSPluginsSDKCore/IAMCredentialProvider.swift
diff --git a/AmplifyPlugins/Core/AWSPluginsSDKCore/Resources/PrivacyInfo.xcprivacy b/AmplifyPlugins/Core/AWSPluginsSDKCore/Resources/PrivacyInfo.xcprivacy
new file mode 100644
index 0000000000..74f8af8564
--- /dev/null
+++ b/AmplifyPlugins/Core/AWSPluginsSDKCore/Resources/PrivacyInfo.xcprivacy
@@ -0,0 +1,8 @@
+
+
+
+
+ NSPrivacyAccessedAPITypes
+
+
+
diff --git a/AmplifyPlugins/Core/AWSPluginsCore/ServiceConfiguration/AmplifyAWSServiceConfiguration+Platform.swift b/AmplifyPlugins/Core/AWSPluginsSDKCore/ServiceConfiguration/AmplifyAWSServiceConfiguration+Platform.swift
similarity index 100%
rename from AmplifyPlugins/Core/AWSPluginsCore/ServiceConfiguration/AmplifyAWSServiceConfiguration+Platform.swift
rename to AmplifyPlugins/Core/AWSPluginsSDKCore/ServiceConfiguration/AmplifyAWSServiceConfiguration+Platform.swift
diff --git a/Package.swift b/Package.swift
index 43065db332..1f3d46a58f 100644
--- a/Package.swift
+++ b/Package.swift
@@ -30,11 +30,24 @@ let amplifyTargets: [Target] = [
),
.target(
name: "AWSPluginsCore",
+ dependencies: [
+ "Amplify"
+ ],
+ path: "AmplifyPlugins/Core/AWSPluginsCore",
+ exclude: [
+ "Info.plist"
+ ],
+ resources: [
+ .copy("Resources/PrivacyInfo.xcprivacy")
+ ]
+ ),
+ .target(
+ name: "AWSPluginsSDKCore",
dependencies: [
"Amplify",
.product(name: "AWSClientRuntime", package: "aws-sdk-swift")
],
- path: "AmplifyPlugins/Core/AWSPluginsCore",
+ path: "AmplifyPlugins/Core/AWSPluginsSDKCore",
exclude: [
"Info.plist"
],
@@ -117,6 +130,7 @@ let apiTargets: [Target] = [
dependencies: [
.target(name: "Amplify"),
.target(name: "AWSPluginsCore"),
+ .target(name: "AWSPluginsSDKCore"),
.product(name: "AppSyncRealTimeClient", package: "aws-appsync-realtime-client-ios")],
path: "AmplifyPlugins/API/Sources/AWSAPIPlugin",
exclude: [
@@ -162,6 +176,7 @@ let authTargets: [Target] = [
.target(name: "Amplify"),
.target(name: "AmplifySRP"),
.target(name: "AWSPluginsCore"),
+ .target(name: "AWSPluginsSDKCore"),
.product(name: "AWSClientRuntime", package: "aws-sdk-swift"),
.product(name: "AWSCognitoIdentityProvider", package: "aws-sdk-swift"),
.product(name: "AWSCognitoIdentity", package: "aws-sdk-swift")
@@ -235,6 +250,7 @@ let storageTargets: [Target] = [
dependencies: [
.target(name: "Amplify"),
.target(name: "AWSPluginsCore"),
+ .target(name: "AWSPluginsSDKCore"),
.product(name: "AWSS3", package: "aws-sdk-swift")],
path: "AmplifyPlugins/Storage/Sources/AWSS3StoragePlugin",
exclude: [
@@ -265,6 +281,7 @@ let geoTargets: [Target] = [
dependencies: [
.target(name: "Amplify"),
.target(name: "AWSPluginsCore"),
+ .target(name: "AWSPluginsSDKCore"),
.product(name: "AWSLocation", package: "aws-sdk-swift")],
path: "AmplifyPlugins/Geo/Sources/AWSLocationGeoPlugin",
exclude: [
@@ -296,6 +313,7 @@ let internalPinpointTargets: [Target] = [
.target(name: "Amplify"),
.target(name: "AWSCognitoAuthPlugin"),
.target(name: "AWSPluginsCore"),
+ .target(name: "AWSPluginsSDKCore"),
.product(name: "SQLite", package: "SQLite.swift"),
.product(name: "AWSPinpoint", package: "aws-sdk-swift"),
.product(name: "AmplifyUtilsNotifications", package: "amplify-swift-utils-notifications")
@@ -364,6 +382,7 @@ let predictionsTargets: [Target] = [
dependencies: [
.target(name: "Amplify"),
.target(name: "AWSPluginsCore"),
+ .target(name: "AWSPluginsSDKCore"),
.target(name: "CoreMLPredictionsPlugin"),
.product(name: "AWSComprehend", package: "aws-sdk-swift"),
.product(name: "AWSPolly", package: "aws-sdk-swift"),
@@ -414,6 +433,7 @@ let loggingTargets: [Target] = [
dependencies: [
.target(name: "Amplify"),
.target(name: "AWSPluginsCore"),
+ .target(name: "AWSPluginsSDKCore"),
.product(name: "AWSCloudWatchLogs", package: "aws-sdk-swift"),
],
path: "AmplifyPlugins/Logging/Sources/AWSCloudWatchLoggingPlugin",
@@ -459,6 +479,10 @@ let package = Package(
name: "AWSPluginsCore",
targets: ["AWSPluginsCore"]
),
+ .library(
+ name: "AWSPluginsSDKCore",
+ targets: ["AWSPluginsSDKCore"]
+ ),
.library(
name: "AWSAPIPlugin",
targets: ["AWSAPIPlugin"]
From 1d47dcffe457de841158b7a25380ad16b97d0f22 Mon Sep 17 00:00:00 2001
From: Michael Law <1365977+lawmicha@users.noreply.github.com>
Date: Wed, 6 Mar 2024 11:24:57 -0800
Subject: [PATCH 02/23] DataStore compiles without SDK dependency
---
.../Auth}/AWSAuthService.swift | 5 -----
.../Auth}/AWSAuthServiceBehavior.swift | 3 ---
.../Auth}/AWSAuthSessionBehavior.swift | 0
.../Auth}/AuthAWSCredentialsProvider.swift | 0
.../Auth}/AuthCognitoIdentityProvider.swift | 0
.../Auth}/AuthCognitoTokensProvider.swift | 0
.../AWSAuthCredentialsProvider.swift | 16 ++++++++++++++++
.../AWSAuthCredentialsProviderBehavior.swift | 14 ++++++++++++++
8 files changed, 30 insertions(+), 8 deletions(-)
rename AmplifyPlugins/Core/{AWSPluginsSDKCore => AWSPluginsCore/Auth}/AWSAuthService.swift (96%)
rename AmplifyPlugins/Core/{AWSPluginsSDKCore => AWSPluginsCore/Auth}/AWSAuthServiceBehavior.swift (86%)
rename AmplifyPlugins/Core/{AWSPluginsSDKCore => AWSPluginsCore/Auth}/AWSAuthSessionBehavior.swift (100%)
rename AmplifyPlugins/Core/{AWSPluginsSDKCore => AWSPluginsCore/Auth}/AuthAWSCredentialsProvider.swift (100%)
rename AmplifyPlugins/Core/{AWSPluginsSDKCore => AWSPluginsCore/Auth}/AuthCognitoIdentityProvider.swift (100%)
rename AmplifyPlugins/Core/{AWSPluginsSDKCore => AWSPluginsCore/Auth}/AuthCognitoTokensProvider.swift (100%)
create mode 100644 AmplifyPlugins/Core/AWSPluginsSDKCore/AWSAuthCredentialsProvider.swift
create mode 100644 AmplifyPlugins/Core/AWSPluginsSDKCore/AWSAuthCredentialsProviderBehavior.swift
diff --git a/AmplifyPlugins/Core/AWSPluginsSDKCore/AWSAuthService.swift b/AmplifyPlugins/Core/AWSPluginsCore/Auth/AWSAuthService.swift
similarity index 96%
rename from AmplifyPlugins/Core/AWSPluginsSDKCore/AWSAuthService.swift
rename to AmplifyPlugins/Core/AWSPluginsCore/Auth/AWSAuthService.swift
index d0c279314c..b15b4b7d6c 100644
--- a/AmplifyPlugins/Core/AWSPluginsSDKCore/AWSAuthService.swift
+++ b/AmplifyPlugins/Core/AWSPluginsCore/Auth/AWSAuthService.swift
@@ -7,16 +7,11 @@
import Foundation
import Amplify
-import AWSClientRuntime
public class AWSAuthService: AWSAuthServiceBehavior {
public init() {}
- public func getCredentialsProvider() -> CredentialsProviding {
- return AmplifyAWSCredentialsProvider()
- }
-
/// Retrieves the identity identifier for this authentication session from Cognito.
public func getIdentityID() async throws -> String {
let session = try await Amplify.Auth.fetchAuthSession()
diff --git a/AmplifyPlugins/Core/AWSPluginsSDKCore/AWSAuthServiceBehavior.swift b/AmplifyPlugins/Core/AWSPluginsCore/Auth/AWSAuthServiceBehavior.swift
similarity index 86%
rename from AmplifyPlugins/Core/AWSPluginsSDKCore/AWSAuthServiceBehavior.swift
rename to AmplifyPlugins/Core/AWSPluginsCore/Auth/AWSAuthServiceBehavior.swift
index 6c302cc928..1c23190588 100644
--- a/AmplifyPlugins/Core/AWSPluginsSDKCore/AWSAuthServiceBehavior.swift
+++ b/AmplifyPlugins/Core/AWSPluginsCore/Auth/AWSAuthServiceBehavior.swift
@@ -7,12 +7,9 @@
import Foundation
import Amplify
-import AWSClientRuntime
public protocol AWSAuthServiceBehavior: AnyObject {
- func getCredentialsProvider() -> CredentialsProviding
-
func getTokenClaims(tokenString: String) -> Result<[String: AnyObject], AuthError>
/// Retrieves the identity identifier of for the Auth service
diff --git a/AmplifyPlugins/Core/AWSPluginsSDKCore/AWSAuthSessionBehavior.swift b/AmplifyPlugins/Core/AWSPluginsCore/Auth/AWSAuthSessionBehavior.swift
similarity index 100%
rename from AmplifyPlugins/Core/AWSPluginsSDKCore/AWSAuthSessionBehavior.swift
rename to AmplifyPlugins/Core/AWSPluginsCore/Auth/AWSAuthSessionBehavior.swift
diff --git a/AmplifyPlugins/Core/AWSPluginsSDKCore/AuthAWSCredentialsProvider.swift b/AmplifyPlugins/Core/AWSPluginsCore/Auth/AuthAWSCredentialsProvider.swift
similarity index 100%
rename from AmplifyPlugins/Core/AWSPluginsSDKCore/AuthAWSCredentialsProvider.swift
rename to AmplifyPlugins/Core/AWSPluginsCore/Auth/AuthAWSCredentialsProvider.swift
diff --git a/AmplifyPlugins/Core/AWSPluginsSDKCore/AuthCognitoIdentityProvider.swift b/AmplifyPlugins/Core/AWSPluginsCore/Auth/AuthCognitoIdentityProvider.swift
similarity index 100%
rename from AmplifyPlugins/Core/AWSPluginsSDKCore/AuthCognitoIdentityProvider.swift
rename to AmplifyPlugins/Core/AWSPluginsCore/Auth/AuthCognitoIdentityProvider.swift
diff --git a/AmplifyPlugins/Core/AWSPluginsSDKCore/AuthCognitoTokensProvider.swift b/AmplifyPlugins/Core/AWSPluginsCore/Auth/AuthCognitoTokensProvider.swift
similarity index 100%
rename from AmplifyPlugins/Core/AWSPluginsSDKCore/AuthCognitoTokensProvider.swift
rename to AmplifyPlugins/Core/AWSPluginsCore/Auth/AuthCognitoTokensProvider.swift
diff --git a/AmplifyPlugins/Core/AWSPluginsSDKCore/AWSAuthCredentialsProvider.swift b/AmplifyPlugins/Core/AWSPluginsSDKCore/AWSAuthCredentialsProvider.swift
new file mode 100644
index 0000000000..8c651aefb7
--- /dev/null
+++ b/AmplifyPlugins/Core/AWSPluginsSDKCore/AWSAuthCredentialsProvider.swift
@@ -0,0 +1,16 @@
+//
+// Copyright Amazon.com Inc. or its affiliates.
+// All Rights Reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+
+import Foundation
+import Amplify
+import AWSClientRuntime
+
+public class AWSAuthCredentialsProvider: AWSAuthCredentialsProviderBehavior {
+ public func getCredentialsProvider() -> CredentialsProviding {
+ return AmplifyAWSCredentialsProvider()
+ }
+}
diff --git a/AmplifyPlugins/Core/AWSPluginsSDKCore/AWSAuthCredentialsProviderBehavior.swift b/AmplifyPlugins/Core/AWSPluginsSDKCore/AWSAuthCredentialsProviderBehavior.swift
new file mode 100644
index 0000000000..80f1b3c4ee
--- /dev/null
+++ b/AmplifyPlugins/Core/AWSPluginsSDKCore/AWSAuthCredentialsProviderBehavior.swift
@@ -0,0 +1,14 @@
+//
+// Copyright Amazon.com Inc. or its affiliates.
+// All Rights Reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+
+import Foundation
+import Amplify
+import AWSClientRuntime
+
+public protocol AWSAuthCredentialsProviderBehavior {
+ func getCredentialsProvider() -> CredentialsProviding
+}
From 6e29d884d508804f195ef08e47dfa64f1a779a44 Mon Sep 17 00:00:00 2001
From: Di Wu
Date: Thu, 4 Apr 2024 11:53:05 -0700
Subject: [PATCH 03/23] refactor(datastore-v2): use api plugin with async
sequences
---
.../Operation/RetryableGraphQLOperation.swift | 266 ++++++++----------
.../AmplifyAsyncThrowingSequence.swift | 2 +
.../AmplifyTask+OperationTaskAdapters.swift | 4 +-
.../Sources/AWSAPIPlugin/AWSAPIPlugin.swift | 2 +-
.../AWSGraphQLSubscriptionTaskRunner.swift | 4 +-
.../APICategoryGraphQLBehaviorExtended.swift | 41 ---
.../Auth/AWSAuthModeStrategy.swift | 30 ++
.../StorageEngine+SyncRequirement.swift | 2 +-
.../InitialSync/InitialSyncOperation.swift | 73 ++---
.../InitialSync/InitialSyncOrchestrator.swift | 6 +-
.../OutgoingMutationQueue+Action.swift | 2 +-
.../OutgoingMutationQueue+State.swift | 2 +-
.../OutgoingMutationQueue.swift | 12 +-
...ocessMutationErrorFromCloudOperation.swift | 62 ++--
.../SyncMutationToCloudOperation.swift | 85 +++---
.../Sync/RemoteSyncEngine+Action.swift | 4 +-
.../Sync/RemoteSyncEngine+State.swift | 4 +-
.../Sync/RemoteSyncEngine.swift | 8 +-
.../Sync/RemoteSyncEngineBehavior.swift | 2 +-
.../AWSIncomingEventReconciliationQueue.swift | 4 +-
...WSIncomingSubscriptionEventPublisher.swift | 2 +-
...omingAsyncSubscriptionEventPublisher.swift | 174 ++++++------
.../AWSModelReconciliationQueue.swift | 4 +-
.../Sync/Support/AsyncStream+Extensions.swift | 21 ++
.../Mocks/MockOutgoingMutationQueue.swift | 2 +-
.../Mocks/MockRemoteSyncEngine.swift | 2 +-
.../TestSupport/Mocks/NoOpMutationQueue.swift | 2 +-
.../DataStoreConnectionScenario1Tests.swift | 2 +-
.../DataStoreHubEventsTests.swift | 1 -
...reLargeNumberModelsSubscriptionTests.swift | 2 +-
.../HubEventsIntegrationTestBase.swift | 4 +-
.../Mocks/MockAPICategoryPlugin.swift | 2 +-
32 files changed, 423 insertions(+), 410 deletions(-)
delete mode 100644 AmplifyPlugins/Core/AWSPluginsCore/API/APICategoryGraphQLBehaviorExtended.swift
create mode 100644 AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/Support/AsyncStream+Extensions.swift
diff --git a/Amplify/Categories/API/Operation/RetryableGraphQLOperation.swift b/Amplify/Categories/API/Operation/RetryableGraphQLOperation.swift
index ed2a6e2753..86281585b0 100644
--- a/Amplify/Categories/API/Operation/RetryableGraphQLOperation.swift
+++ b/Amplify/Categories/API/Operation/RetryableGraphQLOperation.swift
@@ -6,183 +6,159 @@
//
import Foundation
+import Combine
-/// Convenience protocol to handle any kind of GraphQLOperation
-public protocol AnyGraphQLOperation {
- associatedtype Success
- associatedtype Failure: Error
- typealias ResultListener = (Result) -> Void
-}
-/// Abastraction for a retryable GraphQLOperation.
-public protocol RetryableGraphQLOperationBehavior: Operation, DefaultLogger {
- associatedtype Payload: Decodable
+// MARK: - RetryableGraphQLOperation
+public final class RetryableGraphQLOperation {
+ public typealias Payload = Payload
- /// GraphQLOperation concrete type
- associatedtype OperationType: AnyGraphQLOperation
+ public let requestFactory: AsyncStream<() -> GraphQLRequest>
+ public weak var api: APICategoryGraphQLBehavior?
+ private var task: Task?
- typealias RequestFactory = () async -> GraphQLRequest
- typealias OperationFactory = (GraphQLRequest, @escaping OperationResultListener) -> OperationType
- typealias OperationResultListener = OperationType.ResultListener
+ public init(
+ requestFactory: T,
+ api: APICategoryGraphQLBehavior
+ ) where T.Element == () -> GraphQLRequest {
+ self.requestFactory = requestFactory.asyncStream
+ self.api = api
+ }
- /// Operation unique identifier
- var id: UUID { get }
+ deinit {
+ cancel()
+ }
- /// Number of attempts (min 1)
- var attempts: Int { get set }
+ public func execute(
+ _ operationType: GraphQLOperationType
+ ) -> Future.Success, APIError> {
+ Future() { promise in
+ self.task = Task { promise(await self.run(operationType)) }
+ }
+ }
- /// Underlying GraphQL operation instantiated by `operationFactory`
- var underlyingOperation: AtomicValue { get set }
+ public func run(_ operationType: GraphQLOperationType) async -> Result.Success, APIError> {
+ for await request in requestFactory {
+ do {
+ try Task.checkCancellation()
+ switch (self.api, operationType) {
+ case (.some(let api), .query):
+ return .success(try await api.query(request: request()))
+ case (.some(let api), .mutation):
+ return .success(try await api.mutate(request: request()))
+ default:
+ return .failure(.operationError("Unable to run GraphQL operation with type \(operationType)", ""))
+ }
- /// Maximum number of allowed retries
- var maxRetries: Int { get }
+ } catch is CancellationError {
+ return .failure(.operationError("GraphQL operation cancelled", ""))
+ } catch {
+ guard let error = error as? APIError,
+ let authError = error.underlyingError as? AuthError
+ else {
+ return .failure(.operationError("Failed to send \(operationType) GraphQL request", "", error))
+ }
- /// GraphQLRequest factory, invoked to create a new operation
- var requestFactory: RequestFactory { get }
+ switch authError {
+ case .signedOut, .notAuthorized: break;
+ default: return .failure(error)
+ }
+ }
+ }
+ return .failure(APIError.operationError("Failed to execute GraphQL operation \(operationType)", "", nil))
+ }
- /// GraphQL operation factory, invoked with a newly created GraphQL request
- /// and a wrapped result listener.
- var operationFactory: OperationFactory { get }
+ public func cancel() {
+ task?.cancel()
+ }
- var resultListener: OperationResultListener { get }
+}
- init(requestFactory: @escaping RequestFactory,
- maxRetries: Int,
- resultListener: @escaping OperationResultListener,
- _ operationFactory: @escaping OperationFactory)
+public final class RetryableGraphQLSubscriptionOperation {
- func start(request: GraphQLRequest)
+ public typealias Payload = Payload
- func shouldRetry(error: APIError?) -> Bool
-}
+ public let requestFactory: AsyncStream<() async -> GraphQLRequest>
+ public weak var api: APICategoryGraphQLBehavior?
+ private var task: Task?
-extension RetryableGraphQLOperationBehavior {
- public static var log: Logger {
- Amplify.Logging.logger(forCategory: CategoryType.api.displayName, forNamespace: String(describing: self))
+ public init(
+ requestFactory: T,
+ api: APICategoryGraphQLBehavior
+ ) where T.Element == () async -> GraphQLRequest {
+ self.requestFactory = requestFactory.asyncStream
+ self.api = api
}
- public var log: Logger {
- Self.log
+
+ deinit {
+ cancel()
}
-}
-// MARK: RetryableGraphQLOperationBehavior + default implementation
-extension RetryableGraphQLOperationBehavior {
- public func start(request: GraphQLRequest) {
- attempts += 1
- log.debug("[\(id)] - Try [\(attempts)/\(maxRetries)]")
- let wrappedResultListener: OperationResultListener = { result in
- if case let .failure(error) = result, self.shouldRetry(error: error as? APIError) {
- self.log.debug("\(error)")
- Task {
- self.start(request: await self.requestFactory())
- }
- return
- }
+ public func subscribe() -> AnyPublisher, APIError> {
+ let subject = PassthroughSubject, APIError>()
+ self.task = Task { await self.trySubscribe(subject) }
+ return subject.eraseToAnyPublisher()
+ }
- if case let .failure(error) = result {
- self.log.debug("\(error)")
- self.log.debug("[\(self.id)] - Failed")
+ private func trySubscribe(_ subject: PassthroughSubject, APIError>) async {
+ var apiError: APIError?
+ for await request in requestFactory {
+ guard let sequence = self.api?.subscribe(request: await request()) else {
+ continue
}
+ do {
+ try Task.checkCancellation()
- if case .success = result {
- self.log.debug("[Operation \(self.id)] - Success")
+ for try await event in sequence {
+ try Task.checkCancellation()
+ Self.log.debug("Subscribe event \(event)")
+ subject.send(event)
+ }
+ } catch is CancellationError {
+ subject.send(completion: .finished)
+ } catch {
+ if let error = error as? APIError {
+ apiError = error
+ }
+ Self.log.debug("Failed with subscription request: \(error)")
}
- self.resultListener(result)
+ sequence.cancel()
}
- underlyingOperation.set(operationFactory(request, wrappedResultListener))
- }
-}
-
-// MARK: - RetryableGraphQLOperation
-public final class RetryableGraphQLOperation: Operation, RetryableGraphQLOperationBehavior {
- public typealias Payload = Payload
- public typealias OperationType = GraphQLOperation
-
- public var id: UUID
- public var maxRetries: Int
- public var attempts: Int = 0
- public var requestFactory: RequestFactory
- public var underlyingOperation: AtomicValue?> = AtomicValue(initialValue: nil)
- public var resultListener: OperationResultListener
- public var operationFactory: OperationFactory
-
- public init(requestFactory: @escaping RequestFactory,
- maxRetries: Int,
- resultListener: @escaping OperationResultListener,
- _ operationFactory: @escaping OperationFactory) {
- self.id = UUID()
- self.maxRetries = max(1, maxRetries)
- self.requestFactory = requestFactory
- self.operationFactory = operationFactory
- self.resultListener = resultListener
- }
-
- public override func main() {
- Task {
- start(request: await requestFactory())
+ if apiError != nil {
+ subject.send(completion: .failure(apiError!))
+ } else {
+ subject.send(completion: .finished)
}
}
- override public func cancel() {
- self.underlyingOperation.get()?.cancel()
- }
-
- public func shouldRetry(error: APIError?) -> Bool {
- guard case let .operationError(_, _, underlyingError) = error,
- let authError = underlyingError as? AuthError else {
- return false
- }
-
- switch authError {
- case .signedOut, .notAuthorized:
- return attempts < maxRetries
- default:
- return false
- }
+ public func cancel() {
+ self.task?.cancel()
}
}
-// MARK: - RetryableGraphQLSubscriptionOperation
-public final class RetryableGraphQLSubscriptionOperation: Operation,
- RetryableGraphQLOperationBehavior {
- public typealias OperationType = GraphQLSubscriptionOperation
-
- public typealias Payload = Payload
-
- public var id: UUID
- public var maxRetries: Int
- public var attempts: Int = 0
- public var underlyingOperation: AtomicValue?> = AtomicValue(initialValue: nil)
- public var requestFactory: RequestFactory
- public var resultListener: OperationResultListener
- public var operationFactory: OperationFactory
-
- public init(requestFactory: @escaping RequestFactory,
- maxRetries: Int,
- resultListener: @escaping OperationResultListener,
- _ operationFactory: @escaping OperationFactory) {
- self.id = UUID()
- self.maxRetries = max(1, maxRetries)
- self.requestFactory = requestFactory
- self.operationFactory = operationFactory
- self.resultListener = resultListener
- }
- public override func main() {
- Task {
- start(request: await requestFactory())
+extension AsyncSequence {
+ fileprivate var asyncStream: AsyncStream {
+ AsyncStream { continuation in
+ Task {
+ var it = self.makeAsyncIterator()
+ do {
+ while let ele = try await it.next() {
+ continuation.yield(ele)
+ }
+ continuation.finish()
+ } catch {
+ continuation.finish()
+ }
+ }
}
}
+}
- public override func cancel() {
- self.underlyingOperation.get()?.cancel()
+extension RetryableGraphQLSubscriptionOperation {
+ public static var log: Logger {
+ Amplify.Logging.logger(forCategory: CategoryType.api.displayName, forNamespace: String(describing: self))
}
-
- public func shouldRetry(error: APIError?) -> Bool {
- return attempts < maxRetries
+ public var log: Logger {
+ Self.log
}
-
}
-
-// MARK: GraphQLOperation - GraphQLSubscriptionOperation + AnyGraphQLOperation
-extension GraphQLOperation: AnyGraphQLOperation {}
-extension GraphQLSubscriptionOperation: AnyGraphQLOperation {}
diff --git a/Amplify/Core/Support/AmplifyAsyncThrowingSequence.swift b/Amplify/Core/Support/AmplifyAsyncThrowingSequence.swift
index 38772392da..6a4841f13b 100644
--- a/Amplify/Core/Support/AmplifyAsyncThrowingSequence.swift
+++ b/Amplify/Core/Support/AmplifyAsyncThrowingSequence.swift
@@ -6,6 +6,7 @@
//
import Foundation
+import Combine
public typealias WeakAmplifyAsyncThrowingSequenceRef = WeakRef>
@@ -49,4 +50,5 @@ public class AmplifyAsyncThrowingSequence: AsyncSequence, Can
parent?.cancel()
finish()
}
+
}
diff --git a/Amplify/Core/Support/AmplifyTask+OperationTaskAdapters.swift b/Amplify/Core/Support/AmplifyTask+OperationTaskAdapters.swift
index fb56c6df18..50e505bce9 100644
--- a/Amplify/Core/Support/AmplifyTask+OperationTaskAdapters.swift
+++ b/Amplify/Core/Support/AmplifyTask+OperationTaskAdapters.swift
@@ -20,7 +20,9 @@ public class AmplifyOperationTaskAdapter) {
self.operation = operation
self.childTask = ChildTask(parent: operation)
- resultToken = operation.subscribe(resultListener: resultListener)
+ resultToken = operation.subscribe { [weak self] in
+ self?.resultListener($0)
+ }
}
deinit {
diff --git a/AmplifyPlugins/API/Sources/AWSAPIPlugin/AWSAPIPlugin.swift b/AmplifyPlugins/API/Sources/AWSAPIPlugin/AWSAPIPlugin.swift
index ce124f1f54..9d3117dd17 100644
--- a/AmplifyPlugins/API/Sources/AWSAPIPlugin/AWSAPIPlugin.swift
+++ b/AmplifyPlugins/API/Sources/AWSAPIPlugin/AWSAPIPlugin.swift
@@ -9,7 +9,7 @@ import Amplify
import AWSPluginsCore
import Foundation
-final public class AWSAPIPlugin: NSObject, APICategoryPlugin, APICategoryGraphQLBehaviorExtended, AWSAPIAuthInformation {
+final public class AWSAPIPlugin: NSObject, APICategoryPlugin, AWSAPIAuthInformation {
/// The unique key of the plugin within the API category.
public var key: PluginKey {
return "awsAPIPlugin"
diff --git a/AmplifyPlugins/API/Sources/AWSAPIPlugin/Operation/AWSGraphQLSubscriptionTaskRunner.swift b/AmplifyPlugins/API/Sources/AWSAPIPlugin/Operation/AWSGraphQLSubscriptionTaskRunner.swift
index 3e70654298..c58bc2da90 100644
--- a/AmplifyPlugins/API/Sources/AWSAPIPlugin/Operation/AWSGraphQLSubscriptionTaskRunner.swift
+++ b/AmplifyPlugins/API/Sources/AWSAPIPlugin/Operation/AWSGraphQLSubscriptionTaskRunner.swift
@@ -106,8 +106,8 @@ public class AWSGraphQLSubscriptionTaskRunner: InternalTaskRunner,
self.subscription = try await appSyncClient?.subscribe(
id: subscriptionId,
query: encodeRequest(query: request.document, variables: request.variables)
- ).sink(receiveValue: { [weak self] event in
- self?.onAsyncSubscriptionEvent(event: event)
+ ).sink(receiveValue: { event in
+ self.onAsyncSubscriptionEvent(event: event)
})
} catch {
let error = APIError.operationError("Unable to get connection for api \(endpointConfig.name)", "", error)
diff --git a/AmplifyPlugins/Core/AWSPluginsCore/API/APICategoryGraphQLBehaviorExtended.swift b/AmplifyPlugins/Core/AWSPluginsCore/API/APICategoryGraphQLBehaviorExtended.swift
deleted file mode 100644
index 91d56f9763..0000000000
--- a/AmplifyPlugins/Core/AWSPluginsCore/API/APICategoryGraphQLBehaviorExtended.swift
+++ /dev/null
@@ -1,41 +0,0 @@
-//
-// Copyright Amazon.com Inc. or its affiliates.
-// All Rights Reserved.
-//
-// SPDX-License-Identifier: Apache-2.0
-//
-
-import Foundation
-import Amplify
-
-/// Extending the existing `APICategoryGraphQLBehavior` to include callback based APIs.
-///
-/// This exists to allow DataStore to continue to use the `APICategoryGraphQLCallbackBehavior` APIs without exposing
-/// them publicly from Amplify in `APICategoryGraphQLBehavior`. Eventually, the goal is for DataStore to use the
-/// Async APIs, at which point, this protocol can be completely removed. Introducing this protocol allows Amplify to
-/// to fully deprecate the callback based APIs, while allowing DataStore a gradual migration path forward in moving
-/// away from APIPlugin's callback APIs to the Async APIs.
-/// See https://github.com/aws-amplify/amplify-ios/issues/2252 for more details
-///
-/// - Warning: Although this has `public` access, it is intended for internal use and should not be used directly
-/// by host applications. The behavior of this may change without warning.
-public protocol APICategoryGraphQLBehaviorExtended:
- APICategoryGraphQLCallbackBehavior, APICategoryGraphQLBehavior, AnyObject { }
-
-/// Listener callback based APIs
-///
-/// - Warning: Although this has `public` access, it is intended for internal use and should not be used directly
-/// by host applications. The behavior of this may change without warning.
-public protocol APICategoryGraphQLCallbackBehavior {
- @discardableResult
- func query(request: GraphQLRequest,
- listener: GraphQLOperation.ResultListener?) -> GraphQLOperation
- @discardableResult
- func mutate(request: GraphQLRequest,
- listener: GraphQLOperation.ResultListener?) -> GraphQLOperation
-
- func subscribe(request: GraphQLRequest,
- valueListener: GraphQLSubscriptionOperation.InProcessListener?,
- completionListener: GraphQLSubscriptionOperation.ResultListener?)
- -> GraphQLSubscriptionOperation
-}
diff --git a/AmplifyPlugins/Core/AWSPluginsCore/Auth/AWSAuthModeStrategy.swift b/AmplifyPlugins/Core/AWSPluginsCore/Auth/AWSAuthModeStrategy.swift
index 020dc68e60..eeeea09ff1 100644
--- a/AmplifyPlugins/Core/AWSPluginsCore/Auth/AWSAuthModeStrategy.swift
+++ b/AmplifyPlugins/Core/AWSPluginsCore/Auth/AWSAuthModeStrategy.swift
@@ -6,6 +6,7 @@
//
import Foundation
+import Combine
import Amplify
/// Represents different auth strategies supported by a client
@@ -95,6 +96,35 @@ public struct AWSAuthorizationTypeIterator: AuthorizationTypeIterator {
}
}
+extension AuthorizationTypeIterator {
+ public var asyncStream: AsyncStream {
+ var it = self
+ return AsyncStream { continuation in
+ while let authType = it.next() {
+ continuation.yield(authType)
+ }
+ continuation.finish()
+ }
+ }
+
+ public var optionalAsyncStream: AsyncStream {
+ var it = self
+ if it.hasNext {
+ return AsyncStream { continuation in
+ while let authType = it.next() {
+ continuation.yield(authType)
+ }
+ continuation.finish()
+ }
+ } else {
+ return AsyncStream { continuation in
+ continuation.yield(nil)
+ continuation.finish()
+ }
+ }
+ }
+}
+
// MARK: - AWSDefaultAuthModeStrategy
/// AWS default auth mode strategy.
diff --git a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Storage/StorageEngine+SyncRequirement.swift b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Storage/StorageEngine+SyncRequirement.swift
index 0b99894ea6..b6a8aac20c 100644
--- a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Storage/StorageEngine+SyncRequirement.swift
+++ b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Storage/StorageEngine+SyncRequirement.swift
@@ -25,7 +25,7 @@ extension StorageEngine {
))
}
- guard let apiGraphQL = api as? APICategoryGraphQLBehaviorExtended else {
+ guard let apiGraphQL = api as? APICategoryGraphQLBehavior else {
log.info("Unable to find GraphQL API plugin for syncEngine. syncEngine will not be started")
return .failure(.configuration(
"Unable to find suitable GraphQL API plugin for syncEngine. syncEngine will not be started",
diff --git a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/InitialSync/InitialSyncOperation.swift b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/InitialSync/InitialSyncOperation.swift
index e01f235b88..80b29e2245 100644
--- a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/InitialSync/InitialSyncOperation.swift
+++ b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/InitialSync/InitialSyncOperation.swift
@@ -13,7 +13,7 @@ import Foundation
final class InitialSyncOperation: AsynchronousOperation {
typealias SyncQueryResult = PaginatedList
- private weak var api: APICategoryGraphQLBehaviorExtended?
+ private weak var api: APICategoryGraphQLBehavior?
private weak var reconciliationQueue: IncomingEventReconciliationQueue?
private weak var storageAdapter: StorageEngineAdapter?
private let dataStoreConfiguration: DataStoreConfiguration
@@ -22,6 +22,7 @@ final class InitialSyncOperation: AsynchronousOperation {
private let modelSchema: ModelSchema
private var recordsReceived: UInt
+ private var queryTask: Task?
private var syncMaxRecords: UInt {
return dataStoreConfiguration.syncMaxRecords
@@ -61,7 +62,7 @@ final class InitialSyncOperation: AsynchronousOperation {
}
init(modelSchema: ModelSchema,
- api: APICategoryGraphQLBehaviorExtended?,
+ api: APICategoryGraphQLBehavior?,
reconciliationQueue: IncomingEventReconciliationQueue?,
storageAdapter: StorageEngineAdapter?,
dataStoreConfiguration: DataStoreConfiguration,
@@ -86,7 +87,7 @@ final class InitialSyncOperation: AsynchronousOperation {
log.info("Beginning sync for \(modelSchema.name)")
let lastSyncMetadata = getLastSyncMetadata()
let lastSyncTime = getLastSyncTime(lastSyncMetadata)
- Task {
+ self.queryTask = Task {
await query(lastSyncTime: lastSyncTime)
}
}
@@ -168,42 +169,41 @@ final class InitialSyncOperation: AsynchronousOperation {
}
let minSyncPageSize = Int(min(syncMaxRecords - recordsReceived, syncPageSize))
let limit = minSyncPageSize < 0 ? Int(syncPageSize) : minSyncPageSize
- let completionListener: GraphQLOperation.ResultListener = { result in
- switch result {
- case .failure(let apiError):
- if self.isAuthSignedOutError(apiError: apiError) {
- self.log.error("Sync for \(self.modelSchema.name) failed due to signed out error \(apiError.errorDescription)")
- }
-
- // TODO: Retry query on error
- let error = DataStoreError.api(apiError)
- self.dataStoreConfiguration.errorHandler(error)
- self.finish(result: .failure(error))
- case .success(let graphQLResult):
- self.handleQueryResults(lastSyncTime: lastSyncTime, graphQLResult: graphQLResult)
+ let authTypes = await authModeStrategy.authTypesFor(schema: modelSchema, operation: .read)
+ .optionalAsyncStream.map { authType in {
+ GraphQLRequest.syncQuery(modelSchema: self.modelSchema,
+ where: self.syncPredicate,
+ limit: limit,
+ nextToken: nextToken,
+ lastSync: lastSyncTime,
+ authType: authType
+ )
+ }}
+
+
+ let result: Result, APIError> = await RetryableGraphQLOperation(
+ requestFactory: authTypes,
+ api: api
+ ).run(.query)
+
+ switch result {
+ case .success(let graphQLResult):
+ await handleQueryResults(lastSyncTime: lastSyncTime, graphQLResult: graphQLResult)
+ case .failure(let apiError):
+ if self.isAuthSignedOutError(apiError: apiError) {
+ self.log.error("Sync for \(self.modelSchema.name) failed due to signed out error \(apiError.errorDescription)")
}
+ self.dataStoreConfiguration.errorHandler(DataStoreError.api(apiError))
+ self.finish(result: .failure(.api(apiError)))
}
-
- var authTypes = await authModeStrategy.authTypesFor(schema: modelSchema, operation: .read)
-
- RetryableGraphQLOperation(requestFactory: {
- GraphQLRequest.syncQuery(modelSchema: self.modelSchema,
- where: self.syncPredicate,
- limit: limit,
- nextToken: nextToken,
- lastSync: lastSyncTime,
- authType: authTypes.next())
- },
- maxRetries: authTypes.count,
- resultListener: completionListener) { nextRequest, wrappedCompletionListener in
- api.query(request: nextRequest, listener: wrappedCompletionListener)
- }.main()
}
/// Disposes of the query results: Stops if error, reconciles results if success, and kick off a new query if there
/// is a next token
- private func handleQueryResults(lastSyncTime: Int64?,
- graphQLResult: Result>) {
+ private func handleQueryResults(
+ lastSyncTime: Int64?,
+ graphQLResult: Result>
+ ) async {
guard !isCancelled else {
finish(result: .successfulVoid)
return
@@ -238,9 +238,7 @@ final class InitialSyncOperation: AsynchronousOperation {
}
if let nextToken = syncQueryResult.nextToken, recordsReceived < syncMaxRecords {
- Task {
- await self.query(lastSyncTime: lastSyncTime, nextToken: nextToken)
- }
+ await self.query(lastSyncTime: lastSyncTime, nextToken: nextToken)
} else {
updateModelSyncMetadata(lastSyncTime: syncQueryResult.startedAt)
}
@@ -292,6 +290,9 @@ final class InitialSyncOperation: AsynchronousOperation {
super.finish()
}
+ override func cancel() {
+ self.queryTask?.cancel()
+ }
}
extension InitialSyncOperation: DefaultLogger {
diff --git a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/InitialSync/InitialSyncOrchestrator.swift b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/InitialSync/InitialSyncOrchestrator.swift
index dbfe953ab1..806b19a240 100644
--- a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/InitialSync/InitialSyncOrchestrator.swift
+++ b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/InitialSync/InitialSyncOrchestrator.swift
@@ -19,7 +19,7 @@ protocol InitialSyncOrchestrator {
typealias InitialSyncOrchestratorFactory =
(DataStoreConfiguration,
AuthModeStrategy,
- APICategoryGraphQLBehaviorExtended?,
+ APICategoryGraphQLBehavior?,
IncomingEventReconciliationQueue?,
StorageEngineAdapter?) -> InitialSyncOrchestrator
@@ -30,7 +30,7 @@ final class AWSInitialSyncOrchestrator: InitialSyncOrchestrator {
private var initialSyncOperationSinks: [String: AnyCancellable]
private let dataStoreConfiguration: DataStoreConfiguration
- private weak var api: APICategoryGraphQLBehaviorExtended?
+ private weak var api: APICategoryGraphQLBehavior?
private weak var reconciliationQueue: IncomingEventReconciliationQueue?
private weak var storageAdapter: StorageEngineAdapter?
private let authModeStrategy: AuthModeStrategy
@@ -52,7 +52,7 @@ final class AWSInitialSyncOrchestrator: InitialSyncOrchestrator {
init(dataStoreConfiguration: DataStoreConfiguration,
authModeStrategy: AuthModeStrategy,
- api: APICategoryGraphQLBehaviorExtended?,
+ api: APICategoryGraphQLBehavior?,
reconciliationQueue: IncomingEventReconciliationQueue?,
storageAdapter: StorageEngineAdapter?) {
self.initialSyncOperationSinks = [:]
diff --git a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/MutationSync/OutgoingMutationQueue/OutgoingMutationQueue+Action.swift b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/MutationSync/OutgoingMutationQueue/OutgoingMutationQueue+Action.swift
index a9c8309ad6..f042cfab00 100644
--- a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/MutationSync/OutgoingMutationQueue/OutgoingMutationQueue+Action.swift
+++ b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/MutationSync/OutgoingMutationQueue/OutgoingMutationQueue+Action.swift
@@ -15,7 +15,7 @@ extension OutgoingMutationQueue {
enum Action {
// Startup/config actions
case initialized
- case receivedStart(APICategoryGraphQLBehaviorExtended, MutationEventPublisher, IncomingEventReconciliationQueue?)
+ case receivedStart(APICategoryGraphQLBehavior, MutationEventPublisher, IncomingEventReconciliationQueue?)
case receivedSubscription
// Event loop
diff --git a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/MutationSync/OutgoingMutationQueue/OutgoingMutationQueue+State.swift b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/MutationSync/OutgoingMutationQueue/OutgoingMutationQueue+State.swift
index f7c59eb8ea..b8839a4b3e 100644
--- a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/MutationSync/OutgoingMutationQueue/OutgoingMutationQueue+State.swift
+++ b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/MutationSync/OutgoingMutationQueue/OutgoingMutationQueue+State.swift
@@ -16,7 +16,7 @@ extension OutgoingMutationQueue {
// Startup/config states
case notInitialized
case stopped
- case starting(APICategoryGraphQLBehaviorExtended, MutationEventPublisher, IncomingEventReconciliationQueue?)
+ case starting(APICategoryGraphQLBehavior, MutationEventPublisher, IncomingEventReconciliationQueue?)
// Event loop
case requestingEvent
diff --git a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/MutationSync/OutgoingMutationQueue/OutgoingMutationQueue.swift b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/MutationSync/OutgoingMutationQueue/OutgoingMutationQueue.swift
index 26cde77852..98ce9a3ce2 100644
--- a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/MutationSync/OutgoingMutationQueue/OutgoingMutationQueue.swift
+++ b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/MutationSync/OutgoingMutationQueue/OutgoingMutationQueue.swift
@@ -13,7 +13,7 @@ import AWSPluginsCore
/// Submits outgoing mutation events to the provisioned API
protocol OutgoingMutationQueueBehavior: AnyObject {
func stopSyncingToCloud(_ completion: @escaping BasicClosure)
- func startSyncingToCloud(api: APICategoryGraphQLBehaviorExtended,
+ func startSyncingToCloud(api: APICategoryGraphQLBehavior,
mutationEventPublisher: MutationEventPublisher,
reconciliationQueue: IncomingEventReconciliationQueue?)
var publisher: AnyPublisher { get }
@@ -32,7 +32,7 @@ final class OutgoingMutationQueue: OutgoingMutationQueueBehavior {
target: DispatchQueue.global()
)
- private weak var api: APICategoryGraphQLBehaviorExtended?
+ private weak var api: APICategoryGraphQLBehavior?
private weak var reconciliationQueue: IncomingEventReconciliationQueue?
private var subscription: Subscription?
@@ -84,7 +84,7 @@ final class OutgoingMutationQueue: OutgoingMutationQueueBehavior {
// MARK: - Public API
- func startSyncingToCloud(api: APICategoryGraphQLBehaviorExtended,
+ func startSyncingToCloud(api: APICategoryGraphQLBehavior,
mutationEventPublisher: MutationEventPublisher,
reconciliationQueue: IncomingEventReconciliationQueue?) {
log.verbose(#function)
@@ -130,7 +130,7 @@ final class OutgoingMutationQueue: OutgoingMutationQueueBehavior {
/// Responder method for `starting`. Starts the operation queue and subscribes to
/// the publisher. After subscribing to the publisher, return actions:
/// - receivedSubscription
- private func doStart(api: APICategoryGraphQLBehaviorExtended,
+ private func doStart(api: APICategoryGraphQLBehavior,
mutationEventPublisher: MutationEventPublisher,
reconciliationQueue: IncomingEventReconciliationQueue?) {
log.verbose(#function)
@@ -222,7 +222,7 @@ final class OutgoingMutationQueue: OutgoingMutationQueueBehavior {
private func processSyncMutationToCloudResult(_ result: GraphQLOperation>.OperationResult,
mutationEvent: MutationEvent,
- api: APICategoryGraphQLBehaviorExtended) {
+ api: APICategoryGraphQLBehavior) {
if case let .success(graphQLResponse) = result {
if case let .success(graphQLResult) = graphQLResponse {
processSuccessEvent(mutationEvent,
@@ -271,7 +271,7 @@ final class OutgoingMutationQueue: OutgoingMutationQueueBehavior {
}
private func processMutationErrorFromCloud(mutationEvent: MutationEvent,
- api: APICategoryGraphQLBehaviorExtended,
+ api: APICategoryGraphQLBehavior,
apiError: APIError?,
graphQLResponseError: GraphQLResponseError>?) {
if let apiError = apiError, apiError.isOperationCancelledError {
diff --git a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/MutationSync/OutgoingMutationQueue/ProcessMutationErrorFromCloudOperation.swift b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/MutationSync/OutgoingMutationQueue/ProcessMutationErrorFromCloudOperation.swift
index bbc4ec0895..c334745fb7 100644
--- a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/MutationSync/OutgoingMutationQueue/ProcessMutationErrorFromCloudOperation.swift
+++ b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/MutationSync/OutgoingMutationQueue/ProcessMutationErrorFromCloudOperation.swift
@@ -27,12 +27,12 @@ class ProcessMutationErrorFromCloudOperation: AsynchronousOperation {
private let apiError: APIError?
private let completion: (Result) -> Void
private var mutationOperation: AtomicValue>?>
- private weak var api: APICategoryGraphQLBehaviorExtended?
+ private weak var api: APICategoryGraphQLBehavior?
private weak var reconciliationQueue: IncomingEventReconciliationQueue?
init(dataStoreConfiguration: DataStoreConfiguration,
mutationEvent: MutationEvent,
- api: APICategoryGraphQLBehaviorExtended,
+ api: APICategoryGraphQLBehavior,
storageAdapter: StorageEngineAdapter,
graphQLResponseError: GraphQLResponseError>? = nil,
apiError: APIError? = nil,
@@ -296,44 +296,44 @@ class ProcessMutationErrorFromCloudOperation: AsynchronousOperation {
}
log.verbose("\(#function) sending mutation with data: \(apiRequest)")
- let graphQLOperation = api.mutate(request: apiRequest) { [weak self] result in
- guard let self = self, !self.isCancelled else {
- return
- }
+ Task { [weak self] in
+ do {
+ let result = try await api.mutate(request: apiRequest)
+ guard let self = self, !self.isCancelled else {
+ self?.finish(result: .failure(APIError.operationError("Mutation operation cancelled", "")))
+ return
+ }
- self.log.verbose("sendMutationToCloud received asyncEvent: \(result)")
- self.validate(cloudResult: result, request: apiRequest)
+ self.log.verbose("sendMutationToCloud received asyncEvent: \(result)")
+ self.validate(cloudResult: result, request: apiRequest)
+ } catch {
+ self?.finish(result: .failure(APIError.operationError("Failed to do mutation", "", error)))
+ }
}
- mutationOperation.set(graphQLOperation)
}
- private func validate(cloudResult: MutationSyncCloudResult, request: MutationSyncAPIRequest) {
+ private func validate(cloudResult: GraphQLResponse, request: MutationSyncAPIRequest) {
guard !isCancelled else {
return
}
- if case .failure(let error) = cloudResult {
- dataStoreConfiguration.errorHandler(error)
- }
-
- if case let .success(graphQLResponse) = cloudResult {
- if case .failure(let error) = graphQLResponse {
- dataStoreConfiguration.errorHandler(error)
- } else if case let .success(graphQLResult) = graphQLResponse {
- guard let reconciliationQueue = reconciliationQueue else {
- let dataStoreError = DataStoreError.configuration(
- "reconciliationQueue is unexpectedly nil",
- """
- The reference to reconciliationQueue has been released while an ongoing mutation was being processed.
- \(AmplifyErrorMessages.reportBugToAWS())
- """
- )
- finish(result: .failure(dataStoreError))
- return
- }
-
- reconciliationQueue.offer([graphQLResult], modelName: mutationEvent.modelName)
+ switch cloudResult {
+ case .success(let mutationSyncResult):
+ guard let reconciliationQueue = reconciliationQueue else {
+ let dataStoreError = DataStoreError.configuration(
+ "reconciliationQueue is unexpectedly nil",
+ """
+ The reference to reconciliationQueue has been released while an ongoing mutation was being processed.
+ \(AmplifyErrorMessages.reportBugToAWS())
+ """
+ )
+ finish(result: .failure(dataStoreError))
+ return
}
+
+ reconciliationQueue.offer([mutationSyncResult], modelName: mutationEvent.modelName)
+ case .failure(let graphQLResponseError):
+ dataStoreConfiguration.errorHandler(graphQLResponseError)
}
finish(result: .success(nil))
diff --git a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/MutationSync/OutgoingMutationQueue/SyncMutationToCloudOperation.swift b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/MutationSync/OutgoingMutationQueue/SyncMutationToCloudOperation.swift
index 3732bafb4a..db45220167 100644
--- a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/MutationSync/OutgoingMutationQueue/SyncMutationToCloudOperation.swift
+++ b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/MutationSync/OutgoingMutationQueue/SyncMutationToCloudOperation.swift
@@ -17,23 +17,23 @@ class SyncMutationToCloudOperation: AsynchronousOperation {
typealias MutationSyncCloudResult = GraphQLOperation>.OperationResult
- private weak var api: APICategoryGraphQLBehaviorExtended?
+ private weak var api: APICategoryGraphQLBehavior?
private let mutationEvent: MutationEvent
private let getLatestSyncMetadata: () -> MutationSyncMetadata?
private let completion: GraphQLOperation>.ResultListener
private let requestRetryablePolicy: RequestRetryablePolicy
- private let lock: NSRecursiveLock
+// private let lock: NSRecursiveLock
private var networkReachabilityPublisher: AnyPublisher?
- private var mutationOperation: GraphQLOperation>?
+ private var mutationOperation: Task?
private var mutationRetryNotifier: MutationRetryNotifier?
private var currentAttemptNumber: Int
private var authTypesIterator: AWSAuthorizationTypeIterator?
init(mutationEvent: MutationEvent,
getLatestSyncMetadata: @escaping () -> MutationSyncMetadata?,
- api: APICategoryGraphQLBehaviorExtended,
+ api: APICategoryGraphQLBehavior,
authModeStrategy: AuthModeStrategy,
networkReachabilityPublisher: AnyPublisher? = nil,
currentAttemptNumber: Int = 1,
@@ -46,7 +46,7 @@ class SyncMutationToCloudOperation: AsynchronousOperation {
self.completion = completion
self.currentAttemptNumber = currentAttemptNumber
self.requestRetryablePolicy = requestRetryablePolicy ?? RequestRetryablePolicy()
- self.lock = NSRecursiveLock()
+// self.lock = NSRecursiveLock()
if let modelSchema = ModelRegistry.modelSchema(from: mutationEvent.modelName),
let mutationType = GraphQLMutationType(rawValue: mutationEvent.mutationType) {
@@ -66,11 +66,11 @@ class SyncMutationToCloudOperation: AsynchronousOperation {
override func cancel() {
log.verbose(#function)
- lock.execute {
- mutationOperation?.cancel()
- mutationRetryNotifier?.cancel()
- mutationRetryNotifier = nil
- }
+// lock.execute {
+ mutationOperation?.cancel()
+ mutationRetryNotifier?.cancel()
+ mutationRetryNotifier = nil
+// }
let apiError = APIError(error: OperationCancelledError())
finish(result: .failure(apiError))
@@ -209,41 +209,56 @@ class SyncMutationToCloudOperation: AsynchronousOperation {
return
}
log.verbose("\(#function) sending mutation with sync data: \(apiRequest)")
- lock.execute {
- mutationOperation = api.mutate(request: apiRequest) { [weak self] result in
- self?.respond(toCloudResult: result, withAPIRequest: apiRequest)
+
+ mutationOperation = Task { [weak self] in
+ let result: GraphQLResponse>
+ do {
+ result = try await api.mutate(request: apiRequest)
+ } catch {
+ result = .failure(.unknown("Failed to send sync mutation request", "", error))
}
+
+// self?.lock.execute { [weak self] in
+ self?.respond(
+ toCloudResult: result,
+ withAPIRequest: apiRequest
+ )
+// }
}
+
}
/// Initiates a locking context
private func respond(
- toCloudResult result: GraphQLOperation>.OperationResult,
+ toCloudResult result: GraphQLResponse>,
withAPIRequest apiRequest: GraphQLRequest>
) {
- lock.execute {
- guard !self.isCancelled else {
- Amplify.log.debug("SyncMutationToCloudOperation cancelled, aborting")
- return
- }
-
- log.verbose("GraphQL mutation operation received result: \(result)")
- validate(cloudResult: result, request: apiRequest)
+// lock.execute {
+ guard !self.isCancelled else {
+ Amplify.log.debug("SyncMutationToCloudOperation cancelled, aborting")
+ return
}
+
+ log.verbose("GraphQL mutation operation received result: \(result)")
+ validate(cloudResult: result, request: apiRequest)
+// }
}
/// - Warning: Must be invoked from a locking context
- private func validate(cloudResult: MutationSyncCloudResult,
- request: GraphQLRequest>) {
- guard !isCancelled else {
+ private func validate(
+ cloudResult: GraphQLResponse>,
+ request: GraphQLRequest>
+ ) {
+ guard !isCancelled, let mutationOperation, !mutationOperation.isCancelled else {
return
}
- if case .failure(let error) = cloudResult {
- let advice = getRetryAdviceIfRetryable(error: error)
+ if case .failure(let error) = cloudResult,
+ let apiError = error.underlyingError as? APIError {
+ let advice = getRetryAdviceIfRetryable(error: apiError)
guard advice.shouldRetry else {
- finish(result: .failure(error))
+ finish(result: .failure(apiError))
return
}
@@ -257,7 +272,7 @@ class SyncMutationToCloudOperation: AsynchronousOperation {
return
}
- finish(result: cloudResult)
+ finish(result: .success(cloudResult))
}
/// - Warning: Must be invoked from a locking context
@@ -341,20 +356,20 @@ class SyncMutationToCloudOperation: AsynchronousOperation {
/// Initiates a locking context
private func respondToMutationNotifierTriggered(withAuthType authType: AWSAuthorizationType?) {
log.verbose("\(#function) mutationRetryNotifier triggered")
- lock.execute {
+// lock.execute {
sendMutationToCloud(withAuthType: authType)
mutationRetryNotifier = nil
- }
+// }
}
/// Cleans up operation resources, finalizes AsynchronousOperation states, and invokes `completion` with `result`
/// - Parameter result: The MutationSyncCloudResult to pass to `completion`
private func finish(result: MutationSyncCloudResult) {
log.verbose(#function)
- lock.execute {
- mutationOperation?.removeResultListener()
- mutationOperation = nil
- }
+// lock.execute {
+ mutationOperation?.cancel()
+ mutationOperation = nil
+// }
DispatchQueue.global().async {
self.completion(result)
diff --git a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/RemoteSyncEngine+Action.swift b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/RemoteSyncEngine+Action.swift
index 7421637a54..7ace6cd086 100644
--- a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/RemoteSyncEngine+Action.swift
+++ b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/RemoteSyncEngine+Action.swift
@@ -18,10 +18,10 @@ extension RemoteSyncEngine {
case pausedSubscriptions
case pausedMutationQueue(StorageEngineAdapter)
- case clearedStateOutgoingMutations(APICategoryGraphQLBehaviorExtended, StorageEngineAdapter)
+ case clearedStateOutgoingMutations(APICategoryGraphQLBehavior, StorageEngineAdapter)
case initializedSubscriptions
case performedInitialSync
- case activatedCloudSubscriptions(APICategoryGraphQLBehaviorExtended, MutationEventPublisher, IncomingEventReconciliationQueue?)
+ case activatedCloudSubscriptions(APICategoryGraphQLBehavior, MutationEventPublisher, IncomingEventReconciliationQueue?)
case activatedMutationQueue
case notifiedSyncStarted
case cleanedUp(AmplifyError)
diff --git a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/RemoteSyncEngine+State.swift b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/RemoteSyncEngine+State.swift
index d55c3fe5c3..a1ecebfbbb 100644
--- a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/RemoteSyncEngine+State.swift
+++ b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/RemoteSyncEngine+State.swift
@@ -18,10 +18,10 @@ extension RemoteSyncEngine {
case pausingSubscriptions
case pausingMutationQueue
case clearingStateOutgoingMutations(StorageEngineAdapter)
- case initializingSubscriptions(APICategoryGraphQLBehaviorExtended, StorageEngineAdapter)
+ case initializingSubscriptions(APICategoryGraphQLBehavior, StorageEngineAdapter)
case performingInitialSync
case activatingCloudSubscriptions
- case activatingMutationQueue(APICategoryGraphQLBehaviorExtended, MutationEventPublisher, IncomingEventReconciliationQueue?)
+ case activatingMutationQueue(APICategoryGraphQLBehavior, MutationEventPublisher, IncomingEventReconciliationQueue?)
case notifyingSyncStarted
case syncEngineActive
diff --git a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/RemoteSyncEngine.swift b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/RemoteSyncEngine.swift
index 26bb453571..fd30c9ecae 100644
--- a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/RemoteSyncEngine.swift
+++ b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/RemoteSyncEngine.swift
@@ -21,7 +21,7 @@ class RemoteSyncEngine: RemoteSyncEngineBehavior {
private var authModeStrategy: AuthModeStrategy
// Assigned at `start`
- weak var api: APICategoryGraphQLBehaviorExtended?
+ weak var api: APICategoryGraphQLBehavior?
weak var auth: AuthCategoryBehavior?
// Assigned and released inside `performInitialQueries`, but we maintain a reference so we can `reset`
@@ -197,7 +197,7 @@ class RemoteSyncEngine: RemoteSyncEngineBehavior {
}
// swiftlint:enable cyclomatic_complexity
- func start(api: APICategoryGraphQLBehaviorExtended, auth: AuthCategoryBehavior?) {
+ func start(api: APICategoryGraphQLBehavior, auth: AuthCategoryBehavior?) {
guard storageAdapter != nil else {
log.error(error: DataStoreError.nilStorageAdapter())
remoteSyncTopicPublisher.send(completion: .failure(DataStoreError.nilStorageAdapter()))
@@ -280,7 +280,7 @@ class RemoteSyncEngine: RemoteSyncEngineBehavior {
}
}
- private func initializeSubscriptions(api: APICategoryGraphQLBehaviorExtended,
+ private func initializeSubscriptions(api: APICategoryGraphQLBehavior,
storageAdapter: StorageEngineAdapter) async {
log.debug("[InitializeSubscription] \(#function)")
let syncableModelSchemas = ModelRegistry.modelSchemas.filter { $0.isSyncable }
@@ -363,7 +363,7 @@ class RemoteSyncEngine: RemoteSyncEngineBehavior {
reconciliationQueue.start()
}
- private func startMutationQueue(api: APICategoryGraphQLBehaviorExtended,
+ private func startMutationQueue(api: APICategoryGraphQLBehavior,
mutationEventPublisher: MutationEventPublisher,
reconciliationQueue: IncomingEventReconciliationQueue?) {
log.debug(#function)
diff --git a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/RemoteSyncEngineBehavior.swift b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/RemoteSyncEngineBehavior.swift
index ee5710ff21..765d72473f 100644
--- a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/RemoteSyncEngineBehavior.swift
+++ b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/RemoteSyncEngineBehavior.swift
@@ -41,7 +41,7 @@ protocol RemoteSyncEngineBehavior: AnyObject {
/// the updates in the Datastore
/// 1. Mutation processor drains messages off the queue in serial and sends to the service, invoking
/// any local callbacks on error if necessary
- func start(api: APICategoryGraphQLBehaviorExtended, auth: AuthCategoryBehavior?)
+ func start(api: APICategoryGraphQLBehavior, auth: AuthCategoryBehavior?)
func stop(completion: @escaping DataStoreCallback)
diff --git a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/SubscriptionSync/AWSIncomingEventReconciliationQueue.swift b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/SubscriptionSync/AWSIncomingEventReconciliationQueue.swift
index 7705120b4b..4a6d765aca 100644
--- a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/SubscriptionSync/AWSIncomingEventReconciliationQueue.swift
+++ b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/SubscriptionSync/AWSIncomingEventReconciliationQueue.swift
@@ -15,7 +15,7 @@ typealias DisableSubscriptions = () -> Bool
// Used for testing:
typealias IncomingEventReconciliationQueueFactory =
([ModelSchema],
- APICategoryGraphQLBehaviorExtended,
+ APICategoryGraphQLBehavior,
StorageEngineAdapter,
[DataStoreSyncExpression],
AuthCategoryBehavior?,
@@ -46,7 +46,7 @@ final class AWSIncomingEventReconciliationQueue: IncomingEventReconciliationQueu
private let modelSchemasCount: Int
init(modelSchemas: [ModelSchema],
- api: APICategoryGraphQLBehaviorExtended,
+ api: APICategoryGraphQLBehavior,
storageAdapter: StorageEngineAdapter,
syncExpressions: [DataStoreSyncExpression],
auth: AuthCategoryBehavior? = nil,
diff --git a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/SubscriptionSync/AWSIncomingSubscriptionEventPublisher.swift b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/SubscriptionSync/AWSIncomingSubscriptionEventPublisher.swift
index 32365419fe..76c4b7dc9a 100644
--- a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/SubscriptionSync/AWSIncomingSubscriptionEventPublisher.swift
+++ b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/SubscriptionSync/AWSIncomingSubscriptionEventPublisher.swift
@@ -23,7 +23,7 @@ final class AWSIncomingSubscriptionEventPublisher: IncomingSubscriptionEventPubl
}
init(modelSchema: ModelSchema,
- api: APICategoryGraphQLBehaviorExtended,
+ api: APICategoryGraphQLBehavior,
modelPredicate: QueryPredicate?,
auth: AuthCategoryBehavior?,
authModeStrategy: AuthModeStrategy) async {
diff --git a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/SubscriptionSync/IncomingAsyncSubscriptionEventPublisher.swift b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/SubscriptionSync/IncomingAsyncSubscriptionEventPublisher.swift
index d5dae69b37..b125e239a1 100644
--- a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/SubscriptionSync/IncomingAsyncSubscriptionEventPublisher.swift
+++ b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/SubscriptionSync/IncomingAsyncSubscriptionEventPublisher.swift
@@ -6,7 +6,7 @@
//
import Amplify
-import AWSPluginsCore
+@_spi(WebSocket) import AWSPluginsCore
import Combine
import Foundation
@@ -39,7 +39,8 @@ final class IncomingAsyncSubscriptionEventPublisher: AmplifyCancellable {
return onCreateConnected && onUpdateConnected && onDeleteConnected
}
- private let incomingSubscriptionEvents: PassthroughSubject
+ private let incomingSubscriptionEvents = PassthroughSubject()
+ private var cancelables = Set()
private let awsAuthService: AWSAuthServiceBehavior
private let consistencyQueue: DispatchQueue
@@ -47,7 +48,7 @@ final class IncomingAsyncSubscriptionEventPublisher: AmplifyCancellable {
private let modelName: ModelName
init(modelSchema: ModelSchema,
- api: APICategoryGraphQLBehaviorExtended,
+ api: APICategoryGraphQLBehavior,
modelPredicate: QueryPredicate?,
auth: AuthCategoryBehavior?,
authModeStrategy: AuthModeStrategy,
@@ -67,72 +68,74 @@ final class IncomingAsyncSubscriptionEventPublisher: AmplifyCancellable {
connectionStatusQueue.maxConcurrentOperationCount = 1
connectionStatusQueue.isSuspended = false
- let incomingSubscriptionEvents = PassthroughSubject()
- self.incomingSubscriptionEvents = incomingSubscriptionEvents
self.awsAuthService = awsAuthService ?? AWSAuthService()
// onCreate operation
- let onCreateValueListener = onCreateValueListenerHandler(event:)
- let onCreateAuthTypeProvider = await authModeStrategy.authTypesFor(schema: modelSchema,
- operations: [.create, .read])
- self.onCreateValueListener = onCreateValueListener
- self.onCreateOperation = RetryableGraphQLSubscriptionOperation(
- requestFactory: IncomingAsyncSubscriptionEventPublisher.apiRequestFactoryFor(
- for: modelSchema,
- subscriptionType: .onCreate,
- api: api,
- auth: auth,
- awsAuthService: self.awsAuthService,
- authTypeProvider: onCreateAuthTypeProvider),
- maxRetries: onCreateAuthTypeProvider.count,
- resultListener: genericCompletionListenerHandler) { nextRequest, wrappedCompletion in
- api.subscribe(request: nextRequest,
- valueListener: onCreateValueListener,
- completionListener: wrappedCompletion)
- }
- onCreateOperation?.main()
+ self.onCreateValueListener = onCreateValueListenerHandler(event:)
+ self.onCreateOperation = await retryableOperation(
+ subscriptionType: .create,
+ modelSchema: modelSchema,
+ authModeStrategy: authModeStrategy,
+ auth: auth,
+ api: api
+ )
+ onCreateOperation?.subscribe()
+ .sink(receiveCompletion: genericCompletionListenerHandler(result:), receiveValue: onCreateValueListener!)
+ .store(in: &cancelables)
// onUpdate operation
- let onUpdateValueListener = onUpdateValueListenerHandler(event:)
- let onUpdateAuthTypeProvider = await authModeStrategy.authTypesFor(schema: modelSchema,
- operations: [.update, .read])
- self.onUpdateValueListener = onUpdateValueListener
- self.onUpdateOperation = RetryableGraphQLSubscriptionOperation(
- requestFactory: IncomingAsyncSubscriptionEventPublisher.apiRequestFactoryFor(
- for: modelSchema,
- subscriptionType: .onUpdate,
- api: api,
- auth: auth,
- awsAuthService: self.awsAuthService,
- authTypeProvider: onUpdateAuthTypeProvider),
- maxRetries: onUpdateAuthTypeProvider.count,
- resultListener: genericCompletionListenerHandler) { nextRequest, wrappedCompletion in
- api.subscribe(request: nextRequest,
- valueListener: onUpdateValueListener,
- completionListener: wrappedCompletion)
- }
- onUpdateOperation?.main()
+ self.onUpdateValueListener = onUpdateValueListenerHandler(event:)
+ self.onUpdateOperation = await retryableOperation(
+ subscriptionType: .update,
+ modelSchema: modelSchema,
+ authModeStrategy: authModeStrategy,
+ auth: auth,
+ api: api
+ )
+ onUpdateOperation?.subscribe()
+ .sink(receiveCompletion: genericCompletionListenerHandler(result:), receiveValue: onUpdateValueListener!)
+ .store(in: &cancelables)
// onDelete operation
- let onDeleteValueListener = onDeleteValueListenerHandler(event:)
- let onDeleteAuthTypeProvider = await authModeStrategy.authTypesFor(schema: modelSchema,
- operations: [.delete, .read])
- self.onDeleteValueListener = onDeleteValueListener
- self.onDeleteOperation = RetryableGraphQLSubscriptionOperation(
- requestFactory: IncomingAsyncSubscriptionEventPublisher.apiRequestFactoryFor(
- for: modelSchema,
- subscriptionType: .onDelete,
- api: api,
- auth: auth,
- awsAuthService: self.awsAuthService,
- authTypeProvider: onDeleteAuthTypeProvider),
- maxRetries: onUpdateAuthTypeProvider.count,
- resultListener: genericCompletionListenerHandler) { nextRequest, wrappedCompletion in
- api.subscribe(request: nextRequest,
- valueListener: onDeleteValueListener,
- completionListener: wrappedCompletion)
- }
- onDeleteOperation?.main()
+ self.onDeleteValueListener = onDeleteValueListenerHandler(event:)
+ self.onDeleteOperation = await retryableOperation(
+ subscriptionType: .delete,
+ modelSchema: modelSchema,
+ authModeStrategy: authModeStrategy,
+ auth: auth,
+ api: api
+ )
+ onDeleteOperation?.subscribe()
+ .sink(receiveCompletion: genericCompletionListenerHandler(result:), receiveValue: onDeleteValueListener!)
+ .store(in: &cancelables)
+ }
+
+
+ func retryableOperation(
+ subscriptionType: IncomingAsyncSubscriptionType,
+ modelSchema: ModelSchema,
+ authModeStrategy: AuthModeStrategy,
+ auth: AuthCategoryBehavior?,
+ api: APICategoryGraphQLBehavior
+ ) async -> RetryableGraphQLSubscriptionOperation {
+ let authTypeProvider = await authModeStrategy.authTypesFor(
+ schema: modelSchema,
+ operations: subscriptionType.operations
+ )
+
+ return RetryableGraphQLSubscriptionOperation(
+ requestFactory: authTypeProvider.optionalAsyncStream.map { authType in {
+ await IncomingAsyncSubscriptionEventPublisher.makeAPIRequest(
+ for: modelSchema,
+ subscriptionType: subscriptionType.subscriptionType,
+ api: api,
+ auth: auth,
+ authType: authType,
+ awsAuthService: self.awsAuthService
+ )
+ }},
+ api: api
+ )
}
func onCreateValueListenerHandler(event: Event) {
@@ -183,9 +186,9 @@ final class IncomingAsyncSubscriptionEventPublisher: AmplifyCancellable {
}
}
- func genericCompletionListenerHandler(result: Result) {
+ func genericCompletionListenerHandler(result: Subscribers.Completion) {
switch result {
- case .success:
+ case .finished:
send(completion: .finished)
case .failure(let apiError):
log.verbose("[InitializeSubscription.1] API.subscribe failed for `\(modelName)` error: \(apiError.errorDescription)")
@@ -196,7 +199,7 @@ final class IncomingAsyncSubscriptionEventPublisher: AmplifyCancellable {
static func makeAPIRequest(for modelSchema: ModelSchema,
subscriptionType: GraphQLSubscriptionType,
- api: APICategoryGraphQLBehaviorExtended,
+ api: APICategoryGraphQLBehavior,
auth: AuthCategoryBehavior?,
authType: AWSAuthorizationType?,
awsAuthService: AWSAuthServiceBehavior) async -> GraphQLRequest {
@@ -226,7 +229,7 @@ final class IncomingAsyncSubscriptionEventPublisher: AmplifyCancellable {
return request
}
- static func hasOIDCAuthProviderAvailable(api: APICategoryGraphQLBehaviorExtended) -> AmplifyOIDCAuthProvider? {
+ static func hasOIDCAuthProviderAvailable(api: APICategoryGraphQLBehavior) -> AmplifyOIDCAuthProvider? {
if let apiPlugin = api as? APICategoryAuthProviderFactoryBehavior,
let oidcAuthProvider = apiPlugin.apiAuthProviderFactory().oidcAuthProvider() {
return oidcAuthProvider
@@ -254,7 +257,7 @@ final class IncomingAsyncSubscriptionEventPublisher: AmplifyCancellable {
func cancel() {
consistencyQueue.sync {
- genericCompletionListenerHandler(result: .successfulVoid)
+ genericCompletionListenerHandler(result: .finished)
onCreateOperation?.cancel()
onCreateOperation = nil
@@ -287,30 +290,33 @@ final class IncomingAsyncSubscriptionEventPublisher: AmplifyCancellable {
onDeleteOperation = nil
onDeleteValueListener?(.connection(.disconnected))
- genericCompletionListenerHandler(result: .successfulVoid)
+ genericCompletionListenerHandler(result: .finished)
}
}
}
-// MARK: - IncomingAsyncSubscriptionEventPublisher + API request factory
-extension IncomingAsyncSubscriptionEventPublisher {
- static func apiRequestFactoryFor(for modelSchema: ModelSchema,
- subscriptionType: GraphQLSubscriptionType,
- api: APICategoryGraphQLBehaviorExtended,
- auth: AuthCategoryBehavior?,
- awsAuthService: AWSAuthServiceBehavior,
- authTypeProvider: AWSAuthorizationTypeIterator) -> RetryableGraphQLOperation.RequestFactory {
- var authTypes = authTypeProvider
- return {
- return await IncomingAsyncSubscriptionEventPublisher.makeAPIRequest(for: modelSchema,
- subscriptionType: subscriptionType,
- api: api,
- auth: auth,
- authType: authTypes.next(),
- awsAuthService: awsAuthService)
+enum IncomingAsyncSubscriptionType {
+ case create
+ case delete
+ case update
+
+ var operations: [ModelOperation] {
+ switch self {
+ case .create: return [.create, .read]
+ case .delete: return [.delete, .read]
+ case .update: return [.update, .read]
}
}
+
+ var subscriptionType: GraphQLSubscriptionType {
+ switch self {
+ case .create: return .onCreate
+ case .delete: return .onDelete
+ case .update: return .onUpdate
+ }
+ }
+
}
extension IncomingAsyncSubscriptionEventPublisher: DefaultLogger {
diff --git a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/SubscriptionSync/ReconcileAndLocalSave/AWSModelReconciliationQueue.swift b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/SubscriptionSync/ReconcileAndLocalSave/AWSModelReconciliationQueue.swift
index 7eacedb029..03074d82e3 100644
--- a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/SubscriptionSync/ReconcileAndLocalSave/AWSModelReconciliationQueue.swift
+++ b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/SubscriptionSync/ReconcileAndLocalSave/AWSModelReconciliationQueue.swift
@@ -14,7 +14,7 @@ import Foundation
typealias ModelReconciliationQueueFactory = (
ModelSchema,
StorageEngineAdapter,
- APICategoryGraphQLBehaviorExtended,
+ APICategoryGraphQLBehavior,
ReconcileAndSaveOperationQueue,
QueryPredicate?,
AuthCategoryBehavior?,
@@ -78,7 +78,7 @@ final class AWSModelReconciliationQueue: ModelReconciliationQueue {
init(modelSchema: ModelSchema,
storageAdapter: StorageEngineAdapter?,
- api: APICategoryGraphQLBehaviorExtended,
+ api: APICategoryGraphQLBehavior,
reconcileAndSaveQueue: ReconcileAndSaveOperationQueue,
modelPredicate: QueryPredicate?,
auth: AuthCategoryBehavior?,
diff --git a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/Support/AsyncStream+Extensions.swift b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/Support/AsyncStream+Extensions.swift
new file mode 100644
index 0000000000..0dee36be66
--- /dev/null
+++ b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/Support/AsyncStream+Extensions.swift
@@ -0,0 +1,21 @@
+//
+// Copyright Amazon.com Inc. or its affiliates.
+// All Rights Reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+
+
+import Foundation
+import Combine
+
+extension AsyncStream {
+ static func from(seq: any Sequence) -> AsyncStream {
+ AsyncStream { continuation in
+ for ele in seq {
+ continuation.yield(ele)
+ }
+ continuation.finish()
+ }
+ }
+}
diff --git a/AmplifyPlugins/DataStore/Tests/AWSDataStorePluginTests/TestSupport/Mocks/MockOutgoingMutationQueue.swift b/AmplifyPlugins/DataStore/Tests/AWSDataStorePluginTests/TestSupport/Mocks/MockOutgoingMutationQueue.swift
index 3eec2bce57..e0223bac2b 100644
--- a/AmplifyPlugins/DataStore/Tests/AWSDataStorePluginTests/TestSupport/Mocks/MockOutgoingMutationQueue.swift
+++ b/AmplifyPlugins/DataStore/Tests/AWSDataStorePluginTests/TestSupport/Mocks/MockOutgoingMutationQueue.swift
@@ -17,7 +17,7 @@ class MockOutgoingMutationQueue: OutgoingMutationQueueBehavior {
completion()
}
- func startSyncingToCloud(api: APICategoryGraphQLBehaviorExtended,
+ func startSyncingToCloud(api: APICategoryGraphQLBehavior,
mutationEventPublisher: MutationEventPublisher,
reconciliationQueue: IncomingEventReconciliationQueue?) {
// no-op
diff --git a/AmplifyPlugins/DataStore/Tests/AWSDataStorePluginTests/TestSupport/Mocks/MockRemoteSyncEngine.swift b/AmplifyPlugins/DataStore/Tests/AWSDataStorePluginTests/TestSupport/Mocks/MockRemoteSyncEngine.swift
index 105ab41ebf..1f2036784e 100644
--- a/AmplifyPlugins/DataStore/Tests/AWSDataStorePluginTests/TestSupport/Mocks/MockRemoteSyncEngine.swift
+++ b/AmplifyPlugins/DataStore/Tests/AWSDataStorePluginTests/TestSupport/Mocks/MockRemoteSyncEngine.swift
@@ -37,7 +37,7 @@ class MockRemoteSyncEngine: RemoteSyncEngineBehavior {
init() {
self.remoteSyncTopicPublisher = PassthroughSubject()
}
- func start(api: APICategoryGraphQLBehaviorExtended, auth: AuthCategoryBehavior?) {
+ func start(api: APICategoryGraphQLBehavior, auth: AuthCategoryBehavior?) {
syncing = true
}
diff --git a/AmplifyPlugins/DataStore/Tests/AWSDataStorePluginTests/TestSupport/Mocks/NoOpMutationQueue.swift b/AmplifyPlugins/DataStore/Tests/AWSDataStorePluginTests/TestSupport/Mocks/NoOpMutationQueue.swift
index 56a65bc96b..82c9b031af 100644
--- a/AmplifyPlugins/DataStore/Tests/AWSDataStorePluginTests/TestSupport/Mocks/NoOpMutationQueue.swift
+++ b/AmplifyPlugins/DataStore/Tests/AWSDataStorePluginTests/TestSupport/Mocks/NoOpMutationQueue.swift
@@ -17,7 +17,7 @@ class NoOpMutationQueue: OutgoingMutationQueueBehavior {
completion()
}
- func startSyncingToCloud(api: APICategoryGraphQLBehaviorExtended,
+ func startSyncingToCloud(api: APICategoryGraphQLBehavior,
mutationEventPublisher: MutationEventPublisher,
reconciliationQueue: IncomingEventReconciliationQueue?) {
// do nothing
diff --git a/AmplifyPlugins/DataStore/Tests/DataStoreHostApp/AWSDataStorePluginIntegrationTests/Connection/DataStoreConnectionScenario1Tests.swift b/AmplifyPlugins/DataStore/Tests/DataStoreHostApp/AWSDataStorePluginIntegrationTests/Connection/DataStoreConnectionScenario1Tests.swift
index ae26e441e7..49c5fb097e 100644
--- a/AmplifyPlugins/DataStore/Tests/DataStoreHostApp/AWSDataStorePluginIntegrationTests/Connection/DataStoreConnectionScenario1Tests.swift
+++ b/AmplifyPlugins/DataStore/Tests/DataStoreHostApp/AWSDataStorePluginIntegrationTests/Connection/DataStoreConnectionScenario1Tests.swift
@@ -228,7 +228,7 @@ class DataStoreConnectionScenario1Tests: SyncEngineIntegrationTestBase {
}
func testDeleteWithInvalidCondition() async throws {
- await setUp(withModels: TestModelRegistration())
+ await setUp(withModels: TestModelRegistration(), logLevel: .verbose)
try await startAmplifyAndWaitForSync()
let team = Team1(name: "name")
let project = Project1(team: team)
diff --git a/AmplifyPlugins/DataStore/Tests/DataStoreHostApp/AWSDataStorePluginIntegrationTests/DataStoreHubEventsTests.swift b/AmplifyPlugins/DataStore/Tests/DataStoreHostApp/AWSDataStorePluginIntegrationTests/DataStoreHubEventsTests.swift
index 916a4d26ea..a5fe3115f4 100644
--- a/AmplifyPlugins/DataStore/Tests/DataStoreHostApp/AWSDataStorePluginIntegrationTests/DataStoreHubEventsTests.swift
+++ b/AmplifyPlugins/DataStore/Tests/DataStoreHostApp/AWSDataStorePluginIntegrationTests/DataStoreHubEventsTests.swift
@@ -37,7 +37,6 @@ class DataStoreHubEventTests: HubEventsIntegrationTestBase {
/// {modelName: "Some Model name", isFullSync: true/false, isDeltaSync: false/true, createCount: #, updateCount: #, deleteCount: #}
/// - syncQueriesReady received, payload should be nil
func testDataStoreConfiguredDispatchesHubEvents() async throws {
- Amplify.Logging.logLevel = .verbose
try configureAmplify(withModels: TestModelRegistration())
try await Amplify.DataStore.clear()
await Amplify.reset()
diff --git a/AmplifyPlugins/DataStore/Tests/DataStoreHostApp/AWSDataStorePluginIntegrationTests/DataStoreLargeNumberModelsSubscriptionTests.swift b/AmplifyPlugins/DataStore/Tests/DataStoreHostApp/AWSDataStorePluginIntegrationTests/DataStoreLargeNumberModelsSubscriptionTests.swift
index cc09f0f40d..4121f562bd 100644
--- a/AmplifyPlugins/DataStore/Tests/DataStoreHostApp/AWSDataStorePluginIntegrationTests/DataStoreLargeNumberModelsSubscriptionTests.swift
+++ b/AmplifyPlugins/DataStore/Tests/DataStoreHostApp/AWSDataStorePluginIntegrationTests/DataStoreLargeNumberModelsSubscriptionTests.swift
@@ -46,7 +46,7 @@ class DataStoreLargeNumberModelsSubscriptionTests: SyncEngineIntegrationTestBase
}
func testDataStoreStop_subscriptionsShouldAllUnsubscribed() async throws {
- await setUp(withModels: TestModelRegistration())
+ await setUp(withModels: TestModelRegistration(), logLevel: .verbose)
try await startAmplifyAndWaitForSync()
try await stopDataStoreAndVerifyAppSyncClientDisconnected()
diff --git a/AmplifyPlugins/DataStore/Tests/DataStoreHostApp/AWSDataStorePluginIntegrationTests/TestSupport/HubEventsIntegrationTestBase.swift b/AmplifyPlugins/DataStore/Tests/DataStoreHostApp/AWSDataStorePluginIntegrationTests/TestSupport/HubEventsIntegrationTestBase.swift
index 9bb4167312..e480689b9e 100644
--- a/AmplifyPlugins/DataStore/Tests/DataStoreHostApp/AWSDataStorePluginIntegrationTests/TestSupport/HubEventsIntegrationTestBase.swift
+++ b/AmplifyPlugins/DataStore/Tests/DataStoreHostApp/AWSDataStorePluginIntegrationTests/TestSupport/HubEventsIntegrationTestBase.swift
@@ -52,7 +52,9 @@ class HubEventsIntegrationTestBase: XCTestCase {
#if os(watchOS)
try Amplify.add(plugin: AWSDataStorePlugin(modelRegistration: models, configuration: .subscriptionsDisabled))
#else
- try Amplify.add(plugin: AWSDataStorePlugin(modelRegistration: models))
+ try Amplify.add(plugin: AWSDataStorePlugin(
+ modelRegistration: models
+ ))
#endif
try Amplify.add(plugin: AWSAPIPlugin(
modelRegistration: models,
diff --git a/AmplifyTestCommon/Mocks/MockAPICategoryPlugin.swift b/AmplifyTestCommon/Mocks/MockAPICategoryPlugin.swift
index 2b65491429..31ac246db1 100644
--- a/AmplifyTestCommon/Mocks/MockAPICategoryPlugin.swift
+++ b/AmplifyTestCommon/Mocks/MockAPICategoryPlugin.swift
@@ -13,7 +13,7 @@ import Foundation
class MockAPICategoryPlugin: MessageReporter,
APICategoryPlugin,
APICategoryReachabilityBehavior,
- APICategoryGraphQLBehaviorExtended {
+ APICategoryGraphQLBehavior {
var authProviderFactory: APIAuthProviderFactory?
From 276e13f0d492414260ea3d7076e97fa225b88691 Mon Sep 17 00:00:00 2001
From: Di Wu
Date: Tue, 9 Apr 2024 23:25:32 -0700
Subject: [PATCH 04/23] change to use Publisher operators for auth type streams
---
.../Operation/RetryableGraphQLOperation.swift | 36 ++++++++++++++++++-
.../Auth/AWSAuthModeStrategy.swift | 28 ++++-----------
.../InitialSync/InitialSyncOperation.swift | 21 ++++++-----
...omingAsyncSubscriptionEventPublisher.swift | 24 +++++++------
.../Sync/Support/AsyncStream+Extensions.swift | 21 -----------
5 files changed, 68 insertions(+), 62 deletions(-)
delete mode 100644 AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/Support/AsyncStream+Extensions.swift
diff --git a/Amplify/Categories/API/Operation/RetryableGraphQLOperation.swift b/Amplify/Categories/API/Operation/RetryableGraphQLOperation.swift
index 86281585b0..0507431db1 100644
--- a/Amplify/Categories/API/Operation/RetryableGraphQLOperation.swift
+++ b/Amplify/Categories/API/Operation/RetryableGraphQLOperation.swift
@@ -16,6 +16,7 @@ public final class RetryableGraphQLOperation {
public let requestFactory: AsyncStream<() -> GraphQLRequest>
public weak var api: APICategoryGraphQLBehavior?
private var task: Task?
+ private var cancellables = Set()
public init(
requestFactory: T,
@@ -25,6 +26,21 @@ public final class RetryableGraphQLOperation {
self.api = api
}
+ public convenience init(
+ requestStream: AnyPublisher<() -> GraphQLRequest, Never>,
+ api: APICategoryGraphQLBehavior
+ ) {
+ var cancellables = Set()
+ self.init(requestFactory: AsyncStream { continuation in
+ requestStream.sink { completion in
+ continuation.finish()
+ } receiveValue: { value in
+ continuation.yield(value)
+ }.store(in: &cancellables)
+ }, api: api)
+ self.cancellables = cancellables
+ }
+
deinit {
cancel()
}
@@ -60,7 +76,7 @@ public final class RetryableGraphQLOperation {
}
switch authError {
- case .signedOut, .notAuthorized: break;
+ case .signedOut, .notAuthorized: break
default: return .failure(error)
}
}
@@ -70,6 +86,7 @@ public final class RetryableGraphQLOperation {
public func cancel() {
task?.cancel()
+ cancellables = Set()
}
}
@@ -81,6 +98,7 @@ public final class RetryableGraphQLSubscriptionOperation {
public let requestFactory: AsyncStream<() async -> GraphQLRequest>
public weak var api: APICategoryGraphQLBehavior?
private var task: Task?
+ private var cancellables = Set()
public init(
requestFactory: T,
@@ -90,6 +108,21 @@ public final class RetryableGraphQLSubscriptionOperation {
self.api = api
}
+ public convenience init(
+ requestStream: AnyPublisher<() async -> GraphQLRequest, Never>,
+ api: APICategoryGraphQLBehavior
+ ) {
+ var cancellables = Set()
+ self.init(requestFactory: AsyncStream { continuation in
+ requestStream.sink { completion in
+ continuation.finish()
+ } receiveValue: { value in
+ continuation.yield(value)
+ }.store(in: &cancellables)
+ }, api: api)
+ self.cancellables = cancellables
+ }
+
deinit {
cancel()
}
@@ -133,6 +166,7 @@ public final class RetryableGraphQLSubscriptionOperation {
public func cancel() {
self.task?.cancel()
+ self.cancellables = Set()
}
}
diff --git a/AmplifyPlugins/Core/AWSPluginsCore/Auth/AWSAuthModeStrategy.swift b/AmplifyPlugins/Core/AWSPluginsCore/Auth/AWSAuthModeStrategy.swift
index eeeea09ff1..91d38c2855 100644
--- a/AmplifyPlugins/Core/AWSPluginsCore/Auth/AWSAuthModeStrategy.swift
+++ b/AmplifyPlugins/Core/AWSPluginsCore/Auth/AWSAuthModeStrategy.swift
@@ -97,31 +97,15 @@ public struct AWSAuthorizationTypeIterator: AuthorizationTypeIterator {
}
extension AuthorizationTypeIterator {
- public var asyncStream: AsyncStream {
+ public func publisher() -> AnyPublisher {
var it = self
- return AsyncStream { continuation in
+ return Deferred {
+ var authTypes = [AuthorizationType]()
while let authType = it.next() {
- continuation.yield(authType)
+ authTypes.append(authType)
}
- continuation.finish()
- }
- }
-
- public var optionalAsyncStream: AsyncStream {
- var it = self
- if it.hasNext {
- return AsyncStream { continuation in
- while let authType = it.next() {
- continuation.yield(authType)
- }
- continuation.finish()
- }
- } else {
- return AsyncStream { continuation in
- continuation.yield(nil)
- continuation.finish()
- }
- }
+ return Publishers.MergeMany(authTypes.map { Just($0) })
+ }.eraseToAnyPublisher()
}
}
diff --git a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/InitialSync/InitialSyncOperation.swift b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/InitialSync/InitialSyncOperation.swift
index 80b29e2245..6cf1e6b77a 100644
--- a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/InitialSync/InitialSyncOperation.swift
+++ b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/InitialSync/InitialSyncOperation.swift
@@ -170,19 +170,24 @@ final class InitialSyncOperation: AsynchronousOperation {
let minSyncPageSize = Int(min(syncMaxRecords - recordsReceived, syncPageSize))
let limit = minSyncPageSize < 0 ? Int(syncPageSize) : minSyncPageSize
let authTypes = await authModeStrategy.authTypesFor(schema: modelSchema, operation: .read)
- .optionalAsyncStream.map { authType in {
- GraphQLRequest.syncQuery(modelSchema: self.modelSchema,
- where: self.syncPredicate,
- limit: limit,
- nextToken: nextToken,
- lastSync: lastSyncTime,
- authType: authType
+ .publisher()
+ .map { Optional.some($0) } // map to optional to have nil as element
+ .replaceEmpty(with: nil) // use a nil element to trigger default auth if no auth provided
+ .map { authType in {
+ GraphQLRequest.syncQuery(
+ modelSchema: self.modelSchema,
+ where: self.syncPredicate,
+ limit: limit,
+ nextToken: nextToken,
+ lastSync: lastSyncTime,
+ authType: authType
)
}}
+ .eraseToAnyPublisher()
let result: Result, APIError> = await RetryableGraphQLOperation(
- requestFactory: authTypes,
+ requestStream: authTypes,
api: api
).run(.query)
diff --git a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/SubscriptionSync/IncomingAsyncSubscriptionEventPublisher.swift b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/SubscriptionSync/IncomingAsyncSubscriptionEventPublisher.swift
index b125e239a1..81171e8f90 100644
--- a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/SubscriptionSync/IncomingAsyncSubscriptionEventPublisher.swift
+++ b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/SubscriptionSync/IncomingAsyncSubscriptionEventPublisher.swift
@@ -124,16 +124,20 @@ final class IncomingAsyncSubscriptionEventPublisher: AmplifyCancellable {
)
return RetryableGraphQLSubscriptionOperation(
- requestFactory: authTypeProvider.optionalAsyncStream.map { authType in {
- await IncomingAsyncSubscriptionEventPublisher.makeAPIRequest(
- for: modelSchema,
- subscriptionType: subscriptionType.subscriptionType,
- api: api,
- auth: auth,
- authType: authType,
- awsAuthService: self.awsAuthService
- )
- }},
+ requestStream: authTypeProvider.publisher()
+ .map { Optional.some($0) } // map to optional to have nil as element
+ .replaceEmpty(with: nil) // use a nil element to trigger default auth if no auth provided
+ .map { authType in {
+ await IncomingAsyncSubscriptionEventPublisher.makeAPIRequest(
+ for: modelSchema,
+ subscriptionType: subscriptionType.subscriptionType,
+ api: api,
+ auth: auth,
+ authType: authType,
+ awsAuthService: self.awsAuthService
+ )
+ }}
+ .eraseToAnyPublisher(),
api: api
)
}
diff --git a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/Support/AsyncStream+Extensions.swift b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/Support/AsyncStream+Extensions.swift
deleted file mode 100644
index 0dee36be66..0000000000
--- a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/Support/AsyncStream+Extensions.swift
+++ /dev/null
@@ -1,21 +0,0 @@
-//
-// Copyright Amazon.com Inc. or its affiliates.
-// All Rights Reserved.
-//
-// SPDX-License-Identifier: Apache-2.0
-//
-
-
-import Foundation
-import Combine
-
-extension AsyncStream {
- static func from(seq: any Sequence) -> AsyncStream {
- AsyncStream { continuation in
- for ele in seq {
- continuation.yield(ele)
- }
- continuation.finish()
- }
- }
-}
From c03a50b5367a7b48fbde79d4a92a98672b0ab7b3 Mon Sep 17 00:00:00 2001
From: Di Wu
Date: Wed, 10 Apr 2024 19:51:41 -0700
Subject: [PATCH 05/23] add nondeterminsitc operation for better testability
---
.../Operation/NondeterminsticOperation.swift | 99 ++++++++++
.../Operation/RetryableGraphQLOperation.swift | 164 ++++++----------
.../InitialSync/InitialSyncOperation.swift | 18 +-
...omingAsyncSubscriptionEventPublisher.swift | 13 +-
.../API/NondeterminsticOperationTests.swift | 126 ++++++++++++
.../API/RetryableGraphQLOperationTests.swift | 184 ++++++------------
6 files changed, 363 insertions(+), 241 deletions(-)
create mode 100644 Amplify/Categories/API/Operation/NondeterminsticOperation.swift
create mode 100644 AmplifyTests/CategoryTests/API/NondeterminsticOperationTests.swift
diff --git a/Amplify/Categories/API/Operation/NondeterminsticOperation.swift b/Amplify/Categories/API/Operation/NondeterminsticOperation.swift
new file mode 100644
index 0000000000..7c930b1924
--- /dev/null
+++ b/Amplify/Categories/API/Operation/NondeterminsticOperation.swift
@@ -0,0 +1,99 @@
+//
+// Copyright Amazon.com Inc. or its affiliates.
+// All Rights Reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+
+
+import Combine
+/**
+ A non-deterministic operation offers multiple paths to accomplish its task.
+ It attempts the next path if all preceding paths have failed with an error that allows for continuation.
+ */
+enum NondeterminsticOperationError: Error {
+ case totalFailure
+ case cancelled
+}
+
+final class NondeterminsticOperation {
+ /// operation that to be eval
+ typealias Operation = () async throws -> T
+ typealias OnError = (Error) -> Bool
+
+ private let operations: AsyncStream
+ private var shouldTryNextOnError: OnError = { _ in true }
+ private var cancellables = Set()
+ private var task: Task?
+
+ deinit {
+ cancel()
+ }
+
+ init(operations: AsyncStream, shouldTryNextOnError: OnError? = nil) {
+ self.operations = operations
+ if let shouldTryNextOnError {
+ self.shouldTryNextOnError = shouldTryNextOnError
+ }
+ }
+
+ convenience init(
+ operationStream: AnyPublisher,
+ shouldTryNextOnError: OnError? = nil
+ ) {
+ var cancellables = Set()
+ self.init(
+ operations: AsyncStream { continuation in
+ operationStream.sink { _ in
+ continuation.finish()
+ } receiveValue: { operation in
+ continuation.yield(operation)
+ }.store(in: &cancellables)
+ },
+ shouldTryNextOnError: shouldTryNextOnError
+ )
+ self.cancellables = cancellables
+ }
+
+ /// Synchronous version of executing the operations
+ func execute() -> Future {
+ Future { [weak self] promise in
+ self?.task = Task { [weak self] in
+ do {
+ if let self {
+ promise(.success(try await self.run()))
+ } else {
+ promise(.failure(NondeterminsticOperationError.cancelled))
+ }
+ } catch {
+ promise(.failure(error))
+ }
+ }
+ }
+ }
+
+ /// Asynchronous version of executing the operations
+ func run() async throws -> T {
+ for await operation in operations {
+ if Task.isCancelled {
+ throw NondeterminsticOperationError.cancelled
+ }
+ do {
+ return try await operation()
+ } catch {
+ if shouldTryNextOnError(error) {
+ continue
+ } else {
+ throw error
+ }
+ }
+ }
+ throw NondeterminsticOperationError.totalFailure
+ }
+
+ /// Cancel the operation
+ func cancel() {
+ task?.cancel()
+ cancellables = Set()
+ }
+}
diff --git a/Amplify/Categories/API/Operation/RetryableGraphQLOperation.swift b/Amplify/Categories/API/Operation/RetryableGraphQLOperation.swift
index 0507431db1..cc4791f5f3 100644
--- a/Amplify/Categories/API/Operation/RetryableGraphQLOperation.swift
+++ b/Amplify/Categories/API/Operation/RetryableGraphQLOperation.swift
@@ -13,80 +13,56 @@ import Combine
public final class RetryableGraphQLOperation {
public typealias Payload = Payload
- public let requestFactory: AsyncStream<() -> GraphQLRequest>
- public weak var api: APICategoryGraphQLBehavior?
- private var task: Task?
- private var cancellables = Set()
-
- public init(
- requestFactory: T,
- api: APICategoryGraphQLBehavior
- ) where T.Element == () -> GraphQLRequest {
- self.requestFactory = requestFactory.asyncStream
- self.api = api
- }
+ private let nondeterminsticOperation: NondeterminsticOperation.Success>
- public convenience init(
- requestStream: AnyPublisher<() -> GraphQLRequest, Never>,
- api: APICategoryGraphQLBehavior
+ public init(
+ requestStream: AnyPublisher<() async throws -> GraphQLTask.Success, Never>
) {
- var cancellables = Set()
- self.init(requestFactory: AsyncStream { continuation in
- requestStream.sink { completion in
- continuation.finish()
- } receiveValue: { value in
- continuation.yield(value)
- }.store(in: &cancellables)
- }, api: api)
- self.cancellables = cancellables
+ self.nondeterminsticOperation = NondeterminsticOperation(
+ operationStream: requestStream,
+ shouldTryNextOnError: Self.onError(_:)
+ )
}
deinit {
cancel()
}
- public func execute(
- _ operationType: GraphQLOperationType
- ) -> Future.Success, APIError> {
- Future() { promise in
- self.task = Task { promise(await self.run(operationType)) }
+ static func onError(_ error: Error) -> Bool {
+ guard let error = error as? APIError,
+ let authError = error.underlyingError as? AuthError
+ else {
+ return false
}
- }
- public func run(_ operationType: GraphQLOperationType) async -> Result.Success, APIError> {
- for await request in requestFactory {
- do {
- try Task.checkCancellation()
- switch (self.api, operationType) {
- case (.some(let api), .query):
- return .success(try await api.query(request: request()))
- case (.some(let api), .mutation):
- return .success(try await api.mutate(request: request()))
- default:
- return .failure(.operationError("Unable to run GraphQL operation with type \(operationType)", ""))
- }
-
- } catch is CancellationError {
- return .failure(.operationError("GraphQL operation cancelled", ""))
- } catch {
- guard let error = error as? APIError,
- let authError = error.underlyingError as? AuthError
- else {
- return .failure(.operationError("Failed to send \(operationType) GraphQL request", "", error))
- }
+ switch authError {
+ case .signedOut, .notAuthorized: return true
+ default: return false
+ }
+ }
- switch authError {
- case .signedOut, .notAuthorized: break
- default: return .failure(error)
- }
+ public func execute(
+ _ operationType: GraphQLOperationType
+ ) -> AnyPublisher.Success, APIError> {
+ nondeterminsticOperation.execute().mapError {
+ if let apiError = $0 as? APIError {
+ return apiError
+ } else {
+ return APIError.operationError("Failed to execute GraphQL operation", "", $0)
}
+ }.eraseToAnyPublisher()
+ }
+
+ public func run() async -> Result.Success, APIError> {
+ do {
+ return .success(try await nondeterminsticOperation.run())
+ } catch {
+ return .failure(.operationError("Failed to execute GraphQL operation", "", error))
}
- return .failure(APIError.operationError("Failed to execute GraphQL operation \(operationType)", "", nil))
}
public func cancel() {
- task?.cancel()
- cancellables = Set()
+ nondeterminsticOperation.cancel()
}
}
@@ -94,69 +70,45 @@ public final class RetryableGraphQLOperation {
public final class RetryableGraphQLSubscriptionOperation {
public typealias Payload = Payload
+ public typealias SubscriptionEvents = GraphQLSubscriptionEvent
+ private var task: Task?
+ private let nondeterminsticOperation: NondeterminsticOperation>
- public let requestFactory: AsyncStream<() async -> GraphQLRequest>
- public weak var api: APICategoryGraphQLBehavior?
- private var task: Task?
- private var cancellables = Set()
-
- public init(
- requestFactory: T,
- api: APICategoryGraphQLBehavior
- ) where T.Element == () async -> GraphQLRequest {
- self.requestFactory = requestFactory.asyncStream
- self.api = api
- }
-
- public convenience init(
- requestStream: AnyPublisher<() async -> GraphQLRequest, Never>,
- api: APICategoryGraphQLBehavior
+ public init(
+ requestStream: AnyPublisher<() async throws -> AmplifyAsyncThrowingSequence, Never>
) {
- var cancellables = Set()
- self.init(requestFactory: AsyncStream { continuation in
- requestStream.sink { completion in
- continuation.finish()
- } receiveValue: { value in
- continuation.yield(value)
- }.store(in: &cancellables)
- }, api: api)
- self.cancellables = cancellables
+ self.nondeterminsticOperation = NondeterminsticOperation(operationStream: requestStream)
}
deinit {
cancel()
}
- public func subscribe() -> AnyPublisher, APIError> {
- let subject = PassthroughSubject, APIError>()
+ public func subscribe() -> AnyPublisher {
+ let subject = PassthroughSubject()
self.task = Task { await self.trySubscribe(subject) }
return subject.eraseToAnyPublisher()
}
- private func trySubscribe(_ subject: PassthroughSubject, APIError>) async {
+ private func trySubscribe(_ subject: PassthroughSubject) async {
var apiError: APIError?
- for await request in requestFactory {
- guard let sequence = self.api?.subscribe(request: await request()) else {
- continue
- }
- do {
+ do {
+ try Task.checkCancellation()
+ let sequence = try await self.nondeterminsticOperation.run()
+ defer { sequence.cancel() }
+ for try await event in sequence {
try Task.checkCancellation()
-
- for try await event in sequence {
- try Task.checkCancellation()
- Self.log.debug("Subscribe event \(event)")
- subject.send(event)
- }
- } catch is CancellationError {
- subject.send(completion: .finished)
- } catch {
- if let error = error as? APIError {
- apiError = error
- }
- Self.log.debug("Failed with subscription request: \(error)")
+ subject.send(event)
}
- sequence.cancel()
+ } catch is CancellationError {
+ subject.send(completion: .finished)
+ } catch {
+ if let error = error as? APIError {
+ apiError = error
+ }
+ Self.log.debug("Failed with subscription request: \(error)")
}
+
if apiError != nil {
subject.send(completion: .failure(apiError!))
} else {
@@ -166,7 +118,7 @@ public final class RetryableGraphQLSubscriptionOperation {
public func cancel() {
self.task?.cancel()
- self.cancellables = Set()
+ self.nondeterminsticOperation.cancel()
}
}
diff --git a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/InitialSync/InitialSyncOperation.swift b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/InitialSync/InitialSyncOperation.swift
index 6cf1e6b77a..0365b91862 100644
--- a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/InitialSync/InitialSyncOperation.swift
+++ b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/InitialSync/InitialSyncOperation.swift
@@ -173,25 +173,23 @@ final class InitialSyncOperation: AsynchronousOperation {
.publisher()
.map { Optional.some($0) } // map to optional to have nil as element
.replaceEmpty(with: nil) // use a nil element to trigger default auth if no auth provided
- .map { authType in {
- GraphQLRequest.syncQuery(
+ .map { authType in { [weak self] in
+ guard let self, let api = self.api else {
+ throw APIError.operationError("Operation cancelled", "")
+ }
+
+ return try await api.query(request: GraphQLRequest.syncQuery(
modelSchema: self.modelSchema,
where: self.syncPredicate,
limit: limit,
nextToken: nextToken,
lastSync: lastSyncTime,
authType: authType
- )
+ ))
}}
.eraseToAnyPublisher()
-
- let result: Result, APIError> = await RetryableGraphQLOperation(
- requestStream: authTypes,
- api: api
- ).run(.query)
-
- switch result {
+ switch await RetryableGraphQLOperation(requestStream: authTypes).run() {
case .success(let graphQLResult):
await handleQueryResults(lastSyncTime: lastSyncTime, graphQLResult: graphQLResult)
case .failure(let apiError):
diff --git a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/SubscriptionSync/IncomingAsyncSubscriptionEventPublisher.swift b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/SubscriptionSync/IncomingAsyncSubscriptionEventPublisher.swift
index 81171e8f90..a342d12c00 100644
--- a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/SubscriptionSync/IncomingAsyncSubscriptionEventPublisher.swift
+++ b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/SubscriptionSync/IncomingAsyncSubscriptionEventPublisher.swift
@@ -127,18 +127,21 @@ final class IncomingAsyncSubscriptionEventPublisher: AmplifyCancellable {
requestStream: authTypeProvider.publisher()
.map { Optional.some($0) } // map to optional to have nil as element
.replaceEmpty(with: nil) // use a nil element to trigger default auth if no auth provided
- .map { authType in {
- await IncomingAsyncSubscriptionEventPublisher.makeAPIRequest(
+ .map { authType in { [weak self] in
+ guard let self else {
+ throw APIError.operationError("GraphQL subscription cancelled", "")
+ }
+
+ return api.subscribe(request: await IncomingAsyncSubscriptionEventPublisher.makeAPIRequest(
for: modelSchema,
subscriptionType: subscriptionType.subscriptionType,
api: api,
auth: auth,
authType: authType,
awsAuthService: self.awsAuthService
- )
+ ))
}}
- .eraseToAnyPublisher(),
- api: api
+ .eraseToAnyPublisher()
)
}
diff --git a/AmplifyTests/CategoryTests/API/NondeterminsticOperationTests.swift b/AmplifyTests/CategoryTests/API/NondeterminsticOperationTests.swift
new file mode 100644
index 0000000000..2923117710
--- /dev/null
+++ b/AmplifyTests/CategoryTests/API/NondeterminsticOperationTests.swift
@@ -0,0 +1,126 @@
+//
+// Copyright Amazon.com Inc. or its affiliates.
+// All Rights Reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+
+
+import XCTest
+@testable import Amplify
+
+class NondeterminsticOperationTests: XCTestCase {
+ enum TestError: Error {
+ case error
+ }
+ /**
+ Given: A nondeterminstic operation with all operation candidates would success
+ When: execute the nondeterminstic operation
+ Then: only first succeed operation will be executed
+ */
+ func test_withAllSucceedOperations_onlyFirstOneExecuted() async throws {
+ let expectation1 = expectation(description: "opeartion 1 executed")
+ let operation1: () async throws -> Void = {
+ expectation1.fulfill()
+ }
+ let expectation2 = expectation(description: "opeartion 2 executed")
+ expectation2.isInverted = true
+ let operation2: () async throws -> Void = {
+ expectation2.fulfill()
+ }
+ let expectation3 = expectation(description: "opeartion 3 executed")
+ expectation3.isInverted = true
+ let operation3: () async throws -> Void = {
+ expectation3.fulfill()
+ }
+
+ let nondeterminsticOperation = NondeterminsticOperation(operations: AsyncStream { continuation in
+ for operation in [operation1, operation2, operation3] {
+ continuation.yield(operation)
+ }
+ continuation.finish()
+ })
+
+ try await nondeterminsticOperation.run()
+ await fulfillment(of: [expectation1, expectation2, expectation3], timeout: 0.2)
+ }
+
+ /**
+ Given: A nondeterminstic operation with all operation candidates would fail
+ When: execute the nondeterminstic operation
+ Then: a totoal failure error is throwed and all operations are executed
+ */
+ func test_withAllFailedOperations_throwsTotoalFailureAndAllOperationsAreExecuted() async throws {
+ let expectation1 = expectation(description: "opeartion 1 executed")
+ let operation1: () async throws -> Void = {
+ expectation1.fulfill()
+ throw TestError.error
+ }
+ let expectation2 = expectation(description: "opeartion 2 executed")
+ let operation2: () async throws -> Void = {
+ expectation2.fulfill()
+ throw TestError.error
+ }
+ let expectation3 = expectation(description: "opeartion 3 executed")
+ let operation3: () async throws -> Void = {
+ expectation3.fulfill()
+ throw TestError.error
+ }
+
+ let nondeterminsticOperation = NondeterminsticOperation(operations: AsyncStream { continuation in
+ for operation in [operation1, operation2, operation3] {
+ continuation.yield(operation)
+ }
+ continuation.finish()
+ })
+ do {
+ try await nondeterminsticOperation.run()
+ } catch {
+ XCTAssert(error is NondeterminsticOperationError)
+ XCTAssertEqual(error as! NondeterminsticOperationError, NondeterminsticOperationError.totalFailure)
+ }
+ await fulfillment(of: [expectation1, expectation2, expectation3], timeout: 0.2)
+ }
+
+ /**
+ Given: A nondeterminstic operation with some operation candidates would succeed
+ When: execute the nondeterminstic operation
+ Then: all operations until the first success operation will be executed
+ */
+ func test_withSomeSuccessOperations_AllOperationsUntilSuccessOperationAreExecuted() async throws {
+ let expectation1 = expectation(description: "opeartion 1 executed")
+ let operation1: () async throws -> Void = {
+ expectation1.fulfill()
+ throw TestError.error
+ }
+ let expectation2 = expectation(description: "opeartion 2 executed")
+ let operation2: () async throws -> Void = {
+ expectation2.fulfill()
+ throw TestError.error
+ }
+ let expectation3 = expectation(description: "opeartion 3 executed")
+ let operation3: () async throws -> Void = {
+ expectation3.fulfill()
+ }
+ let expectation4 = expectation(description: "opeartion executed")
+ expectation4.isInverted = true
+ let operation4: () async throws -> Void = {
+ expectation4.fulfill()
+ throw TestError.error
+ }
+
+ let nondeterminsticOperation = NondeterminsticOperation(operations: AsyncStream { continuation in
+ for operation in [operation1, operation2, operation3, operation4] {
+ continuation.yield(operation)
+ }
+ continuation.finish()
+ })
+ do {
+ try await nondeterminsticOperation.run()
+ } catch {
+ XCTAssert(error is NondeterminsticOperationError)
+ XCTAssertEqual(error as! NondeterminsticOperationError, NondeterminsticOperationError.totalFailure)
+ }
+ await fulfillment(of: [expectation1, expectation2, expectation3, expectation4], timeout: 0.2)
+ }
+}
diff --git a/AmplifyTests/CategoryTests/API/RetryableGraphQLOperationTests.swift b/AmplifyTests/CategoryTests/API/RetryableGraphQLOperationTests.swift
index c76e6ec0cd..897e7303c3 100644
--- a/AmplifyTests/CategoryTests/API/RetryableGraphQLOperationTests.swift
+++ b/AmplifyTests/CategoryTests/API/RetryableGraphQLOperationTests.swift
@@ -7,150 +7,94 @@
import Foundation
import XCTest
-
+import Combine
@testable import Amplify
@testable import AmplifyTestCommon
class RetryableGraphQLOperationTests: XCTestCase {
let testApiName = "apiName"
- /// Given: a RetryableGraphQLOperation with a maxRetries of 2
- /// When: the request fails the first attempt with a .signedOut error
- /// Then: the request is re-tried and resultListener called
- func testShouldRetryOperation() {
- let maxRetries = 2
- var attempt = 0
-
- let requestFactoryExpectation = expectation(description: "Retry factory called \(maxRetries) times")
- requestFactoryExpectation.expectedFulfillmentCount = maxRetries
- let resultExpectation = expectation(description: "Result called")
-
- let resultListener: ResultListener = { _ in
- resultExpectation.fulfill()
+ /// Given: a RetryableGraphQLOperation with 2 operations
+ /// When: the first one fails with a .signedOut error, the next one succeed with response
+ /// Then: return the success response
+ func testShouldRetryOperationWithSignedOutAuthError() async throws {
+ let expectation1 = expectation(description: "Operation 1 throws signed out auth error")
+ let operation1: () async throws -> GraphQLResponse = {
+ expectation1.fulfill()
+ throw APIError.operationError("", "", AuthError.signedOut("", ""))
}
- let requestFactory: RequestFactory = {
- requestFactoryExpectation.fulfill()
- return self.makeTestRequest()
-
+ let expectation2 = expectation(description: "Operation 2 successfully finished")
+ let operation2: () async throws -> GraphQLResponse = {
+ expectation2.fulfill()
+ return .success("operation 2")
}
- let operation = RetryableGraphQLOperation(requestFactory: requestFactory,
- maxRetries: maxRetries,
- resultListener: resultListener) { _, wrappedListener in
-
- // simulate an error at first attempt
- if attempt == 0 {
- wrappedListener(
- .failure(self.makeSignedOutAuthError())
- )
- } else {
- wrappedListener(.success(.success("")))
- }
- attempt += 1
- return self.makeTestOperation()
+ let publisher = Publishers.MergeMany([operation1, operation2].map { Just($0) }).eraseToAnyPublisher()
+ let result = await RetryableGraphQLOperation(requestStream: publisher).run()
+ if case .success(.success(let string)) = result {
+ XCTAssertEqual(string, "operation 2")
+ } else {
+ XCTFail("Wrong result")
}
- operation.main()
-
- wait(for: [requestFactoryExpectation, resultExpectation], timeout: 10)
+ await fulfillment(of: [expectation1, expectation2], timeout: 1)
}
- /// Given: a RetryableGraphQLOperation with a maxRetries of 1
- /// When: the request fails the first attempt with a .signedOut error
- /// Then: the request is not re-tried
- func testShouldNotRetryOperationWithMaxRetriesOne() {
- let maxRetries = 1
-
- let requestFactoryExpectation = expectation(description: "Retry factory called \(maxRetries) times")
- requestFactoryExpectation.expectedFulfillmentCount = maxRetries
- let resultExpectation = expectation(description: "Result called")
-
- let resultListener: ResultListener = { _ in
- resultExpectation.fulfill()
+ /// Given: a RetryableGraphQLOperation with 2 operations
+ /// When: the first one fails with a .notAuthorized error, the next one succeed with response
+ /// Then: return the success response
+ func testShouldRetryOperationWithNotAuthorizedAuthError() async throws {
+ let expectation1 = expectation(description: "Operation 1 throws signed out auth error")
+ let operation1: () async throws -> GraphQLResponse = {
+ expectation1.fulfill()
+ throw APIError.operationError("", "", AuthError.notAuthorized("", ""))
}
- let requestFactory: RequestFactory = {
- requestFactoryExpectation.fulfill()
- return self.makeTestRequest()
-
+ let expectation2 = expectation(description: "Operation 2 successfully finished")
+ let operation2: () async throws -> GraphQLResponse = {
+ expectation2.fulfill()
+ return .success("operation 2")
}
- let operation = RetryableGraphQLOperation(requestFactory: requestFactory,
- maxRetries: maxRetries,
- resultListener: resultListener) { _, wrappedListener in
-
- wrappedListener(
- .failure(self.makeSignedOutAuthError())
- )
- return self.makeTestOperation()
+ let publisher = Publishers.MergeMany([operation1, operation2].map { Just($0) }).eraseToAnyPublisher()
+ let result = await RetryableGraphQLOperation(requestStream: publisher).run()
+ if case .success(.success(let string)) = result {
+ XCTAssertEqual(string, "operation 2")
+ } else {
+ XCTFail("Wrong result")
}
- operation.main()
-
- wait(for: [requestFactoryExpectation, resultExpectation], timeout: 10)
+ await fulfillment(of: [expectation1, expectation2], timeout: 1)
}
- /// Given: a RetryableGraphQLOperation with a maxRetries of 2
- /// When: the request fails both attempts
- /// Then: the request is re-tried only twice and resultListener called
- func testNotShouldRetryOperation() {
- let maxRetries = 2
-
- let requestFactoryExpectation = expectation(description: "Retry factory called \(maxRetries) times")
- requestFactoryExpectation.expectedFulfillmentCount = maxRetries
- let resultExpectation = expectation(description: "Result called")
-
- let resultListener: ResultListener = { _ in
- resultExpectation.fulfill()
+ /// Given: a RetryableGraphQLOperation with 2 operations
+ /// When: the first one fails with a .notAuthorized error, the next one succeed with response
+ /// Then: return the success response
+ func testShouldNotRetryOperationWithUnknownError() async throws {
+ let expectation1 = expectation(description: "Operation 1 throws signed out auth error")
+ let operation1: () async throws -> GraphQLResponse = {
+ expectation1.fulfill()
+ throw APIError.unknown("~Unknown~", "")
}
- let requestFactory: RequestFactory = {
- requestFactoryExpectation.fulfill()
- return self.makeTestRequest()
-
+ let expectation2 = expectation(description: "Operation 2 successfully finished")
+ expectation2.isInverted = true
+ let operation2: () async throws -> GraphQLResponse = {
+ expectation2.fulfill()
+ return .success("operation 2")
}
- let operation = RetryableGraphQLOperation(requestFactory: requestFactory,
- maxRetries: maxRetries,
- resultListener: resultListener) { _, wrappedListener in
-
- // simulate an error for both attempts
- wrappedListener(
- .failure(self.makeSignedOutAuthError())
- )
- return self.makeTestOperation()
+ let publisher = Publishers.MergeMany([operation1, operation2].map { Just($0) }).eraseToAnyPublisher()
+ let result = await RetryableGraphQLOperation(requestStream: publisher).run()
+ if case .failure(.operationError(_, _, let error)) = result {
+ XCTAssert(error is APIError)
+ if case .unknown(let description, _, _) = error as! APIError {
+ XCTAssertEqual(description, "~Unknown~")
+ } else {
+ XCTFail("Wrong result")
+ }
+ } else {
+ XCTFail("Wrong result")
}
- operation.main()
-
- wait(for: [requestFactoryExpectation, resultExpectation], timeout: 10)
- }
-}
-
-// MARK: - Test helpers
-extension RetryableGraphQLOperationTests {
- private func makeTestRequest() -> GraphQLRequest {
- GraphQLRequest(apiName: testApiName,
- document: "",
- responseType: Payload.self)
- }
-
- private func makeTestOperation() -> GraphQLOperation {
- let requestOptions = GraphQLOperationRequest.Options(pluginOptions: nil)
- let operationRequest = GraphQLOperationRequest(apiName: testApiName,
- operationType: .subscription,
- document: "",
- responseType: Payload.self,
- options: requestOptions)
- return GraphQLOperation(categoryType: .dataStore,
- eventName: "eventName",
- request: operationRequest)
+ await fulfillment(of: [expectation1, expectation2], timeout: 0.3)
}
-
- func makeSignedOutAuthError() -> APIError {
- return APIError.operationError("Error", "", AuthError.signedOut("AuthError", ""))
- }
-
- /// Convenience type alias
- private typealias Payload = String
- private typealias ResultListener = RetryableGraphQLOperation.OperationResultListener
- private typealias RequestFactory = RetryableGraphQLOperation.RequestFactory
}
From ccbfd660bcdb7955d64ddbd16d41b9d3e7b8a258 Mon Sep 17 00:00:00 2001
From: Di Wu
Date: Mon, 15 Apr 2024 23:45:35 -0700
Subject: [PATCH 06/23] fix unit test cases
---
.../Operation/RetryableGraphQLOperation.swift | 11 +-
...tialSyncOperationSyncExpressionTests.swift | 38 +-
.../InitialSyncOperationTests.swift | 89 +-
.../InitialSyncOrchestratorTests.swift | 46 +-
.../OutgoingMutationQueueNetworkTests.swift | 41 +-
.../OutgoingMutationQueueTests.swift | 40 +-
...tationQueueTestsWithMockStateMachine.swift | 75 +-
...MutationErrorFromCloudOperationTests.swift | 2558 ++++++++---------
.../SyncMutationToCloudOperationTests.swift | 966 +++----
.../ModelReconciliationDeleteTests.swift | 47 +-
.../Mocks/MockAPICategoryPlugin.swift | 170 +-
.../Mocks/MockAPIResponders.swift | 34 +-
.../Mocks/MockDataStoreCategoryPlugin.swift | 104 +-
AmplifyTestCommon/Mocks/MockResponder.swift | 16 +
.../API/RetryableGraphQLOperationTests.swift | 50 +-
15 files changed, 2169 insertions(+), 2116 deletions(-)
diff --git a/Amplify/Categories/API/Operation/RetryableGraphQLOperation.swift b/Amplify/Categories/API/Operation/RetryableGraphQLOperation.swift
index cc4791f5f3..f40229d9f9 100644
--- a/Amplify/Categories/API/Operation/RetryableGraphQLOperation.swift
+++ b/Amplify/Categories/API/Operation/RetryableGraphQLOperation.swift
@@ -36,7 +36,7 @@ public final class RetryableGraphQLOperation {
}
switch authError {
- case .signedOut, .notAuthorized: return true
+ case .notAuthorized: return true
default: return false
}
}
@@ -55,9 +55,14 @@ public final class RetryableGraphQLOperation {
public func run() async -> Result.Success, APIError> {
do {
- return .success(try await nondeterminsticOperation.run())
+ let result = try await nondeterminsticOperation.run()
+ return .success(result)
} catch {
- return .failure(.operationError("Failed to execute GraphQL operation", "", error))
+ if let apiError = error as? APIError {
+ return .failure(apiError)
+ } else {
+ return .failure(.operationError("Failed to execute GraphQL operation", "", error))
+ }
}
}
diff --git a/AmplifyPlugins/DataStore/Tests/AWSDataStorePluginTests/Sync/InitialSync/InitialSyncOperationSyncExpressionTests.swift b/AmplifyPlugins/DataStore/Tests/AWSDataStorePluginTests/Sync/InitialSync/InitialSyncOperationSyncExpressionTests.swift
index d2e0d60242..cc7e5e9144 100644
--- a/AmplifyPlugins/DataStore/Tests/AWSDataStorePluginTests/Sync/InitialSync/InitialSyncOperationSyncExpressionTests.swift
+++ b/AmplifyPlugins/DataStore/Tests/AWSDataStorePluginTests/Sync/InitialSync/InitialSyncOperationSyncExpressionTests.swift
@@ -15,7 +15,7 @@ import Combine
@testable import AWSPluginsCore
class InitialSyncOperationSyncExpressionTests: XCTestCase {
- typealias APIPluginQueryResponder = QueryRequestListenerResponder>
+ typealias APIPluginQueryResponder = QueryRequestResponder>
var storageAdapter: StorageEngineAdapter!
var apiPlugin = MockAPICategoryPlugin()
@@ -36,7 +36,7 @@ class InitialSyncOperationSyncExpressionTests: XCTestCase {
func initialSyncOperation(withSyncExpression syncExpression: DataStoreSyncExpression,
responder: APIPluginQueryResponder) -> InitialSyncOperation {
- apiPlugin.responders[.queryRequestListener] = responder
+ apiPlugin.responders[.queryRequestResponse] = responder
#if os(watchOS)
let configuration = DataStoreConfiguration.custom(syncPageSize: 10,
syncExpressions: [syncExpression],
@@ -55,7 +55,7 @@ class InitialSyncOperationSyncExpressionTests: XCTestCase {
}
func testBaseQueryWithBasicSyncExpression() throws {
- let responder = APIPluginQueryResponder { request, listener in
+ let responder = APIPluginQueryResponder { request in
XCTAssertEqual(request.document, """
query SyncMockSynceds($filter: ModelMockSyncedFilterInput, $limit: Int) {
syncMockSynceds(filter: $filter, limit: $limit) {
@@ -73,28 +73,26 @@ class InitialSyncOperationSyncExpressionTests: XCTestCase {
""")
guard let filter = request.variables?["filter"] as? [String: Any?] else {
XCTFail("Unable to get filter")
- return nil
+ return .failure(.unknown("Unable to get filter", "", nil))
}
guard let group = filter["and"] as? [[String: Any?]] else {
XCTFail("Unable to find 'and' group")
- return nil
+ return .failure(.unknown("Unable to find 'and' group", "", nil))
}
guard let key = group[0]["id"] as? [String: Any?] else {
XCTFail("Unable to get id from filter")
- return nil
+ return .failure(.unknown("Unable to get id from filter", "", nil))
}
guard let value = key["eq"] as? String else {
XCTFail("Unable to get eq from key")
- return nil
+ return .failure(.unknown("Unable to get eq from key", "", nil))
}
XCTAssertEqual(value, "id-123")
let list = PaginatedList(items: [], nextToken: nil, startedAt: nil)
- let event: GraphQLOperation>.OperationResult = .success(.success(list))
- listener?(event)
self.apiWasQueried.fulfill()
- return nil
+ return .success(list)
}
let syncExpression = DataStoreSyncExpression.syncExpression(MockSynced.schema, where: {
@@ -127,7 +125,7 @@ class InitialSyncOperationSyncExpressionTests: XCTestCase {
}
func testBaseQueryWithFilterSyncExpression() throws {
- let responder = APIPluginQueryResponder { request, listener in
+ let responder = APIPluginQueryResponder { request in
XCTAssertEqual(request.document, """
query SyncMockSynceds($filter: ModelMockSyncedFilterInput, $limit: Int) {
syncMockSynceds(filter: $filter, limit: $limit) {
@@ -145,28 +143,26 @@ class InitialSyncOperationSyncExpressionTests: XCTestCase {
""")
guard let filter = request.variables?["filter"] as? [String: Any?] else {
XCTFail("Unable to get filter")
- return nil
+ return .failure(.unknown("Unable to get filter", "", nil))
}
guard let group = filter["or"] as? [[String: Any?]] else {
XCTFail("Unable to find 'or' group")
- return nil
+ return .failure(.unknown("Unable to find 'or' group", "", nil))
}
guard let key = group[0]["id"] as? [String: Any?] else {
XCTFail("Unable to get id from filter")
- return nil
+ return .failure(.unknown("Unable to get id from filter", "", nil))
}
guard let value = key["eq"] as? String else {
XCTFail("Unable to get eq from key")
- return nil
+ return .failure(.unknown("Unable to get eq from key", "", nil))
}
XCTAssertEqual(value, "id-123")
let list = PaginatedList(items: [], nextToken: nil, startedAt: nil)
- let event: GraphQLOperation>.OperationResult = .success(.success(list))
- listener?(event)
self.apiWasQueried.fulfill()
- return nil
+ return .success(list)
}
let syncExpression = DataStoreSyncExpression.syncExpression(MockSynced.schema, where: {
@@ -199,7 +195,7 @@ class InitialSyncOperationSyncExpressionTests: XCTestCase {
}
func testBaseQueryWithSyncExpressionConstantAll() throws {
- let responder = APIPluginQueryResponder { request, listener in
+ let responder = APIPluginQueryResponder { request in
XCTAssertEqual(request.document, """
query SyncMockSynceds($limit: Int) {
syncMockSynceds(limit: $limit) {
@@ -218,10 +214,8 @@ class InitialSyncOperationSyncExpressionTests: XCTestCase {
XCTAssertNil(request.variables?["filter"])
let list = PaginatedList(items: [], nextToken: nil, startedAt: nil)
- let event: GraphQLOperation>.OperationResult = .success(.success(list))
- listener?(event)
self.apiWasQueried.fulfill()
- return nil
+ return .success(list)
}
let syncExpression = DataStoreSyncExpression.syncExpression(MockSynced.schema, where: {
diff --git a/AmplifyPlugins/DataStore/Tests/AWSDataStorePluginTests/Sync/InitialSync/InitialSyncOperationTests.swift b/AmplifyPlugins/DataStore/Tests/AWSDataStorePluginTests/Sync/InitialSync/InitialSyncOperationTests.swift
index d1a8a464aa..58bb5c4bea 100644
--- a/AmplifyPlugins/DataStore/Tests/AWSDataStorePluginTests/Sync/InitialSync/InitialSyncOperationTests.swift
+++ b/AmplifyPlugins/DataStore/Tests/AWSDataStorePluginTests/Sync/InitialSync/InitialSyncOperationTests.swift
@@ -247,16 +247,14 @@ class InitialSyncOperationTests: XCTestCase {
/// - Then:
/// - It reads sync metadata from storage
func testReadsMetadata() {
- let responder = QueryRequestListenerResponder> { _, listener in
+ let responder = QueryRequestResponder> { _ in
let startDateMilliseconds = Int64(Date().timeIntervalSince1970) * 1_000
let list = PaginatedList(items: [], nextToken: nil, startedAt: startDateMilliseconds)
- let event: GraphQLOperation>.OperationResult = .success(.success(list))
- listener?(event)
- return nil
+ return .success(list)
}
let apiPlugin = MockAPICategoryPlugin()
- apiPlugin.responders[.queryRequestListener] = responder
+ apiPlugin.responders[.queryRequestResponse] = responder
let storageAdapter = MockSQLiteStorageEngineAdapter()
let metadataQueryReceived = expectation(description: "Metadata query received by storage adapter")
@@ -308,17 +306,15 @@ class InitialSyncOperationTests: XCTestCase {
/// - It performs a sync query against the API category
func testQueriesAPI() {
let apiWasQueried = expectation(description: "API was queried for a PaginatedList of AnyModel")
- let responder = QueryRequestListenerResponder> { _, listener in
+ let responder = QueryRequestResponder> { _ in
let startDateMilliseconds = Int64(Date().timeIntervalSince1970) * 1_000
let list = PaginatedList(items: [], nextToken: nil, startedAt: startDateMilliseconds)
- let event: GraphQLOperation>.OperationResult = .success(.success(list))
- listener?(event)
apiWasQueried.fulfill()
- return nil
+ return .success(list)
}
let apiPlugin = MockAPICategoryPlugin()
- apiPlugin.responders[.queryRequestListener] = responder
+ apiPlugin.responders[.queryRequestResponse] = responder
let storageAdapter = MockSQLiteStorageEngineAdapter()
storageAdapter.returnOnQueryModelSyncMetadata(nil)
@@ -366,16 +362,14 @@ class InitialSyncOperationTests: XCTestCase {
/// - Then:
/// - The method invokes a completion callback when complete
func testInvokesPublisherCompletion() {
- let responder = QueryRequestListenerResponder> { _, listener in
+ let responder = QueryRequestResponder> { _ in
let startDateMilliseconds = Int64(Date().timeIntervalSince1970) * 1_000
let list = PaginatedList(items: [], nextToken: nil, startedAt: startDateMilliseconds)
- let event: GraphQLOperation>.OperationResult = .success(.success(list))
- listener?(event)
- return nil
+ return .success(list)
}
let apiPlugin = MockAPICategoryPlugin()
- apiPlugin.responders[.queryRequestListener] = responder
+ apiPlugin.responders[.queryRequestResponse] = responder
let storageAdapter = MockSQLiteStorageEngineAdapter()
storageAdapter.returnOnQueryModelSyncMetadata(nil)
@@ -421,18 +415,16 @@ class InitialSyncOperationTests: XCTestCase {
var nextTokens = ["token1", "token2"]
- let responder = QueryRequestListenerResponder> { _, listener in
+ let responder = QueryRequestResponder> { _ in
let startedAt = Int64(Date().timeIntervalSince1970)
let nextToken = nextTokens.isEmpty ? nil : nextTokens.removeFirst()
let list = PaginatedList(items: [], nextToken: nextToken, startedAt: startedAt)
- let event: GraphQLOperation>.OperationResult = .success(.success(list))
- listener?(event)
apiWasQueried.fulfill()
- return nil
+ return .success(list)
}
let apiPlugin = MockAPICategoryPlugin()
- apiPlugin.responders[.queryRequestListener] = responder
+ apiPlugin.responders[.queryRequestResponse] = responder
let storageAdapter = MockSQLiteStorageEngineAdapter()
storageAdapter.returnOnQueryModelSyncMetadata(nil)
@@ -482,15 +474,13 @@ class InitialSyncOperationTests: XCTestCase {
lastChangedAt: Int64(Date().timeIntervalSince1970),
version: 1)
let mutationSync = MutationSync(model: anyModel, syncMetadata: metadata)
- let responder = QueryRequestListenerResponder> { _, listener in
+ let responder = QueryRequestResponder> { _ in
let list = PaginatedList(items: [mutationSync], nextToken: nil, startedAt: startedAtMilliseconds)
- let event: GraphQLOperation>.OperationResult = .success(.success(list))
- listener?(event)
- return nil
+ return .success(list)
}
let apiPlugin = MockAPICategoryPlugin()
- apiPlugin.responders[.queryRequestListener] = responder
+ apiPlugin.responders[.queryRequestResponse] = responder
let storageAdapter = MockSQLiteStorageEngineAdapter()
storageAdapter.returnOnQueryModelSyncMetadata(nil)
@@ -551,16 +541,14 @@ class InitialSyncOperationTests: XCTestCase {
/// - The method submits the returned data to the reconciliation queue
func testUpdatesSyncMetadata() throws {
let startDateMilliseconds = Int64(Date().timeIntervalSince1970) * 1_000
- let responder = QueryRequestListenerResponder> { _, listener in
+ let responder = QueryRequestResponder> { _ in
let startedAt = startDateMilliseconds
let list = PaginatedList(items: [], nextToken: nil, startedAt: startedAt)
- let event: GraphQLOperation>.OperationResult = .success(.success(list))
- listener?(event)
- return nil
+ return .success(list)
}
let apiPlugin = MockAPICategoryPlugin()
- apiPlugin.responders[.queryRequestListener] = responder
+ apiPlugin.responders[.queryRequestResponse] = responder
let storageAdapter = try SQLiteStorageEngineAdapter(connection: Connection(.inMemory))
try storageAdapter.setUp(modelSchemas: StorageEngine.systemModelSchemas + [MockSynced.schema])
@@ -614,17 +602,15 @@ class InitialSyncOperationTests: XCTestCase {
/// - I invoke main() against an API that returns .signedOut error
/// - Then:
/// - The method completes with a failure result, error handler is called.
- func testQueriesAPIReturnSignedOutError() throws {
- let responder = QueryRequestListenerResponder> { _, listener in
+ func testQueriesAPIReturnSignedOutError() async throws {
+ let responder = QueryRequestResponder> { _ in
let authError = AuthError.signedOut("", "", nil)
let apiError = APIError.operationError("", "", authError)
- let event: GraphQLOperation>.OperationResult = .failure(apiError)
- listener?(event)
- return nil
+ throw apiError
}
let apiPlugin = MockAPICategoryPlugin()
- apiPlugin.responders[.queryRequestListener] = responder
+ apiPlugin.responders[.queryRequestResponse] = responder
let storageAdapter = try SQLiteStorageEngineAdapter(connection: Connection(.inMemory))
@@ -704,7 +690,12 @@ class InitialSyncOperationTests: XCTestCase {
operation.main()
- waitForExpectations(timeout: 1)
+ await fulfillment(of: [
+ expectErrorHandlerCalled,
+ syncStartedReceived,
+ syncCompletionReceived,
+ finishedReceived
+ ], timeout: 1)
sink.cancel()
}
@@ -734,19 +725,17 @@ class InitialSyncOperationTests: XCTestCase {
wait(for: [syncMetadataSaved], timeout: 1.0)
let apiWasQueried = expectation(description: "API was queried for a PaginatedList of AnyModel")
- let responder = QueryRequestListenerResponder> { request, listener in
+ let responder = QueryRequestResponder> { request in
let lastSync = request.variables?["lastSync"] as? Int64
XCTAssertEqual(lastSync, startDateMilliseconds)
let list = PaginatedList(items: [], nextToken: nil, startedAt: nil)
- let event: GraphQLOperation>.OperationResult = .success(.success(list))
- listener?(event)
apiWasQueried.fulfill()
- return nil
+ return .success(list)
}
let apiPlugin = MockAPICategoryPlugin()
- apiPlugin.responders[.queryRequestListener] = responder
+ apiPlugin.responders[.queryRequestResponse] = responder
let reconciliationQueue = MockReconciliationQueue()
let operation = InitialSyncOperation(
@@ -805,19 +794,17 @@ class InitialSyncOperationTests: XCTestCase {
wait(for: [syncMetadataSaved], timeout: 1.0)
let apiWasQueried = expectation(description: "API was queried for a PaginatedList of AnyModel")
- let responder = QueryRequestListenerResponder> { request, listener in
+ let responder = QueryRequestResponder> { request in
let lastSync = request.variables?["lastSync"] as? Int
XCTAssertNil(lastSync)
let list = PaginatedList(items: [], nextToken: nil, startedAt: nil)
- let event: GraphQLOperation>.OperationResult = .success(.success(list))
- listener?(event)
apiWasQueried.fulfill()
- return nil
+ return .success(list)
}
let apiPlugin = MockAPICategoryPlugin()
- apiPlugin.responders[.queryRequestListener] = responder
+ apiPlugin.responders[.queryRequestResponse] = responder
let reconciliationQueue = MockReconciliationQueue()
#if os(watchOS)
@@ -866,7 +853,7 @@ class InitialSyncOperationTests: XCTestCase {
try storageAdapter.setUp(modelSchemas: StorageEngine.systemModelSchemas + [MockSynced.schema])
let apiWasQueried = expectation(description: "API was queried for a PaginatedList of AnyModel")
- let responder = QueryRequestListenerResponder> { request, listener in
+ let responder = QueryRequestResponder> { request in
let lastSync = request.variables?["lastSync"] as? Int
XCTAssertNil(lastSync)
XCTAssert(request.document.contains("limit: Int"))
@@ -874,14 +861,12 @@ class InitialSyncOperationTests: XCTestCase {
XCTAssertEqual(10, limitValue)
let list = PaginatedList(items: [], nextToken: nil, startedAt: nil)
- let event: GraphQLOperation>.OperationResult = .success(.success(list))
- listener?(event)
apiWasQueried.fulfill()
- return nil
+ return .success(list)
}
let apiPlugin = MockAPICategoryPlugin()
- apiPlugin.responders[.queryRequestListener] = responder
+ apiPlugin.responders[.queryRequestResponse] = responder
let reconciliationQueue = MockReconciliationQueue()
#if os(watchOS)
diff --git a/AmplifyPlugins/DataStore/Tests/AWSDataStorePluginTests/Sync/InitialSync/InitialSyncOrchestratorTests.swift b/AmplifyPlugins/DataStore/Tests/AWSDataStorePluginTests/Sync/InitialSync/InitialSyncOrchestratorTests.swift
index fbe3f5b6af..429e955cf6 100644
--- a/AmplifyPlugins/DataStore/Tests/AWSDataStorePluginTests/Sync/InitialSync/InitialSyncOrchestratorTests.swift
+++ b/AmplifyPlugins/DataStore/Tests/AWSDataStorePluginTests/Sync/InitialSync/InitialSyncOrchestratorTests.swift
@@ -27,16 +27,14 @@ class InitialSyncOrchestratorTests: XCTestCase {
func testInvokesCompletionCallback() async throws {
ModelRegistry.reset()
PostCommentModelRegistration().registerModels(registry: ModelRegistry.self)
- let responder = QueryRequestListenerResponder> { _, listener in
+ let responder = QueryRequestResponder> { _ in
let startedAt = Int64(Date().timeIntervalSince1970)
let list = PaginatedList(items: [], nextToken: nil, startedAt: startedAt)
- let event: GraphQLOperation>.OperationResult = .success(.success(list))
- listener?(event)
- return nil
+ return .success(list)
}
let apiPlugin = MockAPICategoryPlugin()
- apiPlugin.responders[.queryRequestListener] = responder
+ apiPlugin.responders[.queryRequestResponse] = responder
let storageAdapter = MockSQLiteStorageEngineAdapter()
storageAdapter.returnOnQueryModelSyncMetadata(nil)
@@ -120,23 +118,19 @@ class InitialSyncOrchestratorTests: XCTestCase {
func testFinishWithAPIError() async throws {
ModelRegistry.reset()
PostCommentModelRegistration().registerModels(registry: ModelRegistry.self)
- let responder = QueryRequestListenerResponder> { request, listener in
+ let responder = QueryRequestResponder> { request in
if request.document.contains("SyncPosts") {
- let event: GraphQLOperation>.OperationResult =
- .failure(APIError.operationError("", "", nil))
- listener?(event)
+ throw APIError.operationError("", "", nil)
} else if request.document.contains("SyncComments") {
let startedAt = Int64(Date().timeIntervalSince1970)
let list = PaginatedList(items: [], nextToken: nil, startedAt: startedAt)
- let event: GraphQLOperation>.OperationResult = .success(.success(list))
- listener?(event)
+ return .success(list)
}
-
- return nil
+ return .failure(.unknown("", "", nil))
}
let apiPlugin = MockAPICategoryPlugin()
- apiPlugin.responders[.queryRequestListener] = responder
+ apiPlugin.responders[.queryRequestResponse] = responder
let storageAdapter = MockSQLiteStorageEngineAdapter()
storageAdapter.returnOnQueryModelSyncMetadata(nil)
@@ -238,16 +232,14 @@ class InitialSyncOrchestratorTests: XCTestCase {
}
TestModelsWithNoAssociations().registerModels(registry: ModelRegistry.self)
- let responder = QueryRequestListenerResponder> { _, listener in
+ let responder = QueryRequestResponder> { _ in
let startedAt = Int64(Date().timeIntervalSince1970)
let list = PaginatedList(items: [], nextToken: nil, startedAt: startedAt)
- let event: GraphQLOperation>.OperationResult = .success(.success(list))
- listener?(event)
- return nil
+ return .success(list)
}
let apiPlugin = MockAPICategoryPlugin()
- apiPlugin.responders[.queryRequestListener] = responder
+ apiPlugin.responders[.queryRequestResponse] = responder
let storageAdapter = MockSQLiteStorageEngineAdapter()
storageAdapter.returnOnQueryModelSyncMetadata(nil)
@@ -297,7 +289,7 @@ class InitialSyncOrchestratorTests: XCTestCase {
PostCommentModelRegistration().registerModels(registry: ModelRegistry.self)
let postWasQueried = expectation(description: "Post was queried")
let commentWasQueried = expectation(description: "Comment was queried")
- let responder = QueryRequestListenerResponder> { request, listener in
+ let responder = QueryRequestResponder> { request in
if request.document.hasPrefix("query SyncPosts") {
postWasQueried.fulfill()
}
@@ -308,13 +300,11 @@ class InitialSyncOrchestratorTests: XCTestCase {
let startedAt = Int64(Date().timeIntervalSince1970)
let list = PaginatedList(items: [], nextToken: nil, startedAt: startedAt)
- let event: GraphQLOperation>.OperationResult = .success(.success(list))
- listener?(event)
- return nil
+ return .success(list)
}
let apiPlugin = MockAPICategoryPlugin()
- apiPlugin.responders[.queryRequestListener] = responder
+ apiPlugin.responders[.queryRequestResponse] = responder
let storageAdapter = MockSQLiteStorageEngineAdapter()
storageAdapter.returnOnQueryModelSyncMetadata(nil)
@@ -371,7 +361,7 @@ class InitialSyncOrchestratorTests: XCTestCase {
var nextTokens = Array(repeating: "token", count: pageCount - 1)
- let responder = QueryRequestListenerResponder> { request, listener in
+ let responder = QueryRequestResponder