Skip to content

Commit

Permalink
Improve shared sequence management (#32)
Browse files Browse the repository at this point in the history
* fix: remove terminated sequence continuations
  • Loading branch information
reddavis authored Jun 24, 2022
1 parent 8ed33bc commit daa2674
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 25 deletions.
12 changes: 6 additions & 6 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,19 @@ on:

jobs:
ios-latest:
name: Unit Test - iOS 15.2, Xcode 13.2
runs-on: macOS-11
name: Unit Test - iOS 15.5, Xcode 13.4
runs-on: macOS-12
env:
DEVELOPER_DIR: /Applications/Xcode_13.2.app/Contents/Developer
DEVELOPER_DIR: /Applications/Xcode_13.4.app/Contents/Developer
steps:
- uses: actions/checkout@v2
- name: Run Tests
run: Scripts/test -d "OS=15.2,name=iPhone 13 Pro"
run: Scripts/test -d "OS=15.5,name=iPhone 13 Pro"
ios-14-5:
name: Unit Test - iOS 14.5, Xcode 13.2
name: Unit Test - iOS 14.5, Xcode 13.2.1
runs-on: macOS-11
env:
DEVELOPER_DIR: /Applications/Xcode_13.2.app/Contents/Developer
DEVELOPER_DIR: /Applications/Xcode_13.2.1.app/Contents/Developer
steps:
- uses: actions/checkout@v2
- name: Symlink simulator
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
<Testables>
<TestableReference
skipped = "NO"
parallelizable = "YES"
testExecutionOrdering = "random">
<BuildableReference
BuildableIdentifier = "primary"
Expand Down
44 changes: 27 additions & 17 deletions Asynchrone/Source/Sequences/SharedAsyncSequence.swift
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import Foundation

/// An async sequence that can be shared between multiple tasks.
///
/// ```swift
Expand Down Expand Up @@ -122,17 +124,14 @@ extension SharedAsyncSequence {
}
}



// MARK: Sub sequence manager

fileprivate actor SubSequenceManager<Base: AsyncSequence>{

fileprivate typealias Element = Base.Element

// Private
private var base: Base
private var sequences: [ThrowingPassthroughAsyncSequence<Base.Element>] = []
private var continuations: [UUID : AsyncThrowingStream<Base.Element, Error>.Continuation] = [:]
private var subscriptionTask: Task<Void, Never>?

// MARK: Initialization
Expand All @@ -150,18 +149,32 @@ fileprivate actor SubSequenceManager<Base: AsyncSequence>{
/// Creates an new stream and returns its async iterator that emits elements of base async sequence.
/// - Returns: An instance that conforms to `AsyncIteratorProtocol`.
nonisolated fileprivate func makeAsyncIterator() -> ThrowingPassthroughAsyncSequence<Base.Element>.AsyncIterator {
let sequence = ThrowingPassthroughAsyncSequence<Base.Element>()
Task { [sequence] in
await self.add(sequence: sequence)
let id = UUID()
let sequence = AsyncThrowingStream<Element, Error> {
$0.onTermination = { @Sendable _ in
self.remove(id)
}

await self.add(id: id, continuation: $0)
}

return sequence.makeAsyncIterator()
}

// MARK: Sequence management

private func add(sequence: ThrowingPassthroughAsyncSequence<Base.Element>) {
self.sequences.append(sequence)

nonisolated private func remove(_ id: UUID) {
Task {
await self._remove(id)
}
}

private func _remove(_ id: UUID) {
self.continuations.removeValue(forKey: id)
}

private func add(id: UUID, continuation: AsyncThrowingStream<Base.Element, Error>.Continuation) {
self.continuations[id] = continuation
self.subscribeToBaseSequenceIfNeeded()
}

Expand All @@ -172,31 +185,28 @@ fileprivate actor SubSequenceManager<Base: AsyncSequence>{
guard let self = self else { return }

guard !Task.isCancelled else {
await self.sequences.forEach {
await self.continuations.values.forEach {
$0.finish(throwing: CancellationError())
}
return
}

do {
for try await value in base {
await self.sequences.forEach { $0.yield(value) }
await self.continuations.values.forEach { $0.yield(value) }
}

await self.sequences.forEach { $0.finish() }
await self.continuations.values.forEach { $0.finish() }
} catch {
await self.sequences.forEach { $0.finish(throwing: error) }
await self.continuations.values.forEach { $0.finish(throwing: error) }
}
}
}
}



// MARK: Shared

extension AsyncSequence {

/// Creates a shareable async sequence that can be used across multiple tasks.
public func shared() -> SharedAsyncSequence<Self> {
.init(self)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ final class SharedAsyncSequenceTests: XCTestCase {
}

// MARK: Tests

func testSharedStreamShouldNotThrowExceptionAndReceiveAllValues() async {
let taskCompleteExpectation = self.expectation(description: "Task complete")
Task {
Expand Down

0 comments on commit daa2674

Please sign in to comment.