Skip to content

Commit

Permalink
remarks
Browse files Browse the repository at this point in the history
  • Loading branch information
mensfeld committed Dec 19, 2023
1 parent 9234aa2 commit 8c3a797
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 6 deletions.
9 changes: 4 additions & 5 deletions lib/waterdrop/clients/buffered.rb
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,10 @@ def commit_transaction
#
# @param _consumer [#consumer_group_metadata_pointer] any consumer from which we can obtain
# the librdkafka consumer group metadata pointer
# @param _topic [String] topic name
# @param _partition [Integer] partition
# @param _offset [Integer] offset we want to store
def send_offsets_to_transaction(_consumer, _topic, _partition, _offset)
return if @transaction_mutex.owned?
# @param _tpl [Rdkafka::Consumer::TopicPartitionList] consumer tpl for offset storage
# @param _timeout [Integer] ms timeout
def send_offsets_to_transaction(_consumer, _tpl, _timeout)
return unless @transaction_level.zero?

raise Errors::TransactionRequiredError
end
Expand Down
17 changes: 17 additions & 0 deletions spec/lib/waterdrop/clients/buffered_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -179,5 +179,22 @@
expect(result).to eq(2)
end
end

context 'when we try to store offset without a transaction' do
it 'expect to raise an error' do
expect { producer.transactional_store_offset(nil, 'topic', 0, 0) }
.to raise_error(WaterDrop::Errors::TransactionRequiredError)
end
end

context 'when trying to store offset with transaction' do
it do
expect do
producer.transaction do
producer.transactional_store_offset(nil, 'topic', 0, 0)
end
end.not_to raise_error
end
end
end
end
2 changes: 1 addition & 1 deletion spec/lib/waterdrop/producer/transactions_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,7 @@
end
end

context 'when we try to store offsets without a transaction' do
context 'when we try to store offset without a transaction' do
it 'expect to raise an error' do
expect { producer.transactional_store_offset(nil, 'topic', 0, 0) }
.to raise_error(WaterDrop::Errors::TransactionRequiredError)
Expand Down

0 comments on commit 8c3a797

Please sign in to comment.