Skip to content

Additional functionality that makes writing Swift concurrency code easier.

License

Notifications You must be signed in to change notification settings

Mioke/swift-concurrency-support

Repository files navigation

swift-concurrency-support

SwiftConcurrencySupport Swift Package Manager Supported Platforms: iOS, macOS, tvOS, watchOS & Linux

This is currently a pre-release version; it is being continuously optimized, and new features are still being added.

Install

Using CocoaPods

pod 'SwiftConcurrencySupport', :git => 'https://github.com/mioke/swift-concurrency-support.git', :branch => 'master'

Using SPM

// add package to your dependencies:
.package(url: "https://github.com/Mioke/swift-concurrency-support.git", branch: "master")

// add target dependency:
.product(name: "SwiftConcurrencySupport", package: "swift-concurrency-support")

Index

Functions

Timeout functions for Task

let task = Task<String, any Error> {
  var count = 0
  while true { }
  XCTAssert(false)
  return "some"
}

do {
  _ = try await task.value(timeout: .seconds(2)) {
    print("on timeout")
  }
} catch {
  print("###", error)

  if case Task<String, any Error>.CustomError.timeout = error {
    XCTAssert(true)
  } else {
    XCTAssert(false)
  }
}

SemaphoreActor

A semaphore implemented with an actor. It can be used to limit the number of concurrent operations or to protect data.

let semaphore = SemaphoreActor(value: 0)

Task {
  await semaphore.wait()
}

Task {
  try await Task.sleep(for: .seconds(0.05))
  await semaphore.signal()
}

AsyncMulticast : Multicaster for a concurrency context

A multicaster for a concurrency context. Subscribing to the multicaster creates an AsyncStream and a cancel token for the observer.

let (stream, token) = multicaster.subscribe()

Task {
  var results = [Int]()
  for try await item in stream {
    results.append(item)
  }
  XCTAssert(results.count == 3)
  expect.fulfill()
}

Task {
  multicaster.cast(1)
  try await Task.sleep(for: Duration.seconds(2))
  multicaster.cast(2)
  try await Task.sleep(for: Duration.seconds(2))
  multicaster.cast(3)
  token.unsubscribe()
}

AsyncProperty: Similar to Property<T> in ReactiveSwift

You can subscribe to its changes over time. It is also an AsyncSequence, so you can transform it with AsyncSequence's operators. It can be driven by another AsyncSequence, too.

let property: AsyncProperty<Int> = .init(initialValue: 1)
property.update(2)

let driver = AsyncStream<Int>.makeStream()
property.driven(by: driver.stream)
let (stream, token) = property.subscribe()

Task {
  for i in 0...10 {
    driver.continuation.yield(i)
  }
}

Task {
  for await changes in stream {
    print("Got \(changes)")
    break
  }
  let transformed = property.map { $0 * 2 }
  let next = await transformed.first { $0 > 2 }
  XCTAssert(next == 4)

  token.unsubscribe()
}

AsyncThrowingSignalStream

(Please see the source code for details.)

let stream: AsyncThrowingSignalStream<Int> = .init()

Task.detached {
  try await Task.sleep(for: Duration.seconds(2))
  stream.send(signal: 1)

  try await Task.sleep(for: Duration.seconds(2))
  stream.send(signal: 2)
}

Task {
  let one = try await stream.wait { $0 == 1 }
  XCTAssert(one == 1)
  let two = try await stream.wait { $0 == 2 }
  XCTAssert(two == 2)
}

TaskQueue: Runs tasks one by one and provides task metrics when each task finishes

for index in assuming {
  let task = queue.enqueueTask {
    try! await Task.sleep(for: .seconds(1))
    print("running", index)
    return index
  }
  tasks.append(task)
}

let results = try await tasks.asyncMap { try await $0.value }
XCTAssert(results == assuming)

/*
Metrics log:
id: 6B1802A9-B417-46D7-8AFD-1DDA8EFF3570
  waitingDuration: 1.00012505054473876953
  executionDuration: 1.042160987854004
*/

Functional Programming

AsyncOperation : Wrapped async operation

Provides a basic functional programming unit, along with flatMap, map, combine, and many other operators for manipulating the operation.

let operation1: AsyncOperation<Int> = .init {
  try await Task.sleep(for: .seconds(2))
  return 1
}
.on(
  value: { value in
    print("Get value \(value) in operation 1")
  },
  error: { error in
    print("Get error \(error) in operation 1")
  })

let operation2 =
  operation1
  .flatMap { _ in
    return .init {
      try await Task.sleep(for: .seconds(1))
      return 2
    }
  }
  .combine(operation1)
  .map(+)
  .mapError { error in
    print("Get error \(error) in operation 2")
    return .value(0)
  }
  .metrics { metrics in
    print("Metrics \(metrics)")
  }
  .retry(times: 1, interval: 5)
  .timeout(after: 10)
  

let result = try await operation2.start()
XCTAssert(result == 3)

let result1 = await operation1.startWithResult()
XCTAssert(result1.error() == nil)

AsyncOperation.combine : Combine multiple AsyncOperation into one

let combined = AsyncOperation.combine([operation1, operation2])
let result = try await combined.start()

XCTAssert(result == [1, 3])

AsyncOperation.merge : Merge multiple AsyncOperation into one AsyncStream

let mergedStream = AsyncOperation.merge([operation1, operation2])
var iterator = mergedStream.makeAsyncIterator()
var results: [Int] = []
while let value = try await iterator.next() {
  print("Got value \(value)")
  results.append(value)
}

XCTAssert(results == [1, 2])

AsyncOperationQueue : Runs AsyncOperations serially or concurrently. Please see the source code for details.

let queue = AsyncOperationQueue()

Task {
  let result = try await queue.operation {
    try await Task.sleep(for: .seconds(2))
    return 1
  }
}

Task {
  let result = try await queue.operation {
    try await Task.sleep(for: .seconds(2))
    return 2
  }
}

Other supporting features

ActorAtomic<T>: A locking wrapper implemented with an actor

let protected = ActorAtomic<Bool>.init(value: true)
await protected.with { print($0) }
await protected.modify { $0 = false }

AsyncStartWithSequence: AsyncSequence that starts with values

stream.prefix(with: [1, 2, 3])

AsyncSequence eraseToStream() and eraseToThrowingStream()

Converts any AsyncSequence to an AsyncStream, avoiding long generic type signatures.

let erased = stream
  .map { ... }
  .filter { ... }
  .eraseToStream()

About

Additional functionality that makes writing Swift concurrency code easier.

Topics

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published