From 1e4299a97e8286fc459ce8dc619100e9d51a78b9 Mon Sep 17 00:00:00 2001 From: Jan Grodowski Date: Mon, 9 Dec 2024 16:45:26 +0100 Subject: [PATCH] Add locking to data_iterator_test.go Fixes `fatal error: concurrent map read and map write` that can occur when listeners are called concurrently --- test/go/data_iterator_test.go | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/test/go/data_iterator_test.go b/test/go/data_iterator_test.go index 3a2f6109..6d510423 100644 --- a/test/go/data_iterator_test.go +++ b/test/go/data_iterator_test.go @@ -2,21 +2,22 @@ package test import ( "fmt" - "github.com/stretchr/testify/suite" "sync" "testing" "github.com/Shopify/ghostferry" "github.com/Shopify/ghostferry/testhelpers" + "github.com/stretchr/testify/suite" ) type DataIteratorTestSuite struct { *testhelpers.GhostferryUnitTestSuite - di *ghostferry.DataIterator - wg *sync.WaitGroup - tables []*ghostferry.TableSchema - receivedRows map[string][]ghostferry.RowData + di *ghostferry.DataIterator + wg *sync.WaitGroup + tables []*ghostferry.TableSchema + receivedRows map[string][]ghostferry.RowData + receivedRowsMutex sync.Mutex } func (this *DataIteratorTestSuite) SetupTest() { @@ -61,6 +62,9 @@ func (this *DataIteratorTestSuite) SetupTest() { this.receivedRows = make(map[string][]ghostferry.RowData, 0) this.di.AddBatchListener(func(ev *ghostferry.RowBatch) error { + this.receivedRowsMutex.Lock() + defer this.receivedRowsMutex.Unlock() + this.receivedRows[ev.TableSchema().Name] = append(this.receivedRows[ev.TableSchema().Name], ev.Values()...) return nil })