From a2801362298a2b25e27f02da22b6d8dea5795606 Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Fri, 29 Nov 2024 11:32:52 +1000 Subject: [PATCH] fix(processor): Fix panic from nil items (#413) * fix(mimicry): Cap per-peer workers to 1 * fix(processor): Fix panic from nil items * fix(processor): Fix panic from nil items * style: Remove extra line in ExportTransactions function * refactor: Update item creation in batch_test.go * test: improve error messages in tests and add thread-safe getters * test: Add test for blocking exporter in batch processor --------- Co-authored-by: Matty Evans --- .../p2p/execution/event_transaction.go | 21 +- pkg/processor/batch.go | 32 ++- pkg/processor/batch_test.go | 211 ++++++++++++++++-- 3 files changed, 242 insertions(+), 22 deletions(-) diff --git a/pkg/mimicry/p2p/execution/event_transaction.go b/pkg/mimicry/p2p/execution/event_transaction.go index 2f0fd847..ab59f5b0 100644 --- a/pkg/mimicry/p2p/execution/event_transaction.go +++ b/pkg/mimicry/p2p/execution/event_transaction.go @@ -152,18 +152,31 @@ func (t TransactionExporter) Shutdown(ctx context.Context) error { } func (p *Peer) ExportTransactions(ctx context.Context, items []*TransactionHashItem) error { + if len(items) == 0 { + return nil + } + go func() { - hashes := make([]common.Hash, len(items)) - seenMap := map[common.Hash]time.Time{} + var hashes []common.Hash + + seenMap := make(map[common.Hash]time.Time, len(items)) + + for _, item := range items { + if item == nil { + continue + } - for i, item := range items { exists := p.sharedCache.Transaction.Get(item.Hash.String()) if exists == nil { - hashes[i] = item.Hash + hashes = append(hashes, item.Hash) seenMap[item.Hash] = item.Seen } } + if len(hashes) == 0 { + return + } + txs, err := p.client.GetPooledTransactions(ctx, hashes) if err != nil { p.log.WithError(err).Warn("Failed to get pooled transactions") diff --git a/pkg/processor/batch.go b/pkg/processor/batch.go index bcde579b..676f8c9d 100644 --- a/pkg/processor/batch.go +++ b/pkg/processor/batch.go @@ -234,6 +234,14 @@ func (bvp *BatchItemProcessor[T]) Write(ctx context.Context, s []*T) error { prepared := []*TraceableItem[T]{} for _, i := range s[start:end] { + if i == nil { + bvp.metrics.IncItemsDroppedBy(bvp.name, float64(1)) + + bvp.log.Warnf("Attempted to write a nil item. This item has been dropped. This probably shouldn't happen and is likely a bug.") + + continue + } + item := &TraceableItem[T]{ item: i, } @@ -284,8 +292,18 @@ func (bvp *BatchItemProcessor[T]) exportWithTimeout(ctx context.Context, itemsBa defer cancel() } + // Since the batch processor filters out nil items upstream, + // we can optimize by pre-allocating the full slice size. + // Worst case is a few wasted allocations if any nil items slip through. items := make([]*T, len(itemsBatch)) + for i, item := range itemsBatch { + if item == nil { + bvp.log.Warnf("Attempted to export a nil item. This item has been dropped. This probably shouldn't happen and is likely a bug.") + + continue + } + items[i] = item.item } @@ -298,10 +316,10 @@ func (bvp *BatchItemProcessor[T]) exportWithTimeout(ctx context.Context, itemsBa bvp.metrics.ObserveExportDuration(bvp.name, duration) if err != nil { - bvp.metrics.IncItemsFailedBy(bvp.name, float64(len(itemsBatch))) + bvp.metrics.IncItemsFailedBy(bvp.name, float64(len(items))) } else { - bvp.metrics.IncItemsExportedBy(bvp.name, float64(len(itemsBatch))) - bvp.metrics.ObserveBatchSize(bvp.name, float64(len(itemsBatch))) + bvp.metrics.IncItemsExportedBy(bvp.name, float64(len(items))) + bvp.metrics.ObserveBatchSize(bvp.name, float64(len(items))) } for _, item := range itemsBatch { @@ -427,6 +445,14 @@ func (bvp *BatchItemProcessor[T]) batchBuilder(ctx context.Context) { return case item := <-bvp.queue: + if item == nil { + bvp.metrics.IncItemsDroppedBy(bvp.name, float64(1)) + + bvp.log.Warnf("Attempted to build a batch with a nil item. This item has been dropped. This probably shouldn't happen and is likely a bug.") + + continue + } + batch = append(batch, item) if len(batch) >= bvp.o.MaxExportBatchSize { diff --git a/pkg/processor/batch_test.go b/pkg/processor/batch_test.go index 98e58ccf..4f0eb940 100644 --- a/pkg/processor/batch_test.go +++ b/pkg/processor/batch_test.go @@ -44,6 +44,9 @@ type testBatchExporter[T TestItem] struct { func (t *testBatchExporter[T]) ExportItems(ctx context.Context, items []*T) error { time.Sleep(t.delay) + t.mu.Lock() + defer t.mu.Unlock() + if t.failNextExport { t.failNextExport = false @@ -58,9 +61,6 @@ func (t *testBatchExporter[T]) ExportItems(ctx context.Context, items []*T) erro return errors.New("export failure") } - t.mu.Lock() - defer t.mu.Unlock() - if t.idx < len(t.errors) { t.droppedCount += len(items) err := t.errors[t.idx] @@ -85,6 +85,9 @@ func (t *testBatchExporter[T]) ExportItems(ctx context.Context, items []*T) erro } func (t *testBatchExporter[T]) Shutdown(context.Context) error { + t.mu.Lock() + defer t.mu.Unlock() + t.shutdownCount++ return nil @@ -138,7 +141,10 @@ func TestAsyncNewBatchItemProcessorWithOptions(t *testing.T) { name: "default", o: []BatchItemProcessorOption{ WithShippingMethod(ShippingMethodAsync), - WithBatchTimeout(10 * time.Millisecond), + // Don't set this too low. If the items aren't written fast enough, smaller batches may be created, + // increasing the total batch count and ultimately failing the test (flakey). In production, this + // defaults to 5000ms. + WithBatchTimeout(schDelay / 2), }, writeNumItems: 2048, genNumItems: 2053, @@ -190,9 +196,11 @@ func TestAsyncNewBatchItemProcessorWithOptions(t *testing.T) { time.Sleep(10 * time.Millisecond) } - if err := ssp.Write(context.Background(), []*TestItem{{ - name: "test", - }}); err != nil { + item := TestItem{ + name: "test" + strconv.Itoa(i), + } + + if err := ssp.Write(context.Background(), []*TestItem{&item}); err != nil { t.Errorf("%s: Error writing to BatchItemProcessor", option.name) } } @@ -201,14 +209,14 @@ func TestAsyncNewBatchItemProcessorWithOptions(t *testing.T) { gotNumOfItems := te.len() if option.wantNumItems > 0 && option.wantNumItems != gotNumOfItems { - t.Errorf("number of exported items: got %v, want %v", - gotNumOfItems, option.wantNumItems) + t.Errorf("%s: number of exported items: got %v, want %v", + option.name, gotNumOfItems, option.wantNumItems) } gotBatchCount := te.getBatchCount() if option.wantBatchCount > 0 && gotBatchCount != option.wantBatchCount { - t.Errorf("number batches: got %v, want %v", - gotBatchCount, option.wantBatchCount) + t.Errorf("%s: number batches: got %v, want %v", + option.name, gotBatchCount, option.wantBatchCount) t.Errorf("Batches %v", te.sizes) } }) @@ -222,11 +230,22 @@ type stuckExporter[T TestItem] struct { // ExportItems waits for ctx to expire and returns that error. func (e *stuckExporter[T]) ExportItems(ctx context.Context, _ []*T) error { <-ctx.Done() + + e.mu.Lock() // Use the existing mutex to protect `err` e.err = ctx.Err() + e.mu.Unlock() return ctx.Err() } +// Thread-safe getter for `err`. +func (e *stuckExporter[T]) GetError() error { + e.mu.Lock() // Use the existing mutex + defer e.mu.Unlock() + + return e.err +} + func TestBatchItemProcessorExportTimeout(t *testing.T) { exp := new(stuckExporter[TestItem]) bvp, err := NewBatchItemProcessor[TestItem]( @@ -249,7 +268,7 @@ func TestBatchItemProcessorExportTimeout(t *testing.T) { time.Sleep(1 * time.Millisecond) - if exp.err != context.DeadlineExceeded { + if !errors.Is(exp.GetError(), context.DeadlineExceeded) { t.Errorf("context deadline error not returned: got %+v", exp.err) } } @@ -341,20 +360,34 @@ func TestBatchItemProcessorPostShutdown(t *testing.T) { } type slowExporter[T TestItem] struct { + // itemsExported field is read within our tests while simultaneously being incremented or modified by one + // or more worker goroutines in the slowExporter.ExportItems. itemsExported int + mu sync.Mutex } func (slowExporter[T]) Shutdown(context.Context) error { return nil } + func (t *slowExporter[T]) ExportItems(ctx context.Context, items []*T) error { time.Sleep(100 * time.Millisecond) + t.mu.Lock() // Protect concurrent access t.itemsExported += len(items) + t.mu.Unlock() <-ctx.Done() return ctx.Err() } +// GetItemsExported is a thread-safe getter for itemsExported used +func (e *slowExporter[T]) GetItemsExported() int { + e.mu.Lock() + defer e.mu.Unlock() + + return e.itemsExported +} + func TestMultipleWorkersConsumeConcurrently(t *testing.T) { te := slowExporter[TestItem]{} bsp, err := NewBatchItemProcessor[TestItem](&te, "processor", nullLogger(), WithMaxExportBatchSize(10), WithBatchTimeout(5*time.Minute), WithWorkers(20)) @@ -372,7 +405,7 @@ func TestMultipleWorkersConsumeConcurrently(t *testing.T) { time.Sleep(1 * time.Second) // give some time for workers to process - if te.itemsExported != itemsToExport { + if te.GetItemsExported() != itemsToExport { t.Errorf("Expected all items to be exported, got: %v", te.itemsExported) } } @@ -733,8 +766,23 @@ func TestBatchItemProcessorDrainOnShutdownAfterContextCancellation(t *testing.T) require.Greater(t, itemsToExport, 0, "No items should have been exported on shutdown") } +type blockingExporter[T TestItem] struct { + blockCh <-chan struct{} +} + +func (be *blockingExporter[T]) ExportItems(ctx context.Context, items []*T) error { + <-be.blockCh // Block until channel is closed + + return nil +} + +func (be *blockingExporter[T]) Shutdown(ctx context.Context) error { + return nil +} + func TestBatchItemProcessorQueueSize(t *testing.T) { - te := indefiniteExporter[TestItem]{} + blockCh := make(chan struct{}) // This should block the worker. We can unblock once we've filled the queue to test. + te := blockingExporter[TestItem]{blockCh: blockCh} metrics := NewMetrics("test") maxQueueSize := 5 @@ -773,14 +821,147 @@ func TestBatchItemProcessorQueueSize(t *testing.T) { // Ensure that the queue size is respected require.Equal(t, maxQueueSize, len(bsp.queue), "Queue size should be equal to maxQueueSize") + // Unblock the worker to allow processing to proceed + close(blockCh) + // Ensure that the dropped count is correct counter, err := bsp.metrics.itemsDropped.GetMetricWith(prometheus.Labels{"processor": "processor"}) require.NoError(t, err) metric := &dto.Metric{} - err = counter.Write(metric) require.NoError(t, err) require.Equal(t, float64(itemsToExport-maxQueueSize), *metric.Counter.Value, "Dropped count should be equal to the number of items that exceeded the queue size") } + +func TestBatchItemProcessorNilItem(t *testing.T) { + te := testBatchExporter[TestItem]{} + + bsp, err := NewBatchItemProcessor[TestItem]( + &te, + "processor", + nullLogger(), + WithBatchTimeout(10*time.Millisecond), + WithMaxQueueSize(5), + WithMaxExportBatchSize(5), + WithWorkers(1), + WithShippingMethod(ShippingMethodSync), + ) + require.NoError(t, err) + + bsp.Start(context.Background()) + + // Write nil item to processor + err = bsp.Write(context.Background(), []*TestItem{nil}) + require.NoError(t, err) + + // Write invalid items to processor + err = bsp.Write(context.Background(), nil) + require.NoError(t, err) + + // Give processor time to process the nil item + time.Sleep(500 * time.Millisecond) + + // Verify processor is still running by writing a valid item + err = bsp.Write(context.Background(), []*TestItem{{name: "test"}}) + require.NoError(t, err) +} + +func TestBatchItemProcessorNilExporter(t *testing.T) { + bsp, err := NewBatchItemProcessor[TestItem]( + nil, + "processor", + nullLogger(), + WithBatchTimeout(10*time.Millisecond), + WithMaxQueueSize(5), + WithMaxExportBatchSize(5), + WithWorkers(1), + WithShippingMethod(ShippingMethodSync), + ) + require.NoError(t, err) + + bsp.Start(context.Background()) + + // Write an item to processor + err = bsp.Write(context.Background(), []*TestItem{{name: "test"}}) + require.Error(t, err) +} + +func TestBatchItemProcessorNilExporterAfterProcessing(t *testing.T) { + exporter := &testBatchExporter[TestItem]{} + bsp, err := NewBatchItemProcessor[TestItem]( + exporter, + "processor", + nullLogger(), + WithBatchTimeout(500*time.Millisecond), + WithMaxQueueSize(5), + WithMaxExportBatchSize(5), + WithWorkers(1), + WithShippingMethod(ShippingMethodAsync), + ) + require.NoError(t, err) + + bsp.Start(context.Background()) + + // Write an item to processor with valid exporter + err = bsp.Write(context.Background(), []*TestItem{{name: "test"}}) + require.NoError(t, err) + + // Give processor time to process the item + time.Sleep(1000 * time.Millisecond) + + // Nil the exporter + bsp.e = nil + + // Write an item to processor with nil exporter + err = bsp.Write(context.Background(), []*TestItem{{name: "test"}}) + require.Error(t, err) + + // Verify we can still shutdown without panic + require.NotPanics(t, func() { + err := bsp.Shutdown(context.Background()) + require.NoError(t, err) + }) +} + +func TestBatchItemProcessorNilItemAfterQueue(t *testing.T) { + exporter := &testBatchExporter[TestItem]{} + bsp, err := NewBatchItemProcessor[TestItem]( + exporter, + "processor", + nullLogger(), + WithBatchTimeout(500*time.Millisecond), + WithMaxQueueSize(5), + WithMaxExportBatchSize(5), + WithWorkers(1), + WithShippingMethod(ShippingMethodAsync), + ) + require.NoError(t, err) + + bsp.Start(context.Background()) + + // Write a valid item first + item := &TestItem{name: "test"} + err = bsp.Write(context.Background(), []*TestItem{item}) + require.NoError(t, err) + + // Inject nil directly into the processor's queue + bsp.queue <- nil + + // Write a valid item to ensure the processor is still running + err = bsp.Write(context.Background(), []*TestItem{item}) + require.NoError(t, err) + + // Give processor time to handle the nil item + time.Sleep(1000 * time.Millisecond) + + // Ensure no panic during shutdown + require.NotPanics(t, func() { + err := bsp.Shutdown(context.Background()) + require.NoError(t, err) + }) + + // Verify that valid items were still exported + require.Equal(t, 2, exporter.len()) +}