diff --git a/CHANGELOG.md b/CHANGELOG.md index 4ec1f67d..5020d760 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,7 @@ # WaterDrop changelog ## 2.6.12 (Unreleased) +- [Enhancement] Provide ability to commit offset during the transaction with a consumer provided. - [Change] Remove usage of concurrent ruby. ## 2.6.11 (2023-10-25) diff --git a/lib/waterdrop/clients/buffered.rb b/lib/waterdrop/clients/buffered.rb index 3dad6367..159ae116 100644 --- a/lib/waterdrop/clients/buffered.rb +++ b/lib/waterdrop/clients/buffered.rb @@ -44,8 +44,6 @@ def begin_transaction def commit_transaction @transaction_level -= 1 - return unless @transaction_level.zero? - # Transfer transactional data on success @transaction_topics.each do |topic, messages| @topics[topic] += messages @@ -58,12 +56,19 @@ def commit_transaction @transaction_active = false end + # Fakes storing the offset in a transactional fashion + # + # @param _consumer [#consumer_group_metadata_pointer] any consumer from which we can obtain + # the librdkafka consumer group metadata pointer + # @param _tpl [Rdkafka::Consumer::TopicPartitionList] consumer tpl for offset storage + # @param _timeout [Integer] ms timeout + def send_offsets_to_transaction(_consumer, _tpl, _timeout) + nil + end + # Aborts the transaction def abort_transaction @transaction_level -= 1 - - return unless @transaction_level.zero? - @transaction_topics.clear @transaction_messages.clear @transaction_active = false diff --git a/lib/waterdrop/errors.rb b/lib/waterdrop/errors.rb index 3d2ac4ea..e126caa1 100644 --- a/lib/waterdrop/errors.rb +++ b/lib/waterdrop/errors.rb @@ -32,7 +32,13 @@ module Errors # Raised when there is an inline error during single message produce operations ProduceError = Class.new(BaseError) + # Raised when we attempt to perform operation that is only allowed inside of a transaction and + # there is no transaction around us + TransactionRequiredError = Class.new(BaseError) + # Raise it within a transaction to abort it + # It does not have an `Error` postfix because technically it is not an error as it is used for + # graceful transaction aborting AbortTransaction = Class.new(BaseError) # Raised when during messages producing something bad happened inline diff --git a/lib/waterdrop/instrumentation/logger_listener.rb b/lib/waterdrop/instrumentation/logger_listener.rb index 942a7dff..03dfe546 100644 --- a/lib/waterdrop/instrumentation/logger_listener.rb +++ b/lib/waterdrop/instrumentation/logger_listener.rb @@ -145,6 +145,15 @@ def on_transaction_committed(event) info(event, 'Committing transaction') end + # @param event [Dry::Events::Event] event that happened with the details + def on_transaction_offset_stored(event) + topic = event[:topic] + partition = event[:partition] + offset = event[:offset] + + info(event, "Storing offset #{offset} for topic #{topic}/#{partition} in the transaction") + end + # @param event [Dry::Events::Event] event that happened with the details def on_transaction_finished(event) info(event, 'Processing transaction') diff --git a/lib/waterdrop/instrumentation/notifications.rb b/lib/waterdrop/instrumentation/notifications.rb index cdbed5ad..af822f7b 100644 --- a/lib/waterdrop/instrumentation/notifications.rb +++ b/lib/waterdrop/instrumentation/notifications.rb @@ -22,6 +22,7 @@ class Notifications < ::Karafka::Core::Monitoring::Notifications transaction.started transaction.committed transaction.aborted + transaction.offset_stored transaction.finished buffer.flushed_async diff --git a/lib/waterdrop/producer.rb b/lib/waterdrop/producer.rb index 92f96471..4e39445d 100644 --- a/lib/waterdrop/producer.rb +++ b/lib/waterdrop/producer.rb @@ -16,7 +16,10 @@ class Producer Rdkafka::Producer::DeliveryHandle::WaitTimeoutError ].freeze - private_constant :SUPPORTED_FLOW_ERRORS + # Empty has to save on memory allocations + EMPTY_HASH = {}.freeze + + private_constant :SUPPORTED_FLOW_ERRORS, :EMPTY_HASH def_delegators :config, :middleware diff --git a/lib/waterdrop/producer/transactions.rb b/lib/waterdrop/producer/transactions.rb index e042a153..bf4852e2 100644 --- a/lib/waterdrop/producer/transactions.rb +++ b/lib/waterdrop/producer/transactions.rb @@ -91,6 +91,34 @@ def transactional? @transactional = config.kafka.to_h.key?(:'transactional.id') end + # Stores provided offset inside of the transaction. If used, will connect the stored offset + # with the producer transaction making sure that all succeed or fail together + # + # @param consumer [#consumer_group_metadata_pointer] any consumer from which we can obtain + # the librdkafka consumer group metadata pointer + # @param topic [String] topic name + # @param partition [Integer] partition + # @param offset [Integer] offset we want to store + def transactional_store_offset(consumer, topic, partition, offset) + raise Errors::TransactionRequiredError unless @transaction_mutex.owned? + + details = { topic: topic, partition: partition, offset: offset } + + transactional_instrument(:offset_stored, details) do + tpl = Rdkafka::Consumer::TopicPartitionList.new + tpl.add_topic_and_partitions_with_offsets(topic, partition => offset) + + with_transactional_error_handling(:store_offset) do + client.send_offsets_to_transaction( + consumer, + tpl, + # This setting is at the moment in seconds and we require ms + @config.max_wait_timeout * 1_000 + ) + end + end + end + private # Runs provided code with a transaction wrapper if transactions are enabled. @@ -105,9 +133,10 @@ def with_transaction_if_transactional(&block) # Instruments the transactional operation with producer id # # @param key [Symbol] transaction operation key + # @param details [Hash] additional instrumentation details # @param block [Proc] block to run inside the instrumentation or nothing if not given - def transactional_instrument(key, &block) - @monitor.instrument("transaction.#{key}", producer_id: id, &block) + def transactional_instrument(key, details = EMPTY_HASH, &block) + @monitor.instrument("transaction.#{key}", details.merge(producer_id: id), &block) end # Error handling for transactional operations is a bit special. There are three types of @@ -157,7 +186,7 @@ def with_transactional_error_handling(action, allow_abortable: true) # Always attempt to abort but if aborting fails with an abortable error, do not attempt # to abort from abort as this could create an infinite loop with_transactional_error_handling(:abort, allow_abortable: false) do - transactional_instrument(:aborted) { @client.abort_transaction } + transactional_instrument(:aborted) { client.abort_transaction } end raise diff --git a/spec/lib/waterdrop/clients/buffered_spec.rb b/spec/lib/waterdrop/clients/buffered_spec.rb index 8b2b1926..64a3a59e 100644 --- a/spec/lib/waterdrop/clients/buffered_spec.rb +++ b/spec/lib/waterdrop/clients/buffered_spec.rb @@ -179,5 +179,22 @@ expect(result).to eq(2) end end + + context 'when we try to store offset without a transaction' do + it 'expect to raise an error' do + expect { producer.transactional_store_offset(nil, 'topic', 0, 0) } + .to raise_error(WaterDrop::Errors::TransactionRequiredError) + end + end + + context 'when trying to store offset with transaction' do + it do + expect do + producer.transaction do + producer.transactional_store_offset(nil, 'topic', 0, 0) + end + end.not_to raise_error + end + end end end diff --git a/spec/lib/waterdrop/errors_spec.rb b/spec/lib/waterdrop/errors_spec.rb index 1c330e8e..fb3364bd 100644 --- a/spec/lib/waterdrop/errors_spec.rb +++ b/spec/lib/waterdrop/errors_spec.rb @@ -48,4 +48,16 @@ specify { expect(error).to be < described_class::BaseError } end + + describe 'AbortTransaction' do + subject(:error) { described_class::AbortTransaction } + + specify { expect(error).to be < described_class::BaseError } + end + + describe 'TransactionRequiredError' do + subject(:error) { described_class::TransactionRequiredError } + + specify { expect(error).to be < described_class::BaseError } + end end diff --git a/spec/lib/waterdrop/instrumentation/logger_listener_spec.rb b/spec/lib/waterdrop/instrumentation/logger_listener_spec.rb index db04055c..180d2e45 100644 --- a/spec/lib/waterdrop/instrumentation/logger_listener_spec.rb +++ b/spec/lib/waterdrop/instrumentation/logger_listener_spec.rb @@ -243,4 +243,18 @@ it { expect(logged_data[0]).to include('INFO') } it { expect(logged_data[0]).to include('Committing transaction') } end + + describe '#on_transaction_offset_stored' do + before do + details[:topic] = rand.to_s + details[:partition] = 0 + details[:offset] = 100 + + listener.on_transaction_offset_stored(event) + end + + it { expect(logged_data[0]).to include(producer.id) } + it { expect(logged_data[0]).to include('INFO') } + it { expect(logged_data[0]).to include('Storing offset') } + end end diff --git a/spec/lib/waterdrop/producer/transactions_spec.rb b/spec/lib/waterdrop/producer/transactions_spec.rb index dd5986f7..05600c5b 100644 --- a/spec/lib/waterdrop/producer/transactions_spec.rb +++ b/spec/lib/waterdrop/producer/transactions_spec.rb @@ -506,4 +506,30 @@ end end end + + context 'when we try to store offset without a transaction' do + it 'expect to raise an error' do + expect { producer.transactional_store_offset(nil, 'topic', 0, 0) } + .to raise_error(WaterDrop::Errors::TransactionRequiredError) + end + end + + # Full e2e integration of this is checked in Karafka as we do not operate on consumers here + context 'when trying to store offset inside a transaction' do + let(:consumer) { OpenStruct.new } + + before do + allow(producer.client).to receive(:send_offsets_to_transaction) + + producer.transaction do + producer.transactional_store_offset(consumer, 'topic', 0, 0) + end + end + + it 'expect to delegate to client send_offsets_to_transaction with correct timeout' do + expect(producer.client) + .to have_received(:send_offsets_to_transaction) + .with(consumer, any_args, 30_000) + end + end end