From 0da317591575bbe715416662c1bcabd78b15a8ea Mon Sep 17 00:00:00 2001 From: tiedotguy Date: Mon, 13 Jul 2020 15:10:15 +1000 Subject: [PATCH 1/3] This adds a new load tester The load tester in the repo currently is not very configurable, yet still overly complicated. This introduces a completely fresh rewrite, which is both more configurable for testing different things, and simpler. --- README.md | 17 +++++- cmd/loader/args.go | 100 +++++++++++++++++++++++++++++++ cmd/loader/generation.go | 96 +++++++++++++++++++++++++++++ cmd/loader/main.go | 126 +++++++++++++++++++++++++++++++++++++++ go.mod | 2 +- go.sum | 1 + 6 files changed, 338 insertions(+), 4 deletions(-) create mode 100644 cmd/loader/args.go create mode 100644 cmd/loader/generation.go create mode 100644 cmd/loader/main.go diff --git a/README.md b/README.md index ecd307f0..70151eb7 100644 --- a/README.md +++ b/README.md @@ -308,6 +308,16 @@ Incorrect meta tag values will be handled in best effort manner, i.e. This is an experimental feature and it may be removed or changed in future versions. + +Load testing +------------ +There is a tool under `cmd/loader` with support for a number of options which can be used to generate synthetic statsd +load. There is also another load generation tool under `cmd/tester` which is deprecated and will be removed in a +future release. + +Help for the loader tool can be found through `--help`. + + Sending metrics --------------- The server listens for UDP packets on the address given by the `--metrics-addr` flag, @@ -316,12 +326,13 @@ flag (space separated list of backend names). Currently supported backends are: -* graphite +* cloudwatch * datadog +* graphite +* influxdb +* newrelic * statsdaemon * stdout -* cloudwatch -* newrelic The format of each metric is: diff --git a/cmd/loader/args.go b/cmd/loader/args.go new file mode 100644 index 00000000..255c9a92 --- /dev/null +++ b/cmd/loader/args.go @@ -0,0 +1,100 @@ +package main + +import ( + "fmt" + "os" + + "github.com/jessevdk/go-flags" +) + +type commandOptions struct { + Target string `short:"a" long:"address" default:"127.0.0.1:8125" description:"Address to send metrics" ` + MetricPrefix string `short:"p" long:"metric-prefix" default:"loadtest." description:"Metric name prefix" ` + MetricSuffix string ` long:"metric-suffix" default:".%d" description:"Metric suffix with cardinality marker" ` + Rate uint `short:"r" long:"rate" default:"1000" description:"Target packets per second" ` + DatagramSize uint ` long:"buffer-size" default:"1500" description:"Maximum size of datagram to send" ` + Workers uint `short:"w" long:"workers" default:"1" description:"Number of parallel workers to use" ` + Counts struct { + Counter uint64 ` short:"c" long:"counter-count" description:"Number of counters to send" ` + Gauge uint64 ` short:"g" long:"gauge-count" description:"Number of gauges to send" ` + Set uint64 ` short:"s" long:"set-count" description:"Number of sets to send" ` + Timer uint64 ` short:"t" long:"timer-count" description:"Number of timers to send" ` + } `group:"Metric count"` + NameCard struct { + Counter uint ` long:"counter-cardinality" default:"1" description:"Cardinality of counter names" ` + Gauge uint ` long:"gauge-cardinality" default:"1" description:"Cardinality of gauges names" ` + Set uint ` long:"set-cardinality" default:"1" description:"Cardinality of set names" ` + Timer uint ` long:"timer-cardinality" default:"1" description:"Cardinality of timer names" ` + } `group:"Name cardinality"` + TagCard struct { + Counter []uint ` long:"counter-tag-cardinality" description:"Cardinality of count tags" ` + Gauge []uint ` long:"gauge-tag-cardinality" description:"Cardinality of gauge tags" ` + Set []uint ` long:"set-tag-cardinality" description:"Cardinality of set tags" ` + Timer []uint ` long:"timer-tag-cardinality" description:"Cardinality of timer tags" ` + } `group:"Tag cardinality"` + ValueRange struct { + Counter uint ` long:"counter-value-limit" default:"0" description:"Maximum value of counters minus one" ` + Gauge uint ` long:"gauge-value-limit" default:"1" description:"Maximum value of gauges" ` + Set uint ` long:"set-value-cardinality" default:"1" description:"Maximum number of values to send per set"` + Timer uint ` long:"timer-value-limit" default:"1" description:"Maximum value of timers" ` + } `group:"Value range"` +} + +func parseArgs(args []string) commandOptions { + var opts commandOptions + parser := flags.NewParser(&opts, flags.HelpFlag | flags.PassDoubleDash) + parser.LongDescription = "" + // because gofmt + "When specifying cardinality, the tag cardinality can be specified multiple times,\n" + + "and each tag will be named tagN:M. The maximum total cardinality will be:\n\n" + + "|name| * |tag1| * |tag2| * ... * |tagN|\n\n" + + "Care should be taken to not cause a combinatorial explosion." + + positional, err := parser.ParseArgs(args) + if err != nil { + if !isHelp(err) { + parser.WriteHelp(os.Stderr) + _, _ = fmt.Fprintf(os.Stderr, "\n\nerror parsing command line: %v\n", err) + os.Exit(1) + } + parser.WriteHelp(os.Stdout) + os.Exit(0) + } + + if len(positional) != 0 { + // Near as I can tell there's no way to say no positional arguments allowed. + parser.WriteHelp(os.Stderr) + _, _ = fmt.Fprintf(os.Stderr, "\n\nno positional arguments allowed\n") + os.Exit(1) + } + + if opts.Counts.Counter+opts.Counts.Gauge+opts.Counts.Set+opts.Counts.Timer == 0 { + parser.WriteHelp(os.Stderr) + _, _ = fmt.Fprintf(os.Stderr, "\n\nAt least one of counter-count, gauge-count, set-count, or timer-count must be non-zero\n") + os.Exit(1) + } + return opts +} + +// isHelp is a helper to test the error from ParseArgs() to +// determine if the help message was written. It is safe to +// call without first checking that error is nil. +func isHelp(err error) bool { + // This was copied from https://github.com/jessevdk/go-flags/blame/master/help.go#L499, as there has not been an + // official release yet with this code. Renamed from WriteHelp to isHelp, as flags.ErrHelp is still returned when + // flags.HelpFlag is set, flags.PrintError is clear, and -h/--help is passed on the command line, even though the + // help is not displayed in such a situation. + if err == nil { // No error + return false + } + + flagError, ok := err.(*flags.Error) + if !ok { // Not a go-flag error + return false + } + + if flagError.Type != flags.ErrHelp { // Did not print the help message + return false + } + + return true +} diff --git a/cmd/loader/generation.go b/cmd/loader/generation.go new file mode 100644 index 00000000..1e0391d9 --- /dev/null +++ b/cmd/loader/generation.go @@ -0,0 +1,96 @@ +package main + +import ( + "fmt" + "math/rand" + "strconv" + "strings" + "sync/atomic" +) + +type metricData struct { + count uint64 // atomic + nameFormat string + nameCardinality uint + tagCardinality []uint + valueLimit uint +} + +type metricGenerator struct { + rnd *rand.Rand + + counters metricData + gauges metricData + sets metricData + timers metricData +} + +func (md *metricData) genName(sb *strings.Builder, r *rand.Rand) { + sb.WriteString(fmt.Sprintf(md.nameFormat, r.Intn(int(md.nameCardinality)))) + sb.WriteByte(':') +} + +func (md *metricData) genTags(sb *strings.Builder, r *rand.Rand) { + if len(md.tagCardinality) > 0 { + sb.WriteString("|#") + for idx, c := range md.tagCardinality { + if idx > 0 { + sb.WriteByte(',') + } + sb.WriteString(fmt.Sprintf("tag%d:%d", idx, r.Intn(int(c)))) + } + } + sb.WriteByte('\n') +} + +func (mg *metricGenerator) nextCounter(sb *strings.Builder) { + atomic.AddUint64(&mg.counters.count, ^uint64(0)) + mg.counters.genName(sb, mg.rnd) + sb.WriteString(strconv.Itoa(1 + mg.rnd.Intn(int(mg.counters.valueLimit+1)))) + sb.WriteString("|c") + mg.counters.genTags(sb, mg.rnd) +} + +func (mg *metricGenerator) nextGauge(sb *strings.Builder) { + atomic.AddUint64(&mg.gauges.count, ^uint64(0)) + mg.gauges.genName(sb, mg.rnd) + sb.WriteString(strconv.Itoa(mg.rnd.Intn(int(mg.gauges.valueLimit)))) + sb.WriteString("|g") + mg.gauges.genTags(sb, mg.rnd) +} + +func (mg *metricGenerator) nextSet(sb *strings.Builder) { + atomic.AddUint64(&mg.sets.count, ^uint64(0)) + mg.sets.genName(sb, mg.rnd) + sb.WriteString(strconv.Itoa(mg.rnd.Intn(int(mg.sets.valueLimit)))) + sb.WriteString("|s") + mg.sets.genTags(sb, mg.rnd) +} + +func (mg *metricGenerator) nextTimer(sb *strings.Builder) { + atomic.AddUint64(&mg.timers.count, ^uint64(0)) + mg.timers.genName(sb, mg.rnd) + sb.WriteString(strconv.FormatFloat(mg.rnd.Float64()*float64(mg.timers.valueLimit), 'g', -1, 64)) + sb.WriteString("|ms") + mg.timers.genTags(sb, mg.rnd) +} + +func (mg *metricGenerator) next(sb *strings.Builder) bool { + // We can safely read these non-atomically, because this goroutine is the only one that writes to them. + total := mg.counters.count + mg.gauges.count + mg.sets.count + mg.timers.count + if total == 0 { + return false + } + + n := uint64(mg.rnd.Int63n(int64(total))) + if n < mg.counters.count { + mg.nextCounter(sb) + } else if n < mg.counters.count+mg.gauges.count { + mg.nextGauge(sb) + } else if n < mg.counters.count+mg.gauges.count+mg.sets.count { + mg.nextSet(sb) + } else { + mg.nextTimer(sb) + } + return true +} diff --git a/cmd/loader/main.go b/cmd/loader/main.go new file mode 100644 index 00000000..2a93b6c8 --- /dev/null +++ b/cmd/loader/main.go @@ -0,0 +1,126 @@ +package main + +import ( + "bytes" + "fmt" + "math/rand" + "net" + "os" + "strings" + "sync/atomic" + "time" +) + +func main() { + opts := parseArgs(os.Args[1:]) + + pendingWorkers := make(chan struct{}, opts.Workers) + metricGenerators := make([]*metricGenerator, 0, opts.Workers) + for i := uint(0); i < opts.Workers; i++ { + generator := &metricGenerator{ + rnd: rand.New(rand.NewSource(rand.Int63())), + counters: metricData{ + nameFormat: fmt.Sprintf("%scounter%s", opts.MetricPrefix, opts.MetricSuffix), + count: opts.Counts.Counter / uint64(opts.Workers), + nameCardinality: opts.NameCard.Counter, + tagCardinality: opts.TagCard.Counter, + valueLimit: opts.ValueRange.Counter, + }, + gauges: metricData{ + nameFormat: fmt.Sprintf("%sgauge%s", opts.MetricPrefix, opts.MetricSuffix), + count: opts.Counts.Gauge / uint64(opts.Workers), + nameCardinality: opts.NameCard.Gauge, + tagCardinality: opts.TagCard.Gauge, + valueLimit: opts.ValueRange.Gauge, + }, + sets: metricData{ + nameFormat: fmt.Sprintf("%sset%s", opts.MetricPrefix, opts.MetricSuffix), + count: opts.Counts.Set / uint64(opts.Workers), + nameCardinality: opts.NameCard.Set, + tagCardinality: opts.TagCard.Set, + valueLimit: opts.ValueRange.Set, + }, + timers: metricData{ + nameFormat: fmt.Sprintf("%stimer%s", opts.MetricPrefix, opts.MetricSuffix), + count: opts.Counts.Timer / uint64(opts.Workers), + nameCardinality: opts.NameCard.Timer, + tagCardinality: opts.TagCard.Timer, + valueLimit: opts.ValueRange.Timer, + }, + } + metricGenerators = append(metricGenerators, generator) + go sendMetricsWorker( + opts.Target, + opts.DatagramSize, + opts.Rate/opts.Workers, + generator, + pendingWorkers, + ) + } + + runningWorkers := opts.Workers + statusTicker := time.NewTicker(1 * time.Second) + for runningWorkers > 0 { + select { + case <-pendingWorkers: + runningWorkers-- + case <-statusTicker.C: + counters := uint64(0) + gauges := uint64(0) + sets := uint64(0) + timers := uint64(0) + for _, mg := range metricGenerators { + counters += atomic.LoadUint64(&mg.counters.count) + gauges += atomic.LoadUint64(&mg.gauges.count) + sets += atomic.LoadUint64(&mg.sets.count) + timers += atomic.LoadUint64(&mg.timers.count) + } + fmt.Printf("%d counters, %d gauges, %d sets, %d timers\n", counters, gauges, sets, timers) + } + } +} + +func sendMetricsWorker( + address string, + bufSize uint, + rate uint, + generator *metricGenerator, + chDone chan<- struct{}, +) { + s, err := net.DialTimeout("udp", address, 1*time.Second) + if err != nil { + panic(err) + } + + b := &bytes.Buffer{} + + interval := time.Second / time.Duration(rate) + + next := time.Now().Add(interval) + + sb := &strings.Builder{} + for generator.next(sb) { + if uint(b.Len()+sb.Len()) > bufSize { + timeToFlush := next.Sub(time.Now()) + if timeToFlush > 0 { + time.Sleep(timeToFlush) + } + _, err := s.Write(b.Bytes()) + if err != nil { + panic(err) + } + b.Reset() + next = next.Add(interval) + } + b.WriteString(sb.String()) + sb.Reset() + } + + if b.Len() > 0 { + _, err := s.Write(b.Bytes()) + if err != nil { + panic(err) + } + } + chDone <- struct{}{} +} diff --git a/go.mod b/go.mod index 1633923b..0a6f9cab 100644 --- a/go.mod +++ b/go.mod @@ -15,11 +15,11 @@ require ( github.com/gorilla/mux v1.7.3 github.com/howeyc/fsnotify v0.9.0 // indirect github.com/imdario/mergo v0.3.8 // indirect + github.com/jessevdk/go-flags v1.4.0 github.com/json-iterator/go v1.1.9 github.com/jstemmer/go-junit-report v0.9.1 github.com/libp2p/go-reuseport v0.0.1 github.com/magiconair/properties v1.8.1 - github.com/mozilla/tls-observatory v0.0.0-20190404164649-a3c1b6cfecfd github.com/sirupsen/logrus v1.4.2 github.com/spf13/pflag v1.0.5 github.com/spf13/viper v1.6.2 diff --git a/go.sum b/go.sum index 92e9cc61..52f2a94e 100644 --- a/go.sum +++ b/go.sum @@ -198,6 +198,7 @@ github.com/imdario/mergo v0.3.8 h1:CGgOkSJeqMRmt0D9XLWExdT4m4F1vd3FV3VPt+0VxkQ= github.com/imdario/mergo v0.3.8/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA= github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= +github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= github.com/jingyugao/rowserrcheck v0.0.0-20191204022205-72ab7603b68a h1:GmsqmapfzSJkm28dhRoHz2tLRbJmqhU86IPgBtN3mmk= github.com/jingyugao/rowserrcheck v0.0.0-20191204022205-72ab7603b68a/go.mod h1:xRskid8CManxVta/ALEhJha/pweKBaVG6fWgc0yH25s= github.com/jirfag/go-printf-func-name v0.0.0-20191110105641-45db9963cdd3 h1:jNYPNLe3d8smommaoQlK7LOA5ESyUJJ+Wf79ZtA7Vp4= From 78490084390efe0da8ee39ae29c46ea562036fb3 Mon Sep 17 00:00:00 2001 From: tiedotguy Date: Mon, 13 Jul 2020 16:08:08 +1000 Subject: [PATCH 2/3] Make the linter happy --- cmd/loader/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/loader/main.go b/cmd/loader/main.go index 2a93b6c8..55d26fba 100644 --- a/cmd/loader/main.go +++ b/cmd/loader/main.go @@ -101,7 +101,7 @@ func sendMetricsWorker( sb := &strings.Builder{} for generator.next(sb) { if uint(b.Len()+sb.Len()) > bufSize { - timeToFlush := next.Sub(time.Now()) + timeToFlush := time.Until(next) if timeToFlush > 0 { time.Sleep(timeToFlush) } From 4f38ea536a4b1de5862bcf3ebbc810285fb63d1c Mon Sep 17 00:00:00 2001 From: tiedotguy Date: Wed, 15 Jul 2020 09:43:29 +1000 Subject: [PATCH 3/3] Don't panic after failing to send. This makes it easier to leave the load generator running while gostatsd is restarting. --- cmd/loader/main.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cmd/loader/main.go b/cmd/loader/main.go index 55d26fba..e2b07b31 100644 --- a/cmd/loader/main.go +++ b/cmd/loader/main.go @@ -107,7 +107,8 @@ func sendMetricsWorker( } _, err := s.Write(b.Bytes()) if err != nil { - panic(err) + fmt.Printf("Pausing for 1 second, error sending packet: %v\n", err) + time.Sleep(1*time.Second) } b.Reset() next = next.Add(interval)