#purge, #close! and #transaction #395

Merged 4 commits on Oct 18, 2023
13 changes: 13 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,18 @@
# WaterDrop changelog

## 2.7.0 (Unreleased)
- **[Feature]** Introduce transactions support.
- [Improvement] Expand `LoggerListener` to inform about transactions (info level).
- [Improvement] Allow WaterDrop to accept the topic as either a symbol or a string.
- [Improvement] Enhance both `message.acknowledged` and `error.occurred` (for `librdkafka.dispatch_error`) with the full `delivery_report`.
- [Improvement] Provide `#close!` that will force the producer to close even with outgoing data, after the max wait timeout.
- [Improvement] Provide `#purge` that will purge any outgoing data and data from the internal queues (both WaterDrop and librdkafka).
- [Fix] Fix the `librdkafka.dispatch_error` error dispatch for errors with a negative code.

### Upgrade Notes

There are no breaking changes in this release. However, if you upgrade WaterDrop within Karafka **and** choose to use transactions, be aware that Karafka Web UI will only support transactional producers starting from version `0.7.7`.
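
To illustrate the transactions feature introduced in this release, here is a minimal usage sketch. The broker address and the `transactional.id` value are illustrative assumptions, not part of the changelog itself.

```ruby
require 'waterdrop'

# Providing a `transactional.id` switches the underlying client into transactional mode
producer = WaterDrop::Producer.new do |config|
  config.kafka = {
    'bootstrap.servers': 'localhost:9092',        # assumed local broker
    'transactional.id': 'example-transactional-1' # illustrative id
  }
end

# Messages dispatched inside the block are committed atomically: either all of them
# become visible to `read_committed` consumers or none do
producer.transaction do
  producer.produce_async(topic: 'events', payload: 'first')
  producer.produce_async(topic: 'events', payload: 'second')
end

producer.close
```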

## 2.6.7 (2023-09-01)
- [Improvement] Early flush of data from the `librdkafka` internal buffer before closing.
- [Maintenance] Update the signing cert as the old one expired.
10 changes: 5 additions & 5 deletions Gemfile.lock
@@ -1,8 +1,8 @@
PATH
remote: .
specs:
waterdrop (2.6.7)
karafka-core (>= 2.1.1, < 3.0.0)
waterdrop (2.7.0)
karafka-core (>= 2.2.3, < 3.0.0)
zeitwerk (~> 2.3)

GEM
@@ -22,10 +22,10 @@ GEM
ffi (1.15.5)
i18n (1.14.1)
concurrent-ruby (~> 1.0)
karafka-core (2.1.1)
karafka-core (2.2.3)
concurrent-ruby (>= 1.1)
karafka-rdkafka (>= 0.13.1, < 0.14.0)
karafka-rdkafka (0.13.2)
karafka-rdkafka (>= 0.13.6, < 0.14.0)
karafka-rdkafka (0.13.6)
ffi (~> 1.15)
mini_portile2 (~> 2.6)
rake (> 12)
7 changes: 6 additions & 1 deletion lib/waterdrop/clients/rdkafka.rb
@@ -11,7 +11,9 @@ class << self
# @param producer [WaterDrop::Producer] producer instance with its config, etc
# @note We overwrite this that way, because we do not care
def new(producer)
client = ::Rdkafka::Config.new(producer.config.kafka.to_h).producer
config = producer.config.kafka.to_h

client = ::Rdkafka::Config.new(config).producer

# This callback is not global and is per client, thus we do not have to wrap it with a
# callbacks manager to make it work
@@ -20,6 +22,9 @@ def new(producer)
producer.config.monitor
)

# Switch to the transactional mode if user provided the transactional id
client.init_transactions if config.key?(:'transactional.id')

client
end
end
5 changes: 5 additions & 0 deletions lib/waterdrop/config.rb
@@ -64,6 +64,11 @@ class Config
# option [Numeric] how many seconds should we wait with the backoff on queue having space for
# more messages before re-raising the error.
setting :wait_timeout_on_queue_full, default: 10

# option [Numeric] how many seconds to wait before retrying a transactional command
# after a retryable error
setting :wait_backoff_on_transaction_command, default: 0.5

# option [Integer] how many times to attempt a transactional command before giving up
# and re-raising the error
setting :max_attempts_on_transaction_command, default: 5

# option [Boolean] should we send messages. Setting this to false can be really useful when
# testing and or developing because when set to false, won't actually ping Kafka but will
# run all the validations, etc
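As a sketch, the two new transaction-related settings above could be tuned in the regular setup block; the values below are arbitrary and assume the defaults (`0.5` and `5`) do not fit a given workload.

```ruby
producer = WaterDrop::Producer.new do |config|
  config.kafka = { 'bootstrap.servers': 'localhost:9092' }
  # Wait 1 second between retries of transactional commands and allow up to 10 attempts
  # before re-raising the underlying error
  config.wait_backoff_on_transaction_command = 1
  config.max_attempts_on_transaction_command = 10
end
```
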
5 changes: 4 additions & 1 deletion lib/waterdrop/contracts/message.rb
@@ -26,7 +26,10 @@ def initialize(max_payload_size:)
@max_payload_size = max_payload_size
end

required(:topic) { |val| val.is_a?(String) && TOPIC_REGEXP.match?(val) }
required(:topic) do |val|
(val.is_a?(String) || val.is_a?(Symbol)) && TOPIC_REGEXP.match?(val.to_s)
end

required(:payload) { |val| val.nil? || val.is_a?(String) }
optional(:key) { |val| val.nil? || (val.is_a?(String) && !val.empty?) }
optional(:partition) { |val| val.is_a?(Integer) && val >= -1 }
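With the relaxed contract above, a symbol topic now passes validation and is cast to a string before reaching librdkafka. A small sketch (broker, topic and payloads are made up):

```ruby
producer = WaterDrop::Producer.new do |config|
  config.kafka = { 'bootstrap.servers': 'localhost:9092' }
end

# Both forms are now accepted; the symbol is converted with #to_s prior to dispatch
producer.produce_sync(topic: :events, payload: 'symbol topic')
producer.produce_sync(topic: 'events', payload: 'string topic')
```
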
3 changes: 3 additions & 0 deletions lib/waterdrop/errors.rb
@@ -32,6 +32,9 @@ module Errors
# Raised when there is an inline error during single message produce operations
ProduceError = Class.new(BaseError)

# Raise it within a transaction to abort it
AbortTransaction = Class.new(BaseError)

# Raised when during messages producing something bad happened inline
class ProduceManyError < ProduceError
attr_reader :dispatched
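A sketch of how `AbortTransaction` is meant to be used, reusing the transactional `producer` from the earlier sketch: raising it inside a `#transaction` block should roll the transaction back without the error propagating to the caller.

```ruby
producer.transaction do
  producer.produce_async(topic: 'events', payload: 'will not be committed')

  # Aborts the ongoing transaction; nothing produced inside the block is committed
  raise WaterDrop::Errors::AbortTransaction
end
```
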
10 changes: 6 additions & 4 deletions lib/waterdrop/instrumentation/callbacks/delivery.rb
@@ -17,10 +17,10 @@ def initialize(producer_id, monitor)
# Emits delivery details to the monitor
# @param delivery_report [Rdkafka::Producer::DeliveryReport] delivery report
def call(delivery_report)
if delivery_report.error.to_i.positive?
instrument_error(delivery_report)
else
if delivery_report.error.to_i.zero?
instrument_acknowledged(delivery_report)
else
instrument_error(delivery_report)
end
end

@@ -36,6 +36,7 @@ def instrument_error(delivery_report)
offset: delivery_report.offset,
partition: delivery_report.partition,
topic: delivery_report.topic_name,
delivery_report: delivery_report,
type: 'librdkafka.dispatch_error'
)
end
@@ -47,7 +48,8 @@ def instrument_acknowledged(delivery_report)
producer_id: @producer_id,
offset: delivery_report.offset,
partition: delivery_report.partition,
topic: delivery_report.topic_name
topic: delivery_report.topic_name,
delivery_report: delivery_report
)
end
end
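Since both events now carry the full `delivery_report`, subscribers can inspect it directly. A minimal sketch, given a configured `producer` (the log text is made up):

```ruby
producer.monitor.subscribe('message.acknowledged') do |event|
  report = event[:delivery_report]
  puts "Delivered to #{event[:topic]}/#{event[:partition]}@#{event[:offset]} (#{report.class})"
end

producer.monitor.subscribe('error.occurred') do |event|
  next unless event[:type] == 'librdkafka.dispatch_error'

  puts "Dispatch failed: #{event[:error]} (#{event[:delivery_report].inspect})"
end
```
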
22 changes: 21 additions & 1 deletion lib/waterdrop/instrumentation/logger_listener.rb
@@ -112,9 +112,14 @@ def on_buffer_flushed_sync(event)
debug(event, messages)
end

# @param event [Dry::Events::Event] event that happened with the details
def on_buffer_purged(event)
info(event, 'Successfully purging buffer')
end

# @param event [Dry::Events::Event] event that happened with the details
def on_producer_closed(event)
info event, 'Closing producer'
info(event, 'Closing producer')
end

# @param event [Dry::Events::Event] event that happened with the error details
@@ -125,6 +130,21 @@ def on_error_occurred(event)
error(event, "Error occurred: #{error} - #{type}")
end

# @param event [Dry::Events::Event] event that happened with the details
def on_transaction_started(event)
info(event, 'Starting transaction')
end

# @param event [Dry::Events::Event] event that happened with the details
def on_transaction_aborted(event)
info(event, 'Aborting transaction')
end

# @param event [Dry::Events::Event] event that happened with the details
def on_transaction_committed(event)
info(event, 'Committing transaction')
end

private

# @return [Boolean] should we report the messages details in the debug mode.
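To get these log lines, the listener is attached to the producer monitor in the usual way. A sketch assuming a stdout logger and a transactional `producer`:

```ruby
require 'logger'

producer.monitor.subscribe(
  WaterDrop::Instrumentation::LoggerListener.new(Logger.new($stdout))
)

# With a transactional producer this now logs, at the info level,
# "Starting transaction" followed by "Committing transaction" (or "Aborting transaction")
producer.transaction do
  producer.produce_async(topic: 'events', payload: 'logged')
end
```
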
5 changes: 5 additions & 0 deletions lib/waterdrop/instrumentation/notifications.rb
@@ -18,8 +18,13 @@ class Notifications < ::Karafka::Core::Monitoring::Notifications
messages.produced_sync
messages.buffered

transaction.started
transaction.committed
transaction.aborted

buffer.flushed_async
buffer.flushed_sync
buffer.purged

statistics.emitted

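Any of the newly registered events can also be subscribed to directly for custom instrumentation. A small sketch using two of them; it assumes, based on `Producer#purge` below, that `buffer.purged` carries the `producer_id` key:

```ruby
producer.monitor.subscribe('transaction.committed') do |_event|
  puts 'Transaction committed'
end

producer.monitor.subscribe('buffer.purged') do |event|
  puts "Buffer purged by producer #{event[:producer_id]}"
end
```
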
39 changes: 37 additions & 2 deletions lib/waterdrop/producer.rb
@@ -7,6 +7,7 @@ class Producer
include Sync
include Async
include Buffer
include Transactions
include ::Karafka::Core::Helpers::Time

# Which of the inline flow errors do we want to intercept and re-bind
@@ -38,6 +39,7 @@ def initialize(&block)
@buffer_mutex = Mutex.new
@connecting_mutex = Mutex.new
@operating_mutex = Mutex.new
@transaction_mutex = Mutex.new

@status = Status.new
@messages = Concurrent::Array.new
@@ -117,8 +119,25 @@ def client
@client
end

# Purges data from both the buffer queue as well as the librdkafka queue.
#
# @note This is an operation that can cause data loss. Keep that in mind. It will not only
# purge the internal WaterDrop buffer but will also purge the librdkafka queue and cancel
# any outgoing message dispatches.
def purge
@monitor.instrument('buffer.purged', producer_id: id) do
@buffer_mutex.synchronize do
@messages = Concurrent::Array.new
end

@client.purge
end
end
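
# A short sketch of `#purge` in use on a configured `producer`; as the note above warns,
# this is a lossy operation:
#
#   producer.produce_async(topic: 'events', payload: 'may never reach Kafka')
#
#   # Drops the WaterDrop buffer, the librdkafka queue and any pending dispatches,
#   # emitting the `buffer.purged` notification along the way
#   producer.purge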

# Flushes the buffers in a sync way and closes the producer
def close
# @param force [Boolean] should we force closing even with outstanding messages after the
# max wait timeout
def close(force: false)
@operating_mutex.synchronize do
return unless @status.active?

@@ -156,12 +175,19 @@ def close
# `max_wait_timeout` is in seconds at the moment
@client.flush(@config.max_wait_timeout * 1_000) unless @client.closed?
# We can safely ignore timeouts here because any left outstanding requests
# will anyhow force wait on close
# will anyhow force wait on close if not forced.
# If forced, we will purge the queue and just close
rescue ::Rdkafka::RdkafkaError, Rdkafka::AbstractHandle::WaitTimeoutError
nil
ensure
# Fully purge the local queue in case of a forceful shutdown, just to be sure that
# there are no dangling messages. If the flush was successful there should be
# none, but we purge anyway in case it timed out
purge if force
end

@client.close

@client = nil
end

@@ -174,6 +200,11 @@ def close
end
end

# Closes the producer with forced close after timeout, purging any outgoing data
def close!
close(force: true)
end
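
# A sketch contrasting the two shutdown paths on a configured `producer`:
#
#   # Graceful: waits up to `max_wait_timeout` for outstanding deliveries to finish
#   producer.close
#
#   # Forceful: equivalent to `close(force: true)`; after the max wait timeout it purges
#   # anything still outgoing instead of waiting for it
#   producer.close!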

private

# Ensures that we don't run any operations when the producer is not configured or when it
@@ -223,6 +254,10 @@ def produce(message)
ensure_active!
end

# In case someone defines topic as a symbol, we need to convert it into a string as
# librdkafka does not accept symbols
message = message.merge(topic: message[:topic].to_s) if message[:topic].is_a?(Symbol)

client.produce(**message)
rescue SUPPORTED_FLOW_ERRORS.first => e
# Unless we want to wait and retry and it's a full queue, we raise normally