Skip to content

Commit

Permalink
Provide full reports and handles on dummy dispatches and buffer trans…
Browse files Browse the repository at this point in the history
…actions correctly (#407)

* improve dummy and buffered

* version
  • Loading branch information
mensfeld authored Oct 25, 2023
1 parent 10afa00 commit a0ca2a8
Show file tree
Hide file tree
Showing 8 changed files with 193 additions and 111 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# WaterDrop changelog

## 2.6.11 (Unreleased)
- [Enhancement] Return delivery handles and delivery report for both dummy and buffered clients with proper topics, partitions and offsets assign and auto-increment offsets per partition.
- [Fix] Fix a case where buffered test client would not accumulate messages on failed transactions

## 2.6.10 (2023-10-24)
- [Improvement] Introduce `message.purged` event to indicate that a message that was not delivered to Kafka was purged. This most of the time refers to messages that were part of a transaction and were not yet dispatched to Kafka. It always means, that given message was not delivered but in case of transactions it is expected. In case of non-transactional it usually means `#purge` usage or exceeding `message.timeout.ms` so `librdkafka` removes this message from its internal queue. Non-transactional producers do **not** use this and pipe purges to `error.occurred`.
- [Fix] Fix a case where `message.acknowledged` would not have `caller` key.
Expand Down
2 changes: 1 addition & 1 deletion Gemfile.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: .
specs:
waterdrop (2.6.10)
waterdrop (2.6.11)
karafka-core (>= 2.2.3, < 3.0.0)
zeitwerk (~> 2.3)

Expand Down
61 changes: 24 additions & 37 deletions lib/waterdrop/clients/buffered.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,12 @@ module Clients
class Buffered < Clients::Dummy
attr_accessor :messages

# Sync fake response for the message delivery to Kafka, since we do not dispatch anything
class SyncResponse
# @param _args Handler wait arguments (irrelevant as waiting is fake here)
def wait(*_args)
false
end
end

# @param args [Object] anything accepted by `Clients::Dummy`
def initialize(*args)
super
@messages = []
@topics = Hash.new { |k, v| k[v] = [] }

@transaction_mutex = Mutex.new
@transaction_active = false
@transaction_messages = []
@transaction_topics = Hash.new { |k, v| k[v] = [] }
Expand All @@ -29,6 +20,7 @@ def initialize(*args)

# "Produces" message to Kafka: it acknowledges it locally, adds it to the internal buffer
# @param message [Hash] `WaterDrop::Producer#produce_sync` message hash
# @return [Dummy::Handle] fake delivery handle that can be materialized into a report
def produce(message)
if @transaction_active
@transaction_topics[message.fetch(:topic)] << message
Expand All @@ -39,29 +31,20 @@ def produce(message)
@messages << message
end

SyncResponse.new
super(**message.to_h)
end

# Yields the code pretending it is in a transaction
# Supports our aborting transaction flow
# Moves messages the appropriate buffers only if transaction is successful
def transaction
# Starts the transaction on a given level
def begin_transaction
@transaction_level += 1

return yield if @transaction_mutex.owned?

@transaction_mutex.lock
@transaction_active = true
end

result = nil
commit = false

catch(:abort) do
result = yield
commit = true
end
# Finishes given level of transaction
def commit_transaction
@transaction_level -= 1

commit || raise(WaterDrop::Errors::AbortTransaction)
return unless @transaction_level.zero?

# Transfer transactional data on success
@transaction_topics.each do |topic, messages|
Expand All @@ -70,20 +53,20 @@ def transaction

@messages += @transaction_messages

result
rescue StandardError => e
return if e.is_a?(WaterDrop::Errors::AbortTransaction)
@transaction_topics.clear
@transaction_messages.clear
@transaction_active = false
end

raise
ensure
# Aborts the transaction
def abort_transaction
@transaction_level -= 1

if @transaction_level.zero? && @transaction_mutex.owned?
@transaction_topics.clear
@transaction_messages.clear
@transaction_active = false
@transaction_mutex.unlock
end
return unless @transaction_level.zero?

@transaction_topics.clear
@transaction_messages.clear
@transaction_active = false
end

# Returns messages produced to a given topic
Expand All @@ -95,6 +78,10 @@ def messages_for(topic)
# Clears internal buffer
# Used in between specs so messages do not leak out
def reset
@transaction_level = 0
@transaction_active = false
@transaction_topics.clear
@transaction_messages.clear
@messages.clear
@topics.each_value(&:clear)
end
Expand Down
68 changes: 41 additions & 27 deletions lib/waterdrop/clients/dummy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,46 +5,60 @@ module Clients
# A dummy client that is supposed to be used instead of Rdkafka::Producer in case we don't
# want to dispatch anything to Kafka.
#
# It does not store anything and just ignores messages.
# It does not store anything and just ignores messages. It does however return proper delivery
# handle that can be materialized into a report.
class Dummy
# `::Rdkafka::Producer::DeliveryHandle` object API compatible dummy object
class Handle < ::Rdkafka::Producer::DeliveryHandle
# @param topic [String] topic where we want to dispatch message
# @param partition [Integer] target partition
# @param offset [Integer] offset assigned by our fake "Kafka"
def initialize(topic, partition, offset)
@topic = topic
@partition = partition
@offset = offset
end

# Does not wait, just creates the result
#
# @param _args [Array] anything the wait handle would accept
# @return [::Rdkafka::Producer::DeliveryReport]
def wait(*_args)
create_result
end

# Creates a delivery report with details where the message went
#
# @return [::Rdkafka::Producer::DeliveryReport]
def create_result
::Rdkafka::Producer::DeliveryReport.new(
@partition,
@offset,
@topic
)
end
end

# @param _producer [WaterDrop::Producer]
# @return [Dummy] dummy instance
def initialize(_producer)
@counter = -1
@counters = Hash.new { |h, k| h[k] = -1 }
end

# Dummy method for returning the delivery report
# @param _args [Object] anything that the delivery handle accepts
# @return [::Rdkafka::Producer::DeliveryReport]
def wait(*_args)
::Rdkafka::Producer::DeliveryReport.new(0, @counter += 1)
# "Produces" the message
# @param topic [String, Symbol] topic where we want to dispatch message
# @param partition [Integer] target partition
# @param _args [Hash] remaining details that are ignored in the dummy mode
# @return [Handle] delivery handle
def produce(topic:, partition: 0, **_args)
Handle.new(topic.to_s, partition, @counters["#{topic}#{partition}"] += 1)
end

# @param _args [Object] anything really, this dummy is suppose to support anything
def respond_to_missing?(*_args)
true
end

# Yields the code pretending it is in a transaction
# Supports our aborting transaction flow
def transaction
result = nil
commit = false

catch(:abort) do
result = yield
commit = true
end

commit || raise(WaterDrop::Errors::AbortTransaction)

result
rescue StandardError => e
return if e.is_a?(WaterDrop::Errors::AbortTransaction)

raise
end

# @param _args [Object] anything really, this dummy is suppose to support anything
# @return [self] returns self for chaining cases
def method_missing(*_args)
Expand Down
2 changes: 1 addition & 1 deletion lib/waterdrop/version.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@
# WaterDrop library
module WaterDrop
# Current WaterDrop version
VERSION = '2.6.10'
VERSION = '2.6.11'
end
52 changes: 26 additions & 26 deletions spec/lib/waterdrop/clients/buffered_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -68,15 +68,15 @@
describe '#transaction' do
context 'when no error and no abort' do
it 'expect to return the block value' do
expect(client.transaction { 1 }).to eq(1)
expect(producer.transaction { 1 }).to eq(1)
end
end

context 'when running transaction with production of messages' do
it 'expect to add them to the buffers' do
client.transaction do
client.produce(topic: 'test', payload: 'test')
client.produce(topic: 'test', payload: 'test')
producer.transaction do
producer.produce_sync(topic: 'test', payload: 'test')
producer.produce_sync(topic: 'test', payload: 'test')
end

expect(client.messages.size).to eq(5)
Expand All @@ -86,13 +86,13 @@

context 'when running nested transaction with production of messages' do
it 'expect to add them to the buffers' do
client.transaction do
client.produce(topic: 'test', payload: 'test')
client.produce(topic: 'test', payload: 'test')
producer.transaction do
producer.produce_sync(topic: 'test', payload: 'test')
producer.produce_sync(topic: 'test', payload: 'test')

client.transaction do
client.produce(topic: 'test', payload: 'test')
client.produce(topic: 'test', payload: 'test')
producer.transaction do
producer.produce_sync(topic: 'test', payload: 'test')
producer.produce_sync(topic: 'test', payload: 'test')
end
end

Expand All @@ -103,13 +103,13 @@

context 'when running nested transaction with production of messages on abort' do
it 'expect to add them to the buffers' do
client.transaction do
client.produce(topic: 'test', payload: 'test')
client.produce(topic: 'test', payload: 'test')
producer.transaction do
producer.produce_sync(topic: 'test', payload: 'test')
producer.produce_sync(topic: 'test', payload: 'test')

client.transaction do
client.produce(topic: 'test', payload: 'test')
client.produce(topic: 'test', payload: 'test')
producer.transaction do
producer.produce_sync(topic: 'test', payload: 'test')
producer.produce_sync(topic: 'test', payload: 'test')

throw(:abort)
end
Expand All @@ -123,13 +123,13 @@
context 'when abort occurs' do
it 'expect not to raise error' do
expect do
client.transaction { throw(:abort) }
producer.transaction { throw(:abort) }
end.not_to raise_error
end

it 'expect not to contain messages from the aborted transaction' do
client.transaction do
client.produce(topic: 'test', payload: 'test')
producer.transaction do
producer.produce_sync(topic: 'test', payload: 'test')
throw(:abort)
end

Expand All @@ -141,22 +141,22 @@
context 'when WaterDrop::Errors::AbortTransaction error occurs' do
it 'expect not to raise error' do
expect do
client.transaction { raise(WaterDrop::Errors::AbortTransaction) }
producer.transaction { raise(WaterDrop::Errors::AbortTransaction) }
end.not_to raise_error
end
end

context 'when different error occurs' do
it 'expect to raise error' do
expect do
client.transaction { raise(StandardError) }
producer.transaction { raise(StandardError) }
end.to raise_error(StandardError)
end

it 'expect not to contain messages from the aborted transaction' do
expect do
client.transaction do
client.produce(topic: 'test', payload: 'test')
producer.transaction do
producer.produce_sync(topic: 'test', payload: 'test')

raise StandardError
end
Expand All @@ -169,9 +169,9 @@

context 'when running a nested transaction' do
it 'expect to work ok' do
result = client.transaction do
client.transaction do
client.produce(topic: '1', payload: '2')
result = producer.transaction do
producer.transaction do
producer.produce_sync(topic: '1', payload: '2')
2
end
end
Expand Down
Loading

0 comments on commit a0ca2a8

Please sign in to comment.