Skip to content

Commit

Permalink
Run until no more work is pending (#134)
Browse files Browse the repository at this point in the history
* Run until no more work is pending

* Add tests and rework API
  • Loading branch information
danpalmer authored Aug 22, 2024
1 parent 15829c5 commit d867b5b
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 13 deletions.
30 changes: 21 additions & 9 deletions Sources/Queues/QueueWorker.swift
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,31 @@ extension Queue {
public struct QueueWorker: Sendable {
let queue: any Queue

/// Actually run the queue. This is a thin wrapper for ELF-style callers.
/// Run the queue until there is no more work to be done.
/// This is a thin wrapper for ELF-style callers.
public func run() -> EventLoopFuture<Void> {
self.queue.eventLoop.makeFutureWithTask {
try await self.run()
try await run()
}
}

/// Pop a job off the queue and try to run it. If no jobs are available, do nothing.

/// Run the queue until there is no more work to be done.
/// This is the main async entrypoint for a queue worker.
public func run() async throws {
while try await self.runOneJob() {}
}

/// Pop a job off the queue and try to run it. If no jobs are available, do
/// nothing. Returns whether a job was run.
private func runOneJob() async throws -> Bool {
var logger = self.queue.logger
logger[metadataKey: "queue"] = "\(self.queue.queueName.string)"
logger.trace("Popping job from queue")

guard let id = try await self.queue.pop().get() else {
// No job found, go around again.
return logger.trace("No pending jobs")
logger.trace("No pending jobs")
return false
}

logger[metadataKey: "job-id"] = "\(id.string)"
Expand All @@ -39,23 +48,26 @@ public struct QueueWorker: Sendable {

guard let job = self.queue.configuration.jobs[data.jobName] else {
logger.warning("No job with the desired name is registered, discarding")
return try await self.queue.clear(id).get()
try await self.queue.clear(id).get()
return false
}

// If the job has a delay that isn't up yet, requeue it.
guard (data.delayUntil ?? .distantPast) < Date() else {
logger.trace("Job is delayed, requeueing for later execution", metadata: ["delayed-until": "\(data.delayUntil ?? .distantPast)"])
return try await self.queue.push(id).get()
try await self.queue.push(id).get()
return false
}

await self.queue.sendNotification(of: "dequeue", logger: logger) {
try await $0.didDequeue(jobId: id.string, eventLoop: self.queue.eventLoop).get()
}

try await self.run(id: id, job: job, jobData: data, logger: logger)
try await self.runOneJob(id: id, job: job, jobData: data, logger: logger)
return true
}

private func run(id: JobIdentifier, job: any AnyJob, jobData: JobData, logger: Logger) async throws {
private func runOneJob(id: JobIdentifier, job: any AnyJob, jobData: JobData, logger: Logger) async throws {
logger.info("Dequeing and running job", metadata: ["attempt": "\(jobData.currentAttempt)", "retries-left": "\(jobData.remainingAttempts)"])
do {
try await job._dequeue(self.queue.context, id: id.string, payload: jobData.payload).get()
Expand Down
12 changes: 8 additions & 4 deletions Tests/QueuesTests/AsyncQueueTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ final class AsyncQueueTests: XCTestCase {

app.get("foo") { req in
try await req.queue.dispatch(MyAsyncJob.self, .init(foo: "bar"))
try await req.queue.dispatch(MyAsyncJob.self, .init(foo: "baz"))
try await req.queue.dispatch(MyAsyncJob.self, .init(foo: "quux"))
return "done"
}

Expand All @@ -45,8 +47,8 @@ final class AsyncQueueTests: XCTestCase {
XCTAssertEqual(res.body.string, "done")
}

XCTAssertEqual(app.queues.test.queue.count, 1)
XCTAssertEqual(app.queues.test.jobs.count, 1)
XCTAssertEqual(app.queues.test.queue.count, 3)
XCTAssertEqual(app.queues.test.jobs.count, 3)
let job = app.queues.test.first(MyAsyncJob.self)
XCTAssert(app.queues.test.contains(MyAsyncJob.self))
XCTAssertNotNil(job)
Expand All @@ -67,6 +69,8 @@ final class AsyncQueueTests: XCTestCase {

app.get("foo") { req in
try await req.queue.dispatch(MyAsyncJob.self, .init(foo: "bar"))
try await req.queue.dispatch(MyAsyncJob.self, .init(foo: "baz"))
try await req.queue.dispatch(MyAsyncJob.self, .init(foo: "quux"))
return "done"
}

Expand All @@ -75,8 +79,8 @@ final class AsyncQueueTests: XCTestCase {
XCTAssertEqual(res.body.string, "done")
}

XCTAssertEqual(app.queues.asyncTest.queue.count, 1)
XCTAssertEqual(app.queues.asyncTest.jobs.count, 1)
XCTAssertEqual(app.queues.asyncTest.queue.count, 3)
XCTAssertEqual(app.queues.asyncTest.jobs.count, 3)
let job = app.queues.asyncTest.first(MyAsyncJob.self)
XCTAssert(app.queues.asyncTest.contains(MyAsyncJob.self))
XCTAssertNotNil(job)
Expand Down
28 changes: 28 additions & 0 deletions Tests/QueuesTests/QueueTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,34 @@ final class QueueTests: XCTestCase {
await XCTAssertEqualAsync(try await promise.futureResult.get(), "bar")
}

func testRunUntilEmpty() async throws {
let promise1 = self.app.eventLoopGroup.any().makePromise(of: String.self)
self.app.queues.add(Foo1(promise: promise1))
let promise2 = self.app.eventLoopGroup.any().makePromise(of: String.self)
self.app.queues.add(Foo2(promise: promise2))

self.app.get("foo") { req in
try await req.queue.dispatch(Foo1.self, .init(foo: "bar"))
try await req.queue.dispatch(Foo1.self, .init(foo: "quux"))
try await req.queue.dispatch(Foo2.self, .init(foo: "baz"))
return "done"
}

try await self.app.testable().test(.GET, "foo") { res async in
XCTAssertEqual(res.status, .ok)
XCTAssertEqual(res.body.string, "done")
}

XCTAssertEqual(self.app.queues.test.queue.count, 3)
XCTAssertEqual(self.app.queues.test.jobs.count, 3)
try await self.app.queues.queue.worker.run()
XCTAssertEqual(self.app.queues.test.queue.count, 0)
XCTAssertEqual(self.app.queues.test.jobs.count, 0)

await XCTAssertEqualAsync(try await promise1.futureResult.get(), "quux")
await XCTAssertEqualAsync(try await promise2.futureResult.get(), "baz")
}

func testSettingCustomId() async throws {
let promise = self.app.eventLoopGroup.any().makePromise(of: String.self)
self.app.queues.add(Foo1(promise: promise))
Expand Down

0 comments on commit d867b5b

Please sign in to comment.