Skip to content

Commit

Permalink
Separate logfile logic and better signaling between processes.
Browse files Browse the repository at this point in the history
  • Loading branch information
s-macke committed Aug 4, 2024
1 parent ced34d3 commit 36773a3
Show file tree
Hide file tree
Showing 7 changed files with 209 additions and 145 deletions.
72 changes: 72 additions & 0 deletions src/logfile.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package slapperx

import (
"bufio"
"os"
"sync"
"sync/atomic"
)

type LogFile struct {
file *os.File
fileWriter *bufio.Writer
isClosed atomic.Bool
c chan string
done chan bool
sync.WaitGroup
}

func NewLogFile(logFile string) *LogFile {
if logFile == "" {
return nil
}
file, err := os.Create(logFile)
if err != nil {
panic(err)
}
f := &LogFile{
file: file,
fileWriter: bufio.NewWriterSize(file, 8192),
c: make(chan string, 100),
done: make(chan bool),
}
f.isClosed.Store(false)
go f.WriteLoop()
return f
}

func (f *LogFile) WriteLoop() {
for {
s, ok := <-f.c
if !ok {
f.done <- true
return
}
_, err := f.fileWriter.WriteString(s)
if err != nil {
panic(err)
}
}
}

func (f *LogFile) Close() {
f.isClosed.Store(true)
close(f.c)
<-f.done // wait for write loop to finish

err := f.fileWriter.Flush()
if err != nil {
panic(err)
}
err = f.file.Close()
if err != nil {
panic(err)
}
}

func (f *LogFile) WriteString(s string) {
if f.isClosed.Load() {
return
}
f.c <- s
}
46 changes: 22 additions & 24 deletions src/slapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,47 +31,45 @@ func Main() {
if config.Verbose {
fmt.Println("Requests:", len(requests))
}
quit := make(chan struct{}, 1)

var logFile *LogFile = nil
if config.LogFile != "" {
logFile = NewLogFile(config.LogFile)
defer logFile.Close()
}

trgt = NewTargeter(&requests, config.Timeout, logFile, config.Verbose)

defer func() {
close(quit) // send all threads the quit signal
trgt.Close() // wait and Close
}()

trgt = NewTargeter(&requests, config.Timeout, config.LogFile, config.Verbose)
defer trgt.close()
if !config.Verbose {
ui = InitTerminal(config.MinY, config.MaxY)
defer ui.close()
defer ui.Close()
}

stats = Stats{}
if !config.Verbose {
stats.initializeTimingsBucket(ui.lbc.buckets)
}

quit := make(chan struct{}, 1)

ticker := NewTicker(config.Rate)

rampUpController := NewRamUpController(config.RampUp, config.Rate)
go rampUpController.startRampUpTimeProcess(ticker.GetRateChanger())

// start attackers
var onTickChan = ticker.Start(quit)
trgt.Start(config.Workers, onTickChan, quit)

// start reporter
trgt.wg.Add(1)
go func() {
defer trgt.wg.Done()
if !config.Verbose {
ui.reporter(quit)
}
}()
var onTickChan = ticker.Start()
defer ticker.Stop()

trgt.Start(config.Workers, onTickChan)

// blocking
if config.Verbose {
<-make(chan bool) // just wait for Ctrl-C
} else {
keyPressListener(rampUpController.GetRateChanger())
if !config.Verbose {
ui.Show() // start Terminal output
}

// bye
close(quit)
trgt.Wait()
keyPressListener(rampUpController.GetRateChanger())
}
7 changes: 7 additions & 0 deletions src/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,10 @@ func (s *Stats) reset() {
func (s *Stats) initializeTimingsBucket(buckets int) {
s.timings = newMovingWindow(movingWindowsSize*screenRefreshFrequency, buckets)
}

func (s *Stats) getInFlightRequests() int64 {
sent := s.requestsSent.Load()
recv := s.responsesReceived.Load()
return sent - recv

}
175 changes: 74 additions & 101 deletions src/targeter.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package slapperx

import (
"bufio"
"errors"
"fmt"
"github.com/s-macke/slapperx/src/tracing"
Expand All @@ -19,47 +18,29 @@ type Targeter struct {
wg sync.WaitGroup
idx counter
requests []http.Request
logFile *LogFile

file *os.File
fileWriter *bufio.Writer
attackStartTime time.Time
attackStartTime time.Time // time when the attack started

verbose bool
}

func NewTargeter(requests *[]http.Request, timeout time.Duration, logFile string, verbose bool) *Targeter {
func NewTargeter(requests *[]http.Request, timeout time.Duration, logFile *LogFile, verbose bool) *Targeter {
client := tracing.NewTracingClient(timeout)

trgt := &Targeter{
client: client,
idx: 0,
requests: *requests,
file: nil,
logFile: logFile,
verbose: verbose,
}

if logFile != "" {
var err error
trgt.file, err = os.Create(logFile)
if err != nil {
panic(err)
}
trgt.fileWriter = bufio.NewWriterSize(trgt.file, 8192)
}
return trgt
}

func (trgt *Targeter) close() {
if trgt.file != nil {
err := trgt.fileWriter.Flush()
if err != nil {
panic(err)
}
err = trgt.file.Close()
if err != nil {
panic(err)
}
}
func (trgt *Targeter) Close() {
trgt.wg.Wait()
}

func (trgt *Targeter) nextRequest() *http.Request {
Expand All @@ -69,103 +50,95 @@ func (trgt *Targeter) nextRequest() *http.Request {
return &request
}

func (trgt *Targeter) attack(client *tracing.Client, ch <-chan time.Time, quit <-chan struct{}) {

func (trgt *Targeter) attack(client *tracing.Client, ch <-chan time.Time) {
var dnsError *net.DNSError

for {
select {
case <-ch:
request := trgt.nextRequest()
stats.requestsSent.Add(1)
_, ok := <-ch
if !ok { // channel closed
return
}
request := trgt.nextRequest()
stats.requestsSent.Add(1)

start := time.Now()
response, err := client.Do(request)
start := time.Now()
response, err := client.Do(request)
if err != nil && trgt.verbose {
fmt.Println("Error:", request.Method, request.URL, err)
}
if err == nil {
_, err = io.ReadAll(response.Body)
if err != nil && trgt.verbose {
fmt.Println("Error:", request.Method, request.URL, err)
}
if err == nil {
_, err = io.ReadAll(response.Body)
if err != nil && trgt.verbose {
fmt.Println("Error:", request.Method, request.URL, err)
}
_ = response.Body.Close()
}

now := time.Now()
elapsed := now.Sub(start)
elapsedMs := float64(elapsed) / float64(time.Millisecond)

stats.responsesReceived.Add(1)

status := 0
if err == nil {
status = response.StatusCode
stats.responses.status[status].Add(1)
} else {
switch {
case
errors.Is(err, io.EOF):
stats.responses.ErrorEof.Add(1)
case
errors.Is(err, syscall.ECONNREFUSED):
stats.responses.ErrorConnRefused.Add(1)
case
os.IsTimeout(err):
stats.responses.ErrorTimeout.Add(1)
case
errors.As(err, &dnsError):
stats.responses.ErrorNoSuchHost.Add(1)
default:
stats.responses.status[0].Add(1)
}
}
_ = response.Body.Close()
}

if trgt.file != nil {
_, err = trgt.fileWriter.WriteString(
fmt.Sprintf("%s,%d,%d,%d\n",
start.Format("2006-01-02T15:04:05.999999999"),
start.Sub(trgt.attackStartTime).Milliseconds(),
elapsed.Milliseconds(),
status))
if err != nil {
panic(err)
}
now := time.Now()
elapsed := now.Sub(start)
elapsedMs := float64(elapsed) / float64(time.Millisecond)

stats.responsesReceived.Add(1)

status := 0
if err == nil {
status = response.StatusCode
stats.responses.status[status].Add(1)
} else {
switch {
case
errors.Is(err, io.EOF):
stats.responses.ErrorEof.Add(1)
case
errors.Is(err, syscall.ECONNREFUSED):
stats.responses.ErrorConnRefused.Add(1)
case
os.IsTimeout(err):
stats.responses.ErrorTimeout.Add(1)
case
errors.As(err, &dnsError):
stats.responses.ErrorNoSuchHost.Add(1)
default:
stats.responses.status[0].Add(1)
}
}

if trgt.verbose {
fmt.Println(request.Method, request.URL, status, elapsedMs)
continue
}
// to test the latency distribution
// elapsedMs = (math.Sin(elapsedMs)+1.1)*30. + math.Cos(float64(start.UnixMilli()/5000))*100 + 100.

elapsedBucket := ui.lbc.calculateBucket(elapsedMs)
timings := stats.timings.getTimingsSlot(now)
if status >= 200 && status < 300 {
timings[elapsedBucket].Ok.Add(1)
} else {
timings[elapsedBucket].Bad.Add(1)
}
if trgt.logFile != nil {
trgt.logFile.WriteString(
fmt.Sprintf("%s,%d,%d,%d,%d,%.1f\n",
start.Format("2006-01-02T15:04:05.999999999"),
start.Sub(trgt.attackStartTime).Milliseconds(),
elapsed.Milliseconds(),
status,
stats.getInFlightRequests(),
stats.currentSetRate))
}

case <-quit:
return
if trgt.verbose {
fmt.Println(request.Method, request.URL, status, elapsedMs)
continue
}
// to test the latency distribution
// elapsedMs = (math.Sin(elapsedMs)+1.1)*30. + math.Cos(float64(start.UnixMilli()/5000))*100 + 100.

elapsedBucket := ui.lbc.calculateBucket(elapsedMs)
timings := stats.timings.getTimingsSlot(now)
if status >= 200 && status < 300 {
timings[elapsedBucket].Ok.Add(1)
} else {
timings[elapsedBucket].Bad.Add(1)
}
}
}

func (trgt *Targeter) Start(workers uint, ticker <-chan time.Time, quit <-chan struct{}) {
func (trgt *Targeter) Start(workers uint, ticker <-chan time.Time) {
trgt.attackStartTime = time.Now()
// start attackers
for i := uint(0); i < workers; i++ {
trgt.wg.Add(1)
go func() {
defer trgt.wg.Done()
trgt.attack(trgt.client, ticker, quit)
trgt.attack(trgt.client, ticker)
}()
}
}

func (trgt *Targeter) Wait() {
trgt.wg.Wait()
}
Loading

0 comments on commit 36773a3

Please sign in to comment.