Skip to content

Commit

Permalink
chore: fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Abdulsametileri committed Jan 14, 2024
1 parent e63a263 commit c75e4bf
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 33 deletions.
18 changes: 11 additions & 7 deletions batch_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,10 +360,10 @@ func Test_batchConsumer_Pause(t *testing.T) {
ctx, cancelFn := context.WithCancel(context.Background())
bc := batchConsumer{
base: &base{
logger: NewZapLogger(LogLevelDebug),
context: ctx,
pause: make(chan struct{}),
cancelFn: cancelFn,
logger: NewZapLogger(LogLevelDebug),
pause: make(chan struct{}),
context: ctx, cancelFn: cancelFn,
consumerState: stateRunning,
},
}

Expand All @@ -382,12 +382,16 @@ func Test_batchConsumer_Pause(t *testing.T) {

func Test_batchConsumer_Resume(t *testing.T) {
// Given
mc := mockReader{}
ctx, cancelFn := context.WithCancel(context.Background())
bc := batchConsumer{
base: &base{
logger: NewZapLogger(LogLevelDebug),
context: ctx,
cancelFn: cancelFn,
r: &mc,
logger: NewZapLogger(LogLevelDebug),
pause: make(chan struct{}),
quit: make(chan struct{}),
wg: sync.WaitGroup{},
context: ctx, cancelFn: cancelFn,
},
}

Expand Down
5 changes: 2 additions & 3 deletions consumer_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,6 @@ func (c *base) Pause() {
c.cancelFn()

c.pause <- struct{}{}
close(c.pause)

c.consumerState = statePaused
}
Expand All @@ -200,15 +199,15 @@ func (c *base) WithLogger(logger LoggerInterface) {
}

func (c *base) Stop() error {
c.logger.Info("Stop called!")
c.logger.Info("Stop is called!")

var err error
c.once.Do(func() {
c.subprocesses.Stop()
c.cancelFn()

// In order to save cpu, we break startConsume loop in pause mode.
// If consumer is pause mode, and Stop called
// If consumer is pause mode and Stop is called
// We need to close incomingMessageStream, because c.wg.Wait() blocks indefinitely.
if c.consumerState == stateRunning {
c.quit <- struct{}{}
Expand Down
17 changes: 9 additions & 8 deletions consumer_base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,24 @@ package kafka
import (
"context"
"errors"
"github.com/google/go-cmp/cmp"
"github.com/segmentio/kafka-go"
"sync"
"testing"
"time"

"github.com/google/go-cmp/cmp"

"github.com/segmentio/kafka-go"
)

func Test_base_startConsume(t *testing.T) {
t.Run("Return_When_Quit_Signal_Is_Came", func(t *testing.T) {
mc := mockReader{wantErr: true}
b := base{
wg: sync.WaitGroup{}, r: &mc,
wg: sync.WaitGroup{},
r: &mc,
incomingMessageStream: make(chan *IncomingMessage),
quit: make(chan struct{}),
pause: make(chan struct{}),
logger: NewZapLogger(LogLevelInfo),
logger: NewZapLogger(LogLevelError),
consumerState: stateRunning,
}
b.context, b.cancelFn = context.WithCancel(context.Background())

Expand Down Expand Up @@ -65,6 +65,7 @@ func Test_base_Pause(t *testing.T) {
logger: NewZapLogger(LogLevelDebug),
pause: make(chan struct{}),
context: ctx, cancelFn: cancelFn,
consumerState: stateRunning,
}
go func() {
<-b.pause
Expand All @@ -86,10 +87,10 @@ func Test_base_Resume(t *testing.T) {
b := base{
r: &mc,
logger: NewZapLogger(LogLevelDebug),
quit: make(chan struct{}),
pause: make(chan struct{}),
quit: make(chan struct{}),
wg: sync.WaitGroup{},
context: ctx, cancelFn: cancelFn,
wg: sync.WaitGroup{},
}

// When
Expand Down
19 changes: 12 additions & 7 deletions consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package kafka
import (
"context"
"errors"
"sync"
"testing"
)

Expand Down Expand Up @@ -121,10 +122,10 @@ func Test_consumer_Pause(t *testing.T) {
ctx, cancelFn := context.WithCancel(context.Background())
c := consumer{
base: &base{
logger: NewZapLogger(LogLevelDebug),
pause: make(chan struct{}),
context: ctx,
cancelFn: cancelFn,
logger: NewZapLogger(LogLevelDebug),
pause: make(chan struct{}),
context: ctx, cancelFn: cancelFn,
consumerState: stateRunning,
},
}
go func() {
Expand All @@ -142,12 +143,16 @@ func Test_consumer_Pause(t *testing.T) {

func Test_consumer_Resume(t *testing.T) {
// Given
mc := mockReader{}
ctx, cancelFn := context.WithCancel(context.Background())
c := consumer{
base: &base{
logger: NewZapLogger(LogLevelDebug),
context: ctx,
cancelFn: cancelFn,
r: &mc,
logger: NewZapLogger(LogLevelDebug),
pause: make(chan struct{}),
quit: make(chan struct{}),
wg: sync.WaitGroup{},
context: ctx, cancelFn: cancelFn,
},
}

Expand Down
15 changes: 7 additions & 8 deletions examples/with-pause-resume-consumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,15 @@ func main() {

time.Sleep(10 * time.Second)
consumer.Resume()
/*
time.Sleep(10 * time.Second)
consumer.Pause()

time.Sleep(10 * time.Second)
consumer.Resume()
time.Sleep(10 * time.Second)
consumer.Pause()

time.Sleep(10 * time.Second)
consumer.Pause()
*/
time.Sleep(10 * time.Second)
consumer.Resume()

time.Sleep(10 * time.Second)
consumer.Pause()
}()

c := make(chan os.Signal, 1)
Expand Down

0 comments on commit c75e4bf

Please sign in to comment.