Skip to content

Commit

Permalink
fix: wrong flush message condition (#58)
Browse files Browse the repository at this point in the history
  • Loading branch information
uatmaca authored Jan 9, 2024
1 parent e377402 commit dea1d4e
Showing 1 changed file with 4 additions and 5 deletions.
9 changes: 4 additions & 5 deletions elasticsearch/bulk/bulk.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ func (b *Bulk) AddActions(

key := getActionKey(action)
if batchIndex, ok := b.batchKeys[key]; ok {
b.batchByteSize += len(value) - len(b.batch[batchIndex].Bytes)
b.batch[batchIndex] = BatchItem{
Action: &actions[i],
Bytes: value,
Expand All @@ -156,18 +157,16 @@ func (b *Bulk) AddActions(
})
b.batchKeys[key] = b.batchIndex
b.batchIndex++
b.batchSize++
b.batchByteSize += len(value)
}

b.batchByteSize += len(value)
}
ctx.Ack()

b.batchSize += len(actions)

b.flushLock.Unlock()

b.metric.ProcessLatencyMs = time.Since(eventTime).Milliseconds()
if b.batchSize >= b.batchSizeLimit || len(b.batch) >= b.batchByteSizeLimit {
if b.batchSize >= b.batchSizeLimit || b.batchByteSize >= b.batchByteSizeLimit {
b.flushMessages()
}
}
Expand Down

0 comments on commit dea1d4e

Please sign in to comment.