Skip to content

Commit

Permalink
instrument producer lifecycle events (#452)
Browse files Browse the repository at this point in the history
* instrument producer lifecycle events

* ensure specs are closed

* add closing event to info instrumentation
  • Loading branch information
mensfeld authored Feb 6, 2024
1 parent 5fd4a43 commit 9e464bd
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 1 deletion.
13 changes: 12 additions & 1 deletion lib/waterdrop/instrumentation/logger_listener.rb
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,13 @@ def on_buffer_purged(event)
end

# @param event [Dry::Events::Event] event that happened with the details
def on_producer_closing(event)
info(event, 'Closing producer')
end

# @param event [Dry::Events::Event] event that happened with the details
# @note While this says "Closing producer", it produces a nice message with time taken:
# "Closing producer took 12 ms" indicating it happened in the past.
def on_producer_closed(event)
info(event, 'Closing producer')
end
Expand Down Expand Up @@ -180,7 +187,11 @@ def debug(event, log_message)
# @param event [Dry::Events::Event] event that happened with the details
# @param log_message [String] message we want to publish
def info(event, log_message)
@logger.info("[#{event[:producer_id]}] #{log_message} took #{event[:time]} ms")
if event.payload.key?(:time)
@logger.info("[#{event[:producer_id]}] #{log_message} took #{event[:time]} ms")
else
@logger.info("[#{event[:producer_id]}] #{log_message}")
end
end

# @param event [Dry::Events::Event] event that happened with the details
Expand Down
2 changes: 2 additions & 0 deletions lib/waterdrop/instrumentation/notifications.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ class Notifications < ::Karafka::Core::Monitoring::Notifications
# List of events that we support in the system and to which a monitor client can hook up
# @note The non-error once support timestamp benchmarking
EVENTS = %w[
producer.connected
producer.closing
producer.closed

message.produced_async
Expand Down
2 changes: 2 additions & 0 deletions lib/waterdrop/producer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ def client
)

@status.connected!
@monitor.instrument('producer.connected', producer_id: id)
end

@client
Expand Down Expand Up @@ -160,6 +161,7 @@ def close(force: false)
producer_id: id
) do
@status.closing!
@monitor.instrument('producer.closing', producer_id: id)

# No need for auto-gc if everything got closed by us
# This should be used only in case a producer was not closed properly and forgotten
Expand Down
8 changes: 8 additions & 0 deletions spec/lib/waterdrop/instrumentation/logger_listener_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,14 @@
it { expect(logged_data[1]).to eq(nil) }
end

describe '#on_producer_closing' do
before { listener.on_producer_closing(event) }

it { expect(logged_data[0]).to include(producer.id) }
it { expect(logged_data[0]).to include('INFO') }
it { expect(logged_data[0]).to include('Closing producer') }
end

describe '#on_producer_closed' do
before { listener.on_producer_closed(event) }

Expand Down
80 changes: 80 additions & 0 deletions spec/lib/waterdrop/instrumentation/notifications_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,84 @@ def on_message_produced_async(_event)
it { expect { subscription }.not_to raise_error }
end
end

describe 'producer lifecycle events flow' do
subject(:status) { producer.status }

let(:producer) { WaterDrop::Producer.new }
let(:events) { [] }
let(:events_names) do
%w[
producer.connected
producer.closing
producer.closed
]
end

after { producer.close }

context 'when producer is initialized' do
it { expect(status.to_s).to eq('initial') }
it { expect(events).to be_empty }
end

context 'when producer is configured' do
before do
producer.setup {}

events_names.each do |event_name|
producer.monitor.subscribe(event_name) do |event|
events << event
end
end
end

it { expect(status.to_s).to eq('configured') }
it { expect(events).to be_empty }
end

context 'when producer is connected' do
before do
producer.setup {}

events_names.each do |event_name|
producer.monitor.subscribe(event_name) do |event|
events << event
end
end

producer.client
end

it { expect(status.to_s).to eq('connected') }
it { expect(events.size).to eq(1) }
it { expect(events.last.id).to eq('producer.connected') }
it { expect(events.last.payload.key?(:producer_id)).to eq(true) }
end

context 'when producer is closed' do
before do
producer.setup {}

events_names.each do |event_name|
producer.monitor.subscribe(event_name) do |event|
events << event
end
end

producer.client
producer.close
end

it { expect(status.to_s).to eq('closed') }
it { expect(events.size).to eq(3) }
it { expect(events.first.id).to eq('producer.connected') }
it { expect(events.first.payload.key?(:producer_id)).to eq(true) }
it { expect(events[1].id).to eq('producer.closing') }
it { expect(events[1].payload.key?(:producer_id)).to eq(true) }
it { expect(events.last.id).to eq('producer.closed') }
it { expect(events.last.payload.key?(:producer_id)).to eq(true) }
it { expect(events.last.payload.key?(:time)).to eq(true) }
end
end
end

0 comments on commit 9e464bd

Please sign in to comment.