diff --git a/lib/waterdrop/instrumentation/callbacks/delivery.rb b/lib/waterdrop/instrumentation/callbacks/delivery.rb index 268810b8..19b0f951 100644 --- a/lib/waterdrop/instrumentation/callbacks/delivery.rb +++ b/lib/waterdrop/instrumentation/callbacks/delivery.rb @@ -11,6 +11,14 @@ module Callbacks # this is a standard behaviour for not yet dispatched messages on aborted transactions. # We do however still want to instrument it for traceability. class Delivery + # Error emitted when a message was not yet dispatched and was purged from the queue + RD_KAFKA_RESP_PURGE_QUEUE = -152 + + # Error emitted when a message was purged while it was dispatched + RD_KAFKA_RESP_PURGE_INFLIGHT = -151 + + private_constant :RD_KAFKA_RESP_PURGE_QUEUE, :RD_KAFKA_RESP_PURGE_INFLIGHT + # @param producer_id [String] id of the current producer # @param transactional [Boolean] is this handle for a transactional or regular producer # @param monitor [WaterDrop::Instrumentation::Monitor] monitor we are using @@ -27,7 +35,10 @@ def call(delivery_report) if error_code.zero? instrument_acknowledged(delivery_report) - elsif error_code == -152 && @transactional + + elsif error_code == RD_KAFKA_RESP_PURGE_QUEUE && @transactional + instrument_purged(delivery_report) + elsif error_code == RD_KAFKA_RESP_PURGE_INFLIGHT && @transactional instrument_purged(delivery_report) else instrument_error(delivery_report)