Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add logstash to serverless provider #1646

Merged
merged 16 commits into from
Feb 8, 2024
Merged
26 changes: 26 additions & 0 deletions internal/kibana/fleet.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,13 @@ import (
"net/http"
)

type FleetOutput struct {
ID string `json:"id,omitempty"`
Name string `json:"name,omitempty"`
Hosts []string `json:"hosts,omitempty"`
Type string `json:"type,omitempty"`
}

// DefaultFleetServerURL returns the default Fleet server configured in Kibana
func (c *Client) DefaultFleetServerURL() (string, error) {
path := fmt.Sprintf("%s/fleet_server_hosts", FleetAPI)
Expand Down Expand Up @@ -43,3 +50,22 @@ func (c *Client) DefaultFleetServerURL() (string, error) {

return "", errors.New("could not find the fleet server URL for this project")
}

// AddFleetOutput adds an additional output to kibana eg., logstash
func (c *Client) AddFleetOutput(fo FleetOutput) error {
reqBody, err := json.Marshal(fo)
if err != nil {
return fmt.Errorf("could not convert fleetOutput (request) to JSON: %w", err)
}

statusCode, respBody, err := c.post(fmt.Sprintf("%s/outputs", FleetAPI), reqBody)
if err != nil {
return fmt.Errorf("could not create fleet output: %w", err)
}

if statusCode != http.StatusOK {
return fmt.Errorf("could not get status data; API status code = %d; response body = %s", statusCode, respBody)
}

return nil
}
22 changes: 21 additions & 1 deletion internal/serverless/project.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,21 @@ func (p *Project) DefaultFleetServerURL(kibanaClient *kibana.Client) (string, er
return fleetURL, nil
}

func (p *Project) AddLogstashFleetOutput(kibanaClient *kibana.Client) error {
logstashFleetOutput := kibana.FleetOutput{
Name: "logstash-output",
ID: "logstash-fleet-output",
Type: "logstash",
Hosts: []string{"logstash:5044"},
}

if err := kibanaClient.AddFleetOutput(logstashFleetOutput); err != nil {
return fmt.Errorf("failed to add logstash fleet output: %w", err)
}

return nil
}

func (p *Project) getESHealth(ctx context.Context, elasticsearchClient *elasticsearch.Client) error {
return elasticsearchClient.CheckHealth(ctx)
}
Expand Down Expand Up @@ -177,7 +192,7 @@ func (p *Project) getFleetHealth(ctx context.Context) error {
return nil
}

func (p *Project) CreateAgentPolicy(stackVersion string, kibanaClient *kibana.Client) error {
func (p *Project) CreateAgentPolicy(stackVersion string, kibanaClient *kibana.Client, logstashEnabled bool) error {
systemPackages, err := registry.Production.Revisions("system", registry.SearchOptions{
KibanaVersion: strings.TrimSuffix(stackVersion, kibana.SNAPSHOT_SUFFIX),
})
Expand All @@ -196,6 +211,11 @@ func (p *Project) CreateAgentPolicy(stackVersion string, kibanaClient *kibana.Cl
Namespace: "default",
MonitoringEnabled: []string{"logs", "metrics"},
}

if logstashEnabled {
policy.DataOutputID = "fleet-logstash-output"
}

newPolicy, err := kibanaClient.CreatePolicy(policy)
if err != nil {
return fmt.Errorf("error while creating agent policy: %w", err)
Expand Down
61 changes: 61 additions & 0 deletions internal/stack/_static/serverless-docker-compose.yml.tmpl
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
version: '2.3'
services:
elastic-agent:
image: "{{ fact "agent_image" }}"
healthcheck:
test: "elastic-agent status"
timeout: 2s
start_period: 360s
retries: 180
interval: 5s
hostname: docker-fleet-agent
env_file: "./elastic-agent.env"
volumes:
- type: bind
source: ../../../tmp/service_logs/
target: /tmp/service_logs/
# Mount service_logs under /run too as a testing workaround for the journald input (see elastic-package#1235).
- type: bind
source: ../../../tmp/service_logs/
target: /run/service_logs/

elastic-agent_is_ready:
image: tianon/true
depends_on:
elastic-agent:
condition: service_healthy

{{ $logstash_enabled := fact "logstash_enabled" }}
{{ if eq $logstash_enabled "true" }}
logstash:
depends_on:
elasticsearch:
condition: service_healthy
kibana:
condition: service_healthy
image: ${LOGSTASH_IMAGE_REF}
healthcheck:
test: bin/logstash -t
interval: 60s
timeout: 50s
retries: 5
command: bash -c 'if [[ ! $(bin/logstash-plugin list) == *"logstash-filter-elastic_integration"* ]]; then echo "Missing plugin logstash-filter-elastic_integration, installing now" && bin/logstash-plugin install logstash-filter-elastic_integration; fi && bin/logstash -f /usr/share/logstashpipeline/ logstash.conf'
volumes:
- "../certs/logstash:/usr/share/logstash/config/certs"
- "../certs/elasticsearch/cert.pem:/usr/share/logstash/config/certs/elasticsearchpem"
- "./logstash.conf:/usr/share/logstash/pipeline/logstash.conf:ro"
ports:
- "127.0.0.1:5044:5044"
- "127.0.0.1:9600:9600"
environment:
- xpack.monitoring.enabled=false
- ELASTIC_USER=elastic
- ELASTIC_PASSWORD=changeme
- ELASTIC_HOSTS=https://127.0.0.1:9200

logstash_is_ready:
image: tianon/true
depends_on:
logstash:
condition: service_healthy
{{ end }}
26 changes: 0 additions & 26 deletions internal/stack/_static/serverless-elastic-agent.yml.tmpl

This file was deleted.

45 changes: 28 additions & 17 deletions internal/stack/serverless.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,11 @@ func (sp *serverlessProvider) createProject(settings projectSettings, options Op
}
project.Endpoints.Fleet = config.Parameters[paramServerlessFleetURL]

err = project.AddLogstashFleetOutput(sp.kibanaClient)
if err != nil {
return Config{}, fmt.Errorf("failed to add logstash fleet output: %w", err)
}

printUserConfig(options.Printer, config)

// update config with latest updates (e.g. fleet server url)
Expand Down Expand Up @@ -242,6 +247,11 @@ func (sp *serverlessProvider) BootUp(options Options) error {
return fmt.Errorf("serverless project type not supported: %s", settings.Type)
}

logstashEnabled := false
if options.Profile.Config("stack.logstash_enabled", "false") == "true" {
logstashEnabled = true
}

var project *serverless.Project

project, err = sp.currentProject(config)
Expand All @@ -261,7 +271,8 @@ func (sp *serverlessProvider) BootUp(options Options) error {
}

logger.Infof("Creating agent policy")
err = project.CreateAgentPolicy(options.StackVersion, sp.kibanaClient)
err = project.CreateAgentPolicy(options.StackVersion, sp.kibanaClient, logstashEnabled)

if err != nil {
return fmt.Errorf("failed to create agent policy: %w", err)
}
Expand All @@ -273,10 +284,10 @@ func (sp *serverlessProvider) BootUp(options Options) error {
printUserConfig(options.Printer, config)
}

logger.Infof("Starting local agent")
err = sp.startLocalAgent(options, config)
logger.Infof("Starting local services")
err = sp.startLocalServices(options, config)
if err != nil {
return fmt.Errorf("failed to start local agent: %w", err)
return fmt.Errorf("failed to start local service: %w", err)
}

return nil
Expand All @@ -286,25 +297,25 @@ func (sp *serverlessProvider) composeProjectName() string {
return DockerComposeProjectName(sp.profile)
}

func (sp *serverlessProvider) localAgentComposeProject() (*compose.Project, error) {
func (sp *serverlessProvider) localServiceComposeProject() (*compose.Project, error) {
composeFile := sp.profile.Path(profileStackPath, SnapshotFile)
return compose.NewProject(sp.composeProjectName(), composeFile)
}

func (sp *serverlessProvider) startLocalAgent(options Options, config Config) error {
func (sp *serverlessProvider) startLocalServices(options Options, config Config) error {
err := applyServerlessResources(sp.profile, options.StackVersion, config)
if err != nil {
return fmt.Errorf("could not initialize compose files for local agent: %w", err)
return fmt.Errorf("could not initialize compose files for local service: %w", err)
}

project, err := sp.localAgentComposeProject()
project, err := sp.localServiceComposeProject()
if err != nil {
return fmt.Errorf("could not initialize local agent compose project")
return fmt.Errorf("could not initialize local service compose project")
}

err = project.Build(compose.CommandOptions{})
if err != nil {
return fmt.Errorf("failed to build images for local agent: %w", err)
return fmt.Errorf("failed to build images for local service: %w", err)
}

err = project.Up(compose.CommandOptions{ExtraArgs: []string{"-d"}})
Expand Down Expand Up @@ -333,10 +344,10 @@ func (sp *serverlessProvider) TearDown(options Options) error {

var errs error

err = sp.destroyLocalAgent()
err = sp.destroyLocalService()
if err != nil {
logger.Errorf("failed to destroy local agent: %v", err)
errs = fmt.Errorf("failed to destroy local agent: %w", err)
logger.Errorf("failed to destroy local service: %v", err)
errs = fmt.Errorf("failed to destroy local service: %w", err)
}

project, err := sp.currentProject(config)
Expand All @@ -357,15 +368,15 @@ func (sp *serverlessProvider) TearDown(options Options) error {
return errs
}

func (sp *serverlessProvider) destroyLocalAgent() error {
project, err := sp.localAgentComposeProject()
func (sp *serverlessProvider) destroyLocalService() error {
project, err := sp.localServiceComposeProject()
if err != nil {
return fmt.Errorf("could not initialize local agent compose project")
return fmt.Errorf("could not initialize local services compose project")
}

err = project.Down(compose.CommandOptions{})
if err != nil {
return fmt.Errorf("failed to destroy local agent: %w", err)
return fmt.Errorf("failed to destroy local services: %w", err)
}

return nil
Expand Down
19 changes: 12 additions & 7 deletions internal/stack/serverlessresources.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,16 @@ var (
serverlessStackResources = []resource.Resource{
&resource.File{
Path: SnapshotFile,
Content: staticSource.Template("_static/serverless-elastic-agent.yml.tmpl"),
Content: staticSource.Template("_static/serverless-docker-compose.yml.tmpl"),
},
&resource.File{
Path: ElasticAgentEnvFile,
Content: staticSource.Template("_static/elastic-agent.env.tmpl"),
},
&resource.File{
Path: LogstashConfigFile,
Content: staticSource.Template("_static/logstash.conf.tmpl"),
},
}
)

Expand All @@ -39,12 +43,13 @@ func applyServerlessResources(profile *profile.Profile, stackVersion string, con

resourceManager := resource.NewManager()
resourceManager.AddFacter(resource.StaticFacter{
"agent_version": stackVersion,
"agent_image": appConfig.StackImageRefs(stackVersion).ElasticAgent,
"username": config.ElasticsearchUsername,
"password": config.ElasticsearchPassword,
"kibana_host": config.KibanaHost,
"fleet_url": config.Parameters[paramServerlessFleetURL],
"agent_version": stackVersion,
"agent_image": appConfig.StackImageRefs(stackVersion).ElasticAgent,
"username": config.ElasticsearchUsername,
"password": config.ElasticsearchPassword,
"kibana_host": config.KibanaHost,
"fleet_url": config.Parameters[paramServerlessFleetURL],
"logstash_enabled": profile.Config("stack.logstash_enabled", "false"),
})

os.MkdirAll(stackDir, 0755)
Expand Down