From 18b4ad0dc0a23d87c4fba39829fea1f11a05c503 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?A=2ESamet=20=C4=B0leri?= Date: Wed, 1 Jan 2025 20:53:20 +0300 Subject: [PATCH] Revert "refactor: change prometheus primary metrics as atomic ones (#159)" (#160) This reverts commit 50f4fcc5d83f716b25a4291af1c800a3e0b8d3c9. --- collector.go | 6 +++--- consumer.go | 6 +++--- consumer_base.go | 2 +- metric.go | 18 +++--------------- 4 files changed, 10 insertions(+), 22 deletions(-) diff --git a/collector.go b/collector.go index d7d5370..2393e77 100644 --- a/collector.go +++ b/collector.go @@ -55,21 +55,21 @@ func (s *MetricCollector) Collect(ch chan<- prometheus.Metric) { ch <- prometheus.MustNewConstMetric( s.totalProcessedMessagesCounter, prometheus.CounterValue, - float64(s.consumerMetric.totalProcessedMessagesCounter), + float64(s.consumerMetric.TotalProcessedMessagesCounter), emptyStringList..., ) ch <- prometheus.MustNewConstMetric( s.totalUnprocessedMessagesCounter, prometheus.CounterValue, - float64(s.consumerMetric.totalUnprocessedMessagesCounter), + float64(s.consumerMetric.TotalUnprocessedMessagesCounter), emptyStringList..., ) ch <- prometheus.MustNewConstMetric( s.totalErrorCountDuringFetchingMessage, prometheus.CounterValue, - float64(s.consumerMetric.totalErrorCountDuringFetchingMessage), + float64(s.consumerMetric.TotalErrorCountDuringFetchingMessage), emptyStringList..., ) } diff --git a/consumer.go b/consumer.go index c991e17..257c5b8 100644 --- a/consumer.go +++ b/consumer.go @@ -166,10 +166,10 @@ func (c *consumer) process(message *Message) { // Try to process same message again if consumeErr = c.consumeFn(message); consumeErr != nil { c.logger.Warnf("Consume Function Again Err %s, message is sending to exception/retry topic %s", consumeErr.Error(), c.retryTopic) - c.metric.IncrementTotalUnprocessedMessagesCounter() + c.metric.TotalUnprocessedMessagesCounter++ } } else { - c.metric.IncrementTotalUnprocessedMessagesCounter() + c.metric.TotalUnprocessedMessagesCounter++ } } @@ -185,7 +185,7 @@ func (c *consumer) process(message *Message) { } if consumeErr == nil { - c.metric.IncrementTotalProcessedMessagesCounter() + c.metric.TotalProcessedMessagesCounter++ } } diff --git a/consumer_base.go b/consumer_base.go index cef76dc..9ff590d 100644 --- a/consumer_base.go +++ b/consumer_base.go @@ -202,7 +202,7 @@ func (c *base) startConsume() { continue } - c.metric.IncrementTotalErrorCountDuringFetchingMessage() + c.metric.TotalErrorCountDuringFetchingMessage++ //nolint:lll c.logger.Warnf("Message could not read, err %s, from topics %s with consumer group %s", err.Error(), c.consumerCfg.getTopics(), c.consumerCfg.Reader.GroupID) continue diff --git a/metric.go b/metric.go index d41d8d4..104bbee 100644 --- a/metric.go +++ b/metric.go @@ -1,19 +1,7 @@ package kafka type ConsumerMetric struct { - totalUnprocessedMessagesCounter int64 - totalProcessedMessagesCounter int64 - totalErrorCountDuringFetchingMessage int64 -} - -func (cm *ConsumerMetric) IncrementTotalUnprocessedMessagesCounter() { - cm.totalUnprocessedMessagesCounter++ -} - -func (cm *ConsumerMetric) IncrementTotalProcessedMessagesCounter() { - cm.totalProcessedMessagesCounter++ -} - -func (cm *ConsumerMetric) IncrementTotalErrorCountDuringFetchingMessage() { - cm.totalErrorCountDuringFetchingMessage++ + TotalUnprocessedMessagesCounter int64 + TotalProcessedMessagesCounter int64 + TotalErrorCountDuringFetchingMessage int64 }