Skip to content

Commit

Permalink
Merge branch 'feat/DEX-2851/update-condition-for-delete-items' into '…
Browse files Browse the repository at this point in the history
…master'

[DEX-2851] feat: change condition for job deleted items

Closes DEX-2851

See merge request nstmrt/rubygems/outbox!115
  • Loading branch information
Меркушин Михаил Сергеевич committed Feb 4, 2025
2 parents 346ef9d + 9020fb3 commit 0449cd2
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 47 deletions.
11 changes: 11 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,17 @@ and this project adheres to [Semantic Versioning](http://semver.org/).

### Fixed

## [6.17.0] - 2025-01-30

### Added

- Added options for configuring jobs to remove old items:
- `deletion_time_window` - 4 hours

### Changed

- Change condition for job deleted items

## [6.16.0] - 2025-01-28

### Added
Expand Down
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,7 @@ default: &default
delivered_min_retention_period: PT1H #optional, default: PT1H, for statuses: delivered, retention period for delivered items, https://en.wikipedia.org/wiki/ISO_8601#Durations
deletion_batch_size: 1_000 #optional, default: 1_000
deletion_sleep_time: 0.5 #optional, default: 0.5
deletion_time_window: PT4H #optional, default: PT4H, for statuses: delivered, retention period for delivered items, https://en.wikipedia.org/wiki/ISO_8601#Durations
max_retries: 3 # default 0, the number of retries before the item will be marked as failed
strict_order: false # optional, default
transports: # transports section
Expand Down Expand Up @@ -353,6 +354,7 @@ inbox_items: # inbox items section
delivered_min_retention_period: PT1H #optional, default: PT1H, for statuses: delivered, retention period for delivered items, https://en.wikipedia.org/wiki/ISO_8601#Durations
deletion_batch_size: 1_000 #optional, default: 1_000
deletion_sleep_time: 0.5 #optional, default: 0.5
deletion_time_window: PT4H #optional, default: PT4H, for statuses: delivered, retention period for delivered items, https://en.wikipedia.org/wiki/ISO_8601#Durations
max_retries: 3 # default 0, the number of retries before the item will be marked as failed
transports: # transports section
import_order: # underscored transport class name
Expand Down
103 changes: 57 additions & 46 deletions app/jobs/sbmt/outbox/base_delete_stale_items_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -102,45 +102,50 @@ def delete_stale_items(waterline_failed, waterline_delivered)
# SELECT "items"."id"
# FROM "items"
# WHERE (
# "items"."status" = 1 AND "items"."created_at" < '2023-05-01 00:00:00'
# "items"."status" IN (2) AND "items"."created_at" BETWEEN "2025-01-29 12:18:32.917836" AND "2025-01-29 12:18:32.927596" LIMIT 1000
# )
# LIMIT 1000
# )
def postgres_delete_in_batches(waterline_failed, waterline_delivered)
table = item_class.arel_table

status_delivered = item_class.statuses[:delivered]
status_failed_discarded = item_class.statuses.values_at(:failed, :discarded)

delete_items_in_batches(table, table[:status].eq(status_delivered).and(table[:created_at].lt(waterline_delivered)))
delete_items_in_batches(table, table[:status].in(status_failed_discarded).and(table[:created_at].lt(waterline_failed)))
delete_items_in_batches_with_between(waterline_delivered, status_delivered)
delete_items_in_batches_with_between(waterline_failed, status_failed_discarded)
end

def delete_items_in_batches(table, condition)
subquery = table
.project(table[:id])
.where(condition)
.take(item_class.config.deletion_batch_size)

delete_statement = Arel::Nodes::DeleteStatement.new
delete_statement.relation = table
delete_statement.wheres = [table[:id].in(subquery)]
def delete_items_in_batches_with_between(waterline, statuses)
table = item_class.arel_table
batch_size = item_class.config.deletion_batch_size
time_window = item_class.config.deletion_time_window
min_date = item_class.where(table[:status].in(statuses)).minimum(:created_at)
deleted_count = nil

loop do
track_deleted_latency do
deleted_count = item_class
.connection
.execute(delete_statement.to_sql)
.cmd_tuples
end
while min_date && min_date < waterline
max_date = [min_date + time_window, waterline].min

loop do
subquery = table
.project(table[:id])
.where(table[:status].in(statuses))
.where(table[:created_at].between(min_date..max_date))
.take(batch_size)

delete_statement = Arel::Nodes::DeleteStatement.new
delete_statement.relation = table
delete_statement.wheres = [table[:id].in(subquery)]

track_deleted_latency do
deleted_count = item_class.connection.execute(delete_statement.to_sql).cmd_tuples
end

track_deleted_counter(deleted_count)
track_deleted_counter(deleted_count)

logger.log_info("Deleted #{deleted_count} #{box_type} items for #{box_name} items")
break if deleted_count == 0
lock_timer.checkpoint!
sleep(item_class.config.deletion_sleep_time)
logger.log_info("Deleted #{deleted_count} #{box_type} items for #{box_name} between #{min_date} and #{max_date}")
break if deleted_count < batch_size
lock_timer.checkpoint!
sleep(item_class.config.deletion_sleep_time) if deleted_count > 0
end
min_date = max_date
end
end

Expand All @@ -154,37 +159,43 @@ def delete_items_in_batches(table, condition)
# This approach doesn't require a subquery, making it more straightforward.
#
# Example SQL generated for deletion:
# DELETE FROM `items`
# DELETE FROM "items"
# WHERE (
# `items`.`status` = 1 AND `items`.`created_at` < '2023-05-01 00:00:00'
# "items"."status" IN (2) AND "items"."created_at" BETWEEN "2024-12-29 18:34:25.369234" AND "2024-12-29 22:34:25.369234" LIMIT 1000
# )
# LIMIT 1000
def mysql_delete_in_batches(waterline_failed, waterline_delivered)
status_delivered = item_class.statuses[:delivered]
status_failed_discarded = [item_class.statuses.values_at(:failed, :discarded)]

delete_items_in_batches_mysql(
item_class.where(status: status_delivered, created_at: ...waterline_delivered)
)
delete_items_in_batches_mysql(
item_class.where(status: status_failed_discarded).where(created_at: ...waterline_failed)
)
delete_items_in_batches_with_between_mysql(waterline_delivered, status_delivered)
delete_items_in_batches_with_between_mysql(waterline_failed, status_failed_discarded)
end

def delete_items_in_batches_mysql(query)
def delete_items_in_batches_with_between_mysql(waterline, statuses)
batch_size = item_class.config.deletion_batch_size
time_window = item_class.config.deletion_time_window
min_date = item_class.where(status: statuses).minimum(:created_at)
deleted_count = nil

loop do
track_deleted_latency do
deleted_count = query.limit(item_class.config.deletion_batch_size).delete_all
end
while min_date && min_date < waterline
max_date = [min_date + time_window, waterline].min

loop do
track_deleted_latency do
deleted_count = item_class
.where(status: statuses, created_at: min_date..max_date)
.limit(batch_size)
.delete_all
end

track_deleted_counter(deleted_count)
track_deleted_counter(deleted_count)

logger.log_info("Deleted #{deleted_count} #{box_type} items for #{box_name} items")
break if deleted_count == 0
lock_timer.checkpoint!
sleep(item_class.config.deletion_sleep_time)
logger.log_info("Deleted #{deleted_count} #{box_type} items for #{box_name} between #{min_date} and #{max_date}")
break if deleted_count < batch_size
lock_timer.checkpoint!
sleep(item_class.config.deletion_sleep_time) if deleted_count > 0
end
min_date = max_date
end
end

Expand Down
4 changes: 4 additions & 0 deletions app/models/sbmt/outbox/base_item_config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ def delivered_min_retention_period
@delivered_min_retention_period ||= ActiveSupport::Duration.parse(options[:delivered_min_retention_period] || "PT1H")
end

def deletion_time_window
@deletion_time_window ||= ActiveSupport::Duration.parse(options[:deletion_time_window] || "PT4H")
end

def max_retries
@max_retries ||= (options[:max_retries] || 0).to_i
end
Expand Down
2 changes: 1 addition & 1 deletion lib/sbmt/outbox/version.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@

module Sbmt
module Outbox
VERSION = "6.16.0"
VERSION = "6.17.0"
end
end

0 comments on commit 0449cd2

Please sign in to comment.