Skip to content

Commit

Permalink
Adds config to disable commit in confluent stream (#16)
Browse files Browse the repository at this point in the history
Sometimes, we want a consumer to always read from the beginning of a topic. In
that case, it shouldn't commit the messages.
  • Loading branch information
andremissaglia authored Nov 30, 2020
1 parent 64129e8 commit 8a1adf8
Showing 1 changed file with 14 additions and 1 deletion.
15 changes: 14 additions & 1 deletion impl/implstream/kafkaconfluent/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ type Config struct {
// PoolTimeout is the value passed to the internal consumer.Pool(...)
// function. Default: 1s
PoolTimeout time.Duration

// DisableCommit indicates that offsets should never be commited, even
// after calling Done()
DisableCommit bool
}

type goduckStream struct {
Expand All @@ -47,7 +51,8 @@ type goduckStream struct {
done chan struct{}
waitGroup *sync.WaitGroup

timeout time.Duration
timeout time.Duration
disableCommit bool
}

// MustNew creates a confluent-kafka-go goduck.Stream with default configs
Expand Down Expand Up @@ -105,6 +110,7 @@ func mustCreateStream(config Config) goduck.Stream {
done: done,
waitGroup: &sync.WaitGroup{},
timeout: config.PoolTimeout,
disableCommit: config.DisableCommit,
}

stream.waitGroup.Add(1)
Expand Down Expand Up @@ -177,6 +183,9 @@ func (c *goduckStream) pollNextMessage() (*kafka.Message, error) {
}

func (c *goduckStream) markUnackedMessage(msg *kafka.Message) {
if c.disableCommit {
return
}
c.unackedMessagesLock.Lock()
defer c.unackedMessagesLock.Unlock()
tp := topicPartition{msg.TopicPartition.Topic, msg.TopicPartition.Partition}
Expand All @@ -186,6 +195,10 @@ func (c *goduckStream) markUnackedMessage(msg *kafka.Message) {
func (c *goduckStream) Done(ctx context.Context) error {
const op = errors.Op("kafkaconfluent.goduckStream.Done")

if c.disableCommit {
return nil
}

c.unackedMessagesLock.Lock()
defer c.unackedMessagesLock.Unlock()

Expand Down

0 comments on commit 8a1adf8

Please sign in to comment.