Skip to content

Commit

Permalink
refactor: Improve error handling in enqueueOrDrop function
Browse files Browse the repository at this point in the history
  • Loading branch information
samcm committed Jun 17, 2024
1 parent 3863dcb commit d941849
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 34 deletions.
14 changes: 7 additions & 7 deletions pkg/processor/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,8 +232,8 @@ func (bvp *BatchItemProcessor[T]) Write(ctx context.Context, s []*T) error {
}

for _, i := range prepared {
if !bvp.enqueueOrDrop(ctx, i.item, i.errCh, i.completedCh) {
return errors.New("failed to enqueue item - queue is full")
if err := bvp.enqueueOrDrop(ctx, i); err != nil {
return err
}
}

Expand Down Expand Up @@ -458,24 +458,24 @@ func recoverSendOnClosedChan() {
panic(x)
}

func (bvp *BatchItemProcessor[T]) enqueueOrDrop(ctx context.Context, sd *T, errCh chan error, completedCh chan struct{}) bool {
func (bvp *BatchItemProcessor[T]) enqueueOrDrop(ctx context.Context, item traceableItem[T]) error {
// This ensures the bvp.queue<- below does not panic as the
// processor shuts down.
defer recoverSendOnClosedChan()

select {
case <-bvp.stopCh:
return false
return errors.New("processor is shutting down")
default:
}

select {
case bvp.queue <- traceableItem[T]{item: sd, errCh: errCh, completedCh: completedCh}:
return true
case bvp.queue <- item:
return nil
default:
atomic.AddUint32(&bvp.dropped, 1)
bvp.metrics.IncItemsDroppedBy(bvp.name, float64(1))
}

return false
return errors.New("queue is full")
}
29 changes: 2 additions & 27 deletions pkg/processor/batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -651,9 +651,9 @@ func TestBatchItemProcessorWithBatchTimeout(t *testing.T) {
}

func TestBatchItemProcessorQueueSize(t *testing.T) {
te := testBatchExporter[TestItem]{}
te := indefiniteExporter[TestItem]{}
maxQueueSize := 5
bsp, err := NewBatchItemProcessor[TestItem](&te, "processor", nullLogger(), WithMaxQueueSize(maxQueueSize), WithMaxExportBatchSize(2), WithWorkers(1))
bsp, err := NewBatchItemProcessor[TestItem](&te, "processor", nullLogger(), WithBatchTimeout(10*time.Minute), WithMaxQueueSize(maxQueueSize), WithMaxExportBatchSize(maxQueueSize), WithWorkers(1), WithShippingMethod(ShippingMethodAsync))
require.NoError(t, err)

itemsToExport := 10
Expand All @@ -675,31 +675,6 @@ func TestBatchItemProcessorQueueSize(t *testing.T) {

// Ensure that the queue size is respected
require.Equal(t, maxQueueSize, len(bsp.queue), "Queue size should be equal to maxQueueSize")
}

func TestBatchItemProcessorDropEvents(t *testing.T) {
te := testBatchExporter[TestItem]{}
maxQueueSize := 5
bsp, err := NewBatchItemProcessor[TestItem](&te, "processor", nullLogger(), WithMaxQueueSize(maxQueueSize), WithMaxExportBatchSize(2), WithWorkers(1))
require.NoError(t, err)

itemsToExport := 10
items := make([]*TestItem, itemsToExport)

for i := 0; i < itemsToExport; i++ {
items[i] = &TestItem{name: strconv.Itoa(i)}
}

// Write items to the processor
for i := 0; i < itemsToExport; i++ {
err := bsp.Write(context.Background(), []*TestItem{items[i]})
if i < maxQueueSize {
require.NoError(t, err, "Expected no error for item %d", i)
} else {
require.Error(t, err, "Expected an error for item %d due to queue size limit", i)
}
}

// Ensure that the dropped count is correct
require.Equal(t, uint32(itemsToExport-maxQueueSize), bsp.dropped, "Dropped count should be equal to the number of items that exceeded the queue size")
}

0 comments on commit d941849

Please sign in to comment.