Skip to content

Commit

Permalink
chore: add preBatch unit test
Browse files Browse the repository at this point in the history
  • Loading branch information
Abdulsametileri committed Dec 16, 2023
1 parent 31f168f commit 56d3376
Showing 1 changed file with 58 additions and 0 deletions.
58 changes: 58 additions & 0 deletions batch_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,64 @@ func Test_batchConsumer_startBatch(t *testing.T) {
}
}

func Test_batchConsumer_startBatch_with_preBatch(t *testing.T) {
// Given
var numberOfBatch int

mc := mockReader{}
bc := batchConsumer{
base: &base{
incomingMessageStream: make(chan *Message, 1),
batchConsumingStream: make(chan []*Message, 1),
singleConsumingStream: make(chan *Message, 1),
messageProcessedStream: make(chan struct{}, 1),
metric: &ConsumerMetric{},
wg: sync.WaitGroup{},
messageGroupDuration: 500 * time.Millisecond,
r: &mc,
concurrency: 1,
},
messageGroupLimit: 2,
consumeFn: func(messages []*Message) error {
numberOfBatch++
return nil
},
preBatchFn: func(messages []*Message) []*Message {
return messages[:1]
},
}
go func() {
// Simulate messageGroupLimit
bc.base.incomingMessageStream <- &Message{}
bc.base.incomingMessageStream <- &Message{}

time.Sleep(1 * time.Second)

// Simulate messageGroupDuration
bc.base.incomingMessageStream <- &Message{}
bc.base.incomingMessageStream <- &Message{}

time.Sleep(1 * time.Second)

// Return from startBatch
close(bc.base.incomingMessageStream)
}()

bc.base.wg.Add(1 + bc.base.concurrency)

// When
bc.setupConcurrentWorkers()
bc.startBatch()

// Then
if numberOfBatch != 2 {
t.Fatalf("Number of batch group must equal to 2")
}
if bc.metric.TotalProcessedMessagesCounter != 2 {
t.Fatalf("Total Processed Message Counter must equal to 2")
}
}

func Test_batchConsumer_process(t *testing.T) {
t.Run("When_Processing_Is_Successful", func(t *testing.T) {
// Given
Expand Down

0 comments on commit 56d3376

Please sign in to comment.