diff --git a/CHANGELOG.md b/CHANGELOG.md index b6a5a78a..e6c3ede1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,8 @@ # WaterDrop changelog +## 2.6.14 (Unreleased) +- [Enhancement] Expose all producer lifecycle events. + ## 2.6.13 (2024-01-29) - [Enhancement] Expose `#partition_count` for building custom partitioners that need to be aware of number of partitions on a given topic. diff --git a/Gemfile.lock b/Gemfile.lock index 2834c1a3..f031ba1c 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -1,7 +1,7 @@ PATH remote: . specs: - waterdrop (2.6.13) + waterdrop (2.6.14) karafka-core (>= 2.2.3, < 3.0.0) zeitwerk (~> 2.3) diff --git a/lib/waterdrop/instrumentation/notifications.rb b/lib/waterdrop/instrumentation/notifications.rb index b7078ac6..95e854ad 100644 --- a/lib/waterdrop/instrumentation/notifications.rb +++ b/lib/waterdrop/instrumentation/notifications.rb @@ -7,6 +7,9 @@ class Notifications < ::Karafka::Core::Monitoring::Notifications # List of events that we support in the system and to which a monitor client can hook up # @note The non-error once support timestamp benchmarking EVENTS = %w[ + producer.configured + producer.connected + producer.closing producer.closed message.produced_async diff --git a/lib/waterdrop/producer.rb b/lib/waterdrop/producer.rb index 41233963..e194831a 100644 --- a/lib/waterdrop/producer.rb +++ b/lib/waterdrop/producer.rb @@ -65,6 +65,7 @@ def setup(&block) @id = @config.id @monitor = @config.monitor @contract = Contracts::Message.new(max_payload_size: @config.max_payload_size) + @status.setup(@monitor, @id) @status.configured! end @@ -155,10 +156,7 @@ def close(force: false) @operating_mutex.synchronize do return unless @status.active? - @monitor.instrument( - 'producer.closed', - producer_id: id - ) do + @status.closed! do @status.closing! # No need for auto-gc if everything got closed by us @@ -208,8 +206,6 @@ def close(force: false) # Remove callbacks runners that were registered ::Karafka::Core::Instrumentation.statistics_callbacks.delete(@id) ::Karafka::Core::Instrumentation.error_callbacks.delete(@id) - - @status.closed! end end end diff --git a/lib/waterdrop/producer/status.rb b/lib/waterdrop/producer/status.rb index 930acb2f..a273e6ed 100644 --- a/lib/waterdrop/producer/status.rb +++ b/lib/waterdrop/producer/status.rb @@ -21,6 +21,17 @@ def initialize @current = LIFECYCLE.first end + # Configures all the needed references. We need this since we publish events about the + # producer lifecycle. + # + # @param monitor [] + # @param producer_id [String] id of the producer. We pass it to be consistent with the events + # payload we already have. + def setup(monitor, producer_id) + @monitor = monitor + @producer_id = producer_id + end + # @return [Boolean] true if producer is in a active state. Active means, that we can start # sending messages. Actives states are connected (connection established) or configured, # which means, that producer is configured, but connection with Kafka is @@ -43,7 +54,15 @@ def #{state}? # Sets a given state as current def #{state}! - @current = :#{state} + # Monitor is available only post-configuration. Prior events will not be published + if @monitor + @monitor.instrument("producer.#{state}", producer_id: @producer_id) do + yield if block_given? + @current = :#{state} + end + else + @current = :#{state} + end end RUBY end diff --git a/lib/waterdrop/version.rb b/lib/waterdrop/version.rb index e0df2dd2..3cbd9039 100644 --- a/lib/waterdrop/version.rb +++ b/lib/waterdrop/version.rb @@ -3,5 +3,5 @@ # WaterDrop library module WaterDrop # Current WaterDrop version - VERSION = '2.6.13' + VERSION = '2.6.14' end diff --git a/spec/lib/waterdrop/instrumentation/notifications_spec.rb b/spec/lib/waterdrop/instrumentation/notifications_spec.rb index 3a166d77..50fb9852 100644 --- a/spec/lib/waterdrop/instrumentation/notifications_spec.rb +++ b/spec/lib/waterdrop/instrumentation/notifications_spec.rb @@ -52,4 +52,85 @@ def on_message_produced_async(_event) it { expect { subscription }.not_to raise_error } end end + + describe 'producer lifecycle events flow' do + subject(:status) { producer.status } + + let(:producer) { WaterDrop::Producer.new } + let(:events) { [] } + let(:events_names) do + %w[ + producer.connected + producer.closing + producer.closed + ] + end + + context 'when producer is initialized' do + it { expect(status.to_s).to eq('initial') } + it { expect(events).to be_empty } + end + + context 'when producer is configured' do + before do + producer.setup {} + + events_names.each do |event_name| + producer.monitor.subscribe(event_name) do |event| + events << event + end + end + end + + it { expect(status.to_s).to eq('configured') } + it { expect(events).to be_empty } + end + + context 'when producer is connected' do + before do + producer.setup {} + + events_names.each do |event_name| + producer.monitor.subscribe(event_name) do |event| + events << event + end + end + + producer.client + end + + it { expect(status.to_s).to eq('connected') } + it { expect(events.size).to eq(1) } + it { expect(events.last.id).to eq('producer.connected') } + it { expect(events.last.payload.key?(:producer_id)).to eq(true) } + it { expect(events.last.payload.key?(:time)).to eq(true) } + end + + context 'when producer is closed' do + before do + producer.setup {} + + events_names.each do |event_name| + producer.monitor.subscribe(event_name) do |event| + events << event + end + end + + producer.client + producer.close + end + + it { expect(status.to_s).to eq('closed') } + it { expect(events.size).to eq(3) } + it { expect(events.first.id).to eq('producer.connected') } + it { expect(events.first.payload.key?(:producer_id)).to eq(true) } + it { expect(events.first.payload.key?(:time)).to eq(true) } + it { expect(events[1].id).to eq('producer.closing') } + it { expect(events[1].payload.key?(:producer_id)).to eq(true) } + it { expect(events[1].payload.key?(:time)).to eq(true) } + it { expect(events.last.id).to eq('producer.closed') } + it { expect(events.last.payload.key?(:producer_id)).to eq(true) } + it { expect(events.last.payload.key?(:time)).to eq(true) } + end + end end