From 81ecabf2641a823009cd279972480eb3a467d982 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Adam=20Medzi=C5=84ski?=
Date: Tue, 12 Jun 2018 11:09:51 +0200
Subject: [PATCH] Fix JSON service log scraping with too long tokens (#115)

When the JSON service log scraper receives a token that is too long, it
should not stop scraping the service log - it should just recreate the
scanner and continue.
---
 servicelog/scraper/json.go      | 37 ++++++++++++++++++++-------------
 servicelog/scraper/json_test.go | 22 +++++++++++++++++++-
 2 files changed, 44 insertions(+), 15 deletions(-)

diff --git a/servicelog/scraper/json.go b/servicelog/scraper/json.go
index 6364e644..073ff9cc 100644
--- a/servicelog/scraper/json.go
+++ b/servicelog/scraper/json.go
@@ -31,26 +31,35 @@ func (j *JSON) StartScraping(reader io.Reader) <-chan servicelog.Entry {
 	logEntries := make(chan servicelog.Entry)
 
 	go func() {
-		for scanner.Scan() {
-			logEntry := servicelog.Entry{}
-			if err := json.Unmarshal(scanner.Bytes(), &logEntry); err != nil && j.ScrapUnmarshallableLogs {
-				log.WithError(err).Debug("Unable to unmarshal log entry - wrapping in default entry")
-				logEntry = j.wrapInDefault(scanner.Bytes())
-			} else if j.KeyFilter != nil {
-				for key := range logEntry {
-					if j.KeyFilter.Match([]byte(key)) {
-						delete(logEntry, key)
-					}
-				}
-			}
-			logEntries <- logEntry
+		for {
+			err := j.scanLoop(reader, logEntries)
+			log.WithError(err).Warn("Service log scraping failed, restarting")
 		}
-		log.WithError(scanner.Err()).Error("Service log scraping failed")
 	}()
 
 	return logEntries
 }
 
+func (j *JSON) scanLoop(reader io.Reader, logEntries chan<- servicelog.Entry) error {
+	scanner := bufio.NewScanner(reader)
+	scanner.Buffer(make([]byte, 64*kilobyte), megabyte)
+	for scanner.Scan() {
+		logEntry := servicelog.Entry{}
+		if err := json.Unmarshal(scanner.Bytes(), &logEntry); err != nil && j.ScrapUnmarshallableLogs {
+			log.WithError(err).Debug("Unable to unmarshal log entry - wrapping in default entry")
+			logEntry = j.wrapInDefault(scanner.Bytes())
+		} else if j.KeyFilter != nil {
+			for key := range logEntry {
+				if j.KeyFilter.Match([]byte(key)) {
+					delete(logEntry, key)
+				}
+			}
+		}
+		logEntries <- logEntry
+	}
+	return scanner.Err()
+}
+
 func (j *JSON) wrapInDefault(bytes []byte) servicelog.Entry {
 	return servicelog.Entry{
 		"time": time.Now().Format(time.RFC3339Nano),
diff --git a/servicelog/scraper/json_test.go b/servicelog/scraper/json_test.go
index 499bc197..d6a57d59 100644
--- a/servicelog/scraper/json_test.go
+++ b/servicelog/scraper/json_test.go
@@ -36,7 +36,7 @@ func TestIfFiltersKeysFromScrapedJSONs(t *testing.T) {
 	assert.Len(t, entry, 1)
 }
 
-func TestIfWrapsInDefualtValuesInvalidLogEntriesWhenEnabled(t *testing.T) {
+func TestIfWrapsInDefaultValuesInvalidLogEntriesWhenEnabled(t *testing.T) {
 	reader, writer := io.Pipe()
 	scraper := JSON{
 		ScrapUnmarshallableLogs: true,
@@ -51,3 +51,23 @@ func TestIfWrapsInDefualtValuesInvalidLogEntriesWhenEnabled(t *testing.T) {
 	assert.Equal(t, "invalid-format", entry["logger"])
 	assert.Equal(t, "INFO", entry["level"])
 }
+
+func TestIfNotFailsWithTooLongTokens(t *testing.T) {
+	reader, writer := io.Pipe()
+	scraper := JSON{
+		ScrapUnmarshallableLogs: true,
+	}
+
+	entries := scraper.StartScraping(reader)
+
+	// send a too long token (size > 1MB)
+	go writer.Write(make([]byte, 5*1024*1024))
+	// send a valid log entry - it should be scraped
+	go writer.Write([]byte("{\"a\":\"b\", \"c\":\"d\"}\n"))
+
+	entry := <-entries
+
+	assert.Equal(t, "b", entry["a"])
+	assert.Equal(t, "d", entry["c"])
+	assert.Len(t, entry, 2)
+}
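
Note (illustration, not part of the patch): the fix works because a
bufio.Scanner that hits bufio.ErrTooLong latches the error and refuses all
further Scan calls, while the underlying io.Reader stays readable. Recreating
the scanner over the same reader therefore skips past the oversized token (at
most one max-token-size chunk is consumed per attempt) and scraping resumes.
A minimal standalone sketch of that behavior, using made-up sizes (64-byte
buffer, 128-byte max token) in place of the patch's 64 KB / 1 MB constants:

package main

import (
	"bufio"
	"fmt"
	"strings"
)

func main() {
	// One oversized line (200 bytes, no newline yet) followed by a valid entry.
	reader := strings.NewReader(strings.Repeat("x", 200) + "\n{\"a\":\"b\"}\n")

	// First scanner: max token size is 128 bytes, so the long line overflows it.
	scanner := bufio.NewScanner(reader)
	scanner.Buffer(make([]byte, 64), 128)
	for scanner.Scan() {
	}
	fmt.Println(scanner.Err()) // bufio.Scanner: token too long

	// The scanner is now stuck in its error state, but the reader is not:
	// a fresh scanner picks up after the ~128 bytes already consumed.
	scanner = bufio.NewScanner(reader)
	scanner.Buffer(make([]byte, 64), 128)
	for scanner.Scan() {
		fmt.Printf("token: %q\n", scanner.Text())
	}
	// Prints the truncated tail of the long line (which the scraper would
	// fail to unmarshal and wrap or drop) and then the valid JSON entry.
}

This is also why TestIfNotFailsWithTooLongTokens passes: the restart loop in
StartScraping keeps recreating the scanner until the 5 MB token is drained,
after which the valid {"a":"b", "c":"d"} entry is scraped normally.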