Skip to content

Commit

Permalink
Merge branch 'feat/DEX-2703/add-task-for-update-deleted-items' into '…
Browse files Browse the repository at this point in the history
…master'

[DEX-2703] feat: add task for update deleted items

Closes DEX-2703

See merge request nstmrt/rubygems/outbox!113
  • Loading branch information
Arlantir committed Jan 23, 2025
2 parents 4868db8 + 5baaac2 commit 2c7fdf3
Show file tree
Hide file tree
Showing 8 changed files with 265 additions and 1 deletion.
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,15 @@ and this project adheres to [Semantic Versioning](http://semver.org/).

### Fixed

## [6.15.0] - 2025-01-23

### Added

- Add rake tasks:
- rake outbox:delete_items
- rake outbox:update_status_items


## [6.14.0] - 2025-01-20

### Added
Expand Down
36 changes: 36 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,42 @@ outbox_items:
partition_strategy: hash
```

## Rake tasks

```shell
rake outbox:delete_items
rake outbox:update_status_items
```

Example run:
```shell
rake outbox:delete_items[OutboxItem,1] # Mandatory parameters box class and status
rake outbox:update_status_items[OutboxItem,0,3] # Mandatory parameters box class, current status and new status
```

Both tasks have optional parameters:
```ruby
- start_time # boxes are younger than the specified time, by default nil, time is specified in the format "2025-01-05T23:59:59"
- end_time # boxes are older than the specified time, by default 6.hours.ago, time is specified in the format "2025-01-05T23:59:59"
- batch_size # batch size, by default 1_000
- sleep_time # sleep time between batches, by default 0.5
```

Example with optional parameters:
- format optional parameters:
```shell
rake outbox:delete_items[klass_name,status,start_time,end_time,batch_size,sleep_time]
rake outbox:update_status_items[klass_name,status,new_status,start_time,end_time,batch_size,sleep_time]
```
- example:
```shell
rake outbox:delete_items[OutboxItem,1,"2025-01-05T23:59:59","2025-01-05T00:00:00",10_000,5]
rake outbox:update_status_items[OutboxItem,0,3,"2025-01-05T23:59:59","2025-01-05T00:00:00",10_000,5]
```

## Concurrency

The worker process consists of a poller and a processor, each of which has its own thread pool.
Expand Down
2 changes: 2 additions & 0 deletions lib/sbmt/outbox/engine.rb
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ class Engine < Rails::Engine
rake_tasks do
load "sbmt/outbox/tasks/retry_failed_items.rake"
load "sbmt/outbox/tasks/delete_failed_items.rake"
load "sbmt/outbox/tasks/delete_items.rake"
load "sbmt/outbox/tasks/update_status_items.rake"
end
end
end
Expand Down
43 changes: 43 additions & 0 deletions lib/sbmt/outbox/tasks/delete_items.rake
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# frozen_string_literal: true

namespace :outbox do
desc "Delete outbox/inbox items"
task :delete_items, [:klass_name, :status, :start_time, :end_time, :batch_size, :sleep_time] => :environment do |_, args|
args.with_defaults(start_time: nil, end_time: 6.hours.ago, batch_size: 1000, sleep_time: 0.5)

klass_name = args[:klass_name]
status = args[:status]
start_time = args[:start_time]
end_time = args[:end_time]
batch_size = args[:batch_size]
sleep_time = args[:sleep_time]

unless klass_name && status
raise "Error: Class and status must be specified. Example: rake outbox:delete_items[OutboxItem,1]"
end

klass_name = klass_name.constantize
query = klass_name.where(status: status)

if start_time && end_time
query = query.where(created_at: start_time..end_time)
elsif start_time
query = query.where(created_at: start_time..)
elsif end_time
query = query.where(created_at: ..end_time)
end

total_deleted = 0
query.in_batches(of: batch_size) do |batch|
deleted_count = batch.delete_all

Rails.logger.info("Batch items deleted: #{deleted_count}")

total_deleted += deleted_count

sleep sleep_time
end

Rails.logger.info("Total items deleted: #{total_deleted}")
end
end
43 changes: 43 additions & 0 deletions lib/sbmt/outbox/tasks/update_status_items.rake
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# frozen_string_literal: true

namespace :outbox do
desc "Update status of outbox/inbox items"
task :update_status_items, [:klass_name, :status, :new_status, :start_time, :end_time, :batch_size, :sleep_time] => :environment do |_, args|
args.with_defaults(start_time: nil, end_time: 6.hours.ago, batch_size: 1000, sleep_time: 0.5)

klass_name = args[:klass_name]
status = args[:status]
new_status = args[:new_status]
start_time = args[:start_time]
end_time = args[:end_time]
batch_size = args[:batch_size]
sleep_time = args[:sleep_time]

unless klass_name && status && new_status
raise "Error: Class, current status, and new status must be specified. Example: rake outbox:update_status_items[OutboxItem,0,3]"
end

klass_name = klass_name.constantize
query = klass_name.where(status: status)

if start_time && end_time
query = query.where(created_at: start_time..end_time)
elsif start_time
query = query.where(created_at: start_time..)
elsif end_time
query = query.where(created_at: ..end_time)
end

total_updated = 0
query.in_batches(of: batch_size) do |batch|
updated_count = batch.update_all(status: new_status)

Rails.logger.info("Batch items updated: #{updated_count}")

total_updated += updated_count
sleep sleep_time
end

Rails.logger.info("Total items updated: #{total_updated}")
end
end
2 changes: 1 addition & 1 deletion lib/sbmt/outbox/version.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@

module Sbmt
module Outbox
VERSION = "6.14.0"
VERSION = "6.15.0"
end
end
60 changes: 60 additions & 0 deletions spec/lib/sbmt/outbox/tasks/delete_items_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
# frozen_string_literal: true

describe "rake outbox:delete_items" do
subject(:task) { Rake::Task["outbox:delete_items"] }

let(:klass) { "OutboxItem" }
let(:status) { 1 }

let(:created_at_a) { 6.hours.ago }
let(:created_at_b) { 8.hours.ago }
let(:created_at_c) { 4.hours.ago }

let!(:outbox_item_a) { create(:outbox_item, status: :failed, errors_count: 1, created_at: created_at_a) }
let!(:outbox_item_b) { create(:outbox_item, status: :failed, errors_count: 1, created_at: created_at_b) }
let!(:outbox_item_c) { create(:outbox_item, status: :delivered, errors_count: 0, created_at: created_at_c) }

before do
task.reenable
allow(Rails.logger).to receive(:info)
end

context "when filtering records by status" do
let(:created_at_a) { Time.zone.now }
let(:created_at_b) { 6.hours.ago }
let(:created_at_c) { Time.zone.now }

it "deletes records matching the given status" do
expect {
task.invoke(klass, status)
}.to change(OutboxItem, :count).by(-1)

expect(Rails.logger).to have_received(:info).with(/Batch items deleted: 1/)
expect(Rails.logger).to have_received(:info).with(/Total items deleted: 1/)
end
end

context "when filtering records by time range" do
let(:start_time) { 7.hours.ago }
let(:end_time) { 5.hours.ago }

it "deletes records within the specified time range" do
expect {
task.invoke(klass, status, start_time, end_time)
}.to change(OutboxItem, :count).by(-1)

expect(Rails.logger).to have_received(:info).with(/Batch items deleted: 1/)
expect(Rails.logger).to have_received(:info).with(/Total items deleted: 1/)
end
end

context "when required parameters are missing" do
it "raises an error" do
expect {
task.invoke(nil, status)
}.to raise_error("Error: Class and status must be specified. Example: rake outbox:delete_items[OutboxItem,1]")

expect(Rails.logger).not_to have_received(:info)
end
end
end
71 changes: 71 additions & 0 deletions spec/lib/sbmt/outbox/tasks/update_status_items_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
# frozen_string_literal: true

describe "rake outbox:update_status_items" do
subject(:task) { Rake::Task["outbox:update_status_items"] }

let(:klass) { "OutboxItem" }
let(:status) { 1 }
let(:new_status) { 3 }

let(:created_at_a) { 6.hours.ago }
let(:created_at_b) { 8.hours.ago }
let(:created_at_c) { 4.hours.ago }

let!(:outbox_item_a) { create(:outbox_item, status: :failed, errors_count: 1, created_at: created_at_a) }
let!(:outbox_item_b) { create(:outbox_item, status: :failed, errors_count: 1, created_at: created_at_b) }
let!(:outbox_item_c) { create(:outbox_item, status: :delivered, errors_count: 0, created_at: created_at_c) }

before do
task.reenable
allow(Rails.logger).to receive(:info)
end

context "when filtering records by status" do
let(:created_at_a) { Time.zone.now }
let(:created_at_b) { 6.hours.ago }
let(:created_at_c) { Time.zone.now }

it "updates records matching the given status" do
expect {
task.invoke(klass, status, new_status)
outbox_item_a.reload
outbox_item_b.reload
outbox_item_c.reload
}.to change(outbox_item_b, :status).from("failed").to("discarded")
.and not_change { outbox_item_a.status }
.and not_change { outbox_item_c.status }

expect(Rails.logger).to have_received(:info).with(/Batch items updated: 1/)
expect(Rails.logger).to have_received(:info).with(/Total items updated: 1/)
end
end

context "when filtering records by time range" do
let(:start_time) { 7.hours.ago }
let(:end_time) { 5.hours.ago }

it "updates records within the specified time range" do
expect {
task.invoke(klass, status, new_status, start_time, end_time)
outbox_item_a.reload
outbox_item_b.reload
outbox_item_c.reload
}.to change(outbox_item_a, :status).from("failed").to("discarded")
.and not_change { outbox_item_b.status }
.and not_change { outbox_item_c.status }

expect(Rails.logger).to have_received(:info).with(/Batch items updated: 1/)
expect(Rails.logger).to have_received(:info).with(/Total items updated: 1/)
end
end

context "when required parameters are missing" do
it "raises an error" do
expect {
task.invoke(nil, status, new_status)
}.to raise_error("Error: Class, current status, and new status must be specified. Example: rake outbox:update_status_items[OutboxItem,0,3]")

expect(Rails.logger).not_to have_received(:info)
end
end
end

0 comments on commit 2c7fdf3

Please sign in to comment.