Skip to content

Commit

Permalink
Merge pull request #332 from atlassian/new-load-tester
Browse files Browse the repository at this point in the history
This adds a new load tester
  • Loading branch information
tiedotguy authored Aug 26, 2020
2 parents 53a5e85 + 4f38ea5 commit 3ea558e
Show file tree
Hide file tree
Showing 6 changed files with 339 additions and 4 deletions.
17 changes: 14 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,16 @@ Incorrect meta tag values will be handled in best effort manner, i.e.

This is an experimental feature and it may be removed or changed in future versions.


Load testing
------------
There is a tool under `cmd/loader` with support for a number of options which can be used to generate synthetic statsd
load. There is also another load generation tool under `cmd/tester` which is deprecated and will be removed in a
future release.

Help for the loader tool can be found through `--help`.


Sending metrics
---------------
The server listens for UDP packets on the address given by the `--metrics-addr` flag,
Expand All @@ -316,12 +326,13 @@ flag (space separated list of backend names).

Currently supported backends are:

* graphite
* cloudwatch
* datadog
* graphite
* influxdb
* newrelic
* statsdaemon
* stdout
* cloudwatch
* newrelic

The format of each metric is:

Expand Down
100 changes: 100 additions & 0 deletions cmd/loader/args.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package main

import (
"fmt"
"os"

"github.com/jessevdk/go-flags"
)

type commandOptions struct {
Target string `short:"a" long:"address" default:"127.0.0.1:8125" description:"Address to send metrics" `
MetricPrefix string `short:"p" long:"metric-prefix" default:"loadtest." description:"Metric name prefix" `
MetricSuffix string ` long:"metric-suffix" default:".%d" description:"Metric suffix with cardinality marker" `
Rate uint `short:"r" long:"rate" default:"1000" description:"Target packets per second" `
DatagramSize uint ` long:"buffer-size" default:"1500" description:"Maximum size of datagram to send" `
Workers uint `short:"w" long:"workers" default:"1" description:"Number of parallel workers to use" `
Counts struct {
Counter uint64 ` short:"c" long:"counter-count" description:"Number of counters to send" `
Gauge uint64 ` short:"g" long:"gauge-count" description:"Number of gauges to send" `
Set uint64 ` short:"s" long:"set-count" description:"Number of sets to send" `
Timer uint64 ` short:"t" long:"timer-count" description:"Number of timers to send" `
} `group:"Metric count"`
NameCard struct {
Counter uint ` long:"counter-cardinality" default:"1" description:"Cardinality of counter names" `
Gauge uint ` long:"gauge-cardinality" default:"1" description:"Cardinality of gauges names" `
Set uint ` long:"set-cardinality" default:"1" description:"Cardinality of set names" `
Timer uint ` long:"timer-cardinality" default:"1" description:"Cardinality of timer names" `
} `group:"Name cardinality"`
TagCard struct {
Counter []uint ` long:"counter-tag-cardinality" description:"Cardinality of count tags" `
Gauge []uint ` long:"gauge-tag-cardinality" description:"Cardinality of gauge tags" `
Set []uint ` long:"set-tag-cardinality" description:"Cardinality of set tags" `
Timer []uint ` long:"timer-tag-cardinality" description:"Cardinality of timer tags" `
} `group:"Tag cardinality"`
ValueRange struct {
Counter uint ` long:"counter-value-limit" default:"0" description:"Maximum value of counters minus one" `
Gauge uint ` long:"gauge-value-limit" default:"1" description:"Maximum value of gauges" `
Set uint ` long:"set-value-cardinality" default:"1" description:"Maximum number of values to send per set"`
Timer uint ` long:"timer-value-limit" default:"1" description:"Maximum value of timers" `
} `group:"Value range"`
}

func parseArgs(args []string) commandOptions {
var opts commandOptions
parser := flags.NewParser(&opts, flags.HelpFlag | flags.PassDoubleDash)
parser.LongDescription = "" + // because gofmt
"When specifying cardinality, the tag cardinality can be specified multiple times,\n" +
"and each tag will be named tagN:M. The maximum total cardinality will be:\n\n" +
"|name| * |tag1| * |tag2| * ... * |tagN|\n\n" +
"Care should be taken to not cause a combinatorial explosion."

positional, err := parser.ParseArgs(args)
if err != nil {
if !isHelp(err) {
parser.WriteHelp(os.Stderr)
_, _ = fmt.Fprintf(os.Stderr, "\n\nerror parsing command line: %v\n", err)
os.Exit(1)
}
parser.WriteHelp(os.Stdout)
os.Exit(0)
}

if len(positional) != 0 {
// Near as I can tell there's no way to say no positional arguments allowed.
parser.WriteHelp(os.Stderr)
_, _ = fmt.Fprintf(os.Stderr, "\n\nno positional arguments allowed\n")
os.Exit(1)
}

if opts.Counts.Counter+opts.Counts.Gauge+opts.Counts.Set+opts.Counts.Timer == 0 {
parser.WriteHelp(os.Stderr)
_, _ = fmt.Fprintf(os.Stderr, "\n\nAt least one of counter-count, gauge-count, set-count, or timer-count must be non-zero\n")
os.Exit(1)
}
return opts
}

// isHelp is a helper to test the error from ParseArgs() to
// determine if the help message was written. It is safe to
// call without first checking that error is nil.
func isHelp(err error) bool {
// This was copied from https://github.com/jessevdk/go-flags/blame/master/help.go#L499, as there has not been an
// official release yet with this code. Renamed from WriteHelp to isHelp, as flags.ErrHelp is still returned when
// flags.HelpFlag is set, flags.PrintError is clear, and -h/--help is passed on the command line, even though the
// help is not displayed in such a situation.
if err == nil { // No error
return false
}

flagError, ok := err.(*flags.Error)
if !ok { // Not a go-flag error
return false
}

if flagError.Type != flags.ErrHelp { // Did not print the help message
return false
}

return true
}
96 changes: 96 additions & 0 deletions cmd/loader/generation.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package main

import (
"fmt"
"math/rand"
"strconv"
"strings"
"sync/atomic"
)

type metricData struct {
count uint64 // atomic
nameFormat string
nameCardinality uint
tagCardinality []uint
valueLimit uint
}

type metricGenerator struct {
rnd *rand.Rand

counters metricData
gauges metricData
sets metricData
timers metricData
}

func (md *metricData) genName(sb *strings.Builder, r *rand.Rand) {
sb.WriteString(fmt.Sprintf(md.nameFormat, r.Intn(int(md.nameCardinality))))
sb.WriteByte(':')
}

func (md *metricData) genTags(sb *strings.Builder, r *rand.Rand) {
if len(md.tagCardinality) > 0 {
sb.WriteString("|#")
for idx, c := range md.tagCardinality {
if idx > 0 {
sb.WriteByte(',')
}
sb.WriteString(fmt.Sprintf("tag%d:%d", idx, r.Intn(int(c))))
}
}
sb.WriteByte('\n')
}

func (mg *metricGenerator) nextCounter(sb *strings.Builder) {
atomic.AddUint64(&mg.counters.count, ^uint64(0))
mg.counters.genName(sb, mg.rnd)
sb.WriteString(strconv.Itoa(1 + mg.rnd.Intn(int(mg.counters.valueLimit+1))))
sb.WriteString("|c")
mg.counters.genTags(sb, mg.rnd)
}

func (mg *metricGenerator) nextGauge(sb *strings.Builder) {
atomic.AddUint64(&mg.gauges.count, ^uint64(0))
mg.gauges.genName(sb, mg.rnd)
sb.WriteString(strconv.Itoa(mg.rnd.Intn(int(mg.gauges.valueLimit))))
sb.WriteString("|g")
mg.gauges.genTags(sb, mg.rnd)
}

func (mg *metricGenerator) nextSet(sb *strings.Builder) {
atomic.AddUint64(&mg.sets.count, ^uint64(0))
mg.sets.genName(sb, mg.rnd)
sb.WriteString(strconv.Itoa(mg.rnd.Intn(int(mg.sets.valueLimit))))
sb.WriteString("|s")
mg.sets.genTags(sb, mg.rnd)
}

func (mg *metricGenerator) nextTimer(sb *strings.Builder) {
atomic.AddUint64(&mg.timers.count, ^uint64(0))
mg.timers.genName(sb, mg.rnd)
sb.WriteString(strconv.FormatFloat(mg.rnd.Float64()*float64(mg.timers.valueLimit), 'g', -1, 64))
sb.WriteString("|ms")
mg.timers.genTags(sb, mg.rnd)
}

func (mg *metricGenerator) next(sb *strings.Builder) bool {
// We can safely read these non-atomically, because this goroutine is the only one that writes to them.
total := mg.counters.count + mg.gauges.count + mg.sets.count + mg.timers.count
if total == 0 {
return false
}

n := uint64(mg.rnd.Int63n(int64(total)))
if n < mg.counters.count {
mg.nextCounter(sb)
} else if n < mg.counters.count+mg.gauges.count {
mg.nextGauge(sb)
} else if n < mg.counters.count+mg.gauges.count+mg.sets.count {
mg.nextSet(sb)
} else {
mg.nextTimer(sb)
}
return true
}
127 changes: 127 additions & 0 deletions cmd/loader/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package main

import (
"bytes"
"fmt"
"math/rand"
"net"
"os"
"strings"
"sync/atomic"
"time"
)

func main() {
opts := parseArgs(os.Args[1:])

pendingWorkers := make(chan struct{}, opts.Workers)
metricGenerators := make([]*metricGenerator, 0, opts.Workers)
for i := uint(0); i < opts.Workers; i++ {
generator := &metricGenerator{
rnd: rand.New(rand.NewSource(rand.Int63())),
counters: metricData{
nameFormat: fmt.Sprintf("%scounter%s", opts.MetricPrefix, opts.MetricSuffix),
count: opts.Counts.Counter / uint64(opts.Workers),
nameCardinality: opts.NameCard.Counter,
tagCardinality: opts.TagCard.Counter,
valueLimit: opts.ValueRange.Counter,
},
gauges: metricData{
nameFormat: fmt.Sprintf("%sgauge%s", opts.MetricPrefix, opts.MetricSuffix),
count: opts.Counts.Gauge / uint64(opts.Workers),
nameCardinality: opts.NameCard.Gauge,
tagCardinality: opts.TagCard.Gauge,
valueLimit: opts.ValueRange.Gauge,
},
sets: metricData{
nameFormat: fmt.Sprintf("%sset%s", opts.MetricPrefix, opts.MetricSuffix),
count: opts.Counts.Set / uint64(opts.Workers),
nameCardinality: opts.NameCard.Set,
tagCardinality: opts.TagCard.Set,
valueLimit: opts.ValueRange.Set,
},
timers: metricData{
nameFormat: fmt.Sprintf("%stimer%s", opts.MetricPrefix, opts.MetricSuffix),
count: opts.Counts.Timer / uint64(opts.Workers),
nameCardinality: opts.NameCard.Timer,
tagCardinality: opts.TagCard.Timer,
valueLimit: opts.ValueRange.Timer,
},
}
metricGenerators = append(metricGenerators, generator)
go sendMetricsWorker(
opts.Target,
opts.DatagramSize,
opts.Rate/opts.Workers,
generator,
pendingWorkers,
)
}

runningWorkers := opts.Workers
statusTicker := time.NewTicker(1 * time.Second)
for runningWorkers > 0 {
select {
case <-pendingWorkers:
runningWorkers--
case <-statusTicker.C:
counters := uint64(0)
gauges := uint64(0)
sets := uint64(0)
timers := uint64(0)
for _, mg := range metricGenerators {
counters += atomic.LoadUint64(&mg.counters.count)
gauges += atomic.LoadUint64(&mg.gauges.count)
sets += atomic.LoadUint64(&mg.sets.count)
timers += atomic.LoadUint64(&mg.timers.count)
}
fmt.Printf("%d counters, %d gauges, %d sets, %d timers\n", counters, gauges, sets, timers)
}
}
}

func sendMetricsWorker(
address string,
bufSize uint,
rate uint,
generator *metricGenerator,
chDone chan<- struct{},
) {
s, err := net.DialTimeout("udp", address, 1*time.Second)
if err != nil {
panic(err)
}

b := &bytes.Buffer{}

interval := time.Second / time.Duration(rate)

next := time.Now().Add(interval)

sb := &strings.Builder{}
for generator.next(sb) {
if uint(b.Len()+sb.Len()) > bufSize {
timeToFlush := time.Until(next)
if timeToFlush > 0 {
time.Sleep(timeToFlush)
}
_, err := s.Write(b.Bytes())
if err != nil {
fmt.Printf("Pausing for 1 second, error sending packet: %v\n", err)
time.Sleep(1*time.Second)
}
b.Reset()
next = next.Add(interval)
}
b.WriteString(sb.String())
sb.Reset()
}

if b.Len() > 0 {
_, err := s.Write(b.Bytes())
if err != nil {
panic(err)
}
}
chDone <- struct{}{}
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ require (
github.com/gorilla/mux v1.7.3
github.com/howeyc/fsnotify v0.9.0 // indirect
github.com/imdario/mergo v0.3.8 // indirect
github.com/jessevdk/go-flags v1.4.0
github.com/json-iterator/go v1.1.9
github.com/jstemmer/go-junit-report v0.9.1
github.com/libp2p/go-reuseport v0.0.1
github.com/magiconair/properties v1.8.1
github.com/mozilla/tls-observatory v0.0.0-20190404164649-a3c1b6cfecfd
github.com/sirupsen/logrus v1.4.2
github.com/spf13/pflag v1.0.5
github.com/spf13/viper v1.6.2
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ github.com/imdario/mergo v0.3.8 h1:CGgOkSJeqMRmt0D9XLWExdT4m4F1vd3FV3VPt+0VxkQ=
github.com/imdario/mergo v0.3.8/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA=
github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/jingyugao/rowserrcheck v0.0.0-20191204022205-72ab7603b68a h1:GmsqmapfzSJkm28dhRoHz2tLRbJmqhU86IPgBtN3mmk=
github.com/jingyugao/rowserrcheck v0.0.0-20191204022205-72ab7603b68a/go.mod h1:xRskid8CManxVta/ALEhJha/pweKBaVG6fWgc0yH25s=
github.com/jirfag/go-printf-func-name v0.0.0-20191110105641-45db9963cdd3 h1:jNYPNLe3d8smommaoQlK7LOA5ESyUJJ+Wf79ZtA7Vp4=
Expand Down

0 comments on commit 3ea558e

Please sign in to comment.