From dc4b4d13fafaf1f9f57a2930bd36a07ec2b0364f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=A2=D0=B0=D1=80=D0=B0=D1=81=D0=B5=D0=BD=D0=BA=D0=BE=20?= =?UTF-8?q?=D0=94=D0=B5=D0=BD=D0=B8=D1=81=20=D0=90=D0=BD=D0=B0=D1=82=D0=BE?= =?UTF-8?q?=D0=BB=D1=8C=D0=B5=D0=B2=D0=B8=D1=87?= Date: Wed, 15 Jan 2025 07:40:17 +0000 Subject: [PATCH] [DEX-2722] feat: reduce retention time successful outbox/inbox items --- CHANGELOG.md | 11 ++++ README.md | 6 +- .../outbox/base_delete_stale_items_job.rb | 63 +++++++++++++------ app/models/sbmt/outbox/base_item_config.rb | 7 +++ lib/sbmt/outbox/version.rb | 2 +- spec/internal/config/outbox.yml | 1 + .../base_delete_stale_items_job_spec.rb | 14 ++--- 7 files changed, 74 insertions(+), 30 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e11cd63..bc53287 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,17 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ### Fixed +## [6.12.0] - 2025-01-10 + +### Added + +- Random seconds have been added to the item removal tasks to prevent tasks from running simultaneously +- Add option `retention_delivered_items` to remove successful items. Default equals option `retention` + +### Changed + +- `retention` removes items with statuses: `failed` and `discarded` + ## [6.11.0] - 2024-12-23 ### Added diff --git a/README.md b/README.md index ee9c729..a2af8d9 100644 --- a/README.md +++ b/README.md @@ -267,7 +267,8 @@ default: &default outbox_items: # outbox items section my_outbox_item: # underscored model class name owner: my_outbox_item_team # optional, used in Yabeda metrics - retention: P1W # retention period, https://en.wikipedia.org/wiki/ISO_8601#Durations + retention: P1W # for statuses: failed and discarded, retention period, https://en.wikipedia.org/wiki/ISO_8601#Durations + retention_delivered_items: PT6H # for statuses: delivered, retention period for delivered items, https://en.wikipedia.org/wiki/ISO_8601#Durations max_retries: 3 # default 0, the number of retries before the item will be marked as failed strict_order: false # optional, default transports: # transports section @@ -342,7 +343,8 @@ end inbox_items: # inbox items section my_inbox_item: # underscored model class name owner: my_inbox_item_team # optional, used in Yabeda metrics - retention: P1W # retention period, https://en.wikipedia.org/wiki/ISO_8601#Durations + retention: P1W # for statuses: failed and discarded, retention period, https://en.wikipedia.org/wiki/ISO_8601#Durations + retention_delivered_items: PT6H # for statuses: delivered, retention period for delivered items, https://en.wikipedia.org/wiki/ISO_8601#Durations max_retries: 3 # default 0, the number of retries before the item will be marked as failed transports: # transports section import_order: # underscored transport class name diff --git a/app/jobs/sbmt/outbox/base_delete_stale_items_job.rb b/app/jobs/sbmt/outbox/base_delete_stale_items_job.rb index 08263b3..a8d68be 100644 --- a/app/jobs/sbmt/outbox/base_delete_stale_items_job.rb +++ b/app/jobs/sbmt/outbox/base_delete_stale_items_job.rb @@ -13,7 +13,8 @@ class BaseDeleteStaleItemsJob < Outbox.active_job_base_class class << self def enqueue item_classes.each do |item_class| - perform_later(item_class.to_s) + delay = rand(15).seconds + set(wait: delay).perform_later(item_class.to_s) end end @@ -41,12 +42,13 @@ def perform(item_class_name) lock_manager.lock("#{self.class.name}:#{item_class_name}:lock", LOCK_TTL) do |locked| if locked - duration = item_class.config.retention + duration_failed = item_class.config.retention + duration_delivered = item_class.config.retention_delivered_items - validate_retention!(duration) + validate_retention!(duration_failed) logger.with_tags(box_type: box_type, box_name: box_name) do - delete_stale_items(Time.current - duration) + delete_stale_items(Time.current - duration_failed, Time.current - duration_delivered) end else logger.log_info("Failed to acquire lock #{self.class.name}:#{item_class_name}") @@ -58,25 +60,25 @@ def perform(item_class_name) private - def validate_retention!(duration) - return if duration >= MIN_RETENTION_PERIOD + def validate_retention!(duration_failed) + return if duration_failed >= MIN_RETENTION_PERIOD raise "Retention period for #{box_name} must be longer than #{MIN_RETENTION_PERIOD.inspect}" end - def delete_stale_items(waterline) - logger.log_info("Start deleting #{box_type} items for #{box_name} older than #{waterline}") + def delete_stale_items(waterline_failed, waterline_delivered) + logger.log_info("Start deleting #{box_type} items for #{box_name} older than: failed and discarded items #{waterline_failed} and delivered items #{waterline_delivered}") case database_type when :postgresql - postgres_delete_in_batches(waterline) + postgres_delete_in_batches(waterline_failed, waterline_delivered) when :mysql - mysql_delete_in_batches(waterline) + mysql_delete_in_batches(waterline_failed, waterline_delivered) else raise "Unsupported database type" end - logger.log_info("Successfully deleted #{box_type} items for #{box_name} older than #{waterline}") + logger.log_info("Successfully deleted #{box_type} items for #{box_name} older than: failed and discarded items #{waterline_failed} and delivered items #{waterline_delivered}") end # Deletes stale items from PostgreSQL database in batches @@ -90,12 +92,22 @@ def delete_stale_items(waterline) # WHERE "items"."id" IN ( # SELECT "items"."id" # FROM "items" - # WHERE "items"."created_at" < '2023-05-01 00:00:00' + # WHERE ( + # "items"."status" = 1 AND "items"."created_at" < '2023-05-01 00:00:00' + # ) # LIMIT 1000 # ) - def postgres_delete_in_batches(waterline) + def postgres_delete_in_batches(waterline_failed, waterline_delivered) table = item_class.arel_table - condition = table[:created_at].lt(waterline) + + status_delivered = item_class.statuses[:delivered] + status_failed_discarded = item_class.statuses.values_at(:failed, :discarded) + + delete_items_in_batches(table, table[:status].eq(status_delivered).and(table[:created_at].lt(waterline_delivered))) + delete_items_in_batches(table, table[:status].in(status_failed_discarded).and(table[:created_at].lt(waterline_failed))) + end + + def delete_items_in_batches(table, condition) subquery = table .project(table[:id]) .where(condition) @@ -129,14 +141,25 @@ def postgres_delete_in_batches(waterline) # # Example SQL generated for deletion: # DELETE FROM `items` - # WHERE `items`.`created_at` < '2023-05-01 00:00:00' + # WHERE ( + # `items`.`status` = 1 AND `items`.`created_at` < '2023-05-01 00:00:00' + # ) # LIMIT 1000 - def mysql_delete_in_batches(waterline) + def mysql_delete_in_batches(waterline_failed, waterline_delivered) + status_delivered = item_class.statuses[:delivered] + status_failed_discarded = [item_class.statuses.values_at(:failed, :discarded)] + + delete_items_in_batches_mysql( + item_class.where(status: status_delivered, created_at: ...waterline_delivered) + ) + delete_items_in_batches_mysql( + item_class.where(status: status_failed_discarded).where(created_at: ...waterline_failed) + ) + end + + def delete_items_in_batches_mysql(query) loop do - deleted_count = item_class - .where(created_at: ...waterline) - .limit(BATCH_SIZE) - .delete_all + deleted_count = query.limit(BATCH_SIZE).delete_all logger.log_info("Deleted #{deleted_count} #{box_type} items for #{box_name} items") break if deleted_count == 0 diff --git a/app/models/sbmt/outbox/base_item_config.rb b/app/models/sbmt/outbox/base_item_config.rb index 37acd36..29763e6 100644 --- a/app/models/sbmt/outbox/base_item_config.rb +++ b/app/models/sbmt/outbox/base_item_config.rb @@ -37,6 +37,13 @@ def retention @retention ||= ActiveSupport::Duration.parse(options[:retention] || "P1W") end + def retention_delivered_items + @retention_delivered_items ||= begin + value = options[:retention_delivered_items] || retention + value.is_a?(String) ? ActiveSupport::Duration.parse(value) : value + end + end + def max_retries @max_retries ||= (options[:max_retries] || 0).to_i end diff --git a/lib/sbmt/outbox/version.rb b/lib/sbmt/outbox/version.rb index bda9f81..64debb6 100644 --- a/lib/sbmt/outbox/version.rb +++ b/lib/sbmt/outbox/version.rb @@ -2,6 +2,6 @@ module Sbmt module Outbox - VERSION = "6.11.0" + VERSION = "6.12.0" end end diff --git a/spec/internal/config/outbox.yml b/spec/internal/config/outbox.yml index a2acdef..237a9dc 100644 --- a/spec/internal/config/outbox.yml +++ b/spec/internal/config/outbox.yml @@ -8,6 +8,7 @@ test: partition_size: 2 partition_strategy: number retention: P1W + retention_delivered_items: PT6H retry_strategies: - exponential_backoff - compacted_log diff --git a/spec/jobs/sbmt/outbox/base_delete_stale_items_job_spec.rb b/spec/jobs/sbmt/outbox/base_delete_stale_items_job_spec.rb index a796e64..1d702b9 100644 --- a/spec/jobs/sbmt/outbox/base_delete_stale_items_job_spec.rb +++ b/spec/jobs/sbmt/outbox/base_delete_stale_items_job_spec.rb @@ -11,8 +11,8 @@ def item_classes end end - let!(:item) { create(:outbox_item, created_at: created_at) } - let!(:item_2) { create(:outbox_item, created_at: created_at) } + let!(:item_delivered) { create(:outbox_item, created_at: created_at, status: 2) } + let!(:item_failed) { create(:outbox_item, created_at: created_at, status: 1) } let(:created_at) { 1.month.ago } before do @@ -25,17 +25,17 @@ def item_classes end end - it "deletes item" do + it "deletes items with status 2 and old items with status 1" do expect { job_class.perform_now("OutboxItem") } .to change(OutboxItem, :count).by(-2) end - context "when item is too young" do - let(:created_at) { 1.hour.ago } + context "when an element with status 1 does not retention" do + let(:created_at) { 6.hours.ago } - it "doesn't delete item" do + it "doesn't delete item with status 1 but deletes item with status 2" do expect { job_class.perform_now("OutboxItem") } - .not_to change(OutboxItem, :count) + .to change(OutboxItem, :count).by(-1) end end end