From d0a5b26031bfb0ee57e6afaa55d5f68524a3474d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?A=2ESamet=20=C4=B0leri?= Date: Mon, 15 Jan 2024 09:22:49 +0300 Subject: [PATCH] feat: add functionality pause and resume for consumer (#89), closes (#77) * feat: add functionality pause and resume for consumer * chore: typo * refactor: extract to the base * feat: add unit tests * feat: add pause/resume integration tests * chore: add documentation * refactor: convert pause ch in order to save cpu * chore: fix tests * chore: lint --- README.md | 6 ++ batch_consumer.go | 8 +++ batch_consumer_test.go | 50 +++++++++++++++ consumer.go | 9 +++ consumer_base.go | 55 ++++++++++++++++- consumer_base_test.go | 55 ++++++++++++++++- consumer_test.go | 50 +++++++++++++++ .../how-it-works.md | 30 +++++++++ examples/with-pause-resume-consumer/main.go | 55 +++++++++++++++++ test/integration/integration_test.go | 61 +++++++++++++++++++ 10 files changed, 374 insertions(+), 5 deletions(-) create mode 100644 examples/with-pause-resume-consumer/how-it-works.md create mode 100644 examples/with-pause-resume-consumer/main.go diff --git a/README.md b/README.md index fee7d9f..6782ca1 100644 --- a/README.md +++ b/README.md @@ -16,6 +16,8 @@ manager ([kafka-cronsumer](https://github.com/Trendyol/kafka-cronsumer)). - Added ability for manipulating kafka message headers. - Added transactional retry feature. Set false if you want to use exception/retry strategy to only failed messages. - Enable manuel commit at both single and batch consuming modes. +- Enabling consumer resume/pause functionality. Please refer to [its example](examples/with-pause-resume-consumer) and +[how it works](examples/with-pause-resume-consumer/how-it-works.md) documentation. - Bumped [kafka-cronsumer](https://github.com/Trendyol/kafka-cronsumer/releases) to the latest version: - Backoff strategy support (linear, exponential options) - Added message key for retried messages @@ -197,6 +199,10 @@ After running `docker-compose up` command, you can run any application you want. Please refer to [Tracing Example](examples/with-tracing/README.md) +#### With Pause & Resume Consumer + +Please refer to [Pause Resume Example](examples/with-pause-resume-consumer) + #### With Grafana & Prometheus In this example, we are demonstrating how to create Grafana dashboard and how to define alerts in Prometheus. You can diff --git a/batch_consumer.go b/batch_consumer.go index 965ee77..598f129 100644 --- a/batch_consumer.go +++ b/batch_consumer.go @@ -17,6 +17,14 @@ type batchConsumer struct { messageGroupLimit int } +func (b *batchConsumer) Pause() { + b.base.Pause() +} + +func (b *batchConsumer) Resume() { + b.base.Resume() +} + func newBatchConsumer(cfg *ConsumerConfig) (Consumer, error) { consumerBase, err := newBase(cfg, cfg.BatchConfiguration.MessageGroupLimit*cfg.Concurrency) if err != nil { diff --git a/batch_consumer_test.go b/batch_consumer_test.go index 5bd93cb..1b390ec 100644 --- a/batch_consumer_test.go +++ b/batch_consumer_test.go @@ -1,6 +1,7 @@ package kafka import ( + "context" "errors" "reflect" "strconv" @@ -354,6 +355,55 @@ func Test_batchConsumer_chunk(t *testing.T) { } } +func Test_batchConsumer_Pause(t *testing.T) { + // Given + ctx, cancelFn := context.WithCancel(context.Background()) + bc := batchConsumer{ + base: &base{ + logger: NewZapLogger(LogLevelDebug), + pause: make(chan struct{}), + context: ctx, cancelFn: cancelFn, + consumerState: stateRunning, + }, + } + + go func() { + <-bc.base.pause + }() + + // When + bc.Pause() + + // Then + if bc.base.consumerState != statePaused { + t.Fatal("consumer state must be in paused") + } +} + +func Test_batchConsumer_Resume(t *testing.T) { + // Given + mc := mockReader{} + ctx, cancelFn := context.WithCancel(context.Background()) + bc := batchConsumer{ + base: &base{ + r: &mc, + logger: NewZapLogger(LogLevelDebug), + pause: make(chan struct{}), + quit: make(chan struct{}), + wg: sync.WaitGroup{}, + context: ctx, cancelFn: cancelFn, + }, + } + + // When + bc.Resume() + + // Then + if bc.base.consumerState != stateRunning { + t.Fatal("consumer state must be in resume!") + } +} + func createMessages(partitionStart int, partitionEnd int) []*Message { messages := make([]*Message, 0) for i := partitionStart; i < partitionEnd; i++ { diff --git a/consumer.go b/consumer.go index 3bc0a73..4d66f2f 100644 --- a/consumer.go +++ b/consumer.go @@ -14,6 +14,14 @@ type consumer struct { consumeFn func(*Message) error } +func (c *consumer) Pause() { + c.base.Pause() +} + +func (c *consumer) Resume() { + c.base.Resume() +} + func newSingleConsumer(cfg *ConsumerConfig) (Consumer, error) { consumerBase, err := newBase(cfg, cfg.Concurrency) if err != nil { @@ -40,6 +48,7 @@ func newSingleConsumer(cfg *ConsumerConfig) (Consumer, error) { func (c *consumer) Consume() { go c.subprocesses.Start() + c.wg.Add(1) go c.startConsume() diff --git a/consumer_base.go b/consumer_base.go index 0cd69b3..fcb7cf2 100644 --- a/consumer_base.go +++ b/consumer_base.go @@ -20,6 +20,12 @@ type Consumer interface { // Consume starts consuming Consume() + // Pause function pauses consumer, it is stop consuming new messages + Pause() + + // Resume function resumes consumer, it is start to working + Resume() + // WithLogger for injecting custom log implementation WithLogger(logger LoggerInterface) @@ -33,6 +39,13 @@ type Reader interface { CommitMessages(messages []kafka.Message) error } +type state string + +const ( + stateRunning state = "running" + statePaused state = "paused" +) + type base struct { cronsumer kcronsumer.Cronsumer api API @@ -42,6 +55,7 @@ type base struct { r Reader cancelFn context.CancelFunc metric *ConsumerMetric + pause chan struct{} quit chan struct{} messageProcessedStream chan struct{} incomingMessageStream chan *IncomingMessage @@ -56,6 +70,7 @@ type base struct { retryEnabled bool transactionalRetry bool distributedTracingEnabled bool + consumerState state } func NewConsumer(cfg *ConsumerConfig) (Consumer, error) { @@ -79,6 +94,7 @@ func newBase(cfg *ConsumerConfig, messageChSize int) (*base, error) { metric: &ConsumerMetric{}, incomingMessageStream: make(chan *IncomingMessage, messageChSize), quit: make(chan struct{}), + pause: make(chan struct{}), concurrency: cfg.Concurrency, retryEnabled: cfg.RetryEnabled, transactionalRetry: *cfg.TransactionalRetry, @@ -90,6 +106,7 @@ func newBase(cfg *ConsumerConfig, messageChSize int) (*base, error) { messageProcessedStream: make(chan struct{}, cfg.Concurrency), singleConsumingStream: make(chan *Message, cfg.Concurrency), batchConsumingStream: make(chan []*Message, cfg.Concurrency), + consumerState: stateRunning, } if cfg.DistributedTracingEnabled { @@ -125,6 +142,9 @@ func (c *base) startConsume() { for { select { + case <-c.pause: + c.logger.Debug("startConsume exited!") + return case <-c.quit: close(c.incomingMessageStream) return @@ -153,17 +173,48 @@ func (c *base) startConsume() { } } +func (c *base) Pause() { + c.logger.Info("Consumer is paused!") + + c.cancelFn() + + c.pause <- struct{}{} + + c.consumerState = statePaused +} + +func (c *base) Resume() { + c.logger.Info("Consumer is resumed!") + + c.pause = make(chan struct{}) + c.context, c.cancelFn = context.WithCancel(context.Background()) + c.consumerState = stateRunning + + c.wg.Add(1) + go c.startConsume() +} + func (c *base) WithLogger(logger LoggerInterface) { c.logger = logger } func (c *base) Stop() error { - c.logger.Debug("Stop called!") + c.logger.Info("Stop is called!") + var err error c.once.Do(func() { c.subprocesses.Stop() c.cancelFn() - c.quit <- struct{}{} + + // In order to save cpu, we break startConsume loop in pause mode. + // If consumer is pause mode and Stop is called + // We need to close incomingMessageStream, because c.wg.Wait() blocks indefinitely. + if c.consumerState == stateRunning { + c.quit <- struct{}{} + } else if c.consumerState == statePaused { + close(c.incomingMessageStream) + } + c.wg.Wait() err = c.r.Close() }) diff --git a/consumer_base_test.go b/consumer_base_test.go index b45fdda..65f60e0 100644 --- a/consumer_base_test.go +++ b/consumer_base_test.go @@ -8,7 +8,6 @@ import ( "time" "github.com/google/go-cmp/cmp" - "github.com/segmentio/kafka-go" ) @@ -16,10 +15,13 @@ func Test_base_startConsume(t *testing.T) { t.Run("Return_When_Quit_Signal_Is_Came", func(t *testing.T) { mc := mockReader{wantErr: true} b := base{ - wg: sync.WaitGroup{}, r: &mc, + wg: sync.WaitGroup{}, + r: &mc, incomingMessageStream: make(chan *IncomingMessage), quit: make(chan struct{}), - logger: NewZapLogger(LogLevelDebug), + pause: make(chan struct{}), + logger: NewZapLogger(LogLevelError), + consumerState: stateRunning, } b.context, b.cancelFn = context.WithCancel(context.Background()) @@ -57,6 +59,53 @@ func Test_base_startConsume(t *testing.T) { }) } +func Test_base_Pause(t *testing.T) { + // Given + ctx, cancelFn := context.WithCancel(context.Background()) + b := base{ + logger: NewZapLogger(LogLevelDebug), + pause: make(chan struct{}), + context: ctx, cancelFn: cancelFn, + consumerState: stateRunning, + } + go func() { + <-b.pause + }() + + // When + b.Pause() + + // Then + if b.consumerState != statePaused { + t.Fatal("consumer state must be in paused") + } +} + +func Test_base_Resume(t *testing.T) { + // Given + mc := mockReader{} + ctx, cancelFn := context.WithCancel(context.Background()) + b := base{ + r: &mc, + logger: NewZapLogger(LogLevelDebug), + pause: make(chan struct{}), + quit: make(chan struct{}), + wg: sync.WaitGroup{}, + context: ctx, cancelFn: cancelFn, + } + + // When + b.Resume() + + // Then + if b.consumerState != stateRunning { + t.Fatal("consumer state must be in running") + } + if ctx == b.context { + t.Fatal("contexts must be differ!") + } +} + type mockReader struct { wantErr bool } diff --git a/consumer_test.go b/consumer_test.go index 65a1813..5536b38 100644 --- a/consumer_test.go +++ b/consumer_test.go @@ -1,7 +1,9 @@ package kafka import ( + "context" "errors" + "sync" "testing" ) @@ -114,3 +116,51 @@ func Test_consumer_process(t *testing.T) { } }) } + +func Test_consumer_Pause(t *testing.T) { + // Given + ctx, cancelFn := context.WithCancel(context.Background()) + c := consumer{ + base: &base{ + logger: NewZapLogger(LogLevelDebug), + pause: make(chan struct{}), + context: ctx, cancelFn: cancelFn, + consumerState: stateRunning, + }, + } + go func() { + <-c.base.pause + }() + + // When + c.Pause() + + // Then + if c.base.consumerState != statePaused { + t.Fatal("consumer state must be in paused") + } +} + +func Test_consumer_Resume(t *testing.T) { + // Given + mc := mockReader{} + ctx, cancelFn := context.WithCancel(context.Background()) + c := consumer{ + base: &base{ + r: &mc, + logger: NewZapLogger(LogLevelDebug), + pause: make(chan struct{}), + quit: make(chan struct{}), + wg: sync.WaitGroup{}, + context: ctx, cancelFn: cancelFn, + }, + } + + // When + c.Resume() + + // Then + if c.base.consumerState != stateRunning { + t.Fatal("consumer state must be in running") + } +} diff --git a/examples/with-pause-resume-consumer/how-it-works.md b/examples/with-pause-resume-consumer/how-it-works.md new file mode 100644 index 0000000..37150bf --- /dev/null +++ b/examples/with-pause-resume-consumer/how-it-works.md @@ -0,0 +1,30 @@ +Before implementation, I researched [segmentio/kafka-go](https://github.com/segmentio/kafka-go)' issues for this +functionality. I came across [this issue](https://github.com/segmentio/kafka-go/issues/474). [Achille-roussel](https://github.com/achille-roussel) +who is the old maintainer of the kafka-go clearly said that + +``` +To pause consuming from a partition, you can simply stop reading +messages. Kafka does not have a concept of pausing or resuming in its protocol, the responsibility is given to clients +to decide what to read and when. +``` + +It means, if we stop calling `FetchMessage`, the consumer pauses. If we invoke, the consumer resumes. Here, there is very +important behaviour exist. Consumer group state not affected at all in this situation. When we call `kafka.NewConsumer`, +segmentio/kafka-go library creates a goroutine under the hood, and it starts to send heartbeat with a specific interval +so even if we stop calling `FetchMessage`, consumer group still stable mode and not consumes new message at all. + +```go +consumer, _ := kafka.NewConsumer(consumerCfg) +defer consumer.Stop() + +consumer.Consume() +fmt.Println("Consumer started...!") +``` + +If you need to implement Pause & Resume functionality on your own applications, you need to call `Consume`. Because this +method creates listeners goroutine under the hood. After that you can manage the lifecycle of the consumer by calling +`Pause` and `Resume` methods. + +You can run the example to see `Is consumer consumes new message in Pause mode` or `consumer consumes new message in Resume mode` +by producing dummy messages on kowl ui. + diff --git a/examples/with-pause-resume-consumer/main.go b/examples/with-pause-resume-consumer/main.go new file mode 100644 index 0000000..2d7b005 --- /dev/null +++ b/examples/with-pause-resume-consumer/main.go @@ -0,0 +1,55 @@ +package main + +import ( + "fmt" + "github.com/Trendyol/kafka-konsumer/v2" + "os" + "os/signal" + "time" +) + +func main() { + consumerCfg := &kafka.ConsumerConfig{ + Concurrency: 1, + Reader: kafka.ReaderConfig{ + Brokers: []string{"localhost:29092"}, + Topic: "standart-topic", + GroupID: "standart-cg", + }, + RetryEnabled: false, + ConsumeFn: consumeFn, + } + + consumer, _ := kafka.NewConsumer(consumerCfg) + defer consumer.Stop() + + consumer.Consume() + fmt.Println("Consumer started...!") + + // You can produce a message via kowl. + go func() { + time.Sleep(10 * time.Second) + consumer.Pause() + + time.Sleep(10 * time.Second) + consumer.Resume() + + time.Sleep(10 * time.Second) + consumer.Pause() + + time.Sleep(10 * time.Second) + consumer.Resume() + + time.Sleep(10 * time.Second) + consumer.Pause() + }() + + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt) + <-c +} + +func consumeFn(message *kafka.Message) error { + fmt.Printf("Message From %s with value %s \n", message.Topic, string(message.Value)) + return nil +} diff --git a/test/integration/integration_test.go b/test/integration/integration_test.go index 71449d7..f4b4295 100644 --- a/test/integration/integration_test.go +++ b/test/integration/integration_test.go @@ -105,6 +105,67 @@ func Test_Should_Consume_Message_Successfully(t *testing.T) { } } +func Test_Should_Pause_And_Resume_Successfully(t *testing.T) { + // Given + topic := "pause-topic" + consumerGroup := "pause-topic-cg" + brokerAddress := "localhost:9092" + + conn, cleanUp := createTopicAndWriteMessages(t, topic, nil) + defer cleanUp() + + messageCh := make(chan *kafka.Message) + + consumerCfg := &kafka.ConsumerConfig{ + Reader: kafka.ReaderConfig{Brokers: []string{brokerAddress}, Topic: topic, GroupID: consumerGroup}, + ConsumeFn: func(message *kafka.Message) error { + messageCh <- message + return nil + }, + } + + consumer, _ := kafka.NewConsumer(consumerCfg) + defer consumer.Stop() + + consumer.Consume() + + producer := &segmentio.Writer{ + Topic: topic, + Addr: segmentio.TCP(brokerAddress), + AllowAutoTopicCreation: true, + } + + // When + consumer.Pause() + + err := producer.WriteMessages(context.Background(), []segmentio.Message{{}, {}, {}}...) + if err != nil { + t.Fatalf("error producing step %s", err.Error()) + } + + // Then + timeoutCtx, cancelFn := context.WithTimeout(context.Background(), 5*time.Second) + defer cancelFn() + + select { + case <-timeoutCtx.Done(): + o, _ := conn.ReadLastOffset() + if o != 3 { + t.Fatalf("offset %v must be equal to 3", o) + } + case <-messageCh: + t.Fatal("Consumer is Pause Mode so it is not possible to consume message!") + } + + // When + consumer.Resume() + + // Then + <-messageCh + <-messageCh + <-messageCh +} + func Test_Should_Batch_Consume_Messages_Successfully(t *testing.T) { // Given topic := "batch-topic"