Skip to content

Commit

Permalink
Vapor 4 (#13)
Browse files Browse the repository at this point in the history
* update to Vapor 4

* flatmap calls

* update transform call

* update future call

* fixes

* swiftpm

* Update to vapor beta

* update tests

* circle file
  • Loading branch information
jdmcd authored Jun 11, 2019
1 parent 187a375 commit ae7094c
Show file tree
Hide file tree
Showing 13 changed files with 137 additions and 114 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@
DerivedData
Package.resolved

.swiftpm
2 changes: 1 addition & 1 deletion .swift-version
Original file line number Diff line number Diff line change
@@ -1 +1 @@
5.0
5.1
4 changes: 2 additions & 2 deletions Package.swift
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// swift-tools-version:5.0
// swift-tools-version:5.1
import PackageDescription

let package = Package(
Expand All @@ -7,7 +7,7 @@ let package = Package(
.library(name: "Jobs", targets: ["Jobs"]),
],
dependencies: [
.package(url: "https://github.com/vapor/vapor.git", from: "3.0.0")
.package(url: "https://github.com/vapor/vapor.git", from: "4.0.0-alpha.1.1")
],
targets: [
.target(name: "Jobs", dependencies: ["Vapor"]),
Expand Down
6 changes: 3 additions & 3 deletions Sources/Jobs/Job.swift
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,15 @@ public extension Job {

/// See `Job.error`
func error(_ context: JobContext, _ error: Error, _ data: Data) -> EventLoopFuture<Void> {
return context.eventLoop.future()
return context.eventLoop.makeSucceededFuture(())
}

func error(_ context: JobContext, _ error: Error, _ storage: JobStorage) -> EventLoopFuture<Void> {
do {
let data = try JSONDecoder().decode(Data.self, from: storage.data)
return self.error(context, error, data)
} catch {
return context.eventLoop.future(error: error)
return context.eventLoop.makeFailedFuture(error)
}
}

Expand All @@ -53,7 +53,7 @@ public extension Job {
let data = try JSONDecoder().decode(Data.self, from: storage.data)
return self.dequeue(context, data)
} catch {
return context.eventLoop.future(error: error)
return context.eventLoop.makeFailedFuture(error)
}
}
}
2 changes: 1 addition & 1 deletion Sources/Jobs/JobContext.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import Foundation
import Vapor

/// A simple wrapper to hold job context and services.
public struct JobContext: Service {
public struct JobContext {

/// Storage for the wrapper.
public var userInfo: [String: Any]
Expand Down
114 changes: 63 additions & 51 deletions Sources/Jobs/JobsCommand.swift
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,28 @@ import Vapor
import NIO

/// The command to start the Queue job
public class JobsCommand: Command {
public final class JobsCommand: Command {

/// See `Command`.`arguments`
public var arguments: [CommandArgument] = []
/// See `Command.signature`
public let signature = Signature()

/// See `Command`.`options`
public var options: [CommandOption] {
return [
CommandOption.value(name: "queue")
]
/// See `Command.Signature`
public struct Signature: CommandSignature {
let queue = Option<String>(name: "queue", type: .value)
}

/// See `Command.help`
public var help: String = "Runs queued worker jobs"

/// The registered `QueueService`
public let queueService: QueueService

/// The registered `JobContext`
public let jobContext: JobContext

/// The registered `JobsConfig`
public let config: JobsConfig

private var isShuttingDown: Bool {
get {
self._lock.lock()
Expand All @@ -31,18 +41,26 @@ public class JobsCommand: Command {
private var _isShuttingDown: Bool = false
private var _lock: NSLock

/// See `Command`.`help`
public var help: [String] = ["Runs queued worker jobs"]
/// Creates a new `JobCommand`

/// Creates a new `JobCommand`
public init() {
///
/// - Parameters:
/// - queueService: The registered `QueueService`
/// - jobContext: The registered `JobContext` object
/// - config: The registered `JobsConfig` object
public init(queueService: QueueService, jobContext: JobContext, config: JobsConfig) {
_lock = NSLock()

self.queueService = queueService
self.jobContext = jobContext
self.config = config
}

/// See `Command`.`run(using:)`
public func run(using context: CommandContext) throws -> EventLoopFuture<Void> {
public func run(using context: CommandContext<JobsCommand>) throws {
context.console.info("Starting Jobs worker")

let queueName = context.option(\.queue) ?? QueueName.default.name
let elg = MultiThreadedEventLoopGroup(numberOfThreads: System.coreCount)
let signalQueue = DispatchQueue(label: "vapor.jobs.command.SignalHandlingQueue")

Expand All @@ -66,68 +84,62 @@ public class JobsCommand: Command {
signal(SIGINT, SIG_IGN)
intSignalSource.resume()


var shutdownPromises: [EventLoopPromise<Void>] = []
for eventLoop in elg.makeIterator()! {
let sub = context.container.subContainer(on: eventLoop)
for eventLoop in elg.makeIterator() {
let console = context.console
let queueName = context.options["queue"] ?? QueueName.default.name
let shutdownPromise: EventLoopPromise<Void> = eventLoop.newPromise()
let shutdownPromise: EventLoopPromise<Void> = eventLoop.makePromise()

shutdownPromises.append(shutdownPromise)

eventLoop.submit {
try self.setupTask(eventLoop: eventLoop,
container: sub,
queueName: queueName,
console: console,
promise: shutdownPromise)
}.catch {
}.whenFailure {
console.error("Could not boot EventLoop: \($0)")
}
}

return .andAll(shutdownPromises.map { $0.futureResult }, eventLoop: elg.next())
try EventLoopFuture.andAllComplete(shutdownPromises.map { $0.futureResult }, on: elg.next()).wait()
}

private func setupTask(eventLoop: EventLoop,
container: SubContainer,
queueName: String,
console: Console,
promise: EventLoopPromise<Void>) throws
{
private func setupTask(
eventLoop: EventLoop,
queueName: String,
console: ConsoleKit.Console,
promise: EventLoopPromise<Void>
) throws {
let queue = QueueName(name: queueName)
let queueService = try container.make(QueueService.self)
let jobContext = try container.make(JobContext.self)
let key = queue.makeKey(with: queueService.persistenceKey)
let config = try container.make(JobsConfig.self)

_ = eventLoop.scheduleRepeatedTask(initialDelay: .seconds(0), delay: queueService.refreshInterval) { task -> EventLoopFuture<Void> in
_ = eventLoop.scheduleRepeatedAsyncTask(initialDelay: .seconds(0), delay: queueService.refreshInterval) { task -> EventLoopFuture<Void> in
//Check if shutting down

if self.isShuttingDown {
task.cancel()
promise.succeed()
promise.succeed(())
}
return queueService.persistenceLayer.get(key: key).flatMap { jobStorage in

return self.queueService.persistenceLayer.get(key: key).flatMap { jobStorage in
//No job found, go to the next iteration
guard let jobStorage = jobStorage else { return eventLoop.future() }
guard let job = config.make(for: jobStorage.jobName) else {
return eventLoop.future(error: Abort(.internalServerError, reason: "Please register \(jobStorage.jobName)"))
guard let jobStorage = jobStorage else { return eventLoop.makeSucceededFuture(()) }
guard let job = self.config.make(for: jobStorage.jobName) else {
return eventLoop.makeFailedFuture(Abort(.internalServerError, reason: "Please register \(jobStorage.jobName)"))
}

console.info("Dequeing Job job_id=[\(jobStorage.id)]", newLine: true)
let jobRunPromise = eventLoop.newPromise(Void.self)
let futureJob = job.anyDequeue(jobContext, jobStorage)
self.firstFutureToSucceed(future: futureJob, tries: jobStorage.maxRetryCount, on: eventLoop).catchFlatMap { error in

let jobRunPromise = eventLoop.makePromise(of: Void.self)

let futureJob = job.anyDequeue(self.jobContext, jobStorage)
self.firstFutureToSucceed(future: futureJob, tries: jobStorage.maxRetryCount, on: eventLoop).flatMapError { error in
console.error("Error: \(error) job_id=[\(jobStorage.id)]", newLine: true)
return job.error(jobContext, error, jobStorage)
}.always {
queueService.persistenceLayer.completed(key: key, jobStorage: jobStorage).cascade(promise: jobRunPromise)
return job.error(self.jobContext, error, jobStorage)
}.whenComplete { _ in
self.queueService.persistenceLayer.completed(key: key, jobStorage: jobStorage).cascade(to: jobRunPromise)
}

return jobRunPromise.futureResult
}
}
Expand All @@ -140,12 +152,12 @@ public class JobsCommand: Command {
/// - tries: The number of tries to execute this future before returning a failure
/// - worker: An `EventLoopGroup` that can be used to generate future values
/// - Returns: The completed future, with or without an error
private func firstFutureToSucceed<T>(future: Future<T>, tries: Int, on worker: EventLoopGroup) -> Future<T> {
private func firstFutureToSucceed<T>(future: EventLoopFuture<T>, tries: Int, on worker: EventLoopGroup) -> EventLoopFuture<T> {
return future.map { complete in
return complete
}.catchFlatMap { error in
}.flatMapError { error in
if tries == 0 {
return worker.future(error: error)
return worker.next().makeFailedFuture(error)
} else {
return self.firstFutureToSucceed(future: future, tries: tries - 1, on: worker)
}
Expand Down
2 changes: 1 addition & 1 deletion Sources/Jobs/JobsConfig.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import Foundation
import Vapor

/// A `Service` to configure `Job`s
public struct JobsConfig: Service {
public struct JobsConfig {

/// Type storage
internal var storage: [String: AnyJob]
Expand Down
2 changes: 1 addition & 1 deletion Sources/Jobs/JobsPersistenceLayer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import NIO
import Vapor

/// A type that can store and retrieve jobs from a persistence layer
public protocol JobsPersistenceLayer: Service {
public protocol JobsPersistenceLayer {

/// The event loop to be run on
var eventLoop: EventLoop { get }
Expand Down
32 changes: 14 additions & 18 deletions Sources/Jobs/JobsProvider.swift
Original file line number Diff line number Diff line change
@@ -1,9 +1,23 @@
import Foundation
import Vapor
import NIO

/// A provider used to setup the `Jobs` package
public struct JobsProvider: Provider {

/// See `Provider`.`register(_ services:)`
public func register(_ s: inout Services) {
s.register(
QueueService.self
) { container -> QueueService in
let persistenceLayer = try container.make(JobsPersistenceLayer.self)

return QueueService(refreshInterval: self.refreshInterval,
persistenceLayer: persistenceLayer,
persistenceKey: self.persistenceKey)
}
}

/// The amount of time the queue should wait in between each completed job
let refreshInterval: TimeAmount

Expand All @@ -22,22 +36,4 @@ public struct JobsProvider: Provider {
self.refreshInterval = refreshInterval
self.persistenceKey = persistenceKey
}


/// See `Provider`.`register(_ services:)`
public func register(_ services: inout Services) throws {
services.register { container -> QueueService in
let persistenceLayer = try container.make(JobsPersistenceLayer.self)

return QueueService(refreshInterval: self.refreshInterval,
persistenceLayer: persistenceLayer,
persistenceKey: self.persistenceKey)
}
}


/// See `Provider`.`didBoot(_ container:)`
public func didBoot(_ container: Container) throws -> EventLoopFuture<Void> {
return container.future()
}
}
5 changes: 3 additions & 2 deletions Sources/Jobs/QueueService.swift
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import Foundation
import Vapor
import NIO

/// A `Service` used to dispatch `Jobs`
public struct QueueService: Service {
public struct QueueService {

/// See `JobsProvider`.`refreshInterval`
let refreshInterval: TimeAmount
Expand All @@ -28,6 +29,6 @@ public struct QueueService: Service {
id: UUID().uuidString,
jobName: J.jobName)

return persistenceLayer.set(key: queue.makeKey(with: persistenceKey), jobStorage: jobStorage).transform(to: ())
return persistenceLayer.set(key: queue.makeKey(with: persistenceKey), jobStorage: jobStorage).map({})
}
}
4 changes: 2 additions & 2 deletions Tests/JobsTests/Mocks/JobMock.swift
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ struct JobDataOtherMock: JobData {}

struct JobMock<T: JobData>: Job {
func dequeue(_ context: JobContext, _ data: T) -> EventLoopFuture<Void> {
return context.eventLoop.future()
return context.eventLoop.makeSucceededFuture(())
}

func error(_ context: JobContext, _ error: Error, _ data: T) -> EventLoopFuture<Void> {
return context.eventLoop.future()
return context.eventLoop.makeSucceededFuture(())
}
}
30 changes: 21 additions & 9 deletions Tests/JobsTests/XCTestManifests.swift
Original file line number Diff line number Diff line change
@@ -1,37 +1,49 @@
#if !canImport(ObjectiveC)
import XCTest

extension JobStorageTests {
static let __allTests = [
// DO NOT MODIFY: This is autogenerated, use:
// `swift test --generate-linuxmain`
// to regenerate.
static let __allTests__JobStorageTests = [
("testStringRepresentationIsValidJSON", testStringRepresentationIsValidJSON),
]
}

extension JobsConfigTests {
static let __allTests = [
// DO NOT MODIFY: This is autogenerated, use:
// `swift test --generate-linuxmain`
// to regenerate.
static let __allTests__JobsConfigTests = [
("testAddingAlreadyRegistratedJobsAreIgnored", testAddingAlreadyRegistratedJobsAreIgnored),
("testAddingJobs", testAddingJobs),
]
}

extension JobsTests {
static let __allTests = [
// DO NOT MODIFY: This is autogenerated, use:
// `swift test --generate-linuxmain`
// to regenerate.
static let __allTests__JobsTests = [
("testStub", testStub),
]
}

extension QueueNameTests {
static let __allTests = [
// DO NOT MODIFY: This is autogenerated, use:
// `swift test --generate-linuxmain`
// to regenerate.
static let __allTests__QueueNameTests = [
("testKeyIsGeneratedCorrectly", testKeyIsGeneratedCorrectly),
]
}

#if !os(macOS)
public func __allTests() -> [XCTestCaseEntry] {
return [
testCase(JobStorageTests.__allTests),
testCase(JobsConfigTests.__allTests),
testCase(JobsTests.__allTests),
testCase(QueueNameTests.__allTests),
testCase(JobStorageTests.__allTests__JobStorageTests),
testCase(JobsConfigTests.__allTests__JobsConfigTests),
testCase(JobsTests.__allTests__JobsTests),
testCase(QueueNameTests.__allTests__QueueNameTests),
]
}
#endif
Loading

0 comments on commit ae7094c

Please sign in to comment.