Skip to content

Commit

Permalink
chore: Check for warnings/errors in telemetry-self-monitor (kyma-proj…
Browse files Browse the repository at this point in the history
…ect#1251)

Co-authored-by: Teodor-Adrian Mihaescu <103431261+TeodorSAP@users.noreply.github.com>
  • Loading branch information
a-thaler and TeodorSAP authored Jul 9, 2024
1 parent 7562b70 commit 7cc3456
Show file tree
Hide file tree
Showing 2 changed files with 152 additions and 123 deletions.
261 changes: 145 additions & 116 deletions test/e2e/telemetry_logs_analysis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
package e2e

import (
"fmt"
"net/http"
"time"

Expand All @@ -25,117 +24,128 @@ import (
"github.com/kyma-project/telemetry-manager/test/testkit/suite"
)

var _ = Describe(suite.ID(), Label(suite.LabelTelemetryLogsAnalysis), Ordered, func() {
var _ = Describe(suite.ID(), Label(suite.LabelTelemetryLogAnalysis), Ordered, func() {
const (
otelCollectorNs = "tlogs-otelcollector"
fluentBitNs = "tlogs-fluentbit"
otelCollectorLogBackendName = "tlogs-otelcollector-log"
fluentBitLogBackendName = "tlogs-fluentbit-log"
metricBackendName = "tlogs-metric"
traceBackendName = "tlogs-trace"
pushMetricsDepName = "push-metrics-istiofied"
consistentlyTimeout = time.Second * 120
traceBackendName = "trace-backend"
metricBackendName = "metric-backend"
logBackendName = "log-backend"
otelCollectorLogBackendName = "otel-collector-log-backend"
fluentBitLogBackendName = "fluent-bit-log-backend"
selfMonitorLogBackendName = "self-monitor-log-backend"
)

var (
otelCollectorLogPipelineName string
fluentBitLogPipelineName string
metricPipelineName string
tracePipelineName string
otelCollectorLogbackendExportURL string
fluentBitLogbackendExportURL string
metricbackendExportURL string
tracebackendExportURL string
gomegaMaxLength = format.MaxLength
errorWarningLevels = []string{
"ERROR", "error",
"WARNING", "warning",
"WARN", "warn"}
traceBackendURL string
metricBackendURL string
logBackendURL string
otelCollectorLogBackendURL string
fluentBitLogBackendURL string
selfMonitorLogBackendURL string
namespace = suite.ID()
gomegaMaxLength = format.MaxLength
logLevelsRegexp = "ERROR|error|WARNING|warning|WARN|warn"
)

makeResourcesOtelCollector := func() []client.Object {
makeResourcesTracePipeline := func(backendName string) []client.Object {
var objs []client.Object
objs = append(objs, kitk8s.NewNamespace(otelCollectorNs).K8sObject())

// backends
otelCollectorLogBackend := backend.New(otelCollectorNs, backend.SignalTypeLogs, backend.WithName(otelCollectorLogBackendName))
objs = append(objs, otelCollectorLogBackend.K8sObjects()...)
otelCollectorLogbackendExportURL = otelCollectorLogBackend.ExportURL(proxyClient)
metricBackend := backend.New(otelCollectorNs, backend.SignalTypeMetrics, backend.WithName(metricBackendName))
metricbackendExportURL = metricBackend.ExportURL(proxyClient)
objs = append(objs, metricBackend.K8sObjects()...)
traceBackend := backend.New(otelCollectorNs, backend.SignalTypeTraces, backend.WithName(traceBackendName))
tracebackendExportURL = traceBackend.ExportURL(proxyClient)
//backend
traceBackend := backend.New(namespace, backend.SignalTypeTraces, backend.WithName(backendName))
traceBackendURL = traceBackend.ExportURL(proxyClient)
objs = append(objs, traceBackend.K8sObjects()...)

// log pipeline
otelCollectorLogPipelineName = fmt.Sprintf("%s-pipeline", otelCollectorLogBackend.Name())
otelCollectorLogPipeline := testutils.NewLogPipelineBuilder().
WithName(otelCollectorLogPipelineName).
WithIncludeNamespaces(kitkyma.SystemNamespaceName).
WithIncludeContainers("collector").
WithHTTPOutput(testutils.HTTPHost(otelCollectorLogBackend.Host()), testutils.HTTPPort(otelCollectorLogBackend.Port())).
//pipeline
tracePipeline := testutils.NewTracePipelineBuilder().
WithName(backendName).
WithOTLPOutput(testutils.OTLPEndpoint(traceBackend.Endpoint())).
Build()
objs = append(objs, &otelCollectorLogPipeline)
objs = append(objs, &tracePipeline)

//client
objs = append(objs, kitk8s.NewPod("telemetrygen-traces", namespace).WithPodSpec(telemetrygen.PodSpec(telemetrygen.SignalTypeTraces)).K8sObject())
return objs
}

makeResourcesMetricPipeline := func(backendName string) []client.Object {
var objs []client.Object

//backend
metricBackend := backend.New(namespace, backend.SignalTypeMetrics, backend.WithName(backendName))
metricBackendURL = metricBackend.ExportURL(proxyClient)
objs = append(objs, metricBackend.K8sObjects()...)

// metrics & traces
metricPipelineName = fmt.Sprintf("%s-pipeline", metricBackend.Name())
//pipeline
metricPipeline := testutils.NewMetricPipelineBuilder().
WithName(metricPipelineName).
WithPrometheusInput(true, testutils.IncludeNamespaces(otelCollectorNs)).
WithRuntimeInput(true, testutils.IncludeNamespaces(otelCollectorNs)).
WithIstioInput(true, testutils.IncludeNamespaces(otelCollectorNs)).
WithName(backendName).
WithPrometheusInput(true, testutils.IncludeNamespaces(namespace)).
WithRuntimeInput(true, testutils.IncludeNamespaces(namespace)).
WithIstioInput(true, testutils.IncludeNamespaces(namespace)).
WithOTLPOutput(testutils.OTLPEndpoint(metricBackend.Endpoint())).
Build()
objs = append(objs, &metricPipeline)

tracePipelineName = fmt.Sprintf("%s-pipeline", traceBackend.Name())
tracePipeline := testutils.NewTracePipelineBuilder().
WithName(tracePipelineName).
WithOTLPOutput(testutils.OTLPEndpoint(traceBackend.Endpoint())).
Build()
objs = append(objs, &tracePipeline)

// metrics istio set-up (trafficgen & telemetrygen)
objs = append(objs, trafficgen.K8sObjects(otelCollectorNs)...)
objs = append(objs,
kitk8s.NewPod("telemetrygen-metrics", otelCollectorNs).WithPodSpec(telemetrygen.PodSpec(telemetrygen.SignalTypeMetrics)).K8sObject(),
kitk8s.NewPod("telemetrygen-traces", otelCollectorNs).WithPodSpec(telemetrygen.PodSpec(telemetrygen.SignalTypeTraces)).K8sObject(),
)
//client
objs = append(objs, trafficgen.K8sObjects(namespace)...)
objs = append(objs, kitk8s.NewPod("telemetrygen-metrics", namespace).WithPodSpec(telemetrygen.PodSpec(telemetrygen.SignalTypeMetrics)).K8sObject())

return objs
}

makeResourcesFluentBit := func() []client.Object {
makeResourcesLogPipeline := func(backendName string) []client.Object {
var objs []client.Object
objs = append(objs, kitk8s.NewNamespace(fluentBitNs).K8sObject())

// logs overrides (include agent logs)
overrides := kitk8s.NewOverrides().WithPaused(false).WithCollectAgentLogs(true)
objs = append(objs, overrides.K8sObject())

// backend
fluentBitLogBackend := backend.New(fluentBitNs, backend.SignalTypeLogs, backend.WithName(fluentBitLogBackendName))
objs = append(objs, fluentBitLogBackend.K8sObjects()...)
fluentBitLogbackendExportURL = fluentBitLogBackend.ExportURL(proxyClient)
logBackend := backend.New(namespace, backend.SignalTypeLogs, backend.WithName(backendName))
logBackendURL = logBackend.ExportURL(proxyClient)
objs = append(objs, logBackend.K8sObjects()...)

// log pipeline
fluentBitLogPipelineName = fmt.Sprintf("%s-pipeline", fluentBitLogBackend.Name())
fluentBitLogPipeline := testutils.NewLogPipelineBuilder().
WithName(fluentBitLogPipelineName).
WithIncludeNamespaces(kitkyma.SystemNamespaceName).
WithIncludeContainers("fluent-bit", "exporter").
WithHTTPOutput(testutils.HTTPHost(fluentBitLogBackend.Host()), testutils.HTTPPort(fluentBitLogBackend.Port())).
logPipeline := testutils.NewLogPipelineBuilder().
WithName(backendName).
WithHTTPOutput(testutils.HTTPHost(logBackend.Host()), testutils.HTTPPort(logBackend.Port())).
Build()
objs = append(objs, &fluentBitLogPipeline)
objs = append(objs, &logPipeline)

//no client
return objs
}

Context("When OtelCollector-based components are deployed", func() {
makeResourcesToCollectLogs := func(backendName string, containers ...string) ([]client.Object, string) {
var objs []client.Object

// backends
logBackend := backend.New(namespace, backend.SignalTypeLogs, backend.WithName(backendName))
backendURL := logBackend.ExportURL(proxyClient)
objs = append(objs, logBackend.K8sObjects()...)

// log pipeline
logPipeline := testutils.NewLogPipelineBuilder().
WithName(backendName).
WithIncludeNamespaces(kitkyma.SystemNamespaceName).
WithIncludeContainers(containers...).
WithHTTPOutput(testutils.HTTPHost(logBackend.Host()), testutils.HTTPPort(logBackend.Port())).
Build()
objs = append(objs, &logPipeline)
return objs, backendURL
}

Context("When all components are deployed", func() {
BeforeAll(func() {
format.MaxLength = 0 // remove Gomega truncation
k8sObjects := makeResourcesOtelCollector()
var k8sObjects []client.Object
k8sObjects = append(k8sObjects, kitk8s.NewNamespace(namespace).K8sObject())
k8sObjects = append(k8sObjects, makeResourcesTracePipeline(traceBackendName)...)
k8sObjects = append(k8sObjects, makeResourcesMetricPipeline(metricBackendName)...)
k8sObjects = append(k8sObjects, makeResourcesLogPipeline(logBackendName)...)
var objs []client.Object
objs, otelCollectorLogBackendURL = makeResourcesToCollectLogs(otelCollectorLogBackendName, "collector")
k8sObjects = append(k8sObjects, objs...)
objs, fluentBitLogBackendURL = makeResourcesToCollectLogs(fluentBitLogBackendName, "fluent-bit", "exporter")
k8sObjects = append(k8sObjects, objs...)
objs, selfMonitorLogBackendURL = makeResourcesToCollectLogs(selfMonitorLogBackendName, "self-monitor")
k8sObjects = append(k8sObjects, objs...)

DeferCleanup(func() {
Expect(kitk8s.DeleteObjects(ctx, k8sClient, k8sObjects...)).Should(Succeed())
})
Expand All @@ -148,80 +158,99 @@ var _ = Describe(suite.ID(), Label(suite.LabelTelemetryLogsAnalysis), Ordered, f
})

It("Should have running backends", func() {
assert.DeploymentReady(ctx, k8sClient, types.NamespacedName{Namespace: otelCollectorNs, Name: otelCollectorLogBackendName})
assert.DeploymentReady(ctx, k8sClient, types.NamespacedName{Namespace: otelCollectorNs, Name: metricBackendName})
assert.DeploymentReady(ctx, k8sClient, types.NamespacedName{Namespace: otelCollectorNs, Name: traceBackendName})
})
assert.DeploymentReady(ctx, k8sClient, types.NamespacedName{Namespace: namespace, Name: logBackendName})
assert.DeploymentReady(ctx, k8sClient, types.NamespacedName{Namespace: namespace, Name: metricBackendName})
assert.DeploymentReady(ctx, k8sClient, types.NamespacedName{Namespace: namespace, Name: traceBackendName})

It("Should have running pipelines", func() {
assert.LogPipelineHealthy(ctx, k8sClient, otelCollectorLogPipelineName)
assert.MetricPipelineHealthy(ctx, k8sClient, metricPipelineName)
assert.TracePipelineHealthy(ctx, k8sClient, tracePipelineName)
assert.DeploymentReady(ctx, k8sClient, types.NamespacedName{Namespace: namespace, Name: otelCollectorLogBackendName})
assert.DeploymentReady(ctx, k8sClient, types.NamespacedName{Namespace: namespace, Name: fluentBitLogBackendName})
assert.DeploymentReady(ctx, k8sClient, types.NamespacedName{Namespace: namespace, Name: selfMonitorLogBackendName})
})

It("Should have a running metric agent daemonset", func() {
// Waits for the metric agent and the Fluent Bit DaemonSets to be reported ready.
It("Should have running agents", func() {
assert.DaemonSetReady(ctx, k8sClient, kitkyma.MetricAgentName)
assert.DaemonSetReady(ctx, k8sClient, kitkyma.FluentBitDaemonSet)
})

// Every pipeline created in BeforeAll carries the name of its backend, so the
// backend name constants double as pipeline names here.
It("Should have running pipelines", func() {
	// Signal pipelines fed by the test workloads.
	assert.LogPipelineHealthy(ctx, k8sClient, logBackendName)
	assert.MetricPipelineHealthy(ctx, k8sClient, metricBackendName)
	assert.TracePipelineHealthy(ctx, k8sClient, traceBackendName)

	// Log pipelines that capture the telemetry components' own logs.
	for _, componentPipelineName := range []string{
		otelCollectorLogBackendName,
		fluentBitLogBackendName,
		selfMonitorLogBackendName,
	} {
		assert.LogPipelineHealthy(ctx, k8sClient, componentPipelineName)
	}
})

It("Should push metrics successfully", func() {
assert.MetricsFromNamespaceDelivered(proxyClient, metricbackendExportURL, otelCollectorNs, telemetrygen.MetricNames)
assert.MetricsFromNamespaceDelivered(proxyClient, metricBackendURL, namespace, telemetrygen.MetricNames)
})

It("Should push traces successfully", func() {
assert.TracesFromNamespaceDelivered(proxyClient, tracebackendExportURL, otelCollectorNs)
assert.TracesFromNamespaceDelivered(proxyClient, traceBackendURL, namespace)
})

It("Should not have any ERROR/WARNING logs in the OtelCollector containers", func() {
// The backend URL variables are assigned in BeforeAll, so each spec reads its URL
// inside the spec body rather than when the spec tree is built.
// NOTE(review): the second argument of LogsDelivered looks like a pod name prefix
// ("" for the general pipeline, "telemetry-" for component pods) — confirm against the helper.

// General log pipeline: logs must reach the log backend.
It("Should collect logs successfully", func() {
assert.LogsDelivered(proxyClient, "", logBackendURL)
})

// Component logs of the otel collector containers must reach their dedicated backend.
It("Should collect otel collector component logs successfully", func() {
assert.LogsDelivered(proxyClient, "telemetry-", otelCollectorLogBackendURL)
})

// Component logs of the fluent-bit/exporter containers must reach their dedicated backend.
It("Should collect fluent-bit component logs successfully", func() {
assert.LogsDelivered(proxyClient, "telemetry-", fluentBitLogBackendURL)
})

// Component logs of the self-monitor container must reach their dedicated backend.
It("Should collect self-monitor component logs successfully", func() {
assert.LogsDelivered(proxyClient, "telemetry-", selfMonitorLogBackendURL)
})

It("Should not have any error/warn logs in the otel collector component containers", func() {
Consistently(func(g Gomega) {
resp, err := proxyClient.Get(otelCollectorLogbackendExportURL)
resp, err := proxyClient.Get(otelCollectorLogBackendURL)
g.Expect(err).NotTo(HaveOccurred())
g.Expect(resp).To(HaveHTTPStatus(http.StatusOK))
g.Expect(resp).To(HaveHTTPBody(
Not(ContainLd(ContainLogRecord(SatisfyAll(
WithPodName(ContainSubstring("telemetry-")),
WithLevel(BeElementOf(errorWarningLevels)),
WithLevel(MatchRegexp(logLevelsRegexp)),
WithLogBody(Not( // whitelist possible (flaky/expected) errors
Or(
ContainSubstring("grpc: addrConn.createTransport failed to connect"),
ContainSubstring("rpc error: code = Unavailable desc = no healthy upstream"),
ContainSubstring("interrupted due to shutdown:"),
ContainSubstring("Variable substitution using $VAR will be deprecated"),
),
)),
)))),
))
}, consistentlyTimeout, periodic.TelemetryInterval).Should(Succeed())
})

AfterAll(func() {
format.MaxLength = gomegaMaxLength // restore Gomega truncation
})
})

Context("When FluentBit-based components are deployed", func() {
BeforeAll(func() {
format.MaxLength = 0 // remove Gomega truncation
k8sObjects := makeResourcesFluentBit()
DeferCleanup(func() {
Expect(kitk8s.DeleteObjects(ctx, k8sClient, k8sObjects...)).Should(Succeed())
})
Expect(kitk8s.CreateObjects(ctx, k8sClient, k8sObjects...)).Should(Succeed())
})

It("Should have a running backend", func() {
assert.DeploymentReady(ctx, k8sClient, types.NamespacedName{Namespace: fluentBitNs, Name: fluentBitLogBackendName})
})

It("Should have a running pipeline", func() {
assert.LogPipelineHealthy(ctx, k8sClient, fluentBitLogPipelineName)
// Repeatedly queries the Fluent Bit component log backend for consistentlyTimeout and
// fails as soon as a log record from a "telemetry-" pod contains one of the
// error/warning keywords in logLevelsRegexp.
It("Should not have any error/warn logs in the FluentBit containers", func() {
	Consistently(func(g Gomega) {
		resp, err := proxyClient.Get(fluentBitLogBackendURL)
		g.Expect(err).NotTo(HaveOccurred())
		// Release the connection even when an assertion below aborts this poll
		// before the body matcher has consumed the response body.
		defer resp.Body.Close()
		g.Expect(resp).To(HaveHTTPStatus(http.StatusOK))
		g.Expect(resp).To(HaveHTTPBody(
			Not(ContainLd(ContainLogRecord(SatisfyAll(
				WithPodName(ContainSubstring("telemetry-")),
				// Fluent Bit does not log in JSON, so no level attribute is available
				// and the level keyword has to be matched in the log body instead.
				WithLogBody(MatchRegexp(logLevelsRegexp)),
			)))),
		))
	}, consistentlyTimeout, periodic.TelemetryInterval).Should(Succeed())
})

It("Should not have any ERROR/WARNING logs in the FluentBit containers", func() {
It("Should not have any error/warn logs in the self-monitor containers", func() {
Consistently(func(g Gomega) {
resp, err := proxyClient.Get(fluentBitLogbackendExportURL)
resp, err := proxyClient.Get(selfMonitorLogBackendURL)
g.Expect(err).NotTo(HaveOccurred())
g.Expect(resp).To(HaveHTTPStatus(http.StatusOK))
g.Expect(resp).To(HaveHTTPBody(
Not(ContainLd(ContainLogRecord(SatisfyAll(
WithPodName(ContainSubstring("telemetry-")),
WithLevel(BeElementOf(errorWarningLevels)),
WithLevel(MatchRegexp(logLevelsRegexp)),
)))),
))
}, consistentlyTimeout, periodic.TelemetryInterval).Should(Succeed())
Expand Down
14 changes: 7 additions & 7 deletions test/testkit/suite/suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,13 @@ func sanitizeSpecID(filePath string) string {
}

const (
LabelLogs = "logs"
LabelTraces = "traces"
LabelMetrics = "metrics"
LabelTelemetry = "telemetry"
LabelV1Beta1 = "v1beta1"
LabelTelemetryLogsAnalysis = "telemetry-logs-analysis"
LabelMaxPipeline = "max-pipeline"
LabelLogs = "logs"
LabelTraces = "traces"
LabelMetrics = "metrics"
LabelTelemetry = "telemetry"
LabelV1Beta1 = "v1beta1"
LabelTelemetryLogAnalysis = "telemetry-log-analysis"
LabelMaxPipeline = "max-pipeline"

LabelSelfMonitoringLogsHealthy = "self-mon-logs-healthy"
LabelSelfMonitoringLogsBackpressure = "self-mon-logs-backpressure"
Expand Down

0 comments on commit 7cc3456

Please sign in to comment.