Skip to content

Commit

Permalink
Purge flow for transactions (#403)
Browse files Browse the repository at this point in the history
  • Loading branch information
mensfeld authored Oct 23, 2023
1 parent 305ea9c commit 87a7f79
Show file tree
Hide file tree
Showing 8 changed files with 115 additions and 18 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# WaterDrop changelog

## 2.6.10 (Unreleased)
- [Improvement] Introduce `message.purged` event to indicate that a message that was not delivered to Kafka was purged. This most of the time refers to messages that were part of a transaction and were not yet dispatched to Kafka. It always means, that given message was not delivered but in case of transactions it is expected. In case of non-transactional it usually means `#purge` usage or exceeding `message.timeout.ms` so `librdkafka` removes this message from its internal queue. Non-transactional producers do **not** use this and pipe purges to `error.occurred`.
- [Fix] Fix a case where `message.acknowledged` would not have `caller` key.

## 2.6.9 (2023-10-23)
- [Improvement] Introduce a `transaction.finished` event to indicate that transaction has finished whether it was aborted or committed.
- [Improvement] Use `transaction.committed` event to indicate that transaction has been committed.
Expand Down
2 changes: 1 addition & 1 deletion Gemfile.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: .
specs:
waterdrop (2.6.9)
waterdrop (2.6.10)
karafka-core (>= 2.2.3, < 3.0.0)
zeitwerk (~> 2.3)

Expand Down
1 change: 1 addition & 0 deletions lib/waterdrop/clients/rdkafka.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ def new(producer)
# callbacks manager to make it work
client.delivery_callback = Instrumentation::Callbacks::Delivery.new(
producer.id,
producer.transactional?,
producer.config.monitor
)

Expand Down
62 changes: 53 additions & 9 deletions lib/waterdrop/instrumentation/callbacks/delivery.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,41 @@ module Callbacks
# Creates a callable that we want to run upon each message delivery or failure
#
# @note We don't have to provide client_name here as this callback is per client instance
#
# @note We do not consider `message.purge` as an error for transactional producers, because
# this is a standard behaviour for not yet dispatched messages on aborted transactions.
# We do however still want to instrument it for traceability.
class Delivery
# Error emitted when a message was not yet dispatched and was purged from the queue
RD_KAFKA_RESP_PURGE_QUEUE = -152

# Error emitted when a message was purged while it was dispatched
RD_KAFKA_RESP_PURGE_INFLIGHT = -151

# Errors related to queue purging that is expected in transactions
PURGE_ERRORS = [RD_KAFKA_RESP_PURGE_INFLIGHT, RD_KAFKA_RESP_PURGE_QUEUE].freeze

private_constant :RD_KAFKA_RESP_PURGE_QUEUE, :RD_KAFKA_RESP_PURGE_INFLIGHT, :PURGE_ERRORS

# @param producer_id [String] id of the current producer
# @param transactional [Boolean] is this handle for a transactional or regular producer
# @param monitor [WaterDrop::Instrumentation::Monitor] monitor we are using
def initialize(producer_id, monitor)
def initialize(producer_id, transactional, monitor)
@producer_id = producer_id
@transactional = transactional
@monitor = monitor
end

# Emits delivery details to the monitor
# @param delivery_report [Rdkafka::Producer::DeliveryReport] delivery report
def call(delivery_report)
if delivery_report.error.to_i.zero?
error_code = delivery_report.error.to_i

if error_code.zero?
instrument_acknowledged(delivery_report)

elsif @transactional && PURGE_ERRORS.include?(error_code)
instrument_purged(delivery_report)
else
instrument_error(delivery_report)
end
Expand All @@ -27,31 +49,53 @@ def call(delivery_report)
private

# @param delivery_report [Rdkafka::Producer::DeliveryReport] delivery report
def instrument_error(delivery_report)
def instrument_acknowledged(delivery_report)
@monitor.instrument(
'error.occurred',
'message.acknowledged',
caller: self,
error: ::Rdkafka::RdkafkaError.new(delivery_report.error),
producer_id: @producer_id,
offset: delivery_report.offset,
partition: delivery_report.partition,
topic: delivery_report.topic_name,
delivery_report: delivery_report,
type: 'librdkafka.dispatch_error'
delivery_report: delivery_report
)
end

# @param delivery_report [Rdkafka::Producer::DeliveryReport] delivery report
def instrument_acknowledged(delivery_report)
def instrument_purged(delivery_report)
@monitor.instrument(
'message.acknowledged',
'message.purged',
caller: self,
error: build_error(delivery_report),
producer_id: @producer_id,
offset: delivery_report.offset,
partition: delivery_report.partition,
topic: delivery_report.topic_name,
delivery_report: delivery_report
)
end

# @param delivery_report [Rdkafka::Producer::DeliveryReport] delivery report
def instrument_error(delivery_report)
@monitor.instrument(
'error.occurred',
caller: self,
error: build_error(delivery_report),
producer_id: @producer_id,
offset: delivery_report.offset,
partition: delivery_report.partition,
topic: delivery_report.topic_name,
delivery_report: delivery_report,
type: 'librdkafka.dispatch_error'
)
end

# Builds appropriate rdkafka error
# @param delivery_report [Rdkafka::Producer::DeliveryReport] delivery report
# @return [::Rdkafka::RdkafkaError]
def build_error(delivery_report)
::Rdkafka::RdkafkaError.new(delivery_report.error)
end
end
end
end
Expand Down
1 change: 1 addition & 0 deletions lib/waterdrop/instrumentation/notifications.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ class Notifications < ::Karafka::Core::Monitoring::Notifications
message.produced_async
message.produced_sync
message.acknowledged
message.purged
message.buffered

messages.produced_async
Expand Down
2 changes: 1 addition & 1 deletion lib/waterdrop/version.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@
# WaterDrop library
module WaterDrop
# Current WaterDrop version
VERSION = '2.6.9'
VERSION = '2.6.10'
end
31 changes: 30 additions & 1 deletion spec/lib/waterdrop/instrumentation/callbacks/delivery_spec.rb
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
# frozen_string_literal: true

RSpec.describe_current do
subject(:callback) { described_class.new(producer_id, monitor) }
subject(:callback) { described_class.new(producer_id, transactional, monitor) }

let(:producer) { build(:producer) }
let(:producer_id) { SecureRandom.uuid }
let(:transactional) { producer.transactional? }
let(:monitor) { ::WaterDrop::Instrumentation::Monitor.new }
let(:delivery_report) do
OpenStruct.new(
Expand Down Expand Up @@ -123,5 +124,33 @@
it { expect(event[:error]).to be_a(WaterDrop::Errors::ProduceError) }
it { expect(event[:error].cause).to be_a(Rdkafka::RdkafkaError) }
end

context 'when there is a producer with non-transactional purge' do
let(:producer) { build(:slow_producer) }
let(:errors) { [] }
let(:purges) { [] }

before do
producer.monitor.subscribe('error.occurred') do |event|
errors << event[:error]
end

producer.monitor.subscribe('message.purged') do |event|
purges << event[:error]
end

producer.produce_async(build(:valid_message))
producer.purge
end

it 'expect to have it in the errors' do
expect(errors.first).to be_a(Rdkafka::RdkafkaError)
expect(errors.first.code).to eq(:purge_queue)
end

it 'expect not to publish purge notification' do
expect(purges).to be_empty
end
end
end
end
30 changes: 24 additions & 6 deletions spec/lib/waterdrop/producer/transactions_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -129,12 +129,17 @@

context 'when we have error instrumentation' do
let(:errors) { [] }
let(:purges) { [] }

before do
producer.monitor.subscribe('error.occurred') do |event|
errors << event[:error]
end

producer.monitor.subscribe('message.purged') do |event|
purges << event[:error]
end

begin
producer.transaction do
producer.produce_async(topic: 'example_topic', payload: 'na')
Expand All @@ -146,9 +151,13 @@
end
end

it 'expect to emit the cancellation error via the error pipeline' do
expect(errors.first).to be_a(Rdkafka::RdkafkaError)
expect(errors.first.code).to eq(:purge_queue)
it 'expect not to emit the cancellation error via the error pipeline' do
expect(errors).to be_empty
end

it 'expect to emit the cancellation error via the message.purged' do
expect(purges.first).to be_a(Rdkafka::RdkafkaError)
expect(purges.first.code).to eq(:purge_queue)
end
end

Expand Down Expand Up @@ -253,22 +262,31 @@

context 'when we have error instrumentation' do
let(:errors) { [] }
let(:purges) { [] }

before do
producer.monitor.subscribe('error.occurred') do |event|
errors << event[:error]
end

producer.monitor.subscribe('message.purged') do |event|
purges << event[:error]
end

producer.transaction do
producer.produce_async(topic: 'example_topic', payload: 'na')

throw(:abort)
end
end

it 'expect to emit the cancellation error via the error pipeline' do
expect(errors.first).to be_a(Rdkafka::RdkafkaError)
expect(errors.first.code).to eq(:purge_queue)
it 'expect not to emit the cancellation error via the error pipeline' do
expect(errors).to be_empty
end

it 'expect to emit the cancellation error via the message.purged' do
expect(purges.first).to be_a(Rdkafka::RdkafkaError)
expect(purges.first.code).to eq(:purge_queue)
end
end

Expand Down

0 comments on commit 87a7f79

Please sign in to comment.