From 7599cb53ef650887e843bcd80c09b5ed609dca5c Mon Sep 17 00:00:00 2001 From: Yi Jin <96499497+jnyi@users.noreply.github.com> Date: Tue, 3 Dec 2024 00:52:06 -0800 Subject: [PATCH] [Receive] Fix race condition when adding multiple new tenants at once (#7941) * [Receive] fix race condition Signed-off-by: Yi Jin * add a change log Signed-off-by: Yi Jin * memorize tsdb local clients without race condition Signed-off-by: Yi Jin * fix data race in testing with some concurrent safe helper functions Signed-off-by: Yi Jin * address comments Signed-off-by: Yi Jin --------- Signed-off-by: Yi Jin --- CHANGELOG.md | 4 + docs/sharding.md | 2 +- pkg/receive/multitsdb.go | 140 ++++++++++++++++------------------ pkg/receive/multitsdb_test.go | 49 +++++++++++- pkg/receive/receive_test.go | 12 +-- 5 files changed, 124 insertions(+), 83 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1ef69c15ca..c22d0983fb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,6 +26,10 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#7852](https://github.com/thanos-io/thanos/pull/7852) Query Frontend: pass "stats" parameter forward to queriers and fix Prometheus stats merging. - [#7832](https://github.com/thanos-io/thanos/pull/7832) Query Frontend: Fix cache keys for dynamic split intervals. - [#7885](https://github.com/thanos-io/thanos/pull/7885) Store: Return chunks to the pool after completing a Series call. +- [#7893](https://github.com/thanos-io/thanos/pull/7893) Sidecar: Fix retrieval of external labels for Prometheus v3.0.0. +- [#7903](https://github.com/thanos-io/thanos/pull/7903) Query: Fix panic on regex store matchers. +- [#7915](https://github.com/thanos-io/thanos/pull/7915) Store: Close block series client at the end to not reuse chunk buffer +- [#7941](https://github.com/thanos-io/thanos/pull/7941) Receive: Fix race condition when adding multiple new tenants, see [issue-7892](https://github.com/thanos-io/thanos/issues/7892). ### Added - [#7763](https://github.com/thanos-io/thanos/pull/7763) Ruler: use native histograms for client latency metrics. diff --git a/docs/sharding.md b/docs/sharding.md index 9cfa0fbf8a..f943ec071b 100644 --- a/docs/sharding.md +++ b/docs/sharding.md @@ -18,7 +18,7 @@ Queries against store gateway which are touching large number of blocks (no matt # Relabelling -Similar to [promtail](https://grafana.com/docs/loki/latest/send-data/promtail/configuration/#relabel_configs) this config follows native [Prometheus relabel-config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#relabel_config) syntax. +Similar to [promtail](https://grafana.com/docs/loki/latest/clients/promtail/configuration/#relabel_configs) this config follows native [Prometheus relabel-config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#relabel_config) syntax. Currently, thanos only supports the following relabel actions: diff --git a/pkg/receive/multitsdb.go b/pkg/receive/multitsdb.go index 1997e0fd38..da974d140c 100644 --- a/pkg/receive/multitsdb.go +++ b/pkg/receive/multitsdb.go @@ -64,11 +64,8 @@ type MultiTSDB struct { hashFunc metadata.HashFunc hashringConfigs []HashringConfig - tsdbClients []store.Client - tsdbClientsNeedUpdate bool - - exemplarClients map[string]*exemplars.TSDB - exemplarClientsNeedUpdate bool + tsdbClients []store.Client + exemplarClients map[string]*exemplars.TSDB metricNameFilterEnabled bool @@ -117,19 +114,19 @@ func NewMultiTSDB( } mt := &MultiTSDB{ - dataDir: dataDir, - logger: log.With(l, "component", "multi-tsdb"), - reg: reg, - tsdbOpts: tsdbOpts, - mtx: &sync.RWMutex{}, - tenants: map[string]*tenant{}, - labels: labels, - tsdbClientsNeedUpdate: true, - exemplarClientsNeedUpdate: true, - tenantLabelName: tenantLabelName, - bucket: bucket, - allowOutOfOrderUpload: allowOutOfOrderUpload, - hashFunc: hashFunc, + dataDir: dataDir, + logger: log.With(l, "component", "multi-tsdb"), + reg: reg, + tsdbOpts: tsdbOpts, + mtx: &sync.RWMutex{}, + tenants: map[string]*tenant{}, + labels: labels, + tsdbClients: make([]store.Client, 0), + exemplarClients: map[string]*exemplars.TSDB{}, + tenantLabelName: tenantLabelName, + bucket: bucket, + allowOutOfOrderUpload: allowOutOfOrderUpload, + hashFunc: hashFunc, } for _, option := range options { @@ -139,6 +136,49 @@ func NewMultiTSDB( return mt } +// testGetTenant returns the tenant with the given tenantID for testing purposes. +func (t *MultiTSDB) testGetTenant(tenantID string) *tenant { + t.mtx.RLock() + defer t.mtx.RUnlock() + return t.tenants[tenantID] +} + +func (t *MultiTSDB) updateTSDBClients() { + t.tsdbClients = t.tsdbClients[:0] + for _, tenant := range t.tenants { + client := tenant.client() + if client != nil { + t.tsdbClients = append(t.tsdbClients, client) + } + } +} + +func (t *MultiTSDB) addTenantUnlocked(tenantID string, newTenant *tenant) { + t.tenants[tenantID] = newTenant + t.updateTSDBClients() + if newTenant.exemplars() != nil { + t.exemplarClients[tenantID] = newTenant.exemplars() + } +} + +func (t *MultiTSDB) addTenantLocked(tenantID string, newTenant *tenant) { + t.mtx.Lock() + defer t.mtx.Unlock() + t.addTenantUnlocked(tenantID, newTenant) +} + +func (t *MultiTSDB) removeTenantUnlocked(tenantID string) { + delete(t.tenants, tenantID) + delete(t.exemplarClients, tenantID) + t.updateTSDBClients() +} + +func (t *MultiTSDB) removeTenantLocked(tenantID string) { + t.mtx.Lock() + defer t.mtx.Unlock() + t.removeTenantUnlocked(tenantID) +} + type localClient struct { store *store.TSDBStore @@ -433,9 +473,7 @@ func (t *MultiTSDB) Prune(ctx context.Context) error { } level.Info(t.logger).Log("msg", "Pruned tenant", "tenant", tenantID) - delete(t.tenants, tenantID) - t.tsdbClientsNeedUpdate = true - t.exemplarClientsNeedUpdate = true + t.removeTenantUnlocked(tenantID) } return merr.Err() @@ -595,58 +633,17 @@ func (t *MultiTSDB) RemoveLockFilesIfAny() error { return merr.Err() } +// TSDBLocalClients should be used as read-only. func (t *MultiTSDB) TSDBLocalClients() []store.Client { t.mtx.RLock() - if !t.tsdbClientsNeedUpdate { - t.mtx.RUnlock() - return t.tsdbClients - } - - t.mtx.RUnlock() - t.mtx.Lock() - defer t.mtx.Unlock() - if !t.tsdbClientsNeedUpdate { - return t.tsdbClients - } - - res := make([]store.Client, 0, len(t.tenants)) - for _, tenant := range t.tenants { - client := tenant.client() - if client != nil { - res = append(res, client) - } - } - - t.tsdbClientsNeedUpdate = false - t.tsdbClients = res - + defer t.mtx.RUnlock() return t.tsdbClients } +// TSDBExemplars should be used as read-only. func (t *MultiTSDB) TSDBExemplars() map[string]*exemplars.TSDB { t.mtx.RLock() - if !t.exemplarClientsNeedUpdate { - t.mtx.RUnlock() - return t.exemplarClients - } - t.mtx.RUnlock() - t.mtx.Lock() - defer t.mtx.Unlock() - - if !t.exemplarClientsNeedUpdate { - return t.exemplarClients - } - - res := make(map[string]*exemplars.TSDB, len(t.tenants)) - for k, tenant := range t.tenants { - e := tenant.exemplars() - if e != nil { - res[k] = e - } - } - - t.exemplarClientsNeedUpdate = false - t.exemplarClients = res + defer t.mtx.RUnlock() return t.exemplarClients } @@ -740,11 +737,7 @@ func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant nil, ) if err != nil { - t.mtx.Lock() - delete(t.tenants, tenantID) - t.tsdbClientsNeedUpdate = true - t.exemplarClientsNeedUpdate = true - t.mtx.Unlock() + t.removeTenantLocked(tenantID) return err } var ship *shipper.Shipper @@ -767,6 +760,7 @@ func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant options = append(options, store.WithCuckooMetricNameStoreFilter()) } tenant.set(store.NewTSDBStore(logger, s, component.Receive, lset, options...), s, ship, exemplars.NewTSDB(s, lset)) + t.addTenantLocked(tenantID, tenant) // need to update the client list once store is ready & client != nil level.Info(logger).Log("msg", "TSDB is now ready") return nil } @@ -795,9 +789,7 @@ func (t *MultiTSDB) getOrLoadTenant(tenantID string, blockingStart bool) (*tenan } tenant = newTenant() - t.tenants[tenantID] = tenant - t.tsdbClientsNeedUpdate = true - t.exemplarClientsNeedUpdate = true + t.addTenantUnlocked(tenantID, tenant) t.mtx.Unlock() logger := log.With(t.logger, "tenant", tenantID) diff --git a/pkg/receive/multitsdb_test.go b/pkg/receive/multitsdb_test.go index eb82c22281..bafd882e0e 100644 --- a/pkg/receive/multitsdb_test.go +++ b/pkg/receive/multitsdb_test.go @@ -5,6 +5,7 @@ package receive import ( "context" + "fmt" "io" "math" "os" @@ -191,7 +192,7 @@ func TestMultiTSDB(t *testing.T) { testutil.Ok(t, m.Open()) testutil.Ok(t, appendSample(m, testTenant, time.Now())) - tenant := m.tenants[testTenant] + tenant := m.testGetTenant(testTenant) db := tenant.readyStorage().Get() testutil.Equals(t, 0, len(db.Blocks())) @@ -535,6 +536,47 @@ func TestMultiTSDBRecreatePrunedTenant(t *testing.T) { testutil.Equals(t, 1, len(m.TSDBLocalClients())) } +func TestMultiTSDBAddNewTenant(t *testing.T) { + t.Parallel() + const iterations = 10 + // This test detects race conditions, so we run it multiple times to increase the chance of catching the issue. + for i := 0; i < iterations; i++ { + t.Run(fmt.Sprintf("iteration-%d", i), func(t *testing.T) { + dir := t.TempDir() + m := NewMultiTSDB(dir, log.NewNopLogger(), prometheus.NewRegistry(), + &tsdb.Options{ + MinBlockDuration: (2 * time.Hour).Milliseconds(), + MaxBlockDuration: (2 * time.Hour).Milliseconds(), + RetentionDuration: (6 * time.Hour).Milliseconds(), + }, + labels.FromStrings("replica", "test"), + "tenant_id", + objstore.NewInMemBucket(), + false, + metadata.NoneFunc, + ) + defer func() { testutil.Ok(t, m.Close()) }() + + concurrency := 50 + var wg sync.WaitGroup + for i := 0; i < concurrency; i++ { + wg.Add(1) + // simulate remote write with new tenant concurrently + go func(i int) { + defer wg.Done() + testutil.Ok(t, appendSample(m, fmt.Sprintf("tenant-%d", i), time.UnixMilli(int64(10)))) + }(i) + // simulate read request concurrently + go func() { + m.TSDBLocalClients() + }() + } + wg.Wait() + testutil.Equals(t, concurrency, len(m.TSDBLocalClients())) + }) + } +} + func TestAlignedHeadFlush(t *testing.T) { hourInSeconds := int64(1 * 60 * 60) @@ -787,7 +829,10 @@ func appendSampleWithLabels(m *MultiTSDB, tenant string, lbls labels.Labels, tim func queryLabelValues(ctx context.Context, m *MultiTSDB) error { proxy := store.NewProxyStore(nil, nil, func() []store.Client { - clients := m.TSDBLocalClients() + m.mtx.Lock() + defer m.mtx.Unlock() + clients := make([]store.Client, len(m.tsdbClients)) + copy(clients, m.tsdbClients) if len(clients) > 0 { clients[0] = &slowClient{clients[0]} } diff --git a/pkg/receive/receive_test.go b/pkg/receive/receive_test.go index ea1b6d81fd..08fe68e229 100644 --- a/pkg/receive/receive_test.go +++ b/pkg/receive/receive_test.go @@ -208,7 +208,7 @@ func TestAddingExternalLabelsForTenants(t *testing.T) { for _, c := range tc.cfg { for _, tenantId := range c.Tenants { - if m.tenants[tenantId] == nil { + if m.testGetTenant(tenantId) == nil { err = appendSample(m, tenantId, time.Now()) require.NoError(t, err) } @@ -290,7 +290,7 @@ func TestLabelSetsOfTenantsWhenAddingTenants(t *testing.T) { for _, c := range initialConfig { for _, tenantId := range c.Tenants { - if m.tenants[tenantId] == nil { + if m.testGetTenant(tenantId) == nil { err = appendSample(m, tenantId, time.Now()) require.NoError(t, err) } @@ -315,7 +315,7 @@ func TestLabelSetsOfTenantsWhenAddingTenants(t *testing.T) { for _, c := range changedConfig { for _, tenantId := range c.Tenants { - if m.tenants[tenantId] == nil { + if m.testGetTenant(tenantId) == nil { err = appendSample(m, tenantId, time.Now()) require.NoError(t, err) } @@ -528,7 +528,7 @@ func TestLabelSetsOfTenantsWhenChangingLabels(t *testing.T) { for _, c := range initialConfig { for _, tenantId := range c.Tenants { - if m.tenants[tenantId] == nil { + if m.testGetTenant(tenantId) == nil { err = appendSample(m, tenantId, time.Now()) require.NoError(t, err) } @@ -696,7 +696,7 @@ func TestAddingLabelsWhenTenantAppearsInMultipleHashrings(t *testing.T) { for _, c := range initialConfig { for _, tenantId := range c.Tenants { - if m.tenants[tenantId] == nil { + if m.testGetTenant(tenantId) == nil { err = appendSample(m, tenantId, time.Now()) require.NoError(t, err) } @@ -768,7 +768,7 @@ func TestReceiverLabelsNotOverwrittenByExternalLabels(t *testing.T) { for _, c := range cfg { for _, tenantId := range c.Tenants { - if m.tenants[tenantId] == nil { + if m.testGetTenant(tenantId) == nil { err = appendSample(m, tenantId, time.Now()) require.NoError(t, err) }