Skip to content

Commit

Permalink
Make unmarshallable JSON logs scraping optional (#114)
Browse files Browse the repository at this point in the history
  • Loading branch information
medzin authored May 17, 2018
1 parent 93a26d2 commit dc8f9fb
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 5 deletions.
5 changes: 4 additions & 1 deletion executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,10 @@ func (e *Executor) createOptionsForLogstashServiceLogScrapping(taskInfo mesos.Ta
values = append(values, []byte(ignoredKey))
}
filter := scraper.ValueFilter{Values: values}
scr := &scraper.JSON{KeyFilter: filter}
scr := &scraper.JSON{
KeyFilter: filter,
ScrapUnmarshallableLogs: utilTaskInfo.GetLabelValue("log-scraping-all") != "",
}
apr, err := appender.LogstashAppenderFromEnv()
if err != nil {
return nil, fmt.Errorf("cannot configure service log scraping: %s", err)
Expand Down
5 changes: 3 additions & 2 deletions servicelog/scraper/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ const (

// JSON is a scraper for logs represented as JSON objects.
type JSON struct {
KeyFilter Filter
KeyFilter Filter
ScrapUnmarshallableLogs bool
}

// StartScraping starts scraping logs in JSON format from given reader and sends
Expand All @@ -32,7 +33,7 @@ func (j *JSON) StartScraping(reader io.Reader) <-chan servicelog.Entry {
go func() {
for scanner.Scan() {
logEntry := servicelog.Entry{}
if err := json.Unmarshal(scanner.Bytes(), &logEntry); err != nil {
if err := json.Unmarshal(scanner.Bytes(), &logEntry); err != nil && j.ScrapUnmarshallableLogs {
log.WithError(err).Debug("Unable to unmarshal log entry - wrapping in default entry")
logEntry = j.wrapInDefault(scanner.Bytes())
} else if j.KeyFilter != nil {
Expand Down
6 changes: 4 additions & 2 deletions servicelog/scraper/json_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,11 @@ func TestIfFiltersKeysFromScrapedJSONs(t *testing.T) {
assert.Len(t, entry, 1)
}

func TestIfWrapsInDefualtValuesInvalidLogEntries(t *testing.T) {
func TestIfWrapsInDefualtValuesInvalidLogEntriesWhenEnabled(t *testing.T) {
reader, writer := io.Pipe()
scraper := JSON{}
scraper := JSON{
ScrapUnmarshallableLogs: true,
}

entries := scraper.StartScraping(reader)
go writer.Write([]byte("ERROR my invalid format\n"))
Expand Down

0 comments on commit dc8f9fb

Please sign in to comment.