diff --git a/lib/waterdrop/clients/buffered.rb b/lib/waterdrop/clients/buffered.rb index f6da9147..159ae116 100644 --- a/lib/waterdrop/clients/buffered.rb +++ b/lib/waterdrop/clients/buffered.rb @@ -44,8 +44,6 @@ def begin_transaction def commit_transaction @transaction_level -= 1 - return unless @transaction_level.zero? - # Transfer transactional data on success @transaction_topics.each do |topic, messages| @topics[topic] += messages @@ -65,17 +63,12 @@ def commit_transaction # @param _tpl [Rdkafka::Consumer::TopicPartitionList] consumer tpl for offset storage # @param _timeout [Integer] ms timeout def send_offsets_to_transaction(_consumer, _tpl, _timeout) - return unless @transaction_level.zero? - - raise Errors::TransactionRequiredError + nil end # Aborts the transaction def abort_transaction @transaction_level -= 1 - - return unless @transaction_level.zero? - @transaction_topics.clear @transaction_messages.clear @transaction_active = false