Skip to content

Commit 518e851

Browse files
author
Ivan Valkov
authored
Added file SD (thanos-io#546)
* Added file SD to query and rule * Added static flags in rule to specify queriers * Added e2e tests for both static flag SD and file SD
1 parent a55bd24 commit 518e851

File tree

12 files changed

+563
-113
lines changed

12 files changed

+563
-113
lines changed

Gopkg.lock

+12
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

benchmark/cmd/thanosbench/resources.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
prom "github.com/prometheus/prometheus/config"
1313
"gopkg.in/yaml.v2"
1414
appsv1 "k8s.io/api/apps/v1"
15+
"k8s.io/api/core/v1"
1516
rbacv1 "k8s.io/api/rbac/v1"
1617
"k8s.io/apimachinery/pkg/api/resource"
1718
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -207,7 +208,7 @@ func createPrometheus(opts *opts, name string, bucket string) *appsv1.StatefulSe
207208
Name: name,
208209
Namespace: promNamespace,
209210
Labels: map[string]string{
210-
"app": name,
211+
"app": name,
211212
"thanos-gossip-member": "true",
212213
},
213214
}
@@ -370,7 +371,7 @@ func createThanosQuery(opts *opts) (*v1.Service, *v1.Pod) {
370371
Name: "thanos-query",
371372
Namespace: thanosNamespace,
372373
Labels: map[string]string{
373-
"app": "thanos-query",
374+
"app": "thanos-query",
374375
"thanos-gossip-member": "true",
375376
},
376377
}

cmd/thanos/query.go

+89
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"github.com/grpc-ecosystem/go-grpc-middleware"
1717
"github.com/grpc-ecosystem/go-grpc-prometheus"
1818
"github.com/improbable-eng/thanos/pkg/cluster"
19+
"github.com/improbable-eng/thanos/pkg/discovery/cache"
1920
"github.com/improbable-eng/thanos/pkg/query"
2021
"github.com/improbable-eng/thanos/pkg/query/api"
2122
"github.com/improbable-eng/thanos/pkg/runutil"
@@ -28,6 +29,8 @@ import (
2829
"github.com/pkg/errors"
2930
"github.com/prometheus/client_golang/prometheus"
3031
"github.com/prometheus/common/route"
32+
"github.com/prometheus/prometheus/discovery/file"
33+
"github.com/prometheus/prometheus/discovery/targetgroup"
3134
"github.com/prometheus/prometheus/promql"
3235
"github.com/prometheus/tsdb/labels"
3336
"google.golang.org/grpc"
@@ -65,6 +68,12 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application, name string
6568
stores := cmd.Flag("store", "Addresses of statically configured store API servers (repeatable).").
6669
PlaceHolder("<store>").Strings()
6770

71+
fileSDFiles := cmd.Flag("store.file-sd-config.files", "Path to files that contain addresses of store API servers. The path can be a glob pattern (repeatable).").
72+
PlaceHolder("<path>").Strings()
73+
74+
fileSDInterval := modelDuration(cmd.Flag("store.file-sd-config.interval", "Refresh interval to re-read file SD files. (used as a fallback)").
75+
Default("5m"))
76+
6877
enableAutodownsampling := cmd.Flag("query.auto-downsampling", "Enable automatic adjustment (step / 5) to what source of data should be used in store gateways if no max_source_resolution param is specified. ").
6978
Default("false").Bool()
7079

@@ -87,6 +96,15 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application, name string
8796
lookupStores[s] = struct{}{}
8897
}
8998

99+
var fileSD *file.Discovery
100+
if len(*fileSDFiles) > 0 {
101+
conf := &file.SDConfig{
102+
Files: *fileSDFiles,
103+
RefreshInterval: *fileSDInterval,
104+
}
105+
fileSD = file.NewDiscovery(conf, logger)
106+
}
107+
90108
return runQuery(
91109
g,
92110
logger,
@@ -109,6 +127,7 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application, name string
109127
selectorLset,
110128
*stores,
111129
*enableAutodownsampling,
130+
fileSD,
112131
)
113132
}
114133
}
@@ -218,7 +237,14 @@ func runQuery(
218237
selectorLset labels.Labels,
219238
storeAddrs []string,
220239
enableAutodownsampling bool,
240+
fileSD *file.Discovery,
221241
) error {
242+
duplicatedStores := prometheus.NewCounter(prometheus.CounterOpts{
243+
Name: "thanos_query_duplicated_store_address",
244+
Help: "The number of times a duplicated store addresses is detected from the different configs in query",
245+
})
246+
reg.MustRegister(duplicatedStores)
247+
222248
var staticSpecs []query.StoreSpec
223249
for _, addr := range storeAddrs {
224250
if addr == "" {
@@ -233,13 +259,17 @@ func runQuery(
233259
return errors.Wrap(err, "building gRPC client")
234260
}
235261

262+
fileSDCache := cache.New()
263+
236264
var (
237265
stores = query.NewStoreSet(
238266
logger,
239267
reg,
240268
func() (specs []query.StoreSpec) {
269+
// Add store specs from static flags.
241270
specs = append(staticSpecs)
242271

272+
// Add store specs from gossip.
243273
for id, ps := range peer.PeerStates(cluster.PeerTypesStoreAPIs()...) {
244274
if ps.StoreAPIAddr == "" {
245275
level.Error(logger).Log("msg", "Gossip found peer that propagates empty address, ignoring.", "lset", fmt.Sprintf("%v", ps.Metadata.Labels))
@@ -248,6 +278,14 @@ func runQuery(
248278

249279
specs = append(specs, &gossipSpec{id: id, addr: ps.StoreAPIAddr, peer: peer})
250280
}
281+
282+
// Add store specs from file SD.
283+
for _, addr := range fileSDCache.Addresses() {
284+
specs = append(specs, query.NewGRPCStoreSpec(addr))
285+
}
286+
287+
specs = removeDuplicateStoreSpecs(logger, duplicatedStores, specs)
288+
251289
return specs
252290
},
253291
dialOpts,
@@ -271,6 +309,40 @@ func runQuery(
271309
stores.Close()
272310
})
273311
}
312+
// Run File Service Discovery and update the store set when the files are modified.
313+
if fileSD != nil {
314+
var fileSDUpdates chan []*targetgroup.Group
315+
ctxRun, cancelRun := context.WithCancel(context.Background())
316+
317+
fileSDUpdates = make(chan []*targetgroup.Group)
318+
319+
g.Add(func() error {
320+
fileSD.Run(ctxRun, fileSDUpdates)
321+
return nil
322+
}, func(error) {
323+
cancelRun()
324+
})
325+
326+
ctxUpdate, cancelUpdate := context.WithCancel(context.Background())
327+
g.Add(func() error {
328+
for {
329+
select {
330+
case update := <-fileSDUpdates:
331+
// Discoverers sometimes send nil updates so need to check for it to avoid panics.
332+
if update == nil {
333+
continue
334+
}
335+
fileSDCache.Update(update)
336+
stores.Update(ctxUpdate)
337+
case <-ctxUpdate.Done():
338+
return nil
339+
}
340+
}
341+
}, func(error) {
342+
cancelUpdate()
343+
close(fileSDUpdates)
344+
})
345+
}
274346
{
275347
ctx, cancel := context.WithCancel(context.Background())
276348
g.Add(func() error {
@@ -340,6 +412,23 @@ func runQuery(
340412
return nil
341413
}
342414

415+
func removeDuplicateStoreSpecs(logger log.Logger, duplicatedStores prometheus.Counter, specs []query.StoreSpec) []query.StoreSpec {
416+
set := make(map[string]query.StoreSpec)
417+
for _, spec := range specs {
418+
addr := spec.Addr()
419+
if _, ok := set[addr]; ok {
420+
level.Warn(logger).Log("msg", "Duplicate store address is provided - %v", addr)
421+
duplicatedStores.Inc()
422+
}
423+
set[addr] = spec
424+
}
425+
deduplicated := make([]query.StoreSpec, 0, len(set))
426+
for _, value := range set {
427+
deduplicated = append(deduplicated, value)
428+
}
429+
return deduplicated
430+
}
431+
343432
type gossipSpec struct {
344433
id string
345434
addr string

0 commit comments

Comments
 (0)