diff --git a/pkg/processor/batch.go b/pkg/processor/batch.go index 7320b4a2..e6a541de 100644 --- a/pkg/processor/batch.go +++ b/pkg/processor/batch.go @@ -175,26 +175,40 @@ func NewBatchItemProcessor[T any](exporter ItemExporter[T], name string, log log log: log, name: name, metrics: metrics, - batch: make([]*T, 0, o.MaxExportBatchSize), + batch: make([]*T, 0, o.MaxQueueSize), timer: time.NewTimer(o.BatchTimeout), queue: make(chan *T, o.MaxQueueSize), stopCh: make(chan struct{}), stopWorkersCh: make(chan struct{}), } + bvp.log.WithFields( + logrus.Fields{ + "workers": bvp.o.Workers, + "batch_timeout": bvp.o.BatchTimeout, + "export_timeout": bvp.o.ExportTimeout, + "max_export_batch_size": bvp.o.MaxExportBatchSize, + "max_queue_size": bvp.o.MaxQueueSize, + "shipping_method": bvp.o.ShippingMethod, + }, + ).Info("Batch item processor initialized") + bvp.batches = make(chan batchWithErr[T], o.Workers) bvp.batchReady = make(chan struct{}, 1) bvp.stopWait.Add(o.Workers) for i := 0; i < o.Workers; i++ { - go func() { + go func(num int) { defer bvp.stopWait.Done() - bvp.worker(context.Background()) - }() + bvp.worker(context.Background(), num) + }(i) } - go bvp.batchBuilder(context.Background()) // Start building batches + go func() { + bvp.batchBuilder(context.Background()) // Start building batches + bvp.log.Info("Batch builder exited") + }() return &bvp, nil } @@ -317,11 +331,11 @@ func (bvp *BatchItemProcessor[T]) exportWithTimeout(ctx context.Context, itemsBa func (bvp *BatchItemProcessor[T]) Shutdown(ctx context.Context) error { var err error - bvp.log.Debug("Shutting down processor") - bvp.stopOnce.Do(func() { wait := make(chan struct{}) go func() { + bvp.log.Info("Stopping processor") + close(bvp.stopCh) // Drain the queue @@ -411,51 +425,73 @@ func (bvp *BatchItemProcessor[T]) batchBuilder(ctx context.Context) { for { select { case <-bvp.stopWorkersCh: + bvp.log.Info("Stopping batch builder") + return case sd := <-bvp.queue: + bvp.log.Info("New item added to queue") + bvp.batchMutex.Lock() bvp.batch = append(bvp.batch, sd) if len(bvp.batch) >= bvp.o.MaxExportBatchSize { - batchCopy := make([]*T, len(bvp.batch)) - copy(batchCopy, bvp.batch) - bvp.batches <- batchWithErr[T]{items: batchCopy, errCh: make(chan error), completedCh: make(chan struct{})} - bvp.batch = bvp.batch[:0] - bvp.batchReady <- struct{}{} + bvp.sendBatch("max_export_batch_size") } bvp.batchMutex.Unlock() case <-bvp.timer.C: bvp.batchMutex.Lock() + bvp.log.Info("Batch timeout reached") if len(bvp.batch) > 0 { - batchCopy := make([]*T, len(bvp.batch)) - copy(batchCopy, bvp.batch) - bvp.batches <- batchWithErr[T]{items: batchCopy, errCh: make(chan error), completedCh: make(chan struct{})} - bvp.batch = bvp.batch[:0] - bvp.batchReady <- struct{}{} + bvp.sendBatch("timer") } else { - // Reset the timer if there are no items in the batch. - // If there are items in the batch, one of the workers will reset the timer. + bvp.log.Info("No items in batch, resetting timer") bvp.timer.Reset(bvp.o.BatchTimeout) } - bvp.batchMutex.Unlock() } } } +func (bvp *BatchItemProcessor[T]) sendBatch(reason string) { + log := bvp.log.WithField("reason", reason) + log.Infof("Creating a batch of %d items", len(bvp.batch)) + + batchCopy := make([]*T, len(bvp.batch)) + log.Infof("Copying batch items") + copy(batchCopy, bvp.batch) + log.Infof("Batch items copied") + + bvp.batches <- batchWithErr[T]{items: batchCopy, errCh: make(chan error), completedCh: make(chan struct{})} + log.Infof("Batch sent to batches channel") + + bvp.batch = bvp.batch[:0] + log.Infof("Batch cleared") + + bvp.batchReady <- struct{}{} + log.Infof("Signaled batchReady") +} + // worker removes items from the `queue` channel until processor // is shut down. It calls the exporter in batches of up to MaxExportBatchSize // waiting up to BatchTimeout to form a batch. -func (bvp *BatchItemProcessor[T]) worker(ctx context.Context) { +func (bvp *BatchItemProcessor[T]) worker(ctx context.Context, number int) { + bvp.log.Infof("Starting worker %d", number) + for { select { case <-bvp.stopWorkersCh: + bvp.log.Infof("Stopping worker %d", number) + return case <-bvp.batchReady: + bvp.log.Infof("Worker %d is processing a batch", number) + bvp.timer.Reset(bvp.o.BatchTimeout) batch := <-bvp.batches + bvp.log.Infof("Worker %d is exporting a batch of %d items", number, len(batch.items)) + if err := bvp.exportWithTimeout(ctx, batch.items); err != nil { bvp.log.WithError(err).Error("failed to export items") @@ -464,6 +500,8 @@ func (bvp *BatchItemProcessor[T]) worker(ctx context.Context) { } } + bvp.log.Infof("Batch completed by worker %d", number) + batch.completedCh <- struct{}{} } } @@ -472,17 +510,23 @@ func (bvp *BatchItemProcessor[T]) worker(ctx context.Context) { // drainQueue awaits the any caller that had added to bvp.stopWait // to finish the enqueue, then exports the final batch. func (bvp *BatchItemProcessor[T]) drainQueue() { + bvp.log.Info("Draining queue: waiting for the batch builder to pull all the items from the queue") + // Wait for the batch builder to send all remaining items to the workers. for len(bvp.queue) > 0 { time.Sleep(10 * time.Millisecond) } + bvp.log.Info("Draining queue: waiting for workers to finish processing batches") + // Wait for the workers to finish processing all batches. for len(bvp.batches) > 0 { batch := <-bvp.batches <-batch.completedCh } + bvp.log.Info("Draining queue: all batches finished") + // Close the batches channel since no more batches will be sent. close(bvp.batches) } diff --git a/pkg/processor/batch_test.go b/pkg/processor/batch_test.go index 1ac75bdd..8f24181e 100644 --- a/pkg/processor/batch_test.go +++ b/pkg/processor/batch_test.go @@ -276,7 +276,8 @@ func TestBatchItemProcessorShutdown(t *testing.T) { func TestBatchItemProcessorDrainQueue(t *testing.T) { be := testBatchExporter[TestItem]{} - bsp, err := NewBatchItemProcessor[TestItem](&be, "processor", nullLogger(), WithMaxExportBatchSize(5), WithBatchTimeout(5*time.Minute), WithWorkers(5)) + log := logrus.New() + bsp, err := NewBatchItemProcessor[TestItem](&be, "processor", log, WithMaxExportBatchSize(5), WithBatchTimeout(1*time.Second), WithWorkers(2), WithShippingMethod(ShippingMethodAsync)) require.NoError(t, err) itemsToExport := 50