diff --git a/lib/waterdrop/clients/buffered.rb b/lib/waterdrop/clients/buffered.rb index f870bfab..f6da9147 100644 --- a/lib/waterdrop/clients/buffered.rb +++ b/lib/waterdrop/clients/buffered.rb @@ -62,11 +62,10 @@ def commit_transaction # # @param _consumer [#consumer_group_metadata_pointer] any consumer from which we can obtain # the librdkafka consumer group metadata pointer - # @param _topic [String] topic name - # @param _partition [Integer] partition - # @param _offset [Integer] offset we want to store - def send_offsets_to_transaction(_consumer, _topic, _partition, _offset) - return if @transaction_mutex.owned? + # @param _tpl [Rdkafka::Consumer::TopicPartitionList] consumer tpl for offset storage + # @param _timeout [Integer] ms timeout + def send_offsets_to_transaction(_consumer, _tpl, _timeout) + return unless @transaction_level.zero? raise Errors::TransactionRequiredError end diff --git a/spec/lib/waterdrop/clients/buffered_spec.rb b/spec/lib/waterdrop/clients/buffered_spec.rb index 8b2b1926..64a3a59e 100644 --- a/spec/lib/waterdrop/clients/buffered_spec.rb +++ b/spec/lib/waterdrop/clients/buffered_spec.rb @@ -179,5 +179,22 @@ expect(result).to eq(2) end end + + context 'when we try to store offset without a transaction' do + it 'expect to raise an error' do + expect { producer.transactional_store_offset(nil, 'topic', 0, 0) } + .to raise_error(WaterDrop::Errors::TransactionRequiredError) + end + end + + context 'when trying to store offset with transaction' do + it do + expect do + producer.transaction do + producer.transactional_store_offset(nil, 'topic', 0, 0) + end + end.not_to raise_error + end + end end end diff --git a/spec/lib/waterdrop/producer/transactions_spec.rb b/spec/lib/waterdrop/producer/transactions_spec.rb index ded229be..05600c5b 100644 --- a/spec/lib/waterdrop/producer/transactions_spec.rb +++ b/spec/lib/waterdrop/producer/transactions_spec.rb @@ -507,7 +507,7 @@ end end - context 'when we try to store offsets without a transaction' do + context 'when we try to store offset without a transaction' do it 'expect to raise an error' do expect { producer.transactional_store_offset(nil, 'topic', 0, 0) } .to raise_error(WaterDrop::Errors::TransactionRequiredError)