Skip to content

Commit

Permalink
align transactional api (#440)
Browse files Browse the repository at this point in the history
  • Loading branch information
mensfeld authored Jan 3, 2024
1 parent 1c664bc commit 252f194
Show file tree
Hide file tree
Showing 9 changed files with 76 additions and 99 deletions.
4 changes: 1 addition & 3 deletions config/locales/errors.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,8 @@ en:

transactional_offset:
consumer_format: 'must respond to #consumer_group_metadata_pointer method'
topic_format: must be a non-empty string
message_format: 'must respond to #topic, #partition and #offset'
missing: must be present
partition_format: must be an integer greater or equal to -1
offset_format: must be an integer greater or equal to -1
offset_metadata_format: must be string or nil

test:
Expand Down
4 changes: 1 addition & 3 deletions lib/waterdrop/contracts/transactional_offset.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,7 @@ class TransactionalOffset < ::Karafka::Core::Contractable::Contract
end

required(:consumer) { |val| val.respond_to?(:consumer_group_metadata_pointer) }
required(:topic) { |val| val.is_a?(String) && !val.empty? }
required(:partition) { |val| val.is_a?(Integer) && val >= 0 }
required(:offset) { |val| val.is_a?(Integer) && val >= 0 }
required(:message) { |val| val.respond_to?(:topic) && val.respond_to?(:partition) }
required(:offset_metadata) { |val| val.is_a?(String) || val.nil? }
end
end
Expand Down
17 changes: 11 additions & 6 deletions lib/waterdrop/instrumentation/logger_listener.rb
Original file line number Diff line number Diff line change
Expand Up @@ -146,12 +146,17 @@ def on_transaction_committed(event)
end

# @param event [Dry::Events::Event] event that happened with the details
def on_transaction_offset_stored(event)
topic = event[:topic]
partition = event[:partition]
offset = event[:offset]

info(event, "Storing offset #{offset} for topic #{topic}/#{partition} in the transaction")
def on_transaction_marked_as_consumed(event)
message = event[:message]
topic = message.topic
partition = message.partition
offset = message.offset
loc = "#{topic}/#{partition}"

info(
event,
"Marking message with offset #{offset} for topic #{loc} as consumed in a transaction"
)
end

# @param event [Dry::Events::Event] event that happened with the details
Expand Down
2 changes: 1 addition & 1 deletion lib/waterdrop/instrumentation/notifications.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class Notifications < ::Karafka::Core::Monitoring::Notifications
transaction.started
transaction.committed
transaction.aborted
transaction.offset_stored
transaction.marked_as_consumed
transaction.finished

buffer.flushed_async
Expand Down
28 changes: 15 additions & 13 deletions lib/waterdrop/producer/transactions.rb
Original file line number Diff line number Diff line change
Expand Up @@ -96,35 +96,37 @@ def transactional?
@transactional = config.kafka.to_h.key?(:'transactional.id')
end

# Stores provided offset inside of the transaction. If used, will connect the stored offset
# with the producer transaction making sure that all succeed or fail together
# Marks given message as consumed inside of a transaction.
#
# @param consumer [#consumer_group_metadata_pointer] any consumer from which we can obtain
# the librdkafka consumer group metadata pointer
# @param topic [String] topic name
# @param partition [Integer] partition
# @param offset [Integer] offset we want to store
# @param message [Karafka::Messages::Message] karafka message
# @param offset_metadata [String] offset metadata or nil if none
def transactional_store_offset(consumer, topic, partition, offset, offset_metadata = nil)
def transaction_mark_as_consumed(consumer, message, offset_metadata = nil)
raise Errors::TransactionRequiredError unless @transaction_mutex.owned?

CONTRACT.validate!(
{
consumer: consumer,
topic: topic,
partition: partition,
offset: offset,
message: message,
offset_metadata: offset_metadata
},
Errors::TransactionalOffsetInvalidError
)

details = { topic: topic, partition: partition, offset: offset }
details = { message: message, offset_metadata: offset_metadata }

transactional_instrument(:offset_stored, details) do
transactional_instrument(:marked_as_consumed, details) do
tpl = Rdkafka::Consumer::TopicPartitionList.new
partition = Rdkafka::Consumer::Partition.new(partition, offset, 0, offset_metadata)
tpl.add_topic_and_partitions_with_offsets(topic, [partition])
partition = Rdkafka::Consumer::Partition.new(
message.partition,
# +1 because this is next offset from which we will start processing from
message.offset + 1,
0,
offset_metadata
)

tpl.add_topic_and_partitions_with_offsets(message.topic, [partition])

with_transactional_error_handling(:store_offset) do
client.send_offsets_to_transaction(
Expand Down
7 changes: 5 additions & 2 deletions spec/lib/waterdrop/clients/buffered_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -181,19 +181,22 @@
end

context 'when we try to store offset without a transaction' do
let(:message) { OpenStruct.new(topic: rand.to_s, partition: 0, offset: 10) }

it 'expect to raise an error' do
expect { producer.transactional_store_offset(nil, 'topic', 0, 0) }
expect { producer.transaction_mark_as_consumed(nil, message) }
.to raise_error(WaterDrop::Errors::TransactionRequiredError)
end
end

context 'when trying to store offset with transaction' do
let(:consumer) { OpenStruct.new(consumer_group_metadata_pointer: nil) }
let(:message) { OpenStruct.new(topic: rand.to_s, partition: 0, offset: 10) }

it do
expect do
producer.transaction do
producer.transactional_store_offset(consumer, 'topic', 0, 0)
producer.transaction_mark_as_consumed(consumer, message)
end
end.not_to raise_error
end
Expand Down
45 changes: 5 additions & 40 deletions spec/lib/waterdrop/contracts/transactional_offset_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,12 @@
let(:topic) { 'test_topic' }
let(:partition) { 0 }
let(:offset) { 10 }
let(:message) { OpenStruct.new(topic: topic, partition: partition, offset: offset) }
let(:offset_metadata) { 'metadata' }
let(:input) do
{
consumer: consumer,
topic: topic,
partition: partition,
offset: offset,
message: message,
offset_metadata: offset_metadata
}
end
Expand All @@ -28,43 +27,9 @@
it { expect(contract_result).not_to be_success }
end

context 'when topic is invalid' do
context 'when topic is empty' do
let(:topic) { '' }

it { expect(contract_result).not_to be_success }
end

context 'when topic is not a string' do
let(:topic) { 123 }

it { expect(contract_result).not_to be_success }
end
end

context 'when partition is invalid' do
context 'when partition is negative' do
let(:partition) { -1 }

it { expect(contract_result).not_to be_success }
end

context 'when partition is not an integer' do
let(:partition) { 'not_an_integer' }

it { expect(contract_result).not_to be_success }
end
end

context 'when offset is invalid' do
context 'when offset is negative' do
let(:offset) { -10 }

it { expect(contract_result).not_to be_success }
end

context 'when offset is not an integer' do
let(:offset) { 'not_an_integer' }
context 'when message is invalid' do
context 'when message is a string' do
let(:message) { 'test' }

it { expect(contract_result).not_to be_success }
end
Expand Down
14 changes: 8 additions & 6 deletions spec/lib/waterdrop/instrumentation/logger_listener_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -244,17 +244,19 @@
it { expect(logged_data[0]).to include('Committing transaction') }
end

describe '#on_transaction_offset_stored' do
describe '#on_transaction_marked_as_consumed' do
before do
details[:topic] = rand.to_s
details[:partition] = 0
details[:offset] = 100
details[:message] = OpenStruct.new(
topic: rand.to_s,
partition: 0,
offset: 100
)

listener.on_transaction_offset_stored(event)
listener.on_transaction_marked_as_consumed(event)
end

it { expect(logged_data[0]).to include(producer.id) }
it { expect(logged_data[0]).to include('INFO') }
it { expect(logged_data[0]).to include('Storing offset') }
it { expect(logged_data[0]).to include('Marking message') }
end
end
54 changes: 29 additions & 25 deletions spec/lib/waterdrop/producer/transactions_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -507,42 +507,46 @@
end
end

context 'when we try to store offset without a transaction' do
it 'expect to raise an error' do
expect { producer.transactional_store_offset(nil, 'topic', 0, 0) }
.to raise_error(WaterDrop::Errors::TransactionRequiredError)
context 'when trying to mark as consumed in a transaction' do
let(:message) { OpenStruct.new(topic: rand.to_s, partition: 0, offset: 100) }

context 'when we try mark as consumed without a transaction' do
it 'expect to raise an error' do
expect { producer.transaction_mark_as_consumed(nil, message) }
.to raise_error(WaterDrop::Errors::TransactionRequiredError)
end
end
end

context 'when we try to store offset with invalid arguments' do
let(:consumer) { OpenStruct.new }
context 'when we try mark as consumed with invalid arguments' do
let(:consumer) { OpenStruct.new }

before { allow(producer.client).to receive(:send_offsets_to_transaction) }
before { allow(producer.client).to receive(:send_offsets_to_transaction) }

it 'expect to delegate to client send_offsets_to_transaction with correct timeout' do
producer.transaction do
expect { producer.transactional_store_offset(consumer, 'topic', 0, 100) }
.to raise_error(WaterDrop::Errors::TransactionalOffsetInvalidError)
it 'expect to delegate to client send_offsets_to_transaction with correct timeout' do
producer.transaction do
expect { producer.transaction_mark_as_consumed(consumer, message) }
.to raise_error(WaterDrop::Errors::TransactionalOffsetInvalidError)
end
end
end
end

# Full e2e integration of this is checked in Karafka as we do not operate on consumers here
context 'when trying to store offset inside a transaction' do
let(:consumer) { OpenStruct.new(consumer_group_metadata_pointer: 1) }
# Full e2e integration of this is checked in Karafka as we do not operate on consumers here
context 'when trying mark as consumed inside a transaction' do
let(:consumer) { OpenStruct.new(consumer_group_metadata_pointer: 1) }

before do
allow(producer.client).to receive(:send_offsets_to_transaction)
before do
allow(producer.client).to receive(:send_offsets_to_transaction)

producer.transaction do
producer.transactional_store_offset(consumer, 'topic', 0, 0)
producer.transaction do
producer.transaction_mark_as_consumed(consumer, message)
end
end
end

it 'expect to delegate to client send_offsets_to_transaction with correct timeout' do
expect(producer.client)
.to have_received(:send_offsets_to_transaction)
.with(consumer, any_args, 30_000)
it 'expect to delegate to client send_offsets_to_transaction with correct timeout' do
expect(producer.client)
.to have_received(:send_offsets_to_transaction)
.with(consumer, any_args, 30_000)
end
end
end
end

0 comments on commit 252f194

Please sign in to comment.