diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 842d21d..bfc9242 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -1,17 +1,23 @@ name: test -on: - - pull_request +on: { pull_request: {} } + jobs: - linux: + getcidata: runs-on: ubuntu-latest - strategy: - matrix: - image: - - swift:5.2-xenial - - swift:5.2-bionic - - swift:5.3-xenial - - swift:5.3-bionic - container: ${{ matrix.image }} + outputs: + environments: ${{ steps.output.outputs.environments }} steps: - - uses: actions/checkout@v2 - - run: swift test --enable-test-discovery --sanitize=thread + - id: output + run: | + envblob="$(curl -fsSL https://raw.githubusercontent.com/vapor/ci/main/pr-environments.json | jq -cMj '.')" + echo "::set-output name=environments::${envblob}" + + test-queues: + runs-on: ubuntu-latest + container: swiftlang/swift:nightly-5.5-focal + steps: + - name: Check out Queues + uses: actions/checkout@v2 + - name: Run tests with Thread Sanitizer + timeout-minutes: 30 + run: swift test --enable-test-discovery --sanitize=thread \ No newline at end of file diff --git a/Package.swift b/Package.swift index f416c05..f09807a 100644 --- a/Package.swift +++ b/Package.swift @@ -11,11 +11,13 @@ let package = Package( .library(name: "XCTQueues", targets: ["XCTQueues"]) ], dependencies: [ - .package(url: "https://github.com/vapor/vapor.git", from: "4.0.0") + .package(url: "https://github.com/vapor/vapor.git", from: "4.0.0"), + .package(url: "https://github.com/apple/swift-nio.git", from: "2.33.0") ], targets: [ .target(name: "Queues", dependencies: [ .product(name: "Vapor", package: "vapor"), + .product(name: "NIOCore", package: "swift-nio"), ]), .target(name: "XCTQueues", dependencies: [ .target(name: "Queues") diff --git a/Sources/Queues/AsyncJob.swift b/Sources/Queues/AsyncJob.swift new file mode 100644 index 0000000..89fa3c2 --- /dev/null +++ b/Sources/Queues/AsyncJob.swift @@ -0,0 +1,136 @@ +import Vapor +import NIOCore +import Foundation + +#if compiler(>=5.5) && canImport(_Concurrency) +/// A task that can be queued for future execution. +@available(macOS 12, iOS 15, watchOS 8, tvOS 15, *) +public protocol AsyncJob: AnyAsyncJob, AnyJob { + /// The data associated with a job + associatedtype Payload + + /// Called when it's this Job's turn to be dequeued. + /// - Parameters: + /// - context: The JobContext. Can be used to store and retrieve services + /// - payload: The data for this handler + func dequeue( + _ context: QueueContext, + _ payload: Payload + ) async throws + + /// Called when there is an error at any stage of the Job's execution. + /// - Parameters: + /// - context: The JobContext. Can be used to store and retrieve services + /// - error: The error returned by the job. + /// - payload: The typed payload for the job + func error( + _ context: QueueContext, + _ error: Error, + _ payload: Payload + ) async throws + + /// Called when there was an error and job will be retired. + /// + /// - Parameters: + /// - attempt: Number of job attempts which failed + /// - Returns: Number of seconds for which next retry will be delayed. + /// Return `-1` if you want to retry job immediately without putting it back to the queue. + func nextRetryIn(attempt: Int) -> Int + + static func serializePayload(_ payload: Payload) throws -> [UInt8] + static func parsePayload(_ bytes: [UInt8]) throws -> Payload +} + +@available(macOS 12, iOS 15, watchOS 8, tvOS 15, *) +extension AsyncJob where Payload: Codable { + + /// Serialize a payload into Data + /// - Parameter payload: The payload + public static func serializePayload(_ payload: Payload) throws -> [UInt8] { + try .init(JSONEncoder().encode(payload)) + } + + /// Parse bytes into the payload + /// - Parameter bytes: The Payload + public static func parsePayload(_ bytes: [UInt8]) throws -> Payload { + try JSONDecoder().decode(Payload.self, from: .init(bytes)) + } +} + +@available(macOS 12, iOS 15, watchOS 8, tvOS 15, *) +extension AsyncJob { + /// The jobName of the Job + public static var name: String { + return String(describing: Self.self) + } + + /// See `Job`.`error` + public func error( + _ context: QueueContext, + _ error: Error, + _ payload: Payload + ) async throws { + return + } + + /// See `Job`.`nextRetryIn` + public func nextRetryIn(attempt: Int) -> Int { + return -1 + } + + public func _nextRetryIn(attempt: Int) -> Int { + return nextRetryIn(attempt: attempt) + } + + public func _error(_ context: QueueContext, id: String, _ error: Error, payload: [UInt8]) async throws { + var contextCopy = context + contextCopy.logger[metadataKey: "job_id"] = .string(id) + return try await self.error(contextCopy, error, Self.parsePayload(payload)) + } + + public func _dequeue(_ context: QueueContext, id: String, payload: [UInt8]) async throws { + var contextCopy = context + contextCopy.logger[metadataKey: "job_id"] = .string(id) + return try await self.dequeue(contextCopy, Self.parsePayload(payload)) + } + + /// See `Job`.`error` + public func error( + _ context: QueueContext, + _ error: Error, + _ payload: Payload + ) -> EventLoopFuture { + let promise = context.eventLoop.makePromise(of: Void.self) + promise.completeWithTask { + try await self.error(context, error, payload) + } + return promise.futureResult + } + + public func _error(_ context: QueueContext, id: String, _ error: Error, payload: [UInt8]) -> EventLoopFuture { + let promise = context.eventLoop.makePromise(of: Void.self) + promise.completeWithTask { + try await self._error(context, id: id, error, payload: payload) + } + return promise.futureResult + } + + public func _dequeue(_ context: QueueContext, id: String, payload: [UInt8]) -> EventLoopFuture { + let promise = context.eventLoop.makePromise(of: Void.self) + promise.completeWithTask { + try await self._dequeue(context, id: id, payload: payload) + } + return promise.futureResult + } +} + +/// A type-erased version of `Job` +@available(macOS 12, iOS 15, watchOS 8, tvOS 15, *) +public protocol AnyAsyncJob { + /// The name of the `Job` + static var name: String { get } + func _dequeue(_ context: QueueContext, id: String, payload: [UInt8]) async throws + func _error(_ context: QueueContext, id: String, _ error: Error, payload: [UInt8]) async throws + func _nextRetryIn(attempt: Int) -> Int +} +#endif diff --git a/Sources/Queues/Queue+Async.swift b/Sources/Queues/Queue+Async.swift new file mode 100644 index 0000000..8d3a4f9 --- /dev/null +++ b/Sources/Queues/Queue+Async.swift @@ -0,0 +1,23 @@ +import Vapor +import NIOCore + +#if compiler(>=5.5) && canImport(_Concurrency) +extension Queue { + /// Dispatch a job into the queue for processing + /// - Parameters: + /// - job: The Job type + /// - payload: The payload data to be dispatched + /// - maxRetryCount: Number of times to retry this job on failure + /// - delayUntil: Delay the processing of this job until a certain date + @available(macOS 12, iOS 15, watchOS 8, tvOS 15, *) + public func dispatch( + _ job: J.Type, + _ payload: J.Payload, + maxRetryCount: Int = 0, + delayUntil: Date? = nil, + id: JobIdentifier = JobIdentifier() + ) async throws where J: Job { + try await self.dispatch(job, payload, maxRetryCount: maxRetryCount, delayUntil: delayUntil, id: id).get() + } +} +#endif