diff --git a/servicelog/scraper/json.go b/servicelog/scraper/json.go index 073ff9cc..6b15ba02 100644 --- a/servicelog/scraper/json.go +++ b/servicelog/scraper/json.go @@ -3,7 +3,9 @@ package scraper import ( "bufio" "encoding/json" + "fmt" "io" + "os" "time" log "github.com/sirupsen/logrus" @@ -18,6 +20,7 @@ const ( // JSON is a scraper for logs represented as JSON objects. type JSON struct { + InvalidLogsWriter io.Writer KeyFilter Filter ScrapUnmarshallableLogs bool } @@ -41,13 +44,21 @@ func (j *JSON) StartScraping(reader io.Reader) <-chan servicelog.Entry { } func (j *JSON) scanLoop(reader io.Reader, logEntries chan<- servicelog.Entry) error { + var invalidLogsWriter io.Writer = os.Stdout + if j.InvalidLogsWriter != nil { + invalidLogsWriter = j.InvalidLogsWriter + } scanner := bufio.NewScanner(reader) scanner.Buffer(make([]byte, 64*kilobyte), megabyte) for scanner.Scan() { logEntry := servicelog.Entry{} - if err := json.Unmarshal(scanner.Bytes(), &logEntry); err != nil && j.ScrapUnmarshallableLogs { - log.WithError(err).Debug("Unable to unmarshal log entry - wrapping in default entry") - logEntry = j.wrapInDefault(scanner.Bytes()) + if err := json.Unmarshal(scanner.Bytes(), &logEntry); err != nil { + if j.ScrapUnmarshallableLogs { + log.WithError(err).Debug("Unable to unmarshal log entry - wrapping in default entry") + logEntry = j.wrapInDefault(scanner.Bytes()) + } else { + fmt.Fprintf(invalidLogsWriter, "%s\n", scanner.Bytes()) + } } else if j.KeyFilter != nil { for key := range logEntry { if j.KeyFilter.Match([]byte(key)) { diff --git a/servicelog/scraper/json_test.go b/servicelog/scraper/json_test.go index d6a57d59..b829d625 100644 --- a/servicelog/scraper/json_test.go +++ b/servicelog/scraper/json_test.go @@ -4,8 +4,10 @@ import ( "bytes" "io" "testing" + "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" ) func TestIfScrapsLogsProperlyInJSONFormat(t *testing.T) { @@ -36,6 +38,22 @@ func TestIfFiltersKeysFromScrapedJSONs(t *testing.T) { assert.Len(t, entry, 1) } +func TestIfPrintsToStdoutValuesInvalidLogEntriesWhenDisabled(t *testing.T) { + mockStdout := &mockWriter{} + mockStdout.On("Write", []byte("ERROR my invalid format\n")).Return(0, nil).Once() + + reader, writer := io.Pipe() + scraper := JSON{ + InvalidLogsWriter: mockStdout, + } + + _ = scraper.StartScraping(reader) + go writer.Write([]byte("ERROR my invalid format\n")) + time.Sleep(time.Millisecond) + + mockStdout.AssertExpectations(t) +} + func TestIfWrapsInDefaultValuesInvalidLogEntriesWhenEnabled(t *testing.T) { reader, writer := io.Pipe() scraper := JSON{ @@ -71,3 +89,12 @@ func TestIfNotFailsWithTooLongTokens(t *testing.T) { assert.Equal(t, "d", entry["c"]) assert.Len(t, entry, 2) } + +type mockWriter struct { + mock.Mock +} + +func (w *mockWriter) Write(p []byte) (n int, err error) { + args := w.Called(p) + return args.Int(0), args.Error(1) +}