Skip to content

Commit

Permalink
Store offset API (#428)
Browse files Browse the repository at this point in the history
* offset storage in transaction

* remarks

* specs

* remarks

* remove unused code
  • Loading branch information
mensfeld authored Dec 19, 2023
1 parent 7397ef0 commit a618ff0
Show file tree
Hide file tree
Showing 11 changed files with 132 additions and 9 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# WaterDrop changelog

## 2.6.12 (Unreleased)
- [Enhancement] Provide the ability to commit an offset within a transaction when a consumer is provided.
- [Change] Remove usage of concurrent ruby.

## 2.6.11 (2023-10-25)
Expand Down
15 changes: 10 additions & 5 deletions lib/waterdrop/clients/buffered.rb
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,6 @@ def begin_transaction
def commit_transaction
@transaction_level -= 1

return unless @transaction_level.zero?

# Transfer transactional data on success
@transaction_topics.each do |topic, messages|
@topics[topic] += messages
Expand All @@ -58,12 +56,19 @@ def commit_transaction
@transaction_active = false
end

# No-op stand-in for storing an offset as part of a transaction. The arguments are accepted
# only to satisfy the expected signature and are never used.
#
# @param _consumer [#consumer_group_metadata_pointer] any consumer from which the librdkafka
#   consumer group metadata pointer could be obtained
# @param _tpl [Rdkafka::Consumer::TopicPartitionList] consumer tpl for offset storage
# @param _timeout [Integer] timeout in ms
# @return [nil] always nil
def send_offsets_to_transaction(_consumer, _tpl, _timeout); end

# Aborts the transaction
def abort_transaction
@transaction_level -= 1

return unless @transaction_level.zero?

@transaction_topics.clear
@transaction_messages.clear
@transaction_active = false
Expand Down
6 changes: 6 additions & 0 deletions lib/waterdrop/errors.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,13 @@ module Errors
# Raised when there is an inline error during single message produce operations
ProduceError = Class.new(BaseError)

# Raised when we attempt to perform an operation that is only allowed inside of a transaction
# but no transaction is currently active
TransactionRequiredError = Class.new(BaseError)

# Raise it within a transaction to abort it
# It does not have an `Error` postfix because technically it is not an error as it is used for
# graceful transaction aborting
AbortTransaction = Class.new(BaseError)

# Raised when during messages producing something bad happened inline
Expand Down
9 changes: 9 additions & 0 deletions lib/waterdrop/instrumentation/logger_listener.rb
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,15 @@ def on_transaction_committed(event)
info(event, 'Committing transaction')
end

# Logs, at info level, that an offset is being stored within the transaction.
#
# @param event [Dry::Events::Event] event that happened with the details; read here for its
#   `:topic`, `:partition` and `:offset` entries
def on_transaction_offset_stored(event)
  topic, partition, offset = %i[topic partition offset].map { |key| event[key] }

  info(event, "Storing offset #{offset} for topic #{topic}/#{partition} in the transaction")
end

# @param event [Dry::Events::Event] event that happened with the details
def on_transaction_finished(event)
info(event, 'Processing transaction')
Expand Down
1 change: 1 addition & 0 deletions lib/waterdrop/instrumentation/notifications.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ class Notifications < ::Karafka::Core::Monitoring::Notifications
transaction.started
transaction.committed
transaction.aborted
transaction.offset_stored
transaction.finished

buffer.flushed_async
Expand Down
5 changes: 4 additions & 1 deletion lib/waterdrop/producer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ class Producer
Rdkafka::Producer::DeliveryHandle::WaitTimeoutError
].freeze

private_constant :SUPPORTED_FLOW_ERRORS
# Empty hash to save on memory allocations
EMPTY_HASH = {}.freeze

private_constant :SUPPORTED_FLOW_ERRORS, :EMPTY_HASH

def_delegators :config, :middleware

Expand Down
35 changes: 32 additions & 3 deletions lib/waterdrop/producer/transactions.rb
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,34 @@ def transactional?
@transactional = config.kafka.to_h.key?(:'transactional.id')
end

# Attaches the given consumer offset to the running producer transaction, so that the offset
# and the rest of the transaction succeed or fail together.
#
# @param consumer [#consumer_group_metadata_pointer] any consumer from which we can obtain
#   the librdkafka consumer group metadata pointer
# @param topic [String] topic name
# @param partition [Integer] partition
# @param offset [Integer] offset we want to store
# @raise [Errors::TransactionRequiredError] when the transaction mutex is not owned by the
#   caller, meaning we are not inside of a transaction
def transactional_store_offset(consumer, topic, partition, offset)
  raise Errors::TransactionRequiredError unless @transaction_mutex.owned?

  payload = { topic: topic, partition: partition, offset: offset }

  transactional_instrument(:offset_stored, payload) do
    offsets = Rdkafka::Consumer::TopicPartitionList.new
    offsets.add_topic_and_partitions_with_offsets(topic, { partition => offset })

    with_transactional_error_handling(:store_offset) do
      # max_wait_timeout is at the moment expressed in seconds while the client requires ms
      client.send_offsets_to_transaction(consumer, offsets, @config.max_wait_timeout * 1_000)
    end
  end
end

private

# Runs provided code with a transaction wrapper if transactions are enabled.
Expand All @@ -105,9 +133,10 @@ def with_transaction_if_transactional(&block)
# Instruments the transactional operation with producer id
#
# @param key [Symbol] transaction operation key
# @param details [Hash] additional instrumentation details
# @param block [Proc] block to run inside the instrumentation or nothing if not given
def transactional_instrument(key, &block)
@monitor.instrument("transaction.#{key}", producer_id: id, &block)
def transactional_instrument(key, details = EMPTY_HASH, &block)
  # Event names are namespaced under "transaction." and the producer id is always merged into
  # the payload (taking precedence over a same-named key in details)
  event_name = "transaction.#{key}"
  @monitor.instrument(event_name, details.merge(producer_id: id), &block)
end

# Error handling for transactional operations is a bit special. There are three types of
Expand Down Expand Up @@ -157,7 +186,7 @@ def with_transactional_error_handling(action, allow_abortable: true)
# Always attempt to abort but if aborting fails with an abortable error, do not attempt
# to abort from abort as this could create an infinite loop
with_transactional_error_handling(:abort, allow_abortable: false) do
transactional_instrument(:aborted) { @client.abort_transaction }
transactional_instrument(:aborted) { client.abort_transaction }
end

raise
Expand Down
17 changes: 17 additions & 0 deletions spec/lib/waterdrop/clients/buffered_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -179,5 +179,22 @@
expect(result).to eq(2)
end
end

# Calling transactional_store_offset outside of a `producer.transaction` block is expected to
# raise TransactionRequiredError.
# NOTE(review): `producer` is defined outside of this hunk - presumably backed by the buffered
# client given the spec file; confirm.
context 'when we try to store offset without a transaction' do
it 'expect to raise an error' do
expect { producer.transactional_store_offset(nil, 'topic', 0, 0) }
.to raise_error(WaterDrop::Errors::TransactionRequiredError)
end
end

# The same call wrapped in `producer.transaction` is expected to complete without raising.
# A nil consumer is passed here.
context 'when trying to store offset with transaction' do
it do
expect do
producer.transaction do
producer.transactional_store_offset(nil, 'topic', 0, 0)
end
end.not_to raise_error
end
end
end
end
12 changes: 12 additions & 0 deletions spec/lib/waterdrop/errors_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,16 @@

specify { expect(error).to be < described_class::BaseError }
end

# AbortTransaction is expected to be a subclass of BaseError
# (`described_class` comes from an enclosing describe that is not visible in this hunk)
describe 'AbortTransaction' do
subject(:error) { described_class::AbortTransaction }

specify { expect(error).to be < described_class::BaseError }
end

# TransactionRequiredError is expected to be a subclass of BaseError as well
describe 'TransactionRequiredError' do
subject(:error) { described_class::TransactionRequiredError }

specify { expect(error).to be < described_class::BaseError }
end
end
14 changes: 14 additions & 0 deletions spec/lib/waterdrop/instrumentation/logger_listener_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -243,4 +243,18 @@
it { expect(logged_data[0]).to include('INFO') }
it { expect(logged_data[0]).to include('Committing transaction') }
end

# Verifies the first logged entry produced by the offset-stored listener hook.
# NOTE(review): `details`, `event`, `listener`, `logged_data` and `producer` are defined outside
# of this hunk - presumably `event` is built from `details`; confirm.
describe '#on_transaction_offset_stored' do
before do
# Fill in the keys the listener reads from the event before invoking it
details[:topic] = rand.to_s
details[:partition] = 0
details[:offset] = 100

listener.on_transaction_offset_stored(event)
end

# The entry should include the producer id, the INFO level and the message prefix
it { expect(logged_data[0]).to include(producer.id) }
it { expect(logged_data[0]).to include('INFO') }
it { expect(logged_data[0]).to include('Storing offset') }
end
end
26 changes: 26 additions & 0 deletions spec/lib/waterdrop/producer/transactions_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -506,4 +506,30 @@
end
end
end

# Storing an offset outside of a `producer.transaction` block is expected to raise
# TransactionRequiredError.
context 'when we try to store offset without a transaction' do
it 'expect to raise an error' do
expect { producer.transactional_store_offset(nil, 'topic', 0, 0) }
.to raise_error(WaterDrop::Errors::TransactionRequiredError)
end
end

# Full e2e integration of this is checked in Karafka as we do not operate on consumers here
context 'when trying to store offset inside a transaction' do
# Placeholder consumer; the client call receiving it is stubbed below
let(:consumer) { OpenStruct.new }

before do
# Stub the client call so that only the delegation and its arguments are verified
allow(producer.client).to receive(:send_offsets_to_transaction)

producer.transaction do
producer.transactional_store_offset(consumer, 'topic', 0, 0)
end
end

# NOTE(review): 30_000 presumably corresponds to a max_wait_timeout of 30 multiplied by 1_000
# in the producer config used by this spec - confirm against the producer definition.
it 'expect to delegate to client send_offsets_to_transaction with correct timeout' do
expect(producer.client)
.to have_received(:send_offsets_to_transaction)
.with(consumer, any_args, 30_000)
end
end
end

0 comments on commit a618ff0

Please sign in to comment.