Purge flow for transactions #403

Merged
merged 5 commits into from
Oct 23, 2023
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,9 @@
# WaterDrop changelog

## 2.6.10 (Unreleased)
- [Improvement] Introduce `message.purged` event to indicate that a message that was not delivered to Kafka was purged. Most of the time this refers to messages that were part of a transaction and had not yet been dispatched to Kafka. It always means that the given message was not delivered, but in the case of transactions this is expected. For non-transactional producers it usually means `#purge` usage or exceeding `message.timeout.ms`, so `librdkafka` removes the message from its internal queue. Non-transactional producers do **not** use this event and pipe purges to `error.occurred`.
- [Fix] Fix a case where `message.acknowledged` would not have `caller` key.

## 2.6.9 (2023-10-23)
- [Improvement] Introduce a `transaction.finished` event to indicate that transaction has finished whether it was aborted or committed.
- [Improvement] Use `transaction.committed` event to indicate that transaction has been committed.
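
As a rough illustration of the changelog entry above, the sketch below counts purge and error notifications separately. `track_delivery_outcomes` is a hypothetical helper, not part of WaterDrop; it only assumes the object passed in exposes `monitor.subscribe` with the event names used in this PR's specs.

```ruby
# Hypothetical helper (not part of WaterDrop): counts purges and errors
# separately for any object that exposes `monitor.subscribe`.
def track_delivery_outcomes(producer)
  stats = { purged: 0, errors: 0 }

  # For transactional producers this is expected on aborted transactions:
  # the message was never delivered to Kafka.
  producer.monitor.subscribe('message.purged') do |_event|
    stats[:purged] += 1
  end

  # Non-transactional purges and all other failures arrive here.
  producer.monitor.subscribe('error.occurred') do |_event|
    stats[:errors] += 1
  end

  stats
end
```

Keeping the two counters apart means an aborted transaction does not inflate whatever alerting is attached to `error.occurred`.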
2 changes: 1 addition & 1 deletion Gemfile.lock
@@ -1,7 +1,7 @@
PATH
remote: .
specs:
waterdrop (2.6.9)
waterdrop (2.6.10)
karafka-core (>= 2.2.3, < 3.0.0)
zeitwerk (~> 2.3)

1 change: 1 addition & 0 deletions lib/waterdrop/clients/rdkafka.rb
@@ -19,6 +19,7 @@ def new(producer)
# callbacks manager to make it work
client.delivery_callback = Instrumentation::Callbacks::Delivery.new(
producer.id,
producer.transactional?,
producer.config.monitor
)

62 changes: 53 additions & 9 deletions lib/waterdrop/instrumentation/callbacks/delivery.rb
@@ -6,19 +6,41 @@ module Callbacks
# Creates a callable that we want to run upon each message delivery or failure
#
# @note We don't have to provide client_name here as this callback is per client instance
#
# @note We do not consider `message.purged` as an error for transactional producers, because
# this is a standard behaviour for not yet dispatched messages on aborted transactions.
# We do however still want to instrument it for traceability.
class Delivery
# Error emitted when a message was not yet dispatched and was purged from the queue
RD_KAFKA_RESP_PURGE_QUEUE = -152

# Error emitted when a message was purged while it was dispatched
RD_KAFKA_RESP_PURGE_INFLIGHT = -151

# Errors related to queue purging that is expected in transactions
PURGE_ERRORS = [RD_KAFKA_RESP_PURGE_INFLIGHT, RD_KAFKA_RESP_PURGE_QUEUE].freeze

private_constant :RD_KAFKA_RESP_PURGE_QUEUE, :RD_KAFKA_RESP_PURGE_INFLIGHT, :PURGE_ERRORS

# @param producer_id [String] id of the current producer
# @param transactional [Boolean] is this handle for a transactional or regular producer
# @param monitor [WaterDrop::Instrumentation::Monitor] monitor we are using
def initialize(producer_id, monitor)
def initialize(producer_id, transactional, monitor)
@producer_id = producer_id
@transactional = transactional
@monitor = monitor
end

# Emits delivery details to the monitor
# @param delivery_report [Rdkafka::Producer::DeliveryReport] delivery report
def call(delivery_report)
if delivery_report.error.to_i.zero?
error_code = delivery_report.error.to_i

if error_code.zero?
instrument_acknowledged(delivery_report)

elsif @transactional && PURGE_ERRORS.include?(error_code)
instrument_purged(delivery_report)
else
instrument_error(delivery_report)
end
@@ -27,31 +49,53 @@ def call(delivery_report)
private

# @param delivery_report [Rdkafka::Producer::DeliveryReport] delivery report
def instrument_error(delivery_report)
def instrument_acknowledged(delivery_report)
@monitor.instrument(
'error.occurred',
'message.acknowledged',
caller: self,
error: ::Rdkafka::RdkafkaError.new(delivery_report.error),
producer_id: @producer_id,
offset: delivery_report.offset,
partition: delivery_report.partition,
topic: delivery_report.topic_name,
delivery_report: delivery_report,
type: 'librdkafka.dispatch_error'
delivery_report: delivery_report
)
end

# @param delivery_report [Rdkafka::Producer::DeliveryReport] delivery report
def instrument_acknowledged(delivery_report)
def instrument_purged(delivery_report)
@monitor.instrument(
'message.acknowledged',
'message.purged',
caller: self,
error: build_error(delivery_report),
producer_id: @producer_id,
offset: delivery_report.offset,
partition: delivery_report.partition,
topic: delivery_report.topic_name,
delivery_report: delivery_report
)
end

# @param delivery_report [Rdkafka::Producer::DeliveryReport] delivery report
def instrument_error(delivery_report)
@monitor.instrument(
'error.occurred',
caller: self,
error: build_error(delivery_report),
producer_id: @producer_id,
offset: delivery_report.offset,
partition: delivery_report.partition,
topic: delivery_report.topic_name,
delivery_report: delivery_report,
type: 'librdkafka.dispatch_error'
)
end

# Builds appropriate rdkafka error
# @param delivery_report [Rdkafka::Producer::DeliveryReport] delivery report
# @return [::Rdkafka::RdkafkaError]
def build_error(delivery_report)
::Rdkafka::RdkafkaError.new(delivery_report.error)
end
end
end
end
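
The routing rule implemented by `#call` above can be sketched in isolation as follows. This is a simplified stand-alone illustration, not the library's code: `delivery_event_for` and `PURGE_CODES` are hypothetical names, while the two numeric codes are copied from the constants in the diff.

```ruby
# Error codes copied from the constants in the diff above
PURGE_QUEUE = -152
PURGE_INFLIGHT = -151
PURGE_CODES = [PURGE_INFLIGHT, PURGE_QUEUE].freeze

# Hypothetical helper (not WaterDrop API): returns the name of the event
# a delivery report with the given error code would be routed to.
def delivery_event_for(error_code, transactional:)
  # A zero code means the broker acknowledged the message
  return 'message.acknowledged' if error_code.zero?
  # Purges are only treated as expected for transactional producers
  return 'message.purged' if transactional && PURGE_CODES.include?(error_code)

  # Everything else, including non-transactional purges, is an error
  'error.occurred'
end
```

Under this sketch, the same purge code maps to `message.purged` for a transactional producer and to `error.occurred` for a regular one, which matches the expectations in the specs of this PR.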
1 change: 1 addition & 0 deletions lib/waterdrop/instrumentation/notifications.rb
@@ -12,6 +12,7 @@ class Notifications < ::Karafka::Core::Monitoring::Notifications
message.produced_async
message.produced_sync
message.acknowledged
message.purged
message.buffered

messages.produced_async
2 changes: 1 addition & 1 deletion lib/waterdrop/version.rb
@@ -3,5 +3,5 @@
# WaterDrop library
module WaterDrop
# Current WaterDrop version
VERSION = '2.6.9'
VERSION = '2.6.10'
end
31 changes: 30 additions & 1 deletion spec/lib/waterdrop/instrumentation/callbacks/delivery_spec.rb
@@ -1,10 +1,11 @@
# frozen_string_literal: true

RSpec.describe_current do
subject(:callback) { described_class.new(producer_id, monitor) }
subject(:callback) { described_class.new(producer_id, transactional, monitor) }

let(:producer) { build(:producer) }
let(:producer_id) { SecureRandom.uuid }
let(:transactional) { producer.transactional? }
let(:monitor) { ::WaterDrop::Instrumentation::Monitor.new }
let(:delivery_report) do
OpenStruct.new(
@@ -123,5 +124,33 @@
it { expect(event[:error]).to be_a(WaterDrop::Errors::ProduceError) }
it { expect(event[:error].cause).to be_a(Rdkafka::RdkafkaError) }
end

context 'when there is a producer with non-transactional purge' do
let(:producer) { build(:slow_producer) }
let(:errors) { [] }
let(:purges) { [] }

before do
producer.monitor.subscribe('error.occurred') do |event|
errors << event[:error]
end

producer.monitor.subscribe('message.purged') do |event|
purges << event[:error]
end

producer.produce_async(build(:valid_message))
producer.purge
end

it 'expect to have it in the errors' do
expect(errors.first).to be_a(Rdkafka::RdkafkaError)
expect(errors.first.code).to eq(:purge_queue)
end

it 'expect not to publish purge notification' do
expect(purges).to be_empty
end
end
end
end
30 changes: 24 additions & 6 deletions spec/lib/waterdrop/producer/transactions_spec.rb
@@ -129,12 +129,17 @@

context 'when we have error instrumentation' do
let(:errors) { [] }
let(:purges) { [] }

before do
producer.monitor.subscribe('error.occurred') do |event|
errors << event[:error]
end

producer.monitor.subscribe('message.purged') do |event|
purges << event[:error]
end

begin
producer.transaction do
producer.produce_async(topic: 'example_topic', payload: 'na')
@@ -146,9 +151,13 @@
end
end

it 'expect to emit the cancellation error via the error pipeline' do
expect(errors.first).to be_a(Rdkafka::RdkafkaError)
expect(errors.first.code).to eq(:purge_queue)
it 'expect not to emit the cancellation error via the error pipeline' do
expect(errors).to be_empty
end

it 'expect to emit the cancellation error via the message.purged' do
expect(purges.first).to be_a(Rdkafka::RdkafkaError)
expect(purges.first.code).to eq(:purge_queue)
end
end

@@ -253,22 +262,31 @@

context 'when we have error instrumentation' do
let(:errors) { [] }
let(:purges) { [] }

before do
producer.monitor.subscribe('error.occurred') do |event|
errors << event[:error]
end

producer.monitor.subscribe('message.purged') do |event|
purges << event[:error]
end

producer.transaction do
producer.produce_async(topic: 'example_topic', payload: 'na')

throw(:abort)
end
end

it 'expect to emit the cancellation error via the error pipeline' do
expect(errors.first).to be_a(Rdkafka::RdkafkaError)
expect(errors.first.code).to eq(:purge_queue)
it 'expect not to emit the cancellation error via the error pipeline' do
expect(errors).to be_empty
end

it 'expect to emit the cancellation error via the message.purged' do
expect(purges.first).to be_a(Rdkafka::RdkafkaError)
expect(purges.first.code).to eq(:purge_queue)
end
end
