Skip to content

Commit

Permalink
add unit tests for chunk messages
Browse files Browse the repository at this point in the history
  • Loading branch information
mhmtszr committed Dec 6, 2023
1 parent 3301839 commit 1530acb
Showing 1 changed file with 68 additions and 0 deletions.
68 changes: 68 additions & 0 deletions batch_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package kafka

import (
"errors"
"reflect"
"strconv"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -207,6 +209,72 @@ func Test_batchConsumer_process(t *testing.T) {
})
}

func Test_batchConsumer_chunk(t *testing.T) {
tests := []struct {
allMessages []*Message
chunkSize int
expected [][]*Message
}{
{
allMessages: createMessages(0, 9),
chunkSize: 3,
expected: [][]*Message{
createMessages(0, 3),
createMessages(3, 6),
createMessages(6, 9),
},
},
{
allMessages: []*Message{},
chunkSize: 3,
expected: [][]*Message{},
},
{
allMessages: createMessages(0, 1),
chunkSize: 3,
expected: [][]*Message{
createMessages(0, 1),
},
},
{
allMessages: createMessages(0, 8),
chunkSize: 3,
expected: [][]*Message{
createMessages(0, 3),
createMessages(3, 6),
createMessages(6, 8),
},
},
{
allMessages: createMessages(0, 3),
chunkSize: 3,
expected: [][]*Message{
createMessages(0, 3),
},
},
}

for i, tc := range tests {
t.Run(strconv.Itoa(i), func(t *testing.T) {
chunkedMessages := chunkMessages(tc.allMessages, tc.chunkSize)

if !reflect.DeepEqual(chunkedMessages, tc.expected) && !(len(chunkedMessages) == 0 && len(tc.expected) == 0) {
t.Errorf("For chunkSize %d, expected %v, but got %v", tc.chunkSize, tc.expected, chunkedMessages)
}
})
}
}

func createMessages(partitionStart int, partitionEnd int) []*Message {
messages := make([]*Message, 0)
for i := partitionStart; i < partitionEnd; i++ {
messages = append(messages, &Message{
Partition: i,
})
}
return messages
}

type mockCronsumer struct {
wantErr bool
}
Expand Down

0 comments on commit 1530acb

Please sign in to comment.