Skip to content

Commit

Permalink
Refactor shared async sequence (#8)
Browse files Browse the repository at this point in the history
* Add PassthroughAsyncSequence and ThrowingPassthroughAsyncSequence

* Refactor to use actor and PassthroughAsyncSequence

* Update documentation
  • Loading branch information
reddavis authored Jan 7, 2022
1 parent f1c42d2 commit cf8d504
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 86 deletions.
143 changes: 58 additions & 85 deletions Asynchrone/Source/Sequence/SharedAsyncSequence.swift
Original file line number Diff line number Diff line change
Expand Up @@ -32,124 +32,97 @@ import Foundation
/// let values = try await self.stream.collect()
/// // ...
/// ```
public struct SharedAsyncSequence<T: AsyncSequence>: AsyncSequence {
public typealias AsyncIterator = AsyncThrowingStream<T.Element, Error>.Iterator
public struct SharedAsyncSequence<Base: AsyncSequence>: AsyncSequence {
public typealias AsyncIterator = AsyncThrowingStream<Base.Element, Error>.Iterator

/// The kind of elements streamed.
public typealias Element = T.Element
public typealias Element = Base.Element

// MARK: SharedAsyncSequence (Private Properties)

private let inner: Inner<T>
// Private
private let manager: SubSequenceManager<Base>

// MARK: SharedAsyncSequence (Public Properties)

/// Creates a shareable async sequence that can be used across multiple tasks.
/// - Parameters:
/// - base: The async sequence in which this sequence receives it's elements.
public init(_ base: T) {
self.inner = Inner<T>(base)
public init(_ base: Base) {
self.manager = SubSequenceManager<Base>(base)
}

// MARK: AsyncSequence

/// Creates an async iterator that emits elements of this async sequence.
/// - Returns: An instance that conforms to `AsyncIteratorProtocol`.
public func makeAsyncIterator() -> AsyncThrowingStream<T.Element, Error>.Iterator {
inner.makeAsyncIterator()
public func makeAsyncIterator() -> AsyncThrowingStream<Base.Element, Error>.Iterator {
self.manager.makeAsyncIterator()
}
}



// MARK: - SharedAsyncSequence > Inner

extension SharedAsyncSequence {

fileprivate final class Inner<T: AsyncSequence> {

fileprivate typealias Element = T.Element
// MARK: Sub sequence manager

// MARK: Inner (Private Properties)
fileprivate actor SubSequenceManager<Base: AsyncSequence>{

fileprivate typealias Element = Base.Element

private var base: T

private let lock = NSLock()
private var streams: [AsyncThrowingStream<T.Element, Error>] = []
private var continuations: [AsyncThrowingStream<T.Element, Error>.Continuation] = []
private var subscriptionTask: Task<Void, Never>?
// Private
private var base: Base
private var sequences: [ThrowingPassthroughAsyncSequence<Base.Element>] = []
private var subscriptionTask: Task<Void, Never>?

// MARK: Initialization
// MARK: Initialization

fileprivate init(_ base: T) {
self.base = base
}

deinit {
subscriptionTask?.cancel()
fileprivate init(_ base: Base) {
self.base = base
}

deinit {
self.subscriptionTask?.cancel()
}

// MARK: API

/// Creates an new stream and returns its async iterator that emits elements of base async sequence.
/// - Returns: An instance that conforms to `AsyncIteratorProtocol`.
nonisolated fileprivate func makeAsyncIterator() -> ThrowingPassthroughAsyncSequence<Base.Element>.AsyncIterator {
let sequence = ThrowingPassthroughAsyncSequence<Base.Element>()
Task { [sequence] in
await self.add(sequence: sequence)
}

// MARK: API

/// Creates an new stream and returns its async iterator that emits elements of base async sequence.
/// - Returns: An instance that conforms to `AsyncIteratorProtocol`.
fileprivate func makeAsyncIterator() -> AsyncThrowingStream<T.Element, Error>.Iterator {
var streamContinuation: AsyncThrowingStream<T.Element, Error>.Continuation!
let stream = AsyncThrowingStream<T.Element, Error> { (continuation: AsyncThrowingStream<T.Element, Error>.Continuation) in
streamContinuation = continuation
}

add(stream: stream, continuation: streamContinuation)

return stream.makeAsyncIterator()
}

// MARK: Inner (Private Methods)

private func add(
stream: AsyncThrowingStream<T.Element, Error>,
continuation: AsyncThrowingStream<T.Element, Error>.Continuation
) {
modify {
streams.append(stream)
continuations.append(continuation)
subscribeToBaseStreamIfNeeded()
}
}
return sequence.makeAsyncIterator()
}

private func modify(_ block: () -> Void) {
lock.lock()
block()
lock.unlock()
}
// MARK: Sequence management

private func subscribeToBaseStreamIfNeeded() {
guard subscriptionTask == nil else { return }
private func add(sequence: ThrowingPassthroughAsyncSequence<Base.Element>) {
self.sequences.append(sequence)
self.subscribeToBaseSequenceIfNeeded()
}

private func subscribeToBaseSequenceIfNeeded() {
guard self.subscriptionTask == nil else { return }

subscriptionTask = Task { [weak self, base] in
guard let self = self else { return }
self.subscriptionTask = Task { [weak self, base] in
guard let self = self else { return }

guard !Task.isCancelled else {
self.modify {
self.continuations.forEach { $0.finish(throwing: CancellationError()) }
}
return
guard !Task.isCancelled else {
await self.sequences.forEach {
$0.finish(throwing: CancellationError())
}
return
}

do {
for try await value in base {
self.modify {
self.continuations.forEach { $0.yield(value) }
}
}
self.modify {
self.continuations.forEach { $0.finish(throwing: nil) }
}
} catch {
self.modify {
self.continuations.forEach { $0.finish(throwing: error) }
}
do {
for try await value in base {
await self.sequences.forEach { $0.yield(value) }
}

await self.sequences.forEach { $0.finish() }
} catch {
await self.sequences.forEach { $0.finish(throwing: error) }
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion AsynchroneTests/SharedAsyncSequenceTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ final class SharedAsyncSequenceTests: XCTestCase {
XCTAssertEqual(values[2], "abc")
XCTAssertEqual(values[3], "abcd")
}

let values = try await self.stream.collect()
XCTAssertEqual(values.count, 4)
XCTAssertEqual(values[0], "a")
Expand Down

0 comments on commit cf8d504

Please sign in to comment.