diff --git a/pkg/processor/batch.go b/pkg/processor/batch.go index e6a541de..b387418f 100644 --- a/pkg/processor/batch.go +++ b/pkg/processor/batch.go @@ -13,28 +13,12 @@ import ( "github.com/sirupsen/logrus" ) +// ItemExporter is an interface for exporting items. type ItemExporter[T any] interface { - // ExportItems exports a batch of items. - // - // This function is called synchronously, so there is no concurrency - // safety requirement. However, due to the synchronous calling pattern, - // it is critical that all timeouts and cancellations contained in the - // passed context must be honored. - // - // Any retry logic must be contained in this function. The SDK that - // calls this function will not implement any retry logic. All errors - // returned by this function are considered unrecoverable and will be - // reported to a configured error Handler. ExportItems(ctx context.Context, items []*T) error - - // Shutdown notifies the exporter of a pending halt to operations. The - // exporter is expected to preform any cleanup or synchronization it - // requires while honoring all timeouts and cancellations contained in - // the passed context. Shutdown(ctx context.Context) error } -// Defaults for BatchItemProcessorOptions. const ( DefaultMaxQueueSize = 51200 DefaultScheduleDelay = 5000 @@ -44,6 +28,7 @@ const ( DefaultNumWorkers = 1 ) +// ShippingMethod is the method of shipping items for export. type ShippingMethod string const ( @@ -52,38 +37,16 @@ const ( ShippingMethodSync ShippingMethod = "sync" ) -// BatchItemProcessorOption configures a BatchItemProcessor. +// BatchItemProcessorOption is a functional option for the batch item processor. type BatchItemProcessorOption func(o *BatchItemProcessorOptions) -// BatchItemProcessorOptions is configuration settings for a -// BatchItemProcessor. type BatchItemProcessorOptions struct { - // MaxQueueSize is the maximum queue size to buffer items for delayed processing. If the - // queue gets full it drops the items. - // The default value of MaxQueueSize is 51200. - MaxQueueSize int - - // BatchTimeout is the maximum duration for constructing a batch. Processor - // forcefully sends available items when timeout is reached. - // The default value of BatchTimeout is 5000 msec. - BatchTimeout time.Duration - - // ExportTimeout specifies the maximum duration for exporting items. If the timeout - // is reached, the export will be cancelled. - // The default value of ExportTimeout is 30000 msec. - ExportTimeout time.Duration - - // MaxExportBatchSize is the maximum number of items to process in a single batch. - // If there are more than one batch worth of items then it processes multiple batches - // of items one batch after the other without any delay. - // The default value of MaxExportBatchSize is 512. + MaxQueueSize int + BatchTimeout time.Duration + ExportTimeout time.Duration MaxExportBatchSize int - - // ShippingMethod is the method used to ship items to the exporter. - ShippingMethod ShippingMethod - - // Number of workers to process items. - Workers int + ShippingMethod ShippingMethod + Workers int } func (o *BatchItemProcessorOptions) Validate() error { @@ -102,26 +65,20 @@ func (o *BatchItemProcessorOptions) Validate() error { return nil } -// BatchItemProcessor is a buffer that batches asynchronously-received -// items and sends them to a exporter when complete. +// BatchItemProcessor is a processor that batches items for export. type BatchItemProcessor[T any] struct { e ItemExporter[T] o BatchItemProcessorOptions log logrus.FieldLogger - queue chan *T + queue chan traceableItem[T] + batchCh chan []traceableItem[T] dropped uint32 name string metrics *Metrics - batches chan batchWithErr[T] - batchReady chan struct{} - - batch []*T - batchMutex sync.Mutex - timer *time.Timer stopWait sync.WaitGroup stopOnce sync.Once @@ -129,16 +86,13 @@ type BatchItemProcessor[T any] struct { stopWorkersCh chan struct{} } -type batchWithErr[T any] struct { - items []*T +type traceableItem[T any] struct { + item *T errCh chan error completedCh chan struct{} } -// NewBatchItemProcessor creates a new ItemProcessor that will send completed -// item batches to the exporter with the supplied options. -// -// If the exporter is nil, the item processor will preform no action. +// NewBatchItemProcessor creates a new batch item processor. func NewBatchItemProcessor[T any](exporter ItemExporter[T], name string, log logrus.FieldLogger, options ...BatchItemProcessorOption) (*BatchItemProcessor[T], error) { maxQueueSize := DefaultMaxQueueSize maxExportBatchSize := DefaultMaxExportBatchSize @@ -175,9 +129,9 @@ func NewBatchItemProcessor[T any](exporter ItemExporter[T], name string, log log log: log, name: name, metrics: metrics, - batch: make([]*T, 0, o.MaxQueueSize), timer: time.NewTimer(o.BatchTimeout), - queue: make(chan *T, o.MaxQueueSize), + queue: make(chan traceableItem[T], o.MaxQueueSize), + batchCh: make(chan []traceableItem[T], o.Workers), stopCh: make(chan struct{}), stopWorkersCh: make(chan struct{}), } @@ -193,9 +147,6 @@ func NewBatchItemProcessor[T any](exporter ItemExporter[T], name string, log log }, ).Info("Batch item processor initialized") - bvp.batches = make(chan batchWithErr[T], o.Workers) - bvp.batchReady = make(chan struct{}, 1) - bvp.stopWait.Add(o.Workers) for i := 0; i < o.Workers; i++ { @@ -206,107 +157,75 @@ func NewBatchItemProcessor[T any](exporter ItemExporter[T], name string, log log } go func() { - bvp.batchBuilder(context.Background()) // Start building batches + bvp.batchBuilder(context.Background()) bvp.log.Info("Batch builder exited") }() return &bvp, nil } +// Write writes items to the queue. If the Processor is configured to use +// the sync shipping method, the items will be written to the queue and this +// function will return when all items have been processed. If the Processor is +// configured to use the async shipping method, the items will be written to +// the queue and this function will return immediately. func (bvp *BatchItemProcessor[T]) Write(ctx context.Context, s []*T) error { _, span := observability.Tracer().Start(ctx, "BatchItemProcessor.Write") defer span.End() bvp.metrics.SetItemsQueued(bvp.name, float64(len(bvp.queue))) - if bvp.o.ShippingMethod == ShippingMethodSync { - return bvp.immediatelyExportItems(ctx, s) - } - - bvp.metrics.SetItemsQueued(bvp.name, float64(len(bvp.queue))) - if bvp.e == nil { return errors.New("exporter is nil") } - for _, i := range s { - bvp.enqueue(i) - } - - return nil -} - -// immediatelyExportItems immediately exports the items to the exporter. -// Useful for propagating errors from the exporter. -func (bvp *BatchItemProcessor[T]) immediatelyExportItems(ctx context.Context, items []*T) error { - _, span := observability.Tracer().Start(ctx, "BatchItemProcessor.ImmediatelyExportItems") - defer span.End() - - if len(items) == 0 { - return nil - } - - batchSize := bvp.o.MaxExportBatchSize - if batchSize == 0 { - batchSize = 1 - } - - errCh := make(chan error, bvp.o.Workers) - completedCh := make(chan struct{}, bvp.o.Workers) - pendingExports := 0 - - for i := 0; i < len(items); i += batchSize { - end := i + batchSize - if end > len(items) { - end = len(items) + // Break our items up in to chunks that can be processed at + // one time by our workers. This is to prevent wasting + // resources sending items if we've failed an earlier + // batch. + batchSize := bvp.o.Workers * bvp.o.MaxExportBatchSize + for start := 0; start < len(s); start += batchSize { + end := start + batchSize + if end > len(s) { + end = len(s) } - itemsBatch := items[i:end] - bvp.batches <- batchWithErr[T]{items: itemsBatch, errCh: errCh, completedCh: completedCh} - bvp.batchReady <- struct{}{} - - pendingExports++ - - // Only create a new batch if there are already bvp.o.Workers pending exports. - // We do this so we don't bother queueing up any more batches if - // a previous batch has failed. - if pendingExports >= bvp.o.Workers { - select { - case batchErr := <-errCh: - pendingExports-- + prepared := []traceableItem[T]{} + for _, i := range s[start:end] { + prepared = append(prepared, traceableItem[T]{ + item: i, + errCh: make(chan error, 1), + completedCh: make(chan struct{}, 1), + }) + } - if batchErr != nil { - return batchErr - } - case <-completedCh: - pendingExports-- - case <-ctx.Done(): - return ctx.Err() + for _, i := range prepared { + if !bvp.enqueueOrDrop(ctx, i.item, i.errCh, i.completedCh) { + return errors.New("failed to enqueue item - queue is full") } } - } - // Wait for remaining pending exports to complete - for pendingExports > 0 { - select { - case batchErr := <-errCh: - pendingExports-- - - if batchErr != nil { - return batchErr + if bvp.o.ShippingMethod == ShippingMethodSync { + for _, item := range prepared { + select { + case err := <-item.errCh: + if err != nil { + return err + } + case <-item.completedCh: + continue + case <-ctx.Done(): + return ctx.Err() + } } - case <-completedCh: - pendingExports-- - case <-ctx.Done(): - return ctx.Err() } } return nil } -// exportWithTimeout exports the items with a timeout. -func (bvp *BatchItemProcessor[T]) exportWithTimeout(ctx context.Context, itemsBatch []*T) error { +// exportWithTimeout exports items with a timeout. +func (bvp *BatchItemProcessor[T]) exportWithTimeout(ctx context.Context, itemsBatch []traceableItem[T]) error { if bvp.o.ExportTimeout > 0 { var cancel context.CancelFunc ctx, cancel = context.WithTimeout(ctx, bvp.o.ExportTimeout) @@ -314,20 +233,32 @@ func (bvp *BatchItemProcessor[T]) exportWithTimeout(ctx context.Context, itemsBa defer cancel() } - err := bvp.e.ExportItems(ctx, itemsBatch) + items := make([]*T, len(itemsBatch)) + for i, item := range itemsBatch { + items[i] = item.item + } + + err := bvp.e.ExportItems(ctx, items) if err != nil { bvp.metrics.IncItemsFailedBy(bvp.name, float64(len(itemsBatch))) - - return err + } else { + bvp.metrics.IncItemsExportedBy(bvp.name, float64(len(itemsBatch))) } - bvp.metrics.IncItemsExportedBy(bvp.name, float64(len(itemsBatch))) + for _, item := range itemsBatch { + if item.errCh != nil { + item.errCh <- err + } + + if item.completedCh != nil { + item.completedCh <- struct{}{} + } + } return nil } -// Shutdown flushes the queue and waits until all items are processed. -// It only executes once. Subsequent call does nothing. +// Shutdown shuts down the batch item processor. func (bvp *BatchItemProcessor[T]) Shutdown(ctx context.Context) error { var err error @@ -338,19 +269,14 @@ func (bvp *BatchItemProcessor[T]) Shutdown(ctx context.Context) error { close(bvp.stopCh) - // Drain the queue - bvp.drainQueue() - - // Stop the timer bvp.timer.Stop() - // Stop the workers + bvp.drainQueue() + close(bvp.stopWorkersCh) - // Wait for the workers to finish bvp.stopWait.Wait() - // Shutdown the exporter if bvp.e != nil { if err = bvp.e.Shutdown(ctx); err != nil { bvp.log.WithError(err).Error("failed to shutdown processor") @@ -359,7 +285,6 @@ func (bvp *BatchItemProcessor[T]) Shutdown(ctx context.Context) error { close(wait) }() - // Wait until the wait group is done or the context is cancelled select { case <-wait: case <-ctx.Done(): @@ -370,51 +295,36 @@ func (bvp *BatchItemProcessor[T]) Shutdown(ctx context.Context) error { return err } -// WithMaxQueueSize returns a BatchItemProcessorOption that configures the -// maximum queue size allowed for a BatchItemProcessor. func WithMaxQueueSize(size int) BatchItemProcessorOption { return func(o *BatchItemProcessorOptions) { o.MaxQueueSize = size } } -// WithMaxExportBatchSize returns a BatchItemProcessorOption that configures -// the maximum export batch size allowed for a BatchItemProcessor. func WithMaxExportBatchSize(size int) BatchItemProcessorOption { return func(o *BatchItemProcessorOptions) { o.MaxExportBatchSize = size } } -// WithBatchTimeout returns a BatchItemProcessorOption that configures the -// maximum delay allowed for a BatchItemProcessor before it will export any -// held item (whether the queue is full or not). func WithBatchTimeout(delay time.Duration) BatchItemProcessorOption { return func(o *BatchItemProcessorOptions) { o.BatchTimeout = delay } } -// WithExportTimeout returns a BatchItemProcessorOption that configures the -// amount of time a BatchItemProcessor waits for an exporter to export before -// abandoning the export. func WithExportTimeout(timeout time.Duration) BatchItemProcessorOption { return func(o *BatchItemProcessorOptions) { o.ExportTimeout = timeout } } -// WithExportTimeout returns a BatchItemProcessorOption that configures the -// amount of time a BatchItemProcessor waits for an exporter to export before -// abandoning the export. func WithShippingMethod(method ShippingMethod) BatchItemProcessorOption { return func(o *BatchItemProcessorOptions) { o.ShippingMethod = method } } -// WithWorkers returns a BatchItemProcessorOption that configures the -// number of workers to process items. func WithWorkers(workers int) BatchItemProcessorOption { return func(o *BatchItemProcessorOptions) { o.Workers = workers @@ -422,59 +332,48 @@ func WithWorkers(workers int) BatchItemProcessorOption { } func (bvp *BatchItemProcessor[T]) batchBuilder(ctx context.Context) { + log := bvp.log.WithField("module", "batch_builder") + + var batch []traceableItem[T] + for { select { case <-bvp.stopWorkersCh: - bvp.log.Info("Stopping batch builder") + log.Info("Stopping batch builder") return - case sd := <-bvp.queue: - bvp.log.Info("New item added to queue") + case item := <-bvp.queue: + batch = append(batch, item) - bvp.batchMutex.Lock() - bvp.batch = append(bvp.batch, sd) + if len(batch) >= bvp.o.MaxExportBatchSize { + bvp.sendBatch(batch, "max_export_batch_size") - if len(bvp.batch) >= bvp.o.MaxExportBatchSize { - bvp.sendBatch("max_export_batch_size") + batch = []traceableItem[T]{} } - bvp.batchMutex.Unlock() case <-bvp.timer.C: - bvp.batchMutex.Lock() - bvp.log.Info("Batch timeout reached") - - if len(bvp.batch) > 0 { - bvp.sendBatch("timer") + if len(batch) > 0 { + bvp.sendBatch(batch, "timer") + batch = []traceableItem[T]{} } else { - bvp.log.Info("No items in batch, resetting timer") bvp.timer.Reset(bvp.o.BatchTimeout) } - bvp.batchMutex.Unlock() } } } - -func (bvp *BatchItemProcessor[T]) sendBatch(reason string) { +func (bvp *BatchItemProcessor[T]) sendBatch(batch []traceableItem[T], reason string) { log := bvp.log.WithField("reason", reason) - log.Infof("Creating a batch of %d items", len(bvp.batch)) + log.Tracef("Creating a batch of %d items", len(batch)) - batchCopy := make([]*T, len(bvp.batch)) - log.Infof("Copying batch items") - copy(batchCopy, bvp.batch) - log.Infof("Batch items copied") + batchCopy := make([]traceableItem[T], len(batch)) + copy(batchCopy, batch) - bvp.batches <- batchWithErr[T]{items: batchCopy, errCh: make(chan error), completedCh: make(chan struct{})} - log.Infof("Batch sent to batches channel") + log.Tracef("Batch items copied") - bvp.batch = bvp.batch[:0] - log.Infof("Batch cleared") + bvp.batchCh <- batchCopy - bvp.batchReady <- struct{}{} - log.Infof("Signaled batchReady") + log.Tracef("Batch sent to batch channel") } -// worker removes items from the `queue` channel until processor -// is shut down. It calls the exporter in batches of up to MaxExportBatchSize -// waiting up to BatchTimeout to form a batch. func (bvp *BatchItemProcessor[T]) worker(ctx context.Context, number int) { bvp.log.Infof("Starting worker %d", number) @@ -484,56 +383,32 @@ func (bvp *BatchItemProcessor[T]) worker(ctx context.Context, number int) { bvp.log.Infof("Stopping worker %d", number) return - case <-bvp.batchReady: - bvp.log.Infof("Worker %d is processing a batch", number) - + case batch := <-bvp.batchCh: bvp.timer.Reset(bvp.o.BatchTimeout) - batch := <-bvp.batches - - bvp.log.Infof("Worker %d is exporting a batch of %d items", number, len(batch.items)) - if err := bvp.exportWithTimeout(ctx, batch.items); err != nil { + if err := bvp.exportWithTimeout(ctx, batch); err != nil { bvp.log.WithError(err).Error("failed to export items") - - if batch.errCh != nil { - batch.errCh <- err - } } - - bvp.log.Infof("Batch completed by worker %d", number) - - batch.completedCh <- struct{}{} } } } -// drainQueue awaits the any caller that had added to bvp.stopWait -// to finish the enqueue, then exports the final batch. func (bvp *BatchItemProcessor[T]) drainQueue() { bvp.log.Info("Draining queue: waiting for the batch builder to pull all the items from the queue") - // Wait for the batch builder to send all remaining items to the workers. for len(bvp.queue) > 0 { time.Sleep(10 * time.Millisecond) } bvp.log.Info("Draining queue: waiting for workers to finish processing batches") - // Wait for the workers to finish processing all batches. - for len(bvp.batches) > 0 { - batch := <-bvp.batches - <-batch.completedCh + for len(bvp.queue) > 0 { + <-bvp.queue } bvp.log.Info("Draining queue: all batches finished") - // Close the batches channel since no more batches will be sent. - close(bvp.batches) -} - -func (bvp *BatchItemProcessor[T]) enqueue(sd *T) { - ctx := context.TODO() - bvp.enqueueOrDrop(ctx, sd) + close(bvp.queue) } func recoverSendOnClosedChan() { @@ -549,7 +424,7 @@ func recoverSendOnClosedChan() { panic(x) } -func (bvp *BatchItemProcessor[T]) enqueueOrDrop(ctx context.Context, sd *T) bool { +func (bvp *BatchItemProcessor[T]) enqueueOrDrop(ctx context.Context, sd *T, errCh chan error, completedCh chan struct{}) bool { // This ensures the bvp.queue<- below does not panic as the // processor shuts down. defer recoverSendOnClosedChan() @@ -561,7 +436,7 @@ func (bvp *BatchItemProcessor[T]) enqueueOrDrop(ctx context.Context, sd *T) bool } select { - case bvp.queue <- sd: + case bvp.queue <- traceableItem[T]{item: sd, errCh: errCh, completedCh: completedCh}: return true default: atomic.AddUint32(&bvp.dropped, 1) diff --git a/pkg/processor/batch_test.go b/pkg/processor/batch_test.go index 8f24181e..72c7fbe4 100644 --- a/pkg/processor/batch_test.go +++ b/pkg/processor/batch_test.go @@ -119,23 +119,26 @@ type testOption struct { wantBatchCount int } -func TestNewBatchItemProcessorWithOptions(t *testing.T) { +func TestAsyncNewBatchItemProcessorWithOptions(t *testing.T) { schDelay := 100 * time.Millisecond options := []testOption{ { name: "default", o: []BatchItemProcessorOption{ - WithShippingMethod(ShippingMethodSync), + WithShippingMethod(ShippingMethodAsync), + WithBatchTimeout(10 * time.Millisecond), }, + writeNumItems: 2048, genNumItems: 2053, - wantNumItems: 2048, - wantBatchCount: 4, + wantNumItems: 2053, + wantBatchCount: 5, }, { name: "non-default BatchTimeout", o: []BatchItemProcessorOption{ WithBatchTimeout(schDelay), - WithShippingMethod(ShippingMethodSync), + WithShippingMethod(ShippingMethodAsync), + WithMaxExportBatchSize(512), }, writeNumItems: 2048, genNumItems: 2053, @@ -147,25 +150,13 @@ func TestNewBatchItemProcessorWithOptions(t *testing.T) { o: []BatchItemProcessorOption{ WithBatchTimeout(schDelay), WithMaxQueueSize(200), - WithShippingMethod(ShippingMethodSync), - }, - writeNumItems: 200, - genNumItems: 205, - wantNumItems: 205, - wantBatchCount: 1, - }, - { - name: "blocking option", - o: []BatchItemProcessorOption{ - WithBatchTimeout(schDelay), - WithMaxQueueSize(200), - WithMaxExportBatchSize(20), - WithShippingMethod(ShippingMethodSync), + WithMaxExportBatchSize(200), + WithShippingMethod(ShippingMethodAsync), }, writeNumItems: 200, genNumItems: 205, wantNumItems: 205, - wantBatchCount: 11, + wantBatchCount: 2, }, } @@ -175,11 +166,11 @@ func TestNewBatchItemProcessorWithOptions(t *testing.T) { ssp, err := createAndRegisterBatchSP(option.o, &te) if err != nil { - t.Fatalf("%s: Error creating new instance of BatchItemProcessor\n", option.name) + require.NoError(t, err) } if ssp == nil { - t.Fatalf("%s: Error creating new instance of BatchItemProcessor\n", option.name) + require.NoError(t, err) } for i := 0; i < option.genNumItems; i++ { @@ -190,7 +181,7 @@ func TestNewBatchItemProcessorWithOptions(t *testing.T) { if err := ssp.Write(context.Background(), []*TestItem{{ name: "test", }}); err != nil { - t.Errorf("%s: Error writing to BatchItemProcessor\n", option.name) + t.Errorf("%s: Error writing to BatchItemProcessor", option.name) } } @@ -198,15 +189,15 @@ func TestNewBatchItemProcessorWithOptions(t *testing.T) { gotNumOfItems := te.len() if option.wantNumItems > 0 && option.wantNumItems != gotNumOfItems { - t.Errorf("number of exported items: got %+v, want %+v\n", + t.Errorf("number of exported items: got %v, want %v", gotNumOfItems, option.wantNumItems) } gotBatchCount := te.getBatchCount() if option.wantBatchCount > 0 && gotBatchCount != option.wantBatchCount { - t.Errorf("number batches: got %+v, want >= %+v\n", + t.Errorf("number batches: got %v, want %v", gotBatchCount, option.wantBatchCount) - t.Errorf("Batches %v\n", te.sizes) + t.Errorf("Batches %v", te.sizes) } }) } @@ -280,7 +271,7 @@ func TestBatchItemProcessorDrainQueue(t *testing.T) { bsp, err := NewBatchItemProcessor[TestItem](&be, "processor", log, WithMaxExportBatchSize(5), WithBatchTimeout(1*time.Second), WithWorkers(2), WithShippingMethod(ShippingMethodAsync)) require.NoError(t, err) - itemsToExport := 50 + itemsToExport := 5000 for i := 0; i < itemsToExport; i++ { if err := bsp.Write(context.Background(), []*TestItem{{ @@ -313,11 +304,10 @@ func TestBatchItemProcessorPostShutdown(t *testing.T) { lenJustAfterShutdown := be.len() for i := 0; i < 60; i++ { - if err := bsp.Write(context.Background(), []*TestItem{{ + err := bsp.Write(context.Background(), []*TestItem{{ name: strconv.Itoa(i), - }}); err != nil { - t.Errorf("Error writing to BatchItemProcessor\n") - } + }}) + require.Error(t, err) } assert.Equal(t, lenJustAfterShutdown, be.len(), "Write should have no effect after Shutdown") @@ -494,8 +484,8 @@ func (ErrorItemExporter[T]) ExportItems(ctx context.Context, _ []*T) error { } // TestBatchItemProcessorWithSyncErrorExporter tests a processor with ShippingMethod = sync and an exporter that only returns errors. -func TestBatchItemProcessorWithAsyncErrorExporter(t *testing.T) { - bsp, err := NewBatchItemProcessor[TestItem](ErrorItemExporter[TestItem]{}, "processor", nullLogger(), WithShippingMethod(ShippingMethodSync)) +func TestBatchItemProcessorWithSyncErrorExporter(t *testing.T) { + bsp, err := NewBatchItemProcessor[TestItem](ErrorItemExporter[TestItem]{}, "processor", nullLogger(), WithShippingMethod(ShippingMethodSync), WithBatchTimeout(100*time.Millisecond)) if err != nil { t.Fatalf("failed to create batch processor: %v", err) } @@ -510,14 +500,22 @@ func TestBatchItemProcessorSyncShipping(t *testing.T) { // Define a range of values for workers, maxExportBatchSize, and itemsToExport workerCounts := []int{1, 5, 10} maxBatchSizes := []int{1, 5, 10, 20} - itemCounts := []int{0, 1, 25, 50} + itemCounts := []int{0, 1, 10, 25, 50} for _, workers := range workerCounts { for _, maxBatchSize := range maxBatchSizes { for _, itemsToExport := range itemCounts { t.Run(fmt.Sprintf("%d workers, batch size %d, %d items", workers, maxBatchSize, itemsToExport), func(t *testing.T) { te := testBatchExporter[TestItem]{} - bsp, err := NewBatchItemProcessor[TestItem](&te, "processor", nullLogger(), WithMaxExportBatchSize(maxBatchSize), WithWorkers(workers), WithShippingMethod(ShippingMethodSync)) + bsp, err := NewBatchItemProcessor[TestItem]( + &te, + "processor", + logrus.New(), + WithMaxExportBatchSize(maxBatchSize), + WithWorkers(workers), + WithShippingMethod(ShippingMethodSync), + WithBatchTimeout(100*time.Millisecond), + ) require.NoError(t, err) items := make([]*TestItem, itemsToExport)