From 87a7f79eea3b920667f738ceeee63ed3649fa242 Mon Sep 17 00:00:00 2001
From: Maciej Mensfeld <maciej@mensfeld.pl>
Date: Mon, 23 Oct 2023 20:51:47 +0200
Subject: [PATCH] Purge flow for transactions (#403)

---
 CHANGELOG.md                                  |  4 ++
 Gemfile.lock                                  |  2 +-
 lib/waterdrop/clients/rdkafka.rb              |  1 +
 .../instrumentation/callbacks/delivery.rb     | 62 ++++++++++++++++---
 .../instrumentation/notifications.rb          |  1 +
 lib/waterdrop/version.rb                      |  2 +-
 .../callbacks/delivery_spec.rb                | 31 +++++++++-
 .../waterdrop/producer/transactions_spec.rb   | 30 +++++++--
 8 files changed, 115 insertions(+), 18 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 607c03a5..3c5d8335 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,9 @@
 # WaterDrop changelog
 
+## 2.6.10 (Unreleased)
+- [Improvement] Introduce `message.purged` event to indicate that a message that was not delivered to Kafka was purged. This most of the time refers to messages that were part of a transaction and were not yet dispatched to Kafka. It always means, that given message was not delivered but in case of transactions it is expected. In case of non-transactional it usually means `#purge` usage or exceeding `message.timeout.ms` so `librdkafka` removes this message from its internal queue. Non-transactional producers do **not** use this and pipe purges to `error.occurred`.
+- [Fix] Fix a case where `message.acknowledged` would not have `caller` key.
+
 ## 2.6.9 (2023-10-23)
 - [Improvement] Introduce a `transaction.finished` event to indicate that transaction has finished whether it was aborted or committed.
 - [Improvement] Use `transaction.committed` event to indicate that transaction has been committed.
diff --git a/Gemfile.lock b/Gemfile.lock
index b7407438..a7273e18 100644
--- a/Gemfile.lock
+++ b/Gemfile.lock
@@ -1,7 +1,7 @@
 PATH
   remote: .
   specs:
-    waterdrop (2.6.9)
+    waterdrop (2.6.10)
       karafka-core (>= 2.2.3, < 3.0.0)
       zeitwerk (~> 2.3)
 
diff --git a/lib/waterdrop/clients/rdkafka.rb b/lib/waterdrop/clients/rdkafka.rb
index 8a884db3..c5a73188 100644
--- a/lib/waterdrop/clients/rdkafka.rb
+++ b/lib/waterdrop/clients/rdkafka.rb
@@ -19,6 +19,7 @@ def new(producer)
           # callbacks manager to make it work
           client.delivery_callback = Instrumentation::Callbacks::Delivery.new(
             producer.id,
+            producer.transactional?,
             producer.config.monitor
           )
 
diff --git a/lib/waterdrop/instrumentation/callbacks/delivery.rb b/lib/waterdrop/instrumentation/callbacks/delivery.rb
index a95772e8..16bb5196 100644
--- a/lib/waterdrop/instrumentation/callbacks/delivery.rb
+++ b/lib/waterdrop/instrumentation/callbacks/delivery.rb
@@ -6,19 +6,41 @@ module Callbacks
       # Creates a callable that we want to run upon each message delivery or failure
       #
       # @note We don't have to provide client_name here as this callback is per client instance
+      #
+      # @note We do not consider `message.purge` as an error for transactional producers, because
+      #   this is a standard behaviour for not yet dispatched messages on aborted transactions.
+      #   We do however still want to instrument it for traceability.
       class Delivery
+        # Error emitted when a message was not yet dispatched and was purged from the queue
+        RD_KAFKA_RESP_PURGE_QUEUE = -152
+
+        # Error emitted when a message was purged while it was dispatched
+        RD_KAFKA_RESP_PURGE_INFLIGHT = -151
+
+        # Errors related to queue purging that is expected in transactions
+        PURGE_ERRORS = [RD_KAFKA_RESP_PURGE_INFLIGHT, RD_KAFKA_RESP_PURGE_QUEUE].freeze
+
+        private_constant :RD_KAFKA_RESP_PURGE_QUEUE, :RD_KAFKA_RESP_PURGE_INFLIGHT, :PURGE_ERRORS
+
         # @param producer_id [String] id of the current producer
+        # @param transactional [Boolean] is this handle for a transactional or regular producer
         # @param monitor [WaterDrop::Instrumentation::Monitor] monitor we are using
-        def initialize(producer_id, monitor)
+        def initialize(producer_id, transactional, monitor)
           @producer_id = producer_id
+          @transactional = transactional
           @monitor = monitor
         end
 
         # Emits delivery details to the monitor
         # @param delivery_report [Rdkafka::Producer::DeliveryReport] delivery report
         def call(delivery_report)
-          if delivery_report.error.to_i.zero?
+          error_code = delivery_report.error.to_i
+
+          if error_code.zero?
             instrument_acknowledged(delivery_report)
+
+          elsif @transactional && PURGE_ERRORS.include?(error_code)
+            instrument_purged(delivery_report)
           else
             instrument_error(delivery_report)
           end
@@ -27,24 +49,24 @@ def call(delivery_report)
         private
 
         # @param delivery_report [Rdkafka::Producer::DeliveryReport] delivery report
-        def instrument_error(delivery_report)
+        def instrument_acknowledged(delivery_report)
           @monitor.instrument(
-            'error.occurred',
+            'message.acknowledged',
             caller: self,
-            error: ::Rdkafka::RdkafkaError.new(delivery_report.error),
             producer_id: @producer_id,
             offset: delivery_report.offset,
             partition: delivery_report.partition,
             topic: delivery_report.topic_name,
-            delivery_report: delivery_report,
-            type: 'librdkafka.dispatch_error'
+            delivery_report: delivery_report
           )
         end
 
         # @param delivery_report [Rdkafka::Producer::DeliveryReport] delivery report
-        def instrument_acknowledged(delivery_report)
+        def instrument_purged(delivery_report)
           @monitor.instrument(
-            'message.acknowledged',
+            'message.purged',
+            caller: self,
+            error: build_error(delivery_report),
             producer_id: @producer_id,
             offset: delivery_report.offset,
             partition: delivery_report.partition,
@@ -52,6 +74,28 @@ def instrument_acknowledged(delivery_report)
             delivery_report: delivery_report
           )
         end
+
+        # @param delivery_report [Rdkafka::Producer::DeliveryReport] delivery report
+        def instrument_error(delivery_report)
+          @monitor.instrument(
+            'error.occurred',
+            caller: self,
+            error: build_error(delivery_report),
+            producer_id: @producer_id,
+            offset: delivery_report.offset,
+            partition: delivery_report.partition,
+            topic: delivery_report.topic_name,
+            delivery_report: delivery_report,
+            type: 'librdkafka.dispatch_error'
+          )
+        end
+
+        # Builds appropriate rdkafka error
+        # @param delivery_report [Rdkafka::Producer::DeliveryReport] delivery report
+        # @return [::Rdkafka::RdkafkaError]
+        def build_error(delivery_report)
+          ::Rdkafka::RdkafkaError.new(delivery_report.error)
+        end
       end
     end
   end
diff --git a/lib/waterdrop/instrumentation/notifications.rb b/lib/waterdrop/instrumentation/notifications.rb
index a0dcac30..cdbed5ad 100644
--- a/lib/waterdrop/instrumentation/notifications.rb
+++ b/lib/waterdrop/instrumentation/notifications.rb
@@ -12,6 +12,7 @@ class Notifications < ::Karafka::Core::Monitoring::Notifications
         message.produced_async
         message.produced_sync
         message.acknowledged
+        message.purged
         message.buffered
 
         messages.produced_async
diff --git a/lib/waterdrop/version.rb b/lib/waterdrop/version.rb
index 72204185..98a108fa 100644
--- a/lib/waterdrop/version.rb
+++ b/lib/waterdrop/version.rb
@@ -3,5 +3,5 @@
 # WaterDrop library
 module WaterDrop
   # Current WaterDrop version
-  VERSION = '2.6.9'
+  VERSION = '2.6.10'
 end
diff --git a/spec/lib/waterdrop/instrumentation/callbacks/delivery_spec.rb b/spec/lib/waterdrop/instrumentation/callbacks/delivery_spec.rb
index 1981d255..b5b85c9a 100644
--- a/spec/lib/waterdrop/instrumentation/callbacks/delivery_spec.rb
+++ b/spec/lib/waterdrop/instrumentation/callbacks/delivery_spec.rb
@@ -1,10 +1,11 @@
 # frozen_string_literal: true
 
 RSpec.describe_current do
-  subject(:callback) { described_class.new(producer_id, monitor) }
+  subject(:callback) { described_class.new(producer_id, transactional, monitor) }
 
   let(:producer) { build(:producer) }
   let(:producer_id) { SecureRandom.uuid }
+  let(:transactional) { producer.transactional? }
   let(:monitor) { ::WaterDrop::Instrumentation::Monitor.new }
   let(:delivery_report) do
     OpenStruct.new(
@@ -123,5 +124,33 @@
       it { expect(event[:error]).to be_a(WaterDrop::Errors::ProduceError) }
       it { expect(event[:error].cause).to be_a(Rdkafka::RdkafkaError) }
     end
+
+    context 'when there is a producer with non-transactional purge' do
+      let(:producer) { build(:slow_producer) }
+      let(:errors) { [] }
+      let(:purges) { [] }
+
+      before do
+        producer.monitor.subscribe('error.occurred') do |event|
+          errors << event[:error]
+        end
+
+        producer.monitor.subscribe('message.purged') do |event|
+          purges << event[:error]
+        end
+
+        producer.produce_async(build(:valid_message))
+        producer.purge
+      end
+
+      it 'expect to have it in the errors' do
+        expect(errors.first).to be_a(Rdkafka::RdkafkaError)
+        expect(errors.first.code).to eq(:purge_queue)
+      end
+
+      it 'expect not to publish purge notification' do
+        expect(purges).to be_empty
+      end
+    end
   end
 end
diff --git a/spec/lib/waterdrop/producer/transactions_spec.rb b/spec/lib/waterdrop/producer/transactions_spec.rb
index 7216f2d5..dd5986f7 100644
--- a/spec/lib/waterdrop/producer/transactions_spec.rb
+++ b/spec/lib/waterdrop/producer/transactions_spec.rb
@@ -129,12 +129,17 @@
 
     context 'when we have error instrumentation' do
       let(:errors) { [] }
+      let(:purges) { [] }
 
       before do
         producer.monitor.subscribe('error.occurred') do |event|
           errors << event[:error]
         end
 
+        producer.monitor.subscribe('message.purged') do |event|
+          purges << event[:error]
+        end
+
         begin
           producer.transaction do
             producer.produce_async(topic: 'example_topic', payload: 'na')
@@ -146,9 +151,13 @@
         end
       end
 
-      it 'expect to emit the cancellation error via the error pipeline' do
-        expect(errors.first).to be_a(Rdkafka::RdkafkaError)
-        expect(errors.first.code).to eq(:purge_queue)
+      it 'expect not to emit the cancellation error via the error pipeline' do
+        expect(errors).to be_empty
+      end
+
+      it 'expect to emit the cancellation error via the message.purged' do
+        expect(purges.first).to be_a(Rdkafka::RdkafkaError)
+        expect(purges.first.code).to eq(:purge_queue)
       end
     end
 
@@ -253,12 +262,17 @@
 
     context 'when we have error instrumentation' do
       let(:errors) { [] }
+      let(:purges) { [] }
 
       before do
         producer.monitor.subscribe('error.occurred') do |event|
           errors << event[:error]
         end
 
+        producer.monitor.subscribe('message.purged') do |event|
+          purges << event[:error]
+        end
+
         producer.transaction do
           producer.produce_async(topic: 'example_topic', payload: 'na')
 
@@ -266,9 +280,13 @@
         end
       end
 
-      it 'expect to emit the cancellation error via the error pipeline' do
-        expect(errors.first).to be_a(Rdkafka::RdkafkaError)
-        expect(errors.first.code).to eq(:purge_queue)
+      it 'expect not to emit the cancellation error via the error pipeline' do
+        expect(errors).to be_empty
+      end
+
+      it 'expect to emit the cancellation error via the message.purged' do
+        expect(purges.first).to be_a(Rdkafka::RdkafkaError)
+        expect(purges.first.code).to eq(:purge_queue)
       end
     end