From ef012903177720656f286306c768afea13703e18 Mon Sep 17 00:00:00 2001 From: Yi Jin Date: Mon, 25 Nov 2024 10:05:11 -0800 Subject: [PATCH 1/5] [Receive] fix race condition Signed-off-by: Yi Jin --- docs/sharding.md | 2 +- pkg/receive/multitsdb.go | 70 ++++++++--------------------------- pkg/receive/multitsdb_test.go | 42 +++++++++++++++++++++ 3 files changed, 58 insertions(+), 56 deletions(-) diff --git a/docs/sharding.md b/docs/sharding.md index 9cfa0fbf8a..f943ec071b 100644 --- a/docs/sharding.md +++ b/docs/sharding.md @@ -18,7 +18,7 @@ Queries against store gateway which are touching large number of blocks (no matt # Relabelling -Similar to [promtail](https://grafana.com/docs/loki/latest/send-data/promtail/configuration/#relabel_configs) this config follows native [Prometheus relabel-config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#relabel_config) syntax. +Similar to [promtail](https://grafana.com/docs/loki/latest/clients/promtail/configuration/#relabel_configs) this config follows native [Prometheus relabel-config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#relabel_config) syntax. 
Currently, thanos only supports the following relabel actions: diff --git a/pkg/receive/multitsdb.go b/pkg/receive/multitsdb.go index 10c41d32ba..0c2498b931 100644 --- a/pkg/receive/multitsdb.go +++ b/pkg/receive/multitsdb.go @@ -64,12 +64,6 @@ type MultiTSDB struct { hashFunc metadata.HashFunc hashringConfigs []HashringConfig - tsdbClients []store.Client - tsdbClientsNeedUpdate bool - - exemplarClients map[string]*exemplars.TSDB - exemplarClientsNeedUpdate bool - metricNameFilterEnabled bool headExpandedPostingsCacheSize uint64 @@ -117,19 +111,17 @@ func NewMultiTSDB( } mt := &MultiTSDB{ - dataDir: dataDir, - logger: log.With(l, "component", "multi-tsdb"), - reg: reg, - tsdbOpts: tsdbOpts, - mtx: &sync.RWMutex{}, - tenants: map[string]*tenant{}, - labels: labels, - tsdbClientsNeedUpdate: true, - exemplarClientsNeedUpdate: true, - tenantLabelName: tenantLabelName, - bucket: bucket, - allowOutOfOrderUpload: allowOutOfOrderUpload, - hashFunc: hashFunc, + dataDir: dataDir, + logger: log.With(l, "component", "multi-tsdb"), + reg: reg, + tsdbOpts: tsdbOpts, + mtx: &sync.RWMutex{}, + tenants: map[string]*tenant{}, + labels: labels, + tenantLabelName: tenantLabelName, + bucket: bucket, + allowOutOfOrderUpload: allowOutOfOrderUpload, + hashFunc: hashFunc, } for _, option := range options { @@ -434,8 +426,6 @@ func (t *MultiTSDB) Prune(ctx context.Context) error { level.Info(t.logger).Log("msg", "Pruned tenant", "tenant", tenantID) delete(t.tenants, tenantID) - t.tsdbClientsNeedUpdate = true - t.exemplarClientsNeedUpdate = true } return merr.Err() @@ -597,17 +587,7 @@ func (t *MultiTSDB) RemoveLockFilesIfAny() error { func (t *MultiTSDB) TSDBLocalClients() []store.Client { t.mtx.RLock() - if !t.tsdbClientsNeedUpdate { - t.mtx.RUnlock() - return t.tsdbClients - } - - t.mtx.RUnlock() - t.mtx.Lock() - defer t.mtx.Unlock() - if !t.tsdbClientsNeedUpdate { - return t.tsdbClients - } + defer t.mtx.RUnlock() res := make([]store.Client, 0, len(t.tenants)) for _, tenant := range 
t.tenants { @@ -617,25 +597,12 @@ func (t *MultiTSDB) TSDBLocalClients() []store.Client { } } - t.tsdbClientsNeedUpdate = false - t.tsdbClients = res - - return t.tsdbClients + return res } func (t *MultiTSDB) TSDBExemplars() map[string]*exemplars.TSDB { t.mtx.RLock() - if !t.exemplarClientsNeedUpdate { - t.mtx.RUnlock() - return t.exemplarClients - } - t.mtx.RUnlock() - t.mtx.Lock() - defer t.mtx.Unlock() - - if !t.exemplarClientsNeedUpdate { - return t.exemplarClients - } + defer t.mtx.RUnlock() res := make(map[string]*exemplars.TSDB, len(t.tenants)) for k, tenant := range t.tenants { @@ -644,10 +611,7 @@ func (t *MultiTSDB) TSDBExemplars() map[string]*exemplars.TSDB { res[k] = e } } - - t.exemplarClientsNeedUpdate = false - t.exemplarClients = res - return t.exemplarClients + return res } func (t *MultiTSDB) TenantStats(limit int, statsByLabelName string, tenantIDs ...string) []status.TenantStats { @@ -742,8 +706,6 @@ func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant if err != nil { t.mtx.Lock() delete(t.tenants, tenantID) - t.tsdbClientsNeedUpdate = true - t.exemplarClientsNeedUpdate = true t.mtx.Unlock() return err } @@ -796,8 +758,6 @@ func (t *MultiTSDB) getOrLoadTenant(tenantID string, blockingStart bool) (*tenan tenant = newTenant() t.tenants[tenantID] = tenant - t.tsdbClientsNeedUpdate = true - t.exemplarClientsNeedUpdate = true t.mtx.Unlock() logger := log.With(t.logger, "tenant", tenantID) diff --git a/pkg/receive/multitsdb_test.go b/pkg/receive/multitsdb_test.go index 4bee9c0514..a846dfbbdd 100644 --- a/pkg/receive/multitsdb_test.go +++ b/pkg/receive/multitsdb_test.go @@ -5,6 +5,7 @@ package receive import ( "context" + "fmt" "io" "math" "os" @@ -541,6 +542,47 @@ func TestMultiTSDBRecreatePrunedTenant(t *testing.T) { testutil.Equals(t, 1, len(m.TSDBLocalClients())) } +func TestMultiTSDBAddNewTenant(t *testing.T) { + t.Parallel() + const iterations = 10 + // This test detects race conditions, so we run it multiple times to 
increase the chance of catching the issue. + for i := 0; i < iterations; i++ { + t.Run(fmt.Sprintf("iteration-%d", i), func(t *testing.T) { + dir := t.TempDir() + m := NewMultiTSDB(dir, log.NewNopLogger(), prometheus.NewRegistry(), + &tsdb.Options{ + MinBlockDuration: (2 * time.Hour).Milliseconds(), + MaxBlockDuration: (2 * time.Hour).Milliseconds(), + RetentionDuration: (6 * time.Hour).Milliseconds(), + }, + labels.FromStrings("replica", "test"), + "tenant_id", + objstore.NewInMemBucket(), + false, + metadata.NoneFunc, + ) + defer func() { testutil.Ok(t, m.Close()) }() + + concurrency := 50 + var wg sync.WaitGroup + for i := 0; i < concurrency; i++ { + wg.Add(1) + // simulate remote write with new tenant concurrently + go func(i int) { + defer wg.Done() + testutil.Ok(t, appendSample(m, fmt.Sprintf("tenant-%d", i), time.UnixMilli(int64(10)))) + }(i) + // simulate read request concurrently + go func() { + m.TSDBLocalClients() + }() + } + wg.Wait() + testutil.Equals(t, concurrency, len(m.TSDBLocalClients())) + }) + } +} + func TestAlignedHeadFlush(t *testing.T) { t.Parallel() From 8dbd7e472d094df6d9c012cfeda27ac1c5487f1c Mon Sep 17 00:00:00 2001 From: Yi Jin Date: Mon, 25 Nov 2024 10:10:34 -0800 Subject: [PATCH 2/5] add a change log Signed-off-by: Yi Jin --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3a2956561a..b9fb86439c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -42,6 +42,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#7893](https://github.com/thanos-io/thanos/pull/7893) Sidecar: Fix retrieval of external labels for Prometheus v3.0.0. - [#7903](https://github.com/thanos-io/thanos/pull/7903) Query: Fix panic on regex store matchers. 
- [#7915](https://github.com/thanos-io/thanos/pull/7915) Store: Close block series client at the end to not reuse chunk buffer +- [#7941](https://github.com/thanos-io/thanos/pull/7941) Receive: Fix race condition when adding multiple new tenants, see [issue-7892](https://github.com/thanos-io/thanos/issues/7892). ### Added From 7193180b5cf458f95d18853aab5cd384ceeeaa85 Mon Sep 17 00:00:00 2001 From: Yi Jin Date: Tue, 26 Nov 2024 17:17:00 -0800 Subject: [PATCH 3/5] memoize tsdb local clients without race condition Signed-off-by: Yi Jin --- pkg/receive/multitsdb.go | 71 ++++++++++++++++++++++++++-------------- 1 file changed, 47 insertions(+), 24 deletions(-) diff --git a/pkg/receive/multitsdb.go b/pkg/receive/multitsdb.go index 0c2498b931..39c947011e 100644 --- a/pkg/receive/multitsdb.go +++ b/pkg/receive/multitsdb.go @@ -64,6 +64,9 @@ type MultiTSDB struct { hashFunc metadata.HashFunc hashringConfigs []HashringConfig + tsdbClients []store.Client + exemplarClients map[string]*exemplars.TSDB + metricNameFilterEnabled bool headExpandedPostingsCacheSize uint64 @@ -118,6 +121,8 @@ func NewMultiTSDB( mtx: &sync.RWMutex{}, tenants: map[string]*tenant{}, labels: labels, + tsdbClients: make([]store.Client, 0), + exemplarClients: map[string]*exemplars.TSDB{}, tenantLabelName: tenantLabelName, bucket: bucket, allowOutOfOrderUpload: allowOutOfOrderUpload, @@ -131,6 +136,42 @@ func NewMultiTSDB( return mt } +func (t *MultiTSDB) updateTSDBClients() { + t.tsdbClients = t.tsdbClients[:0] + for _, tenant := range t.tenants { + client := tenant.client() + if client != nil { + t.tsdbClients = append(t.tsdbClients, client) + } + } +} + +func (t *MultiTSDB) addTenantUnlocked(tenantID string, newTenant *tenant) { + t.tenants[tenantID] = newTenant + t.updateTSDBClients() + if newTenant.exemplars() != nil { + t.exemplarClients[tenantID] = newTenant.exemplars() + } +} + +func (t *MultiTSDB) addTenantLocked(tenantID string, newTenant *tenant) { + t.mtx.Lock() + defer t.mtx.Unlock() + 
t.addTenantUnlocked(tenantID, newTenant) +} + +func (t *MultiTSDB) removeTenantUnlocked(tenantID string) { + delete(t.tenants, tenantID) + delete(t.exemplarClients, tenantID) + t.updateTSDBClients() +} + +func (t *MultiTSDB) removeTenantLocked(tenantID string) { + t.mtx.Lock() + defer t.mtx.Unlock() + t.removeTenantUnlocked(tenantID) +} + type localClient struct { store *store.TSDBStore @@ -425,7 +466,7 @@ func (t *MultiTSDB) Prune(ctx context.Context) error { } level.Info(t.logger).Log("msg", "Pruned tenant", "tenant", tenantID) - delete(t.tenants, tenantID) + t.removeTenantUnlocked(tenantID) } return merr.Err() @@ -588,30 +629,13 @@ func (t *MultiTSDB) RemoveLockFilesIfAny() error { func (t *MultiTSDB) TSDBLocalClients() []store.Client { t.mtx.RLock() defer t.mtx.RUnlock() - - res := make([]store.Client, 0, len(t.tenants)) - for _, tenant := range t.tenants { - client := tenant.client() - if client != nil { - res = append(res, client) - } - } - - return res + return t.tsdbClients } func (t *MultiTSDB) TSDBExemplars() map[string]*exemplars.TSDB { t.mtx.RLock() defer t.mtx.RUnlock() - - res := make(map[string]*exemplars.TSDB, len(t.tenants)) - for k, tenant := range t.tenants { - e := tenant.exemplars() - if e != nil { - res[k] = e - } - } - return res + return t.exemplarClients } func (t *MultiTSDB) TenantStats(limit int, statsByLabelName string, tenantIDs ...string) []status.TenantStats { @@ -704,9 +728,7 @@ func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant nil, ) if err != nil { - t.mtx.Lock() - delete(t.tenants, tenantID) - t.mtx.Unlock() + t.removeTenantLocked(tenantID) return err } var ship *shipper.Shipper @@ -729,6 +751,7 @@ func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant options = append(options, store.WithCuckooMetricNameStoreFilter()) } tenant.set(store.NewTSDBStore(logger, s, component.Receive, lset, options...), s, ship, exemplars.NewTSDB(s, lset)) + t.addTenantLocked(tenantID, tenant) // 
need to update the client list once store is ready & client != nil level.Info(logger).Log("msg", "TSDB is now ready") return nil } @@ -757,7 +780,7 @@ func (t *MultiTSDB) getOrLoadTenant(tenantID string, blockingStart bool) (*tenan } tenant = newTenant() - t.tenants[tenantID] = tenant + t.addTenantUnlocked(tenantID, tenant) t.mtx.Unlock() logger := log.With(t.logger, "tenant", tenantID) From ca6c149b347fd65b10b0e3e97167f64ce1ba2e85 Mon Sep 17 00:00:00 2001 From: Yi Jin Date: Tue, 26 Nov 2024 19:07:20 -0800 Subject: [PATCH 4/5] fix data race in testing with some concurrent safe helper functions Signed-off-by: Yi Jin --- pkg/receive/multitsdb.go | 20 ++++++++++++++++++++ pkg/receive/multitsdb_test.go | 4 ++-- pkg/receive/receive_test.go | 12 ++++++------ 3 files changed, 28 insertions(+), 8 deletions(-) diff --git a/pkg/receive/multitsdb.go b/pkg/receive/multitsdb.go index 39c947011e..32ac278477 100644 --- a/pkg/receive/multitsdb.go +++ b/pkg/receive/multitsdb.go @@ -136,6 +136,24 @@ func NewMultiTSDB( return mt } +// getTenant returns the tenant with the given tenantID. +// @testing: This method is only for testing purposes. +func (t *MultiTSDB) getTenant(tenantID string) *tenant { + t.mtx.RLock() + defer t.mtx.RUnlock() + return t.tenants[tenantID] +} + +// tsdbLocalClientsCopied returns a copy of tsdbClients. +// @testing: This method is only for testing purposes. +func (t *MultiTSDB) tsdbLocalClientsCopied() []store.Client { + t.mtx.Lock() + defer t.mtx.Unlock() + copied := make([]store.Client, len(t.tsdbClients)) + copy(copied, t.tsdbClients) + return copied +} + func (t *MultiTSDB) updateTSDBClients() { t.tsdbClients = t.tsdbClients[:0] for _, tenant := range t.tenants { @@ -626,12 +644,14 @@ func (t *MultiTSDB) RemoveLockFilesIfAny() error { return merr.Err() } +// TSDBLocalClients should be used as read-only. 
func (t *MultiTSDB) TSDBLocalClients() []store.Client { t.mtx.RLock() defer t.mtx.RUnlock() return t.tsdbClients } +// TSDBExemplars should be used as read-only. func (t *MultiTSDB) TSDBExemplars() map[string]*exemplars.TSDB { t.mtx.RLock() defer t.mtx.RUnlock() diff --git a/pkg/receive/multitsdb_test.go b/pkg/receive/multitsdb_test.go index a846dfbbdd..c4da47841e 100644 --- a/pkg/receive/multitsdb_test.go +++ b/pkg/receive/multitsdb_test.go @@ -194,7 +194,7 @@ func TestMultiTSDB(t *testing.T) { testutil.Ok(t, m.Open()) testutil.Ok(t, appendSample(m, testTenant, time.Now())) - tenant := m.tenants[testTenant] + tenant := m.getTenant(testTenant) db := tenant.readyStorage().Get() testutil.Equals(t, 0, len(db.Blocks())) @@ -843,7 +843,7 @@ func appendSampleWithLabels(m *MultiTSDB, tenant string, lbls labels.Labels, tim func queryLabelValues(ctx context.Context, m *MultiTSDB) error { proxy := store.NewProxyStore(nil, nil, func() []store.Client { - clients := m.TSDBLocalClients() + clients := m.tsdbLocalClientsCopied() if len(clients) > 0 { clients[0] = &slowClient{clients[0]} } diff --git a/pkg/receive/receive_test.go b/pkg/receive/receive_test.go index 7e90c3c204..dd9b33048a 100644 --- a/pkg/receive/receive_test.go +++ b/pkg/receive/receive_test.go @@ -210,7 +210,7 @@ func TestAddingExternalLabelsForTenants(t *testing.T) { for _, c := range tc.cfg { for _, tenantId := range c.Tenants { - if m.tenants[tenantId] == nil { + if m.getTenant(tenantId) == nil { err = appendSample(m, tenantId, time.Now()) require.NoError(t, err) } @@ -294,7 +294,7 @@ func TestLabelSetsOfTenantsWhenAddingTenants(t *testing.T) { for _, c := range initialConfig { for _, tenantId := range c.Tenants { - if m.tenants[tenantId] == nil { + if m.getTenant(tenantId) == nil { err = appendSample(m, tenantId, time.Now()) require.NoError(t, err) } @@ -319,7 +319,7 @@ func TestLabelSetsOfTenantsWhenAddingTenants(t *testing.T) { for _, c := range changedConfig { for _, tenantId := range c.Tenants { - if 
m.tenants[tenantId] == nil { + if m.getTenant(tenantId) == nil { err = appendSample(m, tenantId, time.Now()) require.NoError(t, err) } @@ -534,7 +534,7 @@ func TestLabelSetsOfTenantsWhenChangingLabels(t *testing.T) { for _, c := range initialConfig { for _, tenantId := range c.Tenants { - if m.tenants[tenantId] == nil { + if m.getTenant(tenantId) == nil { err = appendSample(m, tenantId, time.Now()) require.NoError(t, err) } @@ -704,7 +704,7 @@ func TestAddingLabelsWhenTenantAppearsInMultipleHashrings(t *testing.T) { for _, c := range initialConfig { for _, tenantId := range c.Tenants { - if m.tenants[tenantId] == nil { + if m.getTenant(tenantId) == nil { err = appendSample(m, tenantId, time.Now()) require.NoError(t, err) } @@ -778,7 +778,7 @@ func TestReceiverLabelsNotOverwrittenByExternalLabels(t *testing.T) { for _, c := range cfg { for _, tenantId := range c.Tenants { - if m.tenants[tenantId] == nil { + if m.getTenant(tenantId) == nil { err = appendSample(m, tenantId, time.Now()) require.NoError(t, err) } From 0b2511518f2c7b4191df4baf445322e50d8fe79a Mon Sep 17 00:00:00 2001 From: Yi Jin Date: Mon, 2 Dec 2024 10:49:07 -0800 Subject: [PATCH 5/5] address comments Signed-off-by: Yi Jin --- pkg/receive/multitsdb.go | 15 ++------------- pkg/receive/multitsdb_test.go | 7 +++++-- pkg/receive/receive_test.go | 12 ++++++------ 3 files changed, 13 insertions(+), 21 deletions(-) diff --git a/pkg/receive/multitsdb.go b/pkg/receive/multitsdb.go index 32ac278477..526e3c6ec9 100644 --- a/pkg/receive/multitsdb.go +++ b/pkg/receive/multitsdb.go @@ -136,24 +136,13 @@ func NewMultiTSDB( return mt } -// getTenant returns the tenant with the given tenantID. -// @testing: This method is only for testing purposes. -func (t *MultiTSDB) getTenant(tenantID string) *tenant { +// testGetTenant returns the tenant with the given tenantID for testing purposes. 
+func (t *MultiTSDB) testGetTenant(tenantID string) *tenant { t.mtx.RLock() defer t.mtx.RUnlock() return t.tenants[tenantID] } -// tsdbLocalClientsCopied returns a copy of tsdbClients. -// @testing: This method is only for testing purposes. -func (t *MultiTSDB) tsdbLocalClientsCopied() []store.Client { - t.mtx.Lock() - defer t.mtx.Unlock() - copied := make([]store.Client, len(t.tsdbClients)) - copy(copied, t.tsdbClients) - return copied -} - func (t *MultiTSDB) updateTSDBClients() { t.tsdbClients = t.tsdbClients[:0] for _, tenant := range t.tenants { diff --git a/pkg/receive/multitsdb_test.go b/pkg/receive/multitsdb_test.go index c4da47841e..a36db4b402 100644 --- a/pkg/receive/multitsdb_test.go +++ b/pkg/receive/multitsdb_test.go @@ -194,7 +194,7 @@ func TestMultiTSDB(t *testing.T) { testutil.Ok(t, m.Open()) testutil.Ok(t, appendSample(m, testTenant, time.Now())) - tenant := m.getTenant(testTenant) + tenant := m.testGetTenant(testTenant) db := tenant.readyStorage().Get() testutil.Equals(t, 0, len(db.Blocks())) @@ -843,7 +843,10 @@ func appendSampleWithLabels(m *MultiTSDB, tenant string, lbls labels.Labels, tim func queryLabelValues(ctx context.Context, m *MultiTSDB) error { proxy := store.NewProxyStore(nil, nil, func() []store.Client { - clients := m.tsdbLocalClientsCopied() + m.mtx.Lock() + defer m.mtx.Unlock() + clients := make([]store.Client, len(m.tsdbClients)) + copy(clients, m.tsdbClients) if len(clients) > 0 { clients[0] = &slowClient{clients[0]} } diff --git a/pkg/receive/receive_test.go b/pkg/receive/receive_test.go index dd9b33048a..bf38cb06ed 100644 --- a/pkg/receive/receive_test.go +++ b/pkg/receive/receive_test.go @@ -210,7 +210,7 @@ func TestAddingExternalLabelsForTenants(t *testing.T) { for _, c := range tc.cfg { for _, tenantId := range c.Tenants { - if m.getTenant(tenantId) == nil { + if m.testGetTenant(tenantId) == nil { err = appendSample(m, tenantId, time.Now()) require.NoError(t, err) } @@ -294,7 +294,7 @@ func 
TestLabelSetsOfTenantsWhenAddingTenants(t *testing.T) { for _, c := range initialConfig { for _, tenantId := range c.Tenants { - if m.getTenant(tenantId) == nil { + if m.testGetTenant(tenantId) == nil { err = appendSample(m, tenantId, time.Now()) require.NoError(t, err) } @@ -319,7 +319,7 @@ func TestLabelSetsOfTenantsWhenAddingTenants(t *testing.T) { for _, c := range changedConfig { for _, tenantId := range c.Tenants { - if m.getTenant(tenantId) == nil { + if m.testGetTenant(tenantId) == nil { err = appendSample(m, tenantId, time.Now()) require.NoError(t, err) } @@ -534,7 +534,7 @@ func TestLabelSetsOfTenantsWhenChangingLabels(t *testing.T) { for _, c := range initialConfig { for _, tenantId := range c.Tenants { - if m.getTenant(tenantId) == nil { + if m.testGetTenant(tenantId) == nil { err = appendSample(m, tenantId, time.Now()) require.NoError(t, err) } @@ -704,7 +704,7 @@ func TestAddingLabelsWhenTenantAppearsInMultipleHashrings(t *testing.T) { for _, c := range initialConfig { for _, tenantId := range c.Tenants { - if m.getTenant(tenantId) == nil { + if m.testGetTenant(tenantId) == nil { err = appendSample(m, tenantId, time.Now()) require.NoError(t, err) } @@ -778,7 +778,7 @@ func TestReceiverLabelsNotOverwrittenByExternalLabels(t *testing.T) { for _, c := range cfg { for _, tenantId := range c.Tenants { - if m.getTenant(tenantId) == nil { + if m.testGetTenant(tenantId) == nil { err = appendSample(m, tenantId, time.Now()) require.NoError(t, err) }