Commit f3d067a

Merge branch 'feat/DEX-2546/support-for-kafka-tags-in-ULS' into 'master'
[DEX-2546] feat: place logs in tags

Closes DEX-2546

See merge request nstmrt/rubygems/sbmt-kafka_consumer!68
Arlantir committed Sep 18, 2024
2 parents fffa614 + b4e81e5 commit f3d067a
Showing 5 changed files with 99 additions and 25 deletions.
CHANGELOG.md (6 additions, 0 deletions)

@@ -13,6 +13,12 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
 
 ### Fixed
 
+## [3.2.0] - 2024-09-17
+
+### Added
+
+- For synchronous messages and errors, we place logs in tags
+
 ## [3.1.0] - 2024-09-09
 
 ### Fixed
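For context, the "logs in tags" wording refers to the structured tag hash built by `LoggerListener#log_tags` later in this commit. A hypothetical illustration of the tag payload attached to each consume log line (the topic, key, and consumer-group values below are invented):

```ruby
# Hypothetical example only: the keys mirror LoggerListener#log_tags from this
# commit, while the values are made up for illustration.
{
  kafka: {
    topic: "orders",                 # message.metadata.topic
    partition: 0,                    # message.metadata.partition
    key: "order-1",                  # message.metadata.key
    offset: 42,                      # message.metadata.offset
    consumer_group: "orders_group",  # caller.topic.consumer_group.id
    consume_duration_ms: 10.2        # event.payload[:time]
  }
}
```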
lib/sbmt/kafka_consumer/base_consumer.rb (1 addition, 3 deletions)

@@ -45,9 +45,7 @@ def process_batch?
 
     def with_instrumentation(message)
       logger.tagged(
-        trace_id: trace_id,
-        topic: message.metadata.topic, partition: message.metadata.partition,
-        key: message.metadata.key, offset: message.metadata.offset
+        trace_id: trace_id
       ) do
         ::Sbmt::KafkaConsumer.monitor.instrument(
           "consumer.consumed_one",
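After this change `with_instrumentation` tags only the trace id; the per-message Kafka metadata is emitted by `LoggerListener#log_tags` instead (next file in this commit). A rough sketch of the method after the change, reconstructed from the hunk above — the remaining arguments to `instrument` and the block body are truncated in this view, so they are only hinted at here:

```ruby
# Sketch reconstructed from the hunk above; not the complete method body.
def with_instrumentation(message)
  logger.tagged(trace_id: trace_id) do
    ::Sbmt::KafkaConsumer.monitor.instrument(
      "consumer.consumed_one"
      # ...remaining instrument payload is truncated in this diff view
    ) do
      # ...original block body (message processing) continues here
    end
  end
end
```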
lib/sbmt/kafka_consumer/instrumentation/logger_listener.rb (32 additions, 6 deletions)

@@ -20,30 +20,56 @@ def on_error_occurred(event)
 
       logger.tagged(
         type: type,
+        stacktrace: log_backtrace(error),
         **tags
       ) do
         logger.error(error_message(error))
-        log_backtrace(error)
       end
     end
 
     # BaseConsumer events
     def on_consumer_consumed_one(event)
-      logger.info("Successfully consumed message in #{event.payload[:time]} ms")
+      log_with_tags(log_tags(event), "Successfully consumed message")
     end
 
     def on_consumer_mark_as_consumed(event)
-      logger.info("Processing message in #{event.payload[:time]} ms")
+      log_with_tags(log_tags(event), "Processing message")
     end
 
     def on_consumer_process_message(event)
-      logger.info("Commit offset in #{event.payload[:time]} ms")
+      log_with_tags(log_tags(event), "Commit offset")
     end
 
     # InboxConsumer events
     def on_consumer_inbox_consumed_one(event)
-      logger.tagged(status: event[:status]) do
-        logger.info("Successfully consumed message with uuid: #{event[:message_uuid]} in #{event.payload[:time]} ms")
+      log_tags = log_tags(event).merge!(status: event[:status])
+      msg = "Successfully consumed message with uuid: #{event[:message_uuid]}"
+
+      log_with_tags(log_tags, msg)
     end
 
     private
 
+    def log_tags(event)
+      metadata = event.payload[:message].metadata
+
+      {
+        kafka: {
+          topic: metadata.topic,
+          partition: metadata.partition,
+          key: metadata.key,
+          offset: metadata.offset,
+          consumer_group: event.payload[:caller].topic.consumer_group.id,
+          consume_duration_ms: event.payload[:time]
+        }
+      }
+    end
+
+    def log_with_tags(log_tags, msg)
+      return unless logger.respond_to?(:tagged)
+
+      logger.tagged(log_tags) do
+        logger.send(:info, msg)
+      end
+    end
   end
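Note the guard in `log_with_tags`: if the configured logger does not respond to `tagged`, the message is skipped entirely rather than logged untagged. A minimal standalone sketch of the same tagging pattern, assuming ActiveSupport is available; `ActiveSupport::TaggedLogging` accepts the hash but renders it as a single stringified tag, whereas a structured/JSON logger that understands hash tags would emit real fields:

```ruby
require "logger"
require "active_support"
require "active_support/tagged_logging"

# A #tagged-capable logger, so a guard like `respond_to?(:tagged)` passes.
logger = ActiveSupport::TaggedLogging.new(Logger.new($stdout))

# Same shape of call that LoggerListener#log_with_tags performs;
# the kafka values here are invented for the example.
logger.tagged(kafka: {topic: "orders", partition: 0, offset: 42}) do
  logger.info("Successfully consumed message")
end
# The emitted line ends with, roughly:
#   [{kafka: {topic: "orders", partition: 0, offset: 42}}] Successfully consumed message
```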
lib/sbmt/kafka_consumer/version.rb (1 addition, 1 deletion)

@@ -2,6 +2,6 @@
 
 module Sbmt
   module KafkaConsumer
-    VERSION = "3.1.0"
+    VERSION = "3.2.0"
   end
 end
spec/sbmt/kafka_consumer/instrumentation/logger_listener_spec.rb (59 additions, 15 deletions)

@@ -3,51 +3,95 @@
 require "rails_helper"
 
 describe Sbmt::KafkaConsumer::Instrumentation::LoggerListener do
-  let(:event) { double("event") }
-
-  let(:message) { double("message") }
-  let(:metadata) { OpenStruct.new(topic: "topic", partition: 0) }
+  let(:metadata) { OpenStruct.new(topic: "topic", partition: 0, key: "key", offset: 42) }
+  let(:message) { double("message", metadata: metadata) }
+  let(:caller) { double(topic: double(consumer_group: double(id: "group_id"))) }
+  let(:event) { double("event", payload: {message: message, time: time, caller: caller}) }
 
   let(:inbox_name) { "inbox" }
   let(:event_name) { "event" }
   let(:status) { "status" }
   let(:message_uuid) { "uuid" }
   let(:time) { 10.20 }
+  let(:logger) { double("Logger") }
+  let(:error) { StandardError.new("some error") }
+  let(:error_message) { "some error message" }
+
+  before do
+    allow(Sbmt::KafkaConsumer).to receive(:logger).and_return(logger)
+
+    allow_any_instance_of(described_class).to receive(:log_backtrace).and_return("some backtrace")
+    allow_any_instance_of(described_class).to receive(:error_message).and_return(error_message)
+  end
 
   describe ".on_error_occurred" do
     it "logs error when consumer.base.consume_one event occurred" do
-      expect(event).to receive(:[]).with(:error).and_return("some error")
-      expect(event).to receive(:[]).with(:type).and_return("consumer.base.consume_one")
-      expect(Rails.logger).to receive(:error)
+      allow(event).to receive(:[]).with(:error).and_return(error)
+      allow(event).to receive(:[]).with(:type).and_return("consumer.base.consume_one")
+
+      expect(logger).to receive(:tagged).with(hash_including(
+        type: "consumer.base.consume_one",
+        stacktrace: "some backtrace"
+      )).and_yield
+
+      expect(logger).to receive(:error).with(error_message)
 
       described_class.new.on_error_occurred(event)
     end
 
     it "logs error when consumer.inbox.consume_one event occurred" do
-      expect(event).to receive(:[]).with(:error).and_return("some error")
-      expect(event).to receive(:[]).with(:type).and_return("consumer.inbox.consume_one")
-      expect(event).to receive(:[]).with(:status).and_return(status)
-      expect(Rails.logger).to receive(:error)
+      allow(event).to receive(:[]).with(:error).and_return(error)
+      allow(event).to receive(:[]).with(:type).and_return("consumer.inbox.consume_one")
+      allow(event).to receive(:[]).with(:status).and_return(status)
+
+      expect(logger).to receive(:tagged).with(hash_including(
+        type: "consumer.inbox.consume_one",
+        status: "status",
+        stacktrace: "some backtrace"
+      )).and_yield
+
+      expect(logger).to receive(:error).with(error_message)
 
       described_class.new.on_error_occurred(event)
     end
   end
 
   describe ".on_consumer_consumed_one" do
     it "logs info message" do
-      expect(event).to receive(:payload).and_return(time: time)
-      expect(Rails.logger).to receive(:info)
+      expect(logger).to receive(:tagged).with(hash_including(
+        kafka: hash_including(
+          topic: "topic",
+          partition: 0,
+          key: "key",
+          offset: 42,
+          consumer_group: "group_id",
+          consume_duration_ms: time
+        )
+      )).and_yield
+
+      expect(logger).to receive(:info).with("Successfully consumed message")
 
       described_class.new.on_consumer_consumed_one(event)
     end
   end
 
   describe ".on_consumer_inbox_consumed_one" do
     it "logs info message" do
-      expect(event).to receive(:payload).and_return(time: time)
       expect(event).to receive(:[]).with(:status).and_return(status)
       expect(event).to receive(:[]).with(:message_uuid).and_return(message_uuid)
-      expect(Rails.logger).to receive(:info)
 
+      expect(logger).to receive(:tagged).with(hash_including(
+        kafka: hash_including(
+          topic: "topic",
+          partition: 0,
+          key: "key",
+          offset: 42,
+          consumer_group: "group_id",
+          consume_duration_ms: time
+        )
+      )).and_yield
+
+      expect(logger).to receive(:info).with("Successfully consumed message with uuid: uuid")
 
       described_class.new.on_consumer_inbox_consumed_one(event)
     end
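One detail worth noting about the spec: `logger` is a plain `double("Logger")`, and rspec-mocks defines a real singleton method on the double for each stubbed message, so `respond_to?(:tagged)` returns true once a `tagged` stub or expectation is set and the guard in `log_with_tags` passes during these tests. A tiny standalone illustration of that behaviour (assuming rspec-mocks; runnable as a script via rspec/autorun):

```ruby
require "rspec/autorun"

RSpec.describe "an rspec-mocks double with a stubbed message" do
  it "also answers respond_to? for that message" do
    logger = double("Logger")
    allow(logger).to receive(:tagged).and_yield

    # This is why the `return unless logger.respond_to?(:tagged)` guard
    # does not short-circuit the listener in the specs above.
    expect(logger.respond_to?(:tagged)).to be(true)
    logger.tagged(kafka: {topic: "topic"}) { :ok }
  end
end
```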
