From 011e14dfdf329371169b7465376cbf2bab5e2918 Mon Sep 17 00:00:00 2001 From: Jan Cajthaml Date: Mon, 27 May 2019 10:13:24 +0200 Subject: [PATCH] better metrics --- bbtest/steps/orchestration_steps.rb | 2 +- packaging/debian_amd64/DEBIAN/control | 2 +- services/cnb-rates-batch/batch/batch.go | 4 + .../cnb-rates-batch/config/environment.go | 20 ++- services/cnb-rates-batch/metrics/metrics.go | 95 +++----------- .../cnb-rates-batch/metrics/metrics_test.go | 30 ++--- .../cnb-rates-batch/metrics/persistence.go | 109 ++++++++++++++++ .../cnb-rates-import/config/environment.go | 27 ++-- .../cnb-rates-import/integration/import.go | 5 + services/cnb-rates-import/metrics/metrics.go | 77 +++--------- .../cnb-rates-import/metrics/metrics_test.go | 20 ++- .../cnb-rates-import/metrics/persistence.go | 119 ++++++++++++++++++ services/cnb-rates-rest/boot/init.go | 4 + services/cnb-rates-rest/boot/run.go | 5 +- services/cnb-rates-rest/config/config.go | 7 ++ services/cnb-rates-rest/config/environment.go | 37 +++++- services/cnb-rates-rest/go.mod | 2 + services/cnb-rates-rest/go.sum | 2 + services/cnb-rates-rest/metrics/metrics.go | 115 +++++++++++++++++ .../cnb-rates-rest/metrics/metrics_test.go | 39 ++++++ .../cnb-rates-rest/metrics/persistence.go | 107 ++++++++++++++++ 21 files changed, 641 insertions(+), 187 deletions(-) create mode 100644 services/cnb-rates-batch/metrics/persistence.go create mode 100644 services/cnb-rates-import/metrics/persistence.go create mode 100644 services/cnb-rates-rest/metrics/metrics.go create mode 100644 services/cnb-rates-rest/metrics/metrics_test.go create mode 100644 services/cnb-rates-rest/metrics/persistence.go diff --git a/bbtest/steps/orchestration_steps.rb b/bbtest/steps/orchestration_steps.rb index 3c454167..28dc79e6 100644 --- a/bbtest/steps/orchestration_steps.rb +++ b/bbtest/steps/orchestration_steps.rb @@ -46,7 +46,7 @@ "STORAGE" => "/data", "LOG_LEVEL" => "DEBUG", "CNB_GATEWAY" => "https://127.0.0.1:4000", - "METRICS_OUTPUT" => "/reports/metrics.json", + "METRICS_OUTPUT" => "/reports", "METRICS_REFRESHRATE" => "1h", "HTTP_PORT" => "443", "SECRETS" => "/opt/cnb-rates/secrets", diff --git a/packaging/debian_amd64/DEBIAN/control b/packaging/debian_amd64/DEBIAN/control index 379ca9f4..a7fe4d0f 100755 --- a/packaging/debian_amd64/DEBIAN/control +++ b/packaging/debian_amd64/DEBIAN/control @@ -1,5 +1,5 @@ Package: cnb-rates -Version: 1.0.0+single-app-struct +Version: 1.0.0+better-metrics Section: misc Priority: extra Architecture: amd64 diff --git a/services/cnb-rates-batch/batch/batch.go b/services/cnb-rates-batch/batch/batch.go index dc26125a..0007c533 100644 --- a/services/cnb-rates-batch/batch/batch.go +++ b/services/cnb-rates-batch/batch/batch.go @@ -112,6 +112,8 @@ func (batch Batch) ProcessNewFXMain(wg *sync.WaitGroup) error { log.Warnf("error write cache fail fx-main data for day %s, %+v\n", day, err) continue } + + batch.metrics.DayProcessed() } return nil @@ -158,6 +160,8 @@ func (batch Batch) ProcessNewFXOther(wg *sync.WaitGroup) error { log.Warnf("error write cache fail fx-other data for day %s, %+v\n", day, err) continue } + + batch.metrics.DayProcessed() } return nil diff --git a/services/cnb-rates-batch/config/environment.go b/services/cnb-rates-batch/config/environment.go index 2ed229b8..a0f3d5d8 100644 --- a/services/cnb-rates-batch/config/environment.go +++ b/services/cnb-rates-batch/config/environment.go @@ -30,7 +30,7 @@ func loadConfFromEnv() Configuration { logLevel := strings.ToUpper(getEnvString("CNB_RATES_LOG_LEVEL", "DEBUG")) rootStorage := getEnvString("CNB_RATES_STORAGE", "/data") cnbGateway := getEnvString("CNB_RATES_CNB_GATEWAY", "https://www.cnb.cz") - metricsOutput := getEnvString("CNB_RATES_METRICS_OUTPUT", "") + metricsOutput := getEnvFilename("CNB_RATES_METRICS_OUTPUT", "/tmp") metricsRefreshRate := getEnvDuration("CNB_RATES_METRICS_REFRESHRATE", time.Second) if rootStorage == "" { @@ -39,10 +39,6 @@ func loadConfFromEnv() Configuration { rootStorage = rootStorage + "/rates/cnb" - if metricsOutput != "" && os.MkdirAll(filepath.Dir(metricsOutput), os.ModePerm) != nil { - log.Fatal("unable to assert metrics output") - } - if os.MkdirAll(rootStorage+"/"+utils.FXMainDailyCacheDirectory(), os.ModePerm) != nil { log.Fatal("unable to assert fx-main daily cache directory") } @@ -72,8 +68,20 @@ func loadConfFromEnv() Configuration { CNBGateway: cnbGateway, LogLevel: logLevel, MetricsRefreshRate: metricsRefreshRate, - MetricsOutput: metricsOutput, + MetricsOutput: metricsOutput + "/metrics.batch.json", + } +} + +func getEnvFilename(key, fallback string) string { + var value = strings.TrimSpace(os.Getenv(key)) + if value == "" { + return fallback } + value = filepath.Clean(value) + if os.MkdirAll(value, os.ModePerm) != nil { + return fallback + } + return value } func getEnvString(key, fallback string) string { diff --git a/services/cnb-rates-batch/metrics/metrics.go b/services/cnb-rates-batch/metrics/metrics.go index 7a1a3586..185d36f7 100644 --- a/services/cnb-rates-batch/metrics/metrics.go +++ b/services/cnb-rates-batch/metrics/metrics.go @@ -17,8 +17,6 @@ package metrics import ( "context" "fmt" - "os" - "path/filepath" "time" "github.com/jancajthaml-openbank/cnb-rates-batch/utils" @@ -30,80 +28,29 @@ import ( // Metrics represents metrics subroutine type Metrics struct { utils.DaemonSupport - output string - refreshRate time.Duration - gatewayLatency metrics.Timer - importLatency metrics.Timer -} - -// Snapshot holds metrics snapshot status -type Snapshot struct { - GatewayLatency float64 `json:"gatewayLatency"` - ImportLatency float64 `json:"importLatency"` + output string + refreshRate time.Duration + daysProcessed metrics.Counter + monthsProcessed metrics.Counter } // NewMetrics returns metrics fascade func NewMetrics(ctx context.Context, output string, refreshRate time.Duration) Metrics { return Metrics{ - DaemonSupport: utils.NewDaemonSupport(ctx), - output: output, - refreshRate: refreshRate, - gatewayLatency: metrics.NewTimer(), - importLatency: metrics.NewTimer(), + DaemonSupport: utils.NewDaemonSupport(ctx), + output: output, + refreshRate: refreshRate, + daysProcessed: metrics.NewCounter(), + monthsProcessed: metrics.NewCounter(), } } -// NewSnapshot returns metrics snapshot -func NewSnapshot(metrics Metrics) Snapshot { - return Snapshot{ - GatewayLatency: metrics.gatewayLatency.Percentile(0.95), - ImportLatency: metrics.importLatency.Percentile(0.95), - } -} - -func (metrics Metrics) TimeGatewayLatency(f func()) { - metrics.gatewayLatency.Time(f) -} - -func (metrics Metrics) TimeImportLatency(f func()) { - metrics.importLatency.Time(f) -} - -func (metrics Metrics) persist(filename string) { - tempFile := filename + "_temp" - - data, err := utils.JSON.Marshal(NewSnapshot(metrics)) - if err != nil { - log.Warnf("unable to create serialize metrics with error: %v", err) - return - } - f, err := os.OpenFile(tempFile, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, os.ModePerm) - if err != nil { - log.Warnf("unable to create file with error: %v", err) - return - } - defer f.Close() - - if _, err := f.Write(data); err != nil { - log.Warnf("unable to write file with error: %v", err) - return - } - - if err := os.Rename(tempFile, filename); err != nil { - log.Warnf("unable to move file with error: %v", err) - return - } - - return +func (metrics *Metrics) DayProcessed() { + metrics.daysProcessed.Inc(1) } -func getFilename(path string) string { - dirname := filepath.Dir(path) - ext := filepath.Ext(path) - filename := filepath.Base(path) - filename = filename[:len(filename)-len(ext)] - - return dirname + "/" + filename + ".batch" + ext +func (metrics *Metrics) MonthProcessed() { + metrics.monthsProcessed.Inc(1) } // WaitReady wait for metrics to be ready @@ -137,16 +84,12 @@ func (metrics Metrics) WaitReady(deadline time.Duration) (err error) { func (metrics Metrics) Start() { defer metrics.MarkDone() - if metrics.output == "" { - log.Warnf("no metrics output defined, skipping metrics persistence") - metrics.MarkReady() - return - } - - output := getFilename(metrics.output) ticker := time.NewTicker(metrics.refreshRate) defer ticker.Stop() + if err := metrics.Hydrate(); err != nil { + log.Warn(err.Error()) + } metrics.MarkReady() select { @@ -156,17 +99,17 @@ func (metrics Metrics) Start() { return } - log.Infof("Start metrics daemon, update each %v into %v", metrics.refreshRate, output) + log.Infof("Start metrics daemon, update each %v into %v", metrics.refreshRate, metrics.output) for { select { case <-metrics.Done(): log.Info("Stopping metrics daemon") - metrics.persist(output) + metrics.Persist() log.Info("Stop metrics daemon") return case <-ticker.C: - metrics.persist(output) + metrics.Persist() } } } diff --git a/services/cnb-rates-batch/metrics/metrics_test.go b/services/cnb-rates-batch/metrics/metrics_test.go index 679baa7b..70e94c00 100644 --- a/services/cnb-rates-batch/metrics/metrics_test.go +++ b/services/cnb-rates-batch/metrics/metrics_test.go @@ -9,35 +9,23 @@ import ( "github.com/stretchr/testify/require" ) -func TestGetFilename(t *testing.T) { - assert.Equal(t, "/a/b/c.batch.d", getFilename("/a/b/c.d")) -} - -func TestMetricsPersist(t *testing.T) { +func TestMetrics(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() entity := NewMetrics(ctx, "", time.Hour) - delay := 1e8 - delta := 1e8 - t.Log("TimeGatewayLatency properly times gateway latency") + t.Log("DayProcessed properly increments number of processed days") { - require.Equal(t, int64(0), entity.gatewayLatency.Count()) - entity.TimeGatewayLatency(func() { - time.Sleep(time.Duration(delay)) - }) - assert.Equal(t, int64(1), entity.gatewayLatency.Count()) - assert.InDelta(t, entity.gatewayLatency.Percentile(0.95), delay, delta) + require.Equal(t, int64(0), entity.daysProcessed.Count()) + entity.DayProcessed() + assert.Equal(t, int64(1), entity.daysProcessed.Count()) } - t.Log("TimeImportLatency properly times import latency") + t.Log("MonthProcessed properly increments number of processed months") { - require.Equal(t, int64(0), entity.importLatency.Count()) - entity.TimeImportLatency(func() { - time.Sleep(time.Duration(delay)) - }) - assert.Equal(t, int64(1), entity.importLatency.Count()) - assert.InDelta(t, entity.importLatency.Percentile(0.95), delay, delta) + require.Equal(t, int64(0), entity.monthsProcessed.Count()) + entity.MonthProcessed() + assert.Equal(t, int64(1), entity.monthsProcessed.Count()) } } diff --git a/services/cnb-rates-batch/metrics/persistence.go b/services/cnb-rates-batch/metrics/persistence.go new file mode 100644 index 00000000..8954dff5 --- /dev/null +++ b/services/cnb-rates-batch/metrics/persistence.go @@ -0,0 +1,109 @@ +// Copyright (c) 2016-2019, Jan Cajthaml +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package metrics + +import ( + "bytes" + "fmt" + "io" + "os" + "strconv" + + "github.com/jancajthaml-openbank/cnb-rates-batch/utils" +) + +// MarshalJSON serialises Metrics as json preserving uint64 +func (entity *Metrics) MarshalJSON() ([]byte, error) { + var buffer bytes.Buffer + + buffer.WriteString("{\"daysProcessed\":") + buffer.WriteString(strconv.FormatInt(entity.daysProcessed.Count(), 10)) + buffer.WriteString(",\"monthsProcessed\":") + buffer.WriteString(strconv.FormatInt(entity.monthsProcessed.Count(), 10)) + buffer.WriteString("}") + + return buffer.Bytes(), nil +} + +// UnmarshalJSON unmarshal json of Metrics entity +func (entity *Metrics) UnmarshalJSON(data []byte) error { + if entity == nil { + return fmt.Errorf("cannot unmarshall to nil pointer") + } + all := struct { + DaysProcessed int64 `json:"daysProcessed"` + MonthsProcessed int64 `json:"monthsProcessed"` + }{} + err := utils.JSON.Unmarshal(data, &all) + if err != nil { + return err + } + + entity.daysProcessed.Clear() + entity.daysProcessed.Inc(all.DaysProcessed) + + entity.monthsProcessed.Clear() + entity.monthsProcessed.Inc(all.MonthsProcessed) + + return nil +} + +func (metrics *Metrics) Persist() error { + if metrics == nil { + return fmt.Errorf("cannot persist nil reference") + } + tempFile := metrics.output + "_temp" + data, err := utils.JSON.Marshal(metrics) + if err != nil { + return err + } + f, err := os.OpenFile(tempFile, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, os.ModePerm) + if err != nil { + return err + } + defer f.Close() + if _, err := f.Write(data); err != nil { + return err + } + if err := os.Rename(tempFile, metrics.output); err != nil { + return err + } + return nil +} + +func (metrics *Metrics) Hydrate() error { + if metrics == nil { + return fmt.Errorf("cannot hydrate nil reference") + } + f, err := os.OpenFile(metrics.output, os.O_RDONLY, os.ModePerm) + if err != nil { + return err + } + defer f.Close() + fi, err := f.Stat() + if err != nil { + return err + } + buf := make([]byte, fi.Size()) + _, err = f.Read(buf) + if err != nil && err != io.EOF { + return err + } + err = utils.JSON.Unmarshal(buf, metrics) + if err != nil { + return err + } + return nil +} diff --git a/services/cnb-rates-import/config/environment.go b/services/cnb-rates-import/config/environment.go index 43a415a6..67986390 100644 --- a/services/cnb-rates-import/config/environment.go +++ b/services/cnb-rates-import/config/environment.go @@ -30,36 +30,41 @@ func loadConfFromEnv() Configuration { logLevel := strings.ToUpper(getEnvString("CNB_RATES_LOG_LEVEL", "DEBUG")) rootStorage := getEnvString("CNB_RATES_STORAGE", "/data") cnbGateway := getEnvString("CNB_RATES_CNB_GATEWAY", "https://www.cnb.cz") - metricsOutput := getEnvString("CNB_RATES_METRICS_OUTPUT", "") + metricsOutput := getEnvFilename("CNB_RATES_METRICS_OUTPUT", "/tmp") metricsRefreshRate := getEnvDuration("CNB_RATES_METRICS_REFRESHRATE", time.Second) if rootStorage == "" { log.Fatal("missing required parameter to run") } - rootStorage = rootStorage + "/rates/cnb" - - if metricsOutput != "" && os.MkdirAll(filepath.Dir(metricsOutput), os.ModePerm) != nil { - log.Fatal("unable to assert metrics output") - } - - if os.MkdirAll(rootStorage+"/"+utils.FXMainOfflineDirectory(), os.ModePerm) != nil { + if os.MkdirAll(rootStorage+"/rates/cnb/"+utils.FXMainOfflineDirectory(), os.ModePerm) != nil { log.Fatal("unable to assert daily offline directory") } - if os.MkdirAll(rootStorage+"/"+utils.FXOtherOfflineDirectory(), os.ModePerm) != nil { + if os.MkdirAll(rootStorage+"/rates/cnb/"+utils.FXOtherOfflineDirectory(), os.ModePerm) != nil { log.Fatal("unable to assert monthly offline directory") } return Configuration{ - RootStorage: rootStorage, + RootStorage: rootStorage + "/rates/cnb", CNBGateway: cnbGateway, LogLevel: logLevel, MetricsRefreshRate: metricsRefreshRate, - MetricsOutput: metricsOutput, + MetricsOutput: metricsOutput + "/metrics.import.json", } } +func getEnvFilename(key, fallback string) string { + var value = strings.TrimSpace(os.Getenv(key)) + if value == "" { + return fallback + } + value = filepath.Clean(value) + if os.MkdirAll(value, os.ModePerm) != nil { + return fallback + } + return value +} func getEnvString(key, fallback string) string { value := strings.TrimSpace(os.Getenv(key)) if value == "" { diff --git a/services/cnb-rates-import/integration/import.go b/services/cnb-rates-import/integration/import.go index d0e77f9e..920dad14 100644 --- a/services/cnb-rates-import/integration/import.go +++ b/services/cnb-rates-import/integration/import.go @@ -77,6 +77,8 @@ func (cnb CNBRatesImport) syncMainRateToday(today time.Time) error { return fmt.Errorf("cannot store cache for %s at %s", today.Format("02.01.2006"), cachePath) } + cnb.metrics.DayImported() + log.Debug("downloaded fx-main for today") return nil } @@ -103,6 +105,8 @@ func (cnb CNBRatesImport) syncOtherRates(day time.Time) error { } log.Infof("downloaded fx-other for %s", day.Format("02.01.2006")) + + cnb.metrics.DayImported() return nil } @@ -164,6 +168,7 @@ func (cnb CNBRatesImport) syncMainRates(days []time.Time) error { } log.Infof("downloaded fx-main for day %s", date.Format("02.01.2006")) + cnb.metrics.DayImported() wg.Done() } } diff --git a/services/cnb-rates-import/metrics/metrics.go b/services/cnb-rates-import/metrics/metrics.go index 55376355..f5ce7dbc 100644 --- a/services/cnb-rates-import/metrics/metrics.go +++ b/services/cnb-rates-import/metrics/metrics.go @@ -17,8 +17,6 @@ package metrics import ( "context" "fmt" - "os" - "path/filepath" "time" "github.com/jancajthaml-openbank/cnb-rates-import/utils" @@ -32,78 +30,39 @@ type Metrics struct { utils.DaemonSupport output string refreshRate time.Duration + daysImported metrics.Counter + monthsImported metrics.Counter gatewayLatency metrics.Timer importLatency metrics.Timer } -// Snapshot holds metrics snapshot status -type Snapshot struct { - GatewayLatency float64 `json:"gatewayLatency"` - ImportLatency float64 `json:"importLatency"` -} - // NewMetrics returns metrics fascade func NewMetrics(ctx context.Context, output string, refreshRate time.Duration) Metrics { return Metrics{ DaemonSupport: utils.NewDaemonSupport(ctx), output: output, refreshRate: refreshRate, + daysImported: metrics.NewCounter(), + monthsImported: metrics.NewCounter(), gatewayLatency: metrics.NewTimer(), importLatency: metrics.NewTimer(), } } -// NewSnapshot returns metrics snapshot -func NewSnapshot(metrics Metrics) Snapshot { - return Snapshot{ - GatewayLatency: metrics.gatewayLatency.Percentile(0.95), - ImportLatency: metrics.importLatency.Percentile(0.95), - } -} - -func (metrics Metrics) TimeGatewayLatency(f func()) { +func (metrics *Metrics) TimeGatewayLatency(f func()) { metrics.gatewayLatency.Time(f) } -func (metrics Metrics) TimeImportLatency(f func()) { +func (metrics *Metrics) TimeImportLatency(f func()) { metrics.importLatency.Time(f) } -func (metrics Metrics) persist(filename string) { - tempFile := filename + "_temp" - - data, err := utils.JSON.Marshal(NewSnapshot(metrics)) - if err != nil { - log.Warnf("unable to create serialize metrics with error: %v", err) - return - } - f, err := os.OpenFile(tempFile, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, os.ModePerm) - if err != nil { - log.Warnf("unable to create file with error: %v", err) - return - } - defer f.Close() - - if _, err := f.Write(data); err != nil { - log.Warnf("unable to write file with error: %v", err) - return - } - - if err := os.Rename(tempFile, filename); err != nil { - log.Warnf("unable to move file with error: %v", err) - return - } - - return +func (metrics *Metrics) DayImported() { + metrics.daysImported.Inc(1) } -func getFilename(path string) string { - dirname := filepath.Dir(path) - ext := filepath.Ext(path) - filename := filepath.Base(path) - filename = filename[:len(filename)-len(ext)] - - return dirname + "/" + filename + ".import" + ext +func (metrics *Metrics) MonthImported() { + metrics.monthsImported.Inc(1) } // WaitReady wait for metrics to be ready @@ -137,16 +96,12 @@ func (metrics Metrics) WaitReady(deadline time.Duration) (err error) { func (metrics Metrics) Start() { defer metrics.MarkDone() - if metrics.output == "" { - log.Warnf("no metrics output defined, skipping metrics persistence") - metrics.MarkReady() - return - } - - output := getFilename(metrics.output) ticker := time.NewTicker(metrics.refreshRate) defer ticker.Stop() + if err := metrics.Hydrate(); err != nil { + log.Warn(err.Error()) + } metrics.MarkReady() select { @@ -156,17 +111,17 @@ func (metrics Metrics) Start() { return } - log.Infof("Start metrics daemon, update each %v into %v", metrics.refreshRate, output) + log.Infof("Start metrics daemon, update each %v into %v", metrics.refreshRate, metrics.output) for { select { case <-metrics.Done(): log.Info("Stopping metrics daemon") - metrics.persist(output) + metrics.Persist() log.Info("Stop metrics daemon") return case <-ticker.C: - metrics.persist(output) + metrics.Persist() } } } diff --git a/services/cnb-rates-import/metrics/metrics_test.go b/services/cnb-rates-import/metrics/metrics_test.go index 6cb55867..0122ddb0 100644 --- a/services/cnb-rates-import/metrics/metrics_test.go +++ b/services/cnb-rates-import/metrics/metrics_test.go @@ -9,11 +9,7 @@ import ( "github.com/stretchr/testify/require" ) -func TestGetFilename(t *testing.T) { - assert.Equal(t, "/a/b/c.import.d", getFilename("/a/b/c.d")) -} - -func TestMetricsPersist(t *testing.T) { +func TestMetrics(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -40,4 +36,18 @@ func TestMetricsPersist(t *testing.T) { assert.Equal(t, int64(1), entity.importLatency.Count()) assert.InDelta(t, entity.importLatency.Percentile(0.95), delay, delta) } + + t.Log("DayImported properly increments number of imported days") + { + require.Equal(t, int64(0), entity.daysImported.Count()) + entity.DayImported() + assert.Equal(t, int64(1), entity.daysImported.Count()) + } + + t.Log("MonthImported properly increments number of imported months") + { + require.Equal(t, int64(0), entity.monthsImported.Count()) + entity.MonthImported() + assert.Equal(t, int64(1), entity.monthsImported.Count()) + } } diff --git a/services/cnb-rates-import/metrics/persistence.go b/services/cnb-rates-import/metrics/persistence.go new file mode 100644 index 00000000..55e95611 --- /dev/null +++ b/services/cnb-rates-import/metrics/persistence.go @@ -0,0 +1,119 @@ +// Copyright (c) 2016-2019, Jan Cajthaml +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package metrics + +import ( + "bytes" + "fmt" + "io" + "os" + "strconv" + "time" + + "github.com/jancajthaml-openbank/cnb-rates-import/utils" +) + +// MarshalJSON serialises Metrics as json preserving uint64 +func (entity *Metrics) MarshalJSON() ([]byte, error) { + var buffer bytes.Buffer + + buffer.WriteString("{\"gatewayLatency\":") + buffer.WriteString(strconv.FormatFloat(entity.gatewayLatency.Percentile(0.95), 'f', -1, 64)) + buffer.WriteString(",\"importLatency\":") + buffer.WriteString(strconv.FormatFloat(entity.importLatency.Percentile(0.95), 'f', -1, 64)) + buffer.WriteString(",\"daysImported\":") + buffer.WriteString(strconv.FormatInt(entity.daysImported.Count(), 10)) + buffer.WriteString(",\"monthsImported\":") + buffer.WriteString(strconv.FormatInt(entity.monthsImported.Count(), 10)) + buffer.WriteString("}") + + return buffer.Bytes(), nil +} + +// UnmarshalJSON unmarshal json of Metrics entity +func (entity *Metrics) UnmarshalJSON(data []byte) error { + if entity == nil { + return fmt.Errorf("cannot unmarshall to nil pointer") + } + all := struct { + GatewayLatency float64 `json:"gatewayLatency"` + ImportLatency float64 `json:"importLatency"` + DaysImported int64 `json:"daysImported"` + MonthsImported int64 `json:"monthsImported"` + }{} + err := utils.JSON.Unmarshal(data, &all) + if err != nil { + return err + } + + entity.gatewayLatency.Update(time.Duration(all.GatewayLatency)) + entity.importLatency.Update(time.Duration(all.ImportLatency)) + + entity.daysImported.Clear() + entity.daysImported.Inc(all.DaysImported) + + entity.monthsImported.Clear() + entity.monthsImported.Inc(all.MonthsImported) + + return nil +} + +func (metrics *Metrics) Persist() error { + if metrics == nil { + return fmt.Errorf("cannot persist nil reference") + } + tempFile := metrics.output + "_temp" + data, err := utils.JSON.Marshal(metrics) + if err != nil { + return err + } + f, err := os.OpenFile(tempFile, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, os.ModePerm) + if err != nil { + return err + } + defer f.Close() + if _, err := f.Write(data); err != nil { + return err + } + if err := os.Rename(tempFile, metrics.output); err != nil { + return err + } + return nil +} + +func (metrics *Metrics) Hydrate() error { + if metrics == nil { + return fmt.Errorf("cannot hydrate nil reference") + } + f, err := os.OpenFile(metrics.output, os.O_RDONLY, os.ModePerm) + if err != nil { + return err + } + defer f.Close() + fi, err := f.Stat() + if err != nil { + return err + } + buf := make([]byte, fi.Size()) + _, err = f.Read(buf) + if err != nil && err != io.EOF { + return err + } + err = utils.JSON.Unmarshal(buf, metrics) + if err != nil { + return err + } + return nil +} diff --git a/services/cnb-rates-rest/boot/init.go b/services/cnb-rates-rest/boot/init.go index 0c6ca080..621a269b 100644 --- a/services/cnb-rates-rest/boot/init.go +++ b/services/cnb-rates-rest/boot/init.go @@ -20,6 +20,7 @@ import ( "github.com/jancajthaml-openbank/cnb-rates-rest/api" "github.com/jancajthaml-openbank/cnb-rates-rest/config" + "github.com/jancajthaml-openbank/cnb-rates-rest/metrics" "github.com/jancajthaml-openbank/cnb-rates-rest/utils" localfs "github.com/jancajthaml-openbank/local-fs" @@ -29,6 +30,7 @@ import ( type Program struct { cfg config.Configuration interrupt chan os.Signal + metrics metrics.Metrics rest api.Server cancel context.CancelFunc } @@ -42,6 +44,7 @@ func Initialize() Program { utils.SetupLogger(cfg.LogLevel) storage := localfs.NewStorage(cfg.RootStorage) + metricsDaemon := metrics.NewMetrics(ctx, cfg.MetricsOutput, cfg.MetricsRefreshRate) restDaemon := api.NewServer(ctx, cfg.ServerPort, cfg.SecretsPath, &storage) @@ -49,6 +52,7 @@ func Initialize() Program { cfg: cfg, interrupt: make(chan os.Signal, 1), rest: restDaemon, + metrics: metricsDaemon, cancel: cancel, } } diff --git a/services/cnb-rates-rest/boot/run.go b/services/cnb-rates-rest/boot/run.go index fc20f2bc..fb5e939e 100644 --- a/services/cnb-rates-rest/boot/run.go +++ b/services/cnb-rates-rest/boot/run.go @@ -49,7 +49,8 @@ func (app Program) WaitReady(deadline time.Duration) error { }() } - wg.Add(1) + wg.Add(2) + waitWithDeadline(app.metrics) waitWithDeadline(app.rest) wg.Wait() @@ -74,6 +75,7 @@ func (app Program) WaitInterrupt() { func (app Program) Run() { log.Info(">>> Start <<<") + go app.metrics.Start() go app.rest.Start() if err := app.WaitReady(5 * time.Second); err != nil { @@ -90,6 +92,7 @@ func (app Program) Run() { utils.NotifyServiceStopping() app.rest.Stop() + app.metrics.Stop() app.cancel() log.Info(">>> Stop <<<") diff --git a/services/cnb-rates-rest/config/config.go b/services/cnb-rates-rest/config/config.go index 45d9957e..5b09602b 100644 --- a/services/cnb-rates-rest/config/config.go +++ b/services/cnb-rates-rest/config/config.go @@ -14,6 +14,8 @@ package config +import "time" + // Configuration of application type Configuration struct { // RootStorage gives where to store journals @@ -24,6 +26,11 @@ type Configuration struct { SecretsPath string // LogLevel ignorecase log level LogLevel string + // MetricsRefreshRate represents interval in which in memory metrics should be + // persisted to disk + MetricsRefreshRate time.Duration + // MetricsOutput represents output file for metrics persistence + MetricsOutput string } // GetConfig loads application configuration diff --git a/services/cnb-rates-rest/config/environment.go b/services/cnb-rates-rest/config/environment.go index 15374834..3887ee20 100644 --- a/services/cnb-rates-rest/config/environment.go +++ b/services/cnb-rates-rest/config/environment.go @@ -16,8 +16,10 @@ package config import ( "os" + "path/filepath" "strconv" "strings" + "time" log "github.com/sirupsen/logrus" ) @@ -27,18 +29,33 @@ func loadConfFromEnv() Configuration { secrets := getEnvString("CNB_RATES_SECRETS", "") rootStorage := getEnvString("CNB_RATES_STORAGE", "/data") port := getEnvInteger("CNB_RATES_HTTP_PORT", 4011) + metricsOutput := getEnvFilename("CNB_RATES_METRICS_OUTPUT", "/tmp") + metricsRefreshRate := getEnvDuration("CNB_RATES_METRICS_REFRESHRATE", time.Second) if secrets == "" || rootStorage == "" { log.Fatal("missing required parameter to run") } return Configuration{ - RootStorage: rootStorage, - ServerPort: port, - SecretsPath: secrets, - LogLevel: logLevel, + MetricsRefreshRate: metricsRefreshRate, + MetricsOutput: metricsOutput + "/metrics.json", + RootStorage: rootStorage, + ServerPort: port, + SecretsPath: secrets, + LogLevel: logLevel, } } +func getEnvFilename(key, fallback string) string { + var value = strings.TrimSpace(os.Getenv(key)) + if value == "" { + return fallback + } + value = filepath.Clean(value) + if os.MkdirAll(value, os.ModePerm) != nil { + return fallback + } + return value +} func getEnvString(key, fallback string) string { value := strings.TrimSpace(os.Getenv(key)) @@ -59,3 +76,15 @@ func getEnvInteger(key string, fallback int) int { } return cast } + +func getEnvDuration(key string, fallback time.Duration) time.Duration { + value := strings.TrimSpace(os.Getenv(key)) + if value == "" { + return fallback + } + cast, err := time.ParseDuration(value) + if err != nil { + log.Panicf("invalid value of variable %s", key) + } + return cast +} diff --git a/services/cnb-rates-rest/go.mod b/services/cnb-rates-rest/go.mod index 4d66177e..0763286b 100644 --- a/services/cnb-rates-rest/go.mod +++ b/services/cnb-rates-rest/go.mod @@ -6,5 +6,7 @@ require ( github.com/json-iterator/go v1.1.6 github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.1 // indirect + github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a github.com/sirupsen/logrus v1.4.2 + github.com/stretchr/testify v1.3.0 ) diff --git a/services/cnb-rates-rest/go.sum b/services/cnb-rates-rest/go.sum index c973a21b..76299fc6 100644 --- a/services/cnb-rates-rest/go.sum +++ b/services/cnb-rates-rest/go.sum @@ -15,6 +15,8 @@ github.com/modern-go/reflect2 v1.0.1 h1:9f412s+6RmYXLWZSEzVVgPGK7C2PphHj5RJrvfx9 github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a h1:9ZKAASQSHhDYGoxY8uLVpewe1GDZ2vu2Tr/vTdVAkFQ= +github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/sirupsen/logrus v1.4.2 h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= diff --git a/services/cnb-rates-rest/metrics/metrics.go b/services/cnb-rates-rest/metrics/metrics.go new file mode 100644 index 00000000..34df177d --- /dev/null +++ b/services/cnb-rates-rest/metrics/metrics.go @@ -0,0 +1,115 @@ +// Copyright (c) 2016-2019, Jan Cajthaml +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package metrics + +import ( + "context" + "fmt" + "time" + + "github.com/jancajthaml-openbank/cnb-rates-rest/utils" + + metrics "github.com/rcrowley/go-metrics" + log "github.com/sirupsen/logrus" +) + +// Metrics represents metrics subroutine +type Metrics struct { + utils.DaemonSupport + output string + refreshRate time.Duration + gatewayLatency metrics.Timer + importLatency metrics.Timer +} + +// NewMetrics returns metrics fascade +func NewMetrics(ctx context.Context, output string, refreshRate time.Duration) Metrics { + return Metrics{ + DaemonSupport: utils.NewDaemonSupport(ctx), + output: output, + refreshRate: refreshRate, + gatewayLatency: metrics.NewTimer(), + importLatency: metrics.NewTimer(), + } +} + +func (metrics *Metrics) TimeGatewayLatency(f func()) { + metrics.gatewayLatency.Time(f) +} + +func (metrics *Metrics) TimeImportLatency(f func()) { + metrics.importLatency.Time(f) +} + +// WaitReady wait for metrics to be ready +func (metrics Metrics) WaitReady(deadline time.Duration) (err error) { + defer func() { + if e := recover(); e != nil { + switch x := e.(type) { + case string: + err = fmt.Errorf(x) + case error: + err = x + default: + err = fmt.Errorf("unknown panic") + } + } + }() + + ticker := time.NewTicker(deadline) + select { + case <-metrics.IsReady: + ticker.Stop() + err = nil + return + case <-ticker.C: + err = fmt.Errorf("daemon was not ready within %v seconds", deadline) + return + } +} + +// Start handles everything needed to start metrics daemon +func (metrics Metrics) Start() { + defer metrics.MarkDone() + + ticker := time.NewTicker(metrics.refreshRate) + defer ticker.Stop() + + if err := metrics.Hydrate(); err != nil { + log.Warn(err.Error()) + } + metrics.MarkReady() + + select { + case <-metrics.CanStart: + break + case <-metrics.Done(): + return + } + + log.Infof("Start metrics daemon, update each %v into %v", metrics.refreshRate, metrics.output) + + for { + select { + case <-metrics.Done(): + log.Info("Stopping metrics daemon") + metrics.Persist() + log.Info("Stop metrics daemon") + return + case <-ticker.C: + metrics.Persist() + } + } +} diff --git a/services/cnb-rates-rest/metrics/metrics_test.go b/services/cnb-rates-rest/metrics/metrics_test.go new file mode 100644 index 00000000..7bee25e0 --- /dev/null +++ b/services/cnb-rates-rest/metrics/metrics_test.go @@ -0,0 +1,39 @@ +package metrics + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestMetrics(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + entity := NewMetrics(ctx, "", time.Hour) + delay := 1e8 + delta := 1e8 + + t.Log("TimeGatewayLatency properly times gateway latency") + { + require.Equal(t, int64(0), entity.gatewayLatency.Count()) + entity.TimeGatewayLatency(func() { + time.Sleep(time.Duration(delay)) + }) + assert.Equal(t, int64(1), entity.gatewayLatency.Count()) + assert.InDelta(t, entity.gatewayLatency.Percentile(0.95), delay, delta) + } + + t.Log("TimeImportLatency properly times import latency") + { + require.Equal(t, int64(0), entity.importLatency.Count()) + entity.TimeImportLatency(func() { + time.Sleep(time.Duration(delay)) + }) + assert.Equal(t, int64(1), entity.importLatency.Count()) + assert.InDelta(t, entity.importLatency.Percentile(0.95), delay, delta) + } +} diff --git a/services/cnb-rates-rest/metrics/persistence.go b/services/cnb-rates-rest/metrics/persistence.go new file mode 100644 index 00000000..3fefdf23 --- /dev/null +++ b/services/cnb-rates-rest/metrics/persistence.go @@ -0,0 +1,107 @@ +// Copyright (c) 2016-2019, Jan Cajthaml +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package metrics + +import ( + "bytes" + "fmt" + "io" + "os" + "strconv" + "time" + + "github.com/jancajthaml-openbank/cnb-rates-rest/utils" +) + +// MarshalJSON serialises Metrics as json preserving uint64 +func (entity *Metrics) MarshalJSON() ([]byte, error) { + var buffer bytes.Buffer + + buffer.WriteString("{\"gatewayLatency\":") + buffer.WriteString(strconv.FormatFloat(entity.gatewayLatency.Percentile(0.95), 'f', -1, 64)) + buffer.WriteString(",\"importLatency\":") + buffer.WriteString(strconv.FormatFloat(entity.importLatency.Percentile(0.95), 'f', -1, 64)) + buffer.WriteString("}") + + return buffer.Bytes(), nil +} + +// UnmarshalJSON unmarshal json of Metrics entity +func (entity *Metrics) UnmarshalJSON(data []byte) error { + if entity == nil { + return fmt.Errorf("cannot unmarshall to nil pointer") + } + all := struct { + GatewayLatency float64 `json:"gatewayLatency"` + ImportLatency float64 `json:"importLatency"` + }{} + err := utils.JSON.Unmarshal(data, &all) + if err != nil { + return err + } + + entity.gatewayLatency.Update(time.Duration(all.GatewayLatency)) + entity.importLatency.Update(time.Duration(all.ImportLatency)) + + return nil +} + +func (metrics *Metrics) Persist() error { + if metrics == nil { + return fmt.Errorf("cannot persist nil reference") + } + tempFile := metrics.output + "_temp" + data, err := utils.JSON.Marshal(metrics) + if err != nil { + return err + } + f, err := os.OpenFile(tempFile, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, os.ModePerm) + if err != nil { + return err + } + defer f.Close() + if _, err := f.Write(data); err != nil { + return err + } + if err := os.Rename(tempFile, metrics.output); err != nil { + return err + } + return nil +} + +func (metrics *Metrics) Hydrate() error { + if metrics == nil { + return fmt.Errorf("cannot hydrate nil reference") + } + f, err := os.OpenFile(metrics.output, os.O_RDONLY, os.ModePerm) + if err != nil { + return err + } + defer f.Close() + fi, err := f.Stat() + if err != nil { + return err + } + buf := make([]byte, fi.Size()) + _, err = f.Read(buf) + if err != nil && err != io.EOF { + return err + } + err = utils.JSON.Unmarshal(buf, metrics) + if err != nil { + return err + } + return nil +}