diff --git a/config/locales/errors.yml b/config/locales/errors.yml index ed379472..6171f1a8 100644 --- a/config/locales/errors.yml +++ b/config/locales/errors.yml @@ -29,10 +29,8 @@ en: transactional_offset: consumer_format: 'must respond to #consumer_group_metadata_pointer method' - topic_format: must be a non-empty string + message_format: 'must respond to #topic, #partition and #offset' missing: must be present - partition_format: must be an integer greater or equal to -1 - offset_format: must be an integer greater or equal to -1 offset_metadata_format: must be string or nil test: diff --git a/lib/waterdrop/contracts/transactional_offset.rb b/lib/waterdrop/contracts/transactional_offset.rb index d9fed232..448b38fa 100644 --- a/lib/waterdrop/contracts/transactional_offset.rb +++ b/lib/waterdrop/contracts/transactional_offset.rb @@ -14,9 +14,7 @@ class TransactionalOffset < ::Karafka::Core::Contractable::Contract end required(:consumer) { |val| val.respond_to?(:consumer_group_metadata_pointer) } - required(:topic) { |val| val.is_a?(String) && !val.empty? } - required(:partition) { |val| val.is_a?(Integer) && val >= 0 } - required(:offset) { |val| val.is_a?(Integer) && val >= 0 } + required(:message) { |val| val.respond_to?(:topic) && val.respond_to?(:partition) } required(:offset_metadata) { |val| val.is_a?(String) || val.nil? } end end diff --git a/lib/waterdrop/instrumentation/logger_listener.rb b/lib/waterdrop/instrumentation/logger_listener.rb index 03dfe546..d470c4b3 100644 --- a/lib/waterdrop/instrumentation/logger_listener.rb +++ b/lib/waterdrop/instrumentation/logger_listener.rb @@ -146,12 +146,17 @@ def on_transaction_committed(event) end # @param event [Dry::Events::Event] event that happened with the details - def on_transaction_offset_stored(event) - topic = event[:topic] - partition = event[:partition] - offset = event[:offset] - - info(event, "Storing offset #{offset} for topic #{topic}/#{partition} in the transaction") + def on_transaction_marked_as_consumed(event) + message = event[:message] + topic = message.topic + partition = message.partition + offset = message.offset + loc = "#{topic}/#{partition}" + + info( + event, + "Marking message with offset #{offset} for topic #{loc} as consumed in a transaction" + ) end # @param event [Dry::Events::Event] event that happened with the details diff --git a/lib/waterdrop/instrumentation/notifications.rb b/lib/waterdrop/instrumentation/notifications.rb index af822f7b..b7078ac6 100644 --- a/lib/waterdrop/instrumentation/notifications.rb +++ b/lib/waterdrop/instrumentation/notifications.rb @@ -22,7 +22,7 @@ class Notifications < ::Karafka::Core::Monitoring::Notifications transaction.started transaction.committed transaction.aborted - transaction.offset_stored + transaction.marked_as_consumed transaction.finished buffer.flushed_async diff --git a/lib/waterdrop/producer/transactions.rb b/lib/waterdrop/producer/transactions.rb index fe25d7ad..639a5afa 100644 --- a/lib/waterdrop/producer/transactions.rb +++ b/lib/waterdrop/producer/transactions.rb @@ -96,35 +96,37 @@ def transactional? @transactional = config.kafka.to_h.key?(:'transactional.id') end - # Stores provided offset inside of the transaction. If used, will connect the stored offset - # with the producer transaction making sure that all succeed or fail together + # Marks given message as consumed inside of a transaction. # # @param consumer [#consumer_group_metadata_pointer] any consumer from which we can obtain # the librdkafka consumer group metadata pointer - # @param topic [String] topic name - # @param partition [Integer] partition - # @param offset [Integer] offset we want to store + # @param message [Karafka::Messages::Message] karafka message # @param offset_metadata [String] offset metadata or nil if none - def transactional_store_offset(consumer, topic, partition, offset, offset_metadata = nil) + def transaction_mark_as_consumed(consumer, message, offset_metadata = nil) raise Errors::TransactionRequiredError unless @transaction_mutex.owned? CONTRACT.validate!( { consumer: consumer, - topic: topic, - partition: partition, - offset: offset, + message: message, offset_metadata: offset_metadata }, Errors::TransactionalOffsetInvalidError ) - details = { topic: topic, partition: partition, offset: offset } + details = { message: message, offset_metadata: offset_metadata } - transactional_instrument(:offset_stored, details) do + transactional_instrument(:marked_as_consumed, details) do tpl = Rdkafka::Consumer::TopicPartitionList.new - partition = Rdkafka::Consumer::Partition.new(partition, offset, 0, offset_metadata) - tpl.add_topic_and_partitions_with_offsets(topic, [partition]) + partition = Rdkafka::Consumer::Partition.new( + message.partition, + # +1 because this is next offset from which we will start processing from + message.offset + 1, + 0, + offset_metadata + ) + + tpl.add_topic_and_partitions_with_offsets(message.topic, [partition]) with_transactional_error_handling(:store_offset) do client.send_offsets_to_transaction( diff --git a/spec/lib/waterdrop/clients/buffered_spec.rb b/spec/lib/waterdrop/clients/buffered_spec.rb index f7ca91aa..ca39711e 100644 --- a/spec/lib/waterdrop/clients/buffered_spec.rb +++ b/spec/lib/waterdrop/clients/buffered_spec.rb @@ -181,19 +181,22 @@ end context 'when we try to store offset without a transaction' do + let(:message) { OpenStruct.new(topic: rand.to_s, partition: 0, offset: 10) } + it 'expect to raise an error' do - expect { producer.transactional_store_offset(nil, 'topic', 0, 0) } + expect { producer.transaction_mark_as_consumed(nil, message) } .to raise_error(WaterDrop::Errors::TransactionRequiredError) end end context 'when trying to store offset with transaction' do let(:consumer) { OpenStruct.new(consumer_group_metadata_pointer: nil) } + let(:message) { OpenStruct.new(topic: rand.to_s, partition: 0, offset: 10) } it do expect do producer.transaction do - producer.transactional_store_offset(consumer, 'topic', 0, 0) + producer.transaction_mark_as_consumed(consumer, message) end end.not_to raise_error end diff --git a/spec/lib/waterdrop/contracts/transactional_offset_spec.rb b/spec/lib/waterdrop/contracts/transactional_offset_spec.rb index 97be2e8b..fd44685e 100644 --- a/spec/lib/waterdrop/contracts/transactional_offset_spec.rb +++ b/spec/lib/waterdrop/contracts/transactional_offset_spec.rb @@ -7,13 +7,12 @@ let(:topic) { 'test_topic' } let(:partition) { 0 } let(:offset) { 10 } + let(:message) { OpenStruct.new(topic: topic, partition: partition, offset: offset) } let(:offset_metadata) { 'metadata' } let(:input) do { consumer: consumer, - topic: topic, - partition: partition, - offset: offset, + message: message, offset_metadata: offset_metadata } end @@ -28,43 +27,9 @@ it { expect(contract_result).not_to be_success } end - context 'when topic is invalid' do - context 'when topic is empty' do - let(:topic) { '' } - - it { expect(contract_result).not_to be_success } - end - - context 'when topic is not a string' do - let(:topic) { 123 } - - it { expect(contract_result).not_to be_success } - end - end - - context 'when partition is invalid' do - context 'when partition is negative' do - let(:partition) { -1 } - - it { expect(contract_result).not_to be_success } - end - - context 'when partition is not an integer' do - let(:partition) { 'not_an_integer' } - - it { expect(contract_result).not_to be_success } - end - end - - context 'when offset is invalid' do - context 'when offset is negative' do - let(:offset) { -10 } - - it { expect(contract_result).not_to be_success } - end - - context 'when offset is not an integer' do - let(:offset) { 'not_an_integer' } + context 'when message is invalid' do + context 'when message is a string' do + let(:message) { 'test' } it { expect(contract_result).not_to be_success } end diff --git a/spec/lib/waterdrop/instrumentation/logger_listener_spec.rb b/spec/lib/waterdrop/instrumentation/logger_listener_spec.rb index 180d2e45..c8a01e66 100644 --- a/spec/lib/waterdrop/instrumentation/logger_listener_spec.rb +++ b/spec/lib/waterdrop/instrumentation/logger_listener_spec.rb @@ -244,17 +244,19 @@ it { expect(logged_data[0]).to include('Committing transaction') } end - describe '#on_transaction_offset_stored' do + describe '#on_transaction_marked_as_consumed' do before do - details[:topic] = rand.to_s - details[:partition] = 0 - details[:offset] = 100 + details[:message] = OpenStruct.new( + topic: rand.to_s, + partition: 0, + offset: 100 + ) - listener.on_transaction_offset_stored(event) + listener.on_transaction_marked_as_consumed(event) end it { expect(logged_data[0]).to include(producer.id) } it { expect(logged_data[0]).to include('INFO') } - it { expect(logged_data[0]).to include('Storing offset') } + it { expect(logged_data[0]).to include('Marking message') } end end diff --git a/spec/lib/waterdrop/producer/transactions_spec.rb b/spec/lib/waterdrop/producer/transactions_spec.rb index 79a03e8b..3bce3f71 100644 --- a/spec/lib/waterdrop/producer/transactions_spec.rb +++ b/spec/lib/waterdrop/producer/transactions_spec.rb @@ -507,42 +507,46 @@ end end - context 'when we try to store offset without a transaction' do - it 'expect to raise an error' do - expect { producer.transactional_store_offset(nil, 'topic', 0, 0) } - .to raise_error(WaterDrop::Errors::TransactionRequiredError) + context 'when trying to mark as consumed in a transaction' do + let(:message) { OpenStruct.new(topic: rand.to_s, partition: 0, offset: 100) } + + context 'when we try mark as consumed without a transaction' do + it 'expect to raise an error' do + expect { producer.transaction_mark_as_consumed(nil, message) } + .to raise_error(WaterDrop::Errors::TransactionRequiredError) + end end - end - context 'when we try to store offset with invalid arguments' do - let(:consumer) { OpenStruct.new } + context 'when we try mark as consumed with invalid arguments' do + let(:consumer) { OpenStruct.new } - before { allow(producer.client).to receive(:send_offsets_to_transaction) } + before { allow(producer.client).to receive(:send_offsets_to_transaction) } - it 'expect to delegate to client send_offsets_to_transaction with correct timeout' do - producer.transaction do - expect { producer.transactional_store_offset(consumer, 'topic', 0, 100) } - .to raise_error(WaterDrop::Errors::TransactionalOffsetInvalidError) + it 'expect to delegate to client send_offsets_to_transaction with correct timeout' do + producer.transaction do + expect { producer.transaction_mark_as_consumed(consumer, message) } + .to raise_error(WaterDrop::Errors::TransactionalOffsetInvalidError) + end end end - end - # Full e2e integration of this is checked in Karafka as we do not operate on consumers here - context 'when trying to store offset inside a transaction' do - let(:consumer) { OpenStruct.new(consumer_group_metadata_pointer: 1) } + # Full e2e integration of this is checked in Karafka as we do not operate on consumers here + context 'when trying mark as consumed inside a transaction' do + let(:consumer) { OpenStruct.new(consumer_group_metadata_pointer: 1) } - before do - allow(producer.client).to receive(:send_offsets_to_transaction) + before do + allow(producer.client).to receive(:send_offsets_to_transaction) - producer.transaction do - producer.transactional_store_offset(consumer, 'topic', 0, 0) + producer.transaction do + producer.transaction_mark_as_consumed(consumer, message) + end end - end - it 'expect to delegate to client send_offsets_to_transaction with correct timeout' do - expect(producer.client) - .to have_received(:send_offsets_to_transaction) - .with(consumer, any_args, 30_000) + it 'expect to delegate to client send_offsets_to_transaction with correct timeout' do + expect(producer.client) + .to have_received(:send_offsets_to_transaction) + .with(consumer, any_args, 30_000) + end end end end