diff --git a/CHANGELOG.md b/CHANGELOG.md index 5020d760..19f1f28b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,7 +1,9 @@ # WaterDrop changelog ## 2.6.12 (Unreleased) +- [Enhancement] Provide ability to label message dispatches for increased observability. - [Enhancement] Provide ability to commit offset during the transaction with a consumer provided. +- [Change] Change transactional message purged error type from to `message.error` to `librdkafka.dispatch_error` to align with the non-transactional error type. - [Change] Remove usage of concurrent ruby. ## 2.6.11 (2023-10-25) diff --git a/Gemfile.lock b/Gemfile.lock index f4ae5e9c..ec143d87 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -19,7 +19,7 @@ GEM mutex_m tzinfo (~> 2.0) base64 (0.2.0) - bigdecimal (3.1.4) + bigdecimal (3.1.5) byebug (11.1.3) concurrent-ruby (1.2.2) connection_pool (2.4.1) @@ -35,7 +35,7 @@ GEM karafka-core (2.2.7) concurrent-ruby (>= 1.1) karafka-rdkafka (>= 0.13.9, < 0.15.0) - karafka-rdkafka (0.14.1) + karafka-rdkafka (0.14.5) ffi (~> 1.15) mini_portile2 (~> 2.6) rake (> 12) @@ -78,4 +78,4 @@ DEPENDENCIES waterdrop! BUNDLED WITH - 2.4.19 + 2.4.22 diff --git a/lib/waterdrop/instrumentation/callbacks/delivery.rb b/lib/waterdrop/instrumentation/callbacks/delivery.rb index 16bb5196..567e350a 100644 --- a/lib/waterdrop/instrumentation/callbacks/delivery.rb +++ b/lib/waterdrop/instrumentation/callbacks/delivery.rb @@ -57,7 +57,8 @@ def instrument_acknowledged(delivery_report) offset: delivery_report.offset, partition: delivery_report.partition, topic: delivery_report.topic_name, - delivery_report: delivery_report + delivery_report: delivery_report, + label: delivery_report.label ) end @@ -71,7 +72,9 @@ def instrument_purged(delivery_report) offset: delivery_report.offset, partition: delivery_report.partition, topic: delivery_report.topic_name, - delivery_report: delivery_report + delivery_report: delivery_report, + label: delivery_report.label, + type: 'librdkafka.dispatch_error' ) end @@ -86,6 +89,7 @@ def instrument_error(delivery_report) partition: delivery_report.partition, topic: delivery_report.topic_name, delivery_report: delivery_report, + label: delivery_report.label, type: 'librdkafka.dispatch_error' ) end diff --git a/spec/lib/waterdrop/producer/async_spec.rb b/spec/lib/waterdrop/producer/async_spec.rb index 7a59c35f..be629cc7 100644 --- a/spec/lib/waterdrop/producer/async_spec.rb +++ b/spec/lib/waterdrop/producer/async_spec.rb @@ -20,6 +20,12 @@ it { expect(delivery).to be_a(Rdkafka::Producer::DeliveryHandle) } end + context 'when message is valid and with label' do + let(:message) { build(:valid_message, label: 'test') } + + it { expect(delivery.label).to eq('test') } + end + context 'when sending a tombstone message' do let(:message) { build(:valid_message, payload: nil) } @@ -100,7 +106,7 @@ end begin - message = build(:valid_message) + message = build(:valid_message, label: 'test') 5.times { producer.produce_async(message) } rescue WaterDrop::Errors::ProduceError => e errors << e @@ -110,6 +116,7 @@ it { expect(errors).to be_empty } it { expect(occurred.first.payload[:error].cause).to be_a(Rdkafka::RdkafkaError) } it { expect(occurred.first.payload[:type]).to eq('message.produce_async') } + it { expect(occurred.first.payload[:label]).to eq(nil) } end context 'when inline error occurs in librdkafka and we go beyond max wait on queue full' do @@ -132,7 +139,7 @@ end begin - message = build(:valid_message) + message = build(:valid_message, label: 'test') 5.times { producer.produce_async(message) } rescue WaterDrop::Errors::ProduceError => e errors << e @@ -142,6 +149,7 @@ it { expect(errors).not_to be_empty } it { expect(occurred.first.payload[:error].cause).to be_a(Rdkafka::RdkafkaError) } it { expect(occurred.first.payload[:type]).to eq('message.produce_async') } + it { expect(occurred.first.payload[:label]).to eq(nil) } end end diff --git a/spec/lib/waterdrop/producer/sync_spec.rb b/spec/lib/waterdrop/producer/sync_spec.rb index 40d9c63e..e00a4a59 100644 --- a/spec/lib/waterdrop/producer/sync_spec.rb +++ b/spec/lib/waterdrop/producer/sync_spec.rb @@ -20,6 +20,12 @@ it { expect(delivery).to be_a(Rdkafka::Producer::DeliveryReport) } end + context 'when message is valid and with label' do + let(:message) { build(:valid_message, label: 'test') } + + it { expect(delivery.label).to eq('test') } + end + context 'when producing with topic as a symbol' do let(:message) do msg = build(:valid_message) @@ -43,7 +49,7 @@ occurred << event end - message = build(:valid_message) + message = build(:valid_message, label: 'test') threads = Array.new(20) do Thread.new do producer.produce_sync(message) @@ -60,6 +66,9 @@ it { expect(error.cause).to be_a(Rdkafka::RdkafkaError) } it { expect(occurred.first.payload[:error].cause).to be_a(Rdkafka::RdkafkaError) } it { expect(occurred.first.payload[:type]).to eq('message.produce_sync') } + # We expect this to be nil because the error was raised by the code that was attempting to + # produce, hence there is a chance of not even having a handler + it { expect(occurred.first.payload[:label]).to eq(nil) } end end diff --git a/spec/lib/waterdrop/producer_spec.rb b/spec/lib/waterdrop/producer_spec.rb index 28b17069..e38cd2ba 100644 --- a/spec/lib/waterdrop/producer_spec.rb +++ b/spec/lib/waterdrop/producer_spec.rb @@ -223,16 +223,17 @@ producer.monitor.subscribe('error.occurred') do |event| next unless event[:type] == 'librdkafka.dispatch_error' - detected << event[:error].code + detected << event end end it 'expect the error notifications to publish those errors' do - handler = producer.produce_async(topic: 'na', payload: 'data') + handler = producer.produce_async(topic: 'na', payload: 'data', label: 'test') producer.purge handler.wait(raise_response_error: false) - expect(detected.first).to eq(:purge_queue) + expect(detected.first[:error].code).to eq(:purge_queue) + expect(detected.first[:label]).to eq('test') end end end diff --git a/spec/support/factories/message.rb b/spec/support/factories/message.rb index 77cd7161..c763c9a0 100644 --- a/spec/support/factories/message.rb +++ b/spec/support/factories/message.rb @@ -7,12 +7,14 @@ topic { rand.to_s } payload { rand.to_s } partition_key { nil } + label { nil } initialize_with do message = new message[:topic] = topic message[:payload] = payload message[:partition_key] = partition_key if partition_key + message[:label] = label if label message end end