Skip to content

Commit

Permalink
feat(processor): Add worker support for sync workloads
Browse files Browse the repository at this point in the history
  • Loading branch information
samcm committed Jun 12, 2024
1 parent 1f75a9d commit 50034ea
Showing 1 changed file with 33 additions and 7 deletions.
40 changes: 33 additions & 7 deletions pkg/processor/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,23 +214,49 @@ func (bvp *BatchItemProcessor[T]) ImmediatelyExportItems(ctx context.Context, it
countItemsToExport := len(items)

// Split the items in to chunks of our max batch size
batchCh := make(chan []*T, bvp.o.Workers)
errCh := make(chan error, bvp.o.Workers)

var wg sync.WaitGroup

for i := 0; i < countItemsToExport; i += bvp.o.MaxExportBatchSize {
end := i + bvp.o.MaxExportBatchSize
if end > countItemsToExport {
end = countItemsToExport
}

itemsBatch := items[i:end]
batchCh <- itemsBatch
}
close(batchCh)

bvp.log.WithFields(logrus.Fields{
"count": len(itemsBatch),
}).Debug("Immediately exporting items")
for i := 0; i < bvp.o.Workers; i++ {
wg.Add(1)

err := bvp.exportWithTimeout(ctx, itemsBatch)
go func() {
defer wg.Done()

if err != nil {
return err
}
for itemsBatch := range batchCh {
bvp.log.WithFields(logrus.Fields{
"count": len(itemsBatch),
"worker": i,
}).Debug("Immediately exporting items")

err := bvp.exportWithTimeout(ctx, itemsBatch)
if err != nil {
errCh <- err

return
}
}
}()
}

wg.Wait()
close(errCh)

if len(errCh) > 0 {
return <-errCh
}
}

Expand Down

0 comments on commit 50034ea

Please sign in to comment.