diff --git a/.gitignore b/.gitignore index f2beacb..08abd4f 100644 --- a/.gitignore +++ b/.gitignore @@ -5,3 +5,4 @@ DerivedData Package.resolved +.swiftpm \ No newline at end of file diff --git a/.swift-version b/.swift-version index 819e07a..8336407 100644 --- a/.swift-version +++ b/.swift-version @@ -1 +1 @@ -5.0 +5.1 \ No newline at end of file diff --git a/Package.swift b/Package.swift index 10122bf..8bf6f3d 100644 --- a/Package.swift +++ b/Package.swift @@ -1,4 +1,4 @@ -// swift-tools-version:5.0 +// swift-tools-version:5.1 import PackageDescription let package = Package( @@ -7,7 +7,7 @@ let package = Package( .library(name: "Jobs", targets: ["Jobs"]), ], dependencies: [ - .package(url: "https://github.com/vapor/vapor.git", from: "3.0.0") + .package(url: "https://github.com/vapor/vapor.git", from: "4.0.0-alpha.1.1") ], targets: [ .target(name: "Jobs", dependencies: ["Vapor"]), diff --git a/Sources/Jobs/Job.swift b/Sources/Jobs/Job.swift index e562122..02c005c 100644 --- a/Sources/Jobs/Job.swift +++ b/Sources/Jobs/Job.swift @@ -35,7 +35,7 @@ public extension Job { /// See `Job.error` func error(_ context: JobContext, _ error: Error, _ data: Data) -> EventLoopFuture { - return context.eventLoop.future() + return context.eventLoop.makeSucceededFuture(()) } func error(_ context: JobContext, _ error: Error, _ storage: JobStorage) -> EventLoopFuture { @@ -43,7 +43,7 @@ public extension Job { let data = try JSONDecoder().decode(Data.self, from: storage.data) return self.error(context, error, data) } catch { - return context.eventLoop.future(error: error) + return context.eventLoop.makeFailedFuture(error) } } @@ -53,7 +53,7 @@ public extension Job { let data = try JSONDecoder().decode(Data.self, from: storage.data) return self.dequeue(context, data) } catch { - return context.eventLoop.future(error: error) + return context.eventLoop.makeFailedFuture(error) } } } diff --git a/Sources/Jobs/JobContext.swift b/Sources/Jobs/JobContext.swift index 042fa0b..3cf1c94 100644 --- a/Sources/Jobs/JobContext.swift +++ b/Sources/Jobs/JobContext.swift @@ -2,7 +2,7 @@ import Foundation import Vapor /// A simple wrapper to hold job context and services. -public struct JobContext: Service { +public struct JobContext { /// Storage for the wrapper. public var userInfo: [String: Any] diff --git a/Sources/Jobs/JobsCommand.swift b/Sources/Jobs/JobsCommand.swift index bb3bd9d..13bd58e 100644 --- a/Sources/Jobs/JobsCommand.swift +++ b/Sources/Jobs/JobsCommand.swift @@ -3,18 +3,28 @@ import Vapor import NIO /// The command to start the Queue job -public class JobsCommand: Command { +public final class JobsCommand: Command { - /// See `Command`.`arguments` - public var arguments: [CommandArgument] = [] + /// See `Command.signature` + public let signature = Signature() - /// See `Command`.`options` - public var options: [CommandOption] { - return [ - CommandOption.value(name: "queue") - ] + /// See `Command.Signature` + public struct Signature: CommandSignature { + let queue = Option(name: "queue", type: .value) } + /// See `Command.help` + public var help: String = "Runs queued worker jobs" + + /// The registered `QueueService` + public let queueService: QueueService + + /// The registered `JobContext` + public let jobContext: JobContext + + /// The registered `JobsConfig` + public let config: JobsConfig + private var isShuttingDown: Bool { get { self._lock.lock() @@ -31,18 +41,26 @@ public class JobsCommand: Command { private var _isShuttingDown: Bool = false private var _lock: NSLock - /// See `Command`.`help` - public var help: [String] = ["Runs queued worker jobs"] + /// Creates a new `JobCommand` /// Creates a new `JobCommand` - public init() { + /// + /// - Parameters: + /// - queueService: The registered `QueueService` + /// - jobContext: The registered `JobContext` object + /// - config: The registered `JobsConfig` object + public init(queueService: QueueService, jobContext: JobContext, config: JobsConfig) { _lock = NSLock() + + self.queueService = queueService + self.jobContext = jobContext + self.config = config } /// See `Command`.`run(using:)` - public func run(using context: CommandContext) throws -> EventLoopFuture { + public func run(using context: CommandContext) throws { context.console.info("Starting Jobs worker") - + let queueName = context.option(\.queue) ?? QueueName.default.name let elg = MultiThreadedEventLoopGroup(numberOfThreads: System.coreCount) let signalQueue = DispatchQueue(label: "vapor.jobs.command.SignalHandlingQueue") @@ -66,68 +84,62 @@ public class JobsCommand: Command { signal(SIGINT, SIG_IGN) intSignalSource.resume() + var shutdownPromises: [EventLoopPromise] = [] - for eventLoop in elg.makeIterator()! { - let sub = context.container.subContainer(on: eventLoop) + for eventLoop in elg.makeIterator() { let console = context.console - let queueName = context.options["queue"] ?? QueueName.default.name - let shutdownPromise: EventLoopPromise = eventLoop.newPromise() + let shutdownPromise: EventLoopPromise = eventLoop.makePromise() shutdownPromises.append(shutdownPromise) eventLoop.submit { try self.setupTask(eventLoop: eventLoop, - container: sub, queueName: queueName, console: console, promise: shutdownPromise) - }.catch { + }.whenFailure { console.error("Could not boot EventLoop: \($0)") } } - return .andAll(shutdownPromises.map { $0.futureResult }, eventLoop: elg.next()) + try EventLoopFuture.andAllComplete(shutdownPromises.map { $0.futureResult }, on: elg.next()).wait() } - private func setupTask(eventLoop: EventLoop, - container: SubContainer, - queueName: String, - console: Console, - promise: EventLoopPromise) throws - { + private func setupTask( + eventLoop: EventLoop, + queueName: String, + console: ConsoleKit.Console, + promise: EventLoopPromise + ) throws { let queue = QueueName(name: queueName) - let queueService = try container.make(QueueService.self) - let jobContext = try container.make(JobContext.self) let key = queue.makeKey(with: queueService.persistenceKey) - let config = try container.make(JobsConfig.self) - - _ = eventLoop.scheduleRepeatedTask(initialDelay: .seconds(0), delay: queueService.refreshInterval) { task -> EventLoopFuture in + _ = eventLoop.scheduleRepeatedAsyncTask(initialDelay: .seconds(0), delay: queueService.refreshInterval) { task -> EventLoopFuture in //Check if shutting down - + if self.isShuttingDown { task.cancel() - promise.succeed() + promise.succeed(()) } - - return queueService.persistenceLayer.get(key: key).flatMap { jobStorage in + + return self.queueService.persistenceLayer.get(key: key).flatMap { jobStorage in //No job found, go to the next iteration - guard let jobStorage = jobStorage else { return eventLoop.future() } - guard let job = config.make(for: jobStorage.jobName) else { - return eventLoop.future(error: Abort(.internalServerError, reason: "Please register \(jobStorage.jobName)")) + guard let jobStorage = jobStorage else { return eventLoop.makeSucceededFuture(()) } + guard let job = self.config.make(for: jobStorage.jobName) else { + return eventLoop.makeFailedFuture(Abort(.internalServerError, reason: "Please register \(jobStorage.jobName)")) } - + console.info("Dequeing Job job_id=[\(jobStorage.id)]", newLine: true) - - let jobRunPromise = eventLoop.newPromise(Void.self) - - let futureJob = job.anyDequeue(jobContext, jobStorage) - self.firstFutureToSucceed(future: futureJob, tries: jobStorage.maxRetryCount, on: eventLoop).catchFlatMap { error in + + let jobRunPromise = eventLoop.makePromise(of: Void.self) + + let futureJob = job.anyDequeue(self.jobContext, jobStorage) + self.firstFutureToSucceed(future: futureJob, tries: jobStorage.maxRetryCount, on: eventLoop).flatMapError { error in console.error("Error: \(error) job_id=[\(jobStorage.id)]", newLine: true) - return job.error(jobContext, error, jobStorage) - }.always { - queueService.persistenceLayer.completed(key: key, jobStorage: jobStorage).cascade(promise: jobRunPromise) + return job.error(self.jobContext, error, jobStorage) + }.whenComplete { _ in + self.queueService.persistenceLayer.completed(key: key, jobStorage: jobStorage).cascade(to: jobRunPromise) } - + return jobRunPromise.futureResult } } @@ -140,12 +152,12 @@ public class JobsCommand: Command { /// - tries: The number of tries to execute this future before returning a failure /// - worker: An `EventLoopGroup` that can be used to generate future values /// - Returns: The completed future, with or without an error - private func firstFutureToSucceed(future: Future, tries: Int, on worker: EventLoopGroup) -> Future { + private func firstFutureToSucceed(future: EventLoopFuture, tries: Int, on worker: EventLoopGroup) -> EventLoopFuture { return future.map { complete in return complete - }.catchFlatMap { error in + }.flatMapError { error in if tries == 0 { - return worker.future(error: error) + return worker.next().makeFailedFuture(error) } else { return self.firstFutureToSucceed(future: future, tries: tries - 1, on: worker) } diff --git a/Sources/Jobs/JobsConfig.swift b/Sources/Jobs/JobsConfig.swift index d72c3ca..737e880 100644 --- a/Sources/Jobs/JobsConfig.swift +++ b/Sources/Jobs/JobsConfig.swift @@ -2,7 +2,7 @@ import Foundation import Vapor /// A `Service` to configure `Job`s -public struct JobsConfig: Service { +public struct JobsConfig { /// Type storage internal var storage: [String: AnyJob] diff --git a/Sources/Jobs/JobsPersistenceLayer.swift b/Sources/Jobs/JobsPersistenceLayer.swift index a61a65b..a2fd266 100644 --- a/Sources/Jobs/JobsPersistenceLayer.swift +++ b/Sources/Jobs/JobsPersistenceLayer.swift @@ -3,7 +3,7 @@ import NIO import Vapor /// A type that can store and retrieve jobs from a persistence layer -public protocol JobsPersistenceLayer: Service { +public protocol JobsPersistenceLayer { /// The event loop to be run on var eventLoop: EventLoop { get } diff --git a/Sources/Jobs/JobsProvider.swift b/Sources/Jobs/JobsProvider.swift index 6a48f78..06d41bb 100644 --- a/Sources/Jobs/JobsProvider.swift +++ b/Sources/Jobs/JobsProvider.swift @@ -1,9 +1,23 @@ import Foundation import Vapor +import NIO /// A provider used to setup the `Jobs` package public struct JobsProvider: Provider { + /// See `Provider`.`register(_ services:)` + public func register(_ s: inout Services) { + s.register( + QueueService.self + ) { container -> QueueService in + let persistenceLayer = try container.make(JobsPersistenceLayer.self) + + return QueueService(refreshInterval: self.refreshInterval, + persistenceLayer: persistenceLayer, + persistenceKey: self.persistenceKey) + } + } + /// The amount of time the queue should wait in between each completed job let refreshInterval: TimeAmount @@ -22,22 +36,4 @@ public struct JobsProvider: Provider { self.refreshInterval = refreshInterval self.persistenceKey = persistenceKey } - - - /// See `Provider`.`register(_ services:)` - public func register(_ services: inout Services) throws { - services.register { container -> QueueService in - let persistenceLayer = try container.make(JobsPersistenceLayer.self) - - return QueueService(refreshInterval: self.refreshInterval, - persistenceLayer: persistenceLayer, - persistenceKey: self.persistenceKey) - } - } - - - /// See `Provider`.`didBoot(_ container:)` - public func didBoot(_ container: Container) throws -> EventLoopFuture { - return container.future() - } } diff --git a/Sources/Jobs/QueueService.swift b/Sources/Jobs/QueueService.swift index 778cac4..dfae617 100644 --- a/Sources/Jobs/QueueService.swift +++ b/Sources/Jobs/QueueService.swift @@ -1,8 +1,9 @@ import Foundation import Vapor +import NIO /// A `Service` used to dispatch `Jobs` -public struct QueueService: Service { +public struct QueueService { /// See `JobsProvider`.`refreshInterval` let refreshInterval: TimeAmount @@ -28,6 +29,6 @@ public struct QueueService: Service { id: UUID().uuidString, jobName: J.jobName) - return persistenceLayer.set(key: queue.makeKey(with: persistenceKey), jobStorage: jobStorage).transform(to: ()) + return persistenceLayer.set(key: queue.makeKey(with: persistenceKey), jobStorage: jobStorage).map({}) } } diff --git a/Tests/JobsTests/Mocks/JobMock.swift b/Tests/JobsTests/Mocks/JobMock.swift index 199dc87..d185dea 100644 --- a/Tests/JobsTests/Mocks/JobMock.swift +++ b/Tests/JobsTests/Mocks/JobMock.swift @@ -13,10 +13,10 @@ struct JobDataOtherMock: JobData {} struct JobMock: Job { func dequeue(_ context: JobContext, _ data: T) -> EventLoopFuture { - return context.eventLoop.future() + return context.eventLoop.makeSucceededFuture(()) } func error(_ context: JobContext, _ error: Error, _ data: T) -> EventLoopFuture { - return context.eventLoop.future() + return context.eventLoop.makeSucceededFuture(()) } } diff --git a/Tests/JobsTests/XCTestManifests.swift b/Tests/JobsTests/XCTestManifests.swift index 2dd6c9f..0cf5185 100644 --- a/Tests/JobsTests/XCTestManifests.swift +++ b/Tests/JobsTests/XCTestManifests.swift @@ -1,37 +1,49 @@ +#if !canImport(ObjectiveC) import XCTest extension JobStorageTests { - static let __allTests = [ + // DO NOT MODIFY: This is autogenerated, use: + // `swift test --generate-linuxmain` + // to regenerate. + static let __allTests__JobStorageTests = [ ("testStringRepresentationIsValidJSON", testStringRepresentationIsValidJSON), ] } extension JobsConfigTests { - static let __allTests = [ + // DO NOT MODIFY: This is autogenerated, use: + // `swift test --generate-linuxmain` + // to regenerate. + static let __allTests__JobsConfigTests = [ ("testAddingAlreadyRegistratedJobsAreIgnored", testAddingAlreadyRegistratedJobsAreIgnored), ("testAddingJobs", testAddingJobs), ] } extension JobsTests { - static let __allTests = [ + // DO NOT MODIFY: This is autogenerated, use: + // `swift test --generate-linuxmain` + // to regenerate. + static let __allTests__JobsTests = [ ("testStub", testStub), ] } extension QueueNameTests { - static let __allTests = [ + // DO NOT MODIFY: This is autogenerated, use: + // `swift test --generate-linuxmain` + // to regenerate. + static let __allTests__QueueNameTests = [ ("testKeyIsGeneratedCorrectly", testKeyIsGeneratedCorrectly), ] } -#if !os(macOS) public func __allTests() -> [XCTestCaseEntry] { return [ - testCase(JobStorageTests.__allTests), - testCase(JobsConfigTests.__allTests), - testCase(JobsTests.__allTests), - testCase(QueueNameTests.__allTests), + testCase(JobStorageTests.__allTests__JobStorageTests), + testCase(JobsConfigTests.__allTests__JobsConfigTests), + testCase(JobsTests.__allTests__JobsTests), + testCase(QueueNameTests.__allTests__QueueNameTests), ] } #endif diff --git a/circle.yml b/circle.yml index 12e5aee..cee7d52 100644 --- a/circle.yml +++ b/circle.yml @@ -1,41 +1,42 @@ version: 2 jobs: - linux: - docker: - - image: vapor/swift:5.0 + macos: + macos: + xcode: "11.0.0" steps: - checkout + - run: HOMEBREW_NO_AUTO_UPDATE=1 brew install openssl@1.1 - run: swift build - run: swift test - linux-redis-driver: - docker: - - image: vapor/swift:5.0 - - image: redis:5.0 + macos-release: + macos: + xcode: "11.0.0" steps: - - run: - command: git clone -b master https://github.com/vapor-community/jobs-redis-driver.git - working_directory: ~/ - - run: - command: swift package edit jobs --revision $CIRCLE_SHA1 - working_directory: ~/jobs-redis-driver - - run: - command: swift test - working_directory: ~/jobs-redis-driver - - linux-release: + - checkout + - run: HOMEBREW_NO_AUTO_UPDATE=1 brew install openssl@1.1 + - run: swift build -c release + bionic: docker: - - image: vapor/swift:5.0 + - image: vapor/swift:5.1-bionic steps: - checkout + - run: apt-get update; apt-get install -y libssl-dev zlib1g-dev - run: swift build - run: swift test + bionic-release: + docker: + - image: vapor/swift:5.1-bionic + steps: + - checkout + - run: apt-get update; apt-get install -y libssl-dev zlib1g-dev + - run: swift build -c release workflows: version: 2 tests: jobs: - - linux - - linux-redis-driver - - linux-release - + # - macos + # - macos-release + - bionic + - bionic-release