Currently pre-release version, continuously optimizing and adding new features.
pod 'SwiftConcurrencySupport', :git => '', :branch => 'master'
// add package to your dependencies:
.package(url: "", branch: "master")
// add target dependency:
.product(name: "SwiftConcurrencySupport", package: "swift-concurrency-support")
let task = Task<String, any Error> {
var count = 0
while true { }
return "some"
do {
_ = try await task.value(timeout: .seconds(2)) {
print("on timeout")
} catch {
print("###", error)
if case Task<String, any Error>.CustomError.timeout = error {
} else {
A semaphore implemented by actor
, can be used to limit the number of concurrent operations or protect data.
let semaphore = SemaphoreActor(value: 0)
Task {
await semaphore.wait()
Task {
try await Task.sleep(for: .seconds(0.05))
await semaphore.signal()
Multicaster for a concurrency context, create an AsyncStream
and a cancel token for the observer when subscribing from this multicaster.
let (stream, token) = multicaster.subscribe()
Task {
var results = [Int]()
for try await item in stream {
XCTAssert(results.count == 3)
Task {
try await Task.sleep(for: Duration.seconds(2))
try await Task.sleep(for: Duration.seconds(2))
It can subscribe its changes overtime. It also is an AsyncSequence
so you can transform it with AsyncSequence
's operators. It can be driven by other AsyncSequence
let property: AsyncProperty<Int> = .init(initialValue: 1)
let driver = AsyncStream<Int>.makeStream()
let (stream, token) = property.subscribe()
Task {
for i in 0...10 {
Task {
for await changes in stream {
print("Got \(changes)")
let transformed = { $0 * 2 }
let next = await transfromed.first { $0 > 2 }
XCTAssert(next == 4)
(Please see the code detail.)
let stream: AsyncThrowingSignalStream<Int> = .init()
Task.detached {
try await Task.sleep(for: Duration.seconds(2))
stream.send(signal: 1)
try await Task.sleep(for: Duration.seconds(2))
stream.send(signal: 2)
Task {
let one = try await stream.wait { $0 == 1 }
XCTAssert(one == 1)
let two = try await stream.wait { $0 == 2 }
XCTAssert(two == 2)
for index in assuming {
let task = queue.enqueueTask {
try! await Task.sleep(for: .seconds(1))
print("running", index)
return index
let results = try await tasks.asyncMap { try await $0.value }
XCTAssert(results == assuming)
Metrics's log:
id: 6B1802A9-B417-46D7-8AFD-1DDA8EFF3570
waitingDuration: 1.00012505054473876953
executionDuration: 1.042160987854004
Provides basic functional programming unit and flatMap
, map
, combine
and lots of operators to manipulate the operation.
let operation1: AsyncOperation<Int> = .init {
try await Task.sleep(for: .seconds(2))
return 1
value: { value in
print("Get value \(value) in operation 1")
error: { error in
print("Get error \(error) in operation 1")
let operation2 =
.flatMap { _ in
return .init {
try await Task.sleep(for: .seconds(1))
return 2
.mapError { error in
print("Get error \(error) in operation 2")
return .value(0)
.metrics { metrics in
print("Metrics \(metrics)")
.retry(times: 1, interval: 5)
.timeout(after: 10)
let result = try await operation2.start()
XCTAssert(result == 3)
let result1 = await operation1.startWithResult()
XCTAssert(result1.error() == nil)
let combined = AsyncOperation.combine([operation1, operation2])
let result = try await combined.start()
XCTAssert(result == [1, 3])
let mergedStream = AsyncOperation.merge([operation1, operation2])
var iterator = mergedStream.makeAsyncIterator()
var results: [Int] = []
while let value = try await {
print("Got value \(value)")
XCTAssert(results == [1, 2])
let queue = AsyncOperationQueue()
Task {
let result = try await queue.operation {
try await Task.sleep(for: .seconds(2))
return 1
Task {
let result = try await queue.operation {
try await Task.sleep(for: .seconds(2))
return 2
let protected = ActorAtomic<Bool>.init(value: true)
await protected.with { print($0) }
await protected.modify { $0 = false }
stream.prefix(with: [1, 2, 3])
Convert any AsyncSequence
to AsyncStream
, preventing the long generics type.
let earased = stream
.map { ... }
.filter { ... }