Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: flag to enable gauge replacement #77

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ Flags:
Example: "user1=pass1,user2=pass2"
--apiListen string Listen for API requests on this host/port. (default ":80")
--cors string The 'Access-Control-Allow-Origin' value to be returned. (default "*")
--gaugeBehavior string How gauges are aggregated (sum or replace). (default "sum")
-h, --help help for prom-aggregation-gateway
--lifecycleListen string Listen for lifecycle requests (health, metrics) on this host/port (default ":8888")

Expand All @@ -73,6 +74,15 @@ Use "prom-aggregation-gateway [command] --help" for more information about a com

Any flags you see above can also be set by `ENV_VARIABLES`. ENV_VARS must have a prefix of `PAG_`, for example `PAG_AUTHUSERS=user1=pass1,user2=pass2` will start the service with basic auth. If an ENV_VARIABLE is set, then it will be used over a CLI argument passed to the service.

### Gauge Behavior

Gauges can be aggregated in two ways:

1. `sum` (default) - just like counters, by summing the values received.
2. `replace` - every new value replaces the current value, so a scrape will always see the latest value.

Set this with the flag `--gaugeBehavior=XXX` or with the env var `PAG_GAUGEBEHAVIOR=XXX`.

## Ready-built images

Container images are published here:
Expand Down
1 change: 1 addition & 0 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ func Execute() {
rootCmd.PersistentFlags().StringVar(&cfg.ApiListen, "apiListen", ":80", "Listen for API requests on this host/port.")
rootCmd.PersistentFlags().StringVar(&cfg.LifecycleListen, "lifecycleListen", ":8888", "Listen for lifecycle requests (health, metrics) on this host/port")
rootCmd.PersistentFlags().StringVar(&cfg.CorsDomain, "cors", "*", "The 'Access-Control-Allow-Origin' value to be returned.")
rootCmd.PersistentFlags().StringVar(&cfg.GaugeBehavior, "gaugeBehavior", "sum", "How gauges are aggregated (sum or replace).")

if err := rootCmd.Execute(); err != nil {
os.Exit(1)
Expand Down
2 changes: 1 addition & 1 deletion cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func startFunc(cmd *cobra.Command, args []string) error {
Accounts: cfg.AuthUsers,
}

routers.RunServers(apiCfg, cfg.ApiListen, cfg.LifecycleListen)
routers.RunServers(apiCfg, cfg.ApiListen, cfg.LifecycleListen, cfg.GaugeBehavior)

return nil
}
1 change: 1 addition & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ type Server struct {
LifecycleListen string
CorsDomain string
AuthUsers []string
GaugeBehavior string
}

const (
Expand Down
23 changes: 21 additions & 2 deletions metrics/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ import (

type metricFamily struct {
*dto.MetricFamily
lock sync.RWMutex
lock sync.RWMutex
options *aggregateOptions
}

type Aggregate struct {
Expand All @@ -28,9 +29,17 @@ type Aggregate struct {

type ignoredLabels []string

// gaugeBehavior names the strategy used when two samples of the same gauge
// are merged.
type gaugeBehavior string

const (
// sumBehavior is the default strategy; the README describes it as summing
// the values received, just like counters.
sumBehavior gaugeBehavior = "sum"
// replaceBehavior makes the incoming gauge value replace the stored one.
// NOTE: this constant has no explicit type, so it is an untyped string
// constant rather than a gaugeBehavior. That is what allows it to be
// compared directly against the plain string argument of SetGaugeBehavior
// as well as against gaugeBehavior values.
replaceBehavior = "replace"
)

// aggregateOptions holds the settings stored on an Aggregate; they are
// populated by aggregateOptionsFunc values such as SetGaugeBehavior.
type aggregateOptions struct {
// ignoredLabels is presumably the set of label names dropped before
// merging (see AddIgnoredLabels) — TODO confirm against its use.
ignoredLabels ignoredLabels
// metricTTLDuration is presumably how long a metric is kept before it
// expires (see SetTTLMetricTime) — TODO confirm against its use.
metricTTLDuration *time.Duration
// gaugeBehavior selects how gauges are merged: summed or replaced.
gaugeBehavior gaugeBehavior
}

type aggregateOptionsFunc func(a *Aggregate)
Expand All @@ -47,6 +56,16 @@ func SetTTLMetricTime(duration *time.Duration) aggregateOptionsFunc {
}
}

// SetGaugeBehavior returns an option that selects how gauges are aggregated.
//
// The value "replace" selects replaceBehavior. Every other value, including
// the empty string and unrecognised input, selects sumBehavior.
func SetGaugeBehavior(behavior string) aggregateOptionsFunc {
	// Resolve the requested behavior once; summing is the fallback.
	chosen := sumBehavior
	if behavior == replaceBehavior {
		chosen = replaceBehavior
	}

	return func(a *Aggregate) {
		a.options.gaugeBehavior = chosen
	}
}

func NewAggregate(opts ...aggregateOptionsFunc) *Aggregate {
a := &Aggregate{
families: map[string]*metricFamily{},
Expand Down Expand Up @@ -91,7 +110,7 @@ func (a *Aggregate) setFamilyOrGetExistingFamily(familyName string, family *dto.
defer a.familiesLock.Unlock()
existingFamily, ok := a.families[familyName]
if !ok {
a.families[familyName] = &metricFamily{MetricFamily: family}
a.families[familyName] = &metricFamily{MetricFamily: family, options: &a.options}
return nil
}
return existingFamily
Expand Down
48 changes: 39 additions & 9 deletions metrics/aggregate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,28 @@ histogram_bucket{job="test",le="+Inf"} 9
histogram_sum{job="test"} 7
histogram_count{job="test"} 2
`

wantReplace = `# HELP counter A counter
# TYPE counter counter
counter{job="test"} 60
# HELP gauge A gauge
# TYPE gauge gauge
gauge{job="test"} 57
# HELP histogram A histogram
# TYPE histogram histogram
histogram_bucket{job="test",le="1"} 0
histogram_bucket{job="test",le="2"} 0
histogram_bucket{job="test",le="3"} 3
histogram_bucket{job="test",le="4"} 8
histogram_bucket{job="test",le="5"} 9
histogram_bucket{job="test",le="6"} 9
histogram_bucket{job="test",le="7"} 9
histogram_bucket{job="test",le="8"} 9
histogram_bucket{job="test",le="9"} 9
histogram_bucket{job="test",le="10"} 9
histogram_bucket{job="test",le="+Inf"} 9
histogram_sum{job="test"} 7
histogram_count{job="test"} 2
`
multilabel1 = `# HELP counter A counter
# TYPE counter counter
counter{a="a",b="b", ignore_label="ignore_value"} 1
Expand Down Expand Up @@ -121,6 +142,12 @@ ui_external_lib_loaded{name="mixpanel",loaded="true"} 1
ui_external_lib_loaded{job="test",loaded="true",name="Intercom"} 2
ui_external_lib_loaded{job="test",loaded="true",name="ga"} 2
ui_external_lib_loaded{job="test",loaded="true",name="mixpanel"} 2
`
gaugeOutputReplace = `# HELP ui_external_lib_loaded A gauge with entries in un-sorted order
# TYPE ui_external_lib_loaded gauge
ui_external_lib_loaded{job="test",loaded="true",name="Intercom"} 1
ui_external_lib_loaded{job="test",loaded="true",name="ga"} 1
ui_external_lib_loaded{job="test",loaded="true",name="mixpanel"} 1
`
duplicateLabels = `
# HELP ui_external_lib_loaded Test with duplicate values
Expand Down Expand Up @@ -179,17 +206,20 @@ func TestAggregate(t *testing.T) {
a, b string
want string
ignoredLabels []string
behavior string
}{
{"simpleGauge", gaugeInput, gaugeInput, gaugeOutput, []string{}},
{"in", in1, in2, want, []string{}},
{"multilabel", multilabel1, multilabel2, multilabelResult, []string{"ignore_label"}},
{"labelFields", labelFields1, labelFields2, labelFieldResult, []string{}},
{"reorderedLabels", reorderedLabels1, reorderedLabels2, reorderedLabelsResult, []string{}},
{"ignoredLabels", ignoredLabels1, ignoredLabels2, ignoredLabelsResult, []string{"ignore_me"}},
{"summary", summaryInput, summaryInput, summaryOutput, []string{}},
{"simpleGauge", gaugeInput, gaugeInput, gaugeOutput, []string{}, "sum"},
{"simpleGaugeReplace", gaugeInput, gaugeInput, gaugeOutputReplace, []string{}, "replace"},
{"in", in1, in2, want, []string{}, "sum"},
{"inReplace", in1, in2, wantReplace, []string{}, "replace"},
{"multilabel", multilabel1, multilabel2, multilabelResult, []string{"ignore_label"}, "sum"},
{"labelFields", labelFields1, labelFields2, labelFieldResult, []string{}, "sum"},
{"reorderedLabels", reorderedLabels1, reorderedLabels2, reorderedLabelsResult, []string{}, "sum"},
{"ignoredLabels", ignoredLabels1, ignoredLabels2, ignoredLabelsResult, []string{"ignore_me"}, "sum"},
{"summary", summaryInput, summaryInput, summaryOutput, []string{}, "sum"},
} {
t.Run(c.testName, func(t *testing.T) {
agg := NewAggregate(AddIgnoredLabels(c.ignoredLabels...))
agg := NewAggregate(AddIgnoredLabels(c.ignoredLabels...), SetGaugeBehavior(c.behavior))

err := agg.parseAndMerge(strings.NewReader(c.a), testLabels)
require.NoError(t, err)
Expand Down
15 changes: 10 additions & 5 deletions metrics/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func mergeBuckets(a, b []*dto.Bucket) []*dto.Bucket {
return output
}

func mergeMetric(ty dto.MetricType, a, b *dto.Metric) *dto.Metric {
func mergeMetric(ty dto.MetricType, a, b *dto.Metric, options *aggregateOptions) *dto.Metric {
switch ty {
case dto.MetricType_COUNTER:
return &dto.Metric{
Expand All @@ -82,9 +82,14 @@ func mergeMetric(ty dto.MetricType, a, b *dto.Metric) *dto.Metric {
}

case dto.MetricType_GAUGE:
// No very meaningful way for us to merge gauges. We'll sum them
// and clear out any gauges on scrape, as a best approximation, but
// this relies on client pushing with the same interval as we scrape.
if options.gaugeBehavior == replaceBehavior {
return &dto.Metric{
Label: a.Label,
Gauge: &dto.Gauge{
Value: float64ptr(*b.Gauge.Value),
},
}
}
return &dto.Metric{
Label: a.Label,
Gauge: &dto.Gauge{
Expand Down Expand Up @@ -143,7 +148,7 @@ func (mf *metricFamily) mergeFamily(b *dto.MetricFamily) error {
newMetric = append(newMetric, b.Metric[j])
j++
} else {
merged := mergeMetric(*mf.Type, mf.Metric[i], b.Metric[j])
merged := mergeMetric(*mf.Type, mf.Metric[i], b.Metric[j], mf.options)
if merged != nil {
newMetric = append(newMetric, merged)
}
Expand Down
4 changes: 2 additions & 2 deletions routers/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@ import (
"github.com/zapier/prom-aggregation-gateway/metrics"
)

func RunServers(cfg ApiRouterConfig, apiListen string, lifecycleListen string) {
func RunServers(cfg ApiRouterConfig, apiListen string, lifecycleListen string, gaugeBehavior string) {
sigChannel := make(chan os.Signal, 1)
signal.Notify(sigChannel, syscall.SIGTERM, syscall.SIGINT)

agg := metrics.NewAggregate()
agg := metrics.NewAggregate(metrics.SetGaugeBehavior(gaugeBehavior))

promMetricsConfig := promMetrics.Config{
Registry: metrics.PromRegistry,
Expand Down