From 562e63a4617708ce460157ca11468c4e1ea6a272 Mon Sep 17 00:00:00 2001 From: Maciej Mensfeld Date: Tue, 19 Dec 2023 13:39:04 +0100 Subject: [PATCH 1/5] offset storage in transaction --- CHANGELOG.md | 1 + lib/waterdrop/errors.rb | 6 ++++ .../instrumentation/logger_listener.rb | 9 ++++++ .../instrumentation/notifications.rb | 1 + lib/waterdrop/producer.rb | 5 ++- lib/waterdrop/producer/transactions.rb | 32 +++++++++++++++++-- spec/lib/waterdrop/errors_spec.rb | 12 +++++++ .../instrumentation/logger_listener_spec.rb | 14 ++++++++ 8 files changed, 77 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4ec1f67d..5020d760 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,7 @@ # WaterDrop changelog ## 2.6.12 (Unreleased) +- [Enhancement] Provide ability to commit offset during the transaction with a consumer provided. - [Change] Remove usage of concurrent ruby. ## 2.6.11 (2023-10-25) diff --git a/lib/waterdrop/errors.rb b/lib/waterdrop/errors.rb index 3d2ac4ea..e126caa1 100644 --- a/lib/waterdrop/errors.rb +++ b/lib/waterdrop/errors.rb @@ -32,7 +32,13 @@ module Errors # Raised when there is an inline error during single message produce operations ProduceError = Class.new(BaseError) + # Raised when we attempt to perform operation that is only allowed inside of a transaction and + # there is no transaction around us + TransactionRequiredError = Class.new(BaseError) + # Raise it within a transaction to abort it + # It does not have an `Error` postfix because technically it is not an error as it is used for + # graceful transaction aborting AbortTransaction = Class.new(BaseError) # Raised when during messages producing something bad happened inline diff --git a/lib/waterdrop/instrumentation/logger_listener.rb b/lib/waterdrop/instrumentation/logger_listener.rb index 942a7dff..03dfe546 100644 --- a/lib/waterdrop/instrumentation/logger_listener.rb +++ b/lib/waterdrop/instrumentation/logger_listener.rb @@ -145,6 +145,15 @@ def on_transaction_committed(event) info(event, 'Committing transaction') end + # @param event [Dry::Events::Event] event that happened with the details + def on_transaction_offset_stored(event) + topic = event[:topic] + partition = event[:partition] + offset = event[:offset] + + info(event, "Storing offset #{offset} for topic #{topic}/#{partition} in the transaction") + end + # @param event [Dry::Events::Event] event that happened with the details def on_transaction_finished(event) info(event, 'Processing transaction') diff --git a/lib/waterdrop/instrumentation/notifications.rb b/lib/waterdrop/instrumentation/notifications.rb index cdbed5ad..af822f7b 100644 --- a/lib/waterdrop/instrumentation/notifications.rb +++ b/lib/waterdrop/instrumentation/notifications.rb @@ -22,6 +22,7 @@ class Notifications < ::Karafka::Core::Monitoring::Notifications transaction.started transaction.committed transaction.aborted + transaction.offset_stored transaction.finished buffer.flushed_async diff --git a/lib/waterdrop/producer.rb b/lib/waterdrop/producer.rb index 92f96471..4e39445d 100644 --- a/lib/waterdrop/producer.rb +++ b/lib/waterdrop/producer.rb @@ -16,7 +16,10 @@ class Producer Rdkafka::Producer::DeliveryHandle::WaitTimeoutError ].freeze - private_constant :SUPPORTED_FLOW_ERRORS + # Empty has to save on memory allocations + EMPTY_HASH = {}.freeze + + private_constant :SUPPORTED_FLOW_ERRORS, :EMPTY_HASH def_delegators :config, :middleware diff --git a/lib/waterdrop/producer/transactions.rb b/lib/waterdrop/producer/transactions.rb index e042a153..de45743f 100644 --- a/lib/waterdrop/producer/transactions.rb +++ b/lib/waterdrop/producer/transactions.rb @@ -91,6 +91,34 @@ def transactional? @transactional = config.kafka.to_h.key?(:'transactional.id') end + # Stores provided offset inside of the transaction. If used, will connect the stored offset + # with the producer transaction making sure that all succeed or fail together + # + # @param consumer [#consumer_group_metadata_pointer] any consumer from which we can obtain + # the librdkafka cosumer group metadata pointer + # @param topic [String] topic name + # @param partition [Integer] partition + # @param offset [Integer] offset we want to store + def transactional_store_offset(consumer, topic, partition, offset) + raise Errors::TransactionRequiredError unless @transaction_mutex.owned? + + details = { topic: topic, partition: partition, offset: offset } + + transactional_instrument(:offset_stored, details) do + tpl = Rdkafka::Consumer::TopicPartitionList.new + tpl.add_topic_and_partitions_with_offsets(topic, partition => offset) + + with_transactional_error_handling(:store_offset) do + @client.send_offsets_to_transaction( + consumer, + tpl, + # This setting is at the moment in seconds and we require ms + @config.max_wait_timeout * 1_000 + ) + end + end + end + private # Runs provided code with a transaction wrapper if transactions are enabled. @@ -106,8 +134,8 @@ def with_transaction_if_transactional(&block) # # @param key [Symbol] transaction operation key # @param block [Proc] block to run inside the instrumentation or nothing if not given - def transactional_instrument(key, &block) - @monitor.instrument("transaction.#{key}", producer_id: id, &block) + def transactional_instrument(key, details = EMPTY_HASH, &block) + @monitor.instrument("transaction.#{key}", details.merge(producer_id: id), &block) end # Error handling for transactional operations is a bit special. There are three types of diff --git a/spec/lib/waterdrop/errors_spec.rb b/spec/lib/waterdrop/errors_spec.rb index 1c330e8e..fb3364bd 100644 --- a/spec/lib/waterdrop/errors_spec.rb +++ b/spec/lib/waterdrop/errors_spec.rb @@ -48,4 +48,16 @@ specify { expect(error).to be < described_class::BaseError } end + + describe 'AbortTransaction' do + subject(:error) { described_class::AbortTransaction } + + specify { expect(error).to be < described_class::BaseError } + end + + describe 'TransactionRequiredError' do + subject(:error) { described_class::TransactionRequiredError } + + specify { expect(error).to be < described_class::BaseError } + end end diff --git a/spec/lib/waterdrop/instrumentation/logger_listener_spec.rb b/spec/lib/waterdrop/instrumentation/logger_listener_spec.rb index db04055c..180d2e45 100644 --- a/spec/lib/waterdrop/instrumentation/logger_listener_spec.rb +++ b/spec/lib/waterdrop/instrumentation/logger_listener_spec.rb @@ -243,4 +243,18 @@ it { expect(logged_data[0]).to include('INFO') } it { expect(logged_data[0]).to include('Committing transaction') } end + + describe '#on_transaction_offset_stored' do + before do + details[:topic] = rand.to_s + details[:partition] = 0 + details[:offset] = 100 + + listener.on_transaction_offset_stored(event) + end + + it { expect(logged_data[0]).to include(producer.id) } + it { expect(logged_data[0]).to include('INFO') } + it { expect(logged_data[0]).to include('Storing offset') } + end end From ec9f15c82fe5307b0efd08bde326dedde7f5964e Mon Sep 17 00:00:00 2001 From: Maciej Mensfeld Date: Tue, 19 Dec 2023 13:44:35 +0100 Subject: [PATCH 2/5] remarks --- lib/waterdrop/producer/transactions.rb | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lib/waterdrop/producer/transactions.rb b/lib/waterdrop/producer/transactions.rb index de45743f..056538a9 100644 --- a/lib/waterdrop/producer/transactions.rb +++ b/lib/waterdrop/producer/transactions.rb @@ -95,7 +95,7 @@ def transactional? # with the producer transaction making sure that all succeed or fail together # # @param consumer [#consumer_group_metadata_pointer] any consumer from which we can obtain - # the librdkafka cosumer group metadata pointer + # the librdkafka consumer group metadata pointer # @param topic [String] topic name # @param partition [Integer] partition # @param offset [Integer] offset we want to store @@ -133,6 +133,7 @@ def with_transaction_if_transactional(&block) # Instruments the transactional operation with producer id # # @param key [Symbol] transaction operation key + # @param details [Hash] additional instrumentation details # @param block [Proc] block to run inside the instrumentation or nothing if not given def transactional_instrument(key, details = EMPTY_HASH, &block) @monitor.instrument("transaction.#{key}", details.merge(producer_id: id), &block) From 9234aa2cb01bb11a23ab3c7f5ab05c98e0b68585 Mon Sep 17 00:00:00 2001 From: Maciej Mensfeld Date: Tue, 19 Dec 2023 14:24:21 +0100 Subject: [PATCH 3/5] specs --- lib/waterdrop/clients/buffered.rb | 13 ++++++++++ lib/waterdrop/producer/transactions.rb | 4 +-- .../waterdrop/producer/transactions_spec.rb | 26 +++++++++++++++++++ 3 files changed, 41 insertions(+), 2 deletions(-) diff --git a/lib/waterdrop/clients/buffered.rb b/lib/waterdrop/clients/buffered.rb index 3dad6367..f870bfab 100644 --- a/lib/waterdrop/clients/buffered.rb +++ b/lib/waterdrop/clients/buffered.rb @@ -58,6 +58,19 @@ def commit_transaction @transaction_active = false end + # Fakes storing the offset in a transactional fashion + # + # @param _consumer [#consumer_group_metadata_pointer] any consumer from which we can obtain + # the librdkafka consumer group metadata pointer + # @param _topic [String] topic name + # @param _partition [Integer] partition + # @param _offset [Integer] offset we want to store + def send_offsets_to_transaction(_consumer, _topic, _partition, _offset) + return if @transaction_mutex.owned? + + raise Errors::TransactionRequiredError + end + # Aborts the transaction def abort_transaction @transaction_level -= 1 diff --git a/lib/waterdrop/producer/transactions.rb b/lib/waterdrop/producer/transactions.rb index 056538a9..bf4852e2 100644 --- a/lib/waterdrop/producer/transactions.rb +++ b/lib/waterdrop/producer/transactions.rb @@ -109,7 +109,7 @@ def transactional_store_offset(consumer, topic, partition, offset) tpl.add_topic_and_partitions_with_offsets(topic, partition => offset) with_transactional_error_handling(:store_offset) do - @client.send_offsets_to_transaction( + client.send_offsets_to_transaction( consumer, tpl, # This setting is at the moment in seconds and we require ms @@ -186,7 +186,7 @@ def with_transactional_error_handling(action, allow_abortable: true) # Always attempt to abort but if aborting fails with an abortable error, do not attempt # to abort from abort as this could create an infinite loop with_transactional_error_handling(:abort, allow_abortable: false) do - transactional_instrument(:aborted) { @client.abort_transaction } + transactional_instrument(:aborted) { client.abort_transaction } end raise diff --git a/spec/lib/waterdrop/producer/transactions_spec.rb b/spec/lib/waterdrop/producer/transactions_spec.rb index dd5986f7..ded229be 100644 --- a/spec/lib/waterdrop/producer/transactions_spec.rb +++ b/spec/lib/waterdrop/producer/transactions_spec.rb @@ -506,4 +506,30 @@ end end end + + context 'when we try to store offsets without a transaction' do + it 'expect to raise an error' do + expect { producer.transactional_store_offset(nil, 'topic', 0, 0) } + .to raise_error(WaterDrop::Errors::TransactionRequiredError) + end + end + + # Full e2e integration of this is checked in Karafka as we do not operate on consumers here + context 'when trying to store offset inside a transaction' do + let(:consumer) { OpenStruct.new } + + before do + allow(producer.client).to receive(:send_offsets_to_transaction) + + producer.transaction do + producer.transactional_store_offset(consumer, 'topic', 0, 0) + end + end + + it 'expect to delegate to client send_offsets_to_transaction with correct timeout' do + expect(producer.client) + .to have_received(:send_offsets_to_transaction) + .with(consumer, any_args, 30_000) + end + end end From 8c3a7974cec12497cc94e1612fc31035fa3f39be Mon Sep 17 00:00:00 2001 From: Maciej Mensfeld Date: Tue, 19 Dec 2023 14:39:27 +0100 Subject: [PATCH 4/5] remarks --- lib/waterdrop/clients/buffered.rb | 9 ++++----- spec/lib/waterdrop/clients/buffered_spec.rb | 17 +++++++++++++++++ .../lib/waterdrop/producer/transactions_spec.rb | 2 +- 3 files changed, 22 insertions(+), 6 deletions(-) diff --git a/lib/waterdrop/clients/buffered.rb b/lib/waterdrop/clients/buffered.rb index f870bfab..f6da9147 100644 --- a/lib/waterdrop/clients/buffered.rb +++ b/lib/waterdrop/clients/buffered.rb @@ -62,11 +62,10 @@ def commit_transaction # # @param _consumer [#consumer_group_metadata_pointer] any consumer from which we can obtain # the librdkafka consumer group metadata pointer - # @param _topic [String] topic name - # @param _partition [Integer] partition - # @param _offset [Integer] offset we want to store - def send_offsets_to_transaction(_consumer, _topic, _partition, _offset) - return if @transaction_mutex.owned? + # @param _tpl [Rdkafka::Consumer::TopicPartitionList] consumer tpl for offset storage + # @param _timeout [Integer] ms timeout + def send_offsets_to_transaction(_consumer, _tpl, _timeout) + return unless @transaction_level.zero? raise Errors::TransactionRequiredError end diff --git a/spec/lib/waterdrop/clients/buffered_spec.rb b/spec/lib/waterdrop/clients/buffered_spec.rb index 8b2b1926..64a3a59e 100644 --- a/spec/lib/waterdrop/clients/buffered_spec.rb +++ b/spec/lib/waterdrop/clients/buffered_spec.rb @@ -179,5 +179,22 @@ expect(result).to eq(2) end end + + context 'when we try to store offset without a transaction' do + it 'expect to raise an error' do + expect { producer.transactional_store_offset(nil, 'topic', 0, 0) } + .to raise_error(WaterDrop::Errors::TransactionRequiredError) + end + end + + context 'when trying to store offset with transaction' do + it do + expect do + producer.transaction do + producer.transactional_store_offset(nil, 'topic', 0, 0) + end + end.not_to raise_error + end + end end end diff --git a/spec/lib/waterdrop/producer/transactions_spec.rb b/spec/lib/waterdrop/producer/transactions_spec.rb index ded229be..05600c5b 100644 --- a/spec/lib/waterdrop/producer/transactions_spec.rb +++ b/spec/lib/waterdrop/producer/transactions_spec.rb @@ -507,7 +507,7 @@ end end - context 'when we try to store offsets without a transaction' do + context 'when we try to store offset without a transaction' do it 'expect to raise an error' do expect { producer.transactional_store_offset(nil, 'topic', 0, 0) } .to raise_error(WaterDrop::Errors::TransactionRequiredError) From f6d3c0cccc5390f81cd93f78e308a6abccb703e3 Mon Sep 17 00:00:00 2001 From: Maciej Mensfeld Date: Tue, 19 Dec 2023 16:07:37 +0100 Subject: [PATCH 5/5] remove unused code --- lib/waterdrop/clients/buffered.rb | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/lib/waterdrop/clients/buffered.rb b/lib/waterdrop/clients/buffered.rb index f6da9147..159ae116 100644 --- a/lib/waterdrop/clients/buffered.rb +++ b/lib/waterdrop/clients/buffered.rb @@ -44,8 +44,6 @@ def begin_transaction def commit_transaction @transaction_level -= 1 - return unless @transaction_level.zero? - # Transfer transactional data on success @transaction_topics.each do |topic, messages| @topics[topic] += messages @@ -65,17 +63,12 @@ def commit_transaction # @param _tpl [Rdkafka::Consumer::TopicPartitionList] consumer tpl for offset storage # @param _timeout [Integer] ms timeout def send_offsets_to_transaction(_consumer, _tpl, _timeout) - return unless @transaction_level.zero? - - raise Errors::TransactionRequiredError + nil end # Aborts the transaction def abort_transaction @transaction_level -= 1 - - return unless @transaction_level.zero? - @transaction_topics.clear @transaction_messages.clear @transaction_active = false