Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add logstash to serverless provider #1646

Merged
merged 16 commits into from
Feb 8, 2024
Merged
23 changes: 19 additions & 4 deletions internal/certs/certs.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func LoadCA(certFile, keyFile string) (*Issuer, error) {
}

func newCA(parent *Issuer) (*Issuer, error) {
cert, err := New(true, parent)
cert, err := New(true, false, parent)
if err != nil {
return nil, err
}
Expand All @@ -115,13 +115,19 @@ func (i *Issuer) IssueIntermediate() (*Issuer, error) {
// Issue issues a certificate with the given options. This certificate
// can be used to configure a TLS server.
func (i *Issuer) Issue(opts ...Option) (*Certificate, error) {
return New(false, i, opts...)
return New(false, false, i, opts...)
}

// IssueClient issues a certificate with the given options. This certificate
// can be used to configure a TLS client.
func (i *Issuer) IssueClient(opts ...Option) (*Certificate, error) {
return New(false, true, i, opts...)
}

// NewSelfSignedCert issues a self-signed certificate with the given options.
// This certificate can be used to configure a TLS server.
func NewSelfSignedCert(opts ...Option) (*Certificate, error) {
return New(false, nil, opts...)
return New(false, false, nil, opts...)
}

// Option is a function that can modify a certificate template. To be used
Expand All @@ -140,7 +146,7 @@ func WithName(name string) Option {

// New is the main helper to create a certificate, it is recommended to
// use the more specific ones for specific use cases.
func New(isCA bool, issuer *Issuer, opts ...Option) (*Certificate, error) {
func New(isCA, isClient bool, issuer *Issuer, opts ...Option) (*Certificate, error) {
key, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader)
if err != nil {
return nil, fmt.Errorf("failed to generate key: %w", err)
Expand Down Expand Up @@ -172,6 +178,15 @@ func New(isCA bool, issuer *Issuer, opts ...Option) (*Certificate, error) {
} else {
template.Subject.CommonName = "intermediate elastic-package CA"
}
// If the requester is a client we set clientAuth instead
} else if isClient {
template.ExtKeyUsage = []x509.ExtKeyUsage{
x509.ExtKeyUsageClientAuth,
}

// Include local hostname and ips as alternates in service certificates.
template.DNSNames = []string{"localhost"}
template.IPAddresses = []net.IP{net.ParseIP("127.0.0.1"), net.ParseIP("::1")}
} else {
template.ExtKeyUsage = []x509.ExtKeyUsage{
// Required for Chrome in OSX to show the "Proceed anyway" link.
Expand Down
53 changes: 53 additions & 0 deletions internal/kibana/fleet.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,20 @@ import (
"net/http"
)

type FleetOutput struct {
ID string `json:"id,omitempty"`
Name string `json:"name,omitempty"`
Hosts []string `json:"hosts,omitempty"`
Type string `json:"type,omitempty"`
SSL *AgentSSL `json:"ssl,omitempty"`
}

type AgentSSL struct {
Ca_authorities []string `json:"certificate_authorities,omitempty"`
Certificate string `json:"certificate,omitempty"`
Key string `json:"key,omitempty"`
}

// DefaultFleetServerURL returns the default Fleet server configured in Kibana
func (c *Client) DefaultFleetServerURL() (string, error) {
path := fmt.Sprintf("%s/fleet_server_hosts", FleetAPI)
Expand Down Expand Up @@ -43,3 +57,42 @@ func (c *Client) DefaultFleetServerURL() (string, error) {

return "", errors.New("could not find the fleet server URL for this project")
}

// UpdateFleetOutput updates an existing output to fleet
// For example, to update ssl certificates etc.,
func (c *Client) UpdateFleetOutput(fo FleetOutput, outputId string) error {
reqBody, err := json.Marshal(fo)
if err != nil {
return fmt.Errorf("could not convert fleetOutput (request) to JSON: %w", err)
}

statusCode, respBody, err := c.put(fmt.Sprintf("%s/outputs/%s", FleetAPI, outputId), reqBody)
if err != nil {
return fmt.Errorf("could not update fleet output: %w", err)
}

if statusCode != http.StatusOK {
return fmt.Errorf("could not update fleet output; API status code = %d; response body = %s", statusCode, respBody)
}

return nil
}

// AddFleetOutput adds an additional output to fleet eg., logstash
func (c *Client) AddFleetOutput(fo FleetOutput) error {
reqBody, err := json.Marshal(fo)
if err != nil {
return fmt.Errorf("could not convert fleetOutput (request) to JSON: %w", err)
}

statusCode, respBody, err := c.post(fmt.Sprintf("%s/outputs", FleetAPI), reqBody)
if err != nil {
return fmt.Errorf("could not create fleet output: %w", err)
}

if statusCode != http.StatusOK {
return fmt.Errorf("could not add fleet output; API status code = %d; response body = %s", statusCode, respBody)
}

return nil
}
57 changes: 56 additions & 1 deletion internal/serverless/project.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,15 @@ import (
"io"
"net/http"
"net/url"
"os"
"path/filepath"
"strings"
"time"

"github.com/elastic/elastic-package/internal/elasticsearch"
"github.com/elastic/elastic-package/internal/kibana"
"github.com/elastic/elastic-package/internal/logger"
"github.com/elastic/elastic-package/internal/profile"
"github.com/elastic/elastic-package/internal/registry"
)

Expand Down Expand Up @@ -131,6 +134,53 @@ func (p *Project) DefaultFleetServerURL(kibanaClient *kibana.Client) (string, er
return fleetURL, nil
}

func (p *Project) AddLogstashFleetOutput(profile *profile.Profile, kibanaClient *kibana.Client) error {
logstashFleetOutput := kibana.FleetOutput{
Name: "logstash-output",
ID: "fleet-logstash-output",
Type: "logstash",
Hosts: []string{"logstash:5044"},
}

if err := kibanaClient.AddFleetOutput(logstashFleetOutput); err != nil {
return fmt.Errorf("failed to add logstash fleet output: %w", err)
}

return nil
}

func (p *Project) UpdateLogstashFleetOutput(profile *profile.Profile, kibanaClient *kibana.Client) error {
certsDir := filepath.Join(profile.ProfilePath, "certs", "elastic-agent")

caFile, err := os.ReadFile(filepath.Join(certsDir, "ca-cert.pem"))
if err != nil {
return fmt.Errorf("failed to read ca certificate: %w", err)
}

certFile, err := os.ReadFile(filepath.Join(certsDir, "cert.pem"))
if err != nil {
return fmt.Errorf("failed to read client certificate: %w", err)
}

keyFile, err := os.ReadFile(filepath.Join(certsDir, "key.pem"))
if err != nil {
return fmt.Errorf("failed to read client certificate public key: %w", err)
}

logstashFleetOutput := kibana.FleetOutput{
SSL: &kibana.AgentSSL{
Ca_authorities: []string{string(caFile)},
Certificate: string(certFile),
Key: string(keyFile)},
}

if err := kibanaClient.UpdateFleetOutput(logstashFleetOutput, "fleet-logstash-output"); err != nil {
return fmt.Errorf("failed to update logstash fleet output: %w", err)
}

return nil
}

func (p *Project) getESHealth(ctx context.Context, elasticsearchClient *elasticsearch.Client) error {
return elasticsearchClient.CheckHealth(ctx)
}
Expand Down Expand Up @@ -177,7 +227,7 @@ func (p *Project) getFleetHealth(ctx context.Context) error {
return nil
}

func (p *Project) CreateAgentPolicy(stackVersion string, kibanaClient *kibana.Client) error {
func (p *Project) CreateAgentPolicy(stackVersion string, kibanaClient *kibana.Client, logstashEnabled bool) error {
systemPackages, err := registry.Production.Revisions("system", registry.SearchOptions{
KibanaVersion: strings.TrimSuffix(stackVersion, kibana.SNAPSHOT_SUFFIX),
})
Expand All @@ -196,6 +246,11 @@ func (p *Project) CreateAgentPolicy(stackVersion string, kibanaClient *kibana.Cl
Namespace: "default",
MonitoringEnabled: []string{"logs", "metrics"},
}

if logstashEnabled {
policy.DataOutputID = "fleet-logstash-output"
}

newPolicy, err := kibanaClient.CreatePolicy(policy)
if err != nil {
return fmt.Errorf("error while creating agent policy: %w", err)
Expand Down
1 change: 1 addition & 0 deletions internal/stack/_static/docker-compose-stack.yml.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ services:
env_file: "./elastic-agent.env"
volumes:
- "../certs/ca-cert.pem:/etc/ssl/certs/elastic-package.pem"
- "../certs/logstash/ca-cert.pem:/etc/ssl/certs/logstash.pem"
- type: bind
source: ../../../tmp/service_logs/
target: /tmp/service_logs/
Expand Down
56 changes: 56 additions & 0 deletions internal/stack/_static/serverless-docker-compose.yml.tmpl
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
version: '2.3'
services:
elastic-agent:
image: "{{ fact "agent_image" }}"
healthcheck:
test: "elastic-agent status"
timeout: 2s
start_period: 360s
retries: 180
interval: 5s
hostname: docker-fleet-agent
env_file: "./elastic-agent.env"
volumes:
- type: bind
source: ../../../tmp/service_logs/
target: /tmp/service_logs/
# Mount service_logs under /run too as a testing workaround for the journald input (see elastic-package#1235).
- type: bind
source: ../../../tmp/service_logs/
target: /run/service_logs/
- "../certs/logstash/ca-cert.pem:/etc/ssl/certs/logstash.pem"

elastic-agent_is_ready:
image: tianon/true
depends_on:
elastic-agent:
condition: service_healthy

{{ $logstash_enabled := fact "logstash_enabled" }}
{{ if eq $logstash_enabled "true" }}
logstash:
image: "{{ fact "logstash_image" }}"
healthcheck:
test: bin/logstash -t
interval: 60s
timeout: 50s
retries: 5
command: bash -c 'openssl pkcs8 -inform PEM -in /usr/share/logstash/config/certs/key.pem -topk8 -nocrypt -outform PEM -out /usr/share/logstash/config/certs/logstash.pkcs8.key && chmod 777 /usr/share/logstash/config/certs/logstash.pkcs8.key && if [[ ! $(bin/logstash-plugin list) == *"logstash-filter-elastic_integration"* ]]; then echo "Missing plugin logstash-filter-elastic_integration, installing now" && bin/logstash-plugin install logstash-filter-elastic_integration; fi && bin/logstash -f /usr/share/logstash/pipeline/logstash.conf'
volumes:
- "../certs/logstash:/usr/share/logstash/config/certs"
- "./logstash.conf:/usr/share/logstash/pipeline/logstash.conf:ro"
ports:
- "127.0.0.1:5044:5044"
- "127.0.0.1:9600:9600"
environment:
- xpack.monitoring.enabled=false
- ELASTIC_USER={{ fact "username" }}
- ELASTIC_PASSWORD={{ fact "password" }}
- ELASTIC_HOSTS={{ fact "elasticsearch_host" }}

logstash_is_ready:
image: tianon/true
depends_on:
logstash:
condition: service_healthy
{{ end }}
26 changes: 0 additions & 26 deletions internal/stack/_static/serverless-elastic-agent.yml.tmpl

This file was deleted.

33 changes: 33 additions & 0 deletions internal/stack/_static/serverless-logstash.conf.tmpl
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
input {
elastic_agent {
port => 5044
ssl_enabled => true
ssl_certificate_authorities => ["/usr/share/logstash/config/certs/ca-cert.pem"]
ssl_certificate => "/usr/share/logstash/config/certs/cert.pem"
ssl_key => "/usr/share/logstash/config/certs/logstash.pkcs8.key"
ssl_client_authentication => "required"
}
}


filter {
elastic_integration {
remove_field => ['@version']
hosts => ["{{ fact "elasticsearch_host" }}"]
username => '{{ fact "username" }}'
password => '{{ fact "password" }}'
ssl_enabled => true
ssl_verification_mode => "full"
}
}


output {
elasticsearch {
hosts => ["{{ fact "elasticsearch_host" }}"]
user => '{{ fact "username" }}'
password => '{{ fact "password" }}'
ssl_enabled => true
data_stream => "true"
}
}
22 changes: 18 additions & 4 deletions internal/stack/certs.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ var tlsServices = []string{
"logstash",
}

var tlsServicesServerless = []string{
"logstash",
"elastic-agent",
}

var (
// CertificatesDirectory is the path to the certificates directory inside a profile.
CertificatesDirectory = "certs"
Expand All @@ -43,7 +48,7 @@ var (
// initTLSCertificates initializes all the certificates needed to run the services
// managed by elastic-package stack. It includes a CA, and a pair of keys and
// certificates for each service.
func initTLSCertificates(fileProvider string, profilePath string) ([]resource.Resource, error) {
func initTLSCertificates(fileProvider string, profilePath string, tlsServices []string) ([]resource.Resource, error) {
certsDir := filepath.Join(profilePath, CertificatesDirectory)
caCertFile := filepath.Join(profilePath, string(CACertificateFile))
caKeyFile := filepath.Join(profilePath, string(CAKeyFile))
Expand Down Expand Up @@ -140,9 +145,18 @@ func initServiceTLSCertificates(ca *certs.Issuer, caCertFile string, certFile, k
return certs.LoadCertificate(certFile, keyFile)
}

cert, err := ca.Issue(certs.WithName(service))
if err != nil {
return nil, fmt.Errorf("error initializing certificate for %q", service)
var cert *certs.Certificate
var err error
if service == "elastic-agent" {
cert, err = ca.IssueClient(certs.WithName(service))
if err != nil {
return nil, fmt.Errorf("error initializing certificate for %q", service)
}
} else {
cert, err = ca.Issue(certs.WithName(service))
if err != nil {
return nil, fmt.Errorf("error initializing certificate for %q", service)
}
}

return cert, nil
Expand Down
4 changes: 2 additions & 2 deletions internal/stack/certs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func TestTLSCertsInitialization(t *testing.T) {
assert.Error(t, verifyTLSCertificates(caCertFile, caCertFile, caKeyFile, ""))

providerName := "test-file"
resources, err := initTLSCertificates(providerName, profilePath)
resources, err := initTLSCertificates(providerName, profilePath, tlsServices)
require.NoError(t, err)

resourceManager := resource.NewManager()
Expand Down Expand Up @@ -54,7 +54,7 @@ func TestTLSCertsInitialization(t *testing.T) {
assert.Error(t, verifyTLSCertificates(caCertFile, serviceCertFile, serviceKeyFile, service))

// Check it is created again and is validated by the same CA.
resources, err := initTLSCertificates(providerName, profilePath)
resources, err := initTLSCertificates(providerName, profilePath, tlsServices)
require.NoError(t, err)
_, err = resourceManager.Apply(resources)
require.NoError(t, err)
Expand Down
Loading