From 743c99f9f95accf6763a11ff312b55798812582d Mon Sep 17 00:00:00 2001 From: Maciej Mensfeld Date: Mon, 1 Jan 2024 17:40:51 +0100 Subject: [PATCH] Offset management --- config/locales/errors.yml | 8 ++ .../contracts/transactional_offset.rb | 23 ++++++ lib/waterdrop/errors.rb | 3 + lib/waterdrop/producer/transactions.rb | 22 ++++- spec/lib/waterdrop/clients/buffered_spec.rb | 4 +- .../contracts/transactional_offset_spec.rb | 80 +++++++++++++++++++ .../waterdrop/producer/transactions_spec.rb | 15 +++- 7 files changed, 151 insertions(+), 4 deletions(-) create mode 100644 lib/waterdrop/contracts/transactional_offset.rb create mode 100644 spec/lib/waterdrop/contracts/transactional_offset_spec.rb diff --git a/config/locales/errors.yml b/config/locales/errors.yml index 7e26afd3..ed379472 100644 --- a/config/locales/errors.yml +++ b/config/locales/errors.yml @@ -27,6 +27,14 @@ en: headers_invalid_key_type: all headers keys need to be of type String headers_invalid_value_type: all headers values need to be of type String + transactional_offset: + consumer_format: 'must respond to #consumer_group_metadata_pointer method' + topic_format: must be a non-empty string + missing: must be present + partition_format: must be an integer greater or equal to -1 + offset_format: must be an integer greater or equal to -1 + offset_metadata_format: must be string or nil + test: missing: must be present nested.id_format: 'is invalid' diff --git a/lib/waterdrop/contracts/transactional_offset.rb b/lib/waterdrop/contracts/transactional_offset.rb new file mode 100644 index 00000000..d9fed232 --- /dev/null +++ b/lib/waterdrop/contracts/transactional_offset.rb @@ -0,0 +1,23 @@ +# frozen_string_literal: true + +module WaterDrop + module Contracts + # Contract to ensure that arguments provided to the transactional offset commit are valid + # and match our expectations + class TransactionalOffset < ::Karafka::Core::Contractable::Contract + configure do |config| + config.error_messages = YAML.safe_load( + File.read( + File.join(WaterDrop.gem_root, 'config', 'locales', 'errors.yml') + ) + ).fetch('en').fetch('validations').fetch('transactional_offset') + end + + required(:consumer) { |val| val.respond_to?(:consumer_group_metadata_pointer) } + required(:topic) { |val| val.is_a?(String) && !val.empty? } + required(:partition) { |val| val.is_a?(Integer) && val >= 0 } + required(:offset) { |val| val.is_a?(Integer) && val >= 0 } + required(:offset_metadata) { |val| val.is_a?(String) || val.nil? } + end + end +end diff --git a/lib/waterdrop/errors.rb b/lib/waterdrop/errors.rb index e126caa1..c4824b2b 100644 --- a/lib/waterdrop/errors.rb +++ b/lib/waterdrop/errors.rb @@ -25,6 +25,9 @@ module Errors # Raised when we want to send a message that is invalid (impossible topic, etc) MessageInvalidError = Class.new(BaseError) + # Raised when we want to commit transactional offset and the input is invalid + TransactionalOffsetInvalidError = Class.new(BaseError) + # Raised when we've got an unexpected status. This should never happen. If it does, please # contact us as it is an error. StatusInvalidError = Class.new(BaseError) diff --git a/lib/waterdrop/producer/transactions.rb b/lib/waterdrop/producer/transactions.rb index bf4852e2..fe25d7ad 100644 --- a/lib/waterdrop/producer/transactions.rb +++ b/lib/waterdrop/producer/transactions.rb @@ -4,6 +4,11 @@ module WaterDrop class Producer # Transactions related producer functionalities module Transactions + # Contract to validate that input for transactional offset storage is correct + CONTRACT = Contracts::TransactionalOffset.new + + private_constant :CONTRACT + # Creates a transaction. # # Karafka transactions work in a similar manner to SQL db transactions though there are some @@ -99,14 +104,27 @@ def transactional? # @param topic [String] topic name # @param partition [Integer] partition # @param offset [Integer] offset we want to store - def transactional_store_offset(consumer, topic, partition, offset) + # @param offset_metadata [String] offset metadata or nil if none + def transactional_store_offset(consumer, topic, partition, offset, offset_metadata = nil) raise Errors::TransactionRequiredError unless @transaction_mutex.owned? + CONTRACT.validate!( + { + consumer: consumer, + topic: topic, + partition: partition, + offset: offset, + offset_metadata: offset_metadata + }, + Errors::TransactionalOffsetInvalidError + ) + details = { topic: topic, partition: partition, offset: offset } transactional_instrument(:offset_stored, details) do tpl = Rdkafka::Consumer::TopicPartitionList.new - tpl.add_topic_and_partitions_with_offsets(topic, partition => offset) + partition = Rdkafka::Consumer::Partition.new(partition, offset, 0, offset_metadata) + tpl.add_topic_and_partitions_with_offsets(topic, [partition]) with_transactional_error_handling(:store_offset) do client.send_offsets_to_transaction( diff --git a/spec/lib/waterdrop/clients/buffered_spec.rb b/spec/lib/waterdrop/clients/buffered_spec.rb index 64a3a59e..f7ca91aa 100644 --- a/spec/lib/waterdrop/clients/buffered_spec.rb +++ b/spec/lib/waterdrop/clients/buffered_spec.rb @@ -188,10 +188,12 @@ end context 'when trying to store offset with transaction' do + let(:consumer) { OpenStruct.new(consumer_group_metadata_pointer: nil) } + it do expect do producer.transaction do - producer.transactional_store_offset(nil, 'topic', 0, 0) + producer.transactional_store_offset(consumer, 'topic', 0, 0) end end.not_to raise_error end diff --git a/spec/lib/waterdrop/contracts/transactional_offset_spec.rb b/spec/lib/waterdrop/contracts/transactional_offset_spec.rb new file mode 100644 index 00000000..97be2e8b --- /dev/null +++ b/spec/lib/waterdrop/contracts/transactional_offset_spec.rb @@ -0,0 +1,80 @@ +# frozen_string_literal: true + +RSpec.describe_current do + subject(:contract_result) { described_class.new.call(input) } + + let(:consumer) { instance_double('Consumer', consumer_group_metadata_pointer: true) } + let(:topic) { 'test_topic' } + let(:partition) { 0 } + let(:offset) { 10 } + let(:offset_metadata) { 'metadata' } + let(:input) do + { + consumer: consumer, + topic: topic, + partition: partition, + offset: offset, + offset_metadata: offset_metadata + } + end + + context 'when all inputs are valid' do + it { expect(contract_result).to be_success } + end + + context 'when consumer is invalid' do + let(:consumer) { nil } + + it { expect(contract_result).not_to be_success } + end + + context 'when topic is invalid' do + context 'when topic is empty' do + let(:topic) { '' } + + it { expect(contract_result).not_to be_success } + end + + context 'when topic is not a string' do + let(:topic) { 123 } + + it { expect(contract_result).not_to be_success } + end + end + + context 'when partition is invalid' do + context 'when partition is negative' do + let(:partition) { -1 } + + it { expect(contract_result).not_to be_success } + end + + context 'when partition is not an integer' do + let(:partition) { 'not_an_integer' } + + it { expect(contract_result).not_to be_success } + end + end + + context 'when offset is invalid' do + context 'when offset is negative' do + let(:offset) { -10 } + + it { expect(contract_result).not_to be_success } + end + + context 'when offset is not an integer' do + let(:offset) { 'not_an_integer' } + + it { expect(contract_result).not_to be_success } + end + end + + context 'when offset_metadata is invalid' do + context 'when offset_metadata is not a string or nil' do + let(:offset_metadata) { 123 } + + it { expect(contract_result).not_to be_success } + end + end +end diff --git a/spec/lib/waterdrop/producer/transactions_spec.rb b/spec/lib/waterdrop/producer/transactions_spec.rb index 05600c5b..79a03e8b 100644 --- a/spec/lib/waterdrop/producer/transactions_spec.rb +++ b/spec/lib/waterdrop/producer/transactions_spec.rb @@ -514,9 +514,22 @@ end end + context 'when we try to store offset with invalid arguments' do + let(:consumer) { OpenStruct.new } + + before { allow(producer.client).to receive(:send_offsets_to_transaction) } + + it 'expect to delegate to client send_offsets_to_transaction with correct timeout' do + producer.transaction do + expect { producer.transactional_store_offset(consumer, 'topic', 0, 100) } + .to raise_error(WaterDrop::Errors::TransactionalOffsetInvalidError) + end + end + end + # Full e2e integration of this is checked in Karafka as we do not operate on consumers here context 'when trying to store offset inside a transaction' do - let(:consumer) { OpenStruct.new } + let(:consumer) { OpenStruct.new(consumer_group_metadata_pointer: 1) } before do allow(producer.client).to receive(:send_offsets_to_transaction)