Skip to content

Commit

Permalink
WIP updating prometheus
Browse files Browse the repository at this point in the history
  • Loading branch information
paul1r committed Feb 20, 2025
1 parent 473199b commit fd5e357
Show file tree
Hide file tree
Showing 39 changed files with 792 additions and 88 deletions.
2 changes: 1 addition & 1 deletion clients/cmd/fluent-bit/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ func extractLabels(records map[string]interface{}, keys []string) model.LabelSet
}
ln := model.LabelName(k)
// skips invalid name and values
if !ln.IsValid() {
if !ln.IsValidLegacy() {
continue
}
lv := model.LabelValue(fmt.Sprintf("%v", v))
Expand Down
21 changes: 15 additions & 6 deletions clients/pkg/logentry/stages/eventlogmessage.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func validateEventLogMessageConfig(c *EventLogMessageConfig) error {
if c == nil {
return errors.New(ErrEmptyEvtLogMsgStageConfig)
}
if c.Source != nil && !model.LabelName(*c.Source).IsValid() {
if c.Source != nil && !model.LabelName(*c.Source).IsValidLegacy() {
return fmt.Errorf(ErrInvalidLabelName, *c.Source)
}
return nil
Expand Down Expand Up @@ -108,7 +108,7 @@ func (m *eventLogMessageStage) processEntry(extracted map[string]interface{}, ke
continue
}
mkey := parts[0]
if !model.LabelName(mkey).IsValid() {
if !model.LabelName(mkey).IsValidLegacy() {
if m.cfg.DropInvalidLabels {
if Debug {
level.Debug(m.logger).Log("msg", "invalid label parsed from message", "key", mkey)
Expand Down Expand Up @@ -148,17 +148,26 @@ func (*eventLogMessageStage) Cleanup() {
}

// Sanitize a input string to convert it into a valid prometheus label
// TODO: switch to prometheus/prometheus/util/strutil/SanitizeFullLabelName
func SanitizeFullLabelName(input string) string {
if len(input) == 0 {
return "_"
}
var validSb strings.Builder
for i, b := range input {
if !((b >= 'a' && b <= 'z') || (b >= 'A' && b <= 'Z') || b == '_' || (b >= '0' && b <= '9' && i > 0)) {
validSb.WriteRune('_')
// Handle first character - must be a letter or underscore
if len(input) > 0 {
b := rune(input[0])
if (b >= 'a' && b <= 'z') || (b >= 'A' && b <= 'Z') || b == '_' {
validSb.WriteRune(b)
} else {
validSb.WriteRune('_')
}
}
// Handle rest of characters
for _, b := range input[1:] {
if (b >= 'a' && b <= 'z') || (b >= 'A' && b <= 'Z') || b == '_' || (b >= '0' && b <= '9') {
validSb.WriteRune(b)
} else {
validSb.WriteRune('_')
}
}
return validSb.String()
Expand Down
2 changes: 1 addition & 1 deletion clients/pkg/logentry/stages/labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func validateLabelsConfig(c LabelsConfig) error {
return errors.New(ErrEmptyLabelStageConfig)
}
for labelName, labelSrc := range c {
if !model.LabelName(labelName).IsValid() {
if !model.LabelName(labelName).IsValidLegacy() {
return fmt.Errorf(ErrInvalidLabelName, labelName)
}
// If no label source was specified, use the key name
Expand Down
34 changes: 16 additions & 18 deletions clients/pkg/promtail/discovery/consulagent/consul.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@ import (
"context"
"encoding/json"
"fmt"
"log/slog"
"net"
"net/http"
"os"
"strconv"
"strings"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
consul "github.com/hashicorp/consul/api"
conntrack "github.com/mwitkow/go-conntrack"
"github.com/pkg/errors"
Expand Down Expand Up @@ -152,19 +152,19 @@ type Discovery struct {
allowStale bool
refreshInterval time.Duration
finalizer func()
logger log.Logger
logger *slog.Logger
metrics *consulMetrics
}

// NewDiscovery returns a new Discovery for the given config.
func NewDiscovery(conf *SDConfig, logger log.Logger, metrics discovery.DiscovererMetrics) (*Discovery, error) {
func NewDiscovery(conf *SDConfig, logger *slog.Logger, metrics discovery.DiscovererMetrics) (*Discovery, error) {
m, ok := metrics.(*consulMetrics)
if !ok {
return nil, fmt.Errorf("invalid discovery metrics type")
}

if logger == nil {
logger = log.NewNopLogger()
logger = slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelInfo}))
}

tls, err := config.NewTLSConfig(&conf.TLSConfig)
Expand Down Expand Up @@ -265,15 +265,15 @@ func (d *Discovery) getDatacenter() error {
}
info, err := d.client.Agent().Self()
if err != nil {
level.Error(d.logger).Log("msg", "Error retrieving datacenter name", "err", err)
d.logger.Error("Error retrieving datacenter name", "err", err)
d.metrics.rpcFailuresCount.Inc()
return err
}

dc, ok := info["Config"]["Datacenter"].(string)
if !ok {
err := errors.Errorf("invalid value '%v' for Config.Datacenter", info["Config"]["Datacenter"])
level.Error(d.logger).Log("msg", "Error retrieving datacenter name", "err", err)
d.logger.Error("Error retrieving datacenter name", "err", err)
return err
}

Expand Down Expand Up @@ -342,7 +342,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
// entire list of services.
func (d *Discovery) watchServices(ctx context.Context, ch chan<- []*targetgroup.Group, services map[string]func(error)) {
agent := d.client.Agent()
level.Debug(d.logger).Log("msg", "Watching services", "tags", strings.Join(d.watchedTags, ","))
d.logger.Debug("Watching services", "tags", strings.Join(d.watchedTags, ","))

t0 := time.Now()
srvs, err := agent.Services()
Expand All @@ -357,7 +357,7 @@ func (d *Discovery) watchServices(ctx context.Context, ch chan<- []*targetgroup.
}

if err != nil {
level.Error(d.logger).Log("msg", "Error refreshing service list", "err", err)
d.logger.Error("Error refreshing service list", "err", err)
d.metrics.rpcFailuresCount.Inc()
time.Sleep(retryInterval)
return
Expand Down Expand Up @@ -386,9 +386,7 @@ func (d *Discovery) watchServices(ctx context.Context, ch chan<- []*targetgroup.
// Check for removed services.
for name, cancel := range services {
if _, ok := discoveredServices[name]; !ok {
level.Debug(d.logger).Log(
"msg", "removing service since consul no longer has a record of it",
"name", name)
d.logger.Debug("removing service since consul no longer has a record of it", "name", name)
// Call the watch cancellation function.
cancel(errors.New("canceling service since consul no longer has a record of it"))
delete(services, name)
Expand Down Expand Up @@ -420,7 +418,7 @@ type consulService struct {
discovery *Discovery
client *consul.Client
tagSeparator string
logger log.Logger
logger *slog.Logger
rpcFailuresCount prometheus.Counter
serviceRPCDuration prometheus.Observer
}
Expand Down Expand Up @@ -464,7 +462,7 @@ func (d *Discovery) watchService(ctx context.Context, ch chan<- []*targetgroup.G

// Get updates for a service.
func (srv *consulService) watch(ctx context.Context, ch chan<- []*targetgroup.Group, agent *consul.Agent) {
level.Debug(srv.logger).Log("msg", "Watching service", "service", srv.name, "tags", strings.Join(srv.tags, ","))
srv.logger.Debug("Watching service", "service", srv.name, "tags", strings.Join(srv.tags, ","))

t0 := time.Now()
aggregatedStatus, serviceChecks, err := agent.AgentHealthServiceByName(srv.name)
Expand All @@ -480,26 +478,26 @@ func (srv *consulService) watch(ctx context.Context, ch chan<- []*targetgroup.Gr
}

if err != nil {
level.Error(srv.logger).Log("msg", "Error refreshing service", "service", srv.name, "tags", strings.Join(srv.tags, ","), "err", err)
srv.logger.Error("Error refreshing service", "service", srv.name, "tags", strings.Join(srv.tags, ","), "err", err)
srv.rpcFailuresCount.Inc()
time.Sleep(retryInterval)
return
}

self, err := agent.Self()
if err != nil {
level.Error(srv.logger).Log("msg", "failed to get agent info from agent api", "err", err)
srv.logger.Error("failed to get agent info from agent api", "err", err)
return
}
var member = consul.AgentMember{}
memberBytes, err := json.Marshal(self["Member"])
if err != nil {
level.Error(srv.logger).Log("msg", "failed to get member information from agent", "err", err)
srv.logger.Error("failed to get member information from agent", "err", err)
return
}
err = json.Unmarshal(memberBytes, &member)
if err != nil {
level.Error(srv.logger).Log("msg", "failed to unmarshal member information from agent", "err", err)
srv.logger.Error("failed to unmarshal member information from agent", "err", err)
return
}

Expand Down
5 changes: 3 additions & 2 deletions clients/pkg/promtail/discovery/consulagent/consul_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,14 @@ package consulagent

import (
"context"
"log/slog"
"net/http"
"net/http/httptest"
"net/url"
"os"
"testing"
"time"

"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -364,7 +365,7 @@ func newServer(t *testing.T) (*httptest.Server, *SDConfig) {
}

func newDiscovery(t *testing.T, config *SDConfig) *Discovery {
logger := log.NewNopLogger()
logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelInfo}))
metrics := NewTestMetrics(t, config, prometheus.NewRegistry())
d, err := NewDiscovery(config, logger, metrics)
require.NoError(t, err)
Expand Down
2 changes: 1 addition & 1 deletion clients/pkg/promtail/promtail_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -522,7 +522,7 @@ func getPromMetrics(t *testing.T, httpListenAddr net.Addr) ([]byte, string) {
func parsePromMetrics(t *testing.T, bytes []byte, contentType string, metricName string, label string) map[string]float64 {
rb := map[string]float64{}

pr, err := textparse.New(bytes, contentType, false, nil)
pr, err := textparse.New(bytes, contentType, "", false, false, nil)
require.NoError(t, err)
for {
et, err := pr.Next()
Expand Down
3 changes: 2 additions & 1 deletion clients/pkg/promtail/targets/docker/targetmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/grafana/loki/v3/clients/pkg/promtail/targets/target"

"github.com/grafana/loki/v3/pkg/util"
util_log "github.com/grafana/loki/v3/pkg/util/log"
)

const (
Expand Down Expand Up @@ -60,7 +61,7 @@ func NewTargetManager(
positions: positions,
manager: discovery.NewManager(
ctx,
log.With(logger, "component", "docker_discovery"),
util_log.SlogFromGoKit(log.With(logger, "component", "docker_discovery")),
noopRegistry,
noopSdMetrics,
),
Expand Down
3 changes: 2 additions & 1 deletion clients/pkg/promtail/targets/file/filetargetmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/grafana/loki/v3/clients/pkg/promtail/targets/target"

"github.com/grafana/loki/v3/pkg/util"
util_log "github.com/grafana/loki/v3/pkg/util/log"
)

const (
Expand Down Expand Up @@ -84,7 +85,7 @@ func NewFileTargetManager(
syncers: map[string]*targetSyncer{},
manager: discovery.NewManager(
ctx,
log.With(logger, "component", "discovery"),
util_log.SlogFromGoKit(log.With(logger, "component", "discovery")),
noopRegistry,
noopSdMetrics,
),
Expand Down
2 changes: 1 addition & 1 deletion clients/pkg/promtail/targets/heroku/target.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (h *Target) run() error {
// To prevent metric collisions because all metrics are going to be registered in the global Prometheus registry.

tentativeServerMetricNamespace := "promtail_heroku_drain_target_" + h.jobName
if !model.IsValidMetricName(model.LabelValue(tentativeServerMetricNamespace)) {
if !model.LabelName(tentativeServerMetricNamespace).IsValidLegacy() {
return fmt.Errorf("invalid prometheus-compatible job name: %s", h.jobName)
}
h.config.Server.MetricsNamespace = tentativeServerMetricNamespace
Expand Down
3 changes: 2 additions & 1 deletion clients/pkg/promtail/wal/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/prometheus/prometheus/tsdb/wlog"

"github.com/grafana/loki/v3/pkg/ingester/wal"
util_log "github.com/grafana/loki/v3/pkg/util/log"
)

var (
Expand Down Expand Up @@ -37,7 +38,7 @@ type wrapper struct {
func New(cfg Config, log log.Logger, registerer prometheus.Registerer) (WAL, error) {
// TODO: We should fine-tune the WAL instantiated here to allow some buffering of written entries, but not written to disk
// yet. This will attest for the lack of buffering in the channel Writer exposes.
tsdbWAL, err := wlog.NewSize(log, registerer, cfg.Dir, wlog.DefaultSegmentSize, wlog.CompressionNone)
tsdbWAL, err := wlog.NewSize(util_log.SlogFromGoKit(log), registerer, cfg.Dir, wlog.DefaultSegmentSize, wlog.CompressionNone)
if err != nil {
return nil, fmt.Errorf("failde to create tsdb WAL: %w", err)
}
Expand Down
3 changes: 2 additions & 1 deletion clients/pkg/promtail/wal/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/prometheus/prometheus/tsdb/wlog"

"github.com/grafana/loki/v3/pkg/ingester/wal"
util_log "github.com/grafana/loki/v3/pkg/util/log"
)

const (
Expand Down Expand Up @@ -160,7 +161,7 @@ func (w *Watcher) watch(segmentNum int) error {
}
defer segment.Close()

reader := wlog.NewLiveReader(w.logger, nil, segment)
reader := wlog.NewLiveReader(util_log.SlogFromGoKit(w.logger), nil, segment)

readTimer := newBackoffTimer(w.minReadFreq, w.maxReadFreq)

Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ require (
github.com/power-devops/perfstat v0.0.0-20240221224432-82ca36839d55 // indirect
github.com/prometheus/sigv4 v0.1.1 // indirect
github.com/rivo/uniseg v0.4.7 // indirect
github.com/tjhop/slog-gokit v0.1.3 // indirect
github.com/tklauser/go-sysconf v0.3.13 // indirect
github.com/tklauser/numcpus v0.7.0 // indirect
github.com/x448/float16 v0.8.4 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1155,6 +1155,8 @@ github.com/stvp/tempredis v0.0.0-20181119212430-b82af8480203/go.mod h1:oqN97ltKN
github.com/tencentyun/cos-go-sdk-v5 v0.7.40 h1:W6vDGKCHe4wBACI1d2UgE6+50sJFhRWU4O8IB2ozzxM=
github.com/tencentyun/cos-go-sdk-v5 v0.7.40/go.mod h1:4dCEtLHGh8QPxHEkgq+nFaky7yZxQuYwgSJM87icDaw=
github.com/tinylib/msgp v1.1.5/go.mod h1:eQsjooMTnV42mHu917E26IogZ2930nFyBQdofk10Udg=
github.com/tjhop/slog-gokit v0.1.3 h1:6SdexP3UIeg93KLFeiM1Wp1caRwdTLgsD/THxBUy1+o=
github.com/tjhop/slog-gokit v0.1.3/go.mod h1:Bbu5v2748qpAWH7k6gse/kw3076IJf6owJmh7yArmJs=
github.com/tklauser/go-sysconf v0.3.13 h1:GBUpcahXSpR2xN01jhkNAbTLRk2Yzgggk8IM08lq3r4=
github.com/tklauser/go-sysconf v0.3.13/go.mod h1:zwleP4Q4OehZHGn4CYZDipCgg9usW5IJePewFCGVEa0=
github.com/tklauser/numcpus v0.7.0 h1:yjuerZP127QG9m5Zh/mSO4wqurYil27tHrqwRoRjpr4=
Expand Down
2 changes: 1 addition & 1 deletion pkg/ingester/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ func (w *WALCheckpointWriter) Advance() (bool, error) {
return false, fmt.Errorf("create checkpoint dir: %w", err)
}

checkpoint, err := wlog.NewSize(log.With(util_log.Logger, "component", "checkpoint_wal"), nil, checkpointDirTemp, walSegmentSize, wlog.CompressionNone)
checkpoint, err := wlog.NewSize(util_log.SlogFromGoKit(log.With(util_log.Logger, "component", "checkpoint_wal")), nil, checkpointDirTemp, walSegmentSize, wlog.CompressionNone)
if err != nil {
return false, fmt.Errorf("open checkpoint: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/ingester/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func newWAL(cfg WALConfig, registerer prometheus.Registerer, metrics *ingesterMe
return noopWAL{}, nil
}

tsdbWAL, err := wlog.NewSize(util_log.Logger, registerer, cfg.Dir, walSegmentSize, wlog.CompressionNone)
tsdbWAL, err := wlog.NewSize(util_log.SlogFromGoKit(util_log.Logger), registerer, cfg.Dir, walSegmentSize, wlog.CompressionNone)
if err != nil {
return nil, err
}
Expand Down
9 changes: 2 additions & 7 deletions pkg/querier/queryrange/queryrangebase/promql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/grafana/loki/v3/pkg/querier/astmapper"
util_log "github.com/grafana/loki/v3/pkg/util/log"
)

var (
Expand All @@ -28,7 +29,7 @@ var (
ctx = context.Background()
engine = promql.NewEngine(promql.EngineOpts{
Reg: prometheus.DefaultRegisterer,
Logger: log.NewNopLogger(),
Logger: util_log.SlogFromGoKit(log.NewNopLogger()),
Timeout: 1 * time.Hour,
MaxSamples: 10e6,
ActiveQueryTracker: nil,
Expand Down Expand Up @@ -518,12 +519,6 @@ func Test_FunctionParallelism(t *testing.T) {
fn: "round",
fArgs: []string{"20"},
},
{
fn: "holt_winters",
isTestMatrix: true,
fArgs: []string{"0.5", "0.7"},
approximate: true,
},
} {

t.Run(tc.fn, func(t *testing.T) {
Expand Down
Loading

0 comments on commit fd5e357

Please sign in to comment.