Skip to content

Commit

Permalink
Generalize dump stack function and fix issue when checking elasticsea…
Browse files Browse the repository at this point in the history
…rch logs (#1809)

Merge the multiple functionality to dump logs into a single `Dump` method, and add
different implementations for each stack provider.
`Dump` can be used now to dump logs to file or to variable, and to collect logs only
since a given time. This replaces `GetServiceLogs`.
`Dump` implementation in the serverless provider returns an specific error telling that
this functionality is not implemented for any service except elastic-package. This is
used to fix a regression that was producing failures when trying to get logs from
elasticsearch.

Some errors that were previously only logged or ignored are returned and raised now.
  • Loading branch information
jsoriano authored May 2, 2024
1 parent 45b0921 commit eba9ed5
Show file tree
Hide file tree
Showing 8 changed files with 151 additions and 70 deletions.
111 changes: 71 additions & 40 deletions internal/stack/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"fmt"
"os"
"path/filepath"
"slices"
"time"

"github.com/elastic/elastic-package/internal/logger"
Expand All @@ -22,82 +23,112 @@ const (

// DumpOptions defines dumping options for Elatic stack data.
type DumpOptions struct {
Output string
Profile *profile.Profile
}

func GetServiceLogs(ctx context.Context, serviceName string, profile *profile.Profile, since time.Time) ([]byte, error) {
services, err := localServiceNames(DockerComposeProjectName(profile))
if err != nil {
return nil, fmt.Errorf("failed to get local services: %w", err)
}
// Output is the path where the logs are copied. If not defined, logs are only returned as part of the dump results.
Output string

for _, service := range services {
if service != serviceName {
continue
}
// Services is the list of services to get the logs from. If not defined, logs from all available services are dumped.
Services []string

return dockerComposeLogsSince(ctx, serviceName, profile, since)
}
// Since is the time to dump logs from.
Since time.Time
}

return nil, fmt.Errorf("service %s not found in local services", serviceName)
// DumpResult contains the result of a dump operation.
type DumpResult struct {
ServiceName string
Logs []byte
LogsFile string
InternalLogsDir string
}

// Dump function exports stack data and dumps them as local artifacts, which can be used for debug purposes.
func Dump(ctx context.Context, options DumpOptions) (string, error) {
func Dump(ctx context.Context, options DumpOptions) ([]DumpResult, error) {
logger.Debugf("Dump Elastic stack data")

err := dumpStackLogs(ctx, options)
results, err := dumpStackLogs(ctx, options)
if err != nil {
return "", fmt.Errorf("can't dump Elastic stack logs: %w", err)
return nil, fmt.Errorf("can't dump Elastic stack logs: %w", err)
}
return options.Output, nil
return results, nil
}

func dumpStackLogs(ctx context.Context, options DumpOptions) error {
func dumpStackLogs(ctx context.Context, options DumpOptions) ([]DumpResult, error) {
logger.Debugf("Dump stack logs (location: %s)", options.Output)
err := os.RemoveAll(options.Output)
if err != nil {
return fmt.Errorf("can't remove output location: %w", err)
return nil, fmt.Errorf("can't remove output location: %w", err)
}

logsPath := filepath.Join(options.Output, "logs")
err = os.MkdirAll(logsPath, 0755)
services, err := localServiceNames(DockerComposeProjectName(options.Profile))
if err != nil {
return fmt.Errorf("can't create output location (path: %s): %w", logsPath, err)
return nil, fmt.Errorf("failed to get local services: %w", err)
}

services, err := localServiceNames(DockerComposeProjectName(options.Profile))
if err != nil {
return fmt.Errorf("failed to get local services: %w", err)
for _, requestedService := range options.Services {
if !slices.Contains(services, requestedService) {
return nil, fmt.Errorf("local service %s does not exist", requestedService)
}
}

var results []DumpResult
for _, serviceName := range services {
if len(options.Services) > 0 && !slices.Contains(options.Services, serviceName) {
continue
}

logger.Debugf("Dump stack logs for %s", serviceName)

content, err := dockerComposeLogs(ctx, serviceName, options.Profile)
content, err := dockerComposeLogsSince(ctx, serviceName, options.Profile, options.Since)
if err != nil {
logger.Errorf("can't fetch service logs (service: %s): %v", serviceName, err)
} else {
writeLogFiles(logsPath, serviceName, content)
return nil, fmt.Errorf("can't fetch service logs (service: %s): %v", serviceName, err)
}
if options.Output == "" {
results = append(results, DumpResult{
ServiceName: serviceName,
Logs: content,
})
continue
}

err = copyDockerInternalLogs(serviceName, logsPath, options.Profile)
result := DumpResult{
ServiceName: serviceName,
}

logsPath := filepath.Join(options.Output, "logs")
err = os.MkdirAll(logsPath, 0755)
if err != nil {
logger.Errorf("can't copy internal logs (service: %s): %v", serviceName, err)
return nil, fmt.Errorf("can't create output location (path: %s): %w", logsPath, err)
}

logPath, err := writeLogFiles(logsPath, serviceName, content)
if err != nil {
return nil, fmt.Errorf("can't write log files for service %q: %w", serviceName, err)
}
result.LogsFile = logPath

switch serviceName {
case elasticAgentService, fleetServerService:
logPath, err := copyDockerInternalLogs(serviceName, logsPath, options.Profile)
if err != nil {
return nil, fmt.Errorf("can't copy internal logs (service: %s): %w", serviceName, err)
}
result.InternalLogsDir = logPath
}

results = append(results, result)
}
return nil

return results, nil
}

func writeLogFiles(logsPath, serviceName string, content []byte) {
err := os.WriteFile(filepath.Join(logsPath, fmt.Sprintf("%s.log", serviceName)), content, 0644)
func writeLogFiles(logsPath, serviceName string, content []byte) (string, error) {
logPath := filepath.Join(logsPath, serviceName+".log")
err := os.WriteFile(logPath, content, 0644)
if err != nil {
logger.Errorf("can't write service logs (service: %s): %v", serviceName, err)
return "", fmt.Errorf("can't write service logs (service: %s): %v", serviceName, err)
}
}

// DumpLogsFile returns the file path to the logs of a given service
func DumpLogsFile(options DumpOptions, serviceName string) string {
return filepath.Join(options.Output, "logs", fmt.Sprintf("%s.log", serviceName))
return logPath, nil
}
10 changes: 10 additions & 0 deletions internal/stack/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,13 @@ func UndefinedEnvError(envName string) error {

// ErrUnavailableStack is an error about an unavailable Elastic stack.
var ErrUnavailableStack = errors.New("the Elastic stack is unavailable, remember to start it with 'elastic-package stack up', or configure elastic-package with environment variables")

// ErrNotImplemented is an error about a feature not implemented in a stack provider.
type ErrNotImplemented struct {
Operation string
Provider string
}

func (err *ErrNotImplemented) Error() string {
return fmt.Sprintf("operation not implemented in %q provider: %s", err.Provider, err.Operation)
}
18 changes: 4 additions & 14 deletions internal/stack/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,6 @@ import (
"github.com/elastic/elastic-package/internal/profile"
)

func dockerComposeLogs(ctx context.Context, serviceName string, profile *profile.Profile) ([]byte, error) {
return dockerComposeLogsSince(ctx, serviceName, profile, time.Time{})
}

func dockerComposeLogsSince(ctx context.Context, serviceName string, profile *profile.Profile, since time.Time) ([]byte, error) {
appConfig, err := install.Configuration()
if err != nil {
Expand Down Expand Up @@ -53,23 +49,17 @@ func dockerComposeLogsSince(ctx context.Context, serviceName string, profile *pr
return out, nil
}

func copyDockerInternalLogs(serviceName, outputPath string, profile *profile.Profile) error {
switch serviceName {
case elasticAgentService, fleetServerService:
default:
return nil // we need to pull internal logs only from Elastic-Agent and Fleets Server container
}

func copyDockerInternalLogs(serviceName, outputPath string, profile *profile.Profile) (string, error) {
p, err := compose.NewProject(DockerComposeProjectName(profile))
if err != nil {
return fmt.Errorf("could not create docker compose project: %w", err)
return "", fmt.Errorf("could not create docker compose project: %w", err)
}

outputPath = filepath.Join(outputPath, serviceName+"-internal")
serviceContainer := p.ContainerName(serviceName)
err = docker.Copy(serviceContainer, "/usr/share/elastic-agent/state/data/logs/", outputPath)
if err != nil {
return fmt.Errorf("docker copy failed: %w", err)
return "", fmt.Errorf("docker copy failed: %w", err)
}
return nil
return outputPath, nil
}
4 changes: 2 additions & 2 deletions internal/stack/providers.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type Provider interface {
Update(context.Context, Options) error

// Dump dumps data for debug purpouses.
Dump(context.Context, DumpOptions) (string, error)
Dump(context.Context, DumpOptions) ([]DumpResult, error)

// Status obtains status information of the stack.
Status(context.Context, Options) ([]ServiceStatus, error)
Expand Down Expand Up @@ -75,7 +75,7 @@ func (*composeProvider) Update(ctx context.Context, options Options) error {
return Update(ctx, options)
}

func (*composeProvider) Dump(ctx context.Context, options DumpOptions) (string, error) {
func (*composeProvider) Dump(ctx context.Context, options DumpOptions) ([]DumpResult, error) {
return Dump(ctx, options)
}

Expand Down
10 changes: 9 additions & 1 deletion internal/stack/serverless.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,15 @@ func (sp *serverlessProvider) Update(ctx context.Context, options Options) error
return fmt.Errorf("not implemented")
}

func (sp *serverlessProvider) Dump(ctx context.Context, options DumpOptions) (string, error) {
func (sp *serverlessProvider) Dump(ctx context.Context, options DumpOptions) ([]DumpResult, error) {
for _, service := range options.Services {
if service != "elastic-agent" {
return nil, &ErrNotImplemented{
Operation: fmt.Sprintf("logs dump for service %s", service),
Provider: ProviderServerless,
}
}
}
return Dump(ctx, options)
}

Expand Down
26 changes: 24 additions & 2 deletions internal/testrunner/runners/pipeline/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ type runner struct {
pipelines []ingest.Pipeline

runCompareResults bool

provider stack.Provider
}

type IngestPipelineReroute struct {
Expand Down Expand Up @@ -78,6 +80,12 @@ func (r *runner) Run(ctx context.Context, options testrunner.TestOptions) ([]tes
return nil, err
}

provider, err := stack.BuildProvider(stackConfig.Provider, r.options.Profile)
if err != nil {
return nil, fmt.Errorf("failed to build stack provider: %w", err)
}
r.provider = provider

r.runCompareResults = true
if stackConfig.Provider == stack.ProviderServerless {
r.runCompareResults = true
Expand Down Expand Up @@ -204,16 +212,30 @@ func (r *runner) run(ctx context.Context) ([]testrunner.TestResult, error) {
}

func (r *runner) checkElasticsearchLogs(ctx context.Context, startTesting time.Time) ([]testrunner.TestResult, error) {

startTime := time.Now()

testingTime := startTesting.Truncate(time.Second)

elasticsearchLogs, err := stack.GetServiceLogs(ctx, "elasticsearch", r.options.Profile, testingTime)
dumpOptions := stack.DumpOptions{
Profile: r.options.Profile,
Services: []string{"elasticsearch"},
Since: testingTime,
}
dump, err := r.provider.Dump(ctx, dumpOptions)
var notImplementedErr *stack.ErrNotImplemented
if errors.As(err, &notImplementedErr) {
logger.Debugf("Not checking Elasticsearch logs: %s", err)
return nil, nil
}
if err != nil {
return nil, fmt.Errorf("error at getting the logs of elasticsearch: %w", err)
}

if len(dump) != 1 || dump[0].ServiceName != "elasticsearch" {
return nil, errors.New("expected elasticsearch logs in dump")
}
elasticsearchLogs := dump[0].Logs

seenWarnings := make(map[string]any)
var processorRelatedWarnings []string
err = stack.ParseLogsFromReader(bytes.NewReader(elasticsearchLogs), stack.ParseLogsOptions{
Expand Down
29 changes: 24 additions & 5 deletions internal/testrunner/runners/system/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -563,13 +563,26 @@ func (r *runner) run(ctx context.Context) (results []testrunner.TestResult, err
}
defer os.RemoveAll(tempDir)

dumpOptions := stack.DumpOptions{Output: tempDir, Profile: r.options.Profile}
_, err = stack.Dump(context.WithoutCancel(ctx), dumpOptions)
stackConfig, err := stack.LoadConfig(r.options.Profile)
if err != nil {
return nil, err
}

provider, err := stack.BuildProvider(stackConfig.Provider, r.options.Profile)
if err != nil {
return nil, fmt.Errorf("failed to build stack provider: %w", err)
}

dumpOptions := stack.DumpOptions{
Output: tempDir,
Profile: r.options.Profile,
}
dump, err := provider.Dump(context.WithoutCancel(ctx), dumpOptions)
if err != nil {
return nil, fmt.Errorf("dump failed: %w", err)
}

logResults, err := r.checkAgentLogs(dumpOptions, startTesting, errorPatterns)
logResults, err := r.checkAgentLogs(dump, startTesting, errorPatterns)
if err != nil {
return result.WithError(err)
}
Expand Down Expand Up @@ -2016,11 +2029,17 @@ func (r *runner) checkNewAgentLogs(ctx context.Context, agent agentdeployer.Depl
return results, nil
}

func (r *runner) checkAgentLogs(dumpOptions stack.DumpOptions, startTesting time.Time, errorPatterns []logsByContainer) (results []testrunner.TestResult, err error) {
func (r *runner) checkAgentLogs(dump []stack.DumpResult, startTesting time.Time, errorPatterns []logsByContainer) (results []testrunner.TestResult, err error) {
for _, patternsContainer := range errorPatterns {
startTime := time.Now()

serviceLogsFile := stack.DumpLogsFile(dumpOptions, patternsContainer.containerName)
serviceDumpIndex := slices.IndexFunc(dump, func(d stack.DumpResult) bool {
return d.ServiceName == patternsContainer.containerName
})
if serviceDumpIndex < 0 {
return nil, fmt.Errorf("could not find logs dump for service %s", patternsContainer.containerName)
}
serviceLogsFile := dump[serviceDumpIndex].LogsFile

err = r.anyErrorMessages(serviceLogsFile, startTesting, patternsContainer.patterns)
if e, ok := err.(testrunner.ErrTestCaseFailed); ok {
Expand Down
13 changes: 7 additions & 6 deletions internal/testrunner/runners/system/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,7 @@ func TestCheckAgentLogs(t *testing.T) {
err = os.MkdirAll(filepath.Join(logsDirTemp, "logs"), 0755)
require.NoError(t, err)

var dump []stack.DumpResult
for service, logs := range tc.sampleLogs {
logsFile := filepath.Join(logsDirTemp, "logs", fmt.Sprintf("%s.log", service))
file, err := os.Create(logsFile)
Expand All @@ -365,6 +366,11 @@ func TestCheckAgentLogs(t *testing.T) {
_, err = file.WriteString(strings.Join(logs, "\n"))
require.NoError(t, err)
file.Close()

dump = append(dump, stack.DumpResult{
ServiceName: service,
LogsFile: logsFile,
})
}

runner := runner{
Expand All @@ -375,13 +381,8 @@ func TestCheckAgentLogs(t *testing.T) {
},
},
}

dumpOptions := stack.DumpOptions{
Output: logsDirTemp,
}
results, err := runner.checkAgentLogs(dumpOptions, startTime, tc.errorPatterns)
results, err := runner.checkAgentLogs(dump, startTime, tc.errorPatterns)
require.NoError(t, err)

require.Len(t, results, tc.expectedErrors)

if tc.expectedErrors == 0 {
Expand Down

0 comments on commit eba9ed5

Please sign in to comment.