diff --git a/executor.go b/executor.go index 7f4f1132..76ae488b 100644 --- a/executor.go +++ b/executor.go @@ -458,7 +458,10 @@ func (e *Executor) createOptionsForLogstashServiceLogScrapping(taskInfo mesos.Ta values = append(values, []byte(ignoredKey)) } filter := scraper.ValueFilter{Values: values} - scr := &scraper.JSON{KeyFilter: filter} + scr := &scraper.JSON{ + KeyFilter: filter, + ScrapUnmarshallableLogs: utilTaskInfo.GetLabelValue("log-scraping-all") != "", + } apr, err := appender.LogstashAppenderFromEnv() if err != nil { return nil, fmt.Errorf("cannot configure service log scraping: %s", err) diff --git a/servicelog/scraper/json.go b/servicelog/scraper/json.go index 8d146b9e..6364e644 100644 --- a/servicelog/scraper/json.go +++ b/servicelog/scraper/json.go @@ -18,7 +18,8 @@ const ( // JSON is a scraper for logs represented as JSON objects. type JSON struct { - KeyFilter Filter + KeyFilter Filter + ScrapUnmarshallableLogs bool } // StartScraping starts scraping logs in JSON format from given reader and sends @@ -32,7 +33,7 @@ func (j *JSON) StartScraping(reader io.Reader) <-chan servicelog.Entry { go func() { for scanner.Scan() { logEntry := servicelog.Entry{} - if err := json.Unmarshal(scanner.Bytes(), &logEntry); err != nil { + if err := json.Unmarshal(scanner.Bytes(), &logEntry); err != nil && j.ScrapUnmarshallableLogs { log.WithError(err).Debug("Unable to unmarshal log entry - wrapping in default entry") logEntry = j.wrapInDefault(scanner.Bytes()) } else if j.KeyFilter != nil { diff --git a/servicelog/scraper/json_test.go b/servicelog/scraper/json_test.go index f0a6c1a0..499bc197 100644 --- a/servicelog/scraper/json_test.go +++ b/servicelog/scraper/json_test.go @@ -36,9 +36,11 @@ func TestIfFiltersKeysFromScrapedJSONs(t *testing.T) { assert.Len(t, entry, 1) } -func TestIfWrapsInDefualtValuesInvalidLogEntries(t *testing.T) { +func TestIfWrapsInDefualtValuesInvalidLogEntriesWhenEnabled(t *testing.T) { reader, writer := io.Pipe() - scraper := JSON{} + scraper := JSON{ + ScrapUnmarshallableLogs: true, + } entries := scraper.StartScraping(reader) go writer.Write([]byte("ERROR my invalid format\n"))