From 087d45c37cb1b28cec752b8c5068d7911e58bdc9 Mon Sep 17 00:00:00 2001 From: Maciej Mensfeld Date: Tue, 20 Feb 2024 10:55:55 +0100 Subject: [PATCH] Introduce `instrument_on_wait_queue_full` for ability to disable instrumentation on non-critical queue full errors (#458) * introduce wait_timeout_on_queue_full * missing spec * fix readme --- CHANGELOG.md | 3 +++ Gemfile.lock | 2 +- lib/waterdrop/config.rb | 2 ++ lib/waterdrop/producer.rb | 30 ++++++++++++++--------- lib/waterdrop/version.rb | 2 +- spec/lib/waterdrop/producer/async_spec.rb | 26 ++++++++++++++++++++ 6 files changed, 51 insertions(+), 14 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index da224bfe..a88f9bfd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,8 @@ # WaterDrop changelog +## 2.6.15 (Unreleased) +- [Enhancement] Introduce `instrument_on_wait_queue_full` flag (defaults to `true`) to be able to configure whether non critical (retryable) queue full errors should be instrumented in the error pipeline. Useful when building high-performance pipes with WaterDrop queue retry backoff as a throttler. + ## 2.6.14 (2024-02-06) - [Enhancement] Instrument `producer.connected` and `producer.closing` lifecycle events. diff --git a/Gemfile.lock b/Gemfile.lock index f031ba1c..59894cda 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -1,7 +1,7 @@ PATH remote: . specs: - waterdrop (2.6.14) + waterdrop (2.6.15) karafka-core (>= 2.2.3, < 3.0.0) zeitwerk (~> 2.3) diff --git a/lib/waterdrop/config.rb b/lib/waterdrop/config.rb index 317962a6..96d9a760 100644 --- a/lib/waterdrop/config.rb +++ b/lib/waterdrop/config.rb @@ -64,6 +64,8 @@ class Config # option [Numeric] how many seconds should we wait with the backoff on queue having space for # more messages before re-raising the error. 
setting :wait_timeout_on_queue_full, default: 10 + # option [Boolean] should we instrument non-critical, retryable queue full errors + setting :instrument_on_wait_queue_full, default: true # option [Numeric] How long to wait before retrying a retryable transaction related error setting :wait_backoff_on_transaction_command, default: 0.5 # option [Numeric] How many times to retry a retryable transaction related error before diff --git a/lib/waterdrop/producer.rb b/lib/waterdrop/producer.rb index d3892030..9446e8f4 100644 --- a/lib/waterdrop/producer.rb +++ b/lib/waterdrop/producer.rb @@ -297,18 +297,24 @@ def produce(message) begin raise Errors::ProduceError, e.inspect rescue Errors::ProduceError => e - # We want to instrument on this event even when we restart it. - # The reason is simple: instrumentation and visibility. - # We can recover from this, but despite that we should be able to instrument this. - # If this type of event happens too often, it may indicate that the buffer settings are not - # well configured. - @monitor.instrument( - 'error.occurred', - producer_id: id, - message: message, - error: e, - type: "message.#{label}" - ) + # Users can configure this because in pipe-like flows with high throughput, queue full with + # retry may be used as a throttling system that will backoff and wait. + # In such scenarios this error notification can be removed and until queue full is + # retryable, it will not be raised as an error. + if @config.instrument_on_wait_queue_full + # We want to instrument on this event even when we restart it. + # The reason is simple: instrumentation and visibility. + # We can recover from this, but despite that we should be able to instrument this. + # If this type of event happens too often, it may indicate that the buffer settings are + # not well configured. 
+ @monitor.instrument( + 'error.occurred', + producer_id: id, + message: message, + error: e, + type: "message.#{label}" + ) + end # We do not poll the producer because polling happens in a background thread # It also should not be a frequent case (queue full), hence it's ok to just throttle. diff --git a/lib/waterdrop/version.rb b/lib/waterdrop/version.rb index 3cbd9039..f2fb19af 100644 --- a/lib/waterdrop/version.rb +++ b/lib/waterdrop/version.rb @@ -3,5 +3,5 @@ # WaterDrop library module WaterDrop # Current WaterDrop version - VERSION = '2.6.14' + VERSION = '2.6.15' end diff --git a/spec/lib/waterdrop/producer/async_spec.rb b/spec/lib/waterdrop/producer/async_spec.rb index be629cc7..d297e498 100644 --- a/spec/lib/waterdrop/producer/async_spec.rb +++ b/spec/lib/waterdrop/producer/async_spec.rb @@ -119,6 +119,32 @@ it { expect(occurred.first.payload[:label]).to eq(nil) } end + context 'when inline error occurs and we retry on queue full but instrumentation off' do + let(:errors) { [] } + let(:occurred) { [] } + let(:error) { errors.first } + let(:producer) { build(:slow_producer, wait_on_queue_full: true) } + + before do + producer.config.wait_on_queue_full = true + producer.config.instrument_on_wait_queue_full = false + + producer.monitor.subscribe('error.occurred') do |event| + occurred << event + end + + begin + message = build(:valid_message, label: 'test') + 5.times { producer.produce_async(message) } + rescue WaterDrop::Errors::ProduceError => e + errors << e + end + end + + it { expect(errors).to be_empty } + it { expect(occurred).to be_empty } + end + context 'when inline error occurs in librdkafka and we go beyond max wait on queue full' do let(:errors) { [] } let(:occurred) { [] }