From daa26744400ca2e0c5b1281d40af9daad2e23d0d Mon Sep 17 00:00:00 2001 From: Red Davis Date: Fri, 24 Jun 2022 15:12:02 +0100 Subject: [PATCH] Improve shared sequence management (#32) * fix: remove terminated sequence continuations --- .github/workflows/ci.yml | 12 ++--- .../xcschemes/Asynchrone.xcscheme | 1 - .../Sequences/SharedAsyncSequence.swift | 44 ++++++++++++------- .../Sequences/SharedAsyncSequenceTests.swift | 2 +- 4 files changed, 34 insertions(+), 25 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index ff160ba..b54f916 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -8,19 +8,19 @@ on: jobs: ios-latest: - name: Unit Test - iOS 15.2, Xcode 13.2 - runs-on: macOS-11 + name: Unit Test - iOS 15.5, Xcode 13.4 + runs-on: macOS-12 env: - DEVELOPER_DIR: /Applications/Xcode_13.2.app/Contents/Developer + DEVELOPER_DIR: /Applications/Xcode_13.4.app/Contents/Developer steps: - uses: actions/checkout@v2 - name: Run Tests - run: Scripts/test -d "OS=15.2,name=iPhone 13 Pro" + run: Scripts/test -d "OS=15.5,name=iPhone 13 Pro" ios-14-5: - name: Unit Test - iOS 14.5, Xcode 13.2 + name: Unit Test - iOS 14.5, Xcode 13.2.1 runs-on: macOS-11 env: - DEVELOPER_DIR: /Applications/Xcode_13.2.app/Contents/Developer + DEVELOPER_DIR: /Applications/Xcode_13.2.1.app/Contents/Developer steps: - uses: actions/checkout@v2 - name: Symlink simulator diff --git a/Asynchrone.xcodeproj/xcshareddata/xcschemes/Asynchrone.xcscheme b/Asynchrone.xcodeproj/xcshareddata/xcschemes/Asynchrone.xcscheme index 4c1b2f7..a7fbd25 100644 --- a/Asynchrone.xcodeproj/xcshareddata/xcschemes/Asynchrone.xcscheme +++ b/Asynchrone.xcodeproj/xcshareddata/xcschemes/Asynchrone.xcscheme @@ -30,7 +30,6 @@ { - fileprivate typealias Element = Base.Element // Private private var base: Base - private var sequences: [ThrowingPassthroughAsyncSequence] = [] + private var continuations: [UUID : AsyncThrowingStream.Continuation] = [:] private var subscriptionTask: Task? // MARK: Initialization @@ -150,18 +149,32 @@ fileprivate actor SubSequenceManager{ /// Creates an new stream and returns its async iterator that emits elements of base async sequence. /// - Returns: An instance that conforms to `AsyncIteratorProtocol`. nonisolated fileprivate func makeAsyncIterator() -> ThrowingPassthroughAsyncSequence.AsyncIterator { - let sequence = ThrowingPassthroughAsyncSequence() - Task { [sequence] in - await self.add(sequence: sequence) + let id = UUID() + let sequence = AsyncThrowingStream { + $0.onTermination = { @Sendable _ in + self.remove(id) + } + + await self.add(id: id, continuation: $0) } return sequence.makeAsyncIterator() } // MARK: Sequence management - - private func add(sequence: ThrowingPassthroughAsyncSequence) { - self.sequences.append(sequence) + + nonisolated private func remove(_ id: UUID) { + Task { + await self._remove(id) + } + } + + private func _remove(_ id: UUID) { + self.continuations.removeValue(forKey: id) + } + + private func add(id: UUID, continuation: AsyncThrowingStream.Continuation) { + self.continuations[id] = continuation self.subscribeToBaseSequenceIfNeeded() } @@ -172,7 +185,7 @@ fileprivate actor SubSequenceManager{ guard let self = self else { return } guard !Task.isCancelled else { - await self.sequences.forEach { + await self.continuations.values.forEach { $0.finish(throwing: CancellationError()) } return @@ -180,23 +193,20 @@ fileprivate actor SubSequenceManager{ do { for try await value in base { - await self.sequences.forEach { $0.yield(value) } + await self.continuations.values.forEach { $0.yield(value) } } - await self.sequences.forEach { $0.finish() } + await self.continuations.values.forEach { $0.finish() } } catch { - await self.sequences.forEach { $0.finish(throwing: error) } + await self.continuations.values.forEach { $0.finish(throwing: error) } } } } } - - // MARK: Shared extension AsyncSequence { - /// Creates a shareable async sequence that can be used across multiple tasks. public func shared() -> SharedAsyncSequence { .init(self) diff --git a/AsynchroneTests/Tests/Sequences/SharedAsyncSequenceTests.swift b/AsynchroneTests/Tests/Sequences/SharedAsyncSequenceTests.swift index 6eb8dbe..3e05b0c 100644 --- a/AsynchroneTests/Tests/Sequences/SharedAsyncSequenceTests.swift +++ b/AsynchroneTests/Tests/Sequences/SharedAsyncSequenceTests.swift @@ -12,7 +12,7 @@ final class SharedAsyncSequenceTests: XCTestCase { } // MARK: Tests - + func testSharedStreamShouldNotThrowExceptionAndReceiveAllValues() async { let taskCompleteExpectation = self.expectation(description: "Task complete") Task {