Skip to content

Commit

Permalink
normalize dispatched for transactional (#524)
Browse files Browse the repository at this point in the history
  • Loading branch information
mensfeld authored Aug 14, 2024
1 parent 579297e commit 830696c
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 5 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ This release contains **BREAKING** changes. Make sure to read and apply upgrade
- **[Breaking]** Remove ability to abort transactions using `throw(:abort)`. Please use `raise WaterDrop::Errors::AbortTransaction`.
- **[Breaking]** Disallow (similar to ActiveRecord) exiting transactions with `return`, `break` or `throw`.
- [Enhancement] Make variants fiber safe.
- [Enhancement] In transactional mode do not return any `dispatched` messages as none will be dispatched due to rollback.

### Upgrade Notes

Expand Down
14 changes: 10 additions & 4 deletions lib/waterdrop/producer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@ class Producer
Rdkafka::Producer::DeliveryHandle::WaitTimeoutError
].freeze

# Empty has to save on memory allocations
# Empty hash to save on memory allocations
EMPTY_HASH = {}.freeze

private_constant :SUPPORTED_FLOW_ERRORS, :EMPTY_HASH
# Empty array to save on memory allocations
EMPTY_ARRAY = [].freeze

private_constant :SUPPORTED_FLOW_ERRORS, :EMPTY_HASH, :EMPTY_ARRAY

def_delegators :config

Expand Down Expand Up @@ -272,10 +275,13 @@ def validate_message!(message)
# Waits on a given handler
#
# @param handler [Rdkafka::Producer::DeliveryHandle]
def wait(handler)
# @param raise_response_error [Boolean] should we raise the response error after we receive the
# final result and it is an error.
def wait(handler, raise_response_error: true)
handler.wait(
# rdkafka max_wait_timeout is in seconds and we use ms
max_wait_timeout: current_variant.max_wait_timeout / 1_000.0
max_wait_timeout: current_variant.max_wait_timeout / 1_000.0,
raise_response_error: raise_response_error
)
end

Expand Down
6 changes: 5 additions & 1 deletion lib/waterdrop/producer/sync.rb
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,11 @@ def produce_many_sync(messages)
'error.occurred',
producer_id: id,
messages: messages,
dispatched: dispatched,
# If it is a transactional producer nothing was successfully dispatched on error, thus
# we never return any dispatched handlers. While those messages might have reached
# Kafka, in transactional mode they will not be visible to consumers with correct
# isolation level.
dispatched: transactional? ? EMPTY_ARRAY : dispatched,
error: re_raised,
type: 'messages.produce_many_sync'
)
Expand Down
29 changes: 29 additions & 0 deletions spec/lib/waterdrop/producer/transactions_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,35 @@
producer.produce_many_sync(messages)
expect(counts.size).to eq(1)
end

context 'when error occurs after few messages' do
subject(:producer) { build(:transactional_producer, max_payload_size: 10 * 1_024 * 1_024) }

let(:messages) do
too_big = build(:valid_message)
too_big[:payload] = '1' * 1024 * 1024

[
Array.new(9) { build(:valid_message) },
too_big
].flatten
end

let(:dispatched) { [] }

before do
producer.monitor.subscribe('error.occurred') do |event|
dispatched << event[:dispatched]
end
end

it 'expect not to contain anything in the dispatched notification' do
expect { producer.produce_many_sync(messages) }
.to raise_error(WaterDrop::Errors::ProduceManyError)

expect(dispatched.flatten).to eq([])
end
end
end

context 'when using with produce_many_async' do
Expand Down
2 changes: 2 additions & 0 deletions spec/support/factories/producer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
max_wait_timeout { 30_000 }
wait_on_queue_full { false }
wait_timeout_on_queue_full { 1_000 }
max_payload_size { 1_000_012 }

kafka do
{
Expand All @@ -27,6 +28,7 @@
config.max_wait_timeout = max_wait_timeout
config.wait_on_queue_full = wait_on_queue_full
config.wait_timeout_on_queue_full = wait_timeout_on_queue_full
config.max_payload_size = max_payload_size
end

instance.monitor.subscribe(::WaterDrop::Instrumentation::LoggerListener.new(logger))
Expand Down

0 comments on commit 830696c

Please sign in to comment.