Skip to content

Commit

Permalink
beta 2 updates (#48)
Browse files Browse the repository at this point in the history
* beta 2

* beta 2 updates

* update schedule builder

* regen linuxmain

* fix ci
  • Loading branch information
tanner0101 authored Dec 5, 2019
1 parent e7b958f commit 6999d96
Show file tree
Hide file tree
Showing 36 changed files with 824 additions and 3,684 deletions.
12 changes: 6 additions & 6 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,19 @@ jobs:
image: vapor/swift:5.1-xenial
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@master
- run: swift test
- uses: actions/checkout@v1
- run: swift test --enable-test-discovery
bionic:
container:
image: vapor/swift:5.1-bionic
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@master
- run: swift test
- uses: actions/checkout@v1
- run: swift test --enable-test-discovery
thread:
container:
image: vapor/swift:5.1-bionic
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@master
- run: swift test --sanitize=thread
- uses: actions/checkout@v1
- run: swift test --enable-test-discovery --sanitize=thread
2 changes: 1 addition & 1 deletion Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ let package = Package(
.library(name: "Jobs", targets: ["Jobs"]),
],
dependencies: [
.package(url: "https://github.com/vapor/vapor.git", from: "4.0.0-beta.1")
.package(url: "https://github.com/vapor/vapor.git", .branch("master"))
],
targets: [
.target(name: "Jobs", dependencies: ["Vapor"]),
Expand Down
17 changes: 17 additions & 0 deletions Sources/Jobs/Exports.swift
Original file line number Diff line number Diff line change
@@ -1,3 +1,20 @@
@_exported import struct Foundation.Date
@_exported import struct Logging.Logger
@_exported import class NIO.EventLoopFuture
@_exported import struct NIO.EventLoopPromise
@_exported import protocol NIO.EventLoop
@_exported import struct NIO.TimeAmount

import class NIO.RepeatedTask

extension RepeatedTask {
func syncCancel(on eventLoop: EventLoop) {
do {
let promise = eventLoop.makePromise(of: Void.self)
self.cancel(promise: promise)
try promise.futureResult.wait()
} catch {
print("failed cancelling repeated task \(error)")
}
}
}
76 changes: 38 additions & 38 deletions Sources/Jobs/Job.swift
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,18 @@ import Vapor
/// A task that can be queued for future execution.
public protocol Job: AnyJob {
/// The data associated with a job
associatedtype Data: Codable
associatedtype Payload

/// Called when it's this Job's turn to be dequeued.
///
/// - Parameters:
/// - context: The JobContext. Can be used to store and retrieve services
/// - data: The data for this handler
/// - Returns: A future `Void` value used to signify completion
func dequeue(_ context: JobContext, _ data: Data) -> EventLoopFuture<Void>
func dequeue(
_ context: JobContext,
_ payload: Payload
) -> EventLoopFuture<Void>


/// Called when there is an error at any stage of the Job's execution.
Expand All @@ -22,34 +25,51 @@ public protocol Job: AnyJob {
/// - context: The JobContext. Can be used to store and retrieve services
/// - error: The error returned by the job.
/// - Returns: A future `Void` value used to signify completion
func error(_ context: JobContext, _ error: Error, _ data: Data) -> EventLoopFuture<Void>
func error(
_ context: JobContext,
_ error: Error,
_ payload: Payload
) -> EventLoopFuture<Void>

static func serializePayload(_ payload: Payload) throws -> [UInt8]
static func parsePayload(_ bytes: [UInt8]) throws -> Payload
}

public extension Job {
extension Job where Payload: Codable {
public static func serializePayload(_ payload: Payload) throws -> [UInt8] {
try .init(JSONEncoder().encode(payload))
}

public static func parsePayload(_ bytes: [UInt8]) throws -> Payload {
try JSONDecoder().decode(Payload.self, from: .init(bytes))
}
}

extension Job {
/// The jobName of the Job
static var jobName: String {
public static var name: String {
return String(describing: Self.self)
}

/// See `Job.error`
func error(_ context: JobContext, _ error: Error, _ data: Data) -> EventLoopFuture<Void> {
return context.eventLoop.makeSucceededFuture(())
public func error(
_ context: JobContext,
_ error: Error,
_ payload: Payload
) -> EventLoopFuture<Void> {
context.eventLoop.makeSucceededFuture(())
}

func error(_ context: JobContext, _ error: Error, _ storage: JobStorage) -> EventLoopFuture<Void> {
public func _error(_ context: JobContext, _ error: Error, payload: [UInt8]) -> EventLoopFuture<Void> {
do {
let data = try JSONDecoder().decode(Data.self, from: storage.data)
return self.error(context, error, data)
return try self.error(context, error, Self.parsePayload(payload))
} catch {
return context.eventLoop.makeFailedFuture(error)
}
}

/// See `AnyJob.anyDequeue`
func anyDequeue(_ context: JobContext, _ storage: JobStorage) -> EventLoopFuture<Void> {
public func _dequeue(_ context: JobContext, payload: [UInt8]) -> EventLoopFuture<Void> {
do {
let data = try JSONDecoder().decode(Data.self, from: storage.data)
return self.dequeue(context, data)
return try self.dequeue(context, Self.parsePayload(payload))
} catch {
return context.eventLoop.makeFailedFuture(error)
}
Expand All @@ -59,15 +79,15 @@ public extension Job {
/// A type-erased version of `Job`
public protocol AnyJob {
/// The name of the `Job`
static var jobName: String { get }
static var name: String { get }

/// Dequeues the `Job`
///
/// - Parameters:
/// - context: The context for the job
/// - storage: The `JobStorage` metadata object
/// - Returns: A future void, signifying completion
func anyDequeue(_ context: JobContext, _ storage: JobStorage) -> EventLoopFuture<Void>
func _dequeue(_ context: JobContext, payload: [UInt8]) -> EventLoopFuture<Void>

/// Handles errors thrown from `anyDequeue`
///
Expand All @@ -76,25 +96,5 @@ public protocol AnyJob {
/// - error: The error thrown
/// - storage: The JobStorage
/// - Returns: A future void, signifying completion
func error(_ context: JobContext, _ error: Error, _ storage: JobStorage) -> EventLoopFuture<Void>
}

// MARK: Scheduled

/// Describes a job that can be scheduled and repeated
public protocol ScheduledJob {

/// The method called when the job is run
/// - Parameter context: A `JobContext` that can be used
func run(context: JobContext) -> EventLoopFuture<Void>
}

class AnyScheduledJob {
let job: ScheduledJob
let scheduler: ScheduleBuilder

init(job: ScheduledJob, scheduler: ScheduleBuilder) {
self.job = job
self.scheduler = scheduler
}
func _error(_ context: JobContext, _ error: Error, payload: [UInt8]) -> EventLoopFuture<Void>
}
22 changes: 12 additions & 10 deletions Sources/Jobs/JobContext.swift
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
import Foundation
import Vapor

/// A simple wrapper to hold job context and services.
public struct JobContext {
/// Storage for the wrapper.
public var userInfo: [AnyHashable: Any]

public let queueName: JobsQueueName
public let configuration: JobsConfiguration
public let logger: Logger
public let eventLoop: EventLoop

/// Creates an empty `JobContext`
public init(userInfo: [AnyHashable: Any] = [:], on eventLoop: EventLoop) {
public init(
queueName: JobsQueueName,
configuration: JobsConfiguration,
logger: Logger,
on eventLoop: EventLoop
) {
self.queueName = queueName
self.configuration = configuration
self.logger = logger
self.eventLoop = eventLoop
self.userInfo = [:]
}
}
32 changes: 32 additions & 0 deletions Sources/Jobs/JobData.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/// Holds information about the Job that is to be encoded to the persistence store.
public struct JobData: Codable {
/// The job data to be encoded.
public let payload: [UInt8]

/// The maxRetryCount for the `Job`.
public let maxRetryCount: Int

/// A date to execute this job after
public let delayUntil: Date?

/// The date this job was queued
public let queuedAt: Date

/// The name of the `Job`
public let jobName: String

/// Creates a new `JobStorage` holding object
public init(
payload: [UInt8],
maxRetryCount: Int,
jobName: String,
delayUntil: Date?,
queuedAt: Date
) {
self.payload = payload
self.maxRetryCount = maxRetryCount
self.jobName = jobName
self.delayUntil = delayUntil
self.queuedAt = queuedAt
}
}
13 changes: 13 additions & 0 deletions Sources/Jobs/JobIdentifier.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import struct Foundation.UUID

public struct JobIdentifier: Hashable, Equatable {
public let string: String

public init(string: String) {
self.string = string
}

public init() {
self.init(string: UUID().uuidString)
}
}
53 changes: 0 additions & 53 deletions Sources/Jobs/JobStorage.swift

This file was deleted.

Loading

0 comments on commit 6999d96

Please sign in to comment.