From fcf6640a28916556fb3840ed8df7af6e90b86efa Mon Sep 17 00:00:00 2001 From: Goutham Veeramachaneni Date: Fri, 20 Mar 2020 19:21:16 +0100 Subject: [PATCH] Fix issues around multiple registrations (#2309) Move all the caches up and don't create new ones for each schema. Signed-off-by: Goutham Veeramachaneni --- pkg/chunk/cache/cache_test.go | 4 +--- pkg/chunk/cache/memcached_client.go | 5 ++++- pkg/chunk/cache/mock.go | 7 ++++++- pkg/chunk/chunk_store.go | 4 ++-- pkg/chunk/chunk_store_test.go | 7 ++++++- pkg/chunk/chunk_store_utils.go | 10 ++-------- pkg/chunk/composite_store.go | 8 +++++--- pkg/chunk/series_store.go | 9 ++------- pkg/chunk/storage/factory.go | 22 ++++++++++++++++++---- pkg/chunk/testutils/testutils.go | 3 ++- 10 files changed, 48 insertions(+), 31 deletions(-) diff --git a/pkg/chunk/cache/cache_test.go b/pkg/chunk/cache/cache_test.go index ac586c74ec..66b24d5a73 100644 --- a/pkg/chunk/cache/cache_test.go +++ b/pkg/chunk/cache/cache_test.go @@ -113,9 +113,7 @@ func testCacheMultiple(t *testing.T, cache cache.Cache, keys []string, chunks [] } func testChunkFetcher(t *testing.T, c cache.Cache, keys []string, chunks []chunk.Chunk) { - fetcher, err := chunk.NewChunkFetcher(cache.Config{ - Cache: c, - }, false, nil) + fetcher, err := chunk.NewChunkFetcher(c, false, nil) require.NoError(t, err) defer fetcher.Stop() diff --git a/pkg/chunk/cache/memcached_client.go b/pkg/chunk/cache/memcached_client.go index b120b61fec..89ee406065 100644 --- a/pkg/chunk/cache/memcached_client.go +++ b/pkg/chunk/cache/memcached_client.go @@ -101,13 +101,16 @@ func NewMemcachedClient(cfg MemcachedClientConfig, name string, r prometheus.Reg serverList: selector, hostname: cfg.Host, service: cfg.Service, - addresses: strings.Split(cfg.Addresses, ","), provider: dns.NewProvider(util.Logger, dnsProviderRegisterer, dns.GolangResolverType), quit: make(chan struct{}), numServers: memcacheServersDiscovered.WithLabelValues(name), } + if len(cfg.Addresses) > 0 { + newClient.addresses = strings.Split(cfg.Addresses, ",") + } + err := newClient.updateMemcacheServers() if err != nil { level.Error(util.Logger).Log("msg", "error setting memcache servers to host", "host", cfg.Host, "err", err) diff --git a/pkg/chunk/cache/mock.go b/pkg/chunk/cache/mock.go index e44be6cbf3..6503aea80d 100644 --- a/pkg/chunk/cache/mock.go +++ b/pkg/chunk/cache/mock.go @@ -36,9 +36,14 @@ func (m *mockCache) Fetch(ctx context.Context, keys []string) (found []string, b func (m *mockCache) Stop() { } -// NewMockCache makes a new MockCache +// NewMockCache makes a new MockCache. func NewMockCache() Cache { return &mockCache{ cache: map[string][]byte{}, } } + +// NewNoopCache returns a no-op cache. +func NewNoopCache() Cache { + return NewTiered(nil) +} diff --git a/pkg/chunk/chunk_store.go b/pkg/chunk/chunk_store.go index 9080d0a86a..444b539fa5 100644 --- a/pkg/chunk/chunk_store.go +++ b/pkg/chunk/chunk_store.go @@ -84,8 +84,8 @@ type store struct { *Fetcher } -func newStore(cfg StoreConfig, schema Schema, index IndexClient, chunks Client, limits StoreLimits) (Store, error) { - fetcher, err := NewChunkFetcher(cfg.ChunkCacheConfig, cfg.chunkCacheStubs, chunks) +func newStore(cfg StoreConfig, schema Schema, index IndexClient, chunks Client, limits StoreLimits, chunksCache cache.Cache) (Store, error) { + fetcher, err := NewChunkFetcher(chunksCache, cfg.chunkCacheStubs, chunks) if err != nil { return nil, err } diff --git a/pkg/chunk/chunk_store_test.go b/pkg/chunk/chunk_store_test.go index dde220990c..b0963de2bb 100644 --- a/pkg/chunk/chunk_store_test.go +++ b/pkg/chunk/chunk_store_test.go @@ -82,8 +82,13 @@ func newTestChunkStoreConfig(t require.TestingT, schemaName string, storeCfg Sto overrides, err := validation.NewOverrides(limits, nil) require.NoError(t, err) + chunksCache, err := cache.New(storeCfg.ChunkCacheConfig) + require.NoError(t, err) + writeDedupeCache, err := cache.New(storeCfg.WriteDedupeCacheConfig) + require.NoError(t, err) + store := NewCompositeStore() - err = store.AddPeriod(storeCfg, schemaCfg.Configs[0], storage, storage, overrides) + err = store.AddPeriod(storeCfg, schemaCfg.Configs[0], storage, storage, overrides, chunksCache, writeDedupeCache) require.NoError(t, err) return store } diff --git a/pkg/chunk/chunk_store_utils.go b/pkg/chunk/chunk_store_utils.go index 27a5a84fe9..78cd7c14fe 100644 --- a/pkg/chunk/chunk_store_utils.go +++ b/pkg/chunk/chunk_store_utils.go @@ -99,16 +99,10 @@ type decodeResponse struct { } // NewChunkFetcher makes a new ChunkFetcher. -func NewChunkFetcher(cfg cache.Config, cacheStubs bool, storage Client) (*Fetcher, error) { - cfg.Prefix = "chunks" - cache, err := cache.New(cfg) - if err != nil { - return nil, err - } - +func NewChunkFetcher(cacher cache.Cache, cacheStubs bool, storage Client) (*Fetcher, error) { c := &Fetcher{ storage: storage, - cache: cache, + cache: cacher, cacheStubs: cacheStubs, decodeRequests: make(chan decodeRequest), } diff --git a/pkg/chunk/composite_store.go b/pkg/chunk/composite_store.go index fdb30d339f..366ca1de86 100644 --- a/pkg/chunk/composite_store.go +++ b/pkg/chunk/composite_store.go @@ -7,6 +7,8 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" + + "github.com/cortexproject/cortex/pkg/chunk/cache" ) // StoreLimits helps get Limits specific to Queries for Stores @@ -56,15 +58,15 @@ func NewCompositeStore() CompositeStore { } // AddPeriod adds the configuration for a period of time to the CompositeStore -func (c *CompositeStore) AddPeriod(storeCfg StoreConfig, cfg PeriodConfig, index IndexClient, chunks Client, limits StoreLimits) error { +func (c *CompositeStore) AddPeriod(storeCfg StoreConfig, cfg PeriodConfig, index IndexClient, chunks Client, limits StoreLimits, chunksCache, writeDedupeCache cache.Cache) error { schema := cfg.CreateSchema() var store Store var err error switch cfg.Schema { case "v9", "v10", "v11": - store, err = newSeriesStore(storeCfg, schema, index, chunks, limits) + store, err = newSeriesStore(storeCfg, schema, index, chunks, limits, chunksCache, writeDedupeCache) default: - store, err = newStore(storeCfg, schema, index, chunks, limits) + store, err = newStore(storeCfg, schema, index, chunks, limits, chunksCache) } if err != nil { return err diff --git a/pkg/chunk/series_store.go b/pkg/chunk/series_store.go index 16f5fb2fe3..7d99e0349c 100644 --- a/pkg/chunk/series_store.go +++ b/pkg/chunk/series_store.go @@ -68,13 +68,8 @@ type seriesStore struct { writeDedupeCache cache.Cache } -func newSeriesStore(cfg StoreConfig, schema Schema, index IndexClient, chunks Client, limits StoreLimits) (Store, error) { - fetcher, err := NewChunkFetcher(cfg.ChunkCacheConfig, cfg.chunkCacheStubs, chunks) - if err != nil { - return nil, err - } - - writeDedupeCache, err := cache.New(cfg.WriteDedupeCacheConfig) +func newSeriesStore(cfg StoreConfig, schema Schema, index IndexClient, chunks Client, limits StoreLimits, chunksCache, writeDedupeCache cache.Cache) (Store, error) { + fetcher, err := NewChunkFetcher(chunksCache, cfg.chunkCacheStubs, chunks) if err != nil { return nil, err } diff --git a/pkg/chunk/storage/factory.go b/pkg/chunk/storage/factory.go index ce857ed0d1..dc89bf67f3 100644 --- a/pkg/chunk/storage/factory.go +++ b/pkg/chunk/storage/factory.go @@ -95,14 +95,28 @@ func (cfg *Config) Validate() error { // NewStore makes the storage clients based on the configuration. func NewStore(cfg Config, storeCfg chunk.StoreConfig, schemaCfg chunk.SchemaConfig, limits StoreLimits) (chunk.Store, error) { - tieredCache, err := cache.New(cfg.IndexQueriesCacheConfig) + indexReadCache, err := cache.New(cfg.IndexQueriesCacheConfig) + if err != nil { + return nil, err + } + + writeDedupeCache, err := cache.New(storeCfg.WriteDedupeCacheConfig) + if err != nil { + return nil, err + } + + chunkCacheCfg := storeCfg.ChunkCacheConfig + chunkCacheCfg.Prefix = "chunks" + chunksCache, err := cache.New(chunkCacheCfg) if err != nil { return nil, err } // Cache is shared by multiple stores, which means they will try and Stop // it more than once. Wrap in a StopOnce to prevent this. - tieredCache = cache.StopOnce(tieredCache) + indexReadCache = cache.StopOnce(indexReadCache) + chunksCache = cache.StopOnce(chunksCache) + writeDedupeCache = cache.StopOnce(writeDedupeCache) err = schemaCfg.Load() if err != nil { @@ -115,7 +129,7 @@ func NewStore(cfg Config, storeCfg chunk.StoreConfig, schemaCfg chunk.SchemaConf if err != nil { return nil, errors.Wrap(err, "error creating index client") } - index = newCachingIndexClient(index, tieredCache, cfg.IndexCacheValidity, limits) + index = newCachingIndexClient(index, indexReadCache, cfg.IndexCacheValidity, limits) objectStoreType := s.ObjectType if objectStoreType == "" { @@ -126,7 +140,7 @@ func NewStore(cfg Config, storeCfg chunk.StoreConfig, schemaCfg chunk.SchemaConf return nil, errors.Wrap(err, "error creating object client") } - err = stores.AddPeriod(storeCfg, s, index, chunks, limits) + err = stores.AddPeriod(storeCfg, s, index, chunks, limits, chunksCache, writeDedupeCache) if err != nil { return nil, err } diff --git a/pkg/chunk/testutils/testutils.go b/pkg/chunk/testutils/testutils.go index 92dd0dcaf4..4124a9f629 100644 --- a/pkg/chunk/testutils/testutils.go +++ b/pkg/chunk/testutils/testutils.go @@ -12,6 +12,7 @@ import ( "github.com/prometheus/prometheus/pkg/labels" "github.com/cortexproject/cortex/pkg/chunk" + "github.com/cortexproject/cortex/pkg/chunk/cache" promchunk "github.com/cortexproject/cortex/pkg/chunk/encoding" "github.com/cortexproject/cortex/pkg/ingester/client" "github.com/cortexproject/cortex/pkg/util/flagext" @@ -135,7 +136,7 @@ func SetupTestChunkStore() (chunk.Store, error) { flagext.DefaultValues(&storeCfg) store := chunk.NewCompositeStore() - err = store.AddPeriod(storeCfg, schemaCfg.Configs[0], storage, storage, overrides) + err = store.AddPeriod(storeCfg, schemaCfg.Configs[0], storage, storage, overrides, cache.NewNoopCache(), cache.NewNoopCache()) if err != nil { return nil, err }