Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add logstash to serverless provider #1646

Merged
merged 16 commits into from
Feb 8, 2024
Merged
26 changes: 26 additions & 0 deletions internal/kibana/fleet.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,13 @@ import (
"net/http"
)

type FleetOutput struct {
ID string `json:"id,omitempty"`
Name string `json:"name,omitempty"`
Hosts []string `json:"hosts,omitempty"`
Type string `json:"type,omitempty"`
}

// DefaultFleetServerURL returns the default Fleet server configured in Kibana
func (c *Client) DefaultFleetServerURL() (string, error) {
path := fmt.Sprintf("%s/fleet_server_hosts", FleetAPI)
Expand Down Expand Up @@ -43,3 +50,22 @@ func (c *Client) DefaultFleetServerURL() (string, error) {

return "", errors.New("could not find the fleet server URL for this project")
}

// AddFleetOutput adds an additional output to kibana eg., logstash
func (c *Client) AddFleetOutput(fo FleetOutput) error {
reqBody, err := json.Marshal(fo)
if err != nil {
return fmt.Errorf("could not convert fleetOutput (request) to JSON: %w", err)
}

statusCode, respBody, err := c.post(fmt.Sprintf("%s/outputs", FleetAPI), reqBody)
if err != nil {
return fmt.Errorf("could not create fleet output: %w", err)
}

if statusCode != http.StatusOK {
return fmt.Errorf("could not get status data; API status code = %d; response body = %s", statusCode, respBody)
}

return nil
}
22 changes: 21 additions & 1 deletion internal/serverless/project.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,21 @@ func (p *Project) DefaultFleetServerURL(kibanaClient *kibana.Client) (string, er
return fleetURL, nil
}

func (p *Project) AddLogstashFleetOutput(kibanaClient *kibana.Client) error {
logstashFleetOutput := kibana.FleetOutput{
Name: "logstash-output",
ID: "logstash-fleet-output",
Type: "logstash",
Hosts: []string{"logstash:5044"},
}

if err := kibanaClient.AddFleetOutput(logstashFleetOutput); err != nil {
return fmt.Errorf("failed to add logstash fleet output: %w", err)
}

return nil
}

func (p *Project) getESHealth(ctx context.Context, elasticsearchClient *elasticsearch.Client) error {
return elasticsearchClient.CheckHealth(ctx)
}
Expand Down Expand Up @@ -177,7 +192,7 @@ func (p *Project) getFleetHealth(ctx context.Context) error {
return nil
}

func (p *Project) CreateAgentPolicy(stackVersion string, kibanaClient *kibana.Client) error {
func (p *Project) CreateAgentPolicy(stackVersion string, kibanaClient *kibana.Client, logstashEnabled bool) error {
systemPackages, err := registry.Production.Revisions("system", registry.SearchOptions{
KibanaVersion: strings.TrimSuffix(stackVersion, kibana.SNAPSHOT_SUFFIX),
})
Expand All @@ -196,6 +211,11 @@ func (p *Project) CreateAgentPolicy(stackVersion string, kibanaClient *kibana.Cl
Namespace: "default",
MonitoringEnabled: []string{"logs", "metrics"},
}

if logstashEnabled {
policy.DataOutputID = "fleet-logstash-output"
}

newPolicy, err := kibanaClient.CreatePolicy(policy)
if err != nil {
return fmt.Errorf("error while creating agent policy: %w", err)
Expand Down
36 changes: 36 additions & 0 deletions internal/stack/_static/serverless-logstash.yml.tmpl
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
version: '2.3'
services:
{{ $logstash_enabled := fact "logstash_enabled" }}
{{ if eq $logstash_enabled "true" }}
logstash:
depends_on:
elasticsearch:
condition: service_healthy
kibana:
condition: service_healthy
image: ${LOGSTASH_IMAGE_REF}
healthcheck:
test: bin/logstash -t
interval: 60s
timeout: 50s
retries: 5
command: bash -c 'if [[ ! $(bin/logstash-plugin list) == *"logstash-filter-elastic_integration"* ]]; then echo "Missing plugin logstash-filter-elastic_integration, installing now" && bin/logstash-plugin install logstash-filter-elastic_integration; fi && bin/logstash -f /usr/share/logstashpipeline/ logstash.conf'
volumes:
- "../certs/logstash:/usr/share/logstash/config/certs"
- "../certs/elasticsearch/cert.pem:/usr/share/logstash/config/certs/elasticsearchpem"
- "./logstash.conf:/usr/share/logstash/pipeline/logstash.conf:ro"
ports:
- "127.0.0.1:5044:5044"
- "127.0.0.1:9600:9600"
environment:
- xpack.monitoring.enabled=false
- ELASTIC_USER=elastic
- ELASTIC_PASSWORD=changeme
- ELASTIC_HOSTS=https://127.0.0.1:9200

logstash_is_ready:
image: tianon/true
depends_on:
logstash:
condition: service_healthy
{{ end }}
3 changes: 3 additions & 0 deletions internal/stack/resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ const (
// LogstashConfigFile is the logstash config file.
LogstashConfigFile = "logstash.conf"

// LogstashEnvFile is the logstash docker compose file
LogstashComposeFile = "logstash.yml"

// KibanaHealthcheckFile is the kibana healthcheck.
KibanaHealthcheckFile = "kibana_healthcheck.sh"

Expand Down
75 changes: 74 additions & 1 deletion internal/stack/serverless.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,11 @@ func (sp *serverlessProvider) createProject(settings projectSettings, options Op
}
project.Endpoints.Fleet = config.Parameters[paramServerlessFleetURL]

err = project.AddLogstashFleetOutput(sp.kibanaClient)
if err != nil {
return Config{}, fmt.Errorf("failed to add logstash fleet output: %w", err)
}

printUserConfig(options.Printer, config)

// update config with latest updates (e.g. fleet server url)
Expand Down Expand Up @@ -145,6 +150,11 @@ func (sp *serverlessProvider) currentProjectWithClientsAndFleetEndpoint(config C
}
project.Endpoints.Fleet = fleetURL

err = project.AddLogstashFleetOutput(sp.kibanaClient)
if err != nil {
return nil, fmt.Errorf("failed to add logstash fleet output: %w", err)
}

return project, nil
}

Expand Down Expand Up @@ -242,6 +252,11 @@ func (sp *serverlessProvider) BootUp(options Options) error {
return fmt.Errorf("serverless project type not supported: %s", settings.Type)
}

logstashEnabled := false
if options.Profile.Config("stack.logstash_enabled", "false") == "true" {
logstashEnabled = true
}

var project *serverless.Project

project, err = sp.currentProject(config)
Expand All @@ -261,7 +276,8 @@ func (sp *serverlessProvider) BootUp(options Options) error {
}

logger.Infof("Creating agent policy")
err = project.CreateAgentPolicy(options.StackVersion, sp.kibanaClient)
err = project.CreateAgentPolicy(options.StackVersion, sp.kibanaClient, logstashEnabled)

if err != nil {
return fmt.Errorf("failed to create agent policy: %w", err)
}
Expand All @@ -279,6 +295,14 @@ func (sp *serverlessProvider) BootUp(options Options) error {
return fmt.Errorf("failed to start local agent: %w", err)
}

if logstashEnabled {
logger.Infof("Starting local logstash")
err = sp.startLocalLogstash(options, config)
if err != nil {
return fmt.Errorf("failed to start local logstash: %w", err)
}
}

return nil
}

Expand All @@ -291,6 +315,11 @@ func (sp *serverlessProvider) localAgentComposeProject() (*compose.Project, erro
return compose.NewProject(sp.composeProjectName(), composeFile)
}

func (sp *serverlessProvider) localLogstashComposeProject() (*compose.Project, error) {
composeFile := sp.profile.Path(profileStackPath, LogstashComposeFile)
return compose.NewProject(sp.composeProjectName(), composeFile)
}

func (sp *serverlessProvider) startLocalAgent(options Options, config Config) error {
err := applyServerlessResources(sp.profile, options.StackVersion, config)
if err != nil {
Expand Down Expand Up @@ -325,6 +354,30 @@ func (sp *serverlessProvider) startLocalAgent(options Options, config Config) er
return nil
}

func (sp *serverlessProvider) startLocalLogstash(options Options, config Config) error {
err := applyServerlessResources(sp.profile, options.StackVersion, config)
if err != nil {
return fmt.Errorf("could not initialize compose files for local logstash: %w", err)
}

project, err := sp.localLogstashComposeProject()
if err != nil {
return fmt.Errorf("could not initialize local logstash compose project")
}

err = project.Build(compose.CommandOptions{})
if err != nil {
return fmt.Errorf("failed to build images for local logstash: %w", err)
}

err = project.Up(compose.CommandOptions{ExtraArgs: []string{"-d"}})
if err != nil {
return fmt.Errorf("failed to start local logstash: %w", err)
}

return nil
}

func (sp *serverlessProvider) TearDown(options Options) error {
config, err := LoadConfig(sp.profile)
if err != nil {
Expand All @@ -339,6 +392,12 @@ func (sp *serverlessProvider) TearDown(options Options) error {
errs = fmt.Errorf("failed to destroy local agent: %w", err)
}

err = sp.destroyLocalLogstash()
if err != nil {
logger.Errorf("failed to destroy local logstash: %v", err)
errs = fmt.Errorf("failed to destroy local logstash: %w", err)
}

project, err := sp.currentProject(config)
if err != nil {
return fmt.Errorf("failed to find current project: %w", err)
Expand Down Expand Up @@ -371,6 +430,20 @@ func (sp *serverlessProvider) destroyLocalAgent() error {
return nil
}

func (sp *serverlessProvider) destroyLocalLogstash() error {
project, err := sp.localLogstashComposeProject()
if err != nil {
return fmt.Errorf("could not initialize local logstash compose project")
}

err = project.Down(compose.CommandOptions{})
if err != nil {
return fmt.Errorf("failed to destroy local logstash: %w", err)
}

return nil
}

func (sp *serverlessProvider) Update(options Options) error {
return fmt.Errorf("not implemented")
}
Expand Down
21 changes: 15 additions & 6 deletions internal/stack/serverlessresources.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,14 @@ var (
Path: ElasticAgentEnvFile,
Content: staticSource.Template("_static/elastic-agent.env.tmpl"),
},
&resource.File{
Path: LogstashConfigFile,
Content: staticSource.Template("_static/logstash.conf.tmpl"),
},
&resource.File{
Path: LogstashComposeFile,
Content: staticSource.Template("_static/serverless-logstash.yml.tmpl"),
},
}
)

Expand All @@ -39,12 +47,13 @@ func applyServerlessResources(profile *profile.Profile, stackVersion string, con

resourceManager := resource.NewManager()
resourceManager.AddFacter(resource.StaticFacter{
"agent_version": stackVersion,
"agent_image": appConfig.StackImageRefs(stackVersion).ElasticAgent,
"username": config.ElasticsearchUsername,
"password": config.ElasticsearchPassword,
"kibana_host": config.KibanaHost,
"fleet_url": config.Parameters[paramServerlessFleetURL],
"agent_version": stackVersion,
"agent_image": appConfig.StackImageRefs(stackVersion).ElasticAgent,
"username": config.ElasticsearchUsername,
"password": config.ElasticsearchPassword,
"kibana_host": config.KibanaHost,
"fleet_url": config.Parameters[paramServerlessFleetURL],
"logstash_enabled": profile.Config("stack.logstash_enabled", "false"),
})

os.MkdirAll(stackDir, 0755)
Expand Down