Skip to content

Commit

Permalink
remarks
Browse files Browse the repository at this point in the history
  • Loading branch information
mensfeld committed Oct 23, 2023
1 parent ca689fc commit fdd0bd9
Showing 1 changed file with 5 additions and 4 deletions.
9 changes: 5 additions & 4 deletions lib/waterdrop/instrumentation/callbacks/delivery.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ class Delivery
# Error emitted when a message was purged while it was dispatched
RD_KAFKA_RESP_PURGE_INFLIGHT = -151

private_constant :RD_KAFKA_RESP_PURGE_QUEUE, :RD_KAFKA_RESP_PURGE_INFLIGHT
# Errors related to queue purging that is expected in transactions
PURGE_ERRORS = [RD_KAFKA_RESP_PURGE_INFLIGHT, RD_KAFKA_RESP_PURGE_QUEUE].freeze

private_constant :RD_KAFKA_RESP_PURGE_QUEUE, :RD_KAFKA_RESP_PURGE_INFLIGHT, :PURGE_ERRORS

# @param producer_id [String] id of the current producer
# @param transactional [Boolean] is this handle for a transactional or regular producer
Expand All @@ -36,9 +39,7 @@ def call(delivery_report)
if error_code.zero?
instrument_acknowledged(delivery_report)

elsif error_code == RD_KAFKA_RESP_PURGE_QUEUE && @transactional
instrument_purged(delivery_report)
elsif error_code == RD_KAFKA_RESP_PURGE_INFLIGHT && @transactional
elsif @transactional && PURGE_ERRORS.include?(error_code)
instrument_purged(delivery_report)
else
instrument_error(delivery_report)
Expand Down

0 comments on commit fdd0bd9

Please sign in to comment.