Skip to content

Commit

Permalink
better metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
jancajthaml committed May 27, 2019
1 parent 8574351 commit 011e14d
Show file tree
Hide file tree
Showing 21 changed files with 641 additions and 187 deletions.
2 changes: 1 addition & 1 deletion bbtest/steps/orchestration_steps.rb
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
"STORAGE" => "/data",
"LOG_LEVEL" => "DEBUG",
"CNB_GATEWAY" => "https://127.0.0.1:4000",
"METRICS_OUTPUT" => "/reports/metrics.json",
"METRICS_OUTPUT" => "/reports",
"METRICS_REFRESHRATE" => "1h",
"HTTP_PORT" => "443",
"SECRETS" => "/opt/cnb-rates/secrets",
Expand Down
2 changes: 1 addition & 1 deletion packaging/debian_amd64/DEBIAN/control
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
Package: cnb-rates
Version: 1.0.0+single-app-struct
Version: 1.0.0+better-metrics
Section: misc
Priority: extra
Architecture: amd64
Expand Down
4 changes: 4 additions & 0 deletions services/cnb-rates-batch/batch/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ func (batch Batch) ProcessNewFXMain(wg *sync.WaitGroup) error {
log.Warnf("error write cache fail fx-main data for day %s, %+v\n", day, err)
continue
}

batch.metrics.DayProcessed()
}

return nil
Expand Down Expand Up @@ -158,6 +160,8 @@ func (batch Batch) ProcessNewFXOther(wg *sync.WaitGroup) error {
log.Warnf("error write cache fail fx-other data for day %s, %+v\n", day, err)
continue
}

batch.metrics.DayProcessed()
}

return nil
Expand Down
20 changes: 14 additions & 6 deletions services/cnb-rates-batch/config/environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func loadConfFromEnv() Configuration {
logLevel := strings.ToUpper(getEnvString("CNB_RATES_LOG_LEVEL", "DEBUG"))
rootStorage := getEnvString("CNB_RATES_STORAGE", "/data")
cnbGateway := getEnvString("CNB_RATES_CNB_GATEWAY", "https://www.cnb.cz")
metricsOutput := getEnvString("CNB_RATES_METRICS_OUTPUT", "")
metricsOutput := getEnvFilename("CNB_RATES_METRICS_OUTPUT", "/tmp")
metricsRefreshRate := getEnvDuration("CNB_RATES_METRICS_REFRESHRATE", time.Second)

if rootStorage == "" {
Expand All @@ -39,10 +39,6 @@ func loadConfFromEnv() Configuration {

rootStorage = rootStorage + "/rates/cnb"

if metricsOutput != "" && os.MkdirAll(filepath.Dir(metricsOutput), os.ModePerm) != nil {
log.Fatal("unable to assert metrics output")
}

if os.MkdirAll(rootStorage+"/"+utils.FXMainDailyCacheDirectory(), os.ModePerm) != nil {
log.Fatal("unable to assert fx-main daily cache directory")
}
Expand Down Expand Up @@ -72,8 +68,20 @@ func loadConfFromEnv() Configuration {
CNBGateway: cnbGateway,
LogLevel: logLevel,
MetricsRefreshRate: metricsRefreshRate,
MetricsOutput: metricsOutput,
MetricsOutput: metricsOutput + "/metrics.batch.json",
}
}

func getEnvFilename(key, fallback string) string {
var value = strings.TrimSpace(os.Getenv(key))
if value == "" {
return fallback
}
value = filepath.Clean(value)
if os.MkdirAll(value, os.ModePerm) != nil {
return fallback
}
return value
}

func getEnvString(key, fallback string) string {
Expand Down
95 changes: 19 additions & 76 deletions services/cnb-rates-batch/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ package metrics
import (
"context"
"fmt"
"os"
"path/filepath"
"time"

"github.com/jancajthaml-openbank/cnb-rates-batch/utils"
Expand All @@ -30,80 +28,29 @@ import (
// Metrics represents metrics subroutine
type Metrics struct {
utils.DaemonSupport
output string
refreshRate time.Duration
gatewayLatency metrics.Timer
importLatency metrics.Timer
}

// Snapshot holds metrics snapshot status
type Snapshot struct {
GatewayLatency float64 `json:"gatewayLatency"`
ImportLatency float64 `json:"importLatency"`
output string
refreshRate time.Duration
daysProcessed metrics.Counter
monthsProcessed metrics.Counter
}

// NewMetrics returns metrics fascade
func NewMetrics(ctx context.Context, output string, refreshRate time.Duration) Metrics {
return Metrics{
DaemonSupport: utils.NewDaemonSupport(ctx),
output: output,
refreshRate: refreshRate,
gatewayLatency: metrics.NewTimer(),
importLatency: metrics.NewTimer(),
DaemonSupport: utils.NewDaemonSupport(ctx),
output: output,
refreshRate: refreshRate,
daysProcessed: metrics.NewCounter(),
monthsProcessed: metrics.NewCounter(),
}
}

// NewSnapshot returns metrics snapshot
func NewSnapshot(metrics Metrics) Snapshot {
return Snapshot{
GatewayLatency: metrics.gatewayLatency.Percentile(0.95),
ImportLatency: metrics.importLatency.Percentile(0.95),
}
}

func (metrics Metrics) TimeGatewayLatency(f func()) {
metrics.gatewayLatency.Time(f)
}

func (metrics Metrics) TimeImportLatency(f func()) {
metrics.importLatency.Time(f)
}

func (metrics Metrics) persist(filename string) {
tempFile := filename + "_temp"

data, err := utils.JSON.Marshal(NewSnapshot(metrics))
if err != nil {
log.Warnf("unable to create serialize metrics with error: %v", err)
return
}
f, err := os.OpenFile(tempFile, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, os.ModePerm)
if err != nil {
log.Warnf("unable to create file with error: %v", err)
return
}
defer f.Close()

if _, err := f.Write(data); err != nil {
log.Warnf("unable to write file with error: %v", err)
return
}

if err := os.Rename(tempFile, filename); err != nil {
log.Warnf("unable to move file with error: %v", err)
return
}

return
func (metrics *Metrics) DayProcessed() {
metrics.daysProcessed.Inc(1)
}

func getFilename(path string) string {
dirname := filepath.Dir(path)
ext := filepath.Ext(path)
filename := filepath.Base(path)
filename = filename[:len(filename)-len(ext)]

return dirname + "/" + filename + ".batch" + ext
func (metrics *Metrics) MonthProcessed() {
metrics.monthsProcessed.Inc(1)
}

// WaitReady wait for metrics to be ready
Expand Down Expand Up @@ -137,16 +84,12 @@ func (metrics Metrics) WaitReady(deadline time.Duration) (err error) {
func (metrics Metrics) Start() {
defer metrics.MarkDone()

if metrics.output == "" {
log.Warnf("no metrics output defined, skipping metrics persistence")
metrics.MarkReady()
return
}

output := getFilename(metrics.output)
ticker := time.NewTicker(metrics.refreshRate)
defer ticker.Stop()

if err := metrics.Hydrate(); err != nil {
log.Warn(err.Error())
}
metrics.MarkReady()

select {
Expand All @@ -156,17 +99,17 @@ func (metrics Metrics) Start() {
return
}

log.Infof("Start metrics daemon, update each %v into %v", metrics.refreshRate, output)
log.Infof("Start metrics daemon, update each %v into %v", metrics.refreshRate, metrics.output)

for {
select {
case <-metrics.Done():
log.Info("Stopping metrics daemon")
metrics.persist(output)
metrics.Persist()
log.Info("Stop metrics daemon")
return
case <-ticker.C:
metrics.persist(output)
metrics.Persist()
}
}
}
30 changes: 9 additions & 21 deletions services/cnb-rates-batch/metrics/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,35 +9,23 @@ import (
"github.com/stretchr/testify/require"
)

func TestGetFilename(t *testing.T) {
assert.Equal(t, "/a/b/c.batch.d", getFilename("/a/b/c.d"))
}

func TestMetricsPersist(t *testing.T) {
func TestMetrics(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

entity := NewMetrics(ctx, "", time.Hour)
delay := 1e8
delta := 1e8

t.Log("TimeGatewayLatency properly times gateway latency")
t.Log("DayProcessed properly increments number of processed days")
{
require.Equal(t, int64(0), entity.gatewayLatency.Count())
entity.TimeGatewayLatency(func() {
time.Sleep(time.Duration(delay))
})
assert.Equal(t, int64(1), entity.gatewayLatency.Count())
assert.InDelta(t, entity.gatewayLatency.Percentile(0.95), delay, delta)
require.Equal(t, int64(0), entity.daysProcessed.Count())
entity.DayProcessed()
assert.Equal(t, int64(1), entity.daysProcessed.Count())
}

t.Log("TimeImportLatency properly times import latency")
t.Log("MonthProcessed properly increments number of processed months")
{
require.Equal(t, int64(0), entity.importLatency.Count())
entity.TimeImportLatency(func() {
time.Sleep(time.Duration(delay))
})
assert.Equal(t, int64(1), entity.importLatency.Count())
assert.InDelta(t, entity.importLatency.Percentile(0.95), delay, delta)
require.Equal(t, int64(0), entity.monthsProcessed.Count())
entity.MonthProcessed()
assert.Equal(t, int64(1), entity.monthsProcessed.Count())
}
}
109 changes: 109 additions & 0 deletions services/cnb-rates-batch/metrics/persistence.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// Copyright (c) 2016-2019, Jan Cajthaml <jan.cajthaml@gmail.com>
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package metrics

import (
"bytes"
"fmt"
"io"
"os"
"strconv"

"github.com/jancajthaml-openbank/cnb-rates-batch/utils"
)

// MarshalJSON serialises Metrics as json preserving uint64
func (entity *Metrics) MarshalJSON() ([]byte, error) {
var buffer bytes.Buffer

buffer.WriteString("{\"daysProcessed\":")
buffer.WriteString(strconv.FormatInt(entity.daysProcessed.Count(), 10))
buffer.WriteString(",\"monthsProcessed\":")
buffer.WriteString(strconv.FormatInt(entity.monthsProcessed.Count(), 10))
buffer.WriteString("}")

return buffer.Bytes(), nil
}

// UnmarshalJSON unmarshal json of Metrics entity
func (entity *Metrics) UnmarshalJSON(data []byte) error {
if entity == nil {
return fmt.Errorf("cannot unmarshall to nil pointer")
}
all := struct {
DaysProcessed int64 `json:"daysProcessed"`
MonthsProcessed int64 `json:"monthsProcessed"`
}{}
err := utils.JSON.Unmarshal(data, &all)
if err != nil {
return err
}

entity.daysProcessed.Clear()
entity.daysProcessed.Inc(all.DaysProcessed)

entity.monthsProcessed.Clear()
entity.monthsProcessed.Inc(all.MonthsProcessed)

return nil
}

func (metrics *Metrics) Persist() error {
if metrics == nil {
return fmt.Errorf("cannot persist nil reference")
}
tempFile := metrics.output + "_temp"
data, err := utils.JSON.Marshal(metrics)
if err != nil {
return err
}
f, err := os.OpenFile(tempFile, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, os.ModePerm)
if err != nil {
return err
}
defer f.Close()
if _, err := f.Write(data); err != nil {
return err
}
if err := os.Rename(tempFile, metrics.output); err != nil {
return err
}
return nil
}

func (metrics *Metrics) Hydrate() error {
if metrics == nil {
return fmt.Errorf("cannot hydrate nil reference")
}
f, err := os.OpenFile(metrics.output, os.O_RDONLY, os.ModePerm)
if err != nil {
return err
}
defer f.Close()
fi, err := f.Stat()
if err != nil {
return err
}
buf := make([]byte, fi.Size())
_, err = f.Read(buf)
if err != nil && err != io.EOF {
return err
}
err = utils.JSON.Unmarshal(buf, metrics)
if err != nil {
return err
}
return nil
}
Loading

0 comments on commit 011e14d

Please sign in to comment.