diff --git a/api/telemetry/v1alpha1/otlp_config.go b/api/telemetry/v1alpha1/otlp_config.go index 650000a..4da8715 100644 --- a/api/telemetry/v1alpha1/otlp_config.go +++ b/api/telemetry/v1alpha1/otlp_config.go @@ -24,49 +24,49 @@ import ( type TimeoutSettings struct { // Timeout is the timeout for every attempt to send data to the backend. // A zero timeout means no timeout. - Timeout time.Duration `json:"timeout,omitempty" yaml:"timeout,omitempty"` + Timeout *time.Duration `json:"timeout,omitempty"` } // QueueSettings defines configuration for queueing batches before sending to the consumerSender. type QueueSettings struct { // Enabled indicates whether to not enqueue batches before sending to the consumerSender. - Enabled bool `json:"enabled,omitempty" yaml:"enabled,omitempty"` + Enabled bool `json:"enabled,omitempty"` // NumConsumers is the number of consumers from the queue. - NumConsumers int `json:"num_consumers,omitempty" yaml:"num_consumers,omitempty"` + NumConsumers int `json:"num_consumers,omitempty"` // QueueSize is the maximum number of batches allowed in queue at a given time. - QueueSize int `json:"queue_size,omitempty" yaml:"queue_size,omitempty"` + QueueSize int `json:"queue_size,omitempty"` // StorageID if not empty, enables the persistent storage and uses the component specified // as a storage extension for the persistent queue - StorageID string `json:"storage,omitempty" yaml:"storage,omitempty"` //TODO this is *component.ID at Otel + StorageID string `json:"storage,omitempty"` //TODO this is *component.ID at Otel } // BackOffConfig defines configuration for retrying batches in case of export failure. // The current supported strategy is exponential backoff. type BackOffConfig struct { // Enabled indicates whether to not retry sending batches in case of export failure. - Enabled bool `json:"enabled,omitempty" yaml:"enabled,omitempty"` + Enabled bool `json:"enabled,omitempty"` // InitialInterval the time to wait after the first failure before retrying. - InitialInterval time.Duration `json:"initial_interval,omitempty" yaml:"initial_interval,omitempty"` + InitialInterval time.Duration `json:"initial_interval,omitempty"` // RandomizationFactor is a random factor used to calculate next backoffs // Randomized interval = RetryInterval * (1 ± RandomizationFactor) - RandomizationFactor string `json:"randomization_factor,omitempty" yaml:"randomization_factor,omitempty"` + RandomizationFactor string `json:"randomization_factor,omitempty"` // Multiplier is the value multiplied by the backoff interval bounds - Multiplier string `json:"multiplier,omitempty" yaml:"multiplier,omitempty"` + Multiplier string `json:"multiplier,omitempty"` // MaxInterval is the upper bound on backoff interval. Once this value is reached the delay between // consecutive retries will always be `MaxInterval`. - MaxInterval time.Duration `json:"max_interval,omitempty" yaml:"max_interval,omitempty"` + MaxInterval time.Duration `json:"max_interval,omitempty"` // MaxElapsedTime is the maximum amount of time (including retries) spent trying to send a request/batch. // Once this value is reached, the data is discarded. If set to 0, the retries are never stopped. - MaxElapsedTime time.Duration `json:"max_elapsed_time,omitempty" yaml:"max_elapsed_time,omitempty"` + MaxElapsedTime time.Duration `json:"max_elapsed_time,omitempty"` } // KeepaliveClientConfig exposes the keepalive.ClientParameters to be used by the exporter. // Refer to the original data-structure for the meaning of each parameter: // https://godoc.org/google.golang.org/grpc/keepalive#ClientParameters type KeepaliveClientConfig struct { - Time time.Duration `json:"time,omitempty" yaml:"time,omitempty"` - Timeout time.Duration `json:"timeout,omitempty" yaml:"timeout,omitempty"` - PermitWithoutStream bool `json:"permit_without_stream,omitempty" yaml:"permit_without_stream,omitempty"` + Time time.Duration `json:"time,omitempty"` + Timeout time.Duration `json:"timeout,omitempty"` + PermitWithoutStream bool `json:"permit_without_stream,omitempty"` } // ClientConfig defines common settings for a gRPC client configuration. @@ -74,43 +74,43 @@ type GRPCClientConfig struct { // The target to which the exporter is going to send traces or metrics, // using the gRPC protocol. The valid syntax is described at // https://github.com/grpc/grpc/blob/master/doc/naming.md. - Endpoint string `json:"endpoint" yaml:"endpoint"` + Endpoint *string `json:"endpoint"` // The compression key for supported compression types within collector. - Compression configcompression.Type `json:"compression,omitempty" yaml:"compression,omitempty"` + Compression *configcompression.Type `json:"compression,omitempty"` // TLSSetting struct exposes TLS client configuration. - TLSSetting TLSClientSetting `json:"tls,omitempty" yaml:"tls,omitempty"` + TLSSetting *TLSClientSetting `json:"tls,omitempty"` // The keepalive parameters for gRPC client. See grpc.WithKeepaliveParams. // (https://godoc.org/google.golang.org/grpc#WithKeepaliveParams). - Keepalive *KeepaliveClientConfig `json:"keepalive,omitempty" yaml:"keepalive,omitempty"` + Keepalive *KeepaliveClientConfig `json:"keepalive,omitempty"` // ReadBufferSize for gRPC client. See grpc.WithReadBufferSize. // (https://godoc.org/google.golang.org/grpc#WithReadBufferSize). - ReadBufferSize int `json:"read_buffer_size,omitempty" yaml:"read_buffer_size,omitempty"` + ReadBufferSize *int `json:"read_buffer_size,omitempty"` // WriteBufferSize for gRPC gRPC. See grpc.WithWriteBufferSize. // (https://godoc.org/google.golang.org/grpc#WithWriteBufferSize). - WriteBufferSize int `json:"write_buffer_size,omitempty" yaml:"write_buffer_size,omitempty"` + WriteBufferSize *int `json:"write_buffer_size,omitempty"` // WaitForReady parameter configures client to wait for ready state before sending data. // (https://github.com/grpc/grpc/blob/master/doc/wait-for-ready.md) - WaitForReady bool `json:"wait_for_ready,omitempty" yaml:"wait_for_ready,omitempty"` + WaitForReady *bool `json:"wait_for_ready,omitempty"` // The headers associated with gRPC requests. - Headers map[string]string `json:"headers,omitempty" yaml:"headers,omitempty"` + Headers *map[string]string `json:"headers,omitempty"` // Sets the balancer in grpclb_policy to discover the servers. Default is pick_first. // https://github.com/grpc/grpc-go/blob/master/examples/features/load_balancing/README.md - BalancerName string `json:"balancer_name,omitempty" yaml:"balancer_name,omitempty"` + BalancerName *string `json:"balancer_name,omitempty"` // WithAuthority parameter configures client to rewrite ":authority" header // (godoc.org/google.golang.org/grpc#WithAuthority) - Authority string `json:"authority,omitempty" yaml:"authority,omitempty"` + Authority *string `json:"authority,omitempty"` // Auth configuration for outgoing RPCs. - Auth *Authentication `json:"auth,omitempty" yaml:"auth,omitempty"` + Auth *Authentication `json:"auth,omitempty"` } // TLSClientSetting contains TLS configurations that are specific to client @@ -118,7 +118,7 @@ type GRPCClientConfig struct { // components configuring TLS client connections. type TLSClientSetting struct { // squash ensures fields are correctly decoded in embedded struct. - TLSSetting `json:",inline" yaml:",inline"` + TLSSetting `json:",inline"` // These are config options specific to client connections. @@ -128,13 +128,13 @@ type TLSClientSetting struct { // (InsecureSkipVerify in the tls Config). Please refer to // https://godoc.org/crypto/tls#Config for more information. // (optional, default false) - Insecure bool `json:"insecure,omitempty" yaml:"insecure,omitempty"` + Insecure bool `json:"insecure,omitempty"` // InsecureSkipVerify will enable TLS but not verify the certificate. - InsecureSkipVerify bool `json:"insecure_skip_verify,omitempty" yaml:"insecure_skip_verify,omitempty"` + InsecureSkipVerify bool `json:"insecure_skip_verify,omitempty"` // ServerName requested by client for virtual hosting. // This sets the ServerName in the TLSConfig. Please refer to // https://godoc.org/crypto/tls#Config for more information. (optional) - ServerName string `json:"server_name_override,omitempty" yaml:"server_name_override,omitempty"` + ServerName string `json:"server_name_override,omitempty"` } // TLSSetting exposes the common client and server TLS configurations. @@ -144,88 +144,88 @@ type TLSSetting struct { // Path to the CA cert. For a client this verifies the server certificate. // For a server this verifies client certificates. If empty uses system root CA. // (optional) - CAFile string `json:"ca_file,omitempty" yaml:"ca_file,omitempty"` + CAFile string `json:"ca_file,omitempty"` // In memory PEM encoded cert. (optional) - CAPem string `json:"ca_pem,omitempty" yaml:"ca_pem,omitempty"` + CAPem string `json:"ca_pem,omitempty"` // Path to the TLS cert to use for TLS required connections. (optional) - CertFile string `json:"cert_file,omitempty" yaml:"cert_file,omitempty"` + CertFile string `json:"cert_file,omitempty"` // In memory PEM encoded TLS cert to use for TLS required connections. (optional) - CertPem string `json:"cert_pem,omitempty" yaml:"cert_pem,omitempty"` + CertPem string `json:"cert_pem,omitempty"` // Path to the TLS key to use for TLS required connections. (optional) - KeyFile string `json:"key_file,omitempty" yaml:"key_file,omitempty"` + KeyFile string `json:"key_file,omitempty"` // In memory PEM encoded TLS key to use for TLS required connections. (optional) - KeyPem string `json:"key_pem,omitempty" yaml:"key_pem,omitempty"` + KeyPem string `json:"key_pem,omitempty"` // MinVersion sets the minimum TLS version that is acceptable. // If not set, TLS 1.2 will be used. (optional) - MinVersion string `json:"min_version,omitempty" yaml:"min_version,omitempty"` + MinVersion string `json:"min_version,omitempty"` // MaxVersion sets the maximum TLS version that is acceptable. // If not set, refer to crypto/tls for defaults. (optional) - MaxVersion string `json:"max_version,omitempty" yaml:"max_version,omitempty"` + MaxVersion string `json:"max_version,omitempty"` // ReloadInterval specifies the duration after which the certificate will be reloaded // If not set, it will never be reloaded (optional) - ReloadInterval time.Duration `json:"reload_interval,omitempty" yaml:"reload_interval,omitempty"` + ReloadInterval time.Duration `json:"reload_interval,omitempty"` } type Authentication struct { // AuthenticatorID specifies the name of the extension to use in order to authenticate the incoming data point. - AuthenticatorID string `json:"authenticator,omitempty"` + AuthenticatorID *string `json:"authenticator,omitempty"` } // ClientConfig defines settings for creating an HTTP client. type HTTPClientConfig struct { // The target URL to send data to (e.g.: http://some.url:9411/v1/traces). - Endpoint string `json:"endpoint,omitempty" yaml:"endpoint,omitempty"` + Endpoint *string `json:"endpoint,omitempty"` // ProxyURL setting for the collector - ProxyURL string `json:"proxy_url,omitempty" yaml:"proxy_url,omitempty"` + ProxyURL *string `json:"proxy_url,omitempty"` // TLSSetting struct exposes TLS client configuration. - TLSSetting TLSClientSetting `json:"tls,omitempty" yaml:"tls,omitempty"` + TLSSetting *TLSClientSetting `json:"tls,omitempty"` // ReadBufferSize for HTTP client. See http.Transport.ReadBufferSize. - ReadBufferSize int `json:"read_buffer_size,omitempty" yaml:"read_buffer_size,omitempty"` + ReadBufferSize *int `json:"read_buffer_size,omitempty"` // WriteBufferSize for HTTP client. See http.Transport.WriteBufferSize. - WriteBufferSize int `json:"write_buffer_size,omitempty" yaml:"write_buffer_size,omitempty"` + WriteBufferSize *int `json:"write_buffer_size,omitempty"` // Timeout parameter configures `http.Client.Timeout`. - Timeout time.Duration `json:"timeout,omitempty" yaml:"timeout,omitempty"` + Timeout *time.Duration `json:"timeout,omitempty"` // Additional headers attached to each HTTP request sent by the client. // Existing header values are overwritten if collision happens. // Header values are opaque since they may be sensitive. - Headers map[string]configopaque.String `json:"headers,omitempty" yaml:"headers,omitempty"` + Headers *map[string]configopaque.String `json:"headers,omitempty"` // Auth configuration for outgoing HTTP calls. - Auth Authentication `json:"auth,omitempty" yaml:"auth,omitempty"` + Auth *Authentication `json:"auth,omitempty"` // The compression key for supported compression types within collector. - Compression configcompression.Type `json:"compression,omitempty" yaml:"compression,omitempty"` + Compression *configcompression.Type `json:"compression,omitempty"` // MaxIdleConns is used to set a limit to the maximum idle HTTP connections the client can keep open. // There's an already set value, and we want to override it only if an explicit value provided - MaxIdleConns *int `json:"max_idle_conns,omitempty" yaml:"max_idle_conns,omitempty"` + MaxIdleConns *int `json:"max_idle_conns,omitempty"` // MaxIdleConnsPerHost is used to set a limit to the maximum idle HTTP connections the host can keep open. // There's an already set value, and we want to override it only if an explicit value provided - MaxIdleConnsPerHost *int `json:"max_idle_conns_per_host,omitempty" yaml:"max_idle_conns_per_host,omitempty"` + MaxIdleConnsPerHost *int `json:"max_idle_conns_per_host,omitempty"` // MaxConnsPerHost limits the total number of connections per host, including connections in the dialing, // active, and idle states. // There's an already set value, and we want to override it only if an explicit value provided - MaxConnsPerHost *int `json:"max_conns_per_host,omitempty" yaml:"max_conns_per_host,omitempty"` + MaxConnsPerHost *int `json:"max_conns_per_host,omitempty"` // IdleConnTimeout is the maximum amount of time a connection will remain open before closing itself. // There's an already set value, and we want to override it only if an explicit value provided - IdleConnTimeout *time.Duration `json:"idle_conn_timeout,omitempty" yaml:"idle_conn_timeout,omitempty"` + IdleConnTimeout *time.Duration `json:"idle_conn_timeout,omitempty"` // DisableKeepAlives, if true, disables HTTP keep-alives and will only use the connection to the server // for a single HTTP request. @@ -233,15 +233,15 @@ type HTTPClientConfig struct { // WARNING: enabling this option can result in significant overhead establishing a new HTTP(S) // connection for every request. Before enabling this option please consider whether changes // to idle connection settings can achieve your goal. - DisableKeepAlives bool `json:"disable_keep_alives,omitempty" yaml:"disable_keep_alives,omitempty"` + DisableKeepAlives *bool `json:"disable_keep_alives,omitempty"` // This is needed in case you run into // https://github.com/golang/go/issues/59690 // https://github.com/golang/go/issues/36026 // HTTP2ReadIdleTimeout if the connection has been idle for the configured value send a ping frame for health check // 0s means no health check will be performed. - HTTP2ReadIdleTimeout time.Duration `json:"http2_read_idle_timeout,omitempty" yaml:"http2_read_idle_timeout,omitempty"` + HTTP2ReadIdleTimeout *time.Duration `json:"http2_read_idle_timeout,omitempty"` // HTTP2PingTimeout if there's no response to the ping within the configured value, the connection will be closed. // If not set or set to 0, it defaults to 15s. - HTTP2PingTimeout time.Duration `json:"http2_ping_timeout,omitempty" yaml:"http2_ping_timeout,omitempty"` + HTTP2PingTimeout *time.Duration `json:"http2_ping_timeout,omitempty"` } diff --git a/api/telemetry/v1alpha1/output_types.go b/api/telemetry/v1alpha1/output_types.go index 7b89539..483bf81 100644 --- a/api/telemetry/v1alpha1/output_types.go +++ b/api/telemetry/v1alpha1/output_types.go @@ -15,10 +15,7 @@ package v1alpha1 import ( - "time" - corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -86,56 +83,58 @@ type BearerAuthConfig struct { // OTLP grpc exporter config ref: https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/otlpexporter/config.go type OTLPGRPC struct { - QueueConfig QueueSettings `json:"sending_queue,omitempty" yaml:"sending_queue,omitempty"` - RetryConfig BackOffConfig `json:"retry_on_failure,omitempty" yaml:"retry_on_failure,omitempty"` - TimeoutSettings `json:",inline" yaml:",inline"` - GRPCClientConfig `json:",inline" yaml:",inline"` + QueueConfig *QueueSettings `json:"sending_queue,omitempty"` + RetryConfig *BackOffConfig `json:"retry_on_failure,omitempty"` + TimeoutSettings `json:",inline"` + GRPCClientConfig `json:",inline"` } type OTLPHTTP struct { - QueueConfig QueueSettings `json:"sending_queue,omitempty" yaml:"sending_queue,omitempty"` - RetryConfig BackOffConfig `json:"retry_on_failure,omitempty" yaml:"retry_on_failure,omitempty"` - HTTPClientConfig `json:",inline" yaml:",inline"` + QueueConfig *QueueSettings `json:"sending_queue,omitempty"` + RetryConfig *BackOffConfig `json:"retry_on_failure,omitempty"` + HTTPClientConfig `json:",inline"` } type Fluentforward struct { - TCPClientSettings `json:",inline" yaml:",inline"` // squash ensures fields are correctly decoded in embedded struct. + TCPClientSettings `json:",inline"` // RequireAck enables the acknowledgement feature. - RequireAck bool `json:"require_ack,omitempty" yaml:"require_ack,omitempty"` + RequireAck *bool `json:"require_ack,omitempty"` // The Fluent tag parameter used for routing - Tag string `json:"tag,omitempty" yaml:"tag,omitempty"` + Tag *string `json:"tag,omitempty"` // CompressGzip enables gzip compression for the payload. - CompressGzip bool `json:"compress_gzip,omitempty" yaml:"compress_gzip,omitempty"` + CompressGzip *bool `json:"compress_gzip,omitempty"` // DefaultLabelsEnabled is a map of default attributes to be added to each log record. - DefaultLabelsEnabled map[string]bool `json:"default_labels_enabled,omitempty" yaml:"default_labels_enabled,omitempty"` + DefaultLabelsEnabled *map[string]bool `json:"default_labels_enabled,omitempty"` - QueueConfig QueueSettings `json:"sending_queue,omitempty" yaml:"sending_queue,omitempty"` - RetryConfig BackOffConfig `json:"retry_on_failure,omitempty" yaml:"retry_on_failure,omitempty"` + QueueConfig *QueueSettings `json:"sending_queue,omitempty"` + RetryConfig *BackOffConfig `json:"retry_on_failure,omitempty"` - Kubernetes *KubernetesMetadata `json:"kubernetes_metadata,omitempty" yaml:"kubernetes_metadata,omitempty"` + Kubernetes *KubernetesMetadata `json:"kubernetes_metadata,omitempty"` } type KubernetesMetadata struct { - Key string `json:"key" yaml:"key,omitempty"` - IncludePodLabels bool `json:"include_pod_labels" yaml:"include_pod_labels,omitempty"` + Key string `json:"key"` + IncludePodLabels bool `json:"include_pod_labels"` } type TCPClientSettings struct { // The target endpoint URI to send data to (e.g.: some.url:24224). - Endpoint string `json:"endpoint,omitempty" yaml:"endpoint,omitempty"` + Endpoint *string `json:"endpoint,omitempty"` + + // +kubebuilder:validation:Format=duration // Connection Timeout parameter configures `net.Dialer`. - ConnectionTimeout time.Duration `json:"connection_timeout,omitempty" yaml:"connection_timeout,omitempty"` + ConnectionTimeout *string `json:"connection_timeout,omitempty"` // TLSSetting struct exposes TLS client configuration. - TLSSetting TLSClientSetting `json:"tls,omitempty" yaml:"tls,omitempty"` + TLSSetting *TLSClientSetting `json:"tls,omitempty"` // SharedKey is used for authorization with the server that knows it. - SharedKey string `json:"shared_key,omitempty" yaml:"shared_key,omitempty"` + SharedKey *string `json:"shared_key,omitempty"` } // OutputStatus defines the observed state of Output @@ -151,7 +150,7 @@ type Output struct { metav1.TypeMeta `json:",inline"` metav1.ObjectMeta `json:"metadata,omitempty"` - Spec OutputSpec `json:"spec,omitempty" yaml:"spec,omitempty"` + Spec OutputSpec `json:"spec,omitempty"` Status OutputStatus `json:"status,omitempty"` } @@ -159,7 +158,7 @@ type Output struct { // OutputList contains a list of Output type OutputList struct { - metav1.TypeMeta `json:",inline" yaml:",inline"` + metav1.TypeMeta `json:",inline"` metav1.ListMeta `json:"metadata,omitempty"` Items []Output `json:"items"` } diff --git a/api/telemetry/v1alpha1/zz_generated.deepcopy.go b/api/telemetry/v1alpha1/zz_generated.deepcopy.go index 844c307..efc0f08 100644 --- a/api/telemetry/v1alpha1/zz_generated.deepcopy.go +++ b/api/telemetry/v1alpha1/zz_generated.deepcopy.go @@ -20,6 +20,7 @@ package v1alpha1 import ( "github.com/cisco-open/operator-tools/pkg/typeoverride" + "go.opentelemetry.io/collector/config/configcompression" "go.opentelemetry.io/collector/config/configopaque" "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -30,6 +31,11 @@ import ( // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *Authentication) DeepCopyInto(out *Authentication) { *out = *in + if in.AuthenticatorID != nil { + in, out := &in.AuthenticatorID, &out.AuthenticatorID + *out = new(string) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Authentication. @@ -314,16 +320,43 @@ func (in *CollectorStatus) DeepCopy() *CollectorStatus { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *Fluentforward) DeepCopyInto(out *Fluentforward) { *out = *in - out.TCPClientSettings = in.TCPClientSettings + in.TCPClientSettings.DeepCopyInto(&out.TCPClientSettings) + if in.RequireAck != nil { + in, out := &in.RequireAck, &out.RequireAck + *out = new(bool) + **out = **in + } + if in.Tag != nil { + in, out := &in.Tag, &out.Tag + *out = new(string) + **out = **in + } + if in.CompressGzip != nil { + in, out := &in.CompressGzip, &out.CompressGzip + *out = new(bool) + **out = **in + } if in.DefaultLabelsEnabled != nil { in, out := &in.DefaultLabelsEnabled, &out.DefaultLabelsEnabled - *out = make(map[string]bool, len(*in)) - for key, val := range *in { - (*out)[key] = val + *out = new(map[string]bool) + if **in != nil { + in, out := *in, *out + *out = make(map[string]bool, len(*in)) + for key, val := range *in { + (*out)[key] = val + } } } - out.QueueConfig = in.QueueConfig - out.RetryConfig = in.RetryConfig + if in.QueueConfig != nil { + in, out := &in.QueueConfig, &out.QueueConfig + *out = new(QueueSettings) + **out = **in + } + if in.RetryConfig != nil { + in, out := &in.RetryConfig, &out.RetryConfig + *out = new(BackOffConfig) + **out = **in + } if in.Kubernetes != nil { in, out := &in.Kubernetes, &out.Kubernetes *out = new(KubernetesMetadata) @@ -344,23 +377,66 @@ func (in *Fluentforward) DeepCopy() *Fluentforward { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *GRPCClientConfig) DeepCopyInto(out *GRPCClientConfig) { *out = *in - out.TLSSetting = in.TLSSetting + if in.Endpoint != nil { + in, out := &in.Endpoint, &out.Endpoint + *out = new(string) + **out = **in + } + if in.Compression != nil { + in, out := &in.Compression, &out.Compression + *out = new(configcompression.Type) + **out = **in + } + if in.TLSSetting != nil { + in, out := &in.TLSSetting, &out.TLSSetting + *out = new(TLSClientSetting) + **out = **in + } if in.Keepalive != nil { in, out := &in.Keepalive, &out.Keepalive *out = new(KeepaliveClientConfig) **out = **in } + if in.ReadBufferSize != nil { + in, out := &in.ReadBufferSize, &out.ReadBufferSize + *out = new(int) + **out = **in + } + if in.WriteBufferSize != nil { + in, out := &in.WriteBufferSize, &out.WriteBufferSize + *out = new(int) + **out = **in + } + if in.WaitForReady != nil { + in, out := &in.WaitForReady, &out.WaitForReady + *out = new(bool) + **out = **in + } if in.Headers != nil { in, out := &in.Headers, &out.Headers - *out = make(map[string]string, len(*in)) - for key, val := range *in { - (*out)[key] = val + *out = new(map[string]string) + if **in != nil { + in, out := *in, *out + *out = make(map[string]string, len(*in)) + for key, val := range *in { + (*out)[key] = val + } } } + if in.BalancerName != nil { + in, out := &in.BalancerName, &out.BalancerName + *out = new(string) + **out = **in + } + if in.Authority != nil { + in, out := &in.Authority, &out.Authority + *out = new(string) + **out = **in + } if in.Auth != nil { in, out := &in.Auth, &out.Auth *out = new(Authentication) - **out = **in + (*in).DeepCopyInto(*out) } } @@ -377,15 +453,57 @@ func (in *GRPCClientConfig) DeepCopy() *GRPCClientConfig { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *HTTPClientConfig) DeepCopyInto(out *HTTPClientConfig) { *out = *in - out.TLSSetting = in.TLSSetting + if in.Endpoint != nil { + in, out := &in.Endpoint, &out.Endpoint + *out = new(string) + **out = **in + } + if in.ProxyURL != nil { + in, out := &in.ProxyURL, &out.ProxyURL + *out = new(string) + **out = **in + } + if in.TLSSetting != nil { + in, out := &in.TLSSetting, &out.TLSSetting + *out = new(TLSClientSetting) + **out = **in + } + if in.ReadBufferSize != nil { + in, out := &in.ReadBufferSize, &out.ReadBufferSize + *out = new(int) + **out = **in + } + if in.WriteBufferSize != nil { + in, out := &in.WriteBufferSize, &out.WriteBufferSize + *out = new(int) + **out = **in + } + if in.Timeout != nil { + in, out := &in.Timeout, &out.Timeout + *out = new(timex.Duration) + **out = **in + } if in.Headers != nil { in, out := &in.Headers, &out.Headers - *out = make(map[string]configopaque.String, len(*in)) - for key, val := range *in { - (*out)[key] = val + *out = new(map[string]configopaque.String) + if **in != nil { + in, out := *in, *out + *out = make(map[string]configopaque.String, len(*in)) + for key, val := range *in { + (*out)[key] = val + } } } - out.Auth = in.Auth + if in.Auth != nil { + in, out := &in.Auth, &out.Auth + *out = new(Authentication) + (*in).DeepCopyInto(*out) + } + if in.Compression != nil { + in, out := &in.Compression, &out.Compression + *out = new(configcompression.Type) + **out = **in + } if in.MaxIdleConns != nil { in, out := &in.MaxIdleConns, &out.MaxIdleConns *out = new(int) @@ -406,6 +524,21 @@ func (in *HTTPClientConfig) DeepCopyInto(out *HTTPClientConfig) { *out = new(timex.Duration) **out = **in } + if in.DisableKeepAlives != nil { + in, out := &in.DisableKeepAlives, &out.DisableKeepAlives + *out = new(bool) + **out = **in + } + if in.HTTP2ReadIdleTimeout != nil { + in, out := &in.HTTP2ReadIdleTimeout, &out.HTTP2ReadIdleTimeout + *out = new(timex.Duration) + **out = **in + } + if in.HTTP2PingTimeout != nil { + in, out := &in.HTTP2PingTimeout, &out.HTTP2PingTimeout + *out = new(timex.Duration) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new HTTPClientConfig. @@ -481,9 +614,17 @@ func (in *NamespacedName) DeepCopy() *NamespacedName { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *OTLPGRPC) DeepCopyInto(out *OTLPGRPC) { *out = *in - out.QueueConfig = in.QueueConfig - out.RetryConfig = in.RetryConfig - out.TimeoutSettings = in.TimeoutSettings + if in.QueueConfig != nil { + in, out := &in.QueueConfig, &out.QueueConfig + *out = new(QueueSettings) + **out = **in + } + if in.RetryConfig != nil { + in, out := &in.RetryConfig, &out.RetryConfig + *out = new(BackOffConfig) + **out = **in + } + in.TimeoutSettings.DeepCopyInto(&out.TimeoutSettings) in.GRPCClientConfig.DeepCopyInto(&out.GRPCClientConfig) } @@ -500,8 +641,16 @@ func (in *OTLPGRPC) DeepCopy() *OTLPGRPC { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *OTLPHTTP) DeepCopyInto(out *OTLPHTTP) { *out = *in - out.QueueConfig = in.QueueConfig - out.RetryConfig = in.RetryConfig + if in.QueueConfig != nil { + in, out := &in.QueueConfig, &out.QueueConfig + *out = new(QueueSettings) + **out = **in + } + if in.RetryConfig != nil { + in, out := &in.RetryConfig, &out.RetryConfig + *out = new(BackOffConfig) + **out = **in + } in.HTTPClientConfig.DeepCopyInto(&out.HTTPClientConfig) } @@ -791,7 +940,26 @@ func (in *SubscriptionStatus) DeepCopy() *SubscriptionStatus { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *TCPClientSettings) DeepCopyInto(out *TCPClientSettings) { *out = *in - out.TLSSetting = in.TLSSetting + if in.Endpoint != nil { + in, out := &in.Endpoint, &out.Endpoint + *out = new(string) + **out = **in + } + if in.ConnectionTimeout != nil { + in, out := &in.ConnectionTimeout, &out.ConnectionTimeout + *out = new(string) + **out = **in + } + if in.TLSSetting != nil { + in, out := &in.TLSSetting, &out.TLSSetting + *out = new(TLSClientSetting) + **out = **in + } + if in.SharedKey != nil { + in, out := &in.SharedKey, &out.SharedKey + *out = new(string) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TCPClientSettings. @@ -958,6 +1126,11 @@ func (in *TenantStatus) DeepCopy() *TenantStatus { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *TimeoutSettings) DeepCopyInto(out *TimeoutSettings) { *out = *in + if in.Timeout != nil { + in, out := &in.Timeout, &out.Timeout + *out = new(timex.Duration) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TimeoutSettings. diff --git a/charts/telemetry-controller/crds/telemetry.kube-logging.dev_outputs.yaml b/charts/telemetry-controller/crds/telemetry.kube-logging.dev_outputs.yaml index 83f7234..e29e5ad 100644 --- a/charts/telemetry-controller/crds/telemetry.kube-logging.dev_outputs.yaml +++ b/charts/telemetry-controller/crds/telemetry.kube-logging.dev_outputs.yaml @@ -138,8 +138,8 @@ spec: type: boolean connection_timeout: description: Connection Timeout parameter configures `net.Dialer`. - format: int64 - type: integer + format: duration + type: string default_labels_enabled: additionalProperties: type: boolean diff --git a/config/crd/bases/telemetry.kube-logging.dev_outputs.yaml b/config/crd/bases/telemetry.kube-logging.dev_outputs.yaml index 83f7234..e29e5ad 100644 --- a/config/crd/bases/telemetry.kube-logging.dev_outputs.yaml +++ b/config/crd/bases/telemetry.kube-logging.dev_outputs.yaml @@ -138,8 +138,8 @@ spec: type: boolean connection_timeout: description: Connection Timeout parameter configures `net.Dialer`. - format: int64 - type: integer + format: duration + type: string default_labels_enabled: additionalProperties: type: boolean diff --git a/go.mod b/go.mod index df47e7d..3232c63 100644 --- a/go.mod +++ b/go.mod @@ -27,7 +27,6 @@ require ( go.opentelemetry.io/collector/service v0.113.0 go.uber.org/zap v1.27.0 golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842 - gopkg.in/yaml.v3 v3.0.1 k8s.io/api v0.30.2 k8s.io/apimachinery v0.30.2 k8s.io/client-go v0.30.2 @@ -166,6 +165,7 @@ require ( google.golang.org/protobuf v1.35.1 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect k8s.io/apiextensions-apiserver v0.30.2 // indirect k8s.io/klog/v2 v2.130.1 // indirect k8s.io/kube-openapi v0.0.0-20240322212309-b815d8309940 // indirect diff --git a/internal/controller/telemetry/controller_integration_test.go b/internal/controller/telemetry/controller_integration_test.go index 87be70f..18dc03e 100644 --- a/internal/controller/telemetry/controller_integration_test.go +++ b/internal/controller/telemetry/controller_integration_test.go @@ -19,6 +19,7 @@ import ( "time" "github.com/kube-logging/telemetry-controller/api/telemetry/v1alpha1" + "github.com/kube-logging/telemetry-controller/internal/controller/telemetry/utils" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" otelv1beta1 "github.com/open-telemetry/opentelemetry-operator/apis/v1beta1" @@ -177,8 +178,8 @@ var _ = Describe("Telemetry controller integration test", func() { Spec: v1alpha1.OutputSpec{ OTLPGRPC: &v1alpha1.OTLPGRPC{ GRPCClientConfig: v1alpha1.GRPCClientConfig{ - Endpoint: "receiver-collector.example-tenant-ns.svc.cluster.local:4317", - TLSSetting: v1alpha1.TLSClientSetting{ + Endpoint: utils.ToPtr("receiver-collector.example-tenant-ns.svc.cluster.local:4317"), + TLSSetting: &v1alpha1.TLSClientSetting{ Insecure: true, }, }, @@ -193,8 +194,8 @@ var _ = Describe("Telemetry controller integration test", func() { Spec: v1alpha1.OutputSpec{ OTLPGRPC: &v1alpha1.OTLPGRPC{ GRPCClientConfig: v1alpha1.GRPCClientConfig{ - Endpoint: "receiver-collector.example-tenant-ns.svc.cluster.local:4317", - TLSSetting: v1alpha1.TLSClientSetting{ + Endpoint: utils.ToPtr("receiver-collector.example-tenant-ns.svc.cluster.local:4317"), + TLSSetting: &v1alpha1.TLSClientSetting{ Insecure: true, }, }, diff --git a/internal/controller/telemetry/otel_conf_gen/otel_conf_gen_test.go b/internal/controller/telemetry/otel_conf_gen/otel_conf_gen_test.go index 0e1565f..8a9e4d3 100644 --- a/internal/controller/telemetry/otel_conf_gen/otel_conf_gen_test.go +++ b/internal/controller/telemetry/otel_conf_gen/otel_conf_gen_test.go @@ -35,6 +35,7 @@ import ( "github.com/kube-logging/telemetry-controller/internal/controller/telemetry/pipeline" "github.com/kube-logging/telemetry-controller/internal/controller/telemetry/pipeline/components" "github.com/kube-logging/telemetry-controller/internal/controller/telemetry/pipeline/components/connector" + "github.com/kube-logging/telemetry-controller/internal/controller/telemetry/utils" ) //go:embed otel_col_conf_test_fixtures/complex.yaml @@ -194,8 +195,8 @@ func TestOtelColConfComplex(t *testing.T) { Spec: v1alpha1.OutputSpec{ OTLPGRPC: &v1alpha1.OTLPGRPC{ GRPCClientConfig: v1alpha1.GRPCClientConfig{ - Endpoint: "receiver-collector.example-tenant-a-ns.svc.cluster.local:4317", - TLSSetting: v1alpha1.TLSClientSetting{ + Endpoint: utils.ToPtr("receiver-collector.example-tenant-a-ns.svc.cluster.local:4317"), + TLSSetting: &v1alpha1.TLSClientSetting{ Insecure: true, }, }, @@ -227,8 +228,8 @@ func TestOtelColConfComplex(t *testing.T) { Spec: v1alpha1.OutputSpec{ OTLPGRPC: &v1alpha1.OTLPGRPC{ GRPCClientConfig: v1alpha1.GRPCClientConfig{ - Endpoint: "receiver-collector.example-tenant-a-ns.svc.cluster.local:4317", - TLSSetting: v1alpha1.TLSClientSetting{ + Endpoint: utils.ToPtr("receiver-collector.example-tenant-a-ns.svc.cluster.local:4317"), + TLSSetting: &v1alpha1.TLSClientSetting{ Insecure: true, }, }, @@ -245,8 +246,8 @@ func TestOtelColConfComplex(t *testing.T) { Spec: v1alpha1.OutputSpec{ OTLPHTTP: &v1alpha1.OTLPHTTP{ HTTPClientConfig: v1alpha1.HTTPClientConfig{ - Endpoint: "loki.example-tenant-a-ns.svc.cluster.local:4317", - TLSSetting: v1alpha1.TLSClientSetting{ + Endpoint: utils.ToPtr[string]("loki.example-tenant-a-ns.svc.cluster.local:4317"), + TLSSetting: &v1alpha1.TLSClientSetting{ Insecure: true, }, }, @@ -263,8 +264,8 @@ func TestOtelColConfComplex(t *testing.T) { Spec: v1alpha1.OutputSpec{ Fluentforward: &v1alpha1.Fluentforward{ TCPClientSettings: v1alpha1.TCPClientSettings{ - Endpoint: "fluentforward.example-tenant-ns.svc.cluster.local:8888", - TLSSetting: v1alpha1.TLSClientSetting{ + Endpoint: utils.ToPtr("fluentd.example-tenant-b-ns.svc.cluster.local:24224"), + TLSSetting: &v1alpha1.TLSClientSetting{ Insecure: true, }, }, diff --git a/internal/controller/telemetry/pipeline/components/exporter/fluent_forward_exporter.go b/internal/controller/telemetry/pipeline/components/exporter/fluent_forward_exporter.go index 90f5eeb..2bb6c25 100644 --- a/internal/controller/telemetry/pipeline/components/exporter/fluent_forward_exporter.go +++ b/internal/controller/telemetry/pipeline/components/exporter/fluent_forward_exporter.go @@ -16,11 +16,11 @@ package exporter import ( "context" + "encoding/json" "errors" "fmt" "github.com/kube-logging/telemetry-controller/internal/controller/telemetry/pipeline/components" - "gopkg.in/yaml.v3" "sigs.k8s.io/controller-runtime/pkg/log" ) @@ -30,18 +30,16 @@ func GenerateFluentforwardExporters(ctx context.Context, outputsWithSecretData [ result := make(map[string]any) for _, output := range outputsWithSecretData { if output.Output.Spec.Fluentforward != nil { - // TODO: add proper error handling - name := fmt.Sprintf("fluentforwardexporter/%s_%s", output.Output.Namespace, output.Output.Name) - fluentForwardMarshaled, err := yaml.Marshal(output.Output.Spec.Fluentforward) + fluentForwardMarshaled, err := json.Marshal(output.Output.Spec.Fluentforward) if err != nil { logger.Error(errors.New("failed to compile config for output"), "failed to compile config for output %q", output.Output.NamespacedName().String()) } - var fluetForwardValues map[string]any - if err := yaml.Unmarshal(fluentForwardMarshaled, &fluetForwardValues); err != nil { + var fluentForwardValues map[string]any + if err := json.Unmarshal(fluentForwardMarshaled, &fluentForwardValues); err != nil { logger.Error(errors.New("failed to compile config for output"), "failed to compile config for output %q", output.Output.NamespacedName().String()) } - result[name] = fluetForwardValues + result[fmt.Sprintf("fluentforwardexporter/%s_%s", output.Output.Namespace, output.Output.Name)] = fluentForwardValues } } diff --git a/internal/controller/telemetry/pipeline/components/exporter/fluent_forward_exporter_test.go b/internal/controller/telemetry/pipeline/components/exporter/fluent_forward_exporter_test.go new file mode 100644 index 0000000..37bbae0 --- /dev/null +++ b/internal/controller/telemetry/pipeline/components/exporter/fluent_forward_exporter_test.go @@ -0,0 +1,114 @@ +// Copyright © 2024 Kube logging authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package exporter + +import ( + "context" + "testing" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/kube-logging/telemetry-controller/api/telemetry/v1alpha1" + "github.com/kube-logging/telemetry-controller/internal/controller/telemetry/pipeline/components" + "github.com/kube-logging/telemetry-controller/internal/controller/telemetry/utils" + "github.com/stretchr/testify/assert" +) + +func TestGenerateFluentforwardExporters(t *testing.T) { + tests := []struct { + name string + outputsWithSecretData []components.OutputWithSecretData + expectedResult map[string]any + }{ + { + name: "Valid config", + outputsWithSecretData: []components.OutputWithSecretData{ + { + Output: v1alpha1.Output{ + ObjectMeta: metav1.ObjectMeta{ + Name: "output1", + Namespace: "default", + }, + Spec: v1alpha1.OutputSpec{ + Fluentforward: &v1alpha1.Fluentforward{ + TCPClientSettings: v1alpha1.TCPClientSettings{ + Endpoint: utils.ToPtr("http://example.com"), + }, + }, + }, + }, + }, + }, + expectedResult: map[string]any{ + "fluentforwardexporter/default_output1": map[string]any{ + "endpoint": "http://example.com", + }, + }, + }, + { + name: "All fields set, tls settings omitted", + outputsWithSecretData: []components.OutputWithSecretData{ + { + Output: v1alpha1.Output{ + ObjectMeta: metav1.ObjectMeta{ + Name: "output3", + Namespace: "default", + }, + Spec: v1alpha1.OutputSpec{ + Fluentforward: &v1alpha1.Fluentforward{ + TCPClientSettings: v1alpha1.TCPClientSettings{ + Endpoint: utils.ToPtr("http://example.com"), + ConnectionTimeout: utils.ToPtr("30s"), + SharedKey: utils.ToPtr("shared-key"), + }, + RequireAck: utils.ToPtr(true), + Tag: utils.ToPtr("tag"), + CompressGzip: utils.ToPtr(true), + DefaultLabelsEnabled: &map[string]bool{"label1": true}, + QueueConfig: &v1alpha1.QueueSettings{}, + RetryConfig: &v1alpha1.BackOffConfig{}, + Kubernetes: &v1alpha1.KubernetesMetadata{Key: "key", IncludePodLabels: true}, + }, + }, + }, + }, + }, + expectedResult: map[string]any{ + "fluentforwardexporter/default_output3": map[string]any{ + "endpoint": "http://example.com", + "connection_timeout": "30s", + "shared_key": "shared-key", + "require_ack": true, + "tag": "tag", + "compress_gzip": true, + "default_labels_enabled": map[string]any{"label1": true}, + "sending_queue": map[string]any{}, + "retry_on_failure": map[string]any{}, + "kubernetes_metadata": map[string]any{ + "key": "key", + "include_pod_labels": true, + }, + }, + }, + }, + } + + for _, tt := range tests { + ttp := tt + t.Run(tt.name, func(t *testing.T) { + assert.Equal(t, ttp.expectedResult, GenerateFluentforwardExporters(context.TODO(), ttp.outputsWithSecretData)) + }) + } +} diff --git a/internal/controller/telemetry/pipeline/components/exporter/otlp_exporter.go b/internal/controller/telemetry/pipeline/components/exporter/otlp_exporter.go index f69b53c..f6c9184 100644 --- a/internal/controller/telemetry/pipeline/components/exporter/otlp_exporter.go +++ b/internal/controller/telemetry/pipeline/components/exporter/otlp_exporter.go @@ -16,12 +16,13 @@ package exporter import ( "context" + "encoding/json" "errors" "fmt" "github.com/kube-logging/telemetry-controller/api/telemetry/v1alpha1" "github.com/kube-logging/telemetry-controller/internal/controller/telemetry/pipeline/components" - "gopkg.in/yaml.v3" + "github.com/kube-logging/telemetry-controller/internal/controller/telemetry/utils" "sigs.k8s.io/controller-runtime/pkg/log" ) @@ -31,27 +32,25 @@ func GenerateOTLPGRPCExporters(ctx context.Context, outputsWithSecretData []comp result := make(map[string]any) for _, output := range outputsWithSecretData { if output.Output.Spec.OTLPGRPC != nil { - name := components.GetExporterNameForOutput(output.Output) - if output.Output.Spec.Authentication != nil { if output.Output.Spec.Authentication.BasicAuth != nil { output.Output.Spec.OTLPGRPC.Auth = &v1alpha1.Authentication{ - AuthenticatorID: fmt.Sprintf("basicauth/%s_%s", output.Output.Namespace, output.Output.Name)} + AuthenticatorID: utils.ToPtr(fmt.Sprintf("basicauth/%s_%s", output.Output.Namespace, output.Output.Name))} } else if output.Output.Spec.Authentication.BearerAuth != nil { output.Output.Spec.OTLPGRPC.Auth = &v1alpha1.Authentication{ - AuthenticatorID: fmt.Sprintf("bearertokenauth/%s_%s", output.Output.Namespace, output.Output.Name)} + AuthenticatorID: utils.ToPtr(fmt.Sprintf("bearertokenauth/%s_%s", output.Output.Namespace, output.Output.Name))} } } - otlpGrpcValuesMarshaled, err := yaml.Marshal(output.Output.Spec.OTLPGRPC) + otlpGrpcValuesMarshaled, err := json.Marshal(output.Output.Spec.OTLPGRPC) if err != nil { logger.Error(errors.New("failed to compile config for output"), "failed to compile config for output %q", output.Output.NamespacedName().String()) } - var otlpGrpcValues map[string]any - if err := yaml.Unmarshal(otlpGrpcValuesMarshaled, &otlpGrpcValues); err != nil { + var otlpGRPCValues map[string]any + if err := json.Unmarshal(otlpGrpcValuesMarshaled, &otlpGRPCValues); err != nil { logger.Error(errors.New("failed to compile config for output"), "failed to compile config for output %q", output.Output.NamespacedName().String()) } - result[name] = otlpGrpcValues + result[components.GetExporterNameForOutput(output.Output)] = otlpGRPCValues } } diff --git a/internal/controller/telemetry/pipeline/components/exporter/otlp_exporter_test.go b/internal/controller/telemetry/pipeline/components/exporter/otlp_exporter_test.go new file mode 100644 index 0000000..53b896e --- /dev/null +++ b/internal/controller/telemetry/pipeline/components/exporter/otlp_exporter_test.go @@ -0,0 +1,260 @@ +// Copyright © 2024 Kube logging authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package exporter + +import ( + "context" + "testing" + "time" + + "go.opentelemetry.io/collector/config/configcompression" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/kube-logging/telemetry-controller/api/telemetry/v1alpha1" + "github.com/kube-logging/telemetry-controller/internal/controller/telemetry/pipeline/components" + "github.com/kube-logging/telemetry-controller/internal/controller/telemetry/utils" + "github.com/stretchr/testify/assert" +) + +func TestGenerateOTLPGRPCExporters(t *testing.T) { + tests := []struct { + name string + outputsWithSecretData []components.OutputWithSecretData + expectedResult map[string]any + }{ + { + name: "Basic auth", + outputsWithSecretData: []components.OutputWithSecretData{ + { + Output: v1alpha1.Output{ + ObjectMeta: metav1.ObjectMeta{ + Name: "output1", + Namespace: "default", + }, + Spec: v1alpha1.OutputSpec{ + OTLPGRPC: &v1alpha1.OTLPGRPC{ + GRPCClientConfig: v1alpha1.GRPCClientConfig{ + Endpoint: utils.ToPtr("http://example.com"), + }, + }, + Authentication: &v1alpha1.OutputAuth{ + BasicAuth: &v1alpha1.BasicAuthConfig{ + SecretRef: &corev1.SecretReference{ + Name: "secret-name", + Namespace: "secret-ns", + }, + UsernameField: "username", + PasswordField: "password", + }, + }, + }, + }, + Secret: corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "secret-name", + Namespace: "secret-ns", + }, + Data: map[string][]byte{ + "username": []byte("user"), + "password": []byte("pass"), + }, + }, + }, + }, + expectedResult: map[string]any{ + "otlp/default_output1": map[string]any{ + "endpoint": "http://example.com", + "auth": map[string]any{ + "authenticator": "basicauth/default_output1", + }, + }, + }, + }, + { + name: "Bearer auth", + outputsWithSecretData: []components.OutputWithSecretData{ + { + Output: v1alpha1.Output{ + ObjectMeta: metav1.ObjectMeta{ + Name: "output2", + Namespace: "default", + }, + Spec: v1alpha1.OutputSpec{ + OTLPGRPC: &v1alpha1.OTLPGRPC{ + GRPCClientConfig: v1alpha1.GRPCClientConfig{ + Endpoint: utils.ToPtr("http://example.com"), + }, + }, + Authentication: &v1alpha1.OutputAuth{ + BearerAuth: &v1alpha1.BearerAuthConfig{ + SecretRef: &corev1.SecretReference{ + Name: "secret-name", + Namespace: "secret-ns", + }, + TokenField: "token", + }, + }, + }, + }, + Secret: corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "secret-name", + Namespace: "secret-ns", + }, + Data: map[string][]byte{ + "token": []byte("token-value"), + }, + }, + }, + }, + expectedResult: map[string]any{ + "otlp/default_output2": map[string]any{ + "endpoint": "http://example.com", + "auth": map[string]any{ + "authenticator": "bearertokenauth/default_output2", + }, + }, + }, + }, + { + name: "No auth", + outputsWithSecretData: []components.OutputWithSecretData{ + { + Output: v1alpha1.Output{ + ObjectMeta: metav1.ObjectMeta{ + Name: "output3", + Namespace: "default", + }, + Spec: v1alpha1.OutputSpec{ + OTLPGRPC: &v1alpha1.OTLPGRPC{ + GRPCClientConfig: v1alpha1.GRPCClientConfig{ + Endpoint: utils.ToPtr("http://example.com"), + }, + }, + }, + }, + }, + }, + expectedResult: map[string]any{ + "otlp/default_output3": map[string]any{ + "endpoint": "http://example.com", + }, + }, + }, + { + name: "All fields set", + outputsWithSecretData: []components.OutputWithSecretData{ + { + Output: v1alpha1.Output{ + ObjectMeta: metav1.ObjectMeta{ + Name: "output4", + Namespace: "default", + }, + Spec: v1alpha1.OutputSpec{ + OTLPGRPC: &v1alpha1.OTLPGRPC{ + QueueConfig: &v1alpha1.QueueSettings{ + Enabled: true, + NumConsumers: 10, + QueueSize: 100, + StorageID: "storage-id", + }, + RetryConfig: &v1alpha1.BackOffConfig{ + Enabled: true, + InitialInterval: 5 * time.Second, + RandomizationFactor: "0.1", + Multiplier: "2.0", + MaxInterval: 10 * time.Second, + MaxElapsedTime: 60 * time.Second, + }, + TimeoutSettings: v1alpha1.TimeoutSettings{ + Timeout: utils.ToPtr(5 * time.Second), + }, + GRPCClientConfig: v1alpha1.GRPCClientConfig{ + Endpoint: utils.ToPtr("http://example.com"), + Compression: utils.ToPtr(configcompression.Type("gzip")), + TLSSetting: &v1alpha1.TLSClientSetting{ + Insecure: true, + InsecureSkipVerify: true, + ServerName: "server-name", + }, + Keepalive: &v1alpha1.KeepaliveClientConfig{ + Time: 5 * time.Second, + Timeout: 5 * time.Second, + PermitWithoutStream: true, + }, + ReadBufferSize: utils.ToPtr(1024), + WriteBufferSize: utils.ToPtr(1024), + WaitForReady: utils.ToPtr(true), + Headers: &map[string]string{ + "header1": "value1", + }, + BalancerName: utils.ToPtr("round_robin"), + Authority: utils.ToPtr("authority"), + }, + }, + }, + }, + }, + }, + expectedResult: map[string]any{ + "otlp/default_output4": map[string]any{ + "endpoint": "http://example.com", + "tls": map[string]any{ + "insecure": true, + "insecure_skip_verify": true, + "server_name_override": "server-name", + }, + "compression": "gzip", + "keepalive": map[string]any{ + "time": float64(5 * time.Second), + "timeout": float64(5 * time.Second), + "permit_without_stream": true, + }, + "read_buffer_size": float64(1024), + "write_buffer_size": float64(1024), + "wait_for_ready": true, + "headers": map[string]any{ + "header1": "value1", + }, + "balancer_name": "round_robin", + "authority": "authority", + "sending_queue": map[string]any{ + "enabled": true, + "num_consumers": float64(10), + "queue_size": float64(100), + "storage": "storage-id", + }, + "retry_on_failure": map[string]any{ + "enabled": true, + "initial_interval": float64(5 * time.Second), + "randomization_factor": "0.1", + "multiplier": "2.0", + "max_interval": float64(10 * time.Second), + "max_elapsed_time": float64(60 * time.Second), + }, + "timeout": float64(5 * time.Second), + }, + }, + }, + } + + for _, tt := range tests { + ttp := tt + t.Run(ttp.name, func(t *testing.T) { + assert.Equal(t, ttp.expectedResult, GenerateOTLPGRPCExporters(context.TODO(), ttp.outputsWithSecretData)) + }) + } +} diff --git a/internal/controller/telemetry/pipeline/components/exporter/otlphttp_exporter.go b/internal/controller/telemetry/pipeline/components/exporter/otlphttp_exporter.go index 2a4d761..c03f1ae 100644 --- a/internal/controller/telemetry/pipeline/components/exporter/otlphttp_exporter.go +++ b/internal/controller/telemetry/pipeline/components/exporter/otlphttp_exporter.go @@ -16,11 +16,13 @@ package exporter import ( "context" + "encoding/json" "errors" "fmt" + "github.com/kube-logging/telemetry-controller/api/telemetry/v1alpha1" "github.com/kube-logging/telemetry-controller/internal/controller/telemetry/pipeline/components" - "gopkg.in/yaml.v3" + "github.com/kube-logging/telemetry-controller/internal/controller/telemetry/utils" "sigs.k8s.io/controller-runtime/pkg/log" ) @@ -30,25 +32,25 @@ func GenerateOTLPHTTPExporters(ctx context.Context, outputsWithSecretData []comp result := make(map[string]any) for _, output := range outputsWithSecretData { if output.Output.Spec.OTLPHTTP != nil { - name := components.GetExporterNameForOutput(output.Output) - if output.Output.Spec.Authentication != nil { if output.Output.Spec.Authentication.BasicAuth != nil { - output.Output.Spec.OTLPHTTP.Auth.AuthenticatorID = fmt.Sprintf("basicauth/%s_%s", output.Output.Namespace, output.Output.Name) + output.Output.Spec.OTLPHTTP.Auth = &v1alpha1.Authentication{ + AuthenticatorID: utils.ToPtr(fmt.Sprintf("basicauth/%s_%s", output.Output.Namespace, output.Output.Name))} } else if output.Output.Spec.Authentication.BearerAuth != nil { - output.Output.Spec.OTLPHTTP.Auth.AuthenticatorID = fmt.Sprintf("bearertokenauth/%s_%s", output.Output.Namespace, output.Output.Name) + output.Output.Spec.OTLPHTTP.Auth = &v1alpha1.Authentication{ + AuthenticatorID: utils.ToPtr(fmt.Sprintf("bearertokenauth/%s_%s", output.Output.Namespace, output.Output.Name))} } } - otlpHttpValuesMarshaled, err := yaml.Marshal(output.Output.Spec.OTLPHTTP) + otlpHttpValuesMarshaled, err := json.Marshal(output.Output.Spec.OTLPHTTP) if err != nil { logger.Error(errors.New("failed to compile config for output"), "failed to compile config for output %q", output.Output.NamespacedName().String()) } var otlpHttpValues map[string]any - if err := yaml.Unmarshal(otlpHttpValuesMarshaled, &otlpHttpValues); err != nil { + if err := json.Unmarshal(otlpHttpValuesMarshaled, &otlpHttpValues); err != nil { logger.Error(errors.New("failed to compile config for output"), "failed to compile config for output %q", output.Output.NamespacedName().String()) } - result[name] = otlpHttpValues + result[components.GetExporterNameForOutput(output.Output)] = otlpHttpValues } } diff --git a/internal/controller/telemetry/pipeline/components/exporter/otlphttp_exporter_test.go b/internal/controller/telemetry/pipeline/components/exporter/otlphttp_exporter_test.go new file mode 100644 index 0000000..a66c2b8 --- /dev/null +++ b/internal/controller/telemetry/pipeline/components/exporter/otlphttp_exporter_test.go @@ -0,0 +1,259 @@ +// Copyright © 2024 Kube logging authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package exporter + +import ( + "context" + "testing" + "time" + + "go.opentelemetry.io/collector/config/configcompression" + "go.opentelemetry.io/collector/config/configopaque" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/kube-logging/telemetry-controller/api/telemetry/v1alpha1" + "github.com/kube-logging/telemetry-controller/internal/controller/telemetry/pipeline/components" + "github.com/kube-logging/telemetry-controller/internal/controller/telemetry/utils" + "github.com/stretchr/testify/assert" +) + +func TestGenerateOTLPHTTPExporters(t *testing.T) { + tests := []struct { + name string + outputsWithSecretData []components.OutputWithSecretData + expectedResult map[string]any + }{ + { + name: "Basic auth", + outputsWithSecretData: []components.OutputWithSecretData{ + { + Output: v1alpha1.Output{ + ObjectMeta: metav1.ObjectMeta{ + Name: "output1", + Namespace: "default", + }, + Spec: v1alpha1.OutputSpec{ + OTLPHTTP: &v1alpha1.OTLPHTTP{ + HTTPClientConfig: v1alpha1.HTTPClientConfig{ + Endpoint: utils.ToPtr("http://example.com"), + }, + }, + Authentication: &v1alpha1.OutputAuth{ + BasicAuth: &v1alpha1.BasicAuthConfig{ + SecretRef: &corev1.SecretReference{ + Name: "secret-name", + Namespace: "secret-ns", + }, + UsernameField: "username", + PasswordField: "password", + }, + }, + }, + }, + Secret: corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "secret-name", + Namespace: "secret-ns", + }, + Data: map[string][]byte{ + "username": []byte("user"), + "password": []byte("pass"), + }, + }, + }, + }, + expectedResult: map[string]any{ + "otlphttp/default_output1": map[string]any{ + "endpoint": "http://example.com", + "auth": map[string]any{ + "authenticator": "basicauth/default_output1", + }, + }, + }, + }, + { + name: "Bearer auth", + outputsWithSecretData: []components.OutputWithSecretData{ + { + Output: v1alpha1.Output{ + ObjectMeta: metav1.ObjectMeta{ + Name: "output2", + Namespace: "default", + }, + Spec: v1alpha1.OutputSpec{ + OTLPHTTP: &v1alpha1.OTLPHTTP{ + HTTPClientConfig: v1alpha1.HTTPClientConfig{ + Endpoint: utils.ToPtr("http://example.com"), + }, + }, + Authentication: &v1alpha1.OutputAuth{ + BearerAuth: &v1alpha1.BearerAuthConfig{ + SecretRef: &corev1.SecretReference{ + Name: "secret-name", + Namespace: "secret-ns", + }, + TokenField: "token", + }, + }, + }, + }, + Secret: corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "secret-name", + Namespace: "secret-ns", + }, + Data: map[string][]byte{ + "token": []byte("token-value"), + }, + }, + }, + }, + expectedResult: map[string]any{ + "otlphttp/default_output2": map[string]any{ + "endpoint": "http://example.com", + "auth": map[string]any{ + "authenticator": "bearertokenauth/default_output2", + }, + }, + }, + }, + { + name: "No auth", + outputsWithSecretData: []components.OutputWithSecretData{ + { + Output: v1alpha1.Output{ + ObjectMeta: metav1.ObjectMeta{ + Name: "output3", + Namespace: "default", + }, + Spec: v1alpha1.OutputSpec{ + OTLPHTTP: &v1alpha1.OTLPHTTP{ + HTTPClientConfig: v1alpha1.HTTPClientConfig{ + Endpoint: utils.ToPtr("http://example.com"), + }, + }, + }, + }, + }, + }, + expectedResult: map[string]any{ + "otlphttp/default_output3": map[string]any{ + "endpoint": "http://example.com", + }, + }, + }, + { + name: "All fields set", + outputsWithSecretData: []components.OutputWithSecretData{ + { + Output: v1alpha1.Output{ + ObjectMeta: metav1.ObjectMeta{ + Name: "output4", + Namespace: "default", + }, + Spec: v1alpha1.OutputSpec{ + OTLPHTTP: &v1alpha1.OTLPHTTP{ + QueueConfig: &v1alpha1.QueueSettings{ + Enabled: true, + NumConsumers: 10, + QueueSize: 100, + StorageID: "storage-id", + }, + RetryConfig: &v1alpha1.BackOffConfig{ + Enabled: true, + InitialInterval: 5 * time.Second, + RandomizationFactor: "0.1", + Multiplier: "2.0", + MaxInterval: 10 * time.Second, + MaxElapsedTime: 60 * time.Second, + }, + HTTPClientConfig: v1alpha1.HTTPClientConfig{ + Endpoint: utils.ToPtr("http://example.com"), + ProxyURL: utils.ToPtr("http://proxy.example.com"), + TLSSetting: &v1alpha1.TLSClientSetting{ + Insecure: true, + InsecureSkipVerify: true, + ServerName: "server-name", + }, + ReadBufferSize: utils.ToPtr(1024), + WriteBufferSize: utils.ToPtr(1024), + Timeout: utils.ToPtr(5 * time.Second), + Headers: &map[string]configopaque.String{ + "header1": configopaque.String("value1"), + }, + Compression: utils.ToPtr(configcompression.Type("gzip")), + MaxIdleConns: utils.ToPtr(10), + MaxIdleConnsPerHost: utils.ToPtr(10), + MaxConnsPerHost: utils.ToPtr(10), + IdleConnTimeout: utils.ToPtr(5 * time.Second), + DisableKeepAlives: utils.ToPtr(true), + HTTP2ReadIdleTimeout: utils.ToPtr(5 * time.Second), + HTTP2PingTimeout: utils.ToPtr(5 * time.Second), + }, + }, + }, + }, + }, + }, + expectedResult: map[string]any{ + "otlphttp/default_output4": map[string]any{ + "endpoint": "http://example.com", + "proxy_url": "http://proxy.example.com", + "tls": map[string]any{ + "insecure": true, + "insecure_skip_verify": true, + "server_name_override": "server-name", + }, + "read_buffer_size": float64(1024), + "write_buffer_size": float64(1024), + "timeout": float64(5 * time.Second), + "headers": map[string]any{ + "header1": configopaque.String("value1").String(), + }, + "compression": "gzip", + "max_idle_conns": float64(10), + "max_idle_conns_per_host": float64(10), + "max_conns_per_host": float64(10), + "idle_conn_timeout": float64(5 * time.Second), + "disable_keep_alives": true, + "http2_read_idle_timeout": float64(5 * time.Second), + "http2_ping_timeout": float64(5 * time.Second), + "sending_queue": map[string]any{ + "enabled": true, + "num_consumers": float64(10), + "queue_size": float64(100), + "storage": "storage-id", + }, + "retry_on_failure": map[string]any{ + "enabled": true, + "initial_interval": float64(5 * time.Second), + "randomization_factor": "0.1", + "multiplier": "2.0", + "max_interval": float64(10 * time.Second), + "max_elapsed_time": float64(60 * time.Second), + }, + }, + }, + }, + } + + for _, tt := range tests { + ttp := tt + t.Run(tt.name, func(t *testing.T) { + assert.Equal(t, ttp.expectedResult, GenerateOTLPHTTPExporters(context.TODO(), ttp.outputsWithSecretData)) + }) + } +} diff --git a/internal/controller/telemetry/pipeline/components/exporter/prometheus_exporter.go b/internal/controller/telemetry/pipeline/components/exporter/prometheus_exporter.go index 3d7e9be..9358057 100644 --- a/internal/controller/telemetry/pipeline/components/exporter/prometheus_exporter.go +++ b/internal/controller/telemetry/pipeline/components/exporter/prometheus_exporter.go @@ -107,7 +107,7 @@ type PrometheusExporterConfig struct { MetricExpiration time.Duration `json:"metric_expiration,omitempty"` // ResourceToTelemetrySettings defines configuration for converting resource attributes to metric labels. - ResourceToTelemetrySettings ResourceToTelemetrySettings `json:"resource_to_telemetry_conversion,omitempty"` + ResourceToTelemetrySettings *ResourceToTelemetrySettings `json:"resource_to_telemetry_conversion,omitempty"` // EnableOpenMetrics enables the use of the OpenMetrics encoding option for the prometheus exporter. EnableOpenMetrics bool `json:"enable_open_metrics,omitempty"` diff --git a/internal/controller/telemetry/utils/utils.go b/internal/controller/telemetry/utils/utils.go index 13d6af6..9ca23da 100644 --- a/internal/controller/telemetry/utils/utils.go +++ b/internal/controller/telemetry/utils/utils.go @@ -26,3 +26,11 @@ func SortNamespacedNames(names []v1alpha1.NamespacedName) { return strings.Compare(a.String(), b.String()) }) } + +func ToPtr[T any](v T) *T { + return &v +} + +func ToValue[T any](v *T) T { + return *v +}