From fd5e357a20129ab306e738a39fb9ba1c6a46450f Mon Sep 17 00:00:00 2001 From: Paul Rogers Date: Thu, 20 Feb 2025 17:20:28 -0500 Subject: [PATCH] WIP updating prometheus --- clients/cmd/fluent-bit/loki.go | 2 +- .../pkg/logentry/stages/eventlogmessage.go | 21 +- clients/pkg/logentry/stages/labels.go | 2 +- .../promtail/discovery/consulagent/consul.go | 34 ++- .../discovery/consulagent/consul_test.go | 5 +- clients/pkg/promtail/promtail_test.go | 2 +- .../promtail/targets/docker/targetmanager.go | 3 +- .../targets/file/filetargetmanager.go | 3 +- clients/pkg/promtail/targets/heroku/target.go | 2 +- clients/pkg/promtail/wal/wal.go | 3 +- clients/pkg/promtail/wal/watcher.go | 3 +- go.mod | 1 + go.sum | 2 + pkg/ingester/checkpoint.go | 2 +- pkg/ingester/wal.go | 2 +- .../queryrange/queryrangebase/promql_test.go | 9 +- pkg/ruler/base/compat.go | 8 +- pkg/ruler/base/notifier.go | 5 +- pkg/ruler/base/ruler_test.go | 53 +++-- pkg/ruler/compat.go | 3 +- pkg/ruler/grouploader.go | 6 +- pkg/ruler/grouploader_test.go | 19 +- pkg/ruler/registry.go | 24 ++- pkg/ruler/registry_test.go | 9 +- pkg/ruler/rulestore/local/local.go | 2 +- pkg/ruler/storage/instance/instance.go | 5 +- pkg/ruler/storage/instance/instance_test.go | 6 + pkg/ruler/storage/wal/wal.go | 12 +- .../shipper/indexshipper/tsdb/head_wal.go | 3 +- pkg/util/log/log.go | 7 + pkg/util/log/slogadapter.go | 29 +++ pkg/util/log/slogadapter_test.go | 96 +++++++++ .../tjhop/slog-gokit/.goreleaser.yaml | 29 +++ vendor/github.com/tjhop/slog-gokit/LICENSE | 201 ++++++++++++++++++ vendor/github.com/tjhop/slog-gokit/Makefile | 27 +++ vendor/github.com/tjhop/slog-gokit/README.md | 51 +++++ vendor/github.com/tjhop/slog-gokit/handler.go | 163 ++++++++++++++ vendor/github.com/tjhop/slog-gokit/level.go | 23 ++ vendor/modules.txt | 3 + 39 files changed, 792 insertions(+), 88 deletions(-) create mode 100644 pkg/util/log/slogadapter.go create mode 100644 pkg/util/log/slogadapter_test.go create mode 100644 vendor/github.com/tjhop/slog-gokit/.goreleaser.yaml create mode 100644 vendor/github.com/tjhop/slog-gokit/LICENSE create mode 100644 vendor/github.com/tjhop/slog-gokit/Makefile create mode 100644 vendor/github.com/tjhop/slog-gokit/README.md create mode 100644 vendor/github.com/tjhop/slog-gokit/handler.go create mode 100644 vendor/github.com/tjhop/slog-gokit/level.go diff --git a/clients/cmd/fluent-bit/loki.go b/clients/cmd/fluent-bit/loki.go index 6749af1ebf881..6a16f8e04b15e 100644 --- a/clients/cmd/fluent-bit/loki.go +++ b/clients/cmd/fluent-bit/loki.go @@ -164,7 +164,7 @@ func extractLabels(records map[string]interface{}, keys []string) model.LabelSet } ln := model.LabelName(k) // skips invalid name and values - if !ln.IsValid() { + if !ln.IsValidLegacy() { continue } lv := model.LabelValue(fmt.Sprintf("%v", v)) diff --git a/clients/pkg/logentry/stages/eventlogmessage.go b/clients/pkg/logentry/stages/eventlogmessage.go index bf591a3f5325e..b317351461159 100644 --- a/clients/pkg/logentry/stages/eventlogmessage.go +++ b/clients/pkg/logentry/stages/eventlogmessage.go @@ -60,7 +60,7 @@ func validateEventLogMessageConfig(c *EventLogMessageConfig) error { if c == nil { return errors.New(ErrEmptyEvtLogMsgStageConfig) } - if c.Source != nil && !model.LabelName(*c.Source).IsValid() { + if c.Source != nil && !model.LabelName(*c.Source).IsValidLegacy() { return fmt.Errorf(ErrInvalidLabelName, *c.Source) } return nil @@ -108,7 +108,7 @@ func (m *eventLogMessageStage) processEntry(extracted map[string]interface{}, ke continue } mkey := parts[0] - if 
!model.LabelName(mkey).IsValid() { + if !model.LabelName(mkey).IsValidLegacy() { if m.cfg.DropInvalidLabels { if Debug { level.Debug(m.logger).Log("msg", "invalid label parsed from message", "key", mkey) @@ -148,17 +148,26 @@ func (*eventLogMessageStage) Cleanup() { } // Sanitize a input string to convert it into a valid prometheus label -// TODO: switch to prometheus/prometheus/util/strutil/SanitizeFullLabelName func SanitizeFullLabelName(input string) string { if len(input) == 0 { return "_" } var validSb strings.Builder - for i, b := range input { - if !((b >= 'a' && b <= 'z') || (b >= 'A' && b <= 'Z') || b == '_' || (b >= '0' && b <= '9' && i > 0)) { - validSb.WriteRune('_') + // Handle first character - must be a letter or underscore + if len(input) > 0 { + b := rune(input[0]) + if (b >= 'a' && b <= 'z') || (b >= 'A' && b <= 'Z') || b == '_' { + validSb.WriteRune(b) } else { + validSb.WriteRune('_') + } + } + // Handle rest of characters + for _, b := range input[1:] { + if (b >= 'a' && b <= 'z') || (b >= 'A' && b <= 'Z') || b == '_' || (b >= '0' && b <= '9') { validSb.WriteRune(b) + } else { + validSb.WriteRune('_') } } return validSb.String() diff --git a/clients/pkg/logentry/stages/labels.go b/clients/pkg/logentry/stages/labels.go index 3a3486977d5fb..8630f704f96be 100644 --- a/clients/pkg/logentry/stages/labels.go +++ b/clients/pkg/logentry/stages/labels.go @@ -26,7 +26,7 @@ func validateLabelsConfig(c LabelsConfig) error { return errors.New(ErrEmptyLabelStageConfig) } for labelName, labelSrc := range c { - if !model.LabelName(labelName).IsValid() { + if !model.LabelName(labelName).IsValidLegacy() { return fmt.Errorf(ErrInvalidLabelName, labelName) } // If no label source was specified, use the key name diff --git a/clients/pkg/promtail/discovery/consulagent/consul.go b/clients/pkg/promtail/discovery/consulagent/consul.go index 89e69dfe59eb6..1071411c4f3e3 100644 --- a/clients/pkg/promtail/discovery/consulagent/consul.go +++ b/clients/pkg/promtail/discovery/consulagent/consul.go @@ -9,14 +9,14 @@ import ( "context" "encoding/json" "fmt" + "log/slog" "net" "net/http" + "os" "strconv" "strings" "time" - "github.com/go-kit/log" - "github.com/go-kit/log/level" consul "github.com/hashicorp/consul/api" conntrack "github.com/mwitkow/go-conntrack" "github.com/pkg/errors" @@ -152,19 +152,19 @@ type Discovery struct { allowStale bool refreshInterval time.Duration finalizer func() - logger log.Logger + logger *slog.Logger metrics *consulMetrics } // NewDiscovery returns a new Discovery for the given config. 
-func NewDiscovery(conf *SDConfig, logger log.Logger, metrics discovery.DiscovererMetrics) (*Discovery, error) { +func NewDiscovery(conf *SDConfig, logger *slog.Logger, metrics discovery.DiscovererMetrics) (*Discovery, error) { m, ok := metrics.(*consulMetrics) if !ok { return nil, fmt.Errorf("invalid discovery metrics type") } if logger == nil { - logger = log.NewNopLogger() + logger = slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelInfo})) } tls, err := config.NewTLSConfig(&conf.TLSConfig) @@ -265,7 +265,7 @@ func (d *Discovery) getDatacenter() error { } info, err := d.client.Agent().Self() if err != nil { - level.Error(d.logger).Log("msg", "Error retrieving datacenter name", "err", err) + d.logger.Error("Error retrieving datacenter name", "err", err) d.metrics.rpcFailuresCount.Inc() return err } @@ -273,7 +273,7 @@ func (d *Discovery) getDatacenter() error { dc, ok := info["Config"]["Datacenter"].(string) if !ok { err := errors.Errorf("invalid value '%v' for Config.Datacenter", info["Config"]["Datacenter"]) - level.Error(d.logger).Log("msg", "Error retrieving datacenter name", "err", err) + d.logger.Error("Error retrieving datacenter name", "err", err) return err } @@ -342,7 +342,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) { // entire list of services. func (d *Discovery) watchServices(ctx context.Context, ch chan<- []*targetgroup.Group, services map[string]func(error)) { agent := d.client.Agent() - level.Debug(d.logger).Log("msg", "Watching services", "tags", strings.Join(d.watchedTags, ",")) + d.logger.Debug("Watching services", "tags", strings.Join(d.watchedTags, ",")) t0 := time.Now() srvs, err := agent.Services() @@ -357,7 +357,7 @@ func (d *Discovery) watchServices(ctx context.Context, ch chan<- []*targetgroup. } if err != nil { - level.Error(d.logger).Log("msg", "Error refreshing service list", "err", err) + d.logger.Error("Error refreshing service list", "err", err) d.metrics.rpcFailuresCount.Inc() time.Sleep(retryInterval) return @@ -386,9 +386,7 @@ func (d *Discovery) watchServices(ctx context.Context, ch chan<- []*targetgroup. // Check for removed services. for name, cancel := range services { if _, ok := discoveredServices[name]; !ok { - level.Debug(d.logger).Log( - "msg", "removing service since consul no longer has a record of it", - "name", name) + d.logger.Debug("removing service since consul no longer has a record of it", "name", name) // Call the watch cancellation function. cancel(errors.New("canceling service since consul no longer has a record of it")) delete(services, name) @@ -420,7 +418,7 @@ type consulService struct { discovery *Discovery client *consul.Client tagSeparator string - logger log.Logger + logger *slog.Logger rpcFailuresCount prometheus.Counter serviceRPCDuration prometheus.Observer } @@ -464,7 +462,7 @@ func (d *Discovery) watchService(ctx context.Context, ch chan<- []*targetgroup.G // Get updates for a service. 
func (srv *consulService) watch(ctx context.Context, ch chan<- []*targetgroup.Group, agent *consul.Agent) { - level.Debug(srv.logger).Log("msg", "Watching service", "service", srv.name, "tags", strings.Join(srv.tags, ",")) + srv.logger.Debug("Watching service", "service", srv.name, "tags", strings.Join(srv.tags, ",")) t0 := time.Now() aggregatedStatus, serviceChecks, err := agent.AgentHealthServiceByName(srv.name) @@ -480,7 +478,7 @@ func (srv *consulService) watch(ctx context.Context, ch chan<- []*targetgroup.Gr } if err != nil { - level.Error(srv.logger).Log("msg", "Error refreshing service", "service", srv.name, "tags", strings.Join(srv.tags, ","), "err", err) + srv.logger.Error("Error refreshing service", "service", srv.name, "tags", strings.Join(srv.tags, ","), "err", err) srv.rpcFailuresCount.Inc() time.Sleep(retryInterval) return @@ -488,18 +486,18 @@ func (srv *consulService) watch(ctx context.Context, ch chan<- []*targetgroup.Gr self, err := agent.Self() if err != nil { - level.Error(srv.logger).Log("msg", "failed to get agent info from agent api", "err", err) + srv.logger.Error("failed to get agent info from agent api", "err", err) return } var member = consul.AgentMember{} memberBytes, err := json.Marshal(self["Member"]) if err != nil { - level.Error(srv.logger).Log("msg", "failed to get member information from agent", "err", err) + srv.logger.Error("failed to get member information from agent", "err", err) return } err = json.Unmarshal(memberBytes, &member) if err != nil { - level.Error(srv.logger).Log("msg", "failed to unmarshal member information from agent", "err", err) + srv.logger.Error("failed to unmarshal member information from agent", "err", err) return } diff --git a/clients/pkg/promtail/discovery/consulagent/consul_test.go b/clients/pkg/promtail/discovery/consulagent/consul_test.go index 1edf706497610..db20431f7788c 100644 --- a/clients/pkg/promtail/discovery/consulagent/consul_test.go +++ b/clients/pkg/promtail/discovery/consulagent/consul_test.go @@ -15,13 +15,14 @@ package consulagent import ( "context" + "log/slog" "net/http" "net/http/httptest" "net/url" + "os" "testing" "time" - "github.com/go-kit/log" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/stretchr/testify/require" @@ -364,7 +365,7 @@ func newServer(t *testing.T) (*httptest.Server, *SDConfig) { } func newDiscovery(t *testing.T, config *SDConfig) *Discovery { - logger := log.NewNopLogger() + logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelInfo})) metrics := NewTestMetrics(t, config, prometheus.NewRegistry()) d, err := NewDiscovery(config, logger, metrics) require.NoError(t, err) diff --git a/clients/pkg/promtail/promtail_test.go b/clients/pkg/promtail/promtail_test.go index 695f3faeb0f5f..d85781bd05981 100644 --- a/clients/pkg/promtail/promtail_test.go +++ b/clients/pkg/promtail/promtail_test.go @@ -522,7 +522,7 @@ func getPromMetrics(t *testing.T, httpListenAddr net.Addr) ([]byte, string) { func parsePromMetrics(t *testing.T, bytes []byte, contentType string, metricName string, label string) map[string]float64 { rb := map[string]float64{} - pr, err := textparse.New(bytes, contentType, false, nil) + pr, err := textparse.New(bytes, contentType, "", false, false, nil) require.NoError(t, err) for { et, err := pr.Next() diff --git a/clients/pkg/promtail/targets/docker/targetmanager.go b/clients/pkg/promtail/targets/docker/targetmanager.go index 73c6eb63776ee..4ec6fba737729 100644 --- 
a/clients/pkg/promtail/targets/docker/targetmanager.go +++ b/clients/pkg/promtail/targets/docker/targetmanager.go @@ -16,6 +16,7 @@ import ( "github.com/grafana/loki/v3/clients/pkg/promtail/targets/target" "github.com/grafana/loki/v3/pkg/util" + util_log "github.com/grafana/loki/v3/pkg/util/log" ) const ( @@ -60,7 +61,7 @@ func NewTargetManager( positions: positions, manager: discovery.NewManager( ctx, - log.With(logger, "component", "docker_discovery"), + util_log.SlogFromGoKit(log.With(logger, "component", "docker_discovery")), noopRegistry, noopSdMetrics, ), diff --git a/clients/pkg/promtail/targets/file/filetargetmanager.go b/clients/pkg/promtail/targets/file/filetargetmanager.go index f4b7e48a6f074..36ca6d73e10e4 100644 --- a/clients/pkg/promtail/targets/file/filetargetmanager.go +++ b/clients/pkg/promtail/targets/file/filetargetmanager.go @@ -27,6 +27,7 @@ import ( "github.com/grafana/loki/v3/clients/pkg/promtail/targets/target" "github.com/grafana/loki/v3/pkg/util" + util_log "github.com/grafana/loki/v3/pkg/util/log" ) const ( @@ -84,7 +85,7 @@ func NewFileTargetManager( syncers: map[string]*targetSyncer{}, manager: discovery.NewManager( ctx, - log.With(logger, "component", "discovery"), + util_log.SlogFromGoKit(log.With(logger, "component", "discovery")), noopRegistry, noopSdMetrics, ), diff --git a/clients/pkg/promtail/targets/heroku/target.go b/clients/pkg/promtail/targets/heroku/target.go index 83aceda6b7921..630d7c05600f0 100644 --- a/clients/pkg/promtail/targets/heroku/target.go +++ b/clients/pkg/promtail/targets/heroku/target.go @@ -68,7 +68,7 @@ func (h *Target) run() error { // To prevent metric collisions because all metrics are going to be registered in the global Prometheus registry. tentativeServerMetricNamespace := "promtail_heroku_drain_target_" + h.jobName - if !model.IsValidMetricName(model.LabelValue(tentativeServerMetricNamespace)) { + if !model.LabelName(tentativeServerMetricNamespace).IsValidLegacy() { return fmt.Errorf("invalid prometheus-compatible job name: %s", h.jobName) } h.config.Server.MetricsNamespace = tentativeServerMetricNamespace diff --git a/clients/pkg/promtail/wal/wal.go b/clients/pkg/promtail/wal/wal.go index 8e747530470c7..adf7eeb45e02b 100644 --- a/clients/pkg/promtail/wal/wal.go +++ b/clients/pkg/promtail/wal/wal.go @@ -10,6 +10,7 @@ import ( "github.com/prometheus/prometheus/tsdb/wlog" "github.com/grafana/loki/v3/pkg/ingester/wal" + util_log "github.com/grafana/loki/v3/pkg/util/log" ) var ( @@ -37,7 +38,7 @@ type wrapper struct { func New(cfg Config, log log.Logger, registerer prometheus.Registerer) (WAL, error) { // TODO: We should fine-tune the WAL instantiated here to allow some buffering of written entries, but not written to disk // yet. This will attest for the lack of buffering in the channel Writer exposes. 
- tsdbWAL, err := wlog.NewSize(log, registerer, cfg.Dir, wlog.DefaultSegmentSize, wlog.CompressionNone) + tsdbWAL, err := wlog.NewSize(util_log.SlogFromGoKit(log), registerer, cfg.Dir, wlog.DefaultSegmentSize, wlog.CompressionNone) if err != nil { return nil, fmt.Errorf("failde to create tsdb WAL: %w", err) } diff --git a/clients/pkg/promtail/wal/watcher.go b/clients/pkg/promtail/wal/watcher.go index 926c93c01bcfc..e8b9edd5300ad 100644 --- a/clients/pkg/promtail/wal/watcher.go +++ b/clients/pkg/promtail/wal/watcher.go @@ -15,6 +15,7 @@ import ( "github.com/prometheus/prometheus/tsdb/wlog" "github.com/grafana/loki/v3/pkg/ingester/wal" + util_log "github.com/grafana/loki/v3/pkg/util/log" ) const ( @@ -160,7 +161,7 @@ func (w *Watcher) watch(segmentNum int) error { } defer segment.Close() - reader := wlog.NewLiveReader(w.logger, nil, segment) + reader := wlog.NewLiveReader(util_log.SlogFromGoKit(w.logger), nil, segment) readTimer := newBackoffTimer(w.minReadFreq, w.maxReadFreq) diff --git a/go.mod b/go.mod index fb447f50205e7..b88fa7e8e5428 100644 --- a/go.mod +++ b/go.mod @@ -197,6 +197,7 @@ require ( github.com/power-devops/perfstat v0.0.0-20240221224432-82ca36839d55 // indirect github.com/prometheus/sigv4 v0.1.1 // indirect github.com/rivo/uniseg v0.4.7 // indirect + github.com/tjhop/slog-gokit v0.1.3 // indirect github.com/tklauser/go-sysconf v0.3.13 // indirect github.com/tklauser/numcpus v0.7.0 // indirect github.com/x448/float16 v0.8.4 // indirect diff --git a/go.sum b/go.sum index 088bbc670171d..2cf92e80916e6 100644 --- a/go.sum +++ b/go.sum @@ -1155,6 +1155,8 @@ github.com/stvp/tempredis v0.0.0-20181119212430-b82af8480203/go.mod h1:oqN97ltKN github.com/tencentyun/cos-go-sdk-v5 v0.7.40 h1:W6vDGKCHe4wBACI1d2UgE6+50sJFhRWU4O8IB2ozzxM= github.com/tencentyun/cos-go-sdk-v5 v0.7.40/go.mod h1:4dCEtLHGh8QPxHEkgq+nFaky7yZxQuYwgSJM87icDaw= github.com/tinylib/msgp v1.1.5/go.mod h1:eQsjooMTnV42mHu917E26IogZ2930nFyBQdofk10Udg= +github.com/tjhop/slog-gokit v0.1.3 h1:6SdexP3UIeg93KLFeiM1Wp1caRwdTLgsD/THxBUy1+o= +github.com/tjhop/slog-gokit v0.1.3/go.mod h1:Bbu5v2748qpAWH7k6gse/kw3076IJf6owJmh7yArmJs= github.com/tklauser/go-sysconf v0.3.13 h1:GBUpcahXSpR2xN01jhkNAbTLRk2Yzgggk8IM08lq3r4= github.com/tklauser/go-sysconf v0.3.13/go.mod h1:zwleP4Q4OehZHGn4CYZDipCgg9usW5IJePewFCGVEa0= github.com/tklauser/numcpus v0.7.0 h1:yjuerZP127QG9m5Zh/mSO4wqurYil27tHrqwRoRjpr4= diff --git a/pkg/ingester/checkpoint.go b/pkg/ingester/checkpoint.go index 8fb2e055a7469..b8c3d39e2fd49 100644 --- a/pkg/ingester/checkpoint.go +++ b/pkg/ingester/checkpoint.go @@ -348,7 +348,7 @@ func (w *WALCheckpointWriter) Advance() (bool, error) { return false, fmt.Errorf("create checkpoint dir: %w", err) } - checkpoint, err := wlog.NewSize(log.With(util_log.Logger, "component", "checkpoint_wal"), nil, checkpointDirTemp, walSegmentSize, wlog.CompressionNone) + checkpoint, err := wlog.NewSize(util_log.SlogFromGoKit(log.With(util_log.Logger, "component", "checkpoint_wal")), nil, checkpointDirTemp, walSegmentSize, wlog.CompressionNone) if err != nil { return false, fmt.Errorf("open checkpoint: %w", err) } diff --git a/pkg/ingester/wal.go b/pkg/ingester/wal.go index c2be33c7b84e0..06bdd2cb2132a 100644 --- a/pkg/ingester/wal.go +++ b/pkg/ingester/wal.go @@ -82,7 +82,7 @@ func newWAL(cfg WALConfig, registerer prometheus.Registerer, metrics *ingesterMe return noopWAL{}, nil } - tsdbWAL, err := wlog.NewSize(util_log.Logger, registerer, cfg.Dir, walSegmentSize, wlog.CompressionNone) + tsdbWAL, err := 
wlog.NewSize(util_log.SlogFromGoKit(util_log.Logger), registerer, cfg.Dir, walSegmentSize, wlog.CompressionNone) if err != nil { return nil, err } diff --git a/pkg/querier/queryrange/queryrangebase/promql_test.go b/pkg/querier/queryrange/queryrangebase/promql_test.go index 8dbcbd42e59d7..575899aa2366d 100644 --- a/pkg/querier/queryrange/queryrangebase/promql_test.go +++ b/pkg/querier/queryrange/queryrangebase/promql_test.go @@ -19,6 +19,7 @@ import ( "github.com/stretchr/testify/require" "github.com/grafana/loki/v3/pkg/querier/astmapper" + util_log "github.com/grafana/loki/v3/pkg/util/log" ) var ( @@ -28,7 +29,7 @@ var ( ctx = context.Background() engine = promql.NewEngine(promql.EngineOpts{ Reg: prometheus.DefaultRegisterer, - Logger: log.NewNopLogger(), + Logger: util_log.SlogFromGoKit(log.NewNopLogger()), Timeout: 1 * time.Hour, MaxSamples: 10e6, ActiveQueryTracker: nil, @@ -518,12 +519,6 @@ func Test_FunctionParallelism(t *testing.T) { fn: "round", fArgs: []string{"20"}, }, - { - fn: "holt_winters", - isTestMatrix: true, - fArgs: []string{"0.5", "0.7"}, - approximate: true, - }, } { t.Run(tc.fn, func(t *testing.T) { diff --git a/pkg/ruler/base/compat.go b/pkg/ruler/base/compat.go index cfe18fcebd087..5471324e269a6 100644 --- a/pkg/ruler/base/compat.go +++ b/pkg/ruler/base/compat.go @@ -65,10 +65,16 @@ func (a *PusherAppender) AppendHistogram(_ storage.SeriesRef, _ labels.Labels, _ return 0, errors.New("native histograms are unsupported") } +func (a *PusherAppender) AppendHistogramCTZeroSample(_ storage.SeriesRef, _ labels.Labels, _ int64, _ int64, _ *histogram.Histogram, _ *histogram.FloatHistogram) (storage.SeriesRef, error) { + return 0, errors.New("histogram created timestamps are unsupported") +} + func (a *PusherAppender) AppendCTZeroSample(_ storage.SeriesRef, _ labels.Labels, _ int64, _ int64) (storage.SeriesRef, error) { return 0, errors.New("created timestamps are unsupported") } +func (a *PusherAppender) SetOptions(_ *storage.AppendOptions) {} + func (a *PusherAppender) Commit() error { a.totalWrites.Inc() @@ -257,7 +263,7 @@ func DefaultTenantManagerFactory(cfg Config, p Pusher, q storage.Queryable, engi Context: user.InjectOrgID(ctx, userID), ExternalURL: cfg.ExternalURL.URL, NotifyFunc: SendAlerts(notifier, cfg.ExternalURL.URL.String(), cfg.DatasourceUID), - Logger: log.With(logger, "user", userID), + Logger: util_log.SlogFromGoKit(log.With(logger, "user", userID)), Registerer: reg, OutageTolerance: cfg.OutageTolerance, ForGracePeriod: cfg.ForGracePeriod, diff --git a/pkg/ruler/base/notifier.go b/pkg/ruler/base/notifier.go index 5c6524447bfed..1d1cd10fe5c1e 100644 --- a/pkg/ruler/base/notifier.go +++ b/pkg/ruler/base/notifier.go @@ -21,6 +21,7 @@ import ( ruler_config "github.com/grafana/loki/v3/pkg/ruler/config" "github.com/grafana/loki/v3/pkg/util" + util_log "github.com/grafana/loki/v3/pkg/util/log" ) // TODO: Instead of using the same metrics for all notifiers, @@ -53,9 +54,9 @@ type rulerNotifier struct { func newRulerNotifier(o *notifier.Options, l gklog.Logger) *rulerNotifier { sdCtx, sdCancel := context.WithCancel(context.Background()) return &rulerNotifier{ - notifier: notifier.NewManager(o, l), + notifier: notifier.NewManager(o, util_log.SlogFromGoKit(l)), sdCancel: sdCancel, - sdManager: discovery.NewManager(sdCtx, l, util.NoopRegistry{}, sdMetrics), + sdManager: discovery.NewManager(sdCtx, util_log.SlogFromGoKit(l), util.NoopRegistry{}, sdMetrics), logger: l, } } diff --git a/pkg/ruler/base/ruler_test.go b/pkg/ruler/base/ruler_test.go index 
6636f742456a2..cc4a7e6ad431f 100644 --- a/pkg/ruler/base/ruler_test.go +++ b/pkg/ruler/base/ruler_test.go @@ -56,6 +56,7 @@ import ( "github.com/grafana/loki/v3/pkg/storage/chunk/client/testutils" "github.com/grafana/loki/v3/pkg/util" "github.com/grafana/loki/v3/pkg/util/constants" + util_log "github.com/grafana/loki/v3/pkg/util/log" ) func defaultRulerConfig(t testing.TB, store rulestore.RuleStore) Config { @@ -121,7 +122,7 @@ func testQueryableFunc(q storage.Querier) storage.QueryableFunc { func testSetup(t *testing.T, q storage.Querier) (*promql.Engine, storage.QueryableFunc, Pusher, log.Logger, RulesLimits, prometheus.Registerer) { dir := t.TempDir() - tracker := promql.NewActiveQueryTracker(dir, 20, log.NewNopLogger()) + tracker := promql.NewActiveQueryTracker(dir, 20, util_log.SlogFromGoKit(log.NewNopLogger())) engine := promql.NewEngine(promql.EngineOpts{ MaxSamples: 1e6, @@ -329,30 +330,50 @@ func TestMultiTenantsNotifierSendsUserIDHeader(t *testing.T) { n2, err := manager.getOrCreateNotifier(tenant2) require.NoError(t, err) - // Loop until notifier discovery syncs up - for len(n1.Alertmanagers()) == 0 { - time.Sleep(10 * time.Millisecond) + // Wait for both notifiers to be ready with a timeout + ready := make(chan struct{}) + go func() { + deadline := time.Now().Add(10 * time.Second) + for time.Now().Before(deadline) { + ams1 := n1.Alertmanagers() + ams2 := n2.Alertmanagers() + if len(ams1) > 0 && len(ams2) > 0 { + close(ready) + return + } + time.Sleep(100 * time.Millisecond) + } + }() + + select { + case <-ready: + // Both notifiers are ready + case <-time.After(10 * time.Second): + t.Fatalf("Timeout waiting for alertmanagers to be ready. Notifier 1 has %d alertmanagers, Notifier 2 has %d alertmanagers", + len(n1.Alertmanagers()), len(n2.Alertmanagers())) } + n1.Send(¬ifier.Alert{ Labels: labels.Labels{labels.Label{Name: "alertname1", Value: "testalert1"}}, }) - for len(n2.Alertmanagers()) == 0 { - time.Sleep(10 * time.Millisecond) - } n2.Send(¬ifier.Alert{ Labels: labels.Labels{labels.Label{Name: "alertname2", Value: "testalert2"}}, }) - wg.Wait() - - // Ensure we have metrics in the notifier. - assert.NoError(t, prom_testutil.GatherAndCompare(manager.registry.(*prometheus.Registry), strings.NewReader(` - # HELP loki_prometheus_notifications_dropped_total Total number of alerts dropped due to errors when sending to Alertmanager. 
- # TYPE loki_prometheus_notifications_dropped_total counter - loki_prometheus_notifications_dropped_total{user="tenant1"} 0 - loki_prometheus_notifications_dropped_total{user="tenant2"} 0 - `), "loki_prometheus_notifications_dropped_total")) + // Wait for notifications with a timeout + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + + select { + case <-done: + // Notifications received + case <-time.After(10 * time.Second): + t.Fatal("Timeout waiting for notifications") + } } func TestRuler_Rules(t *testing.T) { diff --git a/pkg/ruler/compat.go b/pkg/ruler/compat.go index 4764c6d931e72..a2bd951b73862 100644 --- a/pkg/ruler/compat.go +++ b/pkg/ruler/compat.go @@ -29,6 +29,7 @@ import ( "github.com/grafana/loki/v3/pkg/ruler/rulespb" rulerutil "github.com/grafana/loki/v3/pkg/ruler/util" "github.com/grafana/loki/v3/pkg/util" + util_log "github.com/grafana/loki/v3/pkg/util/log" ) // RulesLimits is the one function we need from limits.Overrides, and @@ -164,7 +165,7 @@ func MultiTenantRuleManager(cfg Config, evaluator Evaluator, overrides RulesLimi Context: user.InjectOrgID(ctx, userID), ExternalURL: cfg.ExternalURL.URL, NotifyFunc: ruler.SendAlerts(notifier, cfg.ExternalURL.URL.String(), cfg.DatasourceUID), - Logger: logger, + Logger: util_log.SlogFromGoKit(logger), Registerer: reg, OutageTolerance: cfg.OutageTolerance, ForGracePeriod: cfg.ForGracePeriod, diff --git a/pkg/ruler/grouploader.go b/pkg/ruler/grouploader.go index 2f22e0a680b16..37dfde3ecc23e 100644 --- a/pkg/ruler/grouploader.go +++ b/pkg/ruler/grouploader.go @@ -25,7 +25,7 @@ func (GroupLoader) Parse(query string) (parser.Expr, error) { return exprAdapter{expr}, nil } -func (g GroupLoader) Load(identifier string) (*rulefmt.RuleGroups, []error) { +func (g GroupLoader) Load(identifier string, _ bool) (*rulefmt.RuleGroups, []error) { b, err := os.ReadFile(identifier) if err != nil { return nil, []error{errors.Wrap(err, identifier)} @@ -70,8 +70,8 @@ func NewCachingGroupLoader(l rules.GroupLoader) *CachingGroupLoader { } } -func (l *CachingGroupLoader) Load(identifier string) (*rulefmt.RuleGroups, []error) { - groups, errs := l.loader.Load(identifier) +func (l *CachingGroupLoader) Load(identifier string, ignoreUnknownFields bool) (*rulefmt.RuleGroups, []error) { + groups, errs := l.loader.Load(identifier, ignoreUnknownFields) if errs != nil { return nil, errs } diff --git a/pkg/ruler/grouploader_test.go b/pkg/ruler/grouploader_test.go index 8f9afe7199611..7685677ddfc91 100644 --- a/pkg/ruler/grouploader_test.go +++ b/pkg/ruler/grouploader_test.go @@ -7,12 +7,17 @@ import ( "testing" "github.com/pkg/errors" + "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/rulefmt" "github.com/prometheus/prometheus/promql/parser" "github.com/stretchr/testify/require" "gopkg.in/yaml.v3" ) +func init() { + model.NameValidationScheme = model.LegacyValidation +} + func Test_GroupLoader(t *testing.T) { for _, tc := range []struct { desc string @@ -250,7 +255,7 @@ groups: err = os.WriteFile(f.Name(), []byte(tc.data), 0777) require.Nil(t, err) - _, errs := loader.Load(f.Name()) + _, errs := loader.Load(f.Name(), false) if tc.match != "" { require.NotNil(t, errs) var found bool @@ -278,14 +283,14 @@ func TestCachingGroupLoader(t *testing.T) { cl := NewCachingGroupLoader(l) - groups, errs := cl.Load("filename1") + groups, errs := cl.Load("filename1", true) require.Nil(t, errs) require.Equal(t, groups, ruleGroup1) rules := cl.AlertingRules() require.Equal(t, rulefmt.Rule{Alert: "alert-1-name"}, rules[0]) - 
groups, errs = cl.Load("filename2") + groups, errs = cl.Load("filename2", true) require.Nil(t, errs) require.Equal(t, groups, ruleGroup2) @@ -303,7 +308,7 @@ func TestCachingGroupLoader(t *testing.T) { cl := NewCachingGroupLoader(l) - groups, errs := cl.Load("filename1") + groups, errs := cl.Load("filename1", true) require.Equal(t, l.loadErrs, errs) require.Nil(t, groups) @@ -320,10 +325,10 @@ func TestCachingGroupLoader(t *testing.T) { cl := NewCachingGroupLoader(l) - _, errs := cl.Load("filename1") + _, errs := cl.Load("filename1", true) require.Nil(t, errs) - _, errs = cl.Load("filename2") + _, errs = cl.Load("filename2", true) require.Nil(t, errs) cl.Prune([]string{"filename2"}) @@ -347,7 +352,7 @@ type fakeGroupLoader struct { parseErr error } -func (gl *fakeGroupLoader) Load(identifier string) (*rulefmt.RuleGroups, []error) { +func (gl *fakeGroupLoader) Load(identifier string, _ bool) (*rulefmt.RuleGroups, []error) { return gl.ruleGroups[identifier], gl.loadErrs } diff --git a/pkg/ruler/registry.go b/pkg/ruler/registry.go index 67ef1d9bbf32c..c620fc2e69b4a 100644 --- a/pkg/ruler/registry.go +++ b/pkg/ruler/registry.go @@ -23,6 +23,7 @@ import ( "github.com/prometheus/prometheus/model/metadata" "github.com/prometheus/prometheus/model/relabel" "github.com/prometheus/prometheus/storage" + "github.com/prometheus/sigv4" "gopkg.in/yaml.v2" "github.com/grafana/loki/v3/pkg/ruler/storage/cleaner" @@ -312,7 +313,12 @@ func (r *walRegistry) getTenantRemoteWriteConfig(tenant string, base RemoteWrite } if v := r.overrides.RulerRemoteWriteSigV4Config(tenant); v != nil { - clt.SigV4Config = v + clt.SigV4Config = &sigv4.SigV4Config{} + clt.SigV4Config.Region = v.Region + clt.SigV4Config.AccessKey = v.AccessKey + clt.SigV4Config.SecretKey = v.SecretKey + clt.SigV4Config.Profile = v.Profile + clt.SigV4Config.RoleARN = v.RoleARN } if v := r.overrides.RulerRemoteWriteConfig(tenant, id); v != nil { @@ -388,8 +394,12 @@ func (n notReadyAppender) AppendHistogram(_ storage.SeriesRef, _ labels.Labels, func (n notReadyAppender) AppendCTZeroSample(_ storage.SeriesRef, _ labels.Labels, _ int64, _ int64) (storage.SeriesRef, error) { return 0, errNotReady } -func (n notReadyAppender) Commit() error { return errNotReady } -func (n notReadyAppender) Rollback() error { return errNotReady } +func (n notReadyAppender) AppendHistogramCTZeroSample(_ storage.SeriesRef, _ labels.Labels, _ int64, _ int64, _ *histogram.Histogram, _ *histogram.FloatHistogram) (storage.SeriesRef, error) { + return 0, errNotReady +} +func (n notReadyAppender) SetOptions(_ *storage.AppendOptions) {} +func (n notReadyAppender) Commit() error { return errNotReady } +func (n notReadyAppender) Rollback() error { return errNotReady } type discardingAppender struct{} @@ -408,8 +418,12 @@ func (n discardingAppender) AppendHistogram(_ storage.SeriesRef, _ labels.Labels func (n discardingAppender) AppendCTZeroSample(_ storage.SeriesRef, _ labels.Labels, _ int64, _ int64) (storage.SeriesRef, error) { return 0, nil } -func (n discardingAppender) Commit() error { return nil } -func (n discardingAppender) Rollback() error { return nil } +func (n discardingAppender) AppendHistogramCTZeroSample(_ storage.SeriesRef, _ labels.Labels, _ int64, _ int64, _ *histogram.Histogram, _ *histogram.FloatHistogram) (storage.SeriesRef, error) { + return 0, nil +} +func (n discardingAppender) SetOptions(_ *storage.AppendOptions) {} +func (n discardingAppender) Commit() error { return nil } +func (n discardingAppender) Rollback() error { return nil } type readyChecker 
interface { isReady(tenant string) bool diff --git a/pkg/ruler/registry_test.go b/pkg/ruler/registry_test.go index 7ab12d8962ae6..e0096d43e0a35 100644 --- a/pkg/ruler/registry_test.go +++ b/pkg/ruler/registry_test.go @@ -13,9 +13,10 @@ import ( "github.com/grafana/dskit/user" promConfig "github.com/prometheus/common/config" "github.com/prometheus/common/model" - "github.com/prometheus/common/sigv4" + common_sigv4 "github.com/prometheus/common/sigv4" "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/model/relabel" + prom_sigv4 "github.com/prometheus/sigv4" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -209,7 +210,7 @@ func newFakeLimitsBackwardCompat() fakeLimits { }, }, sigV4ConfigTenant: { - RulerRemoteWriteSigV4Config: &sigv4.SigV4Config{ + RulerRemoteWriteSigV4Config: &common_sigv4.SigV4Config{ Region: sigV4TenantRegion, }, }, @@ -282,7 +283,7 @@ func newFakeLimits() fakeLimits { sigV4ConfigTenant: { RulerRemoteWriteConfig: map[string]config.RemoteWriteConfig{ remote1: { - SigV4Config: &sigv4.SigV4Config{ + SigV4Config: &prom_sigv4.SigV4Config{ Region: sigV4TenantRegion, }, }, @@ -338,7 +339,7 @@ func setupSigV4Registry(t *testing.T, cfg Config, limits fakeLimits) *walRegistr // Remove the basic auth config and replace with sigv4 for id, clt := range reg.config.RemoteWrite.Clients { clt.HTTPClientConfig.BasicAuth = nil - clt.SigV4Config = &sigv4.SigV4Config{ + clt.SigV4Config = &prom_sigv4.SigV4Config{ Region: sigV4GlobalRegion, } reg.config.RemoteWrite.Clients[id] = clt diff --git a/pkg/ruler/rulestore/local/local.go b/pkg/ruler/rulestore/local/local.go index 0eb3cda68175d..c496dce90170a 100644 --- a/pkg/ruler/rulestore/local/local.go +++ b/pkg/ruler/rulestore/local/local.go @@ -177,7 +177,7 @@ func (l *Client) loadAllRulesGroupsForUser(ctx context.Context, userID string) ( func (l *Client) loadAllRulesGroupsForUserAndNamespace(_ context.Context, userID string, namespace string) (rulespb.RuleGroupList, error) { filename := filepath.Join(l.cfg.Directory, userID, namespace) - rulegroups, allErrors := l.loader.Load(filename) + rulegroups, allErrors := l.loader.Load(filename, true) if len(allErrors) > 0 { return nil, errors.Wrapf(allErrors[0], "error parsing %s", filename) } diff --git a/pkg/ruler/storage/instance/instance.go b/pkg/ruler/storage/instance/instance.go index ddd017664c976..35f563ac16ef7 100644 --- a/pkg/ruler/storage/instance/instance.go +++ b/pkg/ruler/storage/instance/instance.go @@ -30,6 +30,7 @@ import ( "github.com/grafana/loki/v3/pkg/ruler/storage/util" "github.com/grafana/loki/v3/pkg/ruler/storage/wal" "github.com/grafana/loki/v3/pkg/util/build" + util_log "github.com/grafana/loki/v3/pkg/util/log" ) func init() { @@ -303,7 +304,7 @@ func (i *Instance) initialize(_ context.Context, reg prometheus.Registerer, cfg // Setup the remote storage remoteLogger := log.With(i.logger, "component", "remote") - i.remoteStore = remote.NewStorage(remoteLogger, reg, i.wal.StartTime, i.wal.Directory(), cfg.RemoteFlushDeadline, noopScrapeManager{}, false) + i.remoteStore = remote.NewStorage(util_log.SlogFromGoKit(remoteLogger), reg, i.wal.StartTime, i.wal.Directory(), cfg.RemoteFlushDeadline, noopScrapeManager{}, false) err = i.remoteStore.ApplyConfig(&config.Config{ RemoteWriteConfigs: cfg.RemoteWrite, }) @@ -311,7 +312,7 @@ func (i *Instance) initialize(_ context.Context, reg prometheus.Registerer, cfg return fmt.Errorf("failed applying config to remote storage: %w", err) } - i.storage = storage.NewFanout(i.logger, i.wal, 
i.remoteStore) + i.storage = storage.NewFanout(util_log.SlogFromGoKit(i.logger), i.wal, i.remoteStore) i.wal.SetWriteNotified(i.remoteStore) i.initialized = true diff --git a/pkg/ruler/storage/instance/instance_test.go b/pkg/ruler/storage/instance/instance_test.go index 03a469ed187c6..9aca0df22c7b5 100644 --- a/pkg/ruler/storage/instance/instance_test.go +++ b/pkg/ruler/storage/instance/instance_test.go @@ -301,10 +301,16 @@ func (a *mockAppender) AppendHistogram(_ storage.SeriesRef, _ labels.Labels, _ i return 0, nil } +func (a *mockAppender) AppendHistogramCTZeroSample(_ storage.SeriesRef, _ labels.Labels, _ int64, _ int64, _ *histogram.Histogram, _ *histogram.FloatHistogram) (storage.SeriesRef, error) { + return 0, nil +} + func (a *mockAppender) AppendCTZeroSample(_ storage.SeriesRef, _ labels.Labels, _ int64, _ int64) (storage.SeriesRef, error) { return 0, nil } +func (a *mockAppender) SetOptions(_ *storage.AppendOptions) {} + func (a *mockAppender) Commit() error { return nil } diff --git a/pkg/ruler/storage/wal/wal.go b/pkg/ruler/storage/wal/wal.go index 9016064c37f18..e3252701380d2 100644 --- a/pkg/ruler/storage/wal/wal.go +++ b/pkg/ruler/storage/wal/wal.go @@ -15,6 +15,7 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" + util_log "github.com/grafana/loki/v3/pkg/util/log" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/model/exemplar" @@ -68,7 +69,7 @@ type Storage struct { // NewStorage makes a new Storage. func NewStorage(logger log.Logger, metrics *Metrics, registerer prometheus.Registerer, path string) (*Storage, error) { - w, err := wlog.NewSize(logger, registerer, SubDirectory(path), wlog.DefaultSegmentSize, wlog.CompressionSnappy) + w, err := wlog.NewSize(util_log.SlogFromGoKit(logger), registerer, SubDirectory(path), wlog.DefaultSegmentSize, wlog.CompressionSnappy) if err != nil { return nil, err } @@ -378,7 +379,7 @@ func (w *Storage) Truncate(mint int64) error { w.deletedMtx.Unlock() return ok } - if _, err = wlog.Checkpoint(w.logger, w.wal, first, last, keep, mint); err != nil { + if _, err = wlog.Checkpoint(util_log.SlogFromGoKit(w.logger), w.wal, first, last, keep, mint); err != nil { return errors.Wrap(err, "create checkpoint") } if err := w.wal.Truncate(last + 1); err != nil { @@ -655,11 +656,18 @@ func (a *appender) AppendHistogram(_ storage.SeriesRef, _ labels.Labels, _ int64 return 0, nil } +func (a *appender) AppendHistogramCTZeroSample(_ storage.SeriesRef, _ labels.Labels, _ int64, _ int64, _ *histogram.Histogram, _ *histogram.FloatHistogram) (storage.SeriesRef, error) { + // TODO: support histogram created timestamps + return 0, nil +} + func (a *appender) AppendCTZeroSample(_ storage.SeriesRef, _ labels.Labels, _ int64, _ int64) (storage.SeriesRef, error) { // TODO: support created timestamp return 0, nil } +func (a *appender) SetOptions(_ *storage.AppendOptions) {} + // Commit submits the collected samples and purges the batch. 
func (a *appender) Commit() error { a.w.walMtx.RLock() diff --git a/pkg/storage/stores/shipper/indexshipper/tsdb/head_wal.go b/pkg/storage/stores/shipper/indexshipper/tsdb/head_wal.go index d1a3dcf2dc046..66c18a51890a4 100644 --- a/pkg/storage/stores/shipper/indexshipper/tsdb/head_wal.go +++ b/pkg/storage/stores/shipper/indexshipper/tsdb/head_wal.go @@ -10,6 +10,7 @@ import ( "github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb/index" "github.com/grafana/loki/v3/pkg/util/encoding" + util_log "github.com/grafana/loki/v3/pkg/util/log" ) type WAL interface { @@ -204,7 +205,7 @@ func newHeadWAL(log log.Logger, dir string, t time.Time) (*headWAL, error) { // NB: if we use a non-nil Prometheus Registerer, ensure // that the underlying metrics won't conflict with existing WAL metrics in the ingester. // Likely, this can be done by adding extra label(s) - wal, err := wlog.NewSize(log, nil, dir, walSegmentSize, wlog.CompressionNone) + wal, err := wlog.NewSize(util_log.SlogFromGoKit(log), nil, dir, walSegmentSize, wlog.CompressionNone) if err != nil { return nil, err } diff --git a/pkg/util/log/log.go b/pkg/util/log/log.go index 93ccb86abf57a..03505ada815c1 100644 --- a/pkg/util/log/log.go +++ b/pkg/util/log/log.go @@ -28,10 +28,17 @@ var ( bufferedLogger *dslog.BufferedLogger plogger *prometheusLogger + // initialize log level to info, but it is set in the InitLogger function + logLevel = func() dslog.Level { + var l dslog.Level + _ = l.Set("info") + return l + }() ) // InitLogger initialises the global gokit logger (util_log.Logger) and returns that logger. func InitLogger(cfg *server.Config, reg prometheus.Registerer, sync bool) log.Logger { + logLevel = cfg.LogLevel logger := newPrometheusLogger(cfg.LogLevel, cfg.LogFormat, reg, sync) // when using util_log.Logger, skip 3 stack frames. Logger = log.With(logger, "caller", log.Caller(3)) diff --git a/pkg/util/log/slogadapter.go b/pkg/util/log/slogadapter.go new file mode 100644 index 0000000000000..70969c95264af --- /dev/null +++ b/pkg/util/log/slogadapter.go @@ -0,0 +1,29 @@ +// SPDX-License-Identifier: AGPL-3.0-only + +package log + +import ( + "log/slog" + + "github.com/go-kit/log" + slgk "github.com/tjhop/slog-gokit" +) + +// SlogFromGoKit returns slog adapter for logger. +func SlogFromGoKit(logger log.Logger) *slog.Logger { + var sl slog.Level + switch logLevel.String() { + case "info": + sl = slog.LevelInfo + case "warn": + sl = slog.LevelWarn + case "error": + sl = slog.LevelError + default: + sl = slog.LevelDebug + } + + lvl := slog.LevelVar{} + lvl.Set(sl) + return slog.New(slgk.NewGoKitHandler(logger, &lvl)) +} diff --git a/pkg/util/log/slogadapter_test.go b/pkg/util/log/slogadapter_test.go new file mode 100644 index 0000000000000..194e1be74e192 --- /dev/null +++ b/pkg/util/log/slogadapter_test.go @@ -0,0 +1,96 @@ +// SPDX-License-Identifier: AGPL-3.0-only + +package log + +import ( + "context" + "fmt" + "log/slog" + "testing" + + "github.com/go-kit/log/level" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" +) + +type mockLogger struct { + mock.Mock +} + +func (m *mockLogger) Log(keyvals ...interface{}) error { + args := m.Called(keyvals...) 
+ return args.Error(0) +} + +func TestSlogFromGoKit(t *testing.T) { + levels := []level.Value{ + level.DebugValue(), + level.InfoValue(), + level.WarnValue(), + level.ErrorValue(), + } + slogLevels := []slog.Level{ + slog.LevelDebug, + slog.LevelInfo, + slog.LevelWarn, + slog.LevelError, + } + + // Store original log level to restore after tests + originalLevel := logLevel.String() + t.Cleanup(func() { + _ = logLevel.Set(originalLevel) + }) + + t.Run("enabled for the right slog levels when go-kit level configured", func(t *testing.T) { + for i, l := range levels { + switch i { + case 0: + require.NoError(t, logLevel.Set("debug")) + case 1: + require.NoError(t, logLevel.Set("info")) + case 2: + require.NoError(t, logLevel.Set("warn")) + case 3: + require.NoError(t, logLevel.Set("error")) + default: + panic(fmt.Errorf("unhandled level %d", i)) + } + + mLogger := &mockLogger{} + logger := level.NewFilter(mLogger, logLevel.Option) + slogger := SlogFromGoKit(logger) + + for j, sl := range slogLevels { + if j >= i { + assert.Truef(t, slogger.Enabled(context.Background(), sl), "slog logger should be enabled for go-kit level %v / slog level %v", l, sl) + } else { + assert.Falsef(t, slogger.Enabled(context.Background(), sl), "slog logger should not be enabled for go-kit level %v / slog level %v", l, sl) + } + } + } + }) + + t.Run("wraps go-kit logger", func(t *testing.T) { + mLogger := &mockLogger{} + slogger := SlogFromGoKit(mLogger) + + for _, l := range levels { + mLogger.On("Log", level.Key(), l, "caller", mock.AnythingOfType("string"), "time", mock.AnythingOfType("time.Time"), "msg", "test", "attr", slog.StringValue("value")).Times(1).Return(nil) + attrs := []any{"attr", "value"} + switch l { + case level.DebugValue(): + slogger.Debug("test", attrs...) + case level.InfoValue(): + slogger.Info("test", attrs...) + case level.WarnValue(): + slogger.Warn("test", attrs...) + case level.ErrorValue(): + slogger.Error("test", attrs...) + default: + panic(fmt.Errorf("unrecognized level %v", l)) + } + } + }) +} diff --git a/vendor/github.com/tjhop/slog-gokit/.goreleaser.yaml b/vendor/github.com/tjhop/slog-gokit/.goreleaser.yaml new file mode 100644 index 0000000000000..f092384c215e5 --- /dev/null +++ b/vendor/github.com/tjhop/slog-gokit/.goreleaser.yaml @@ -0,0 +1,29 @@ +version: 2 + +builds: +- skip: true +gomod: + proxy: true + mod: mod +checksum: + name_template: 'checksums.txt' +snapshot: + version_template: "{{ incpatch .Version }}-next" +changelog: + sort: asc + filters: + exclude: + - '^Merge pull request' + - '^ci(?:\(\w+\))?\!?:' + - '^docs(?:\(\w+\))?\!?:' + - '^test(?:\(\w+\))?\!?:' + - '^style(?:\(\w+\))?\!?:' + groups: + - title: "New Features And Changes" + regexp: '^.*?feat(\([[:word:]]+\))??!?:.+$' + order: 0 + - title: "Fixes" + regexp: '^.*?fix(\([[:word:]]+\))??!?:.+$' + order: 1 + - title: "Other Changes" + order: 999 diff --git a/vendor/github.com/tjhop/slog-gokit/LICENSE b/vendor/github.com/tjhop/slog-gokit/LICENSE new file mode 100644 index 0000000000000..261eeb9e9f8b2 --- /dev/null +++ b/vendor/github.com/tjhop/slog-gokit/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. 
+ + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. 
Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. 
This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
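
The SlogFromGoKit adapter added above in pkg/util/log/slogadapter.go leans on the vendored slog-gokit handler whose Makefile, README, and sources follow below. A minimal sketch of the same bridging idea, assuming only the public go-kit/log and log/slog APIs plus the vendored NewGoKitHandler shown later in this patch; the toSlogLevel helper is illustrative and not part of the patch:

package main

import (
	"log/slog"
	"os"

	"github.com/go-kit/log"
	slgk "github.com/tjhop/slog-gokit"
)

// toSlogLevel is an illustrative helper mirroring the switch in SlogFromGoKit:
// the configured go-kit level string selects the slog level used for filtering.
func toSlogLevel(lvl string) slog.Level {
	switch lvl {
	case "info":
		return slog.LevelInfo
	case "warn":
		return slog.LevelWarn
	case "error":
		return slog.LevelError
	default:
		return slog.LevelDebug
	}
}

func main() {
	// Existing go-kit logger, as used throughout Loki today.
	gk := log.NewLogfmtLogger(os.Stderr)

	var lvl slog.LevelVar
	lvl.Set(toSlogLevel("info"))

	// Chain slog calls back onto the go-kit logger via the vendored handler.
	sl := slog.New(slgk.NewGoKitHandler(gk, &lvl))
	sl.Info("hello from slog via go-kit", "component", "example")
}
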
diff --git a/vendor/github.com/tjhop/slog-gokit/Makefile b/vendor/github.com/tjhop/slog-gokit/Makefile new file mode 100644 index 0000000000000..dce8bb04e1cd4 --- /dev/null +++ b/vendor/github.com/tjhop/slog-gokit/Makefile @@ -0,0 +1,27 @@ +GOCMD := go +GOFMT := ${GOCMD} fmt +GOMOD := ${GOCMD} mod +GOTEST := ${GOCMD} test +GOLANGCILINT_CACHE := ${CURDIR}/.golangci-lint/build/cache + +# autogenerate help messages for comment lines with 2 `#` +.PHONY: help +help: ## print this help message + @awk 'BEGIN {FS = ":.*##"; printf "\nUsage:\n make \033[36m\033[0m\n\nTargets:\n"} /^[a-z0-9A-Z_-]+:.*?##/ { printf " \033[36m%-30s\033[0m%s\n", $$1, $$2 }' $(MAKEFILE_LIST) + +.PHONY: tidy +tidy: ## tidy modules + ${GOMOD} tidy + +.PHONY: fmt +fmt: ## apply go code style formatter + ${GOFMT} -x ./... + +.PHONY: lint +lint: ## run linters + mkdir -p ${GOLANGCILINT_CACHE} || true + docker run --rm -v ${CURDIR}:/app -v ${GOLANGCILINT_CACHE}:/root/.cache -w /app docker.io/golangci/golangci-lint:latest golangci-lint run -v + +.PHONY: test +test: ## run go tests + ${GOTEST} -race -v . diff --git a/vendor/github.com/tjhop/slog-gokit/README.md b/vendor/github.com/tjhop/slog-gokit/README.md new file mode 100644 index 0000000000000..6addd47981daf --- /dev/null +++ b/vendor/github.com/tjhop/slog-gokit/README.md @@ -0,0 +1,51 @@ +[![license](https://img.shields.io/github/license/tjhop/slog-gokit)](https://github.com/tjhop/slog-gokit/blob/master/LICENSE) +[![Go Report Card](https://goreportcard.com/badge/github.com/tjhop/slog-gokit)](https://goreportcard.com/report/github.com/tjhop/slog-gokit) +[![golangci-lint](https://github.com/tjhop/slog-gokit/actions/workflows/golangci-lint.yaml/badge.svg)](https://github.com/tjhop/slog-gokit/actions/workflows/golangci-lint.yaml) +[![Latest Release](https://img.shields.io/github/v/release/tjhop/slog-gokit)](https://github.com/tjhop/slog-gokit/releases/latest) + +# Go slog-gokit Adapter + +This library provides a custom slog.Handler that wraps a go-kit Logger, so that loggers created via `slog.New()` chain their log calls to the internal go-kit Logger. + +## Install + +```bash +go get github.com/tjhop/slog-gokit +``` + +## Documentation + +Documentation can be found here: + +https://pkg.go.dev/github.com/tjhop/slog-gokit + +## Example + +```go +package main + +import ( + "log/slog" + "os" + + "github.com/go-kit/log" + slgk "github.com/tjhop/slog-gokit" +) + +func main() { + // Take an existing go-kit/log Logger: + gklogger := log.NewLogfmtLogger(os.Stderr) + + // Create an slog Logger that chains log calls to the go-kit/log Logger: + slogger := slog.New(slgk.NewGoKitHandler(gklogger, nil)) + slogger.WithGroup("example_group").With("foo", "bar").Info("hello world") + + // The slog Logger produces logs at slog.LevelInfo by default. + // Optionally create an slog.Leveler to dynamically adjust the level of + // the slog Logger. 
+ lvl := &slog.LevelVar{} + lvl.Set(slog.LevelDebug) + slogger = slog.New(slgk.NewGoKitHandler(gklogger, lvl)) + slogger.WithGroup("example_group").With("foo", "bar").Info("hello world") +} +``` diff --git a/vendor/github.com/tjhop/slog-gokit/handler.go b/vendor/github.com/tjhop/slog-gokit/handler.go new file mode 100644 index 0000000000000..d2b84a647cdf9 --- /dev/null +++ b/vendor/github.com/tjhop/slog-gokit/handler.go @@ -0,0 +1,163 @@ +package sloggokit + +import ( + "context" + "log/slog" + "os" + + "github.com/go-kit/log" +) + +var _ slog.Handler = (*GoKitHandler)(nil) + +var defaultGoKitLogger = log.NewLogfmtLogger(os.Stderr) + +// GoKitHandler implements the slog.Handler interface. It holds an internal +// go-kit logger that is used to perform the true logging. +type GoKitHandler struct { + level slog.Leveler + logger log.Logger + preformatted []any + group string +} + +// NewGoKitHandler returns a new slog logger from the provided go-kit +// logger. Calls to the slog logger are chained to the handler's internal +// go-kit logger. If provided a level, it will be used to filter log events in +// the handler's Enabled() method. +func NewGoKitHandler(logger log.Logger, level slog.Leveler) slog.Handler { + if logger == nil { + logger = defaultGoKitLogger + } + + // Adjust runtime call depth to compensate for the adapter and point to + // the appropriate source line. + logger = log.With(logger, "caller", log.Caller(6)) + + if level == nil { + level = &slog.LevelVar{} // Info level by default. + } + + return &GoKitHandler{logger: logger, level: level} +} + +// Enabled returns true if the internal slog.Leveler is enabled for the +// provided log level. It implements slog.Handler. +func (h *GoKitHandler) Enabled(_ context.Context, level slog.Level) bool { + if h.level == nil { + h.level = &slog.LevelVar{} // Info level by default. + } + + return level >= h.level.Level() +} + +// Handler take an slog.Record created by an slog.Logger and dispatches the log +// call to the internal go-kit logger. Groups and attributes created by slog +// are formatted and added to the log call as individual key/value pairs. It +// implements slog.Handler. +func (h *GoKitHandler) Handle(_ context.Context, record slog.Record) error { + if h.logger == nil { + h.logger = defaultGoKitLogger + } + + logger := goKitLevelFunc(h.logger, record.Level) + + // 1 slog.Attr == 1 key and 1 value, set capacity >= (2 * num attrs). + // + // Note: this could probably be (micro)-optimized further -- we know we + // need to also append on a timestamp from the record, the message, the + // preformatted vals, all things we more or less know the size of at + // creation time here. + pairs := make([]any, 0, (2 * record.NumAttrs())) + if !record.Time.IsZero() { + pairs = append(pairs, "time", record.Time) + } + pairs = append(pairs, "msg", record.Message) + pairs = append(pairs, h.preformatted...) + + record.Attrs(func(a slog.Attr) bool { + pairs = appendPair(pairs, h.group, a) + return true + }) + + return logger.Log(pairs...) +} + +// WithAttrs formats the provided attributes and caches them in the handler to +// attach to all future log calls. It implements slog.Handler. +func (h *GoKitHandler) WithAttrs(attrs []slog.Attr) slog.Handler { + pairs := make([]any, 0, 2*len(attrs)) + for _, a := range attrs { + pairs = appendPair(pairs, h.group, a) + } + + if h.preformatted != nil { + pairs = append(h.preformatted, pairs...) 
+ } + + return &GoKitHandler{ + logger: h.logger, + level: h.level, + preformatted: pairs, + group: h.group, + } +} + +// WithGroup sets the group prefix string and caches it within the handler to +// use to prefix all future log attribute pairs. It implements slog.Handler. +func (h *GoKitHandler) WithGroup(name string) slog.Handler { + if name == "" { + return h + } + + g := name + if h.group != "" { + g = h.group + "." + g + } + + return &GoKitHandler{ + logger: h.logger, + level: h.level, + preformatted: h.preformatted, + group: g, + } +} + +func appendPair(pairs []any, groupPrefix string, attr slog.Attr) []any { + if attr.Equal(slog.Attr{}) { + return pairs + } + + switch attr.Value.Kind() { + case slog.KindGroup: + attrs := attr.Value.Group() + if len(attrs) > 0 { + // Only process groups that have non-empty attributes + // to properly conform to slog.Handler interface + // contract. + + if attr.Key != "" { + // If a group's key is empty, attributes should + // be inlined to properly conform to + // slog.Handler interface contract. + if groupPrefix != "" { + groupPrefix = groupPrefix + "." + attr.Key + } else { + groupPrefix = attr.Key + } + } + for _, a := range attrs { + pairs = appendPair(pairs, groupPrefix, a) + } + } + default: + key := attr.Key + if groupPrefix != "" { + key = groupPrefix + "." + key + } + + pairs = append(pairs, key, attr.Value) + } + + return pairs +} diff --git a/vendor/github.com/tjhop/slog-gokit/level.go b/vendor/github.com/tjhop/slog-gokit/level.go new file mode 100644 index 0000000000000..467d991a9c3a4 --- /dev/null +++ b/vendor/github.com/tjhop/slog-gokit/level.go @@ -0,0 +1,23 @@ +package sloggokit + +import ( + "log/slog" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" +) + +func goKitLevelFunc(logger log.Logger, lvl slog.Level) log.Logger { + switch lvl { + case slog.LevelInfo: + logger = level.Info(logger) + case slog.LevelWarn: + logger = level.Warn(logger) + case slog.LevelError: + logger = level.Error(logger) + default: + logger = level.Debug(logger) + } + + return logger +} diff --git a/vendor/modules.txt b/vendor/modules.txt index c612952c5a40e..04dd986da5d83 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1691,6 +1691,9 @@ github.com/thanos-io/objstore/providers/oss github.com/thanos-io/objstore/providers/s3 github.com/thanos-io/objstore/providers/swift github.com/thanos-io/objstore/tracing/opentracing +# github.com/tjhop/slog-gokit v0.1.3 +## explicit; go 1.21 +github.com/tjhop/slog-gokit # github.com/tklauser/go-sysconf v0.3.13 ## explicit; go 1.18 github.com/tklauser/go-sysconf
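The vendored `handler.go` and `level.go` above flatten slog groups and attributes into dot-prefixed go-kit key/value pairs and fall back to Info-level filtering when no `slog.Leveler` is supplied. Below is a minimal sketch of that behavior, assuming the vendored API exactly as shown in this patch; the program and the expected keys noted in the comments are illustrative readings of the vendored code, not taken from the upstream README or tests.

```go
package main

import (
	"log/slog"
	"os"

	"github.com/go-kit/log"
	slgk "github.com/tjhop/slog-gokit"
)

func main() {
	// Wrap an existing go-kit logfmt logger in the vendored slog.Handler.
	gk := log.NewLogfmtLogger(os.Stderr)
	logger := slog.New(slgk.NewGoKitHandler(gk, nil))

	// WithGroup caches the prefix and WithAttrs formats attributes eagerly,
	// so this should emit a pair keyed `req.method` alongside `time` and `msg`.
	logger.WithGroup("req").With("method", "GET").Info("handled request")

	// Group-valued attributes are flattened recursively by appendPair, so the
	// expected key here is `db.driver` (illustrative, not from upstream docs).
	logger.Info("query finished", slog.Group("db", slog.String("driver", "pgx")))

	// With a nil slog.Leveler the handler defaults to Info, so Enabled
	// filters this record out and nothing is logged.
	logger.Debug("dropped")
}
```

Because attributes are formatted eagerly in `WithAttrs` and cached in the handler's `preformatted` slice, building loggers with `With()` up front avoids re-formatting the same pairs on every `Handle` call.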