diff --git a/README.md b/README.md index bbbe1ed..4dc58d2 100644 --- a/README.md +++ b/README.md @@ -133,6 +133,9 @@ In this scenario, on each iteration of the worker's loop, it will look for jobs QUEUES="priority_queue,secondary_queue" bundle exec rake qc:work ``` +`rake qc:work` spawns a long running worker process waiting for jobs. You can +use `rake qc:work_off` to process all the jobs currently in the queue and exit. + #### Custom Worker This example is probably not production ready; however, it serves as an example of how to leverage the code in the Worker class to fit your non-default requirements. diff --git a/lib/queue_classic/tasks.rb b/lib/queue_classic/tasks.rb index d3aa5a2..54e10ee 100644 --- a/lib/queue_classic/tasks.rb +++ b/lib/queue_classic/tasks.rb @@ -10,20 +10,12 @@ namespace :qc do desc "Start a new worker for the (default or $QUEUE / $QUEUES) queue" task :work => :environment do - @worker = QC.default_worker_class.new - - trap('INT') do - $stderr.puts("Received INT. Shutting down.") - abort("Worker has stopped running. Exit.") unless @worker.running - @worker.stop - end - - trap('TERM') do - $stderr.puts("Received Term. Shutting down.") - @worker.stop - end + build_worker.start + end - @worker.start + desc "Work off jobs in the (default or $QUEUE / $QUEUES) queue" + task :work_off => :environment do + build_worker.work_off end desc "Returns the number of jobs in the (default or $QUEUE / $QUEUES) queue" @@ -45,4 +37,21 @@ task :update => :environment do QC::Setup.update end + + def build_worker + @worker = QC.default_worker_class.new + + trap('INT') do + $stderr.puts("Received INT. Shutting down.") + abort("Worker has stopped running. Exit.") unless @worker.running + @worker.stop + end + + trap('TERM') do + $stderr.puts("Received Term. Shutting down.") + @worker.stop + end + + @worker + end end diff --git a/lib/queue_classic/worker.rb b/lib/queue_classic/worker.rb index 9a4b639..7c2bb86 100644 --- a/lib/queue_classic/worker.rb +++ b/lib/queue_classic/worker.rb @@ -60,6 +60,18 @@ def stop @running = false end + # Processes jobs until no jobs are left. + def work_off + while result = lock_job_no_wait + queue, job = result + if queue && job + QC.log_yield(:at => "work_off", :job => job[:id]) do + process(queue, job) + end + end + end + end + # Calls Worker#work but after the current process is forked. # The parent process will wait on the child process to exit. def fork_and_work @@ -90,15 +102,21 @@ def lock_job log(:at => "lock_job") job = nil while @running - @queues.each do |queue| - if job = queue.lock - return [queue, job] - end - end + result = lock_job_no_wait + return *result if result @conn_adapter.wait(@wait_interval, *@queues.map {|q| q.name}) end end + def lock_job_no_wait + @queues.each do |queue| + if job = queue.lock + return [queue, job] + end + end + nil + end + # A job is processed by evaluating the target code. # if the job is evaluated with no exceptions # then it is deleted from the queue. diff --git a/test/helper.rb b/test/helper.rb index 63218f6..da945f8 100644 --- a/test/helper.rb +++ b/test/helper.rb @@ -15,6 +15,7 @@ require_relative '../lib/queue_classic' require "stringio" +require 'timeout' require "minitest/autorun" class QCTest < Minitest::Test diff --git a/test/worker_test.rb b/test/worker_test.rb index 8bfe731..9812430 100644 --- a/test/worker_test.rb +++ b/test/worker_test.rb @@ -178,6 +178,22 @@ def test_work_with_more_complex_construct assert_equal(0, worker.failed_count) end + def test_work_off_until_no_jobs_are_left + QC.enqueue("TestObject.no_args") + QC.enqueue("TestObject.no_args") + QC.enqueue("TestObject.no_args") + + assert_equal 3, QC.count + worker = TestWorker.new + Timeout::timeout(2) { worker.work_off } + assert_equal 0, QC.count + end + + def test_work_off_does_not_wait_for_job + worker = TestWorker.new + Timeout::timeout(1) { worker.work_off } + end + def test_init_worker_with_database_url with_database ENV['DATABASE_URL'] || ENV['QC_DATABASE_URL'] do worker = QC::Worker.new