Skip to content

Commit

Permalink
Async Await Support via AsyncJob (#103)
Browse files Browse the repository at this point in the history
* Add async await

* Update workflow

* Update Sources/Queues/AsyncJob.swift

Co-authored-by: Tim Condon <0xTim@users.noreply.github.com>

* Use NIOCore

* Update Sources/Queues/Queue+Async.swift

Co-authored-by: Tim Condon <0xTim@users.noreply.github.com>
  • Loading branch information
jdmcd and 0xTim authored Oct 26, 2021
1 parent a0b96a5 commit 09f39d5
Show file tree
Hide file tree
Showing 4 changed files with 181 additions and 14 deletions.
32 changes: 19 additions & 13 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
@@ -1,17 +1,23 @@
name: test
on:
- pull_request
on: { pull_request: {} }

jobs:
linux:
getcidata:
runs-on: ubuntu-latest
strategy:
matrix:
image:
- swift:5.2-xenial
- swift:5.2-bionic
- swift:5.3-xenial
- swift:5.3-bionic
container: ${{ matrix.image }}
outputs:
environments: ${{ steps.output.outputs.environments }}
steps:
- uses: actions/checkout@v2
- run: swift test --enable-test-discovery --sanitize=thread
- id: output
run: |
envblob="$(curl -fsSL https://raw.githubusercontent.com/vapor/ci/main/pr-environments.json | jq -cMj '.')"
echo "::set-output name=environments::${envblob}"
test-queues:
runs-on: ubuntu-latest
container: swiftlang/swift:nightly-5.5-focal
steps:
- name: Check out Queues
uses: actions/checkout@v2
- name: Run tests with Thread Sanitizer
timeout-minutes: 30
run: swift test --enable-test-discovery --sanitize=thread
4 changes: 3 additions & 1 deletion Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@ let package = Package(
.library(name: "XCTQueues", targets: ["XCTQueues"])
],
dependencies: [
.package(url: "https://github.com/vapor/vapor.git", from: "4.0.0")
.package(url: "https://github.com/vapor/vapor.git", from: "4.0.0"),
.package(url: "https://github.com/apple/swift-nio.git", from: "2.33.0")
],
targets: [
.target(name: "Queues", dependencies: [
.product(name: "Vapor", package: "vapor"),
.product(name: "NIOCore", package: "swift-nio"),
]),
.target(name: "XCTQueues", dependencies: [
.target(name: "Queues")
Expand Down
136 changes: 136 additions & 0 deletions Sources/Queues/AsyncJob.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
import Vapor
import NIOCore
import Foundation

#if compiler(>=5.5) && canImport(_Concurrency)
/// A task that can be queued for future execution.
@available(macOS 12, iOS 15, watchOS 8, tvOS 15, *)
public protocol AsyncJob: AnyAsyncJob, AnyJob {
/// The data associated with a job
associatedtype Payload

/// Called when it's this Job's turn to be dequeued.
/// - Parameters:
/// - context: The JobContext. Can be used to store and retrieve services
/// - payload: The data for this handler
func dequeue(
_ context: QueueContext,
_ payload: Payload
) async throws

/// Called when there is an error at any stage of the Job's execution.
/// - Parameters:
/// - context: The JobContext. Can be used to store and retrieve services
/// - error: The error returned by the job.
/// - payload: The typed payload for the job
func error(
_ context: QueueContext,
_ error: Error,
_ payload: Payload
) async throws

/// Called when there was an error and job will be retired.
///
/// - Parameters:
/// - attempt: Number of job attempts which failed
/// - Returns: Number of seconds for which next retry will be delayed.
/// Return `-1` if you want to retry job immediately without putting it back to the queue.
func nextRetryIn(attempt: Int) -> Int

static func serializePayload(_ payload: Payload) throws -> [UInt8]
static func parsePayload(_ bytes: [UInt8]) throws -> Payload
}

@available(macOS 12, iOS 15, watchOS 8, tvOS 15, *)
extension AsyncJob where Payload: Codable {

/// Serialize a payload into Data
/// - Parameter payload: The payload
public static func serializePayload(_ payload: Payload) throws -> [UInt8] {
try .init(JSONEncoder().encode(payload))
}

/// Parse bytes into the payload
/// - Parameter bytes: The Payload
public static func parsePayload(_ bytes: [UInt8]) throws -> Payload {
try JSONDecoder().decode(Payload.self, from: .init(bytes))
}
}

@available(macOS 12, iOS 15, watchOS 8, tvOS 15, *)
extension AsyncJob {
/// The jobName of the Job
public static var name: String {
return String(describing: Self.self)
}

/// See `Job`.`error`
public func error(
_ context: QueueContext,
_ error: Error,
_ payload: Payload
) async throws {
return
}

/// See `Job`.`nextRetryIn`
public func nextRetryIn(attempt: Int) -> Int {
return -1
}

public func _nextRetryIn(attempt: Int) -> Int {
return nextRetryIn(attempt: attempt)
}

public func _error(_ context: QueueContext, id: String, _ error: Error, payload: [UInt8]) async throws {
var contextCopy = context
contextCopy.logger[metadataKey: "job_id"] = .string(id)
return try await self.error(contextCopy, error, Self.parsePayload(payload))
}

public func _dequeue(_ context: QueueContext, id: String, payload: [UInt8]) async throws {
var contextCopy = context
contextCopy.logger[metadataKey: "job_id"] = .string(id)
return try await self.dequeue(contextCopy, Self.parsePayload(payload))
}

/// See `Job`.`error`
public func error(
_ context: QueueContext,
_ error: Error,
_ payload: Payload
) -> EventLoopFuture<Void> {
let promise = context.eventLoop.makePromise(of: Void.self)
promise.completeWithTask {
try await self.error(context, error, payload)
}
return promise.futureResult
}

public func _error(_ context: QueueContext, id: String, _ error: Error, payload: [UInt8]) -> EventLoopFuture<Void> {
let promise = context.eventLoop.makePromise(of: Void.self)
promise.completeWithTask {
try await self._error(context, id: id, error, payload: payload)
}
return promise.futureResult
}

public func _dequeue(_ context: QueueContext, id: String, payload: [UInt8]) -> EventLoopFuture<Void> {
let promise = context.eventLoop.makePromise(of: Void.self)
promise.completeWithTask {
try await self._dequeue(context, id: id, payload: payload)
}
return promise.futureResult
}
}

/// A type-erased version of `Job`
@available(macOS 12, iOS 15, watchOS 8, tvOS 15, *)
public protocol AnyAsyncJob {
/// The name of the `Job`
static var name: String { get }
func _dequeue(_ context: QueueContext, id: String, payload: [UInt8]) async throws
func _error(_ context: QueueContext, id: String, _ error: Error, payload: [UInt8]) async throws
func _nextRetryIn(attempt: Int) -> Int
}
#endif
23 changes: 23 additions & 0 deletions Sources/Queues/Queue+Async.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import Vapor
import NIOCore

#if compiler(>=5.5) && canImport(_Concurrency)
extension Queue {
/// Dispatch a job into the queue for processing
/// - Parameters:
/// - job: The Job type
/// - payload: The payload data to be dispatched
/// - maxRetryCount: Number of times to retry this job on failure
/// - delayUntil: Delay the processing of this job until a certain date
@available(macOS 12, iOS 15, watchOS 8, tvOS 15, *)
public func dispatch<J>(
_ job: J.Type,
_ payload: J.Payload,
maxRetryCount: Int = 0,
delayUntil: Date? = nil,
id: JobIdentifier = JobIdentifier()
) async throws where J: Job {
try await self.dispatch(job, payload, maxRetryCount: maxRetryCount, delayUntil: delayUntil, id: id).get()
}
}
#endif

0 comments on commit 09f39d5

Please sign in to comment.