Skip to content

Commit

Permalink
specs
Browse files Browse the repository at this point in the history
  • Loading branch information
mensfeld committed Dec 19, 2023
1 parent ec9f15c commit 9234aa2
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 2 deletions.
13 changes: 13 additions & 0 deletions lib/waterdrop/clients/buffered.rb
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,19 @@ def commit_transaction
@transaction_active = false
end

# Fakes storing the offset in a transactional fashion
#
# @param _consumer [#consumer_group_metadata_pointer] any consumer from which we can obtain
# the librdkafka consumer group metadata pointer
# @param _topic [String] topic name
# @param _partition [Integer] partition
# @param _offset [Integer] offset we want to store
def send_offsets_to_transaction(_consumer, _topic, _partition, _offset)
return if @transaction_mutex.owned?

raise Errors::TransactionRequiredError
end

# Aborts the transaction
def abort_transaction
@transaction_level -= 1
Expand Down
4 changes: 2 additions & 2 deletions lib/waterdrop/producer/transactions.rb
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ def transactional_store_offset(consumer, topic, partition, offset)
tpl.add_topic_and_partitions_with_offsets(topic, partition => offset)

with_transactional_error_handling(:store_offset) do
@client.send_offsets_to_transaction(
client.send_offsets_to_transaction(
consumer,
tpl,
# This setting is at the moment in seconds and we require ms
Expand Down Expand Up @@ -186,7 +186,7 @@ def with_transactional_error_handling(action, allow_abortable: true)
# Always attempt to abort but if aborting fails with an abortable error, do not attempt
# to abort from abort as this could create an infinite loop
with_transactional_error_handling(:abort, allow_abortable: false) do
transactional_instrument(:aborted) { @client.abort_transaction }
transactional_instrument(:aborted) { client.abort_transaction }
end

raise
Expand Down
26 changes: 26 additions & 0 deletions spec/lib/waterdrop/producer/transactions_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -506,4 +506,30 @@
end
end
end

context 'when we try to store offsets without a transaction' do
it 'expect to raise an error' do
expect { producer.transactional_store_offset(nil, 'topic', 0, 0) }
.to raise_error(WaterDrop::Errors::TransactionRequiredError)
end
end

# Full e2e integration of this is checked in Karafka as we do not operate on consumers here
context 'when trying to store offset inside a transaction' do
let(:consumer) { OpenStruct.new }

before do
allow(producer.client).to receive(:send_offsets_to_transaction)

producer.transaction do
producer.transactional_store_offset(consumer, 'topic', 0, 0)
end
end

it 'expect to delegate to client send_offsets_to_transaction with correct timeout' do
expect(producer.client)
.to have_received(:send_offsets_to_transaction)
.with(consumer, any_args, 30_000)
end
end
end

0 comments on commit 9234aa2

Please sign in to comment.