Skip to content

Commit

Permalink
Merge branch 'feat/DEX-2722/reduce-retention-time-successful-outbox-i…
Browse files Browse the repository at this point in the history
…tems' into 'master'

[DEX-2722] feat: reduce retention time successful outbox/inbox items

Closes DEX-2722

See merge request nstmrt/rubygems/outbox!109
  • Loading branch information
Arlantir committed Jan 15, 2025
2 parents e5c1d01 + dc4b4d1 commit 15bb401
Show file tree
Hide file tree
Showing 7 changed files with 74 additions and 30 deletions.
11 changes: 11 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,17 @@ and this project adheres to [Semantic Versioning](http://semver.org/).

### Fixed

## [6.12.0] - 2025-01-10

### Added

- A random delay of 0–14 seconds is now added when enqueuing each item removal job, so that the jobs do not all start simultaneously
- Add option `retention_delivered_items` to set the retention period for successfully delivered items. Defaults to the value of `retention`

### Changed

- `retention` now applies only to items with statuses `failed` and `discarded`

## [6.11.0] - 2024-12-23

### Added
Expand Down
6 changes: 4 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,8 @@ default: &default
outbox_items: # outbox items section
my_outbox_item: # underscored model class name
owner: my_outbox_item_team # optional, used in Yabeda metrics
retention: P1W # retention period, https://en.wikipedia.org/wiki/ISO_8601#Durations
retention: P1W # retention period for items with statuses failed and discarded, ISO 8601 duration: https://en.wikipedia.org/wiki/ISO_8601#Durations
retention_delivered_items: PT6H # optional, retention period for items with status delivered, defaults to `retention`, ISO 8601 duration: https://en.wikipedia.org/wiki/ISO_8601#Durations
max_retries: 3 # default 0, the number of retries before the item will be marked as failed
strict_order: false # optional, default
transports: # transports section
Expand Down Expand Up @@ -342,7 +343,8 @@ end
inbox_items: # inbox items section
my_inbox_item: # underscored model class name
owner: my_inbox_item_team # optional, used in Yabeda metrics
retention: P1W # retention period, https://en.wikipedia.org/wiki/ISO_8601#Durations
retention: P1W # retention period for items with statuses failed and discarded, ISO 8601 duration: https://en.wikipedia.org/wiki/ISO_8601#Durations
retention_delivered_items: PT6H # optional, retention period for items with status delivered, defaults to `retention`, ISO 8601 duration: https://en.wikipedia.org/wiki/ISO_8601#Durations
max_retries: 3 # default 0, the number of retries before the item will be marked as failed
transports: # transports section
import_order: # underscored transport class name
Expand Down
63 changes: 43 additions & 20 deletions app/jobs/sbmt/outbox/base_delete_stale_items_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ class BaseDeleteStaleItemsJob < Outbox.active_job_base_class
class << self
def enqueue
item_classes.each do |item_class|
perform_later(item_class.to_s)
delay = rand(15).seconds
set(wait: delay).perform_later(item_class.to_s)
end
end

Expand Down Expand Up @@ -41,12 +42,13 @@ def perform(item_class_name)

lock_manager.lock("#{self.class.name}:#{item_class_name}:lock", LOCK_TTL) do |locked|
if locked
duration = item_class.config.retention
duration_failed = item_class.config.retention
duration_delivered = item_class.config.retention_delivered_items

validate_retention!(duration)
validate_retention!(duration_failed)

logger.with_tags(box_type: box_type, box_name: box_name) do
delete_stale_items(Time.current - duration)
delete_stale_items(Time.current - duration_failed, Time.current - duration_delivered)
end
else
logger.log_info("Failed to acquire lock #{self.class.name}:#{item_class_name}")
Expand All @@ -58,25 +60,25 @@ def perform(item_class_name)

private

def validate_retention!(duration)
return if duration >= MIN_RETENTION_PERIOD
def validate_retention!(duration_failed)
return if duration_failed >= MIN_RETENTION_PERIOD

raise "Retention period for #{box_name} must be longer than #{MIN_RETENTION_PERIOD.inspect}"
end

def delete_stale_items(waterline)
logger.log_info("Start deleting #{box_type} items for #{box_name} older than #{waterline}")
def delete_stale_items(waterline_failed, waterline_delivered)
logger.log_info("Start deleting #{box_type} items for #{box_name} older than: failed and discarded items #{waterline_failed} and delivered items #{waterline_delivered}")

case database_type
when :postgresql
postgres_delete_in_batches(waterline)
postgres_delete_in_batches(waterline_failed, waterline_delivered)
when :mysql
mysql_delete_in_batches(waterline)
mysql_delete_in_batches(waterline_failed, waterline_delivered)
else
raise "Unsupported database type"
end

logger.log_info("Successfully deleted #{box_type} items for #{box_name} older than #{waterline}")
logger.log_info("Successfully deleted #{box_type} items for #{box_name} older than: failed and discarded items #{waterline_failed} and delivered items #{waterline_delivered}")
end

# Deletes stale items from PostgreSQL database in batches
Expand All @@ -90,12 +92,22 @@ def delete_stale_items(waterline)
# WHERE "items"."id" IN (
# SELECT "items"."id"
# FROM "items"
# WHERE "items"."created_at" < '2023-05-01 00:00:00'
# WHERE (
# "items"."status" = 1 AND "items"."created_at" < '2023-05-01 00:00:00'
# )
# LIMIT 1000
# )
def postgres_delete_in_batches(waterline)
def postgres_delete_in_batches(waterline_failed, waterline_delivered)
table = item_class.arel_table
condition = table[:created_at].lt(waterline)

status_delivered = item_class.statuses[:delivered]
status_failed_discarded = item_class.statuses.values_at(:failed, :discarded)

delete_items_in_batches(table, table[:status].eq(status_delivered).and(table[:created_at].lt(waterline_delivered)))
delete_items_in_batches(table, table[:status].in(status_failed_discarded).and(table[:created_at].lt(waterline_failed)))
end

def delete_items_in_batches(table, condition)
subquery = table
.project(table[:id])
.where(condition)
Expand Down Expand Up @@ -129,14 +141,25 @@ def postgres_delete_in_batches(waterline)
#
# Example SQL generated for deletion:
# DELETE FROM `items`
# WHERE `items`.`created_at` < '2023-05-01 00:00:00'
# WHERE (
# `items`.`status` = 1 AND `items`.`created_at` < '2023-05-01 00:00:00'
# )
# LIMIT 1000
def mysql_delete_in_batches(waterline)
def mysql_delete_in_batches(waterline_failed, waterline_delivered)
  # Runs two independent deletion passes, each with its own cut-off time:
  #   1. delivered items created before +waterline_delivered+
  #   2. failed and discarded items created before +waterline_failed+
  #
  # @param waterline_failed [Time] cut-off for failed/discarded items
  # @param waterline_delivered [Time] cut-off for delivered items
  status_delivered = item_class.statuses[:delivered]
  # Keep this a flat list of status values (not a nested array), the same
  # shape the PostgreSQL variant passes to its IN condition.
  status_failed_discarded = item_class.statuses.values_at(:failed, :discarded)

  delete_items_in_batches_mysql(
    item_class.where(status: status_delivered, created_at: ...waterline_delivered)
  )
  delete_items_in_batches_mysql(
    item_class.where(status: status_failed_discarded, created_at: ...waterline_failed)
  )
end

def delete_items_in_batches_mysql(query)
loop do
deleted_count = item_class
.where(created_at: ...waterline)
.limit(BATCH_SIZE)
.delete_all
deleted_count = query.limit(BATCH_SIZE).delete_all

logger.log_info("Deleted #{deleted_count} #{box_type} items for #{box_name} items")
break if deleted_count == 0
Expand Down
7 changes: 7 additions & 0 deletions app/models/sbmt/outbox/base_item_config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,13 @@ def retention
@retention ||= ActiveSupport::Duration.parse(options[:retention] || "P1W")
end

# Retention period applied to delivered items (memoized).
#
# Reads +options[:retention_delivered_items]+; when that option is not set,
# falls back to the value of +retention+. A String value is parsed with
# ActiveSupport::Duration.parse; any other value is returned as is.
def retention_delivered_items
  return @retention_delivered_items if @retention_delivered_items

  configured = options[:retention_delivered_items] || retention
  @retention_delivered_items =
    case configured
    when String then ActiveSupport::Duration.parse(configured)
    else configured
    end
end

def max_retries
@max_retries ||= (options[:max_retries] || 0).to_i
end
Expand Down
2 changes: 1 addition & 1 deletion lib/sbmt/outbox/version.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@

module Sbmt
module Outbox
VERSION = "6.11.0"
VERSION = "6.12.0"
end
end
1 change: 1 addition & 0 deletions spec/internal/config/outbox.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ test:
partition_size: 2
partition_strategy: number
retention: P1W
retention_delivered_items: PT6H
retry_strategies:
- exponential_backoff
- compacted_log
Expand Down
14 changes: 7 additions & 7 deletions spec/jobs/sbmt/outbox/base_delete_stale_items_job_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ def item_classes
end
end

let!(:item) { create(:outbox_item, created_at: created_at) }
let!(:item_2) { create(:outbox_item, created_at: created_at) }
let!(:item_delivered) { create(:outbox_item, created_at: created_at, status: 2) }
let!(:item_failed) { create(:outbox_item, created_at: created_at, status: 1) }
let(:created_at) { 1.month.ago }

before do
Expand All @@ -25,17 +25,17 @@ def item_classes
end
end

it "deletes item" do
it "deletes items with status 2 and old items with status 1" do
expect { job_class.perform_now("OutboxItem") }
.to change(OutboxItem, :count).by(-2)
end

context "when item is too young" do
let(:created_at) { 1.hour.ago }
context "when an element with status 1 does not retention" do
let(:created_at) { 6.hours.ago }

it "doesn't delete item" do
it "doesn't delete item with status 1 but deletes item with status 2" do
expect { job_class.perform_now("OutboxItem") }
.not_to change(OutboxItem, :count)
.to change(OutboxItem, :count).by(-1)
end
end
end

0 comments on commit 15bb401

Please sign in to comment.