diff --git a/app/models/concerns/good_job/row_lockable.rb b/app/models/concerns/good_job/row_lockable.rb new file mode 100644 index 00000000..a64efbd7 --- /dev/null +++ b/app/models/concerns/good_job/row_lockable.rb @@ -0,0 +1,60 @@ +# frozen_string_literal: true + +module GoodJob + # + # Adds Postgres row-locking (FOR UPDATE SKIP LOCKED) capabilities to an ActiveRecord record. + # + module RowLockable + extend ActiveSupport::Concern + + included do + scope :row_lock, (lambda do |locked_by_id:, locked_at: Time.current| + original_query = self + + jobs_table = arel_table + bind_locked_by_id = ActiveRecord::Relation::QueryAttribute.new("lock_id", locked_by_id, ActiveRecord::Type::String.new) + bind_locked_at = ActiveRecord::Relation::QueryAttribute.new("current_time", locked_at, ActiveRecord::Type::DateTime.new) + + subquery = original_query.select(:id).arel + subquery.lock(Arel.sql("FOR NO KEY UPDATE SKIP LOCKED")) + + # Get the binds from the original_query using to_sql_and_binds + _sql, original_query_binds, _preparable = connection.send(:to_sql_and_binds, original_query.arel) + + # Build the update manager + update_manager = Arel::UpdateManager.new + update_manager.table(jobs_table) + update_manager.set([ + [jobs_table[:locked_at], Arel::Nodes::BindParam.new(bind_locked_at)], + [jobs_table[:locked_by_id], Arel::Nodes::BindParam.new(bind_locked_by_id)], + ]) + update_manager.where(jobs_table[:id].in(subquery)) + update_manager.take(1) + + update_node = Arel::Nodes::UpdateStatement.new + update_node.relation = update_manager.ast.relation + update_node.values = update_manager.ast.values + update_node.wheres = update_manager.ast.wheres + + results = connection.exec_query( + Arel.sql("#{update_node.to_sql} RETURNING *"), + "Lock Next Job", + [bind_locked_at, bind_locked_by_id] + original_query_binds + ) + + results.map { |result| instantiate(result.stringify_keys) } + end) + + scope :row_locked, -> { where.not(locked_by_id: nil) } + scope :row_unlocked, -> { where(locked_by_id: nil) } + end + + def row_locked? + locked_by_id.present? + end + + def row_unlock + update!(locked_by_id: nil, locked_at: nil) + end + end +end diff --git a/app/models/good_job/job.rb b/app/models/good_job/job.rb index ad30c711..02b835f3 100644 --- a/app/models/good_job/job.rb +++ b/app/models/good_job/job.rb @@ -4,6 +4,7 @@ module GoodJob # Active Record model that represents an +ActiveJob+ job. class Job < BaseRecord include AdvisoryLockable + include RowLockable include ErrorEvents include Filterable include Reportable @@ -190,6 +191,9 @@ class Job < BaseRecord end end) + # Find jobs that don't have a matching Process record + scope :illocked, -> { left_joins(:locked_by_process).where.not(locked_by_id: nil).where(locked_by_process: { id: nil }) } + class << self # Parse a string representing a group of queues into a more readable data # structure. diff --git a/spec/app/models/concerns/good_job/row_lockable_spec.rb b/spec/app/models/concerns/good_job/row_lockable_spec.rb new file mode 100644 index 00000000..694f3979 --- /dev/null +++ b/spec/app/models/concerns/good_job/row_lockable_spec.rb @@ -0,0 +1,77 @@ +# frozen_string_literal: true + +require 'rails_helper' + +RSpec.describe GoodJob::RowLockable do + before do + stub_const "TestRecord", (Class.new(GoodJob::BaseRecord) do + include GoodJob::RowLockable + include GoodJob::AdvisoryLockable + + self.table_name = "good_jobs" + end) + end + + let(:model_class) { TestRecord } + let!(:job) { model_class.create!(active_job_id: SecureRandom.uuid, queue_name: "default") } + let!(:another_job) { model_class.create!(active_job_id: SecureRandom.uuid, queue_name: "default") } + + around do |example| + RSpec.configure do |config| + config.expect_with :rspec do |c| + original_max_formatted_output_length = c.instance_variable_get(:@max_formatted_output_length) + + c.max_formatted_output_length = 1000000 + example.run + + c.max_formatted_output_length = original_max_formatted_output_length + end + end + end + + describe '.row_lock' do + it 'returns the locked record' do + locked_by_id = SecureRandom.uuid + + locked_job = model_class.where(id: job.id).row_lock(locked_by_id: locked_by_id).first + expect(locked_job).to eq(job) + expect(locked_job.locked_by_id).to eq(locked_by_id) + expect(locked_job.locked_at).to be_present + end + + it 'returns nil if no records are locked' do + locked_job = model_class.where(id: nil).row_lock(locked_by_id: SecureRandom.uuid) + expect(locked_job).to be_empty + end + + it "respects the limit" do + locked_job = model_class.limit(2).row_lock(locked_by_id: SecureRandom.uuid) + expect(locked_job.to_a).to contain_exactly(job, another_job) + end + end + + it "generates the appropriate SQL" do + connection = model_class.connection + allow(connection).to receive(:exec_query).and_call_original + allow(model_class).to receive(:connection).and_return(connection) + + locked_by_id = SecureRandom.uuid + + model_class.where(id: job.id).order(id: :asc).row_lock(locked_by_id: locked_by_id) + + expect(connection).to have_received(:exec_query) do |sql, _name, _binds| + expect(normalize_sql(sql)).to eq normalize_sql(<<~SQL.squish) + UPDATE "good_jobs" + SET "locked_at" = $1, "locked_by_id" = $2 + WHERE "good_jobs"."id" IN ( + SELECT "good_jobs"."id" + FROM "good_jobs" + WHERE "good_jobs"."id" = $3 + ORDER BY "good_jobs"."id" ASC + FOR NO KEY UPDATE SKIP LOCKED + ) + RETURNING * + SQL + end + end +end