#purge, #close! and #transaction (#395)
mensfeld authored Oct 18, 2023
1 parent e329d99 commit b6d3858
Showing 19 changed files with 727 additions and 20 deletions.
13 changes: 13 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,18 @@
# WaterDrop changelog

## 2.7.0 (Unreleased)
- **[Feature]** Introduce transactions support.
- [Improvement] Expand `LoggerListener` to inform about transactions (info level).
- [Improvement] Allow WaterDrop to accept the topic as either a symbol or a string.
- [Improvement] Enhance both `message.acknowledged` and `error.occurred` (for `librdkafka.dispatch_error`) with the full `delivery_report`.
- [Improvement] Provide `#close!` that forces the producer to close after the max wait timeout, even if outgoing data is still pending.
- [Improvement] Provide `#purge` that will purge any outgoing data and data from the internal queues (both WaterDrop and librdkafka).
- [Fix] Fix the `librdkafka.dispatch_error` error dispatch for errors with negative code.

### Upgrade Notes

There are no breaking changes in this release. However, if you upgrade WaterDrop in Karafka **and** choose to use transactions, Karafka Web UI may not support it. Web UI will support transactional producers starting from `0.7.7`.

## 2.6.7 (2023-09-01)
- [Improvement] early flush data from `librdkafka` internal buffer before closing.
- [Maintenance] Update the signing cert as the old one expired.
10 changes: 5 additions & 5 deletions Gemfile.lock
@@ -1,8 +1,8 @@
PATH
remote: .
specs:
waterdrop (2.6.7)
karafka-core (>= 2.1.1, < 3.0.0)
waterdrop (2.7.0)
karafka-core (>= 2.2.3, < 3.0.0)
zeitwerk (~> 2.3)

GEM
@@ -22,10 +22,10 @@ GEM
ffi (1.15.5)
i18n (1.14.1)
concurrent-ruby (~> 1.0)
karafka-core (2.1.1)
karafka-core (2.2.3)
concurrent-ruby (>= 1.1)
karafka-rdkafka (>= 0.13.1, < 0.14.0)
karafka-rdkafka (0.13.2)
karafka-rdkafka (>= 0.13.6, < 0.14.0)
karafka-rdkafka (0.13.6)
ffi (~> 1.15)
mini_portile2 (~> 2.6)
rake (> 12)
7 changes: 6 additions & 1 deletion lib/waterdrop/clients/rdkafka.rb
@@ -11,7 +11,9 @@ class << self
# @param producer [WaterDrop::Producer] producer instance with its config, etc
# @note We overwrite this that way, because we do not care
def new(producer)
client = ::Rdkafka::Config.new(producer.config.kafka.to_h).producer
config = producer.config.kafka.to_h

client = ::Rdkafka::Config.new(config).producer

# This callback is not global and is per client, thus we do not have to wrap it with a
# callbacks manager to make it work
@@ -20,6 +22,9 @@ def new(producer)
producer.config.monitor
)

# Switch to the transactional mode if user provided the transactional id
client.init_transactions if config.key?(:'transactional.id')

client
end
end
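The conditional switch to transactional mode above can be illustrated with a minimal sketch. `SketchRdkafkaClient` and `build_client` are hypothetical stand-ins (not part of WaterDrop or rdkafka) that only record whether `init_transactions` was called; the point is that the check looks for the symbol key `:'transactional.id'` in the kafka settings hash.

```ruby
# Hypothetical stand-in for the rdkafka producer client. It only records
# whether transactions were initialized.
class SketchRdkafkaClient
  attr_reader :transactional

  def initialize
    @transactional = false
  end

  def init_transactions
    @transactional = true
  end
end

# Mirrors the branch from the diff: transactions are initialized only when
# the settings hash contains the :'transactional.id' symbol key.
def build_client(kafka_config)
  client = SketchRdkafkaClient.new
  client.init_transactions if kafka_config.key?(:'transactional.id')
  client
end

plain = build_client({ 'bootstrap.servers': 'localhost:9092' })
transactional = build_client(
  { 'bootstrap.servers': 'localhost:9092', 'transactional.id': 'example-producer' }
)

puts plain.transactional
puts transactional.transactional
```

Note that in this sketch a string key (`'transactional.id'` as a `String`) would not trigger the switch, because `Hash#key?` distinguishes strings from symbols.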
5 changes: 5 additions & 0 deletions lib/waterdrop/config.rb
@@ -64,6 +64,11 @@ class Config
# option [Numeric] how many seconds should we wait with the backoff on queue having space for
# more messages before re-raising the error.
setting :wait_timeout_on_queue_full, default: 10

setting :wait_backoff_on_transaction_command, default: 0.5

setting :max_attempts_on_transaction_command, default: 5

# option [Boolean] should we send messages. Setting this to false can be really useful when
# testing and or developing because when set to false, won't actually ping Kafka but will
# run all the validations, etc
5 changes: 4 additions & 1 deletion lib/waterdrop/contracts/message.rb
@@ -26,7 +26,10 @@ def initialize(max_payload_size:)
@max_payload_size = max_payload_size
end

required(:topic) { |val| val.is_a?(String) && TOPIC_REGEXP.match?(val) }
required(:topic) do |val|
(val.is_a?(String) || val.is_a?(Symbol)) && TOPIC_REGEXP.match?(val.to_s)
end

required(:payload) { |val| val.nil? || val.is_a?(String) }
optional(:key) { |val| val.nil? || (val.is_a?(String) && !val.empty?) }
optional(:partition) { |val| val.is_a?(Integer) && val >= -1 }
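The relaxed topic rule can be tried in isolation with the sketch below. `SKETCH_TOPIC_REGEXP` is an assumption made for illustration (the real `TOPIC_REGEXP` is defined outside this diff), and `valid_topic?` is a hypothetical helper that repeats the predicate from the new `required(:topic)` block.

```ruby
# ASSUMPTION: an illustrative pattern only. The real TOPIC_REGEXP is not
# shown in this diff and may differ.
SKETCH_TOPIC_REGEXP = /\A[\w.\-]+\z/

# Same predicate as the new contract rule: strings and symbols are accepted,
# and the pattern is checked against the string form.
def valid_topic?(val)
  (val.is_a?(String) || val.is_a?(Symbol)) && SKETCH_TOPIC_REGEXP.match?(val.to_s)
end

puts valid_topic?('events')
puts valid_topic?(:events)
puts valid_topic?('bad topic')
puts valid_topic?(42)
```

Checking the type before calling `to_s` matters: without it, values such as integers or `nil` would be stringified and could pass or fail the pattern for the wrong reason.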
3 changes: 3 additions & 0 deletions lib/waterdrop/errors.rb
@@ -32,6 +32,9 @@ module Errors
# Raised when there is an inline error during single message produce operations
ProduceError = Class.new(BaseError)

# Raise it within a transaction to abort it
AbortTransaction = Class.new(BaseError)

# Raised when during messages producing something bad happened inline
class ProduceManyError < ProduceError
attr_reader :dispatched
10 changes: 6 additions & 4 deletions lib/waterdrop/instrumentation/callbacks/delivery.rb
@@ -17,10 +17,10 @@ def initialize(producer_id, monitor)
# Emits delivery details to the monitor
# @param delivery_report [Rdkafka::Producer::DeliveryReport] delivery report
def call(delivery_report)
if delivery_report.error.to_i.positive?
instrument_error(delivery_report)
else
if delivery_report.error.to_i.zero?
instrument_acknowledged(delivery_report)
else
instrument_error(delivery_report)
end
end

@@ -36,6 +36,7 @@ def instrument_error(delivery_report)
offset: delivery_report.offset,
partition: delivery_report.partition,
topic: delivery_report.topic_name,
delivery_report: delivery_report,
type: 'librdkafka.dispatch_error'
)
end
@@ -47,7 +48,8 @@ def instrument_acknowledged(delivery_report)
producer_id: @producer_id,
offset: delivery_report.offset,
partition: delivery_report.partition,
topic: delivery_report.topic_name
topic: delivery_report.topic_name,
delivery_report: delivery_report
)
end
end
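Why the switch from `positive?` to `zero?` matters can be seen in a small sketch. `SketchReport`, `route_before`, and `route_after` are hypothetical names used only here; the error codes are real Kafka/librdkafka values (`3` is the broker-side unknown topic or partition error, `-192` is librdkafka's local message timeout).

```ruby
# Hypothetical stand-in for a delivery report that exposes only the error code.
SketchReport = Struct.new(:error)

# Old branching: only positive codes were treated as errors.
def route_before(report)
  report.error.to_i.positive? ? :error : :acknowledged
end

# New branching: anything other than zero is treated as an error.
def route_after(report)
  report.error.to_i.zero? ? :acknowledged : :error
end

ok = SketchReport.new(nil)            # nil.to_i is 0, so no error
broker_error = SketchReport.new(3)    # broker-side error code
local_error = SketchReport.new(-192)  # librdkafka internal (negative) code

[ok, broker_error, local_error].each do |report|
  puts "#{report.error.inspect}: before=#{route_before(report)} after=#{route_after(report)}"
end
```

With the old check, the negative code is routed to the acknowledged path, which is the behavior the changelog entry about errors with a negative code addresses.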
22 changes: 21 additions & 1 deletion lib/waterdrop/instrumentation/logger_listener.rb
@@ -112,9 +112,14 @@ def on_buffer_flushed_sync(event)
debug(event, messages)
end

# @param event [Dry::Events::Event] event that happened with the details
def on_buffer_purged(event)
info(event, 'Successfully purging buffer')
end

# @param event [Dry::Events::Event] event that happened with the details
def on_producer_closed(event)
info event, 'Closing producer'
info(event, 'Closing producer')
end

# @param event [Dry::Events::Event] event that happened with the error details
@@ -125,6 +130,21 @@ def on_error_occurred(event)
error(event, "Error occurred: #{error} - #{type}")
end

# @param event [Dry::Events::Event] event that happened with the details
def on_transaction_started(event)
info(event, 'Starting transaction')
end

# @param event [Dry::Events::Event] event that happened with the details
def on_transaction_aborted(event)
info(event, 'Aborting transaction')
end

# @param event [Dry::Events::Event] event that happened with the details
def on_transaction_committed(event)
info(event, 'Committing transaction')
end

private

# @return [Boolean] should we report the messages details in the debug mode.
5 changes: 5 additions & 0 deletions lib/waterdrop/instrumentation/notifications.rb
@@ -18,8 +18,13 @@ class Notifications < ::Karafka::Core::Monitoring::Notifications
messages.produced_sync
messages.buffered

transaction.started
transaction.committed
transaction.aborted

buffer.flushed_async
buffer.flushed_sync
buffer.purged

statistics.emitted
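How the three new `transaction.*` events could relate to `AbortTransaction` is sketched below. The `Transactions` module itself is not shown in this part of the diff, so everything here is an assumption: `SketchProducer`, `SketchAbortTransaction`, and the exact ordering of events are hypothetical and only model one plausible flow (commit on success, silent abort on `AbortTransaction`, abort and re-raise on any other error).

```ruby
# Hypothetical stand-in for WaterDrop::Errors::AbortTransaction.
class SketchAbortTransaction < StandardError; end

# Hypothetical producer that records event names instead of talking to Kafka.
class SketchProducer
  attr_reader :events

  def initialize
    @events = []
  end

  # ASSUMPTION: models a transaction wrapper. The real implementation is
  # not part of the diff shown here.
  def transaction
    @events << 'transaction.started'
    result = yield
    @events << 'transaction.committed'
    result
  rescue StandardError => e
    @events << 'transaction.aborted'
    # An explicit abort is swallowed; any other error is re-raised.
    raise unless e.is_a?(SketchAbortTransaction)
  end
end

committed = SketchProducer.new
committed.transaction { :done }
puts committed.events.inspect

aborted = SketchProducer.new
aborted.transaction { raise SketchAbortTransaction }
puts aborted.events.inspect
```

A listener such as the `LoggerListener` methods added in this commit would receive these events in the same order.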

39 changes: 37 additions & 2 deletions lib/waterdrop/producer.rb
@@ -7,6 +7,7 @@ class Producer
include Sync
include Async
include Buffer
include Transactions
include ::Karafka::Core::Helpers::Time

# Which of the inline flow errors do we want to intercept and re-bind
@@ -38,6 +39,7 @@ def initialize(&block)
@buffer_mutex = Mutex.new
@connecting_mutex = Mutex.new
@operating_mutex = Mutex.new
@transaction_mutex = Mutex.new

@status = Status.new
@messages = Concurrent::Array.new
@@ -117,8 +119,25 @@ def client
@client
end

# Purges data from both the buffer queue as well as the librdkafka queue.
#
# @note This operation can cause data loss. It purges not only the internal WaterDrop
# buffer but also the librdkafka queue, and it cancels any outgoing message
# dispatches.
def purge
@monitor.instrument('buffer.purged', producer_id: id) do
@buffer_mutex.synchronize do
@messages = Concurrent::Array.new
end

@client.purge
end
end

# Flushes the buffers in a sync way and closes the producer
def close
# @param force [Boolean] should we force closing even with outstanding messages after the
# max wait timeout
def close(force: false)
@operating_mutex.synchronize do
return unless @status.active?

@@ -156,12 +175,19 @@ def close
# `max_wait_timeout` is in seconds at the moment
@client.flush(@config.max_wait_timeout * 1_000) unless @client.closed?
# We can safely ignore timeouts here because any left outstanding requests
# will anyhow force wait on close
# will anyhow force wait on close if not forced.
# If forced, we will purge the queue and just close
rescue ::Rdkafka::RdkafkaError, Rdkafka::AbstractHandle::WaitTimeoutError
nil
ensure
# Fully purge the local queue in case of a forceful shutdown to make sure that
# there are no dangling messages. If the flush was successful, there should be
# none, but we do it anyway in case it timed out
purge if force
end

@client.close

@client = nil
end

@@ -174,6 +200,11 @@ def close
end
end

# Closes the producer with forced close after timeout, purging any outgoing data
def close!
close(force: true)
end

private

# Ensures that we don't run any operations when the producer is not configured or when it
@@ -223,6 +254,10 @@ def produce(message)
ensure_active!
end

# In case someone defines topic as a symbol, we need to convert it into a string as
# librdkafka does not accept symbols
message = message.merge(topic: message[:topic].to_s) if message[:topic].is_a?(Symbol)

client.produce(**message)
rescue SUPPORTED_FLOW_ERRORS.first => e
# Unless we want to wait and retry and it's a full queue, we raise normally
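The difference between `#close` and `#close!` introduced in this file can be modeled with a minimal sketch. `SketchQueueClient`, `SketchTimeout`, and `close_producer` are hypothetical; they imitate the flush, rescue, and `purge if force` sequence from the diff without touching Kafka, and the return value (number of messages still pending) exists only to make the effect visible.

```ruby
# Hypothetical error standing in for a flush timeout.
SketchTimeout = Class.new(StandardError)

# Hypothetical client with an in-memory queue instead of librdkafka.
class SketchQueueClient
  attr_reader :queue, :closed

  def initialize(queue, flushable:)
    @queue = queue
    @flushable = flushable
    @closed = false
  end

  # Delivers everything, or raises when delivery cannot finish in time.
  def flush
    raise SketchTimeout unless @flushable

    @queue.clear
  end

  def purge
    @queue.clear
  end

  def close
    @closed = true
  end
end

# Models the closing sequence: flush, ignore timeouts, purge only when
# forced, then close. Returns how many messages are still pending.
def close_producer(client, buffer, force: false)
  begin
    client.flush
  rescue SketchTimeout
    nil
  ensure
    if force
      buffer.clear
      client.purge
    end
  end

  client.close
  buffer.size + client.queue.size
end

graceful = close_producer(SketchQueueClient.new(%w[a b], flushable: false), [])
forced = close_producer(SketchQueueClient.new(%w[a b], flushable: false), [], force: true)

puts graceful
puts forced
```

In this model the graceful path leaves undelivered messages in the queue, which per the comments in the diff is where the real client would keep waiting on close, while the forced path discards them so the close can finish.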