Skip to content

Commit

Permalink
feat: Add K8s cluster receiver (kyma-project#1343)
Browse files Browse the repository at this point in the history
  • Loading branch information
rakesh-garimella authored Aug 26, 2024
1 parent 9cacc14 commit cffacab
Show file tree
Hide file tree
Showing 23 changed files with 461 additions and 161 deletions.
3 changes: 0 additions & 3 deletions config/development/kustomization.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,6 @@ patches:
- op: add
path: /spec/template/spec/containers/0/args/-
value: --kyma-input-allowed=true
- op: add
path: /spec/template/spec/containers/0/args/-
value: --k8s-cluster-receiver-allowed=true
target:
kind: Deployment
name: manager
Expand Down
2 changes: 1 addition & 1 deletion controllers/telemetry/metricpipeline_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func NewMetricPipelineController(client client.Client, reconcileTriggerChan <-ch
}

agentRBAC := otelcollector.MakeMetricAgentRBAC(types.NamespacedName{Name: config.Agent.BaseName, Namespace: config.Agent.Namespace})
gatewayRBAC := otelcollector.MakeMetricGatewayRBAC(types.NamespacedName{Name: config.Gateway.BaseName, Namespace: config.Gateway.Namespace}, config.KymaInputAllowed, config.K8sClusterReceiverAllowed)
gatewayRBAC := otelcollector.MakeMetricGatewayRBAC(types.NamespacedName{Name: config.Gateway.BaseName, Namespace: config.Gateway.Namespace}, config.KymaInputAllowed)

reconciler := metricpipeline.New(
client,
Expand Down
42 changes: 40 additions & 2 deletions internal/otelcollector/config/metric/gateway/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@ type Config struct {
}

type Receivers struct {
OTLP config.OTLPReceiver `yaml:"otlp"`
SingletonKymaStatsReceiverCreator *SingletonKymaStatsReceiverCreator `yaml:"singleton_receiver_creator/kymastats,omitempty"`
OTLP config.OTLPReceiver `yaml:"otlp"`
SingletonKymaStatsReceiverCreator *SingletonKymaStatsReceiverCreator `yaml:"singleton_receiver_creator/kymastats,omitempty"`
SingletonK8sClusterReceiverCreator *SingletonK8sClusterReceiverCreator `yaml:"singleton_receiver_creator/k8s_cluster,omitempty"`
}

type SingletonKymaStatsReceiverCreator struct {
Expand All @@ -24,6 +25,12 @@ type SingletonKymaStatsReceiverCreator struct {
SingletonKymaStatsReceiver SingletonKymaStatsReceiver `yaml:"receiver"`
}

type SingletonK8sClusterReceiverCreator struct {
AuthType string `yaml:"auth_type"`
LeaderElection LeaderElection `yaml:"leader_election"`
SingletonK8sClusterReceiver SingletonK8sClusterReceiver `yaml:"receiver"`
}

type LeaderElection struct {
LeaseName string `yaml:"lease_name"`
LeaseNamespace string `yaml:"lease_namespace"`
Expand All @@ -39,6 +46,35 @@ type KymaStatsReceiver struct {
Modules []ModuleGVR `yaml:"modules"`
}

type SingletonK8sClusterReceiver struct {
K8sClusterReceiver K8sClusterReceiver `yaml:"k8s_cluster"`
}

type K8sClusterReceiver struct {
AuthType string `yaml:"auth_type"`
CollectionInterval string `yaml:"collection_interval"`
NodeConditionsToReport []string `yaml:"node_conditions_to_report"`
AllocatableTypesToReport []string `yaml:"allocatable_types_to_report"`
Metrics K8sClusterMetricsConfig `yaml:"metrics"`
}

type MetricConfig struct {
Enabled bool `yaml:"enabled"`
}

type K8sClusterMetricsConfig struct {
// metrics allows enabling/disabling scraped metric.
K8sContainerStorageRequest MetricConfig `yaml:"k8s.container.storage_request"`
K8sContainerStorageLimit MetricConfig `yaml:"k8s.container.storage_limit"`
K8sContainerEphemeralStorageRequest MetricConfig `yaml:"k8s.container.ephemeralstorage_request"`
K8sContainerEphemeralStorageLimit MetricConfig `yaml:"k8s.container.ephemeralstorage_limit"`
K8sContainerRestarts MetricConfig `yaml:"k8s.container.restarts"`
K8sContainerReady MetricConfig `yaml:"k8s.container.ready"`
K8sNamespacePhase MetricConfig `yaml:"k8s.namespace.phase"`
K8sReplicationControllerAvailable MetricConfig `yaml:"k8s.replication_controller.available"`
K8sReplicationControllerDesired MetricConfig `yaml:"k8s.replication_controller.desired"`
}

type ModuleGVR struct {
Group string `yaml:"group"`
Version string `yaml:"version"`
Expand All @@ -58,8 +94,10 @@ type Processors struct {
DropIfInputSourceOtlp *FilterProcessor `yaml:"filter/drop-if-input-source-otlp,omitempty"`
DropRuntimePodMetrics *FilterProcessor `yaml:"filter/drop-runtime-pod-metrics,omitempty"`
DropRuntimeContainerMetrics *FilterProcessor `yaml:"filter/drop-runtime-container-metrics,omitempty"`
DropK8sClusterMetrics *FilterProcessor `yaml:"filter/drop-k8s-cluster-metrics,omitempty"`
ResolveServiceName *TransformProcessor `yaml:"transform/resolve-service-name,omitempty"`
SetInstrumentationScopeKyma *metric.TransformProcessor `yaml:"transform/set-instrumentation-scope-kyma,omitempty"`
SetInstrumentationScopeRuntime *metric.TransformProcessor `yaml:"transform/set-instrumentation-scope-runtime,omitempty"`

// NamespaceFilters contains filter processors, which need different configurations per pipeline
NamespaceFilters NamespaceFilters `yaml:",inline,omitempty"`
Expand Down
32 changes: 29 additions & 3 deletions internal/otelcollector/config/metric/gateway/config_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ func declareComponentsForMetricPipeline(
opts BuildOptions,
) error {
declareSingletonKymaStatsReceiverCreator(pipeline, cfg, opts)
declareSingletonK8sClusterReceiverCreator(pipeline, cfg, opts)
declareDiagnosticMetricsDropFilters(pipeline, cfg)
declareInputSourceFilters(pipeline, cfg)
declareRuntimeResourcesFilters(pipeline, cfg)
Expand All @@ -81,6 +82,12 @@ func declareComponentsForMetricPipeline(
return declareOTLPExporter(ctx, otlpExporterBuilder, pipeline, cfg, envVars)
}

func declareSingletonK8sClusterReceiverCreator(pipeline *telemetryv1alpha1.MetricPipeline, cfg *Config, opts BuildOptions) {
if isRuntimeInputEnabled(pipeline.Spec.Input) {
cfg.Receivers.SingletonK8sClusterReceiverCreator = makeSingletonK8sClusterReceiverCreatorConfig(opts.GatewayNamespace)
}
}

func declareSingletonKymaStatsReceiverCreator(pipeline *telemetryv1alpha1.MetricPipeline, cfg *Config, opts BuildOptions) {
if isKymaInputEnabled(pipeline.Annotations, opts.KymaInputAllowed) {
cfg.Receivers.SingletonKymaStatsReceiverCreator = makeSingletonKymaStatsReceiverCreatorConfig(opts.GatewayNamespace)
Expand Down Expand Up @@ -124,6 +131,10 @@ func declareRuntimeResourcesFilters(pipeline *telemetryv1alpha1.MetricPipeline,
if isRuntimeInputEnabled(input) && !isRuntimeContainerMetricsEnabled(input) {
cfg.Processors.DropRuntimeContainerMetrics = makeDropRuntimeContainerMetricsConfig()
}

if isRuntimeInputEnabled(input) {
cfg.Processors.DropK8sClusterMetrics = makeK8sClusterDropMetrics()
}
}

func declareNamespaceFilters(pipeline *telemetryv1alpha1.MetricPipeline, cfg *Config) {
Expand Down Expand Up @@ -154,6 +165,9 @@ func declareInstrumentationScopeTransform(pipeline *telemetryv1alpha1.MetricPipe
if isKymaInputEnabled(pipeline.Annotations, opts.KymaInputAllowed) {
cfg.Processors.SetInstrumentationScopeKyma = metric.MakeInstrumentationScopeProcessor(metric.InputSourceKyma, opts.InstrumentationScopeVersion)
}
if isRuntimeInputEnabled(pipeline.Spec.Input) {
cfg.Processors.SetInstrumentationScopeRuntime = metric.MakeInstrumentationScopeProcessor(metric.InputSourceK8sCluster, opts.InstrumentationScopeVersion)
}
}

func declareOTLPExporter(ctx context.Context, otlpExporterBuilder *otlpexporter.ConfigBuilder, pipeline *telemetryv1alpha1.MetricPipeline, cfg *Config, envVars otlpexporter.EnvVars) error {
Expand All @@ -175,6 +189,11 @@ func makeServicePipelineConfig(pipeline *telemetryv1alpha1.MetricPipeline, opts

input := pipeline.Spec.Input

// Perform the transform before runtime resource filter as InstrumentationScopeRuntime is required for dropping container/pod metrics
if isRuntimeInputEnabled(pipeline.Spec.Input) {
processors = append(processors, "transform/set-instrumentation-scope-runtime")
}

processors = append(processors, makeInputSourceFiltersIDs(input)...)
processors = append(processors, makeNamespaceFiltersIDs(input, pipeline)...)
processors = append(processors, makeRuntimeResourcesFiltersIDs(input)...)
Expand All @@ -187,7 +206,7 @@ func makeServicePipelineConfig(pipeline *telemetryv1alpha1.MetricPipeline, opts
processors = append(processors, "resource/insert-cluster-name", "transform/resolve-service-name", "batch")

return config.Pipeline{
Receivers: makeReceiversIDs(pipeline.Annotations, opts.KymaInputAllowed),
Receivers: makeReceiversIDs(pipeline, opts),
Processors: processors,
Exporters: []string{makeOTLPExporterID(pipeline)},
}
Expand Down Expand Up @@ -240,6 +259,9 @@ func makeRuntimeResourcesFiltersIDs(input telemetryv1alpha1.MetricPipelineInput)
if isRuntimeInputEnabled(input) && !isRuntimeContainerMetricsEnabled(input) {
processors = append(processors, "filter/drop-runtime-container-metrics")
}
if isRuntimeInputEnabled(input) {
processors = append(processors, "filter/drop-k8s-cluster-metrics")
}

return processors
}
Expand All @@ -265,15 +287,19 @@ func formatNamespaceFilterID(pipelineName string, inputSourceType metric.InputSo
return fmt.Sprintf("filter/%s-filter-by-namespace-%s-input", pipelineName, inputSourceType)
}

func makeReceiversIDs(annotations map[string]string, kymaInputAllowed bool) []string {
func makeReceiversIDs(pipeline *telemetryv1alpha1.MetricPipeline, opts BuildOptions) []string {
var receivers []string

receivers = append(receivers, "otlp")

if isKymaInputEnabled(annotations, kymaInputAllowed) {
if isKymaInputEnabled(pipeline.Annotations, opts.KymaInputAllowed) {
receivers = append(receivers, "singleton_receiver_creator/kymastats")
}

if isRuntimeInputEnabled(pipeline.Spec.Input) {
receivers = append(receivers, "singleton_receiver_creator/k8s_cluster")
}

return receivers
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ func TestMakeConfig(t *testing.T) {
require.Contains(t, collectorConfig.Service.Pipelines["metrics/test"].Exporters, "otlp/test")
require.Contains(t, collectorConfig.Service.Pipelines["metrics/test"].Receivers, "otlp")
require.NotContains(t, collectorConfig.Service.Pipelines["metrics/test"].Receivers, "singleton_receiver_creator/kymastats")
require.NotContains(t, collectorConfig.Service.Pipelines["metrics/test"].Receivers, "singleton_receiver_creator/k8s_cluster")
require.Equal(t, []string{"memory_limiter",
"k8sattributes",
"filter/drop-if-input-source-runtime",
Expand Down Expand Up @@ -328,8 +329,10 @@ func TestMakeConfig(t *testing.T) {
require.Contains(t, collectorConfig.Service.Pipelines["metrics/test"].Receivers, "otlp")
require.Equal(t, []string{"memory_limiter",
"k8sattributes",
"transform/set-instrumentation-scope-runtime",
"filter/drop-if-input-source-prometheus",
"filter/drop-if-input-source-istio",
"filter/drop-k8s-cluster-metrics",
"resource/insert-cluster-name",
"transform/resolve-service-name",
"batch",
Expand Down Expand Up @@ -357,9 +360,11 @@ func TestMakeConfig(t *testing.T) {
require.Contains(t, collectorConfig.Service.Pipelines["metrics/test"].Receivers, "otlp")
require.Equal(t, []string{"memory_limiter",
"k8sattributes",
"transform/set-instrumentation-scope-runtime",
"filter/drop-if-input-source-prometheus",
"filter/drop-if-input-source-istio",
"filter/drop-runtime-container-metrics",
"filter/drop-k8s-cluster-metrics",
"resource/insert-cluster-name",
"transform/resolve-service-name",
"batch",
Expand Down Expand Up @@ -387,9 +392,11 @@ func TestMakeConfig(t *testing.T) {
require.Contains(t, collectorConfig.Service.Pipelines["metrics/test"].Receivers, "otlp")
require.Equal(t, []string{"memory_limiter",
"k8sattributes",
"transform/set-instrumentation-scope-runtime",
"filter/drop-if-input-source-prometheus",
"filter/drop-if-input-source-istio",
"filter/drop-runtime-pod-metrics",
"filter/drop-k8s-cluster-metrics",
"resource/insert-cluster-name",
"transform/resolve-service-name",
"batch",
Expand Down Expand Up @@ -581,6 +588,7 @@ func TestMakeConfig(t *testing.T) {
"batch",
}, collectorConfig.Service.Pipelines["metrics/test"].Processors)
})

})

t.Run("multi pipeline topology", func(t *testing.T) {
Expand Down Expand Up @@ -617,9 +625,11 @@ func TestMakeConfig(t *testing.T) {
require.Contains(t, collectorConfig.Service.Pipelines["metrics/test-1"].Receivers, "otlp")
require.Equal(t, []string{"memory_limiter",
"k8sattributes",
"transform/set-instrumentation-scope-runtime",
"filter/drop-if-input-source-prometheus",
"filter/drop-if-input-source-istio",
"filter/test-1-filter-by-namespace-runtime-input",
"filter/drop-k8s-cluster-metrics",
"resource/insert-cluster-name",
"transform/resolve-service-name",
"batch",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package gateway

import "github.com/kyma-project/telemetry-manager/internal/otelcollector/config/ottlexpr"
import (
"github.com/kyma-project/telemetry-manager/internal/otelcollector/config/ottlexpr"
)

var scrapeMetrics = []string{"up", "scrape_duration_seconds", "scrape_samples_scraped", "scrape_samples_post_metric_relabeling", "scrape_series_added"}

Expand Down
62 changes: 50 additions & 12 deletions internal/otelcollector/config/metric/gateway/processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,27 +86,30 @@ func makeDropIfInputSourceOtlpConfig() *FilterProcessor {
}

func makeDropRuntimePodMetricsConfig() *FilterProcessor {
dropMetricRules := []string{
ottlexpr.JoinWithAnd(
inputSourceEquals(metric.InputSourceRuntime),
ottlexpr.IsMatch("name", "^k8s.pod.*"),
),
}
return &FilterProcessor{
Metrics: FilterProcessorMetrics{
Metric: []string{
ottlexpr.JoinWithAnd(
inputSourceEquals(metric.InputSourceRuntime),
ottlexpr.IsMatch("name", "^k8s.pod.*"),
),
},
Metric: dropMetricRules,
},
}
}

func makeDropRuntimeContainerMetricsConfig() *FilterProcessor {
dropMetricRules := []string{
ottlexpr.JoinWithAnd(
inputSourceEquals(metric.InputSourceRuntime),
ottlexpr.IsMatch("name", "(^k8s.container.*)|(^container.*)"),
),
}

return &FilterProcessor{
Metrics: FilterProcessorMetrics{
Metric: []string{
ottlexpr.JoinWithAnd(
inputSourceEquals(metric.InputSourceRuntime),
ottlexpr.IsMatch("name", "(^k8s.container.*)|(^container.*)"),
),
},
Metric: dropMetricRules,
},
}
}
Expand Down Expand Up @@ -149,6 +152,41 @@ func makeFilterByNamespaceConfig(namespaceSelector *telemetryv1alpha1.MetricPipe
}
}

// Drop the metrics scraped by k8s cluster, except for the pod and container metrics
// Complete list of the metrics is here: https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/receiver/k8sclusterreceiver/documentation.md
func makeK8sClusterDropMetrics() *FilterProcessor {
metricNames := []string{
"^k8s.deployment.*",
"^k8s.cronjob.*",
"^k8s.daemonset.*",
"^k8s.hpa.*",
"^k8s.job.*",
"^k8s.replicaset.*",
"^k8s.resource_quota.*",
"^k8s.statefulset.*",
"^k8s.node.*",
}
metricNameConditions := createIsMatchNameConditions(metricNames)
return &FilterProcessor{
Metrics: FilterProcessorMetrics{
Metric: []string{
ottlexpr.JoinWithAnd(
inputSourceEquals(metric.InputSourceRuntime),
ottlexpr.JoinWithOr(metricNameConditions...),
),
},
},
}
}

func createIsMatchNameConditions(names []string) []string {
var nameConditions []string
for _, name := range names {
nameConditions = append(nameConditions, ottlexpr.IsMatch("name", name))
}
return nameConditions
}

func createNamespacesConditions(namespaces []string) []string {
var namespacesConditions []string
for _, ns := range namespaces {
Expand Down
Loading

0 comments on commit cffacab

Please sign in to comment.