From b6d3858a6478f4747ebde4f5e3a30b3439abd6eb Mon Sep 17 00:00:00 2001 From: Maciej Mensfeld Date: Wed, 18 Oct 2023 15:15:44 +0200 Subject: [PATCH] `#purge`, `#close!` and `#transaction` (#395) --- CHANGELOG.md | 13 + Gemfile.lock | 10 +- lib/waterdrop/clients/rdkafka.rb | 7 +- lib/waterdrop/config.rb | 5 + lib/waterdrop/contracts/message.rb | 5 +- lib/waterdrop/errors.rb | 3 + .../instrumentation/callbacks/delivery.rb | 10 +- .../instrumentation/logger_listener.rb | 22 +- .../instrumentation/notifications.rb | 5 + lib/waterdrop/producer.rb | 39 +- lib/waterdrop/producer/transactions.rb | 150 ++++++++ lib/waterdrop/version.rb | 2 +- spec/lib/waterdrop/contracts/message_spec.rb | 8 +- .../instrumentation/logger_listener_spec.rb | 33 ++ spec/lib/waterdrop/producer/sync_spec.rb | 10 + .../waterdrop/producer/transactions_spec.rb | 344 ++++++++++++++++++ spec/lib/waterdrop/producer_spec.rb | 57 +++ spec/support/factories/producer.rb | 18 +- waterdrop.gemspec | 6 +- 19 files changed, 727 insertions(+), 20 deletions(-) create mode 100644 lib/waterdrop/producer/transactions.rb create mode 100644 spec/lib/waterdrop/producer/transactions_spec.rb diff --git a/CHANGELOG.md b/CHANGELOG.md index 20171366..9ca5b3b8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,18 @@ # WaterDrop changelog +## 2.7.0 (Unreleased) +- **[Feature]** Introduce transactions support. +- [Improvement] Expand `LoggerListener` to inform about transactions (info level). +- [Improvement] Allow waterdrop to use topic as a symbol or a string. +- [Improvement] Enhance both `message.acknowledged` and `error.occurred` (for `librdkafka.dispatch_error`) with full delivery_report. +- [Improvement] Provide `#close!` that will force producer close even with outgoing data after the ma wait timeout. +- [Improvement] Provide `#purge` that will purge any outgoing data and data from the internal queues (both WaterDrop and librdkafka). +- [Fix] Fix the `librdkafka.dispatch_error` error dispatch for errors with negative code. + +### Upgrade Notes + +There are no breaking changes in this release. However, if you upgrade WaterDrop in Karafka **and** choose to use transactions, Karafka Web UI may not support it. Web UI will support transactional producers starting from `0.7.7`. + ## 2.6.7 (2023-09-01) - [Improvement] early flush data from `librdkafka` internal buffer before closing. - [Maintenance] Update the signing cert as the old one expired. diff --git a/Gemfile.lock b/Gemfile.lock index eb34b6c1..cddd93c7 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -1,8 +1,8 @@ PATH remote: . specs: - waterdrop (2.6.7) - karafka-core (>= 2.1.1, < 3.0.0) + waterdrop (2.7.0) + karafka-core (>= 2.2.3, < 3.0.0) zeitwerk (~> 2.3) GEM @@ -22,10 +22,10 @@ GEM ffi (1.15.5) i18n (1.14.1) concurrent-ruby (~> 1.0) - karafka-core (2.1.1) + karafka-core (2.2.3) concurrent-ruby (>= 1.1) - karafka-rdkafka (>= 0.13.1, < 0.14.0) - karafka-rdkafka (0.13.2) + karafka-rdkafka (>= 0.13.6, < 0.14.0) + karafka-rdkafka (0.13.6) ffi (~> 1.15) mini_portile2 (~> 2.6) rake (> 12) diff --git a/lib/waterdrop/clients/rdkafka.rb b/lib/waterdrop/clients/rdkafka.rb index 6004914c..8a884db3 100644 --- a/lib/waterdrop/clients/rdkafka.rb +++ b/lib/waterdrop/clients/rdkafka.rb @@ -11,7 +11,9 @@ class << self # @param producer [WaterDrop::Producer] producer instance with its config, etc # @note We overwrite this that way, because we do not care def new(producer) - client = ::Rdkafka::Config.new(producer.config.kafka.to_h).producer + config = producer.config.kafka.to_h + + client = ::Rdkafka::Config.new(config).producer # This callback is not global and is per client, thus we do not have to wrap it with a # callbacks manager to make it work @@ -20,6 +22,9 @@ def new(producer) producer.config.monitor ) + # Switch to the transactional mode if user provided the transactional id + client.init_transactions if config.key?(:'transactional.id') + client end end diff --git a/lib/waterdrop/config.rb b/lib/waterdrop/config.rb index 64b2f5c0..1d360b62 100644 --- a/lib/waterdrop/config.rb +++ b/lib/waterdrop/config.rb @@ -64,6 +64,11 @@ class Config # option [Numeric] how many seconds should we wait with the backoff on queue having space for # more messages before re-raising the error. setting :wait_timeout_on_queue_full, default: 10 + + setting :wait_backoff_on_transaction_command, default: 0.5 + + setting :max_attempts_on_transaction_command, default: 5 + # option [Boolean] should we send messages. Setting this to false can be really useful when # testing and or developing because when set to false, won't actually ping Kafka but will # run all the validations, etc diff --git a/lib/waterdrop/contracts/message.rb b/lib/waterdrop/contracts/message.rb index 280c4b6e..d9624b82 100644 --- a/lib/waterdrop/contracts/message.rb +++ b/lib/waterdrop/contracts/message.rb @@ -26,7 +26,10 @@ def initialize(max_payload_size:) @max_payload_size = max_payload_size end - required(:topic) { |val| val.is_a?(String) && TOPIC_REGEXP.match?(val) } + required(:topic) do |val| + (val.is_a?(String) || val.is_a?(Symbol)) && TOPIC_REGEXP.match?(val.to_s) + end + required(:payload) { |val| val.nil? || val.is_a?(String) } optional(:key) { |val| val.nil? || (val.is_a?(String) && !val.empty?) } optional(:partition) { |val| val.is_a?(Integer) && val >= -1 } diff --git a/lib/waterdrop/errors.rb b/lib/waterdrop/errors.rb index fbd90860..3d2ac4ea 100644 --- a/lib/waterdrop/errors.rb +++ b/lib/waterdrop/errors.rb @@ -32,6 +32,9 @@ module Errors # Raised when there is an inline error during single message produce operations ProduceError = Class.new(BaseError) + # Raise it within a transaction to abort it + AbortTransaction = Class.new(BaseError) + # Raised when during messages producing something bad happened inline class ProduceManyError < ProduceError attr_reader :dispatched diff --git a/lib/waterdrop/instrumentation/callbacks/delivery.rb b/lib/waterdrop/instrumentation/callbacks/delivery.rb index 31efda76..a95772e8 100644 --- a/lib/waterdrop/instrumentation/callbacks/delivery.rb +++ b/lib/waterdrop/instrumentation/callbacks/delivery.rb @@ -17,10 +17,10 @@ def initialize(producer_id, monitor) # Emits delivery details to the monitor # @param delivery_report [Rdkafka::Producer::DeliveryReport] delivery report def call(delivery_report) - if delivery_report.error.to_i.positive? - instrument_error(delivery_report) - else + if delivery_report.error.to_i.zero? instrument_acknowledged(delivery_report) + else + instrument_error(delivery_report) end end @@ -36,6 +36,7 @@ def instrument_error(delivery_report) offset: delivery_report.offset, partition: delivery_report.partition, topic: delivery_report.topic_name, + delivery_report: delivery_report, type: 'librdkafka.dispatch_error' ) end @@ -47,7 +48,8 @@ def instrument_acknowledged(delivery_report) producer_id: @producer_id, offset: delivery_report.offset, partition: delivery_report.partition, - topic: delivery_report.topic_name + topic: delivery_report.topic_name, + delivery_report: delivery_report ) end end diff --git a/lib/waterdrop/instrumentation/logger_listener.rb b/lib/waterdrop/instrumentation/logger_listener.rb index b29ce6f1..08ccf4de 100644 --- a/lib/waterdrop/instrumentation/logger_listener.rb +++ b/lib/waterdrop/instrumentation/logger_listener.rb @@ -112,9 +112,14 @@ def on_buffer_flushed_sync(event) debug(event, messages) end + # @param event [Dry::Events::Event] event that happened with the details + def on_buffer_purged(event) + info(event, 'Successfully purging buffer') + end + # @param event [Dry::Events::Event] event that happened with the details def on_producer_closed(event) - info event, 'Closing producer' + info(event, 'Closing producer') end # @param event [Dry::Events::Event] event that happened with the error details @@ -125,6 +130,21 @@ def on_error_occurred(event) error(event, "Error occurred: #{error} - #{type}") end + # @param event [Dry::Events::Event] event that happened with the details + def on_transaction_started(event) + info(event, 'Starting transaction') + end + + # @param event [Dry::Events::Event] event that happened with the details + def on_transaction_aborted(event) + info(event, 'Aborting transaction') + end + + # @param event [Dry::Events::Event] event that happened with the details + def on_transaction_committed(event) + info(event, 'Committing transaction') + end + private # @return [Boolean] should we report the messages details in the debug mode. diff --git a/lib/waterdrop/instrumentation/notifications.rb b/lib/waterdrop/instrumentation/notifications.rb index d58c9166..fcb14215 100644 --- a/lib/waterdrop/instrumentation/notifications.rb +++ b/lib/waterdrop/instrumentation/notifications.rb @@ -18,8 +18,13 @@ class Notifications < ::Karafka::Core::Monitoring::Notifications messages.produced_sync messages.buffered + transaction.started + transaction.committed + transaction.aborted + buffer.flushed_async buffer.flushed_sync + buffer.purged statistics.emitted diff --git a/lib/waterdrop/producer.rb b/lib/waterdrop/producer.rb index 632ce59e..4bcbc168 100644 --- a/lib/waterdrop/producer.rb +++ b/lib/waterdrop/producer.rb @@ -7,6 +7,7 @@ class Producer include Sync include Async include Buffer + include Transactions include ::Karafka::Core::Helpers::Time # Which of the inline flow errors do we want to intercept and re-bind @@ -38,6 +39,7 @@ def initialize(&block) @buffer_mutex = Mutex.new @connecting_mutex = Mutex.new @operating_mutex = Mutex.new + @transaction_mutex = Mutex.new @status = Status.new @messages = Concurrent::Array.new @@ -117,8 +119,25 @@ def client @client end + # Purges data from both the buffer queue as well as the librdkafka queue. + # + # @note This is an operation that can cause data loss. Keep that in mind. It will not only + # purge the internal WaterDrop buffer but will also purge the librdkafka queue as well as + # will cancel any outgoing messages dispatches. + def purge + @monitor.instrument('buffer.purged', producer_id: id) do + @buffer_mutex.synchronize do + @messages = Concurrent::Array.new + end + + @client.purge + end + end + # Flushes the buffers in a sync way and closes the producer - def close + # @param force [Boolean] should we force closing even with outstanding messages after the + # max wait timeout + def close(force: false) @operating_mutex.synchronize do return unless @status.active? @@ -156,12 +175,19 @@ def close # `max_wait_timeout` is in seconds at the moment @client.flush(@config.max_wait_timeout * 1_000) unless @client.closed? # We can safely ignore timeouts here because any left outstanding requests - # will anyhow force wait on close + # will anyhow force wait on close if not forced. + # If forced, we will purge the queue and just close rescue ::Rdkafka::RdkafkaError, Rdkafka::AbstractHandle::WaitTimeoutError nil + ensure + # Purge fully the local queue in case of a forceful shutdown just to be sure, that + # there are no dangling messages. In case flush was successful, there should be + # none but we do it just in case it timed out + purge if force end @client.close + @client = nil end @@ -174,6 +200,11 @@ def close end end + # Closes the producer with forced close after timeout, purging any outgoing data + def close! + close(force: true) + end + private # Ensures that we don't run any operations when the producer is not configured or when it @@ -223,6 +254,10 @@ def produce(message) ensure_active! end + # In case someone defines topic as a symbol, we need to convert it into a string as + # librdkafka does not accept symbols + message = message.merge(topic: message[:topic].to_s) if message[:topic].is_a?(Symbol) + client.produce(**message) rescue SUPPORTED_FLOW_ERRORS.first => e # Unless we want to wait and retry and it's a full queue, we raise normally diff --git a/lib/waterdrop/producer/transactions.rb b/lib/waterdrop/producer/transactions.rb new file mode 100644 index 00000000..94c85ba4 --- /dev/null +++ b/lib/waterdrop/producer/transactions.rb @@ -0,0 +1,150 @@ +# frozen_string_literal: true + +module WaterDrop + class Producer + # Transactions related producer functionalities + module Transactions + # Creates a transaction. + # + # Karafka transactions work in a similar manner to SQL db transactions though there are some + # crucial differences. When you start a transaction, all messages produced during it will + # be delivered together or will fail together. The difference is, that messages from within + # a single transaction can be delivered and will have a delivery handle but will be then + # compacted prior to moving the LSO forward. This means, that not every delivery handle for + # async dispatches will emit a queue purge error. None for sync as the delivery has happened + # but they will never be visible by the transactional consumers. + # + # Transactions **are** thread-safe however they lock a mutex. This means, that for + # high-throughput transactional messages production in multiple threads + # (for example in Karafka), it may be much better to use few instances that can work in + # parallel. + # + # Please note, that if a producer is configured as transactional, it **cannot** produce + # messages outside of transactions + # + # @return Block result + # + # @example Simple transaction + # producer.transaction do + # producer.produce_async(topic: 'topic', payload: 'data') + # end + # + # @example Aborted transaction - messages producer won't be visible by consumers + # producer.transaction do + # producer.produce_sync(topic: 'topic', payload: 'data') + # throw(:abort) + # end + # + # @example Use block result last handler to wait on all messages ack + # handler = producer.transaction do + # producer.produce_async(topic: 'topic', payload: 'data') + # end + # + # handler.wait + def transaction + @transaction_mutex.synchronize do + transactional_instrument(:committed) do + with_transactional_error_handling(:begin) do + transactional_instrument(:started) { client.begin_transaction } + end + + result = nil + commit = false + + catch(:abort) do + result = yield + commit = true + end + + commit || raise(WaterDrop::Errors::AbortTransaction) + + with_transactional_error_handling(:commit) do + client.commit_transaction + end + + result + rescue StandardError => e + with_transactional_error_handling(:abort) do + transactional_instrument(:aborted) { client.abort_transaction } + end + + raise unless e.is_a?(WaterDrop::Errors::AbortTransaction) + end + end + end + + # @return [Boolean] Is this producer a transactional one + def transactional? + return @transactional if instance_variable_defined?(:'@transactional') + + @transactional = config.kafka.to_h.key?(:'transactional.id') + end + + private + + # Instruments the transactional operation with producer id + # + # @param key [Symbol] transaction operation key + # @param block [Proc] block to run inside the instrumentation or nothing if not given + def transactional_instrument(key, &block) + @monitor.instrument("transaction.#{key}", producer_id: id, &block) + end + + # Error handling for transactional operations is a bit special. There are three types of + # errors coming from librdkafka: + # - retryable - indicates that a given operation (like offset commit) can be retried after + # a backoff and that is should be operating later as expected. We try to retry those + # few times before finally failing. + # - fatal - errors that will not recover no matter what (for example being fenced out) + # - abortable - error from which we cannot recover but for which we should abort the + # current transaction. + # + # The code below handles this logic also publishing the appropriate notifications via our + # notifications pipeline. + # + # @param action [Symbol] action type + # @param allow_abortable [Boolean] should we allow for the abortable flow. This is set to + # false internally to prevent attempts to abort from failed abort operations + def with_transactional_error_handling(action, allow_abortable: true) + attempt ||= 0 + attempt += 1 + + yield + rescue ::Rdkafka::RdkafkaError => e + # Decide if there is a chance to retry given error + do_retry = e.retryable? && attempt < config.max_attempts_on_transaction_command + + @monitor.instrument( + 'error.occurred', + producer_id: id, + caller: self, + error: e, + type: "transaction.#{action}", + retry: do_retry, + attempt: attempt + ) + + raise if e.fatal? + + if do_retry + # Backoff more and more before retries + sleep(config.wait_backoff_on_transaction_command * attempt) + + retry + end + + if e.abortable? && allow_abortable + # Always attempt to abort but if aborting fails with an abortable error, do not attempt + # to abort from abort as this could create an infinite loop + with_transactional_error_handling(:abort, allow_abortable: false) do + transactional_instrument(:aborted) { @client.abort_transaction } + end + + raise + end + + raise + end + end + end +end diff --git a/lib/waterdrop/version.rb b/lib/waterdrop/version.rb index 2b0e4338..7803802a 100644 --- a/lib/waterdrop/version.rb +++ b/lib/waterdrop/version.rb @@ -3,5 +3,5 @@ # WaterDrop library module WaterDrop # Current WaterDrop version - VERSION = '2.6.7' + VERSION = '2.7.0' end diff --git a/spec/lib/waterdrop/contracts/message_spec.rb b/spec/lib/waterdrop/contracts/message_spec.rb index c9fca23d..26fe7cf9 100644 --- a/spec/lib/waterdrop/contracts/message_spec.rb +++ b/spec/lib/waterdrop/contracts/message_spec.rb @@ -41,9 +41,15 @@ it { expect(errors[:topic]).not_to be_empty } end - context 'when topic is a symbol' do + context 'when topic is a valid symbol' do before { message[:topic] = :symbol } + it { expect(contract_result).to be_success } + end + + context 'when topic is a symbol that will not be a topic' do + before { message[:topic] = '$%^&*()'.to_sym } + it { expect(contract_result).not_to be_success } it { expect(errors[:topic]).not_to be_empty } end diff --git a/spec/lib/waterdrop/instrumentation/logger_listener_spec.rb b/spec/lib/waterdrop/instrumentation/logger_listener_spec.rb index ba49ae48..db04055c 100644 --- a/spec/lib/waterdrop/instrumentation/logger_listener_spec.rb +++ b/spec/lib/waterdrop/instrumentation/logger_listener_spec.rb @@ -192,6 +192,15 @@ end end + describe '#on_buffer_purged' do + before { listener.on_buffer_purged(event) } + + it { expect(logged_data[0]).to include(producer.id) } + it { expect(logged_data[0]).to include('INFO') } + it { expect(logged_data[0]).to include('Successfully purging buffer') } + it { expect(logged_data[1]).to eq(nil) } + end + describe '#on_producer_closed' do before { listener.on_producer_closed(event) } @@ -210,4 +219,28 @@ it { expect(logged_data[0]).to include('ERROR') } it { expect(logged_data[0]).to include('Error occurred') } end + + describe '#on_transaction_started' do + before { listener.on_transaction_started(event) } + + it { expect(logged_data[0]).to include(producer.id) } + it { expect(logged_data[0]).to include('INFO') } + it { expect(logged_data[0]).to include('Starting transaction') } + end + + describe '#on_transaction_aborted' do + before { listener.on_transaction_aborted(event) } + + it { expect(logged_data[0]).to include(producer.id) } + it { expect(logged_data[0]).to include('INFO') } + it { expect(logged_data[0]).to include('Aborting transaction') } + end + + describe '#on_transaction_committed' do + before { listener.on_transaction_committed(event) } + + it { expect(logged_data[0]).to include(producer.id) } + it { expect(logged_data[0]).to include('INFO') } + it { expect(logged_data[0]).to include('Committing transaction') } + end end diff --git a/spec/lib/waterdrop/producer/sync_spec.rb b/spec/lib/waterdrop/producer/sync_spec.rb index f3e663d9..40d9c63e 100644 --- a/spec/lib/waterdrop/producer/sync_spec.rb +++ b/spec/lib/waterdrop/producer/sync_spec.rb @@ -20,6 +20,16 @@ it { expect(delivery).to be_a(Rdkafka::Producer::DeliveryReport) } end + context 'when producing with topic as a symbol' do + let(:message) do + msg = build(:valid_message) + msg[:topic] = msg[:topic].to_sym + msg + end + + it { expect(delivery).to be_a(Rdkafka::Producer::DeliveryReport) } + end + context 'when inline error occurs in librdkafka' do let(:errors) { [] } let(:error) { errors.first } diff --git a/spec/lib/waterdrop/producer/transactions_spec.rb b/spec/lib/waterdrop/producer/transactions_spec.rb new file mode 100644 index 00000000..319f2cbf --- /dev/null +++ b/spec/lib/waterdrop/producer/transactions_spec.rb @@ -0,0 +1,344 @@ +# frozen_string_literal: true + +RSpec.describe_current do + subject(:producer) { build(:transactional_producer) } + + let(:transactional_id) { SecureRandom.uuid } + + after { producer.close } + + it do + # First run will check if cached + producer.transactional? + expect(producer.transactional?).to eq(true) + end + + context 'when we try to create producer with invalid transactional settings' do + it 'expect to raise an error' do + expect do + build(:transactional_producer, transaction_timeout_ms: 100).client + end.to raise_error(Rdkafka::Config::ConfigError, /transaction\.timeout\.ms/) + end + end + + context 'when we try to start transaction without transactional.id' do + subject(:producer) { build(:producer) } + + it 'expect to raise with info that this functionality is not configured' do + expect { producer.transaction {} } + .to raise_error(::Rdkafka::RdkafkaError, /Local: Functionality not configured/) + end + + it { expect(producer.transactional?).to eq(false) } + end + + context 'when we make a transaction without sending any messages' do + it 'expect not to crash and do nothing' do + expect { producer.transaction {} }.not_to raise_error + end + end + + context 'when we dispatch in transaction to multiple topics' do + it 'expect to work' do + handlers = [] + + producer.transaction do + handlers << producer.produce_async(topic: 'example_topic1', payload: '1') + handlers << producer.produce_async(topic: 'example_topic2', payload: '2') + end + + expect { handlers.map!(&:wait) }.not_to raise_error + end + + it 'expect to return block result as the transaction result' do + result = rand + + transaction_result = producer.transaction do + producer.produce_async(topic: 'example_topic', payload: '2') + result + end + + expect(transaction_result).to eq(result) + end + end + + context 'when trying to use transaction on a non-existing topics and short time' do + subject(:producer) { build(:transactional_producer, transaction_timeout_ms: 1_000) } + + it 'expect to crash with an inconsistent state after abort' do + error = nil + + begin + producer.transaction do + 10.times do |i| + producer.produce_async(topic: SecureRandom.uuid, payload: i.to_s) + end + end + rescue Rdkafka::RdkafkaError => e + error = e + end + + expect(error).to be_a(Rdkafka::RdkafkaError) + expect(error.code).to eq(:state) + expect(error.cause).to be_a(Rdkafka::RdkafkaError) + expect(error.cause.code).to eq(:inconsistent) + end + end + + context 'when trying to use transaction on a non-existing topics and enough time' do + subject(:producer) { build(:transactional_producer) } + + it 'expect not to crash and publish all data' do + expect do + producer.transaction do + 10.times do |i| + producer.produce_async(topic: SecureRandom.uuid, payload: i.to_s) + end + end + end.not_to raise_error + end + end + + context 'when we start transaction and raise an error' do + it 'expect to re-raise this error' do + expect do + producer.transaction do + producer.produce_async(topic: 'example_topic', payload: 'na') + + raise StandardError + end + end.to raise_error(StandardError) + end + + it 'expect to cancel the dispatch of the message' do + handler = nil + + begin + producer.transaction do + handler = producer.produce_async(topic: 'example_topic', payload: 'na') + + raise StandardError + end + rescue StandardError + nil + end + + expect { handler.wait }.to raise_error(Rdkafka::RdkafkaError, /Purged in queue/) + end + + context 'when we have error instrumentation' do + let(:errors) { [] } + + before do + producer.monitor.subscribe('error.occurred') do |event| + errors << event[:error] + end + + begin + producer.transaction do + producer.produce_async(topic: 'example_topic', payload: 'na') + + raise StandardError + end + rescue StandardError + nil + end + end + + it 'expect to emit the cancellation error via the error pipeline' do + expect(errors.first).to be_a(Rdkafka::RdkafkaError) + expect(errors.first.code).to eq(:purge_queue) + end + end + + context 'when using sync producer' do + it 'expect to wait on the initial delivery per message and have it internally' do + result = nil + + begin + producer.transaction do + result = producer.produce_sync(topic: 'example_topic', payload: 'na') + + expect(result.partition).to eq(0) + expect(result.error).to eq(nil) + + raise StandardError + end + rescue StandardError + nil + end + + # It will be compacted but is still visible as a delivery report + expect(result.partition).to eq(0) + expect(result.error).to eq(nil) + end + end + + context 'when using async producer and waiting' do + it 'expect to wait on the initial delivery per message' do + handler = nil + + begin + producer.transaction do + handler = producer.produce_async(topic: 'example_topic', payload: 'na') + + raise StandardError + end + rescue StandardError + nil + end + + result = handler.create_result + + # It will be compacted but is still visible as a delivery report + expect(result.partition).to eq(-1) + expect(result.error).to be_a(Rdkafka::RdkafkaError) + end + end + end + + context 'when we start transaction and abort' do + it 'expect not to re-raise' do + expect do + producer.transaction do + producer.produce_async(topic: 'example_topic', payload: 'na') + + throw(:abort) + end + end.not_to raise_error + end + + it 'expect to cancel the dispatch of the message' do + handler = nil + + producer.transaction do + handler = producer.produce_async(topic: 'example_topic', payload: 'na') + + raise WaterDrop::Errors::AbortTransaction + end + + expect { handler.wait }.to raise_error(Rdkafka::RdkafkaError, /Purged in queue/) + end + + context 'when we have error instrumentation' do + let(:errors) { [] } + + before do + producer.monitor.subscribe('error.occurred') do |event| + errors << event[:error] + end + + producer.transaction do + producer.produce_async(topic: 'example_topic', payload: 'na') + + throw(:abort) + end + end + + it 'expect to emit the cancellation error via the error pipeline' do + expect(errors.first).to be_a(Rdkafka::RdkafkaError) + expect(errors.first.code).to eq(:purge_queue) + end + end + + context 'when using sync producer' do + it 'expect to wait on the initial delivery per message and have it internally' do + result = nil + + producer.transaction do + result = producer.produce_sync(topic: 'example_topic', payload: 'na') + + expect(result.partition).to eq(0) + expect(result.error).to eq(nil) + + throw(:abort) + end + + # It will be compacted but is still visible as a delivery report + expect(result.partition).to eq(0) + expect(result.error).to eq(nil) + end + end + + context 'when using async producer and waiting' do + it 'expect to wait on the initial delivery per message' do + handler = nil + + producer.transaction do + handler = producer.produce_async(topic: 'example_topic', payload: 'na') + + throw(:abort) + end + + result = handler.create_result + + # It will be compacted but is still visible as a delivery report + expect(result.partition).to eq(-1) + expect(result.error).to be_a(Rdkafka::RdkafkaError) + end + end + end + + context 'when we try to create a producer with already used transactional_id' do + let(:producer1) { build(:transactional_producer, transactional_id: transactional_id) } + let(:producer2) { build(:transactional_producer, transactional_id: transactional_id) } + + after do + producer1.close + producer2.close + end + + it 'expect to fence out the previous one' do + producer1.transaction {} + producer2.transaction {} + + expect do + producer1.transaction do + producer1.produce_async(topic: 'example_topic', payload: '1') + end + end.to raise_error(Rdkafka::RdkafkaError, /fenced by a newer instance/) + end + + it 'expect not to fence out the new one' do + producer1.transaction {} + producer2.transaction {} + + expect do + producer2.transaction do + producer2.produce_async(topic: 'example_topic', payload: '1') + end + end.not_to raise_error + end + end + + context 'when trying to close a producer from inside of a transaction' do + it 'expect to raise an error' do + expect do + producer.transaction do + producer.close + end + end.to raise_error(Rdkafka::RdkafkaError, /Erroneous state/) + end + end + + context 'when transaction crashes internally on one of the retryable operations' do + before do + counter = 0 + ref = producer.client.method(:begin_transaction) + + allow(producer.client).to receive(:begin_transaction) do + if counter.zero? + counter += 1 + + raise(Rdkafka::RdkafkaError.new(-152, retryable: true)) + end + + ref.call + end + end + + it 'expect to retry and continue' do + expect { producer.transaction {} }.not_to raise_error + end + end +end diff --git a/spec/lib/waterdrop/producer_spec.rb b/spec/lib/waterdrop/producer_spec.rb index 546ff415..28b17069 100644 --- a/spec/lib/waterdrop/producer_spec.rb +++ b/spec/lib/waterdrop/producer_spec.rb @@ -181,6 +181,63 @@ end end + describe '#close!' do + context 'when producer cannot connect and dispatch messages' do + subject(:producer) do + build( + :producer, + kafka: { 'bootstrap.servers': 'localhost:9093' }, + max_wait_timeout: 1 + ) + end + + it 'expect not to hang forever' do + producer.produce_async(topic: 'na', payload: 'data') + producer.close! + end + end + end + + describe '#purge' do + context 'when there are some outgoing messages and we purge' do + subject(:producer) do + build( + :producer, + kafka: { 'bootstrap.servers': 'localhost:9093' }, + max_wait_timeout: 1 + ) + end + + after { producer.close! } + + it 'expect their deliveries to materialize with errors' do + handler = producer.produce_async(topic: 'na', payload: 'data') + producer.purge + expect { handler.wait }.to raise_error(Rdkafka::RdkafkaError, /Purged in queue/) + end + + context 'when monitoring errors via instrumentation' do + let(:detected) { [] } + + before do + producer.monitor.subscribe('error.occurred') do |event| + next unless event[:type] == 'librdkafka.dispatch_error' + + detected << event[:error].code + end + end + + it 'expect the error notifications to publish those errors' do + handler = producer.produce_async(topic: 'na', payload: 'data') + producer.purge + + handler.wait(raise_response_error: false) + expect(detected.first).to eq(:purge_queue) + end + end + end + end + describe '#ensure_usable!' do subject(:producer) { build(:producer) } diff --git a/spec/support/factories/producer.rb b/spec/support/factories/producer.rb index 5cd20e51..acabb28d 100644 --- a/spec/support/factories/producer.rb +++ b/spec/support/factories/producer.rb @@ -15,7 +15,7 @@ 'bootstrap.servers': 'localhost:9092', # We emit statistics as it is a great way to check they actually always work 'statistics.interval.ms': 100, - 'request.required.acks': 1 + 'request.required.acks': 'all' } end @@ -35,6 +35,22 @@ end end + factory :transactional_producer, parent: :producer do + transient do + transactional_id { SecureRandom.uuid } + transaction_timeout_ms { 30_000 } + end + + kafka do + { + 'bootstrap.servers': 'localhost:9092', + 'request.required.acks': 'all', + 'transactional.id': transactional_id, + 'transaction.timeout.ms': transaction_timeout_ms + } + end + end + factory :limited_producer, parent: :producer do wait_timeout_on_queue_full { 15 } diff --git a/waterdrop.gemspec b/waterdrop.gemspec index 87de55f5..d81e934f 100644 --- a/waterdrop.gemspec +++ b/waterdrop.gemspec @@ -16,7 +16,7 @@ Gem::Specification.new do |spec| spec.description = spec.summary spec.license = 'MIT' - spec.add_dependency 'karafka-core', '>= 2.1.1', '< 3.0.0' + spec.add_dependency 'karafka-core', '>= 2.2.3', '< 3.0.0' spec.add_dependency 'zeitwerk', '~> 2.3' if $PROGRAM_NAME.end_with?('gem') @@ -31,10 +31,10 @@ Gem::Specification.new do |spec| spec.metadata = { 'funding_uri' => 'https://karafka.io/#become-pro', 'homepage_uri' => 'https://karafka.io', - 'changelog_uri' => 'https://github.com/karafka/waterdrop/blob/master/CHANGELOG.md', + 'changelog_uri' => 'https://karafka.io/docs/Changelog-WaterDrop', 'bug_tracker_uri' => 'https://github.com/karafka/waterdrop/issues', 'source_code_uri' => 'https://github.com/karafka/waterdrop', - 'documentation_uri' => 'https://github.com/karafka/waterdrop#readme', + 'documentation_uri' => 'https://karafka.io/docs/#waterdrop', 'rubygems_mfa_required' => 'true' } end