From 36773a3cee0d74b4e806c23d23415c1f0f0b7e54 Mon Sep 17 00:00:00 2001 From: Sebastian Macke Date: Sun, 4 Aug 2024 22:26:11 +0200 Subject: [PATCH] Separate logfile logic and better signaling between processes. --- src/logfile.go | 72 ++++++++++++++++++ src/slapper.go | 46 ++++++------ src/stats.go | 7 ++ src/targeter.go | 175 ++++++++++++++++++------------------------- src/ticker.go | 11 ++- src/ui.go | 41 +++++----- tools/sleepserver.go | 2 +- 7 files changed, 209 insertions(+), 145 deletions(-) create mode 100644 src/logfile.go diff --git a/src/logfile.go b/src/logfile.go new file mode 100644 index 0000000..fdfc796 --- /dev/null +++ b/src/logfile.go @@ -0,0 +1,72 @@ +package slapperx + +import ( + "bufio" + "os" + "sync" + "sync/atomic" +) + +type LogFile struct { + file *os.File + fileWriter *bufio.Writer + isClosed atomic.Bool + c chan string + done chan bool + sync.WaitGroup +} + +func NewLogFile(logFile string) *LogFile { + if logFile == "" { + return nil + } + file, err := os.Create(logFile) + if err != nil { + panic(err) + } + f := &LogFile{ + file: file, + fileWriter: bufio.NewWriterSize(file, 8192), + c: make(chan string, 100), + done: make(chan bool), + } + f.isClosed.Store(false) + go f.WriteLoop() + return f +} + +func (f *LogFile) WriteLoop() { + for { + s, ok := <-f.c + if !ok { + f.done <- true + return + } + _, err := f.fileWriter.WriteString(s) + if err != nil { + panic(err) + } + } +} + +func (f *LogFile) Close() { + f.isClosed.Store(true) + close(f.c) + <-f.done // wait for write loop to finish + + err := f.fileWriter.Flush() + if err != nil { + panic(err) + } + err = f.file.Close() + if err != nil { + panic(err) + } +} + +func (f *LogFile) WriteString(s string) { + if f.isClosed.Load() { + return + } + f.c <- s +} diff --git a/src/slapper.go b/src/slapper.go index 42aa789..56c37c2 100644 --- a/src/slapper.go +++ b/src/slapper.go @@ -31,12 +31,24 @@ func Main() { if config.Verbose { fmt.Println("Requests:", len(requests)) } + quit := make(chan struct{}, 1) + + var logFile *LogFile = nil + if config.LogFile != "" { + logFile = NewLogFile(config.LogFile) + defer logFile.Close() + } + + trgt = NewTargeter(&requests, config.Timeout, logFile, config.Verbose) + + defer func() { + close(quit) // send all threads the quit signal + trgt.Close() // wait and Close + }() - trgt = NewTargeter(&requests, config.Timeout, config.LogFile, config.Verbose) - defer trgt.close() if !config.Verbose { ui = InitTerminal(config.MinY, config.MaxY) - defer ui.close() + defer ui.Close() } stats = Stats{} @@ -44,34 +56,20 @@ func Main() { stats.initializeTimingsBucket(ui.lbc.buckets) } - quit := make(chan struct{}, 1) - ticker := NewTicker(config.Rate) rampUpController := NewRamUpController(config.RampUp, config.Rate) go rampUpController.startRampUpTimeProcess(ticker.GetRateChanger()) // start attackers - var onTickChan = ticker.Start(quit) - trgt.Start(config.Workers, onTickChan, quit) - - // start reporter - trgt.wg.Add(1) - go func() { - defer trgt.wg.Done() - if !config.Verbose { - ui.reporter(quit) - } - }() + var onTickChan = ticker.Start() + defer ticker.Stop() + + trgt.Start(config.Workers, onTickChan) // blocking - if config.Verbose { - <-make(chan bool) // just wait for Ctrl-C - } else { - keyPressListener(rampUpController.GetRateChanger()) + if !config.Verbose { + ui.Show() // start Terminal output } - - // bye - close(quit) - trgt.Wait() + keyPressListener(rampUpController.GetRateChanger()) } diff --git a/src/stats.go b/src/stats.go index 3d9621c..62d519e 100644 --- a/src/stats.go +++ b/src/stats.go @@ -33,3 +33,10 @@ func (s *Stats) reset() { func (s *Stats) initializeTimingsBucket(buckets int) { s.timings = newMovingWindow(movingWindowsSize*screenRefreshFrequency, buckets) } + +func (s *Stats) getInFlightRequests() int64 { + sent := s.requestsSent.Load() + recv := s.responsesReceived.Load() + return sent - recv + +} diff --git a/src/targeter.go b/src/targeter.go index 461ea2b..7a7963c 100644 --- a/src/targeter.go +++ b/src/targeter.go @@ -1,7 +1,6 @@ package slapperx import ( - "bufio" "errors" "fmt" "github.com/s-macke/slapperx/src/tracing" @@ -19,47 +18,29 @@ type Targeter struct { wg sync.WaitGroup idx counter requests []http.Request + logFile *LogFile - file *os.File - fileWriter *bufio.Writer - attackStartTime time.Time + attackStartTime time.Time // time when the attack started verbose bool } -func NewTargeter(requests *[]http.Request, timeout time.Duration, logFile string, verbose bool) *Targeter { +func NewTargeter(requests *[]http.Request, timeout time.Duration, logFile *LogFile, verbose bool) *Targeter { client := tracing.NewTracingClient(timeout) trgt := &Targeter{ client: client, idx: 0, requests: *requests, - file: nil, + logFile: logFile, verbose: verbose, } - if logFile != "" { - var err error - trgt.file, err = os.Create(logFile) - if err != nil { - panic(err) - } - trgt.fileWriter = bufio.NewWriterSize(trgt.file, 8192) - } return trgt } -func (trgt *Targeter) close() { - if trgt.file != nil { - err := trgt.fileWriter.Flush() - if err != nil { - panic(err) - } - err = trgt.file.Close() - if err != nil { - panic(err) - } - } +func (trgt *Targeter) Close() { + trgt.wg.Wait() } func (trgt *Targeter) nextRequest() *http.Request { @@ -69,103 +50,95 @@ func (trgt *Targeter) nextRequest() *http.Request { return &request } -func (trgt *Targeter) attack(client *tracing.Client, ch <-chan time.Time, quit <-chan struct{}) { - +func (trgt *Targeter) attack(client *tracing.Client, ch <-chan time.Time) { var dnsError *net.DNSError for { - select { - case <-ch: - request := trgt.nextRequest() - stats.requestsSent.Add(1) + _, ok := <-ch + if !ok { // channel closed + return + } + request := trgt.nextRequest() + stats.requestsSent.Add(1) - start := time.Now() - response, err := client.Do(request) + start := time.Now() + response, err := client.Do(request) + if err != nil && trgt.verbose { + fmt.Println("Error:", request.Method, request.URL, err) + } + if err == nil { + _, err = io.ReadAll(response.Body) if err != nil && trgt.verbose { fmt.Println("Error:", request.Method, request.URL, err) } - if err == nil { - _, err = io.ReadAll(response.Body) - if err != nil && trgt.verbose { - fmt.Println("Error:", request.Method, request.URL, err) - } - _ = response.Body.Close() - } - - now := time.Now() - elapsed := now.Sub(start) - elapsedMs := float64(elapsed) / float64(time.Millisecond) - - stats.responsesReceived.Add(1) - - status := 0 - if err == nil { - status = response.StatusCode - stats.responses.status[status].Add(1) - } else { - switch { - case - errors.Is(err, io.EOF): - stats.responses.ErrorEof.Add(1) - case - errors.Is(err, syscall.ECONNREFUSED): - stats.responses.ErrorConnRefused.Add(1) - case - os.IsTimeout(err): - stats.responses.ErrorTimeout.Add(1) - case - errors.As(err, &dnsError): - stats.responses.ErrorNoSuchHost.Add(1) - default: - stats.responses.status[0].Add(1) - } - } + _ = response.Body.Close() + } - if trgt.file != nil { - _, err = trgt.fileWriter.WriteString( - fmt.Sprintf("%s,%d,%d,%d\n", - start.Format("2006-01-02T15:04:05.999999999"), - start.Sub(trgt.attackStartTime).Milliseconds(), - elapsed.Milliseconds(), - status)) - if err != nil { - panic(err) - } + now := time.Now() + elapsed := now.Sub(start) + elapsedMs := float64(elapsed) / float64(time.Millisecond) + + stats.responsesReceived.Add(1) + + status := 0 + if err == nil { + status = response.StatusCode + stats.responses.status[status].Add(1) + } else { + switch { + case + errors.Is(err, io.EOF): + stats.responses.ErrorEof.Add(1) + case + errors.Is(err, syscall.ECONNREFUSED): + stats.responses.ErrorConnRefused.Add(1) + case + os.IsTimeout(err): + stats.responses.ErrorTimeout.Add(1) + case + errors.As(err, &dnsError): + stats.responses.ErrorNoSuchHost.Add(1) + default: + stats.responses.status[0].Add(1) } + } - if trgt.verbose { - fmt.Println(request.Method, request.URL, status, elapsedMs) - continue - } - // to test the latency distribution - // elapsedMs = (math.Sin(elapsedMs)+1.1)*30. + math.Cos(float64(start.UnixMilli()/5000))*100 + 100. - - elapsedBucket := ui.lbc.calculateBucket(elapsedMs) - timings := stats.timings.getTimingsSlot(now) - if status >= 200 && status < 300 { - timings[elapsedBucket].Ok.Add(1) - } else { - timings[elapsedBucket].Bad.Add(1) - } + if trgt.logFile != nil { + trgt.logFile.WriteString( + fmt.Sprintf("%s,%d,%d,%d,%d,%.1f\n", + start.Format("2006-01-02T15:04:05.999999999"), + start.Sub(trgt.attackStartTime).Milliseconds(), + elapsed.Milliseconds(), + status, + stats.getInFlightRequests(), + stats.currentSetRate)) + } - case <-quit: - return + if trgt.verbose { + fmt.Println(request.Method, request.URL, status, elapsedMs) + continue + } + // to test the latency distribution + // elapsedMs = (math.Sin(elapsedMs)+1.1)*30. + math.Cos(float64(start.UnixMilli()/5000))*100 + 100. + + elapsedBucket := ui.lbc.calculateBucket(elapsedMs) + timings := stats.timings.getTimingsSlot(now) + if status >= 200 && status < 300 { + timings[elapsedBucket].Ok.Add(1) + } else { + timings[elapsedBucket].Bad.Add(1) } } } -func (trgt *Targeter) Start(workers uint, ticker <-chan time.Time, quit <-chan struct{}) { +func (trgt *Targeter) Start(workers uint, ticker <-chan time.Time) { trgt.attackStartTime = time.Now() // start attackers for i := uint(0); i < workers; i++ { trgt.wg.Add(1) go func() { defer trgt.wg.Done() - trgt.attack(trgt.client, ticker, quit) + trgt.attack(trgt.client, ticker) }() } } - -func (trgt *Targeter) Wait() { - trgt.wg.Wait() -} diff --git a/src/ticker.go b/src/ticker.go index 4ad50d6..e226bb1 100644 --- a/src/ticker.go +++ b/src/ticker.go @@ -9,12 +9,14 @@ type Ticker struct { multiplier int64 tickDuration time.Duration rateChangerChan chan float64 + done chan bool } // NewTicker creates a new ticker instance with a given rate and ramp-up time. func NewTicker(rate float64) *Ticker { t := &Ticker{ rateChangerChan: make(chan float64), + done: make(chan bool), } t.setTickDuration(rate) return t @@ -39,7 +41,7 @@ func (t *Ticker) GetRateChanger() chan float64 { } // Start initializes the tick process and returns a channel to receive tick events. -func (t *Ticker) Start(quit <-chan struct{}) <-chan time.Time { +func (t *Ticker) Start() <-chan time.Time { ticker := make(chan time.Time) // start main workers @@ -63,10 +65,15 @@ func (t *Ticker) Start(quit <-chan struct{}) <-chan time.Time { ticker <- onTick } - case <-quit: + case <-t.done: + close(ticker) // give signal to stop to the outside world return } } }() return ticker } + +func (t *Ticker) Stop() { + t.done <- true +} diff --git a/src/ui.go b/src/ui.go index 601cd44..06a3bd4 100644 --- a/src/ui.go +++ b/src/ui.go @@ -8,6 +8,7 @@ import ( "math" "os" "strings" + "sync" "time" ) @@ -37,6 +38,9 @@ type UI struct { plotWidth int plotHeight int + wg sync.WaitGroup + done chan bool + lbc *logBucketCalculator } @@ -47,14 +51,16 @@ func InitTerminal(minY time.Duration, maxY time.Duration) *UI { } ui := UI{ start: time.Now(), + done: make(chan bool), } ui.setWindowSize() ui.lbc = newLogBucketCalculator(minY, maxY, ui.plotHeight) return &ui } -func (ui *UI) close() { - //term.Close() +func (ui *UI) Close() { + ui.done <- true + ui.wg.Wait() } func (ui *UI) setWindowSize() { @@ -89,13 +95,10 @@ func (ui *UI) listParameters() { // printHistogramHeader prints the header of the histogram with sent, in-flight, and responses information func (ui *UI) printHistogramHeader(sb *strings.Builder, currentRate counter, currentSetRate float64) { - sent := stats.requestsSent.Load() - recv := stats.responsesReceived.Load() - _, _ = fmt.Fprintf(sb, "time: %4ds ", int(time.Since(ui.start).Seconds())) - _, _ = fmt.Fprintf(sb, "sent: %-5d ", sent) + _, _ = fmt.Fprintf(sb, "sent: %-5d ", stats.requestsSent.Load()) //_, _ = fmt.Fprintf(sb, "connections: %-5d ", trgt.client.CurrentConnections) - _, _ = fmt.Fprintf(sb, "in-flight: %-4d ", sent-recv) + _, _ = fmt.Fprintf(sb, "in-flight: %-4d ", stats.getInFlightRequests()) setRateI, setRatef := math.Modf(currentSetRate) if setRatef < 1e-2 { _, _ = fmt.Fprintf(sb, "\033[96mrate: %4d/%d RPS\033[0m ", currentRate.Load(), int(setRateI)) @@ -177,8 +180,8 @@ func (ui *UI) clearScreen() { } } -// reporter periodically updates and redraws the histogram -func (ui *UI) reporter(quit <-chan struct{}) { +// show periodically updates and redraws the histogram +func (ui *UI) Show() { ui.clearScreen() var currentRate counter @@ -192,13 +195,17 @@ func (ui *UI) reporter(quit <-chan struct{}) { }() ticker := time.Tick(screenRefreshInterval) - for { - select { - case <-ticker: - //trgt.client.String() - ui.drawHistogram(currentRate, stats.currentSetRate) - case <-quit: - return + go func() { + ui.wg.Add(1) + for { + select { + case <-ticker: + //trgt.client.String() + ui.drawHistogram(currentRate, stats.currentSetRate) + case <-ui.done: + ui.wg.Done() + return + } } - } + }() } diff --git a/tools/sleepserver.go b/tools/sleepserver.go index 6dad6a1..f62509e 100644 --- a/tools/sleepserver.go +++ b/tools/sleepserver.go @@ -20,7 +20,7 @@ func SleepServer(w http.ResponseWriter, r *http.Request) { } func main() { - port := flag.String("port", "5000", "port to serve on") + port := flag.String("port", "6000", "port to serve on") defaultSleepTimeMs = flag.Int("sleep", 1000, "default sleep time") defaultStatusCode = flag.Int("status", 200, "default status code") flag.Parse()