Skip to content

Commit

Permalink
Fluentbit support for filesystem buffer storage and rolling upgrade o…
Browse files Browse the repository at this point in the history
…n config change (#213)

* Fluentbit support for filesystem buffer storage and rolling upgrade on config change
* Separate in-container path of the buffer from the hostpath used for the mount. Setup postionDb by default for fluent-bit. Add docs.
* add some lint checks back and fix lint errors
* restore sample config, reword docs
  • Loading branch information
pepov authored and ahma committed Nov 6, 2019
1 parent c689b53 commit 76bbe22
Show file tree
Hide file tree
Showing 22 changed files with 238 additions and 76 deletions.
2 changes: 0 additions & 2 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,8 @@ linters:
- stylecheck
- staticcheck
- gosimple
- whitespace
- gocritic
- godox
- interfacer
- gocyclo

issues:
Expand Down
46 changes: 31 additions & 15 deletions api/v1beta1/fluentbit_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,21 +24,23 @@ import (

// FluentbitSpec defines the desired state of Fluentbit
type FluentbitSpec struct {
Annotations map[string]string `json:"annotations,omitempty"`
Image ImageSpec `json:"image,omitempty"`
TLS FluentbitTLS `json:"tls,omitempty"`
TargetHost string `json:"targetHost,omitempty"`
TargetPort int32 `json:"targetPort,omitempty"`
Resources corev1.ResourceRequirements `json:"resources,omitempty"`
Parser string `json:"parser,omitempty"`
Tolerations []corev1.Toleration `json:"tolerations,omitempty"`
Metrics *Metrics `json:"metrics,omitempty"`
Security *Security `json:"security,omitempty"`
PositionDB *KubernetesStorage `json:"position_db,omitempty"`
MountPath string `json:"mountPath,omitempty"`
InputTail InputTail `json:"inputTail,omitempty"`
FilterKubernetes FilterKubernetes `json:"filterKubernetes,omitempty"`
CustomConfigSecret string `json:"customConfigSecret,omitempty"`
Annotations map[string]string `json:"annotations,omitempty"`
Image ImageSpec `json:"image,omitempty"`
TLS FluentbitTLS `json:"tls,omitempty"`
TargetHost string `json:"targetHost,omitempty"`
TargetPort int32 `json:"targetPort,omitempty"`
Resources corev1.ResourceRequirements `json:"resources,omitempty"`
Parser string `json:"parser,omitempty"`
Tolerations []corev1.Toleration `json:"tolerations,omitempty"`
Metrics *Metrics `json:"metrics,omitempty"`
Security *Security `json:"security,omitempty"`
PositionDB *KubernetesStorage `json:"position_db,omitempty"`
MountPath string `json:"mountPath,omitempty"`
InputTail InputTail `json:"inputTail,omitempty"`
FilterKubernetes FilterKubernetes `json:"filterKubernetes,omitempty"`
BufferStorage BufferStorage `json:"bufferStorage,omitempty"`
BufferStorageVolume *KubernetesStorage `json:"bufferStorageVolume,omitempty"`
CustomConfigSecret string `json:"customConfigSecret,omitempty"`
}

// +kubebuilder:object:generate=true
Expand All @@ -63,8 +65,22 @@ func (spec FluentbitSpec) GetPrometheusPortFromAnnotation() int32 {
return int32(port)
}

// BufferStorage is the Service Section Configuration of fluent-bit
type BufferStorage struct {
// Set an optional location in the file system to store streams and chunks of data. If this parameter is not set, Input plugins can only use in-memory buffering.
StoragePath string `json:"storage.path,omitempty"`
// Configure the synchronization mode used to store the data into the file system. It can take the values normal or full. (default:normal)
StorageSync string `json:"storage.sync,omitempty"`
// Enable the data integrity check when writing and reading data from the filesystem. The storage layer uses the CRC32 algorithm. (default:Off)
StorageChecksum string `json:"storage.checksum,omitempty"`
// If storage.path is set, Fluent Bit will look for data chunks that were not delivered and are still in the storage layer, these are called backlog data. This option configure a hint of maximum value of memory to use when processing these records. (default:5M)
StorageBacklogMemLimit string `json:"storage.backlog.mem_limit,omitempty"`
}

// InputTail defines Fluentbit tail input configuration The tail input plugin allows to monitor one or several text files. It has a similar behavior like tail -f shell command.
type InputTail struct {
// Specify the buffering mechanism to use. It can be memory or filesystem. (default:memory)
StorageType string `json:"storage.type,omitempty"`
// Set the buffer size for HTTP client when reading responses from Kubernetes API server. The value must be according to the Unit Size specification. (default:32k)
BufferChunkSize string `json:"Buffer_Chunk_Size,omitempty"`
// Set the limit of the buffer size per monitored file. When a buffer needs to be increased (e.g: very long lines), this value is used to restrict how much the memory buffer can grow. If reading a file exceed this limit, the file is removed from the monitored file list. The value must be according to the Unit Size specification. (default:Buffer_Chunk_Size)
Expand Down
25 changes: 22 additions & 3 deletions api/v1beta1/logging_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

const (
BuffersPath = "/opt/fluent-bit/%s/buf"
PositionDbPath = "/opt/fluent-bit/%s/pos"
)

// EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN!
// NOTE: json tags are required. Any new fields you add must have json tags for the fields to be serialized.

Expand Down Expand Up @@ -114,7 +119,6 @@ func (l *Logging) SetDefaults() *Logging {
}

if copy.Spec.FluentdSpec.Metrics.PrometheusAnnotations {

copy.Spec.FluentdSpec.Annotations["prometheus.io/scrape"] = "true"

copy.Spec.FluentdSpec.Annotations["prometheus.io/path"] = copy.Spec.FluentdSpec.Metrics.Path
Expand Down Expand Up @@ -226,7 +230,6 @@ func (l *Logging) SetDefaults() *Logging {
copy.Spec.FluentbitSpec.Metrics.Interval = "15s"
}
if copy.Spec.FluentbitSpec.Metrics.PrometheusAnnotations {

copy.Spec.FluentbitSpec.Annotations["prometheus.io/scrape"] = "true"
copy.Spec.FluentbitSpec.Annotations["prometheus.io/path"] = copy.Spec.FluentbitSpec.Metrics.Path
copy.Spec.FluentbitSpec.Annotations["prometheus.io/port"] = string(copy.Spec.FluentbitSpec.Metrics.Port)
Expand All @@ -235,7 +238,23 @@ func (l *Logging) SetDefaults() *Logging {
if copy.Spec.FluentbitSpec.MountPath == "" {
copy.Spec.FluentbitSpec.MountPath = "/var/lib/docker/containers"
}

if copy.Spec.FluentbitSpec.PositionDB == nil {
copy.Spec.FluentbitSpec.PositionDB = &KubernetesStorage{
HostPath: &v1.HostPathVolumeSource{
Path: fmt.Sprintf(PositionDbPath, copy.Name),
},
}
}
if copy.Spec.FluentbitSpec.BufferStorage.StoragePath == "" {
copy.Spec.FluentbitSpec.BufferStorage.StoragePath = "/buffers"
}
if copy.Spec.FluentbitSpec.BufferStorageVolume == nil {
copy.Spec.FluentbitSpec.BufferStorageVolume = &KubernetesStorage{
HostPath: &v1.HostPathVolumeSource{
Path: fmt.Sprintf(BuffersPath, copy.Name),
},
}
}
}
return copy
}
Expand Down
21 changes: 21 additions & 0 deletions api/v1beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 0 additions & 3 deletions cmd/docs.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ func (d *doc) checkNodes(n ast.Node) bool {
d.append(fmt.Sprintf("| %s | %s | %s | %s | %s |", name, normaliseType(item.Type), required, def, com))
}
}

}
}

Expand Down Expand Up @@ -169,7 +168,6 @@ func main() {
}

index.generate()

}

func getPrefixedLine(origin, expression string) string {
Expand Down Expand Up @@ -249,7 +247,6 @@ func getValuesFromItem(item *ast.Field) (name, comment, def, required string) {
}

return nameResult, getLink(commentWithDefault), "-", required

}

func getDocumentParser(file plugin) *doc {
Expand Down
51 changes: 51 additions & 0 deletions config/crd/bases/logging.banzaicloud.io_loggings.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,53 @@ spec:
additionalProperties:
type: string
type: object
bufferStorage:
description: BufferStorage is the Service Section Configuration
of fluent-bit
properties:
storage.backlog.mem_limit:
description: If storage.path is set, Fluent Bit will look for
data chunks that were not delivered and are still in the storage
layer, these are called backlog data. This option configure
a hint of maximum value of memory to use when processing these
records. (default:5M)
type: string
storage.checksum:
description: Enable the data integrity check when writing and
reading data from the filesystem. The storage layer uses the
CRC32 algorithm. (default:Off)
type: string
storage.path:
description: Set an optional location in the file system to
store streams and chunks of data. If this parameter is not
set, Input plugins can only use in-memory buffering.
type: string
storage.sync:
description: Configure the synchronization mode used to store
the data into the file system. It can take the values normal
or full. (default:normal)
type: string
type: object
bufferStorageVolume:
properties:
host_path:
description: Represents a host path mapped into a pod. Host
path volumes do not support ownership management or SELinux
relabeling.
properties:
path:
description: 'Path of the directory on the host. If the
path is a symlink, it will follow the link to the real
path. More info: https://kubernetes.io/docs/concepts/storage/volumes#hostpath'
type: string
type:
description: 'Type for HostPath Volume Defaults to "" More
info: https://kubernetes.io/docs/concepts/storage/volumes#hostpath'
type: string
required:
- path
type: object
type: object
customConfigSecret:
type: string
filterKubernetes:
Expand Down Expand Up @@ -278,6 +325,10 @@ spec:
Tag_Regex:
description: Set a regex to extract fields from the file.
type: string
storage.type:
description: Specify the buffering mechanism to use. It can
be memory or filesystem. (default:memory)
type: string
type: object
metrics:
description: Metrics defines the service monitor endpoints
Expand Down
4 changes: 3 additions & 1 deletion docs/crds.md
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,11 @@ spec:
| tolerations | [Toleration](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.12/#toleration-v1-core) | {} | Pod toleration |
| metrics | [Metrics](./logging-operator-monitoring.md#metrics-variables) | {} | Metrics defines the service monitor endpoints |
| security | [Security](./security#security-variables) | {} | Security defines Fluentd, Fluentbit deployment security properties |
| position_db | [KubernetesStorage](#KubernetesStorage) | nil | Add position db storage support |
| position_db | [KubernetesStorage](#KubernetesStorage) | nil | Add position db storage support. If nothing is configured a `hostPath` volume is used with the path `/opt/fluent-bit/<name of the logging CR>/pos`. |
| inputTail | [InputTail](./fluentbit.md#tail-inputtail) | {} | The tail input plugin allows to monitor one or several text files. |
| filterKubernetes | [FilterKubernetes](./fluentbit.md#kubernetes-filterkubernetes) | {} | Fluent Bit Kubernetes Filter allows to enrich your log files with Kubernetes metadata. |
| bufferStorage | [BufferStorage](./fluentbit.md#bufferstorage) | | Buffer Storage configures persistent buffer to avoid losing data in case of a failure |
| bufferStorageVolume | [KubernetesStorage](#KubernetesStorage) | nil | Volume definition for the Buffer Storage. If nothing is configured a `hostPath` volume is used with the path `/opt/fluent-bit/<name of the logging CR>/buf`. |
| customConfigSecret | string | "" | Custom secret to use as fluent-bit config.<br /> It must include all the config files necessary to run fluent-bit (_fluent-bit.conf_, _parsers*.conf_) |

**`logging` with custom fluent-bit annotations**
Expand Down
34 changes: 33 additions & 1 deletion docs/fluentbit.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ The plugin supports the following configuration parameters:

| Key | Description | Default |
| :--- | :--- | :--- |
| storage.type | Specify the buffering mechanism to use. It can be memory or filesystem. | memory |
| Buffer\_Chunk\_Size | Set the initial buffer size to read files data. This value is used too to increase buffer size. The value must be according to the [Unit Size](../configuration/unit_sizes.md) specification. | 32k |
| Buffer\_Max\_Size | Set the limit of the buffer size per monitored file. When a buffer needs to be increased \(e.g: very long lines\), this value is used to restrict how much the memory buffer can grow. If reading a file exceed this limit, the file is removed from the monitored file list. The value must be according to the [Unit Size](../configuration/unit_sizes.md) specification. | Buffer\_Chunk\_Size |
| Path | Pattern specifying a specific log files or multiple ones through the use of common wildcards. | |
Expand All @@ -102,4 +103,35 @@ The plugin supports the following configuration parameters:
| Parser\_Firstline | Name of the parser that matchs the beginning of a multiline message. Note that the regular expression defined in the parser must include a group name \(named capture\) | |
| Parser\_N | Optional-extra parser to interpret and structure multiline entries. This option can be used to define multiple parsers, e.g: Parser\_1 ab1, Parser\_2 ab2, Parser\_N abN. | |
| Docker\_Mode | If enabled, the plugin will recombine split Docker log lines before passing them to any parser as configured above. This mode cannot be used at the same time as Multiline. | Off |
| Docker\_Mode\_Flush | Wait period time in seconds to flush queued unfinished split lines. | 4 |
| Docker\_Mode\_Flush | Wait period time in seconds to flush queued unfinished split lines. | 4 |

## Buffering

### BufferStorage
A mechanism to place processed data into a temporal location until is ready to be shipped. [More Info](https://docs.fluentbit.io/manual/configuration/buffering)


| Key | Description | Default |
| :--- | :--- | :--- |
| storage.path | Set an optional location in the file system to store streams and chunks of data. If this parameter is not set, Input plugins can only use in-memory buffering. | |
| storage.sync | Configure the synchronization mode used to store the data into the file system. It can take the values normal or full. | normal |
| storage.checksum | Enable the data integrity check when writing and reading data from the filesystem. The storage layer uses the CRC32 algorithm. | Off |
| storage.backlog.mem_limit | If storage.path is set, Fluent Bit will look for data chunks that were not delivered and are still in the storage layer, these are called backlog data. This option configure a hint of maximum value of memory to use when processing these records. | 5M |


#### Default configuration

If nothing is set, by default it configures the `storage.path` explicitly to use `/buffers` and leaves fluent-bit defaults for the other options.

```
apiVersion: logging.banzaicloud.io/v1beta1
kind: Logging
metadata:
name: default-logging-simple
spec:
fluentd: {}
fluentbit:
bufferStorage:
storage.path: /buffers
controlNamespace: logging
```
6 changes: 0 additions & 6 deletions pkg/k8sutil/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,6 @@ func (r *GenericResourceReconciler) ReconcileResource(desired runtime.Object, de
}
log.Info("resource updated", "resource", desired.GetObjectKind().GroupVersionKind())
}

}
return nil
}
Expand All @@ -141,7 +140,6 @@ func (r *GenericResourceReconciler) createIfNotExists(desired runtime.Object) (b
if err != nil {
return false, current, err
}

}
key, err := runtimeClient.ObjectKeyFromObject(current)
if err != nil {
Expand Down Expand Up @@ -183,9 +181,7 @@ func (r *GenericResourceReconciler) delete(desired runtime.Object) (bool, error)
"resource", desired.GetObjectKind().GroupVersionKind(), "type", reflect.TypeOf(desired))
}
return false, nil

}

}

key, err := runtimeClient.ObjectKeyFromObject(current)
Expand All @@ -200,9 +196,7 @@ func (r *GenericResourceReconciler) delete(desired runtime.Object) (bool, error)
} else {
log.Info("resource not found skipping delete", "resource", current.GetObjectKind().GroupVersionKind())
return false, nil

}

}
err = r.Client.Delete(context.TODO(), current)
if err != nil {
Expand Down
1 change: 0 additions & 1 deletion pkg/model/render/fluent.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ func (f *FluentRender) indented(indent int, format string, values ...interface{}
fmt.Fprintln(f.Out, "")
}
}

}

func tag(tag string) string {
Expand Down
6 changes: 2 additions & 4 deletions pkg/model/render/fluent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
)

func TestRenderDirective(t *testing.T) {

var tests = []struct {
name string
directive types.Directive
Expand Down Expand Up @@ -266,7 +265,6 @@ func TestRenderDirective(t *testing.T) {
t.Errorf("[%s] Result does not match (-actual vs +expected):\n%v", test.name, diff.LineDiff(a, e))
}
}

}
}

Expand Down Expand Up @@ -492,7 +490,7 @@ func TestRenderS3(t *testing.T) {
}
for _, item := range table {
t.Logf("> %s\n", item.name)
err := ValidateRenderS3(t, item.s3Config, item.expected)
err := ValidateRenderS3(t, &item.s3Config, item.expected)
if item.err != "" {
if err == nil {
t.Errorf("expected error: %s", item.err)
Expand All @@ -510,7 +508,7 @@ func TestRenderS3(t *testing.T) {
}
}

func ValidateRenderS3(t *testing.T, s3Config output.S3OutputConfig, expected string) error {
func ValidateRenderS3(t *testing.T, s3Config plugins.DirectiveConverter, expected string) error {
system := types.NewSystem(toDirective(t, input.NewTailInputConfig("input.log")), types.NewRouter("test"))

s3Plugin, err := s3Config.ToDirective(secret.NewSecretLoader(nil, "", "", nil), "test")
Expand Down
1 change: 0 additions & 1 deletion pkg/model/types/stringmaps.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,6 @@ func (s *StructToStringMapper) fillMap(value reflect.Value, out map[string]strin
}
}
}

}
}
return multierror
Expand Down
Loading

0 comments on commit 76bbe22

Please sign in to comment.