Skip to content

Commit

Permalink
Expose producer lifecycle events in notifications
Browse files Browse the repository at this point in the history
  • Loading branch information
mensfeld committed Feb 5, 2024
1 parent 5fd4a43 commit 9115394
Show file tree
Hide file tree
Showing 6 changed files with 30 additions and 9 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# WaterDrop changelog

## 2.6.14 (Unreleased)
- [Enhancement] Expose all producer lifecycle events.

## 2.6.13 (2024-01-29)
- [Enhancement] Expose `#partition_count` for building custom partitioners that need to be aware of number of partitions on a given topic.

Expand Down
2 changes: 1 addition & 1 deletion Gemfile.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: .
specs:
waterdrop (2.6.13)
waterdrop (2.6.14)
karafka-core (>= 2.2.3, < 3.0.0)
zeitwerk (~> 2.3)

Expand Down
3 changes: 3 additions & 0 deletions lib/waterdrop/instrumentation/notifications.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ class Notifications < ::Karafka::Core::Monitoring::Notifications
# List of events that we support in the system and to which a monitor client can hook up
# @note The non-error once support timestamp benchmarking
EVENTS = %w[
producer.configured
producer.connected
producer.closing
producer.closed

message.produced_async
Expand Down
8 changes: 2 additions & 6 deletions lib/waterdrop/producer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ def setup(&block)
@id = @config.id
@monitor = @config.monitor
@contract = Contracts::Message.new(max_payload_size: @config.max_payload_size)
@status.setup(@monitor, @id)
@status.configured!
end

Expand Down Expand Up @@ -155,10 +156,7 @@ def close(force: false)
@operating_mutex.synchronize do
return unless @status.active?

@monitor.instrument(
'producer.closed',
producer_id: id
) do
@status.closed! do
@status.closing!

# No need for auto-gc if everything got closed by us
Expand Down Expand Up @@ -208,8 +206,6 @@ def close(force: false)
# Remove callbacks runners that were registered
::Karafka::Core::Instrumentation.statistics_callbacks.delete(@id)
::Karafka::Core::Instrumentation.error_callbacks.delete(@id)

@status.closed!
end
end
end
Expand Down
21 changes: 20 additions & 1 deletion lib/waterdrop/producer/status.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,17 @@ def initialize
@current = LIFECYCLE.first
end

# Configures all the needed references. We need this since we publish events about the
# producer lifecycle.
#
# @param monitor []
# @param producer_id [String] id of the producer. We pass it to be consistent with the events
# payload we already have.
def setup(monitor, producer_id)
@monitor = monitor
@producer_id = producer_id
end

# @return [Boolean] true if producer is in a active state. Active means, that we can start
# sending messages. Actives states are connected (connection established) or configured,
# which means, that producer is configured, but connection with Kafka is
Expand All @@ -43,7 +54,15 @@ def #{state}?
# Sets a given state as current
def #{state}!
@current = :#{state}
# Monitor is available only post-configuration. Prior events will not be published
if @monitor
@monitor.instrument("producer.#{state}", producer_id: @producer_id) do
yield if block_given?
@current = :#{state}
end
else
@current = :#{state}
end
end
RUBY
end
Expand Down
2 changes: 1 addition & 1 deletion lib/waterdrop/version.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@
# WaterDrop library
module WaterDrop
# Current WaterDrop version
VERSION = '2.6.13'
VERSION = '2.6.14'
end

0 comments on commit 9115394

Please sign in to comment.