Skip to content

Commit

Permalink
Merge branch 'fix/duplicates-log-level' into 'master'
Browse files Browse the repository at this point in the history
DEX-2850 feat: consider duplicate messages as warnings

See merge request nstmrt/rubygems/sbmt-kafka_consumer!76
  • Loading branch information
Меркушин Михаил Сергеевич committed Jan 29, 2025
2 parents 7c3e515 + d99e099 commit b13e683
Show file tree
Hide file tree
Showing 7 changed files with 50 additions and 9 deletions.
17 changes: 14 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,17 @@ and this project adheres to [Semantic Versioning](http://semver.org/).

### Fixed

## [3.4.0] - 2025-01-27

### Added

- Added support for warning log level in LoggerListener error handling
- Consider duplicate messages as warnings in logs

### Changed

### Fixed

## [3.3.2] - 2024-12-27

### Fixed
Expand All @@ -32,20 +43,20 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
- inheritance of kafka settings for topic

### Fixed

- support `karafka 2.4.12`
- properly report metrics for consumer offset lag when partition fetch assignment was lost

## [3.2.2] - 2024-09-23

### Fixed

- log OTEL `trace_id`

## [3.2.1] - 2024-09-20

### Fixed

- Limit Karafka version to less than 2.4.12 since `bootstrap.servers` has been made required

## [3.2.0] - 2024-09-17
Expand Down
3 changes: 2 additions & 1 deletion lib/sbmt/kafka_consumer/base_consumer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,8 @@ def instrument_error(error, message)
error: error,
caller: self,
message: message,
type: "consumer.base.consume_one"
type: "consumer.base.consume_one",
log_level: :error
)
end

Expand Down
11 changes: 8 additions & 3 deletions lib/sbmt/kafka_consumer/inbox_consumer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def process_inbox_item(message)
item = result.success
item.track_metrics_after_consume if item.respond_to?(:track_metrics_after_consume)
rescue ActiveRecord::RecordNotUnique
instrument_error("Skipped duplicate message for #{inbox_name}, message_uuid: #{message_uuid(message)}", message, "duplicate")
instrument_warn("Skipped duplicate message for #{inbox_name}, message_uuid: #{message_uuid(message)}", message, "duplicate")
rescue => ex
if skip_on_error
logger.warn("skipping unprocessable message for #{inbox_name}, message_uuid: #{message_uuid(message)}")
Expand Down Expand Up @@ -108,7 +108,7 @@ def inbox_name
inbox_item_class.box_name
end

def instrument_error(error, message, status = "failure")
def instrument_error(error, message, status = "failure", log_level: :error)
::Sbmt::KafkaConsumer.monitor.instrument(
"error.occurred",
error: error,
Expand All @@ -117,9 +117,14 @@ def instrument_error(error, message, status = "failure")
inbox_name: inbox_name,
event_name: event_name,
status: status,
type: "consumer.inbox.consume_one"
type: "consumer.inbox.consume_one",
log_level: log_level
)
end

def instrument_warn(*args, **kwargs)
instrument_error(*args, **kwargs, log_level: :warn)
end
end
end
end
8 changes: 7 additions & 1 deletion lib/sbmt/kafka_consumer/instrumentation/logger_listener.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ module Instrumentation
class LoggerListener < Karafka::Instrumentation::LoggerListener
include ListenerHelper
CUSTOM_ERROR_TYPES = %w[consumer.base.consume_one consumer.inbox.consume_one].freeze
VALID_LOG_LEVELS = %i[error warn].freeze

def on_error_occurred(event)
type = event[:type]
Expand All @@ -23,7 +24,12 @@ def on_error_occurred(event)
stacktrace: log_backtrace(error),
**tags
) do
logger.error(error_message(error))
log_level = event[:log_level] || :error
if VALID_LOG_LEVELS.include?(log_level)
logger.public_send(log_level, error_message(error))
else
raise "Invalid log level #{log_level}"
end
end
end

Expand Down
2 changes: 1 addition & 1 deletion lib/sbmt/kafka_consumer/version.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@

module Sbmt
module KafkaConsumer
VERSION = "3.3.2"
VERSION = "3.4.0"
end
end
1 change: 1 addition & 0 deletions spec/rails_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
ENGINE_ROOT = Pathname.new(File.expand_path("..", __dir__))

require "spec_helper"
require "logger"
require "combustion"

RSpec::Matchers.define_negated_matcher :not_increment_yabeda_counter, :increment_yabeda_counter
Expand Down
17 changes: 17 additions & 0 deletions spec/sbmt/kafka_consumer/instrumentation/logger_listener_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
it "logs error when consumer.base.consume_one event occurred" do
allow(event).to receive(:[]).with(:error).and_return(error)
allow(event).to receive(:[]).with(:type).and_return("consumer.base.consume_one")
allow(event).to receive(:[]).with(:log_level).and_return(:error)

expect(logger).to receive(:tagged).with(hash_including(
type: "consumer.base.consume_one",
Expand All @@ -43,6 +44,7 @@
allow(event).to receive(:[]).with(:error).and_return(error)
allow(event).to receive(:[]).with(:type).and_return("consumer.inbox.consume_one")
allow(event).to receive(:[]).with(:status).and_return(status)
allow(event).to receive(:[]).with(:log_level).and_return(:error)

expect(logger).to receive(:tagged).with(hash_including(
type: "consumer.inbox.consume_one",
Expand All @@ -54,6 +56,21 @@

described_class.new.on_error_occurred(event)
end

it "logs warnings" do
allow(event).to receive(:[]).with(:error).and_return("test error")
allow(event).to receive(:[]).with(:type).and_return("consumer.base.consume_one")
allow(event).to receive(:[]).with(:log_level).and_return(:warn)

expect(logger).to receive(:tagged).with(hash_including(
type: "consumer.base.consume_one",
stacktrace: "some backtrace"
)).and_yield

expect(logger).to receive(:warn).with(error_message)

described_class.new.on_error_occurred(event)
end
end

describe ".on_consumer_consumed_one" do
Expand Down

0 comments on commit b13e683

Please sign in to comment.