Skip to content

Commit

Permalink
Offset management
Browse files Browse the repository at this point in the history
  • Loading branch information
mensfeld committed Jan 1, 2024
1 parent 2d78b37 commit 743c99f
Show file tree
Hide file tree
Showing 7 changed files with 151 additions and 4 deletions.
8 changes: 8 additions & 0 deletions config/locales/errors.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,14 @@ en:
headers_invalid_key_type: all headers keys need to be of type String
headers_invalid_value_type: all headers values need to be of type String

transactional_offset:
consumer_format: 'must respond to #consumer_group_metadata_pointer method'
topic_format: must be a non-empty string
missing: must be present
partition_format: must be an integer greater or equal to -1
offset_format: must be an integer greater or equal to -1
offset_metadata_format: must be string or nil

test:
missing: must be present
nested.id_format: 'is invalid'
Expand Down
23 changes: 23 additions & 0 deletions lib/waterdrop/contracts/transactional_offset.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# frozen_string_literal: true

module WaterDrop
module Contracts
# Contract to ensure that arguments provided to the transactional offset commit are valid
# and match our expectations
class TransactionalOffset < ::Karafka::Core::Contractable::Contract
configure do |config|
config.error_messages = YAML.safe_load(
File.read(
File.join(WaterDrop.gem_root, 'config', 'locales', 'errors.yml')
)
).fetch('en').fetch('validations').fetch('transactional_offset')
end

required(:consumer) { |val| val.respond_to?(:consumer_group_metadata_pointer) }
required(:topic) { |val| val.is_a?(String) && !val.empty? }
required(:partition) { |val| val.is_a?(Integer) && val >= 0 }
required(:offset) { |val| val.is_a?(Integer) && val >= 0 }
required(:offset_metadata) { |val| val.is_a?(String) || val.nil? }
end
end
end
3 changes: 3 additions & 0 deletions lib/waterdrop/errors.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ module Errors
# Raised when we want to send a message that is invalid (impossible topic, etc)
MessageInvalidError = Class.new(BaseError)

# Raised when we want to commit transactional offset and the input is invalid
TransactionalOffsetInvalidError = Class.new(BaseError)

# Raised when we've got an unexpected status. This should never happen. If it does, please
# contact us as it is an error.
StatusInvalidError = Class.new(BaseError)
Expand Down
22 changes: 20 additions & 2 deletions lib/waterdrop/producer/transactions.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@ module WaterDrop
class Producer
# Transactions related producer functionalities
module Transactions
# Contract to validate that input for transactional offset storage is correct
CONTRACT = Contracts::TransactionalOffset.new

private_constant :CONTRACT

# Creates a transaction.
#
# Karafka transactions work in a similar manner to SQL db transactions though there are some
Expand Down Expand Up @@ -99,14 +104,27 @@ def transactional?
# @param topic [String] topic name
# @param partition [Integer] partition
# @param offset [Integer] offset we want to store
def transactional_store_offset(consumer, topic, partition, offset)
# @param offset_metadata [String] offset metadata or nil if none
def transactional_store_offset(consumer, topic, partition, offset, offset_metadata = nil)
raise Errors::TransactionRequiredError unless @transaction_mutex.owned?

CONTRACT.validate!(
{
consumer: consumer,
topic: topic,
partition: partition,
offset: offset,
offset_metadata: offset_metadata
},
Errors::TransactionalOffsetInvalidError
)

details = { topic: topic, partition: partition, offset: offset }

transactional_instrument(:offset_stored, details) do
tpl = Rdkafka::Consumer::TopicPartitionList.new
tpl.add_topic_and_partitions_with_offsets(topic, partition => offset)
partition = Rdkafka::Consumer::Partition.new(partition, offset, 0, offset_metadata)
tpl.add_topic_and_partitions_with_offsets(topic, [partition])

with_transactional_error_handling(:store_offset) do
client.send_offsets_to_transaction(
Expand Down
4 changes: 3 additions & 1 deletion spec/lib/waterdrop/clients/buffered_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -188,10 +188,12 @@
end

context 'when trying to store offset with transaction' do
let(:consumer) { OpenStruct.new(consumer_group_metadata_pointer: nil) }

it do
expect do
producer.transaction do
producer.transactional_store_offset(nil, 'topic', 0, 0)
producer.transactional_store_offset(consumer, 'topic', 0, 0)
end
end.not_to raise_error
end
Expand Down
80 changes: 80 additions & 0 deletions spec/lib/waterdrop/contracts/transactional_offset_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
# frozen_string_literal: true

RSpec.describe_current do
subject(:contract_result) { described_class.new.call(input) }

let(:consumer) { instance_double('Consumer', consumer_group_metadata_pointer: true) }
let(:topic) { 'test_topic' }
let(:partition) { 0 }
let(:offset) { 10 }
let(:offset_metadata) { 'metadata' }
let(:input) do
{
consumer: consumer,
topic: topic,
partition: partition,
offset: offset,
offset_metadata: offset_metadata
}
end

context 'when all inputs are valid' do
it { expect(contract_result).to be_success }
end

context 'when consumer is invalid' do
let(:consumer) { nil }

it { expect(contract_result).not_to be_success }
end

context 'when topic is invalid' do
context 'when topic is empty' do
let(:topic) { '' }

it { expect(contract_result).not_to be_success }
end

context 'when topic is not a string' do
let(:topic) { 123 }

it { expect(contract_result).not_to be_success }
end
end

context 'when partition is invalid' do
context 'when partition is negative' do
let(:partition) { -1 }

it { expect(contract_result).not_to be_success }
end

context 'when partition is not an integer' do
let(:partition) { 'not_an_integer' }

it { expect(contract_result).not_to be_success }
end
end

context 'when offset is invalid' do
context 'when offset is negative' do
let(:offset) { -10 }

it { expect(contract_result).not_to be_success }
end

context 'when offset is not an integer' do
let(:offset) { 'not_an_integer' }

it { expect(contract_result).not_to be_success }
end
end

context 'when offset_metadata is invalid' do
context 'when offset_metadata is not a string or nil' do
let(:offset_metadata) { 123 }

it { expect(contract_result).not_to be_success }
end
end
end
15 changes: 14 additions & 1 deletion spec/lib/waterdrop/producer/transactions_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -514,9 +514,22 @@
end
end

context 'when we try to store offset with invalid arguments' do
let(:consumer) { OpenStruct.new }

before { allow(producer.client).to receive(:send_offsets_to_transaction) }

it 'expect to delegate to client send_offsets_to_transaction with correct timeout' do
producer.transaction do
expect { producer.transactional_store_offset(consumer, 'topic', 0, 100) }
.to raise_error(WaterDrop::Errors::TransactionalOffsetInvalidError)
end
end
end

# Full e2e integration of this is checked in Karafka as we do not operate on consumers here
context 'when trying to store offset inside a transaction' do
let(:consumer) { OpenStruct.new }
let(:consumer) { OpenStruct.new(consumer_group_metadata_pointer: 1) }

before do
allow(producer.client).to receive(:send_offsets_to_transaction)
Expand Down

0 comments on commit 743c99f

Please sign in to comment.