Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

align transactional api #440

Merged
merged 1 commit into from
Jan 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions config/locales/errors.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,8 @@ en:

transactional_offset:
consumer_format: 'must respond to #consumer_group_metadata_pointer method'
topic_format: must be a non-empty string
message_format: 'must respond to #topic, #partition and #offset'
missing: must be present
partition_format: must be an integer greater or equal to -1
offset_format: must be an integer greater or equal to -1
offset_metadata_format: must be string or nil

test:
Expand Down
4 changes: 1 addition & 3 deletions lib/waterdrop/contracts/transactional_offset.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,7 @@ class TransactionalOffset < ::Karafka::Core::Contractable::Contract
end

required(:consumer) { |val| val.respond_to?(:consumer_group_metadata_pointer) }
required(:topic) { |val| val.is_a?(String) && !val.empty? }
required(:partition) { |val| val.is_a?(Integer) && val >= 0 }
required(:offset) { |val| val.is_a?(Integer) && val >= 0 }
required(:message) { |val| val.respond_to?(:topic) && val.respond_to?(:partition) }
required(:offset_metadata) { |val| val.is_a?(String) || val.nil? }
end
end
Expand Down
17 changes: 11 additions & 6 deletions lib/waterdrop/instrumentation/logger_listener.rb
Original file line number Diff line number Diff line change
Expand Up @@ -146,12 +146,17 @@ def on_transaction_committed(event)
end

# @param event [Dry::Events::Event] event that happened with the details
def on_transaction_offset_stored(event)
topic = event[:topic]
partition = event[:partition]
offset = event[:offset]

info(event, "Storing offset #{offset} for topic #{topic}/#{partition} in the transaction")
def on_transaction_marked_as_consumed(event)
message = event[:message]
topic = message.topic
partition = message.partition
offset = message.offset
loc = "#{topic}/#{partition}"

info(
event,
"Marking message with offset #{offset} for topic #{loc} as consumed in a transaction"
)
end

# @param event [Dry::Events::Event] event that happened with the details
Expand Down
2 changes: 1 addition & 1 deletion lib/waterdrop/instrumentation/notifications.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class Notifications < ::Karafka::Core::Monitoring::Notifications
transaction.started
transaction.committed
transaction.aborted
transaction.offset_stored
transaction.marked_as_consumed
transaction.finished

buffer.flushed_async
Expand Down
28 changes: 15 additions & 13 deletions lib/waterdrop/producer/transactions.rb
Original file line number Diff line number Diff line change
Expand Up @@ -96,35 +96,37 @@ def transactional?
@transactional = config.kafka.to_h.key?(:'transactional.id')
end

# Stores provided offset inside of the transaction. If used, will connect the stored offset
# with the producer transaction making sure that all succeed or fail together
# Marks given message as consumed inside of a transaction.
#
# @param consumer [#consumer_group_metadata_pointer] any consumer from which we can obtain
# the librdkafka consumer group metadata pointer
# @param topic [String] topic name
# @param partition [Integer] partition
# @param offset [Integer] offset we want to store
# @param message [Karafka::Messages::Message] karafka message
# @param offset_metadata [String] offset metadata or nil if none
def transactional_store_offset(consumer, topic, partition, offset, offset_metadata = nil)
def transaction_mark_as_consumed(consumer, message, offset_metadata = nil)
raise Errors::TransactionRequiredError unless @transaction_mutex.owned?

CONTRACT.validate!(
{
consumer: consumer,
topic: topic,
partition: partition,
offset: offset,
message: message,
offset_metadata: offset_metadata
},
Errors::TransactionalOffsetInvalidError
)

details = { topic: topic, partition: partition, offset: offset }
details = { message: message, offset_metadata: offset_metadata }

transactional_instrument(:offset_stored, details) do
transactional_instrument(:marked_as_consumed, details) do
tpl = Rdkafka::Consumer::TopicPartitionList.new
partition = Rdkafka::Consumer::Partition.new(partition, offset, 0, offset_metadata)
tpl.add_topic_and_partitions_with_offsets(topic, [partition])
partition = Rdkafka::Consumer::Partition.new(
message.partition,
# +1 because this is next offset from which we will start processing from
message.offset + 1,
0,
offset_metadata
)

tpl.add_topic_and_partitions_with_offsets(message.topic, [partition])

with_transactional_error_handling(:store_offset) do
client.send_offsets_to_transaction(
Expand Down
7 changes: 5 additions & 2 deletions spec/lib/waterdrop/clients/buffered_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -181,19 +181,22 @@
end

context 'when we try to store offset without a transaction' do
let(:message) { OpenStruct.new(topic: rand.to_s, partition: 0, offset: 10) }

it 'expect to raise an error' do
expect { producer.transactional_store_offset(nil, 'topic', 0, 0) }
expect { producer.transaction_mark_as_consumed(nil, message) }
.to raise_error(WaterDrop::Errors::TransactionRequiredError)
end
end

context 'when trying to store offset with transaction' do
let(:consumer) { OpenStruct.new(consumer_group_metadata_pointer: nil) }
let(:message) { OpenStruct.new(topic: rand.to_s, partition: 0, offset: 10) }

it do
expect do
producer.transaction do
producer.transactional_store_offset(consumer, 'topic', 0, 0)
producer.transaction_mark_as_consumed(consumer, message)
end
end.not_to raise_error
end
Expand Down
45 changes: 5 additions & 40 deletions spec/lib/waterdrop/contracts/transactional_offset_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,12 @@
let(:topic) { 'test_topic' }
let(:partition) { 0 }
let(:offset) { 10 }
let(:message) { OpenStruct.new(topic: topic, partition: partition, offset: offset) }
let(:offset_metadata) { 'metadata' }
let(:input) do
{
consumer: consumer,
topic: topic,
partition: partition,
offset: offset,
message: message,
offset_metadata: offset_metadata
}
end
Expand All @@ -28,43 +27,9 @@
it { expect(contract_result).not_to be_success }
end

context 'when topic is invalid' do
context 'when topic is empty' do
let(:topic) { '' }

it { expect(contract_result).not_to be_success }
end

context 'when topic is not a string' do
let(:topic) { 123 }

it { expect(contract_result).not_to be_success }
end
end

context 'when partition is invalid' do
context 'when partition is negative' do
let(:partition) { -1 }

it { expect(contract_result).not_to be_success }
end

context 'when partition is not an integer' do
let(:partition) { 'not_an_integer' }

it { expect(contract_result).not_to be_success }
end
end

context 'when offset is invalid' do
context 'when offset is negative' do
let(:offset) { -10 }

it { expect(contract_result).not_to be_success }
end

context 'when offset is not an integer' do
let(:offset) { 'not_an_integer' }
context 'when message is invalid' do
context 'when message is a string' do
let(:message) { 'test' }

it { expect(contract_result).not_to be_success }
end
Expand Down
14 changes: 8 additions & 6 deletions spec/lib/waterdrop/instrumentation/logger_listener_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -244,17 +244,19 @@
it { expect(logged_data[0]).to include('Committing transaction') }
end

describe '#on_transaction_offset_stored' do
describe '#on_transaction_marked_as_consumed' do
before do
details[:topic] = rand.to_s
details[:partition] = 0
details[:offset] = 100
details[:message] = OpenStruct.new(
topic: rand.to_s,
partition: 0,
offset: 100
)

listener.on_transaction_offset_stored(event)
listener.on_transaction_marked_as_consumed(event)
end

it { expect(logged_data[0]).to include(producer.id) }
it { expect(logged_data[0]).to include('INFO') }
it { expect(logged_data[0]).to include('Storing offset') }
it { expect(logged_data[0]).to include('Marking message') }
end
end
54 changes: 29 additions & 25 deletions spec/lib/waterdrop/producer/transactions_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -507,42 +507,46 @@
end
end

context 'when we try to store offset without a transaction' do
it 'expect to raise an error' do
expect { producer.transactional_store_offset(nil, 'topic', 0, 0) }
.to raise_error(WaterDrop::Errors::TransactionRequiredError)
context 'when trying to mark as consumed in a transaction' do
let(:message) { OpenStruct.new(topic: rand.to_s, partition: 0, offset: 100) }

context 'when we try mark as consumed without a transaction' do
it 'expect to raise an error' do
expect { producer.transaction_mark_as_consumed(nil, message) }
.to raise_error(WaterDrop::Errors::TransactionRequiredError)
end
end
end

context 'when we try to store offset with invalid arguments' do
let(:consumer) { OpenStruct.new }
context 'when we try mark as consumed with invalid arguments' do
let(:consumer) { OpenStruct.new }

before { allow(producer.client).to receive(:send_offsets_to_transaction) }
before { allow(producer.client).to receive(:send_offsets_to_transaction) }

it 'expect to delegate to client send_offsets_to_transaction with correct timeout' do
producer.transaction do
expect { producer.transactional_store_offset(consumer, 'topic', 0, 100) }
.to raise_error(WaterDrop::Errors::TransactionalOffsetInvalidError)
it 'expect to delegate to client send_offsets_to_transaction with correct timeout' do
producer.transaction do
expect { producer.transaction_mark_as_consumed(consumer, message) }
.to raise_error(WaterDrop::Errors::TransactionalOffsetInvalidError)
end
end
end
end

# Full e2e integration of this is checked in Karafka as we do not operate on consumers here
context 'when trying to store offset inside a transaction' do
let(:consumer) { OpenStruct.new(consumer_group_metadata_pointer: 1) }
# Full e2e integration of this is checked in Karafka as we do not operate on consumers here
context 'when trying mark as consumed inside a transaction' do
let(:consumer) { OpenStruct.new(consumer_group_metadata_pointer: 1) }

before do
allow(producer.client).to receive(:send_offsets_to_transaction)
before do
allow(producer.client).to receive(:send_offsets_to_transaction)

producer.transaction do
producer.transactional_store_offset(consumer, 'topic', 0, 0)
producer.transaction do
producer.transaction_mark_as_consumed(consumer, message)
end
end
end

it 'expect to delegate to client send_offsets_to_transaction with correct timeout' do
expect(producer.client)
.to have_received(:send_offsets_to_transaction)
.with(consumer, any_args, 30_000)
it 'expect to delegate to client send_offsets_to_transaction with correct timeout' do
expect(producer.client)
.to have_received(:send_offsets_to_transaction)
.with(consumer, any_args, 30_000)
end
end
end
end