Store offset API #428

Merged 5 commits on Dec 19, 2023

1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -1,6 +1,7 @@
# WaterDrop changelog

## 2.6.12 (Unreleased)
- [Enhancement] Provide the ability to commit an offset within a transaction when a consumer is provided.
- [Change] Remove usage of concurrent ruby.

## 2.6.11 (2023-10-25)
15 changes: 10 additions & 5 deletions lib/waterdrop/clients/buffered.rb
@@ -44,8 +44,6 @@ def begin_transaction
def commit_transaction
@transaction_level -= 1

return unless @transaction_level.zero?

# Transfer transactional data on success
@transaction_topics.each do |topic, messages|
@topics[topic] += messages
@@ -58,12 +56,19 @@ def commit_transaction
@transaction_active = false
end

# Fakes storing the offset in a transactional fashion
#
# @param _consumer [#consumer_group_metadata_pointer] any consumer from which we can obtain
# the librdkafka consumer group metadata pointer
# @param _tpl [Rdkafka::Consumer::TopicPartitionList] consumer tpl for offset storage
# @param _timeout [Integer] ms timeout
def send_offsets_to_transaction(_consumer, _tpl, _timeout)
nil
end

# Aborts the transaction
def abort_transaction
@transaction_level -= 1

return unless @transaction_level.zero?

@transaction_topics.clear
@transaction_messages.clear
@transaction_active = false
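The staging behaviour of a buffered test client like this one can be sketched in isolation. This is a simplified illustration, not WaterDrop code: `SketchBufferedClient` and its `produce` method are hypothetical, and the exact bookkeeping of the real client is an assumption. Only the names `begin_transaction`, `commit_transaction`, `abort_transaction` and `send_offsets_to_transaction` are taken from the diff.

```ruby
# Hypothetical stand-in for a buffered client (not the WaterDrop class).
class SketchBufferedClient
  attr_reader :messages

  def initialize
    @messages = []
    @transaction_messages = []
    @transaction_active = false
  end

  def begin_transaction
    @transaction_active = true
  end

  # Hypothetical helper: stages the message while a transaction is open
  def produce(message)
    target = @transaction_active ? @transaction_messages : @messages
    target << message
  end

  # Moves staged messages to the main buffer on success
  def commit_transaction
    @messages += @transaction_messages
    @transaction_messages.clear
    @transaction_active = false
  end

  # Drops staged messages without touching the main buffer
  def abort_transaction
    @transaction_messages.clear
    @transaction_active = false
  end

  # Fake offset storage: accepts the arguments and does nothing
  def send_offsets_to_transaction(_consumer, _tpl, _timeout)
    nil
  end
end
```

Because the fake `send_offsets_to_transaction` returns `nil` and ignores its arguments, specs can exercise the offset storing path without a real consumer.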
6 changes: 6 additions & 0 deletions lib/waterdrop/errors.rb
@@ -32,7 +32,13 @@ module Errors
# Raised when there is an inline error during single message produce operations
ProduceError = Class.new(BaseError)

# Raised when we attempt to perform an operation that is only allowed inside of a transaction
# and there is no transaction around us
TransactionRequiredError = Class.new(BaseError)

# Raise it within a transaction to abort it
# It does not have an `Error` postfix because technically it is not an error as it is used for
# graceful transaction aborting
AbortTransaction = Class.new(BaseError)

# Raised when during messages producing something bad happened inline
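The role of `AbortTransaction` as a graceful abort signal can be illustrated with a small self-contained sketch. The `sketch_transaction` helper and the `SketchErrors` module are hypothetical. The sketch assumes that a transaction wrapper consumes `AbortTransaction` after aborting and lets every other error propagate; the real wrapper is not part of this diff.

```ruby
# Hypothetical error hierarchy mirroring the shape used in the diff.
module SketchErrors
  BaseError = Class.new(StandardError)
  AbortTransaction = Class.new(BaseError)
end

# Hypothetical transaction wrapper: returns :committed or :aborted.
def sketch_transaction
  yield
  :committed
rescue SketchErrors::AbortTransaction
  # Graceful abort: the signal is consumed here and not re-raised
  :aborted
end
```

Any other exception raised inside the block is not rescued by this sketch and reaches the caller unchanged.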
9 changes: 9 additions & 0 deletions lib/waterdrop/instrumentation/logger_listener.rb
@@ -145,6 +145,15 @@ def on_transaction_committed(event)
info(event, 'Committing transaction')
end

# @param event [Dry::Events::Event] event that happened with the details
def on_transaction_offset_stored(event)
topic = event[:topic]
partition = event[:partition]
offset = event[:offset]

info(event, "Storing offset #{offset} for topic #{topic}/#{partition} in the transaction")
end

# @param event [Dry::Events::Event] event that happened with the details
def on_transaction_finished(event)
info(event, 'Processing transaction')
1 change: 1 addition & 0 deletions lib/waterdrop/instrumentation/notifications.rb
@@ -22,6 +22,7 @@ class Notifications < ::Karafka::Core::Monitoring::Notifications
transaction.started
transaction.committed
transaction.aborted
transaction.offset_stored
transaction.finished

buffer.flushed_async
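How a listener might consume the new `transaction.offset_stored` event can be sketched with a tiny stand-in monitor. `SketchMonitor` is hypothetical and only mimics a subscribe/instrument pair. The payload keys (`topic`, `partition`, `offset`, `producer_id`) and the log message format follow the diff; everything else is an assumption.

```ruby
# Hypothetical stand-in for a notifications monitor.
class SketchMonitor
  def initialize
    @listeners = Hash.new { |hash, key| hash[key] = [] }
  end

  def subscribe(event_name, &block)
    @listeners[event_name] << block
  end

  # Runs the block (if any), then notifies listeners with the payload
  def instrument(event_name, payload = {})
    result = yield if block_given?
    @listeners[event_name].each { |listener| listener.call(payload) }
    result
  end
end

# Formats the message the same way the logger listener in the diff does
def offset_stored_message(event)
  "Storing offset #{event[:offset]} for topic " \
    "#{event[:topic]}/#{event[:partition]} in the transaction"
end
```

A subscriber would register with `monitor.subscribe('transaction.offset_stored') { |event| ... }` and pass the event to `offset_stored_message`.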
5 changes: 4 additions & 1 deletion lib/waterdrop/producer.rb
@@ -16,7 +16,10 @@ class Producer
Rdkafka::Producer::DeliveryHandle::WaitTimeoutError
].freeze

- private_constant :SUPPORTED_FLOW_ERRORS
+ # Empty hash to save on memory allocations
+ EMPTY_HASH = {}.freeze
+
+ private_constant :SUPPORTED_FLOW_ERRORS, :EMPTY_HASH

def_delegators :config, :middleware
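The reason a shared frozen hash is safe as a default argument can be shown in a few lines. The constant and method names below (`SKETCH_EMPTY_HASH`, `sketch_payload`) are hypothetical; the pattern of merging `producer_id` into an optional details hash follows the `transactions.rb` change in this PR.

```ruby
# One frozen hash reused as the default, so no new Hash is allocated per call
SKETCH_EMPTY_HASH = {}.freeze

# Hash#merge returns a new hash, so the frozen default is never mutated
def sketch_payload(id, details = SKETCH_EMPTY_HASH)
  details.merge(producer_id: id)
end
```

Calling `sketch_payload('p1')` builds a payload with only `producer_id`, while `sketch_payload('p1', topic: 'events')` keeps the extra keys and leaves the shared default untouched.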

35 changes: 32 additions & 3 deletions lib/waterdrop/producer/transactions.rb
@@ -91,6 +91,34 @@ def transactional?
@transactional = config.kafka.to_h.key?(:'transactional.id')
end

# Stores the provided offset inside of the transaction. If used, it ties the stored offset
# to the producer transaction, making sure that both succeed or fail together
#
# @param consumer [#consumer_group_metadata_pointer] any consumer from which we can obtain
# the librdkafka consumer group metadata pointer
# @param topic [String] topic name
# @param partition [Integer] partition
# @param offset [Integer] offset we want to store
def transactional_store_offset(consumer, topic, partition, offset)
raise Errors::TransactionRequiredError unless @transaction_mutex.owned?

details = { topic: topic, partition: partition, offset: offset }

transactional_instrument(:offset_stored, details) do
tpl = Rdkafka::Consumer::TopicPartitionList.new
tpl.add_topic_and_partitions_with_offsets(topic, partition => offset)

with_transactional_error_handling(:store_offset) do
client.send_offsets_to_transaction(
consumer,
tpl,
# This setting is at the moment in seconds and we require ms
@config.max_wait_timeout * 1_000
)
end
end
end

private

# Runs provided code with a transaction wrapper if transactions are enabled.
@@ -105,9 +133,10 @@ def with_transaction_if_transactional(&block)
# Instruments the transactional operation with producer id
#
# @param key [Symbol] transaction operation key
# @param details [Hash] additional instrumentation details
# @param block [Proc] block to run inside the instrumentation or nothing if not given
- def transactional_instrument(key, &block)
- @monitor.instrument("transaction.#{key}", producer_id: id, &block)
+ def transactional_instrument(key, details = EMPTY_HASH, &block)
+ @monitor.instrument("transaction.#{key}", details.merge(producer_id: id), &block)
end

# Error handling for transactional operations is a bit special. There are three types of
@@ -157,7 +186,7 @@ def with_transactional_error_handling(action, allow_abortable: true)
# Always attempt to abort but if aborting fails with an abortable error, do not attempt
# to abort from abort as this could create an infinite loop
with_transactional_error_handling(:abort, allow_abortable: false) do
- transactional_instrument(:aborted) { @client.abort_transaction }
+ transactional_instrument(:aborted) { client.abort_transaction }
end

raise
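The two behaviours that `transactional_store_offset` adds, the "must be inside a transaction" guard and the seconds-to-milliseconds timeout conversion, can be sketched without Kafka. `SketchProducer` is a hypothetical stand-in: it records what would be sent instead of calling a client, and its non-reentrant `transaction` method is a simplification. Only the `@transaction_mutex.owned?` check and the `max_wait_timeout * 1_000` conversion mirror the diff.

```ruby
# Hypothetical stand-in for the producer (not the WaterDrop class).
class SketchProducer
  TransactionRequiredError = Class.new(StandardError)

  attr_reader :sent

  # @param max_wait_timeout [Integer] timeout in seconds
  def initialize(max_wait_timeout:)
    @max_wait_timeout = max_wait_timeout
    @transaction_mutex = Mutex.new
    @sent = []
  end

  # Simplified transaction: holds the mutex while the block runs
  def transaction
    @transaction_mutex.synchronize { yield }
  end

  def transactional_store_offset(consumer, topic, partition, offset)
    # Only the thread that currently holds the transaction may store offsets
    raise TransactionRequiredError unless @transaction_mutex.owned?

    # The setting is in seconds while the client call expects milliseconds
    @sent << [consumer, topic, partition, offset, @max_wait_timeout * 1_000]
  end
end
```

With `max_wait_timeout: 30`, a call inside `transaction` records a timeout of `30_000`, which matches the value asserted in the spec further down.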
17 changes: 17 additions & 0 deletions spec/lib/waterdrop/clients/buffered_spec.rb
@@ -179,5 +179,22 @@
expect(result).to eq(2)
end
end

context 'when we try to store offset without a transaction' do
it 'expect to raise an error' do
expect { producer.transactional_store_offset(nil, 'topic', 0, 0) }
.to raise_error(WaterDrop::Errors::TransactionRequiredError)
end
end

context 'when trying to store offset with transaction' do
it do
expect do
producer.transaction do
producer.transactional_store_offset(nil, 'topic', 0, 0)
end
end.not_to raise_error
end
end
end
end
12 changes: 12 additions & 0 deletions spec/lib/waterdrop/errors_spec.rb
@@ -48,4 +48,16 @@

specify { expect(error).to be < described_class::BaseError }
end

describe 'AbortTransaction' do
subject(:error) { described_class::AbortTransaction }

specify { expect(error).to be < described_class::BaseError }
end

describe 'TransactionRequiredError' do
subject(:error) { described_class::TransactionRequiredError }

specify { expect(error).to be < described_class::BaseError }
end
end
14 changes: 14 additions & 0 deletions spec/lib/waterdrop/instrumentation/logger_listener_spec.rb
@@ -243,4 +243,18 @@
it { expect(logged_data[0]).to include('INFO') }
it { expect(logged_data[0]).to include('Committing transaction') }
end

describe '#on_transaction_offset_stored' do
before do
details[:topic] = rand.to_s
details[:partition] = 0
details[:offset] = 100

listener.on_transaction_offset_stored(event)
end

it { expect(logged_data[0]).to include(producer.id) }
it { expect(logged_data[0]).to include('INFO') }
it { expect(logged_data[0]).to include('Storing offset') }
end
end
26 changes: 26 additions & 0 deletions spec/lib/waterdrop/producer/transactions_spec.rb
@@ -506,4 +506,30 @@
end
end
end

context 'when we try to store offset without a transaction' do
it 'expect to raise an error' do
expect { producer.transactional_store_offset(nil, 'topic', 0, 0) }
.to raise_error(WaterDrop::Errors::TransactionRequiredError)
end
end

# Full e2e integration of this is checked in Karafka as we do not operate on consumers here
context 'when trying to store offset inside a transaction' do
let(:consumer) { OpenStruct.new }

before do
allow(producer.client).to receive(:send_offsets_to_transaction)

producer.transaction do
producer.transactional_store_offset(consumer, 'topic', 0, 0)
end
end

it 'expect to delegate to client send_offsets_to_transaction with correct timeout' do
expect(producer.client)
.to have_received(:send_offsets_to_transaction)
.with(consumer, any_args, 30_000)
end
end
end