Skip to content

Commit

Permalink
Added dependency to swift-metrics (#117)
Browse files Browse the repository at this point in the history
* Added dependency to swift-metrics

* Removed useless metric dimensions

* Add failing job test

* Address feedback

* Remove useless test file

* Update 5.8 manifest
  • Loading branch information
ptoffy authored Aug 28, 2024
1 parent d867b5b commit be4ac72
Show file tree
Hide file tree
Showing 12 changed files with 346 additions and 163 deletions.
9 changes: 6 additions & 3 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,20 @@ let package = Package(
],
products: [
.library(name: "Queues", targets: ["Queues"]),
.library(name: "XCTQueues", targets: ["XCTQueues"])
.library(name: "XCTQueues", targets: ["XCTQueues"]),
],
dependencies: [
.package(url: "https://github.com/vapor/vapor.git", from: "4.101.1"),
.package(url: "https://github.com/apple/swift-nio.git", from: "2.65.0"),
.package(url: "https://github.com/vapor/vapor.git", from: "4.104.0"),
.package(url: "https://github.com/apple/swift-nio.git", from: "2.70.0"),
.package(url: "https://github.com/apple/swift-metrics.git", from: "2.5.0"),
],
targets: [
.target(
name: "Queues",
dependencies: [
.product(name: "Vapor", package: "vapor"),
.product(name: "NIOCore", package: "swift-nio"),
.product(name: "Metrics", package: "swift-metrics"),
],
swiftSettings: swiftSettings
),
Expand All @@ -39,6 +41,7 @@ let package = Package(
.target(name: "Queues"),
.target(name: "XCTQueues"),
.product(name: "XCTVapor", package: "vapor"),
.product(name: "MetricsTestKit", package: "swift-metrics"),
],
swiftSettings: swiftSettings
),
Expand Down
9 changes: 6 additions & 3 deletions Package@swift-5.9.swift
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,20 @@ let package = Package(
],
products: [
.library(name: "Queues", targets: ["Queues"]),
.library(name: "XCTQueues", targets: ["XCTQueues"])
.library(name: "XCTQueues", targets: ["XCTQueues"]),
],
dependencies: [
.package(url: "https://github.com/vapor/vapor.git", from: "4.101.1"),
.package(url: "https://github.com/apple/swift-nio.git", from: "2.65.0"),
.package(url: "https://github.com/vapor/vapor.git", from: "4.104.0"),
.package(url: "https://github.com/apple/swift-nio.git", from: "2.70.0"),
.package(url: "https://github.com/apple/swift-metrics.git", from: "2.5.0"),
],
targets: [
.target(
name: "Queues",
dependencies: [
.product(name: "Vapor", package: "vapor"),
.product(name: "NIOCore", package: "swift-nio"),
.product(name: "Metrics", package: "swift-metrics"),
],
swiftSettings: swiftSettings
),
Expand All @@ -39,6 +41,7 @@ let package = Package(
.target(name: "Queues"),
.target(name: "XCTQueues"),
.product(name: "XCTVapor", package: "vapor"),
.product(name: "MetricsTestKit", package: "swift-metrics"),
],
swiftSettings: swiftSettings
),
Expand Down
31 changes: 18 additions & 13 deletions Sources/Queues/AsyncQueue.swift
Original file line number Diff line number Diff line change
@@ -1,28 +1,29 @@
import Foundation
import Vapor
import Metrics
import NIOCore
import Vapor

public protocol AsyncQueue: Queue {
/// The job context
var context: QueueContext { get }

/// Gets the next job to be run
/// - Parameter id: The ID of the job
func get(_ id: JobIdentifier) async throws -> JobData

/// Sets a job that should be run in the future
/// - Parameters:
/// - id: The ID of the job
/// - data: Data for the job
func set(_ id: JobIdentifier, to data: JobData) async throws

/// Removes a job from the queue
/// - Parameter id: The ID of the job
func clear(_ id: JobIdentifier) async throws

/// Pops the next job in the queue
func pop() async throws -> JobIdentifier?

/// Pushes the next job into a queue
/// - Parameter id: The ID of the job
func push(_ id: JobIdentifier) async throws
Expand All @@ -32,19 +33,19 @@ extension AsyncQueue {
public func get(_ id: JobIdentifier) -> EventLoopFuture<JobData> {
self.context.eventLoop.makeFutureWithTask { try await self.get(id) }
}

public func set(_ id: JobIdentifier, to data: JobData) -> EventLoopFuture<Void> {
self.context.eventLoop.makeFutureWithTask { try await self.set(id, to: data) }
}

public func clear(_ id: JobIdentifier) -> EventLoopFuture<Void> {
self.context.eventLoop.makeFutureWithTask { try await self.clear(id) }
}

public func pop() -> EventLoopFuture<JobIdentifier?> {
self.context.eventLoop.makeFutureWithTask { try await self.pop() }
}

public func push(_ id: JobIdentifier) -> EventLoopFuture<Void> {
self.context.eventLoop.makeFutureWithTask { try await self.push(id) }
}
Expand All @@ -68,9 +69,9 @@ extension Queue {
logger[metadataKey: "queue"] = "\(self.queueName.string)"
logger[metadataKey: "job-id"] = "\(id.string)"
logger[metadataKey: "job-name"] = "\(J.name)"
let storage = JobData(
payload: try J.serializePayload(payload),

let storage = try JobData(
payload: J.serializePayload(payload),
maxRetryCount: maxRetryCount,
jobName: J.name,
delayUntil: delayUntil,
Expand All @@ -82,7 +83,11 @@ extension Queue {
logger.trace("Pusing job to queue")
try await self.push(id).get()
logger.info("Dispatched job")

Counter(label: "dispatched.jobs.counter", dimensions: [
("queueName", self.queueName.string),
("jobName", J.name),
]).increment()

await self.sendNotification(of: "dispatch", logger: logger) {
try await $0.dispatched(job: .init(id: id.string, queueName: self.queueName.string, jobData: storage), eventLoop: self.eventLoop).get()
}
Expand Down
30 changes: 18 additions & 12 deletions Sources/Queues/Queue.swift
Original file line number Diff line number Diff line change
@@ -1,29 +1,31 @@
import NIOCore
import Logging

import Foundation
import Logging
import Metrics
import NIOCore

/// A type that can store and retrieve jobs from a persistence layer
public protocol Queue: Sendable {
/// The job context
var context: QueueContext { get }

/// Gets the next job to be run
/// - Parameter id: The ID of the job
func get(_ id: JobIdentifier) -> EventLoopFuture<JobData>

/// Sets a job that should be run in the future
/// - Parameters:
/// - id: The ID of the job
/// - data: Data for the job
func set(_ id: JobIdentifier, to data: JobData) -> EventLoopFuture<Void>

/// Removes a job from the queue
/// - Parameter id: The ID of the job
func clear(_ id: JobIdentifier) -> EventLoopFuture<Void>

/// Pops the next job in the queue
func pop() -> EventLoopFuture<JobIdentifier?>

/// Pushes the next job into a queue
/// - Parameter id: The ID of the job
func push(_ id: JobIdentifier) -> EventLoopFuture<Void>
Expand All @@ -34,27 +36,27 @@ extension Queue {
public var eventLoop: any EventLoop {
self.context.eventLoop
}

/// A logger
public var logger: Logger {
self.context.logger
}

/// The configuration for the queue
public var configuration: QueuesConfiguration {
self.context.configuration
}

/// The queue's name
public var queueName: QueueName {
self.context.queueName
}

/// The key name of the queue
public var key: String {
self.queueName.makeKey(with: self.configuration.persistenceKey)
}

/// Dispatch a job into the queue for processing
/// - Parameters:
/// - job: The Job type
Expand Down Expand Up @@ -94,7 +96,11 @@ extension Queue {
logger.trace("Pusing job to queue")
return self.push(id)
}.flatMapWithEventLoop { _, eventLoop in
logger.info("Dispatched job")
Counter(label: "dispatched.jobs.counter", dimensions: [
("queueName", self.queueName.string),
("jobName", J.name),
]).increment()
self.logger.info("Dispatched queue job")
return self.sendNotification(of: "dispatch", logger: logger) {
$0.dispatched(job: .init(id: id.string, queueName: self.queueName.string, jobData: storage), eventLoop: eventLoop)
}
Expand Down
51 changes: 43 additions & 8 deletions Sources/Queues/QueueWorker.swift
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import NIOCore
import Logging
import Dispatch
import Foundation
import Logging
import Metrics
import NIOCore

extension Queue {
public var worker: QueueWorker {
Expand All @@ -16,7 +18,7 @@ public struct QueueWorker: Sendable {
/// This is a thin wrapper for ELF-style callers.
public func run() -> EventLoopFuture<Void> {
self.queue.eventLoop.makeFutureWithTask {
try await run()
try await self.run()
}
}

Expand All @@ -25,14 +27,14 @@ public struct QueueWorker: Sendable {
public func run() async throws {
while try await self.runOneJob() {}
}

/// Pop a job off the queue and try to run it. If no jobs are available, do
/// nothing. Returns whether a job was run.
private func runOneJob() async throws -> Bool {
var logger = self.queue.logger
logger[metadataKey: "queue"] = "\(self.queue.queueName.string)"
logger.trace("Popping job from queue")

guard let id = try await self.queue.pop().get() else {
// No job found, go around again.
logger.trace("No pending jobs")
Expand All @@ -41,7 +43,7 @@ public struct QueueWorker: Sendable {

logger[metadataKey: "job-id"] = "\(id.string)"
logger.trace("Found pending job")

let data = try await self.queue.get(id).get()
logger.trace("Received job data", metadata: ["job-data": "\(data)"])
logger[metadataKey: "job-name"] = "\(data.jobName)"
Expand All @@ -68,11 +70,13 @@ public struct QueueWorker: Sendable {
}

private func runOneJob(id: JobIdentifier, job: any AnyJob, jobData: JobData, logger: Logger) async throws {
let startTime = DispatchTime.now().uptimeNanoseconds
logger.info("Dequeing and running job", metadata: ["attempt": "\(jobData.currentAttempt)", "retries-left": "\(jobData.remainingAttempts)"])
do {
try await job._dequeue(self.queue.context, id: id.string, payload: jobData.payload).get()

logger.trace("Job ran successfully", metadata: ["attempts-made": "\(jobData.currentAttempt)"])
self.updateMetrics(for: id, startTime: startTime, queue: self.queue)
await self.queue.sendNotification(of: "success", logger: logger) {
try await $0.success(jobId: id.string, eventLoop: self.queue.context.eventLoop).get()
}
Expand All @@ -82,6 +86,7 @@ public struct QueueWorker: Sendable {
return try await self.retry(id: id, job: job, jobData: jobData, error: error, logger: logger)
} else {
logger.warning("Job failed, no retries remaining", metadata: ["error": "\(error)", "attempts-made": "\(jobData.currentAttempt)"])
self.updateMetrics(for: id, startTime: startTime, queue: self.queue, error: error)

try await job._error(self.queue.context, id: id.string, error, payload: jobData.payload).get()
await self.queue.sendNotification(of: "failure", logger: logger) {
Expand All @@ -102,12 +107,42 @@ public struct QueueWorker: Sendable {
queuedAt: .init(),
attempts: jobData.currentAttempt
)

logger.warning("Job failed, retrying", metadata: [
"retry-delay": "\(delay)", "error": "\(error)", "next-attempt": "\(updatedData.currentAttempt)", "retries-left": "\(updatedData.remainingAttempts)"
"retry-delay": "\(delay)", "error": "\(error)", "next-attempt": "\(updatedData.currentAttempt)", "retries-left": "\(updatedData.remainingAttempts)",
])
try await self.queue.clear(id).get()
try await self.queue.set(id, to: updatedData).get()
try await self.queue.push(id).get()
}

private func updateMetrics(
for id: JobIdentifier,
startTime: UInt64,
queue: any Queue,
error: (any Error)? = nil
) {
// Checks how long the job took to complete
Timer(
label: "\(id.string).jobDurationTimer",
dimensions: [
("success", error == nil ? "true" : "false"),
("id", id.string),
],
preferredDisplayUnit: .seconds
).recordNanoseconds(DispatchTime.now().uptimeNanoseconds - startTime)

// Adds the completed job to a different counter depending on its result
if error != nil {
Counter(
label: "error.completed.jobs.counter",
dimensions: [("queueName", queue.queueName.string)]
).increment()
} else {
Counter(
label: "success.completed.jobs.counter",
dimensions: [("queueName", queue.queueName.string)]
).increment()
}
}
}
Loading

0 comments on commit be4ac72

Please sign in to comment.