From 50034eae7eca5cb96df5fcc181da84eb179fcf02 Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Wed, 12 Jun 2024 15:33:00 +1000 Subject: [PATCH] feat(processor): Add worker support for sync workloads --- pkg/processor/batch.go | 40 +++++++++++++++++++++++++++++++++------- 1 file changed, 33 insertions(+), 7 deletions(-) diff --git a/pkg/processor/batch.go b/pkg/processor/batch.go index 386c4568..baf57739 100644 --- a/pkg/processor/batch.go +++ b/pkg/processor/batch.go @@ -214,6 +214,11 @@ func (bvp *BatchItemProcessor[T]) ImmediatelyExportItems(ctx context.Context, it countItemsToExport := len(items) // Split the items in to chunks of our max batch size + batchCh := make(chan []*T, bvp.o.Workers) + errCh := make(chan error, bvp.o.Workers) + + var wg sync.WaitGroup + for i := 0; i < countItemsToExport; i += bvp.o.MaxExportBatchSize { end := i + bvp.o.MaxExportBatchSize if end > countItemsToExport { @@ -221,16 +226,37 @@ func (bvp *BatchItemProcessor[T]) ImmediatelyExportItems(ctx context.Context, it } itemsBatch := items[i:end] + batchCh <- itemsBatch + } + close(batchCh) - bvp.log.WithFields(logrus.Fields{ - "count": len(itemsBatch), - }).Debug("Immediately exporting items") + for i := 0; i < bvp.o.Workers; i++ { + wg.Add(1) - err := bvp.exportWithTimeout(ctx, itemsBatch) + go func() { + defer wg.Done() - if err != nil { - return err - } + for itemsBatch := range batchCh { + bvp.log.WithFields(logrus.Fields{ + "count": len(itemsBatch), + "worker": i, + }).Debug("Immediately exporting items") + + err := bvp.exportWithTimeout(ctx, itemsBatch) + if err != nil { + errCh <- err + + return + } + } + }() + } + + wg.Wait() + close(errCh) + + if len(errCh) > 0 { + return <-errCh } }