diff --git a/CHANGELOG.md b/CHANGELOG.md index 49964045..28b8cb81 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ This release contains **BREAKING** changes. Make sure to read and apply upgrade - **[Breaking]** Remove ability to abort transactions using `throw(:abort)`. Please use `raise WaterDrop::Errors::AbortTransaction`. - **[Breaking]** Disallow (similar to ActiveRecord) exiting transactions with `return`, `break` or `throw`. - [Enhancement] Make variants fiber safe. +- [Enhancement] In transactional mode do not return any `dispatched` messages as none will be dispatched due to rollback. ### Upgrade Notes diff --git a/lib/waterdrop/producer.rb b/lib/waterdrop/producer.rb index f439b9f0..07dd6677 100644 --- a/lib/waterdrop/producer.rb +++ b/lib/waterdrop/producer.rb @@ -19,10 +19,13 @@ class Producer Rdkafka::Producer::DeliveryHandle::WaitTimeoutError ].freeze - # Empty has to save on memory allocations + # Empty hash to save on memory allocations EMPTY_HASH = {}.freeze - private_constant :SUPPORTED_FLOW_ERRORS, :EMPTY_HASH + # Empty array to save on memory allocations + EMPTY_ARRAY = [].freeze + + private_constant :SUPPORTED_FLOW_ERRORS, :EMPTY_HASH, :EMPTY_ARRAY def_delegators :config @@ -272,10 +275,13 @@ def validate_message!(message) # Waits on a given handler # # @param handler [Rdkafka::Producer::DeliveryHandle] - def wait(handler) + # @param raise_response_error [Boolean] should we raise the response error after we receive the + # final result and it is an error. + def wait(handler, raise_response_error: true) handler.wait( # rdkafka max_wait_timeout is in seconds and we use ms - max_wait_timeout: current_variant.max_wait_timeout / 1_000.0 + max_wait_timeout: current_variant.max_wait_timeout / 1_000.0, + raise_response_error: raise_response_error ) end diff --git a/lib/waterdrop/producer/sync.rb b/lib/waterdrop/producer/sync.rb index 3010efb6..baf9b1d8 100644 --- a/lib/waterdrop/producer/sync.rb +++ b/lib/waterdrop/producer/sync.rb @@ -82,7 +82,11 @@ def produce_many_sync(messages) 'error.occurred', producer_id: id, messages: messages, - dispatched: dispatched, + # If it is a transactional producer nothing was successfully dispatched on error, thus + # we never return any dispatched handlers. While those messages might have reached + # Kafka, in transactional mode they will not be visible to consumers with correct + # isolation level. + dispatched: transactional? ? EMPTY_ARRAY : dispatched, error: re_raised, type: 'messages.produce_many_sync' ) diff --git a/spec/lib/waterdrop/producer/transactions_spec.rb b/spec/lib/waterdrop/producer/transactions_spec.rb index 8e4bef72..0098963f 100644 --- a/spec/lib/waterdrop/producer/transactions_spec.rb +++ b/spec/lib/waterdrop/producer/transactions_spec.rb @@ -453,6 +453,35 @@ producer.produce_many_sync(messages) expect(counts.size).to eq(1) end + + context 'when error occurs after few messages' do + subject(:producer) { build(:transactional_producer, max_payload_size: 10 * 1_024 * 1_024) } + + let(:messages) do + too_big = build(:valid_message) + too_big[:payload] = '1' * 1024 * 1024 + + [ + Array.new(9) { build(:valid_message) }, + too_big + ].flatten + end + + let(:dispatched) { [] } + + before do + producer.monitor.subscribe('error.occurred') do |event| + dispatched << event[:dispatched] + end + end + + it 'expect not to contain anything in the dispatched notification' do + expect { producer.produce_many_sync(messages) } + .to raise_error(WaterDrop::Errors::ProduceManyError) + + expect(dispatched.flatten).to eq([]) + end + end end context 'when using with produce_many_async' do diff --git a/spec/support/factories/producer.rb b/spec/support/factories/producer.rb index 9b16cf2d..2fef6f4b 100644 --- a/spec/support/factories/producer.rb +++ b/spec/support/factories/producer.rb @@ -9,6 +9,7 @@ max_wait_timeout { 30_000 } wait_on_queue_full { false } wait_timeout_on_queue_full { 1_000 } + max_payload_size { 1_000_012 } kafka do { @@ -27,6 +28,7 @@ config.max_wait_timeout = max_wait_timeout config.wait_on_queue_full = wait_on_queue_full config.wait_timeout_on_queue_full = wait_timeout_on_queue_full + config.max_payload_size = max_payload_size end instance.monitor.subscribe(::WaterDrop::Instrumentation::LoggerListener.new(logger))