Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose producer lifecycle events in notifications #451

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# WaterDrop changelog

## 2.6.14 (Unreleased)
- [Enhancement] Expose all producer lifecycle events.

## 2.6.13 (2024-01-29)
- [Enhancement] Expose `#partition_count` for building custom partitioners that need to be aware of number of partitions on a given topic.

Expand Down
2 changes: 1 addition & 1 deletion Gemfile.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: .
specs:
waterdrop (2.6.13)
waterdrop (2.6.14)
karafka-core (>= 2.2.3, < 3.0.0)
zeitwerk (~> 2.3)

Expand Down
3 changes: 3 additions & 0 deletions lib/waterdrop/instrumentation/notifications.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ class Notifications < ::Karafka::Core::Monitoring::Notifications
# List of events that we support in the system and to which a monitor client can hook up
# @note The non-error once support timestamp benchmarking
EVENTS = %w[
producer.configured
producer.connected
producer.closing
producer.closed

message.produced_async
Expand Down
8 changes: 2 additions & 6 deletions lib/waterdrop/producer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ def setup(&block)
@id = @config.id
@monitor = @config.monitor
@contract = Contracts::Message.new(max_payload_size: @config.max_payload_size)
@status.setup(@monitor, @id)
@status.configured!
end

Expand Down Expand Up @@ -155,10 +156,7 @@ def close(force: false)
@operating_mutex.synchronize do
return unless @status.active?

@monitor.instrument(
'producer.closed',
producer_id: id
) do
@status.closed! do
@status.closing!

# No need for auto-gc if everything got closed by us
Expand Down Expand Up @@ -208,8 +206,6 @@ def close(force: false)
# Remove callbacks runners that were registered
::Karafka::Core::Instrumentation.statistics_callbacks.delete(@id)
::Karafka::Core::Instrumentation.error_callbacks.delete(@id)

@status.closed!
end
end
end
Expand Down
21 changes: 20 additions & 1 deletion lib/waterdrop/producer/status.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,17 @@ def initialize
@current = LIFECYCLE.first
end

# Configures all the needed references. We need this since we publish events about the
# producer lifecycle.
#
# @param monitor []
# @param producer_id [String] id of the producer. We pass it to be consistent with the events
# payload we already have.
def setup(monitor, producer_id)
@monitor = monitor
@producer_id = producer_id
end

# @return [Boolean] true if producer is in a active state. Active means, that we can start
# sending messages. Actives states are connected (connection established) or configured,
# which means, that producer is configured, but connection with Kafka is
Expand All @@ -43,7 +54,15 @@ def #{state}?

# Sets a given state as current
def #{state}!
@current = :#{state}
# Monitor is available only post-configuration. Prior events will not be published
if @monitor
@monitor.instrument("producer.#{state}", producer_id: @producer_id) do
yield if block_given?
@current = :#{state}
end
else
@current = :#{state}
end
end
RUBY
end
Expand Down
2 changes: 1 addition & 1 deletion lib/waterdrop/version.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@
# WaterDrop library
module WaterDrop
# Current WaterDrop version
VERSION = '2.6.13'
VERSION = '2.6.14'
end
81 changes: 81 additions & 0 deletions spec/lib/waterdrop/instrumentation/notifications_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,85 @@ def on_message_produced_async(_event)
it { expect { subscription }.not_to raise_error }
end
end

describe 'producer lifecycle events flow' do
subject(:status) { producer.status }

let(:producer) { WaterDrop::Producer.new }
let(:events) { [] }
let(:events_names) do
%w[
producer.connected
producer.closing
producer.closed
]
end

context 'when producer is initialized' do
it { expect(status.to_s).to eq('initial') }
it { expect(events).to be_empty }
end

context 'when producer is configured' do
before do
producer.setup {}

events_names.each do |event_name|
producer.monitor.subscribe(event_name) do |event|
events << event
end
end
end

it { expect(status.to_s).to eq('configured') }
it { expect(events).to be_empty }
end

context 'when producer is connected' do
before do
producer.setup {}

events_names.each do |event_name|
producer.monitor.subscribe(event_name) do |event|
events << event
end
end

producer.client
end

it { expect(status.to_s).to eq('connected') }
it { expect(events.size).to eq(1) }
it { expect(events.last.id).to eq('producer.connected') }
it { expect(events.last.payload.key?(:producer_id)).to eq(true) }
it { expect(events.last.payload.key?(:time)).to eq(true) }
end

context 'when producer is closed' do
before do
producer.setup {}

events_names.each do |event_name|
producer.monitor.subscribe(event_name) do |event|
events << event
end
end

producer.client
producer.close
end

it { expect(status.to_s).to eq('closed') }
it { expect(events.size).to eq(3) }
it { expect(events.first.id).to eq('producer.connected') }
it { expect(events.first.payload.key?(:producer_id)).to eq(true) }
it { expect(events.first.payload.key?(:time)).to eq(true) }
it { expect(events[1].id).to eq('producer.closing') }
it { expect(events[1].payload.key?(:producer_id)).to eq(true) }
it { expect(events[1].payload.key?(:time)).to eq(true) }
it { expect(events.last.id).to eq('producer.closed') }
it { expect(events.last.payload.key?(:producer_id)).to eq(true) }
it { expect(events.last.payload.key?(:time)).to eq(true) }
end
end
end