diff --git a/lib/waterdrop/clients/buffered.rb b/lib/waterdrop/clients/buffered.rb index 3dad6367..f870bfab 100644 --- a/lib/waterdrop/clients/buffered.rb +++ b/lib/waterdrop/clients/buffered.rb @@ -58,6 +58,19 @@ def commit_transaction @transaction_active = false end + # Fakes storing the offset in a transactional fashion + # + # @param _consumer [#consumer_group_metadata_pointer] any consumer from which we can obtain + # the librdkafka consumer group metadata pointer + # @param _topic [String] topic name + # @param _partition [Integer] partition + # @param _offset [Integer] offset we want to store + def send_offsets_to_transaction(_consumer, _topic, _partition, _offset) + return if @transaction_mutex.owned? + + raise Errors::TransactionRequiredError + end + # Aborts the transaction def abort_transaction @transaction_level -= 1 diff --git a/lib/waterdrop/producer/transactions.rb b/lib/waterdrop/producer/transactions.rb index 056538a9..bf4852e2 100644 --- a/lib/waterdrop/producer/transactions.rb +++ b/lib/waterdrop/producer/transactions.rb @@ -109,7 +109,7 @@ def transactional_store_offset(consumer, topic, partition, offset) tpl.add_topic_and_partitions_with_offsets(topic, partition => offset) with_transactional_error_handling(:store_offset) do - @client.send_offsets_to_transaction( + client.send_offsets_to_transaction( consumer, tpl, # This setting is at the moment in seconds and we require ms @@ -186,7 +186,7 @@ def with_transactional_error_handling(action, allow_abortable: true) # Always attempt to abort but if aborting fails with an abortable error, do not attempt # to abort from abort as this could create an infinite loop with_transactional_error_handling(:abort, allow_abortable: false) do - transactional_instrument(:aborted) { @client.abort_transaction } + transactional_instrument(:aborted) { client.abort_transaction } end raise diff --git a/spec/lib/waterdrop/producer/transactions_spec.rb b/spec/lib/waterdrop/producer/transactions_spec.rb index dd5986f7..ded229be 100644 --- a/spec/lib/waterdrop/producer/transactions_spec.rb +++ b/spec/lib/waterdrop/producer/transactions_spec.rb @@ -506,4 +506,30 @@ end end end + + context 'when we try to store offsets without a transaction' do + it 'expect to raise an error' do + expect { producer.transactional_store_offset(nil, 'topic', 0, 0) } + .to raise_error(WaterDrop::Errors::TransactionRequiredError) + end + end + + # Full e2e integration of this is checked in Karafka as we do not operate on consumers here + context 'when trying to store offset inside a transaction' do + let(:consumer) { OpenStruct.new } + + before do + allow(producer.client).to receive(:send_offsets_to_transaction) + + producer.transaction do + producer.transactional_store_offset(consumer, 'topic', 0, 0) + end + end + + it 'expect to delegate to client send_offsets_to_transaction with correct timeout' do + expect(producer.client) + .to have_received(:send_offsets_to_transaction) + .with(consumer, any_args, 30_000) + end + end end