From 76bbe2292d7eb027e6bfeb684605643fdc2f2841 Mon Sep 17 00:00:00 2001 From: Peter Wilcsinszky Date: Wed, 6 Nov 2019 16:45:32 +0100 Subject: [PATCH] Fluentbit support for filesystem buffer storage and rolling upgrade on config change (#213) * Fluentbit support for filesystem buffer storage and rolling upgrade on config change * Separate in-container path of the buffer from the hostpath used for the mount. Setup postionDb by default for fluent-bit. Add docs. * add some lint checks back and fix lint errors * restore sample config, reword docs --- .golangci.yml | 2 - api/v1beta1/fluentbit_types.go | 46 +++++++++++------ api/v1beta1/logging_types.go | 25 +++++++-- api/v1beta1/zz_generated.deepcopy.go | 21 ++++++++ cmd/docs.go | 3 -- .../logging.banzaicloud.io_loggings.yaml | 51 +++++++++++++++++++ docs/crds.md | 4 +- docs/fluentbit.md | 34 ++++++++++++- pkg/k8sutil/resource.go | 6 --- pkg/model/render/fluent.go | 1 - pkg/model/render/fluent_test.go | 6 +-- pkg/model/types/stringmaps.go | 1 - pkg/resources/fluentbit/config.go | 11 ++-- pkg/resources/fluentbit/configsecret.go | 34 ++++++++----- pkg/resources/fluentbit/daemonset.go | 34 +++++++++---- pkg/resources/fluentbit/fluentbit.go | 15 ++++-- pkg/resources/fluentbit/psp.go | 3 -- pkg/resources/fluentbit/service.go | 1 - pkg/resources/fluentd/psp.go | 4 -- pkg/resources/fluentd/rbac.go | 3 -- pkg/resources/fluentd/service.go | 1 - pkg/resources/templates/templates.go | 8 +++ 22 files changed, 238 insertions(+), 76 deletions(-) diff --git a/.golangci.yml b/.golangci.yml index 14a42208e..ba6f2e588 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -33,10 +33,8 @@ linters: - stylecheck - staticcheck - gosimple - - whitespace - gocritic - godox - - interfacer - gocyclo issues: diff --git a/api/v1beta1/fluentbit_types.go b/api/v1beta1/fluentbit_types.go index f34966212..ba3d25eb7 100644 --- a/api/v1beta1/fluentbit_types.go +++ b/api/v1beta1/fluentbit_types.go @@ -24,21 +24,23 @@ import ( // FluentbitSpec defines the desired state of Fluentbit type FluentbitSpec struct { - Annotations map[string]string `json:"annotations,omitempty"` - Image ImageSpec `json:"image,omitempty"` - TLS FluentbitTLS `json:"tls,omitempty"` - TargetHost string `json:"targetHost,omitempty"` - TargetPort int32 `json:"targetPort,omitempty"` - Resources corev1.ResourceRequirements `json:"resources,omitempty"` - Parser string `json:"parser,omitempty"` - Tolerations []corev1.Toleration `json:"tolerations,omitempty"` - Metrics *Metrics `json:"metrics,omitempty"` - Security *Security `json:"security,omitempty"` - PositionDB *KubernetesStorage `json:"position_db,omitempty"` - MountPath string `json:"mountPath,omitempty"` - InputTail InputTail `json:"inputTail,omitempty"` - FilterKubernetes FilterKubernetes `json:"filterKubernetes,omitempty"` - CustomConfigSecret string `json:"customConfigSecret,omitempty"` + Annotations map[string]string `json:"annotations,omitempty"` + Image ImageSpec `json:"image,omitempty"` + TLS FluentbitTLS `json:"tls,omitempty"` + TargetHost string `json:"targetHost,omitempty"` + TargetPort int32 `json:"targetPort,omitempty"` + Resources corev1.ResourceRequirements `json:"resources,omitempty"` + Parser string `json:"parser,omitempty"` + Tolerations []corev1.Toleration `json:"tolerations,omitempty"` + Metrics *Metrics `json:"metrics,omitempty"` + Security *Security `json:"security,omitempty"` + PositionDB *KubernetesStorage `json:"position_db,omitempty"` + MountPath string `json:"mountPath,omitempty"` + InputTail InputTail `json:"inputTail,omitempty"` + FilterKubernetes FilterKubernetes `json:"filterKubernetes,omitempty"` + BufferStorage BufferStorage `json:"bufferStorage,omitempty"` + BufferStorageVolume *KubernetesStorage `json:"bufferStorageVolume,omitempty"` + CustomConfigSecret string `json:"customConfigSecret,omitempty"` } // +kubebuilder:object:generate=true @@ -63,8 +65,22 @@ func (spec FluentbitSpec) GetPrometheusPortFromAnnotation() int32 { return int32(port) } +// BufferStorage is the Service Section Configuration of fluent-bit +type BufferStorage struct { + // Set an optional location in the file system to store streams and chunks of data. If this parameter is not set, Input plugins can only use in-memory buffering. + StoragePath string `json:"storage.path,omitempty"` + // Configure the synchronization mode used to store the data into the file system. It can take the values normal or full. (default:normal) + StorageSync string `json:"storage.sync,omitempty"` + // Enable the data integrity check when writing and reading data from the filesystem. The storage layer uses the CRC32 algorithm. (default:Off) + StorageChecksum string `json:"storage.checksum,omitempty"` + // If storage.path is set, Fluent Bit will look for data chunks that were not delivered and are still in the storage layer, these are called backlog data. This option configure a hint of maximum value of memory to use when processing these records. (default:5M) + StorageBacklogMemLimit string `json:"storage.backlog.mem_limit,omitempty"` +} + // InputTail defines Fluentbit tail input configuration The tail input plugin allows to monitor one or several text files. It has a similar behavior like tail -f shell command. type InputTail struct { + // Specify the buffering mechanism to use. It can be memory or filesystem. (default:memory) + StorageType string `json:"storage.type,omitempty"` // Set the buffer size for HTTP client when reading responses from Kubernetes API server. The value must be according to the Unit Size specification. (default:32k) BufferChunkSize string `json:"Buffer_Chunk_Size,omitempty"` // Set the limit of the buffer size per monitored file. When a buffer needs to be increased (e.g: very long lines), this value is used to restrict how much the memory buffer can grow. If reading a file exceed this limit, the file is removed from the monitored file list. The value must be according to the Unit Size specification. (default:Buffer_Chunk_Size) diff --git a/api/v1beta1/logging_types.go b/api/v1beta1/logging_types.go index 28b6b7fe7..c5ed524eb 100644 --- a/api/v1beta1/logging_types.go +++ b/api/v1beta1/logging_types.go @@ -23,6 +23,11 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) +const ( + BuffersPath = "/opt/fluent-bit/%s/buf" + PositionDbPath = "/opt/fluent-bit/%s/pos" +) + // EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN! // NOTE: json tags are required. Any new fields you add must have json tags for the fields to be serialized. @@ -114,7 +119,6 @@ func (l *Logging) SetDefaults() *Logging { } if copy.Spec.FluentdSpec.Metrics.PrometheusAnnotations { - copy.Spec.FluentdSpec.Annotations["prometheus.io/scrape"] = "true" copy.Spec.FluentdSpec.Annotations["prometheus.io/path"] = copy.Spec.FluentdSpec.Metrics.Path @@ -226,7 +230,6 @@ func (l *Logging) SetDefaults() *Logging { copy.Spec.FluentbitSpec.Metrics.Interval = "15s" } if copy.Spec.FluentbitSpec.Metrics.PrometheusAnnotations { - copy.Spec.FluentbitSpec.Annotations["prometheus.io/scrape"] = "true" copy.Spec.FluentbitSpec.Annotations["prometheus.io/path"] = copy.Spec.FluentbitSpec.Metrics.Path copy.Spec.FluentbitSpec.Annotations["prometheus.io/port"] = string(copy.Spec.FluentbitSpec.Metrics.Port) @@ -235,7 +238,23 @@ func (l *Logging) SetDefaults() *Logging { if copy.Spec.FluentbitSpec.MountPath == "" { copy.Spec.FluentbitSpec.MountPath = "/var/lib/docker/containers" } - + if copy.Spec.FluentbitSpec.PositionDB == nil { + copy.Spec.FluentbitSpec.PositionDB = &KubernetesStorage{ + HostPath: &v1.HostPathVolumeSource{ + Path: fmt.Sprintf(PositionDbPath, copy.Name), + }, + } + } + if copy.Spec.FluentbitSpec.BufferStorage.StoragePath == "" { + copy.Spec.FluentbitSpec.BufferStorage.StoragePath = "/buffers" + } + if copy.Spec.FluentbitSpec.BufferStorageVolume == nil { + copy.Spec.FluentbitSpec.BufferStorageVolume = &KubernetesStorage{ + HostPath: &v1.HostPathVolumeSource{ + Path: fmt.Sprintf(BuffersPath, copy.Name), + }, + } + } } return copy } diff --git a/api/v1beta1/zz_generated.deepcopy.go b/api/v1beta1/zz_generated.deepcopy.go index b5acfb937..a3a45ff1b 100644 --- a/api/v1beta1/zz_generated.deepcopy.go +++ b/api/v1beta1/zz_generated.deepcopy.go @@ -25,6 +25,21 @@ import ( runtime "k8s.io/apimachinery/pkg/runtime" ) +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *BufferStorage) DeepCopyInto(out *BufferStorage) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new BufferStorage. +func (in *BufferStorage) DeepCopy() *BufferStorage { + if in == nil { + return nil + } + out := new(BufferStorage) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ClusterFlow) DeepCopyInto(out *ClusterFlow) { *out = *in @@ -374,6 +389,12 @@ func (in *FluentbitSpec) DeepCopyInto(out *FluentbitSpec) { } out.InputTail = in.InputTail out.FilterKubernetes = in.FilterKubernetes + out.BufferStorage = in.BufferStorage + if in.BufferStorageVolume != nil { + in, out := &in.BufferStorageVolume, &out.BufferStorageVolume + *out = new(KubernetesStorage) + (*in).DeepCopyInto(*out) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new FluentbitSpec. diff --git a/cmd/docs.go b/cmd/docs.go index 5b4fbc268..dba272d6e 100644 --- a/cmd/docs.go +++ b/cmd/docs.go @@ -70,7 +70,6 @@ func (d *doc) checkNodes(n ast.Node) bool { d.append(fmt.Sprintf("| %s | %s | %s | %s | %s |", name, normaliseType(item.Type), required, def, com)) } } - } } @@ -169,7 +168,6 @@ func main() { } index.generate() - } func getPrefixedLine(origin, expression string) string { @@ -249,7 +247,6 @@ func getValuesFromItem(item *ast.Field) (name, comment, def, required string) { } return nameResult, getLink(commentWithDefault), "-", required - } func getDocumentParser(file plugin) *doc { diff --git a/config/crd/bases/logging.banzaicloud.io_loggings.yaml b/config/crd/bases/logging.banzaicloud.io_loggings.yaml index f5fffb6d5..288293973 100644 --- a/config/crd/bases/logging.banzaicloud.io_loggings.yaml +++ b/config/crd/bases/logging.banzaicloud.io_loggings.yaml @@ -47,6 +47,53 @@ spec: additionalProperties: type: string type: object + bufferStorage: + description: BufferStorage is the Service Section Configuration + of fluent-bit + properties: + storage.backlog.mem_limit: + description: If storage.path is set, Fluent Bit will look for + data chunks that were not delivered and are still in the storage + layer, these are called backlog data. This option configure + a hint of maximum value of memory to use when processing these + records. (default:5M) + type: string + storage.checksum: + description: Enable the data integrity check when writing and + reading data from the filesystem. The storage layer uses the + CRC32 algorithm. (default:Off) + type: string + storage.path: + description: Set an optional location in the file system to + store streams and chunks of data. If this parameter is not + set, Input plugins can only use in-memory buffering. + type: string + storage.sync: + description: Configure the synchronization mode used to store + the data into the file system. It can take the values normal + or full. (default:normal) + type: string + type: object + bufferStorageVolume: + properties: + host_path: + description: Represents a host path mapped into a pod. Host + path volumes do not support ownership management or SELinux + relabeling. + properties: + path: + description: 'Path of the directory on the host. If the + path is a symlink, it will follow the link to the real + path. More info: https://kubernetes.io/docs/concepts/storage/volumes#hostpath' + type: string + type: + description: 'Type for HostPath Volume Defaults to "" More + info: https://kubernetes.io/docs/concepts/storage/volumes#hostpath' + type: string + required: + - path + type: object + type: object customConfigSecret: type: string filterKubernetes: @@ -278,6 +325,10 @@ spec: Tag_Regex: description: Set a regex to extract fields from the file. type: string + storage.type: + description: Specify the buffering mechanism to use. It can + be memory or filesystem. (default:memory) + type: string type: object metrics: description: Metrics defines the service monitor endpoints diff --git a/docs/crds.md b/docs/crds.md index 000282e6e..85a1ef8d1 100644 --- a/docs/crds.md +++ b/docs/crds.md @@ -125,9 +125,11 @@ spec: | tolerations | [Toleration](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.12/#toleration-v1-core) | {} | Pod toleration | | metrics | [Metrics](./logging-operator-monitoring.md#metrics-variables) | {} | Metrics defines the service monitor endpoints | | security | [Security](./security#security-variables) | {} | Security defines Fluentd, Fluentbit deployment security properties | -| position_db | [KubernetesStorage](#KubernetesStorage) | nil | Add position db storage support | +| position_db | [KubernetesStorage](#KubernetesStorage) | nil | Add position db storage support. If nothing is configured a `hostPath` volume is used with the path `/opt/fluent-bit//pos`. | | inputTail | [InputTail](./fluentbit.md#tail-inputtail) | {} | The tail input plugin allows to monitor one or several text files. | | filterKubernetes | [FilterKubernetes](./fluentbit.md#kubernetes-filterkubernetes) | {} | Fluent Bit Kubernetes Filter allows to enrich your log files with Kubernetes metadata. | +| bufferStorage | [BufferStorage](./fluentbit.md#bufferstorage) | | Buffer Storage configures persistent buffer to avoid losing data in case of a failure | +| bufferStorageVolume | [KubernetesStorage](#KubernetesStorage) | nil | Volume definition for the Buffer Storage. If nothing is configured a `hostPath` volume is used with the path `/opt/fluent-bit//buf`. | | customConfigSecret | string | "" | Custom secret to use as fluent-bit config.
It must include all the config files necessary to run fluent-bit (_fluent-bit.conf_, _parsers*.conf_) | **`logging` with custom fluent-bit annotations** diff --git a/docs/fluentbit.md b/docs/fluentbit.md index 661271139..5acff0ddc 100644 --- a/docs/fluentbit.md +++ b/docs/fluentbit.md @@ -81,6 +81,7 @@ The plugin supports the following configuration parameters: | Key | Description | Default | | :--- | :--- | :--- | +| storage.type | Specify the buffering mechanism to use. It can be memory or filesystem. | memory | | Buffer\_Chunk\_Size | Set the initial buffer size to read files data. This value is used too to increase buffer size. The value must be according to the [Unit Size](../configuration/unit_sizes.md) specification. | 32k | | Buffer\_Max\_Size | Set the limit of the buffer size per monitored file. When a buffer needs to be increased \(e.g: very long lines\), this value is used to restrict how much the memory buffer can grow. If reading a file exceed this limit, the file is removed from the monitored file list. The value must be according to the [Unit Size](../configuration/unit_sizes.md) specification. | Buffer\_Chunk\_Size | | Path | Pattern specifying a specific log files or multiple ones through the use of common wildcards. | | @@ -102,4 +103,35 @@ The plugin supports the following configuration parameters: | Parser\_Firstline | Name of the parser that matchs the beginning of a multiline message. Note that the regular expression defined in the parser must include a group name \(named capture\) | | | Parser\_N | Optional-extra parser to interpret and structure multiline entries. This option can be used to define multiple parsers, e.g: Parser\_1 ab1, Parser\_2 ab2, Parser\_N abN. | | | Docker\_Mode | If enabled, the plugin will recombine split Docker log lines before passing them to any parser as configured above. This mode cannot be used at the same time as Multiline. | Off | -| Docker\_Mode\_Flush | Wait period time in seconds to flush queued unfinished split lines. | 4 | \ No newline at end of file +| Docker\_Mode\_Flush | Wait period time in seconds to flush queued unfinished split lines. | 4 | + +## Buffering + +### BufferStorage +A mechanism to place processed data into a temporal location until is ready to be shipped. [More Info](https://docs.fluentbit.io/manual/configuration/buffering) + + +| Key | Description | Default | +| :--- | :--- | :--- | +| storage.path | Set an optional location in the file system to store streams and chunks of data. If this parameter is not set, Input plugins can only use in-memory buffering. | | +| storage.sync | Configure the synchronization mode used to store the data into the file system. It can take the values normal or full. | normal | +| storage.checksum | Enable the data integrity check when writing and reading data from the filesystem. The storage layer uses the CRC32 algorithm. | Off | +| storage.backlog.mem_limit | If storage.path is set, Fluent Bit will look for data chunks that were not delivered and are still in the storage layer, these are called backlog data. This option configure a hint of maximum value of memory to use when processing these records. | 5M | + + +#### Default configuration + +If nothing is set, by default it configures the `storage.path` explicitly to use `/buffers` and leaves fluent-bit defaults for the other options. + +``` +apiVersion: logging.banzaicloud.io/v1beta1 +kind: Logging +metadata: + name: default-logging-simple +spec: + fluentd: {} + fluentbit: + bufferStorage: + storage.path: /buffers + controlNamespace: logging +``` diff --git a/pkg/k8sutil/resource.go b/pkg/k8sutil/resource.go index 24abd265e..350d8ff4a 100644 --- a/pkg/k8sutil/resource.go +++ b/pkg/k8sutil/resource.go @@ -122,7 +122,6 @@ func (r *GenericResourceReconciler) ReconcileResource(desired runtime.Object, de } log.Info("resource updated", "resource", desired.GetObjectKind().GroupVersionKind()) } - } return nil } @@ -141,7 +140,6 @@ func (r *GenericResourceReconciler) createIfNotExists(desired runtime.Object) (b if err != nil { return false, current, err } - } key, err := runtimeClient.ObjectKeyFromObject(current) if err != nil { @@ -183,9 +181,7 @@ func (r *GenericResourceReconciler) delete(desired runtime.Object) (bool, error) "resource", desired.GetObjectKind().GroupVersionKind(), "type", reflect.TypeOf(desired)) } return false, nil - } - } key, err := runtimeClient.ObjectKeyFromObject(current) @@ -200,9 +196,7 @@ func (r *GenericResourceReconciler) delete(desired runtime.Object) (bool, error) } else { log.Info("resource not found skipping delete", "resource", current.GetObjectKind().GroupVersionKind()) return false, nil - } - } err = r.Client.Delete(context.TODO(), current) if err != nil { diff --git a/pkg/model/render/fluent.go b/pkg/model/render/fluent.go index 8be294809..e5bd47b21 100644 --- a/pkg/model/render/fluent.go +++ b/pkg/model/render/fluent.go @@ -78,7 +78,6 @@ func (f *FluentRender) indented(indent int, format string, values ...interface{} fmt.Fprintln(f.Out, "") } } - } func tag(tag string) string { diff --git a/pkg/model/render/fluent_test.go b/pkg/model/render/fluent_test.go index 14f1be6c2..f100abc2d 100644 --- a/pkg/model/render/fluent_test.go +++ b/pkg/model/render/fluent_test.go @@ -31,7 +31,6 @@ import ( ) func TestRenderDirective(t *testing.T) { - var tests = []struct { name string directive types.Directive @@ -266,7 +265,6 @@ func TestRenderDirective(t *testing.T) { t.Errorf("[%s] Result does not match (-actual vs +expected):\n%v", test.name, diff.LineDiff(a, e)) } } - } } @@ -492,7 +490,7 @@ func TestRenderS3(t *testing.T) { } for _, item := range table { t.Logf("> %s\n", item.name) - err := ValidateRenderS3(t, item.s3Config, item.expected) + err := ValidateRenderS3(t, &item.s3Config, item.expected) if item.err != "" { if err == nil { t.Errorf("expected error: %s", item.err) @@ -510,7 +508,7 @@ func TestRenderS3(t *testing.T) { } } -func ValidateRenderS3(t *testing.T, s3Config output.S3OutputConfig, expected string) error { +func ValidateRenderS3(t *testing.T, s3Config plugins.DirectiveConverter, expected string) error { system := types.NewSystem(toDirective(t, input.NewTailInputConfig("input.log")), types.NewRouter("test")) s3Plugin, err := s3Config.ToDirective(secret.NewSecretLoader(nil, "", "", nil), "test") diff --git a/pkg/model/types/stringmaps.go b/pkg/model/types/stringmaps.go index e800f51e8..18c9f2988 100644 --- a/pkg/model/types/stringmaps.go +++ b/pkg/model/types/stringmaps.go @@ -194,7 +194,6 @@ func (s *StructToStringMapper) fillMap(value reflect.Value, out map[string]strin } } } - } } return multierror diff --git a/pkg/resources/fluentbit/config.go b/pkg/resources/fluentbit/config.go index d814cc963..be5438906 100644 --- a/pkg/resources/fluentbit/config.go +++ b/pkg/resources/fluentbit/config.go @@ -20,11 +20,16 @@ var fluentBitConfigTemplate = ` Daemon Off Log_Level info Parsers_File parsers.conf -{{- if .Monitor.Enabled }} + {{- if .Monitor.Enabled }} HTTP_Server On HTTP_Listen 0.0.0.0 HTTP_Port {{ .Monitor.Port }} -{{- end }} + {{- end }} + {{- range $key, $value := .BufferStorage }} + {{- if $value }} + {{ $key }} {{$value}} + {{- end }} + {{- end }} [INPUT] Name tail @@ -37,7 +42,7 @@ var fluentBitConfigTemplate = ` [FILTER] Name kubernetes {{- range $key, $value := .Filter }} - {{- if $value }} + {{- if $value }} {{ $key }} {{$value}} {{- end }} {{- end }} diff --git a/pkg/resources/fluentbit/configsecret.go b/pkg/resources/fluentbit/configsecret.go index 0f3cfcda0..53925c68a 100644 --- a/pkg/resources/fluentbit/configsecret.go +++ b/pkg/resources/fluentbit/configsecret.go @@ -39,12 +39,13 @@ type fluentBitConfig struct { Port int32 Path string } - Output map[string]string - TargetHost string - TargetPort int32 - Parser string - Input map[string]string - Filter map[string]string + Output map[string]string + TargetHost string + TargetPort int32 + Parser string + Input map[string]string + Filter map[string]string + BufferStorage map[string]string } func (r *Reconciler) configSecret() (runtime.Object, k8sutil.DesiredState) { @@ -75,6 +76,11 @@ func (r *Reconciler) configSecret() (runtime.Object, k8sutil.DesiredState) { log.Error(err) } + fluentbitBufferStorage, err := mapper.StringsMap(r.Logging.Spec.FluentbitSpec.BufferStorage) + if err != nil { + log.Error(err) + } + input := fluentBitConfig{ Namespace: r.Logging.Spec.ControlNamespace, TLS: struct { @@ -84,11 +90,12 @@ func (r *Reconciler) configSecret() (runtime.Object, k8sutil.DesiredState) { Enabled: r.Logging.Spec.FluentbitSpec.TLS.Enabled, SharedKey: r.Logging.Spec.FluentbitSpec.TLS.SharedKey, }, - Monitor: monitor, - TargetHost: fmt.Sprintf("%s.%s.svc", r.Logging.QualifiedName(fluentd.ServiceName), r.Logging.Spec.ControlNamespace), - TargetPort: r.Logging.Spec.FluentdSpec.Port, - Input: fluentbitInput, - Filter: fluentbitFilter, + Monitor: monitor, + TargetHost: fmt.Sprintf("%s.%s.svc", r.Logging.QualifiedName(fluentd.ServiceName), r.Logging.Spec.ControlNamespace), + TargetPort: r.Logging.Spec.FluentdSpec.Port, + Input: fluentbitInput, + Filter: fluentbitFilter, + BufferStorage: fluentbitBufferStorage, } if r.Logging.Spec.FluentbitSpec.Parser != "" { input.Parser = r.Logging.Spec.FluentbitSpec.Parser @@ -101,11 +108,14 @@ func (r *Reconciler) configSecret() (runtime.Object, k8sutil.DesiredState) { if r.Logging.Spec.FluentbitSpec.TargetPort != 0 { input.TargetPort = r.Logging.Spec.FluentbitSpec.TargetPort } + + r.desiredConfig = generateConfig(input) + return &corev1.Secret{ ObjectMeta: templates.FluentbitObjectMeta( r.Logging.QualifiedName(fluentBitSecretConfigName), r.Logging.Labels, r.Logging), Data: map[string][]byte{ - "fluent-bit.conf": []byte(generateConfig(input)), + "fluent-bit.conf": []byte(r.desiredConfig), }, }, k8sutil.StatePresent } diff --git a/pkg/resources/fluentbit/daemonset.go b/pkg/resources/fluentbit/daemonset.go index 773f089c8..c8ef3ba82 100644 --- a/pkg/resources/fluentbit/daemonset.go +++ b/pkg/resources/fluentbit/daemonset.go @@ -15,6 +15,9 @@ package fluentbit import ( + "crypto/sha256" + "fmt" + "github.com/banzaicloud/logging-operator/api/v1beta1" "github.com/banzaicloud/logging-operator/pkg/k8sutil" "github.com/banzaicloud/logging-operator/pkg/resources/templates" @@ -25,11 +28,12 @@ import ( "k8s.io/apimachinery/pkg/runtime" ) -const TailPositionVolume = "positiondb" +const ( + TailPositionVolume = "positiondb" + BufferStorageVolume = "buffers" +) -// TODO in case of rbac add created serviceAccount name func (r *Reconciler) daemonSet() (runtime.Object, k8sutil.DesiredState) { - var containerPorts []corev1.ContainerPort if r.Logging.Spec.FluentbitSpec.Metrics != nil && r.Logging.Spec.FluentbitSpec.Metrics.Port != 0 { @@ -41,17 +45,24 @@ func (r *Reconciler) daemonSet() (runtime.Object, k8sutil.DesiredState) { } labels := util.MergeLabels(r.Logging.Labels, r.getFluentBitLabels()) + meta := templates.FluentbitObjectMeta(r.Logging.QualifiedName(fluentbitDaemonSetName), labels, r.Logging) + podMeta := metav1.ObjectMeta{ + Labels: labels, + Annotations: r.Logging.Spec.FluentbitSpec.Annotations, + } + + if r.desiredConfig != "" { + h := sha256.New() + _, _ = h.Write([]byte(r.desiredConfig)) + templates.Annotate(podMeta, "checksum/config", fmt.Sprintf("%x", h.Sum(nil))) + } return &appsv1.DaemonSet{ - ObjectMeta: templates.FluentbitObjectMeta( - r.Logging.QualifiedName(fluentbitDaemonSetName), labels, r.Logging), + ObjectMeta: meta, Spec: appsv1.DaemonSetSpec{ Selector: &metav1.LabelSelector{MatchLabels: util.MergeLabels(r.Logging.Labels, r.getFluentBitLabels())}, Template: corev1.PodTemplateSpec{ - ObjectMeta: metav1.ObjectMeta{ - Labels: labels, - Annotations: r.Logging.Spec.FluentbitSpec.Annotations, - }, + ObjectMeta: podMeta, Spec: corev1.PodSpec{ ServiceAccountName: r.getServiceAccount(), Volumes: r.generateVolume(), @@ -96,6 +107,10 @@ func (r *Reconciler) generateVolumeMounts() (v []corev1.VolumeMount) { Name: TailPositionVolume, MountPath: "/tail-db", }, + { + Name: BufferStorageVolume, + MountPath: r.Logging.Spec.FluentbitSpec.BufferStorage.StoragePath, + }, { Name: "varlogs", ReadOnly: true, @@ -183,6 +198,7 @@ func (r *Reconciler) generateVolume() (v []corev1.Volume) { v = append(v, tlsRelatedVolume) } v = append(v, GetVolumeFromKubernetesStorage(r.Logging.Spec.FluentbitSpec.PositionDB, TailPositionVolume)) + v = append(v, GetVolumeFromKubernetesStorage(r.Logging.Spec.FluentbitSpec.BufferStorageVolume, BufferStorageVolume)) return } diff --git a/pkg/resources/fluentbit/fluentbit.go b/pkg/resources/fluentbit/fluentbit.go index 1d26ceb9c..7ed44d60a 100644 --- a/pkg/resources/fluentbit/fluentbit.go +++ b/pkg/resources/fluentbit/fluentbit.go @@ -16,6 +16,7 @@ package fluentbit import ( "emperror.dev/errors" + "k8s.io/apimachinery/pkg/runtime" "github.com/banzaicloud/logging-operator/api/v1beta1" "github.com/banzaicloud/logging-operator/pkg/k8sutil" @@ -53,10 +54,17 @@ func (r *Reconciler) getServiceAccount() string { return r.Logging.QualifiedName(defaultServiceAccountName) } +type DesiredObject struct { + Object runtime.Object + State k8sutil.DesiredState +} + // Reconciler holds info what resource to reconcile type Reconciler struct { Logging *v1beta1.Logging *k8sutil.GenericResourceReconciler + + desiredConfig string } // NewReconciler creates a new Fluentbit reconciler @@ -69,7 +77,7 @@ func New(client client.Client, logger logr.Logger, logging *v1beta1.Logging) *Re // Reconcile reconciles the fluentBit resource func (r *Reconciler) Reconcile() (*reconcile.Result, error) { - for _, res := range []resources.Resource{ + for _, factory := range []resources.Resource{ r.serviceAccount, r.clusterRole, r.clusterRoleBinding, @@ -81,14 +89,15 @@ func (r *Reconciler) Reconcile() (*reconcile.Result, error) { r.serviceMetrics, r.monitorServiceMetrics, } { - o, state := res() + o, state := factory() if o == nil { - return nil, errors.Errorf("Reconcile error! Resource %#v returns with nil object", res) + return nil, errors.Errorf("Reconcile error! Resource %#v returns with nil object", factory) } err := r.ReconcileResource(o, state) if err != nil { return nil, emperror.WrapWith(err, "failed to reconcile resource", "resource", o.GetObjectKind().GroupVersionKind()) } } + return nil, nil } diff --git a/pkg/resources/fluentbit/psp.go b/pkg/resources/fluentbit/psp.go index 796a508f6..fca7943be 100644 --- a/pkg/resources/fluentbit/psp.go +++ b/pkg/resources/fluentbit/psp.go @@ -26,7 +26,6 @@ import ( func (r *Reconciler) clusterPodSecurityPolicy() (runtime.Object, k8sutil.DesiredState) { if r.Logging.Spec.FluentbitSpec.Security.PodSecurityPolicyCreate { - return &policyv1beta1.PodSecurityPolicy{ ObjectMeta: templates.FluentbitObjectMetaClusterScope( r.Logging.QualifiedName(fluentbitPodSecurityPolicyName), @@ -64,7 +63,6 @@ func (r *Reconciler) clusterPodSecurityPolicy() (runtime.Object, k8sutil.Desired }}, }, }, k8sutil.StatePresent - } return &policyv1beta1.PodSecurityPolicy{ ObjectMeta: templates.FluentbitObjectMeta( @@ -73,7 +71,6 @@ func (r *Reconciler) clusterPodSecurityPolicy() (runtime.Object, k8sutil.Desired r.Logging), Spec: policyv1beta1.PodSecurityPolicySpec{}, }, k8sutil.StateAbsent - } func (r *Reconciler) pspClusterRole() (runtime.Object, k8sutil.DesiredState) { diff --git a/pkg/resources/fluentbit/service.go b/pkg/resources/fluentbit/service.go index ebf3cb85a..b4352cac8 100644 --- a/pkg/resources/fluentbit/service.go +++ b/pkg/resources/fluentbit/service.go @@ -70,5 +70,4 @@ func (r *Reconciler) monitorServiceMetrics() (runtime.Object, k8sutil.DesiredSta }, k8sutil.StatePresent } return &v1.ServiceMonitor{ObjectMeta: templates.FluentbitObjectMeta(r.Logging.QualifiedName(fluentbitServiceName+"-metrics"), util.MergeLabels(r.Logging.Labels, r.getFluentBitLabels()), r.Logging), Spec: v1.ServiceMonitorSpec{}}, k8sutil.StateAbsent - } diff --git a/pkg/resources/fluentd/psp.go b/pkg/resources/fluentd/psp.go index 84e2ee0fe..e5ccf51fc 100644 --- a/pkg/resources/fluentd/psp.go +++ b/pkg/resources/fluentd/psp.go @@ -26,7 +26,6 @@ import ( func (r *Reconciler) clusterPodSecurityPolicy() (runtime.Object, k8sutil.DesiredState) { if r.Logging.Spec.FluentdSpec.Security.PodSecurityPolicyCreate { - return &policyv1beta1.PodSecurityPolicy{ ObjectMeta: templates.FluentdObjectMetaClusterScope( r.Logging.QualifiedName(PodSecurityPolicyName), @@ -54,7 +53,6 @@ func (r *Reconciler) clusterPodSecurityPolicy() (runtime.Object, k8sutil.Desired AllowPrivilegeEscalation: util.BoolPointer(false), }, }, k8sutil.StatePresent - } return &policyv1beta1.PodSecurityPolicy{ ObjectMeta: templates.FluentdObjectMeta( @@ -66,7 +64,6 @@ func (r *Reconciler) clusterPodSecurityPolicy() (runtime.Object, k8sutil.Desired func (r *Reconciler) pspRole() (runtime.Object, k8sutil.DesiredState) { if *r.Logging.Spec.FluentdSpec.Security.RoleBasedAccessControlCreate && r.Logging.Spec.FluentdSpec.Security.PodSecurityPolicyCreate { - return &rbacv1.Role{ ObjectMeta: templates.FluentdObjectMeta(r.Logging.QualifiedName(roleName+"-psp"), r.Logging.Labels, r.Logging), Rules: []rbacv1.PolicyRule{ @@ -88,7 +85,6 @@ func (r *Reconciler) pspRole() (runtime.Object, k8sutil.DesiredState) { func (r *Reconciler) pspRoleBinding() (runtime.Object, k8sutil.DesiredState) { if *r.Logging.Spec.FluentdSpec.Security.RoleBasedAccessControlCreate && r.Logging.Spec.FluentdSpec.Security.PodSecurityPolicyCreate { - return &rbacv1.RoleBinding{ ObjectMeta: templates.FluentdObjectMeta(r.Logging.QualifiedName(roleBindingName+"-psp"), r.Logging.Labels, r.Logging), RoleRef: rbacv1.RoleRef{ diff --git a/pkg/resources/fluentd/rbac.go b/pkg/resources/fluentd/rbac.go index 46245f4dc..2e0b57656 100644 --- a/pkg/resources/fluentd/rbac.go +++ b/pkg/resources/fluentd/rbac.go @@ -24,7 +24,6 @@ import ( func (r *Reconciler) role() (runtime.Object, k8sutil.DesiredState) { if *r.Logging.Spec.FluentdSpec.Security.RoleBasedAccessControlCreate { - return &rbacv1.Role{ ObjectMeta: templates.FluentdObjectMeta(r.Logging.QualifiedName(roleName), r.Logging.Labels, r.Logging), Rules: []rbacv1.PolicyRule{ @@ -46,7 +45,6 @@ func (r *Reconciler) role() (runtime.Object, k8sutil.DesiredState) { func (r *Reconciler) roleBinding() (runtime.Object, k8sutil.DesiredState) { if *r.Logging.Spec.FluentdSpec.Security.RoleBasedAccessControlCreate { - return &rbacv1.RoleBinding{ ObjectMeta: templates.FluentdObjectMeta(r.Logging.QualifiedName(roleBindingName), r.Logging.Labels, r.Logging), RoleRef: rbacv1.RoleRef{ @@ -73,7 +71,6 @@ func (r *Reconciler) roleBinding() (runtime.Object, k8sutil.DesiredState) { func (r *Reconciler) serviceAccount() (runtime.Object, k8sutil.DesiredState) { if *r.Logging.Spec.FluentdSpec.Security.RoleBasedAccessControlCreate && r.Logging.Spec.FluentdSpec.Security.ServiceAccount == "" { - return &corev1.ServiceAccount{ ObjectMeta: templates.FluentdObjectMeta( r.Logging.QualifiedName(defaultServiceAccountName), diff --git a/pkg/resources/fluentd/service.go b/pkg/resources/fluentd/service.go index a81afeb91..e60779e12 100644 --- a/pkg/resources/fluentd/service.go +++ b/pkg/resources/fluentd/service.go @@ -90,5 +90,4 @@ func (r *Reconciler) monitorServiceMetrics() (runtime.Object, k8sutil.DesiredSta }, k8sutil.StatePresent } return &v1.ServiceMonitor{ObjectMeta: templates.FluentdObjectMeta(r.Logging.QualifiedName(ServiceName+"-metrics"), util.MergeLabels(r.Logging.Labels, r.getFluentdLabels()), r.Logging), Spec: v1.ServiceMonitorSpec{}}, k8sutil.StateAbsent - } diff --git a/pkg/resources/templates/templates.go b/pkg/resources/templates/templates.go index 3c7bc1bd5..c1231c431 100644 --- a/pkg/resources/templates/templates.go +++ b/pkg/resources/templates/templates.go @@ -20,6 +20,14 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) +func Annotate(meta metav1.ObjectMeta, key, val string) metav1.ObjectMeta { + if meta.Annotations == nil { + meta.Annotations = make(map[string]string) + } + meta.Annotations[key] = val + return meta +} + // FluentbitObjectMeta creates an objectMeta for resource fluentbit func FluentbitObjectMeta(name string, labels map[string]string, logging *v1beta1.Logging) metav1.ObjectMeta { o := metav1.ObjectMeta{