Skip to content

Commit

Permalink
increase observability
Browse files Browse the repository at this point in the history
  • Loading branch information
mensfeld committed Dec 20, 2023
1 parent c152451 commit 3d09083
Show file tree
Hide file tree
Showing 7 changed files with 37 additions and 11 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
# WaterDrop changelog

## 2.6.12 (Unreleased)
- [Enhancement] Provide ability to label message dispatches for increased observability.
- [Enhancement] Provide ability to commit offset during the transaction with a consumer provided.
- [Change] Change transactional message purged error type from to `message.error` to `librdkafka.dispatch_error` to align with the non-transactional error type.
- [Change] Remove usage of concurrent ruby.

## 2.6.11 (2023-10-25)
Expand Down
6 changes: 3 additions & 3 deletions Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ GEM
mutex_m
tzinfo (~> 2.0)
base64 (0.2.0)
bigdecimal (3.1.4)
bigdecimal (3.1.5)
byebug (11.1.3)
concurrent-ruby (1.2.2)
connection_pool (2.4.1)
Expand All @@ -35,7 +35,7 @@ GEM
karafka-core (2.2.7)
concurrent-ruby (>= 1.1)
karafka-rdkafka (>= 0.13.9, < 0.15.0)
karafka-rdkafka (0.14.1)
karafka-rdkafka (0.14.5)
ffi (~> 1.15)
mini_portile2 (~> 2.6)
rake (> 12)
Expand Down Expand Up @@ -78,4 +78,4 @@ DEPENDENCIES
waterdrop!

BUNDLED WITH
2.4.19
2.4.22
8 changes: 6 additions & 2 deletions lib/waterdrop/instrumentation/callbacks/delivery.rb
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ def instrument_acknowledged(delivery_report)
offset: delivery_report.offset,
partition: delivery_report.partition,
topic: delivery_report.topic_name,
delivery_report: delivery_report
delivery_report: delivery_report,
label: delivery_report.label
)
end

Expand All @@ -71,7 +72,9 @@ def instrument_purged(delivery_report)
offset: delivery_report.offset,
partition: delivery_report.partition,
topic: delivery_report.topic_name,
delivery_report: delivery_report
delivery_report: delivery_report,
label: delivery_report.label,
type: 'librdkafka.dispatch_error'
)
end

Expand All @@ -86,6 +89,7 @@ def instrument_error(delivery_report)
partition: delivery_report.partition,
topic: delivery_report.topic_name,
delivery_report: delivery_report,
label: delivery_report.label,
type: 'librdkafka.dispatch_error'
)
end
Expand Down
12 changes: 10 additions & 2 deletions spec/lib/waterdrop/producer/async_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@
it { expect(delivery).to be_a(Rdkafka::Producer::DeliveryHandle) }
end

context 'when message is valid and with label' do
let(:message) { build(:valid_message, label: 'test') }

it { expect(delivery.label).to eq('test') }
end

context 'when sending a tombstone message' do
let(:message) { build(:valid_message, payload: nil) }

Expand Down Expand Up @@ -100,7 +106,7 @@
end

begin
message = build(:valid_message)
message = build(:valid_message, label: 'test')
5.times { producer.produce_async(message) }
rescue WaterDrop::Errors::ProduceError => e
errors << e
Expand All @@ -110,6 +116,7 @@
it { expect(errors).to be_empty }
it { expect(occurred.first.payload[:error].cause).to be_a(Rdkafka::RdkafkaError) }
it { expect(occurred.first.payload[:type]).to eq('message.produce_async') }
it { expect(occurred.first.payload[:label]).to eq(nil) }
end

context 'when inline error occurs in librdkafka and we go beyond max wait on queue full' do
Expand All @@ -132,7 +139,7 @@
end

begin
message = build(:valid_message)
message = build(:valid_message, label: 'test')
5.times { producer.produce_async(message) }
rescue WaterDrop::Errors::ProduceError => e
errors << e
Expand All @@ -142,6 +149,7 @@
it { expect(errors).not_to be_empty }
it { expect(occurred.first.payload[:error].cause).to be_a(Rdkafka::RdkafkaError) }
it { expect(occurred.first.payload[:type]).to eq('message.produce_async') }
it { expect(occurred.first.payload[:label]).to eq(nil) }
end
end

Expand Down
11 changes: 10 additions & 1 deletion spec/lib/waterdrop/producer/sync_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@
it { expect(delivery).to be_a(Rdkafka::Producer::DeliveryReport) }
end

context 'when message is valid and with label' do
let(:message) { build(:valid_message, label: 'test') }

it { expect(delivery.label).to eq('test') }
end

context 'when producing with topic as a symbol' do
let(:message) do
msg = build(:valid_message)
Expand All @@ -43,7 +49,7 @@
occurred << event
end

message = build(:valid_message)
message = build(:valid_message, label: 'test')
threads = Array.new(20) do
Thread.new do
producer.produce_sync(message)
Expand All @@ -60,6 +66,9 @@
it { expect(error.cause).to be_a(Rdkafka::RdkafkaError) }
it { expect(occurred.first.payload[:error].cause).to be_a(Rdkafka::RdkafkaError) }
it { expect(occurred.first.payload[:type]).to eq('message.produce_sync') }
# We expect this to be nil because the error was raised by the code that was attempting to
# produce, hence there is a chance of not even having a handler
it { expect(occurred.first.payload[:label]).to eq(nil) }
end
end

Expand Down
7 changes: 4 additions & 3 deletions spec/lib/waterdrop/producer_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -223,16 +223,17 @@
producer.monitor.subscribe('error.occurred') do |event|
next unless event[:type] == 'librdkafka.dispatch_error'

detected << event[:error].code
detected << event
end
end

it 'expect the error notifications to publish those errors' do
handler = producer.produce_async(topic: 'na', payload: 'data')
handler = producer.produce_async(topic: 'na', payload: 'data', label: 'test')
producer.purge

handler.wait(raise_response_error: false)
expect(detected.first).to eq(:purge_queue)
expect(detected.first[:error].code).to eq(:purge_queue)
expect(detected.first[:label]).to eq('test')
end
end
end
Expand Down
2 changes: 2 additions & 0 deletions spec/support/factories/message.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@
topic { rand.to_s }
payload { rand.to_s }
partition_key { nil }
label { nil }

initialize_with do
message = new
message[:topic] = topic
message[:payload] = payload
message[:partition_key] = partition_key if partition_key
message[:label] = label if label
message
end
end
Expand Down

0 comments on commit 3d09083

Please sign in to comment.