Skip to content

Commit

Permalink
remarks
Browse files Browse the repository at this point in the history
  • Loading branch information
mensfeld committed Oct 23, 2023
1 parent 79c406f commit ca689fc
Showing 1 changed file with 12 additions and 1 deletion.
13 changes: 12 additions & 1 deletion lib/waterdrop/instrumentation/callbacks/delivery.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,14 @@ module Callbacks
# this is a standard behaviour for not yet dispatched messages on aborted transactions.
# We do however still want to instrument it for traceability.
class Delivery
# Error emitted when a message was not yet dispatched and was purged from the queue
RD_KAFKA_RESP_PURGE_QUEUE = -152

# Error emitted when a message was purged while it was dispatched
RD_KAFKA_RESP_PURGE_INFLIGHT = -151

private_constant :RD_KAFKA_RESP_PURGE_QUEUE, :RD_KAFKA_RESP_PURGE_INFLIGHT

# @param producer_id [String] id of the current producer
# @param transactional [Boolean] is this handle for a transactional or regular producer
# @param monitor [WaterDrop::Instrumentation::Monitor] monitor we are using
Expand All @@ -27,7 +35,10 @@ def call(delivery_report)

if error_code.zero?
instrument_acknowledged(delivery_report)
elsif error_code == -152 && @transactional

elsif error_code == RD_KAFKA_RESP_PURGE_QUEUE && @transactional
instrument_purged(delivery_report)
elsif error_code == RD_KAFKA_RESP_PURGE_INFLIGHT && @transactional
instrument_purged(delivery_report)
else
instrument_error(delivery_report)
Expand Down

0 comments on commit ca689fc

Please sign in to comment.