Skip to content

Commit

Permalink
instrument producer lifecycle events
Browse files Browse the repository at this point in the history
  • Loading branch information
mensfeld committed Feb 6, 2024
1 parent 5fd4a43 commit 72667b5
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 0 deletions.
2 changes: 2 additions & 0 deletions lib/waterdrop/instrumentation/notifications.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ class Notifications < ::Karafka::Core::Monitoring::Notifications
# List of events that we support in the system and to which a monitor client can hook up
# @note The non-error once support timestamp benchmarking
EVENTS = %w[
producer.connected
producer.closing
producer.closed

message.produced_async
Expand Down
2 changes: 2 additions & 0 deletions lib/waterdrop/producer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ def client
)

@status.connected!
@monitor.instrument('producer.connected', producer_id: id)
end

@client
Expand Down Expand Up @@ -160,6 +161,7 @@ def close(force: false)
producer_id: id
) do
@status.closing!
@monitor.instrument('producer.closing', producer_id: id)

# No need for auto-gc if everything got closed by us
# This should be used only in case a producer was not closed properly and forgotten
Expand Down
78 changes: 78 additions & 0 deletions spec/lib/waterdrop/instrumentation/notifications_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,82 @@ def on_message_produced_async(_event)
it { expect { subscription }.not_to raise_error }
end
end

describe 'producer lifecycle events flow' do
subject(:status) { producer.status }

let(:producer) { WaterDrop::Producer.new }
let(:events) { [] }
let(:events_names) do
%w[
producer.connected
producer.closing
producer.closed
]
end

context 'when producer is initialized' do
it { expect(status.to_s).to eq('initial') }
it { expect(events).to be_empty }
end

context 'when producer is configured' do
before do
producer.setup {}

events_names.each do |event_name|
producer.monitor.subscribe(event_name) do |event|
events << event
end
end
end

it { expect(status.to_s).to eq('configured') }
it { expect(events).to be_empty }
end

context 'when producer is connected' do
before do
producer.setup {}

events_names.each do |event_name|
producer.monitor.subscribe(event_name) do |event|
events << event
end
end

producer.client
end

it { expect(status.to_s).to eq('connected') }
it { expect(events.size).to eq(1) }
it { expect(events.last.id).to eq('producer.connected') }
it { expect(events.last.payload.key?(:producer_id)).to eq(true) }
end

context 'when producer is closed' do
before do
producer.setup {}

events_names.each do |event_name|
producer.monitor.subscribe(event_name) do |event|
events << event
end
end

producer.client
producer.close
end

it { expect(status.to_s).to eq('closed') }
it { expect(events.size).to eq(3) }
it { expect(events.first.id).to eq('producer.connected') }
it { expect(events.first.payload.key?(:producer_id)).to eq(true) }
it { expect(events[1].id).to eq('producer.closing') }
it { expect(events[1].payload.key?(:producer_id)).to eq(true) }
it { expect(events.last.id).to eq('producer.closed') }
it { expect(events.last.payload.key?(:producer_id)).to eq(true) }
it { expect(events.last.payload.key?(:time)).to eq(true) }
end
end
end

0 comments on commit 72667b5

Please sign in to comment.