Introduce instrument_on_wait_queue_full for ability to disable instrumentation on non-critical queue full errors (#458)

* introduce wait_timeout_on_queue_full

* missing spec

* fix readme
mensfeld authored Feb 20, 2024
1 parent b9e56a2 commit 087d45c
Showing 6 changed files with 51 additions and 14 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,8 @@
 # WaterDrop changelog

+## 2.6.15 (Unreleased)
+- [Enhancement] Introduce `instrument_on_wait_queue_full` flag (defaults to `true`) to be able to configure whether non critical (retryable) queue full errors should be instrumented in the error pipeline. Useful when building high-performance pipes with WaterDrop queue retry backoff as a throttler.
+
 ## 2.6.14 (2024-02-06)
 - [Enhancement] Instrument `producer.connected` and `producer.closing` lifecycle events.

2 changes: 1 addition & 1 deletion Gemfile.lock
@@ -1,7 +1,7 @@
 PATH
   remote: .
   specs:
-    waterdrop (2.6.14)
+    waterdrop (2.6.15)
       karafka-core (>= 2.2.3, < 3.0.0)
       zeitwerk (~> 2.3)

2 changes: 2 additions & 0 deletions lib/waterdrop/config.rb
@@ -64,6 +64,8 @@ class Config
     # option [Numeric] how many seconds should we wait with the backoff on queue having space for
     # more messages before re-raising the error.
     setting :wait_timeout_on_queue_full, default: 10
+    # option [Boolean] should we instrument non-critical, retryable queue full errors
+    setting :instrument_on_wait_queue_full, default: true
     # option [Numeric] How long to wait before retrying a retryable transaction related error
     setting :wait_backoff_on_transaction_command, default: 0.5
     # option [Numeric] How many times to retry a retryable transaction related error before
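
The comment on `wait_timeout_on_queue_full` above describes a bounded wait: keep backing off while the queue is full, then re-raise once the timeout has elapsed. Below is a minimal stdlib-only sketch of that idea, assuming a `SizedQueue` as a stand-in for the producer's internal buffer. `push_with_backoff`, `QueueFullTimeout`, and the `backoff` value are hypothetical names chosen for illustration; they are not WaterDrop APIs.

```ruby
# Illustrative error for this sketch; not a WaterDrop class.
class QueueFullTimeout < StandardError; end

# Pushes `item` onto a bounded queue without blocking. When the queue is full,
# sleeps `backoff` seconds and retries until `wait_timeout` seconds have passed,
# then raises. `wait_timeout` plays the role of `wait_timeout_on_queue_full`.
# Returns the number of retries that were needed.
def push_with_backoff(queue, item, wait_timeout:, backoff: 0.005)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + wait_timeout
  retries = 0
  begin
    queue.push(item, true) # non-blocking push raises ThreadError when full
  rescue ThreadError
    if Process.clock_gettime(Process::CLOCK_MONOTONIC) >= deadline
      raise QueueFullTimeout, "queue still full after #{wait_timeout}s"
    end
    retries += 1
    sleep(backoff)
    retry
  end
  retries
end

queue = SizedQueue.new(2)

# Slow consumer: drains one item roughly every 10 ms until it sees :done.
consumer = Thread.new do
  loop do
    break if queue.pop == :done
    sleep(0.01)
  end
end

total_retries = (1..6).sum { |i| push_with_backoff(queue, i, wait_timeout: 1) }
push_with_backoff(queue, :done, wait_timeout: 1)
consumer.join
puts "retries while throttled: #{total_retries}"
```

In this sketch the full queue acts as the throttle: the caller slows to the consumer's pace instead of failing, and only a wait longer than the timeout surfaces as an error.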
30 changes: 18 additions & 12 deletions lib/waterdrop/producer.rb
@@ -297,18 +297,24 @@ def produce(message)
       begin
         raise Errors::ProduceError, e.inspect
       rescue Errors::ProduceError => e
-        # We want to instrument on this event even when we restart it.
-        # The reason is simple: instrumentation and visibility.
-        # We can recover from this, but despite that we should be able to instrument this.
-        # If this type of event happens too often, it may indicate that the buffer settings are not
-        # well configured.
-        @monitor.instrument(
-          'error.occurred',
-          producer_id: id,
-          message: message,
-          error: e,
-          type: "message.#{label}"
-        )
+        # Users can configure this because in pipe-like flows with high throughput, queue full with
+        # retry may be used as a throttling system that will backoff and wait.
+        # In such scenarios this error notification can be removed and until queue full is
+        # retryable, it will not be raised as an error.
+        if @config.instrument_on_wait_queue_full
+          # We want to instrument on this event even when we restart it.
+          # The reason is simple: instrumentation and visibility.
+          # We can recover from this, but despite that we should be able to instrument this.
+          # If this type of event happens too often, it may indicate that the buffer settings are
+          # not well configured.
+          @monitor.instrument(
+            'error.occurred',
+            producer_id: id,
+            message: message,
+            error: e,
+            type: "message.#{label}"
+          )
+        end

         # We do not poll the producer because polling happens in a background thread
         # It also should not be a frequent case (queue full), hence it's ok to just throttle.
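
The change above gates the `error.occurred` notification behind a flag while leaving the retry itself untouched. Here is a rough sketch of that control flow using only the Ruby standard library. `TinyMonitor`, `QueueFull`, `produce_with_retry`, the `max_attempts` cutoff, and the `'message.produce_async'` type value are assumptions made for illustration and do not mirror WaterDrop's internals.

```ruby
# Minimal stand-in for an instrumentation monitor; illustrative only.
class TinyMonitor
  def initialize
    @subscribers = Hash.new { |hash, key| hash[key] = [] }
  end

  # Registers a block to be called for every event with the given name.
  def subscribe(event_name, &block)
    @subscribers[event_name] << block
  end

  # Delivers the payload hash to each subscriber of the event.
  def instrument(event_name, payload = {})
    @subscribers[event_name].each { |subscriber| subscriber.call(payload) }
  end
end

# Illustrative error standing in for a retryable queue full condition.
class QueueFull < StandardError; end

# Calls the block until it stops raising QueueFull, up to `max_attempts` times.
# Retryable failures reach the monitor only when `instrument_retries` is true;
# once attempts run out, the error is re-raised regardless of the flag.
def produce_with_retry(monitor, instrument_retries:, max_attempts: 5, backoff: 0.001)
  attempts = 0
  begin
    attempts += 1
    yield(attempts)
  rescue QueueFull => e
    raise if attempts >= max_attempts # no longer retryable: surface the error

    if instrument_retries
      monitor.instrument('error.occurred', error: e, type: 'message.produce_async')
    end
    sleep(backoff)
    retry
  end
end

monitor = TinyMonitor.new
events = []
monitor.subscribe('error.occurred') { |payload| events << payload }

# The block fails twice with QueueFull, then succeeds on the third attempt.
produce_with_retry(monitor, instrument_retries: false) { |n| raise QueueFull if n < 3 }
quiet = events.size

produce_with_retry(monitor, instrument_retries: true) { |n| raise QueueFull if n < 3 }
puts "events with flag off: #{quiet}, with flag on: #{events.size - quiet}"
```

In a real producer the corresponding switch would be `config.instrument_on_wait_queue_full = false`, set together with `config.wait_on_queue_full = true`, as the spec in this commit does.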
2 changes: 1 addition & 1 deletion lib/waterdrop/version.rb
@@ -3,5 +3,5 @@
 # WaterDrop library
 module WaterDrop
   # Current WaterDrop version
-  VERSION = '2.6.14'
+  VERSION = '2.6.15'
 end
26 changes: 26 additions & 0 deletions spec/lib/waterdrop/producer/async_spec.rb
@@ -119,6 +119,32 @@
     it { expect(occurred.first.payload[:label]).to eq(nil) }
   end

+  context 'when inline error occurs and we retry on queue full but instrumentation off' do
+    let(:errors) { [] }
+    let(:occurred) { [] }
+    let(:error) { errors.first }
+    let(:producer) { build(:slow_producer, wait_on_queue_full: true) }
+
+    before do
+      producer.config.wait_on_queue_full = true
+      producer.config.instrument_on_wait_queue_full = false
+
+      producer.monitor.subscribe('error.occurred') do |event|
+        occurred << event
+      end
+
+      begin
+        message = build(:valid_message, label: 'test')
+        5.times { producer.produce_async(message) }
+      rescue WaterDrop::Errors::ProduceError => e
+        errors << e
+      end
+    end
+
+    it { expect(errors).to be_empty }
+    it { expect(occurred).to be_empty }
+  end
+
   context 'when inline error occurs in librdkafka and we go beyond max wait on queue full' do
     let(:errors) { [] }
     let(:occurred) { [] }