Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add logstash to serverless provider #1646

Merged
merged 16 commits into from
Feb 8, 2024
Merged
26 changes: 26 additions & 0 deletions internal/kibana/fleet.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,13 @@ import (
"net/http"
)

type FleetOutput struct {
ID string `json:"id,omitempty"`
Name string `json:"name,omitempty"`
Hosts []string `json:"hosts,omitempty"`
Type string `json:"type,omitempty"`
}

// DefaultFleetServerURL returns the default Fleet server configured in Kibana
func (c *Client) DefaultFleetServerURL() (string, error) {
path := fmt.Sprintf("%s/fleet_server_hosts", FleetAPI)
Expand Down Expand Up @@ -43,3 +50,22 @@ func (c *Client) DefaultFleetServerURL() (string, error) {

return "", errors.New("could not find the fleet server URL for this project")
}

// AddFleetOutput adds an additional output to kibana eg., logstash
func (c *Client) AddFleetOutput(fo FleetOutput) error {
reqBody, err := json.Marshal(fo)
if err != nil {
return fmt.Errorf("could not convert fleetOutput (request) to JSON: %w", err)
}

statusCode, respBody, err := c.post(fmt.Sprintf("%s/outputs", FleetAPI), reqBody)
if err != nil {
return fmt.Errorf("could not create fleet output: %w", err)
}

if statusCode != http.StatusOK {
return fmt.Errorf("could not add fleet output; API status code = %d; response body = %s", statusCode, respBody)
}

return nil
}
22 changes: 21 additions & 1 deletion internal/serverless/project.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,21 @@ func (p *Project) DefaultFleetServerURL(kibanaClient *kibana.Client) (string, er
return fleetURL, nil
}

func (p *Project) AddLogstashFleetOutput(kibanaClient *kibana.Client) error {
logstashFleetOutput := kibana.FleetOutput{
Name: "logstash-output",
ID: "logstash-fleet-output",
Type: "logstash",
Hosts: []string{"logstash:5044"},
}

if err := kibanaClient.AddFleetOutput(logstashFleetOutput); err != nil {
return fmt.Errorf("failed to add logstash fleet output: %w", err)
}

return nil
}

func (p *Project) getESHealth(ctx context.Context, elasticsearchClient *elasticsearch.Client) error {
return elasticsearchClient.CheckHealth(ctx)
}
Expand Down Expand Up @@ -177,7 +192,7 @@ func (p *Project) getFleetHealth(ctx context.Context) error {
return nil
}

func (p *Project) CreateAgentPolicy(stackVersion string, kibanaClient *kibana.Client) error {
func (p *Project) CreateAgentPolicy(stackVersion string, kibanaClient *kibana.Client, logstashEnabled bool) error {
systemPackages, err := registry.Production.Revisions("system", registry.SearchOptions{
KibanaVersion: strings.TrimSuffix(stackVersion, kibana.SNAPSHOT_SUFFIX),
})
Expand All @@ -196,6 +211,11 @@ func (p *Project) CreateAgentPolicy(stackVersion string, kibanaClient *kibana.Cl
Namespace: "default",
MonitoringEnabled: []string{"logs", "metrics"},
}

if logstashEnabled {
policy.DataOutputID = "logstash-fleet-output"
}

newPolicy, err := kibanaClient.CreatePolicy(policy)
if err != nil {
return fmt.Errorf("error while creating agent policy: %w", err)
Expand Down
1 change: 1 addition & 0 deletions internal/stack/_static/docker-compose-stack.yml.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ services:
env_file: "./elastic-agent.env"
volumes:
- "../certs/ca-cert.pem:/etc/ssl/certs/elastic-package.pem"
- "../certs/logstash/ca-cert.pem:/etc/ssl/certs/logstash.pem"
- type: bind
source: ../../../tmp/service_logs/
target: /tmp/service_logs/
Expand Down
2 changes: 1 addition & 1 deletion internal/stack/_static/logstash.conf.tmpl
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
input {
elastic_agent {
port => 5044
ssl_enabled => false
ssl_enabled => true
ssl_certificate_authorities => ["/usr/share/logstash/config/certs/ca-cert.pem"]
ssl_certificate => "/usr/share/logstash/config/certs/cert.pem"
ssl_key => "/usr/share/logstash/config/certs/key.pem"
Expand Down
56 changes: 56 additions & 0 deletions internal/stack/_static/serverless-docker-compose.yml.tmpl
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
version: '2.3'
services:
elastic-agent:
image: "{{ fact "agent_image" }}"
healthcheck:
test: "elastic-agent status"
timeout: 2s
start_period: 360s
retries: 180
interval: 5s
hostname: docker-fleet-agent
env_file: "./elastic-agent.env"
volumes:
- type: bind
source: ../../../tmp/service_logs/
target: /tmp/service_logs/
# Mount service_logs under /run too as a testing workaround for the journald input (see elastic-package#1235).
- type: bind
source: ../../../tmp/service_logs/
target: /run/service_logs/
- "../certs/logstash/ca-cert.pem:/etc/ssl/certs/logstash.pem"

elastic-agent_is_ready:
image: tianon/true
depends_on:
elastic-agent:
condition: service_healthy

{{ $logstash_enabled := fact "logstash_enabled" }}
{{ if eq $logstash_enabled "true" }}
logstash:
image: "{{ fact "logstash_image" }}"
healthcheck:
test: bin/logstash -t
interval: 60s
timeout: 50s
retries: 5
command: bash -c 'if [[ ! $(bin/logstash-plugin list) == *"logstash-filter-elastic_integration"* ]]; then echo "Missing plugin logstash-filter-elastic_integration, installing now" && bin/logstash-plugin install logstash-filter-elastic_integration; fi && bin/logstash -f /usr/share/logstash/pipeline/logstash.conf'
volumes:
- "../certs/logstash:/usr/share/logstash/config/certs"
- "./logstash.conf:/usr/share/logstash/pipeline/logstash.conf:ro"
ports:
- "127.0.0.1:5044:5044"
- "127.0.0.1:9600:9600"
environment:
- xpack.monitoring.enabled=false
- ELASTIC_USER={{ fact "username" }}
- ELASTIC_PASSWORD={{ fact "password" }}
- ELASTIC_HOSTS={{ fact "elasticsearch_host" }}

logstash_is_ready:
image: tianon/true
depends_on:
logstash:
condition: service_healthy
{{ end }}
26 changes: 0 additions & 26 deletions internal/stack/_static/serverless-elastic-agent.yml.tmpl

This file was deleted.

33 changes: 33 additions & 0 deletions internal/stack/_static/serverless-logstash.conf.tmpl
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
input {
elastic_agent {
port => 5044
ssl_enabled => true
ssl_certificate_authorities => ["/usr/share/logstash/config/certs/ca-cert.pem"]
ssl_certificate => "/usr/share/logstash/config/certs/cert.pem"
ssl_key => "/usr/share/logstash/config/certs/key.pem"
}
}


filter {
elastic_integration {
remove_field => ['@version']
hosts => ["{{ fact "elasticsearch_host" }}"]
username => {{ fact "username" }}
password => {{ fact "password" }}
ssl_enabled => true
ssl_verification_mode => "full"
}
}


output {
elasticsearch {
hosts => ["{{ fact "elasticsearch_host" }}"]
user => {{ fact "username" }}
password => {{ fact "password" }}
ssl_enabled => true
data_stream => "true"
document_id => "%{[@metadata][_ingest_document][id]}"
}
}
6 changes: 5 additions & 1 deletion internal/stack/certs.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ var tlsServices = []string{
"logstash",
}

var tlsServicesServerless = []string{
"logstash",
}

var (
// CertificatesDirectory is the path to the certificates directory inside a profile.
CertificatesDirectory = "certs"
Expand All @@ -43,7 +47,7 @@ var (
// initTLSCertificates initializes all the certificates needed to run the services
// managed by elastic-package stack. It includes a CA, and a pair of keys and
// certificates for each service.
func initTLSCertificates(fileProvider string, profilePath string) ([]resource.Resource, error) {
func initTLSCertificates(fileProvider string, profilePath string, tlsServices []string) ([]resource.Resource, error) {
certsDir := filepath.Join(profilePath, CertificatesDirectory)
caCertFile := filepath.Join(profilePath, string(CACertificateFile))
caKeyFile := filepath.Join(profilePath, string(CAKeyFile))
Expand Down
4 changes: 2 additions & 2 deletions internal/stack/certs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func TestTLSCertsInitialization(t *testing.T) {
assert.Error(t, verifyTLSCertificates(caCertFile, caCertFile, caKeyFile, ""))

providerName := "test-file"
resources, err := initTLSCertificates(providerName, profilePath)
resources, err := initTLSCertificates(providerName, profilePath, tlsServices)
require.NoError(t, err)

resourceManager := resource.NewManager()
Expand Down Expand Up @@ -54,7 +54,7 @@ func TestTLSCertsInitialization(t *testing.T) {
assert.Error(t, verifyTLSCertificates(caCertFile, serviceCertFile, serviceKeyFile, service))

// Check it is created again and is validated by the same CA.
resources, err := initTLSCertificates(providerName, profilePath)
resources, err := initTLSCertificates(providerName, profilePath, tlsServices)
require.NoError(t, err)
_, err = resourceManager.Apply(resources)
require.NoError(t, err)
Expand Down
7 changes: 4 additions & 3 deletions internal/stack/resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,9 @@ func applyResources(profile *profile.Profile, stackVersion string) error {
"kibana_version": stackVersion,
"agent_version": stackVersion,

"kibana_host": "https://kibana:5601",
"fleet_url": "https://fleet-server:8220",
"kibana_host": "https://kibana:5601",
"fleet_url": "https://fleet-server:8220",
"elasticsearch_host": "https://elasticsearch:9200",

"username": elasticsearchUsername,
"password": elasticsearchPassword,
Expand All @@ -144,7 +145,7 @@ func applyResources(profile *profile.Profile, stackVersion string) error {
resourceManager.RegisterProvider("certs", &resource.FileProvider{
Prefix: profile.ProfilePath,
})
certResources, err := initTLSCertificates("certs", profile.ProfilePath)
certResources, err := initTLSCertificates("certs", profile.ProfilePath, tlsServices)
if err != nil {
return fmt.Errorf("failed to create TLS files: %w", err)
}
Expand Down
Loading