Skip to content

Commit

Permalink
more debugging
Browse files Browse the repository at this point in the history
  • Loading branch information
samcm committed Jun 13, 2024
1 parent 188fad4 commit 413e0e1
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 22 deletions.
86 changes: 65 additions & 21 deletions pkg/processor/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,26 +175,40 @@ func NewBatchItemProcessor[T any](exporter ItemExporter[T], name string, log log
log: log,
name: name,
metrics: metrics,
batch: make([]*T, 0, o.MaxExportBatchSize),
batch: make([]*T, 0, o.MaxQueueSize),
timer: time.NewTimer(o.BatchTimeout),
queue: make(chan *T, o.MaxQueueSize),
stopCh: make(chan struct{}),
stopWorkersCh: make(chan struct{}),
}

bvp.log.WithFields(
logrus.Fields{
"workers": bvp.o.Workers,
"batch_timeout": bvp.o.BatchTimeout,
"export_timeout": bvp.o.ExportTimeout,
"max_export_batch_size": bvp.o.MaxExportBatchSize,
"max_queue_size": bvp.o.MaxQueueSize,
"shipping_method": bvp.o.ShippingMethod,
},
).Info("Batch item processor initialized")

bvp.batches = make(chan batchWithErr[T], o.Workers)
bvp.batchReady = make(chan struct{}, 1)

bvp.stopWait.Add(o.Workers)

for i := 0; i < o.Workers; i++ {
go func() {
go func(num int) {
defer bvp.stopWait.Done()
bvp.worker(context.Background())
}()
bvp.worker(context.Background(), num)
}(i)
}

go bvp.batchBuilder(context.Background()) // Start building batches
go func() {
bvp.batchBuilder(context.Background()) // Start building batches
bvp.log.Info("Batch builder exited")
}()

return &bvp, nil
}
Expand Down Expand Up @@ -317,11 +331,11 @@ func (bvp *BatchItemProcessor[T]) exportWithTimeout(ctx context.Context, itemsBa
func (bvp *BatchItemProcessor[T]) Shutdown(ctx context.Context) error {
var err error

bvp.log.Debug("Shutting down processor")

bvp.stopOnce.Do(func() {
wait := make(chan struct{})
go func() {
bvp.log.Info("Stopping processor")

close(bvp.stopCh)

// Drain the queue
Expand Down Expand Up @@ -411,51 +425,73 @@ func (bvp *BatchItemProcessor[T]) batchBuilder(ctx context.Context) {
for {
select {
case <-bvp.stopWorkersCh:
bvp.log.Info("Stopping batch builder")

return
case sd := <-bvp.queue:
bvp.log.Info("New item added to queue")

bvp.batchMutex.Lock()
bvp.batch = append(bvp.batch, sd)

if len(bvp.batch) >= bvp.o.MaxExportBatchSize {
batchCopy := make([]*T, len(bvp.batch))
copy(batchCopy, bvp.batch)
bvp.batches <- batchWithErr[T]{items: batchCopy, errCh: make(chan error), completedCh: make(chan struct{})}
bvp.batch = bvp.batch[:0]
bvp.batchReady <- struct{}{}
bvp.sendBatch("max_export_batch_size")
}
bvp.batchMutex.Unlock()
case <-bvp.timer.C:
bvp.batchMutex.Lock()
bvp.log.Info("Batch timeout reached")

if len(bvp.batch) > 0 {
batchCopy := make([]*T, len(bvp.batch))
copy(batchCopy, bvp.batch)
bvp.batches <- batchWithErr[T]{items: batchCopy, errCh: make(chan error), completedCh: make(chan struct{})}
bvp.batch = bvp.batch[:0]
bvp.batchReady <- struct{}{}
bvp.sendBatch("timer")
} else {
// Reset the timer if there are no items in the batch.
// If there are items in the batch, one of the workers will reset the timer.
bvp.log.Info("No items in batch, resetting timer")
bvp.timer.Reset(bvp.o.BatchTimeout)
}

bvp.batchMutex.Unlock()
}
}
}

func (bvp *BatchItemProcessor[T]) sendBatch(reason string) {
log := bvp.log.WithField("reason", reason)
log.Infof("Creating a batch of %d items", len(bvp.batch))

batchCopy := make([]*T, len(bvp.batch))
log.Infof("Copying batch items")
copy(batchCopy, bvp.batch)
log.Infof("Batch items copied")

bvp.batches <- batchWithErr[T]{items: batchCopy, errCh: make(chan error), completedCh: make(chan struct{})}
log.Infof("Batch sent to batches channel")

bvp.batch = bvp.batch[:0]
log.Infof("Batch cleared")

bvp.batchReady <- struct{}{}
log.Infof("Signaled batchReady")
}

// worker removes items from the `queue` channel until processor
// is shut down. It calls the exporter in batches of up to MaxExportBatchSize
// waiting up to BatchTimeout to form a batch.
func (bvp *BatchItemProcessor[T]) worker(ctx context.Context) {
func (bvp *BatchItemProcessor[T]) worker(ctx context.Context, number int) {
bvp.log.Infof("Starting worker %d", number)

for {
select {
case <-bvp.stopWorkersCh:
bvp.log.Infof("Stopping worker %d", number)

return
case <-bvp.batchReady:
bvp.log.Infof("Worker %d is processing a batch", number)

bvp.timer.Reset(bvp.o.BatchTimeout)
batch := <-bvp.batches

bvp.log.Infof("Worker %d is exporting a batch of %d items", number, len(batch.items))

if err := bvp.exportWithTimeout(ctx, batch.items); err != nil {
bvp.log.WithError(err).Error("failed to export items")

Expand All @@ -464,6 +500,8 @@ func (bvp *BatchItemProcessor[T]) worker(ctx context.Context) {
}
}

bvp.log.Infof("Batch completed by worker %d", number)

batch.completedCh <- struct{}{}
}
}
Expand All @@ -472,17 +510,23 @@ func (bvp *BatchItemProcessor[T]) worker(ctx context.Context) {
// drainQueue awaits the any caller that had added to bvp.stopWait
// to finish the enqueue, then exports the final batch.
func (bvp *BatchItemProcessor[T]) drainQueue() {
bvp.log.Info("Draining queue: waiting for the batch builder to pull all the items from the queue")

// Wait for the batch builder to send all remaining items to the workers.
for len(bvp.queue) > 0 {
time.Sleep(10 * time.Millisecond)
}

bvp.log.Info("Draining queue: waiting for workers to finish processing batches")

// Wait for the workers to finish processing all batches.
for len(bvp.batches) > 0 {
batch := <-bvp.batches
<-batch.completedCh
}

bvp.log.Info("Draining queue: all batches finished")

// Close the batches channel since no more batches will be sent.
close(bvp.batches)
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/processor/batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,8 @@ func TestBatchItemProcessorShutdown(t *testing.T) {

func TestBatchItemProcessorDrainQueue(t *testing.T) {
be := testBatchExporter[TestItem]{}
bsp, err := NewBatchItemProcessor[TestItem](&be, "processor", nullLogger(), WithMaxExportBatchSize(5), WithBatchTimeout(5*time.Minute), WithWorkers(5))
log := logrus.New()
bsp, err := NewBatchItemProcessor[TestItem](&be, "processor", log, WithMaxExportBatchSize(5), WithBatchTimeout(1*time.Second), WithWorkers(2), WithShippingMethod(ShippingMethodAsync))
require.NoError(t, err)

itemsToExport := 50
Expand Down

0 comments on commit 413e0e1

Please sign in to comment.