feat: add preBatch feature #83
Conversation
```diff
@@ -11,7 +11,8 @@ import (
 type batchConsumer struct {
 	*base
 
-	consumeFn func([]*Message) error
+	consumeFn BatchConsumeFn
```
We already defined the `BatchConsumeFn` alias for `func([]*Message) error`, so the field now uses it.
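For context, a minimal self-contained sketch of what this looks like; the `Message` stub and the package layout are placeholders for illustration, and only `BatchConsumeFn` and the field name come from the diff above.

```go
package main

// Message stands in for the library's message type in this sketch.
type Message struct{}

// BatchConsumeFn is the alias referred to above: the batch consume callback.
type BatchConsumeFn func(messages []*Message) error

// batchConsumer stores the callback under the named alias rather than the
// raw func([]*Message) error type.
type batchConsumer struct {
	consumeFn BatchConsumeFn
}

func main() {
	c := batchConsumer{consumeFn: func([]*Message) error { return nil }}
	_ = c.consumeFn(nil)
}
```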
```diff
 	b.wg.Add(1)
 	go b.startConsume()
 
+	b.wg.Add(b.concurrency)
```
For better readability, I extracted the `wg.Add` call from the `setupConcurrentWorkers` method to here.
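A small runnable sketch of that pattern, assuming a worker pool that drains a message channel; the struct fields and the worker body here are illustrative, not the library's actual code.

```go
package main

import (
	"fmt"
	"sync"
)

// consumer is an illustrative stand-in; only the wg.Add placement mirrors the diff.
type consumer struct {
	wg          sync.WaitGroup
	concurrency int
	messages    chan int
}

// setupConcurrentWorkers starts the workers; the caller has already registered
// all of them on the WaitGroup with a single wg.Add(concurrency) call.
func (c *consumer) setupConcurrentWorkers() {
	for i := 0; i < c.concurrency; i++ {
		go func() {
			defer c.wg.Done()
			for m := range c.messages {
				fmt.Println("processed", m)
			}
		}()
	}
}

func main() {
	c := &consumer{concurrency: 3, messages: make(chan int)}

	// One Add for all workers instead of wg.Add(1) inside the worker loop.
	c.wg.Add(c.concurrency)
	c.setupConcurrentWorkers()

	for i := 0; i < 5; i++ {
		c.messages <- i
	}
	close(c.messages)
	c.wg.Wait()
}
```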
```diff
@@ -125,6 +128,11 @@ func chunkMessages(allMessages *[]*Message, chunkSize int) [][]*Message {
 func (b *batchConsumer) consume(allMessages *[]*Message, commitMessages *[]kafka.Message) {
 	chunks := chunkMessages(allMessages, b.messageGroupLimit)
 
+	if b.preBatchFn != nil {
```
Just to let you know: I only changed chunks, not allMessages, because allMessages is also used to build commitMessages. If I mutated it, I would run into offset/lag issues.
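A rough, self-contained sketch of that idea; the `chunkMessages` body, the `Message` stub, and the tombstone-filtering hook are assumptions for illustration. Only the shape of the guard (rebuild `chunks` from the pre-batch result, leave `allMessages` untouched) comes from the diff and the comment above.

```go
package main

import "fmt"

// Message stands in for the library's message type in this sketch.
type Message struct {
	Key   string
	Value []byte
}

// chunkMessages splits messages into slices of at most chunkSize items
// (a simplified stand-in for the helper referenced in the diff).
func chunkMessages(allMessages *[]*Message, chunkSize int) [][]*Message {
	msgs := *allMessages
	var chunks [][]*Message
	for i := 0; i < len(msgs); i += chunkSize {
		end := i + chunkSize
		if end > len(msgs) {
			end = len(msgs)
		}
		chunks = append(chunks, msgs[i:end])
	}
	return chunks
}

func main() {
	allMessages := []*Message{
		{Key: "a", Value: []byte("v1")},
		{Key: "b", Value: nil}, // tombstone
		{Key: "c", Value: []byte("v2")},
	}

	// Hypothetical pre-batch hook: drop tombstone messages before consuming.
	preBatchFn := func(in []*Message) []*Message {
		out := make([]*Message, 0, len(in))
		for _, m := range in {
			if m.Value != nil {
				out = append(out, m)
			}
		}
		return out
	}

	chunks := chunkMessages(&allMessages, 2)
	if preBatchFn != nil {
		// Only chunks are rebuilt from the pre-batch result; allMessages is left
		// untouched so commit offsets still cover every fetched message.
		preBatchResult := preBatchFn(allMessages)
		chunks = chunkMessages(&preBatchResult, 2)
	}

	fmt.Println("fetched:", len(allMessages)) // 3
	fmt.Println("chunks:", len(chunks))       // 1
}
```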
```diff
@@ -126,6 +126,7 @@ func (c *base) startConsume() {
 	for {
 		select {
 		case <-c.quit:
+			close(c.incomingMessageStream)
```
Before, we closed this channel in the Stop() method, but that is not the correct way to manage channels.
If a goroutine writes to a channel, closing that channel should also be that goroutine's responsibility.
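A minimal runnable illustration of that ownership rule; the `quit` and `incomingMessageStream` names mirror the diff, while the rest of the struct and the reader are assumed for the sketch.

```go
package main

import (
	"fmt"
	"sync"
)

type base struct {
	quit                  chan struct{}
	incomingMessageStream chan string
	wg                    sync.WaitGroup
}

// startConsume is the only writer of incomingMessageStream, so it is also the
// goroutine that closes it once the quit signal arrives.
func (b *base) startConsume() {
	defer b.wg.Done()
	i := 0
	for {
		select {
		case <-b.quit:
			close(b.incomingMessageStream)
			return
		case b.incomingMessageStream <- fmt.Sprintf("message-%d", i):
			i++
		}
	}
}

// Stop only signals the writer; it never touches the channel itself.
func (b *base) Stop() {
	close(b.quit)
	b.wg.Wait()
}

func main() {
	b := &base{
		quit:                  make(chan struct{}),
		incomingMessageStream: make(chan string),
	}

	b.wg.Add(1)
	go b.startConsume()

	// The reader's range loop ends cleanly because the writer closes the stream.
	done := make(chan struct{})
	go func() {
		defer close(done)
		for range b.incomingMessageStream {
		}
	}()

	b.Stop()
	<-done
	fmt.Println("incomingMessageStream closed by its writer")
}
```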
Codecov Report

Attention:

Additional details and impacted files

```
@@            Coverage Diff            @@
##               v2      #83     +/-   ##
==========================================
+ Coverage   35.53%   37.05%   +1.51%
==========================================
  Files          19       19
  Lines         695      699       +4
==========================================
+ Hits          247      259      +12
+ Misses        441      433       -8
  Partials        7        7
```

☔ View full report in Codecov by Sentry.
LGTM.
In order to prevent race conditions for fat event types when concurrency is higher than 1, we enable a feature called preBatch. Please take a look at the standalone examples for it.
For retry/exception handling there is no need to integrate preBatch: we use chunkMessages to handle the exceptional cases, and since chunkMessages runs after the preBatch method, no extra action is needed there.
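For illustration, a hedged sketch of what a user-provided pre-batch hook might look like; the `Message` stub and the `dedupeByKey` helper are hypothetical, and the exact wiring into the consumer configuration may differ from this.

```go
package main

import "fmt"

// Message stands in for the library's message type in this sketch.
type Message struct {
	Key   string
	Value []byte
}

// dedupeByKey is a hypothetical pre-batch hook: when concurrency is higher
// than 1, keeping only the latest message per key means no two workers can
// process the same entity at the same time.
func dedupeByKey(messages []*Message) []*Message {
	latest := make(map[string]*Message, len(messages))
	order := make([]string, 0, len(messages))
	for _, m := range messages {
		if _, seen := latest[m.Key]; !seen {
			order = append(order, m.Key)
		}
		latest[m.Key] = m // later messages win
	}
	out := make([]*Message, 0, len(order))
	for _, k := range order {
		out = append(out, latest[k])
	}
	return out
}

func main() {
	batch := []*Message{
		{Key: "order-1", Value: []byte("created")},
		{Key: "order-1", Value: []byte("updated")},
		{Key: "order-2", Value: []byte("created")},
	}
	deduped := dedupeByKey(batch)
	fmt.Println(len(deduped), string(deduped[0].Value)) // 2 updated
}
```

Collapsing each key to its latest message is one way to realize the race-prevention goal described above, since concurrent workers then never operate on the same entity within a batch.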