Skip to content

Commit

Permalink
fixes and improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
nixargh committed Sep 30, 2022
1 parent 90e7099 commit b1ad39a
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 64 deletions.
32 changes: 30 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"os"
"strconv"
"strings"
"sync/atomic"
"time"

log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -198,6 +199,7 @@ func main() {
}

// Init sender counters that are slices
state.Destination = append(state.Destination, graphiteAddress[id])
state.SendError = append(state.SendError, 0)
state.Out = append(state.Out, 0)
state.OutBytes = append(state.OutBytes, 0)
Expand Down Expand Up @@ -231,12 +233,38 @@ func main() {
go runTransformer(inputChan, outputChans, tenant, forceTenant, prefix, immutablePrefix)
go runRouter(statsAddress, statsPort)
go updateQueue(1)
go updatePerMinuteCounters(graphiteAddress, inputChan)

sleepSeconds := 60
clog.WithFields(log.Fields{"sleepSeconds": sleepSeconds}).Info("Starting a waiting loop.")
for {
sendStateMetrics(inputChan)
// Get initial values
in := atomic.LoadInt64(&state.In)
bad := atomic.LoadInt64(&state.Bad)
transformed := atomic.LoadInt64(&state.Transformed)
var out, out_bytes []int64

// For multiple senders
for id := range graphiteAddress {
out = append(out, atomic.LoadInt64(&state.Out[id]))
out_bytes = append(out_bytes, atomic.LoadInt64(&state.OutBytes[id]))
}

// Sleep for a minute
time.Sleep(time.Duration(sleepSeconds) * time.Second)

stlog.WithFields(log.Fields{"graphiteAddress": graphiteAddress}).Info("Updating per-minute metrics.")
// Calculate MPMs
atomic.StoreInt64(&state.InMpm, atomic.LoadInt64(&state.In)-in)
atomic.StoreInt64(&state.BadMpm, atomic.LoadInt64(&state.Bad)-bad)
atomic.StoreInt64(&state.TransformedMpm, atomic.LoadInt64(&state.Transformed)-transformed)

// For multiple senders
for id := range graphiteAddress {
atomic.StoreInt64(&state.OutMpm[id], atomic.LoadInt64(&state.Out[id])-out[id])
atomic.StoreInt64(&state.OutBpm[id], atomic.LoadInt64(&state.OutBytes[id])-out_bytes[id])
}

// Send State metrics
sendStateMetrics(inputChan)
}
}
95 changes: 33 additions & 62 deletions state.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,20 @@ type State struct {
TransformedMpm int64 `json:"transformed_mpm" type:"int"`
TransformQueue int64 `json:"transform_queue" type:"int"`
// Sender metrics
Connection []int64 `json:"connection" type:"slice"`
ConnectionAlive []int64 `json:"connection_alive" type:"slice"`
ConnectionError []int64 `json:"connection_error" type:"slice"`
Out []int64 `json:"out" type:"slice"`
OutBytes []int64 `json:"out_bytes" type:"slice"`
Returned []int64 `json:"returned" type:"slice"`
SendError []int64 `json:"send_error" type:"slice"`
OutQueue []int64 `json:"out_queue" type:"slice"`
Queue []int64 `json:"queue" type:"slice"`
NegativeQueueError []int64 `json:"negative_queue_error" type:"slice"`
PacksOverflewError []int64 `json:"packs_overflew_error" type:"slice"`
OutMpm []int64 `json:"out_mpm" type:"slice"`
OutBpm []int64 `json:"out_bpm" type:"slice"`
Destination []string `json:"destination" type:"slice"`
Connection []int64 `json:"connection" type:"slice"`
ConnectionAlive []int64 `json:"connection_alive" type:"slice"`
ConnectionError []int64 `json:"connection_error" type:"slice"`
Out []int64 `json:"out" type:"slice"`
OutBytes []int64 `json:"out_bytes" type:"slice"`
Returned []int64 `json:"returned" type:"slice"`
SendError []int64 `json:"send_error" type:"slice"`
OutQueue []int64 `json:"out_queue" type:"slice"`
Queue []int64 `json:"queue" type:"slice"`
NegativeQueueError []int64 `json:"negative_queue_error" type:"slice"`
PacksOverflewError []int64 `json:"packs_overflew_error" type:"slice"`
OutMpm []int64 `json:"out_mpm" type:"slice"`
OutBpm []int64 `json:"out_bpm" type:"slice"`
}

func runRouter(address string, port int) {
Expand Down Expand Up @@ -96,23 +97,29 @@ func sendStateMetrics(inputChan chan *Metric) {
metricName := structfield.Tag.Get("json")
valueField := values.Field(i)

if name == "Version" {
switch name {
case "Version":
value := strings.Replace(valueField.String(), ".", "", 2)
doSend(name, metricName, value, timestamp, inputChan)
} else {
switch vtype {
case "int":
value := fmt.Sprintf("%d", valueField.Int())
continue
case "Destination":
value := fmt.Sprintf("%d", valueField.Len())
doSend(name, metricName, value, timestamp, inputChan)
continue
}

switch vtype {
case "int":
value := fmt.Sprintf("%d", valueField.Int())
doSend(name, metricName, value, timestamp, inputChan)
case "slice":
for id := 0; id < valueField.Len(); id++ {
metricName = fmt.Sprintf("%s_%d", structfield.Tag.Get("json"), id)
value := fmt.Sprintf("%d", valueField.Index(id).Int())
doSend(name, metricName, value, timestamp, inputChan)
case "slice":
for id := 0; id < valueField.Len(); id++ {
metricName = fmt.Sprintf("%s_%d", structfield.Tag.Get("json"), id)
value := fmt.Sprintf("%d", valueField.Index(id).Int())
doSend(name, metricName, value, timestamp, inputChan)
}
default:
stlog.WithFields(log.Fields{"vtype": vtype}).Fatal("Unknown state value type.")
}
default:
stlog.WithFields(log.Fields{"vtype": vtype}).Fatal("Unknown state value type.")
}
}
}
Expand Down Expand Up @@ -150,39 +157,3 @@ func doSend(
inputChan <- &metric
atomic.AddInt64(&state.In, 1)
}

func updatePerMinuteCounters(graphiteAddress []string, inputChan chan *Metric) {
sleepSeconds := 60
stlog.WithFields(log.Fields{
"graphiteAddress": graphiteAddress,
"sleepSeconds": sleepSeconds,
}).Info("Starting PerMinuteCounters Updater loop.")

for {
in := atomic.LoadInt64(&state.In)
bad := atomic.LoadInt64(&state.Bad)
transformed := atomic.LoadInt64(&state.Transformed)
var out, out_bytes []int64

// For multiple senders
for id := range graphiteAddress {
out = append(out, atomic.LoadInt64(&state.Out[id]))
out_bytes = append(out_bytes, atomic.LoadInt64(&state.OutBytes[id]))
}

// Sleep for a minute
time.Sleep(time.Duration(sleepSeconds) * time.Second)

stlog.WithFields(log.Fields{"graphiteAddress": graphiteAddress}).Info("Updating per-minute metrics.")
// Calculate MPMs
atomic.StoreInt64(&state.InMpm, atomic.LoadInt64(&state.In)-in)
atomic.StoreInt64(&state.BadMpm, atomic.LoadInt64(&state.Bad)-bad)
atomic.StoreInt64(&state.TransformedMpm, atomic.LoadInt64(&state.Transformed)-transformed)

// For multiple senders
for id := range graphiteAddress {
atomic.StoreInt64(&state.OutMpm[id], atomic.LoadInt64(&state.Out[id])-out[id])
atomic.StoreInt64(&state.OutBpm[id], atomic.LoadInt64(&state.OutBytes[id])-out_bytes[id])
}
}
}

0 comments on commit b1ad39a

Please sign in to comment.