From 72667b5140f53df59f269e42a8b74e236b3a3b31 Mon Sep 17 00:00:00 2001 From: Maciej Mensfeld Date: Tue, 6 Feb 2024 09:00:43 +0100 Subject: [PATCH] instrument producer lifecycle events --- .../instrumentation/notifications.rb | 2 + lib/waterdrop/producer.rb | 2 + .../instrumentation/notifications_spec.rb | 78 +++++++++++++++++++ 3 files changed, 82 insertions(+) diff --git a/lib/waterdrop/instrumentation/notifications.rb b/lib/waterdrop/instrumentation/notifications.rb index b7078ac6..07196f5f 100644 --- a/lib/waterdrop/instrumentation/notifications.rb +++ b/lib/waterdrop/instrumentation/notifications.rb @@ -7,6 +7,8 @@ class Notifications < ::Karafka::Core::Monitoring::Notifications # List of events that we support in the system and to which a monitor client can hook up # @note The non-error once support timestamp benchmarking EVENTS = %w[ + producer.connected + producer.closing producer.closed message.produced_async diff --git a/lib/waterdrop/producer.rb b/lib/waterdrop/producer.rb index 41233963..d3892030 100644 --- a/lib/waterdrop/producer.rb +++ b/lib/waterdrop/producer.rb @@ -117,6 +117,7 @@ def client ) @status.connected! + @monitor.instrument('producer.connected', producer_id: id) end @client @@ -160,6 +161,7 @@ def close(force: false) producer_id: id ) do @status.closing! + @monitor.instrument('producer.closing', producer_id: id) # No need for auto-gc if everything got closed by us # This should be used only in case a producer was not closed properly and forgotten diff --git a/spec/lib/waterdrop/instrumentation/notifications_spec.rb b/spec/lib/waterdrop/instrumentation/notifications_spec.rb index 3a166d77..74d63b22 100644 --- a/spec/lib/waterdrop/instrumentation/notifications_spec.rb +++ b/spec/lib/waterdrop/instrumentation/notifications_spec.rb @@ -52,4 +52,82 @@ def on_message_produced_async(_event) it { expect { subscription }.not_to raise_error } end end + + describe 'producer lifecycle events flow' do + subject(:status) { producer.status } + + let(:producer) { WaterDrop::Producer.new } + let(:events) { [] } + let(:events_names) do + %w[ + producer.connected + producer.closing + producer.closed + ] + end + + context 'when producer is initialized' do + it { expect(status.to_s).to eq('initial') } + it { expect(events).to be_empty } + end + + context 'when producer is configured' do + before do + producer.setup {} + + events_names.each do |event_name| + producer.monitor.subscribe(event_name) do |event| + events << event + end + end + end + + it { expect(status.to_s).to eq('configured') } + it { expect(events).to be_empty } + end + + context 'when producer is connected' do + before do + producer.setup {} + + events_names.each do |event_name| + producer.monitor.subscribe(event_name) do |event| + events << event + end + end + + producer.client + end + + it { expect(status.to_s).to eq('connected') } + it { expect(events.size).to eq(1) } + it { expect(events.last.id).to eq('producer.connected') } + it { expect(events.last.payload.key?(:producer_id)).to eq(true) } + end + + context 'when producer is closed' do + before do + producer.setup {} + + events_names.each do |event_name| + producer.monitor.subscribe(event_name) do |event| + events << event + end + end + + producer.client + producer.close + end + + it { expect(status.to_s).to eq('closed') } + it { expect(events.size).to eq(3) } + it { expect(events.first.id).to eq('producer.connected') } + it { expect(events.first.payload.key?(:producer_id)).to eq(true) } + it { expect(events[1].id).to eq('producer.closing') } + it { expect(events[1].payload.key?(:producer_id)).to eq(true) } + it { expect(events.last.id).to eq('producer.closed') } + it { expect(events.last.payload.key?(:producer_id)).to eq(true) } + it { expect(events.last.payload.key?(:time)).to eq(true) } + end + end end