Skip to content

Commit

Permalink
Fix JSON service log scraping with too long tokens (#115)
Browse files Browse the repository at this point in the history
When JSON service log scraper receives too long token it should not stop
the service log scraping - it should just recreate the scanner.
  • Loading branch information
medzin authored Jun 12, 2018
1 parent dc8f9fb commit 81ecabf
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 15 deletions.
37 changes: 23 additions & 14 deletions servicelog/scraper/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,26 +31,35 @@ func (j *JSON) StartScraping(reader io.Reader) <-chan servicelog.Entry {
logEntries := make(chan servicelog.Entry)

go func() {
for scanner.Scan() {
logEntry := servicelog.Entry{}
if err := json.Unmarshal(scanner.Bytes(), &logEntry); err != nil && j.ScrapUnmarshallableLogs {
log.WithError(err).Debug("Unable to unmarshal log entry - wrapping in default entry")
logEntry = j.wrapInDefault(scanner.Bytes())
} else if j.KeyFilter != nil {
for key := range logEntry {
if j.KeyFilter.Match([]byte(key)) {
delete(logEntry, key)
}
}
}
logEntries <- logEntry
for {
err := j.scanLoop(reader, logEntries)
log.WithError(err).Warn("Service log scraping failed, restarting")
}
log.WithError(scanner.Err()).Error("Service log scraping failed")
}()

return logEntries
}

func (j *JSON) scanLoop(reader io.Reader, logEntries chan<- servicelog.Entry) error {
scanner := bufio.NewScanner(reader)
scanner.Buffer(make([]byte, 64*kilobyte), megabyte)
for scanner.Scan() {
logEntry := servicelog.Entry{}
if err := json.Unmarshal(scanner.Bytes(), &logEntry); err != nil && j.ScrapUnmarshallableLogs {
log.WithError(err).Debug("Unable to unmarshal log entry - wrapping in default entry")
logEntry = j.wrapInDefault(scanner.Bytes())
} else if j.KeyFilter != nil {
for key := range logEntry {
if j.KeyFilter.Match([]byte(key)) {
delete(logEntry, key)
}
}
}
logEntries <- logEntry
}
return scanner.Err()
}

func (j *JSON) wrapInDefault(bytes []byte) servicelog.Entry {
return servicelog.Entry{
"time": time.Now().Format(time.RFC3339Nano),
Expand Down
22 changes: 21 additions & 1 deletion servicelog/scraper/json_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func TestIfFiltersKeysFromScrapedJSONs(t *testing.T) {
assert.Len(t, entry, 1)
}

func TestIfWrapsInDefualtValuesInvalidLogEntriesWhenEnabled(t *testing.T) {
func TestIfWrapsInDefaultValuesInvalidLogEntriesWhenEnabled(t *testing.T) {
reader, writer := io.Pipe()
scraper := JSON{
ScrapUnmarshallableLogs: true,
Expand All @@ -51,3 +51,23 @@ func TestIfWrapsInDefualtValuesInvalidLogEntriesWhenEnabled(t *testing.T) {
assert.Equal(t, "invalid-format", entry["logger"])
assert.Equal(t, "INFO", entry["level"])
}

func TestIfNotFailsWithTooLongTokens(t *testing.T) {
reader, writer := io.Pipe()
scraper := JSON{
ScrapUnmarshallableLogs: true,
}

entries := scraper.StartScraping(reader)

// send to long token (size > 1MB)
go writer.Write(make([]byte, 1024*1024*5, 1024*1024*5))
// send valid log, it should be scraped
go writer.Write([]byte("{\"a\":\"b\", \"c\":\"d\"}\n"))

entry := <-entries

assert.Equal(t, "b", entry["a"])
assert.Equal(t, "d", entry["c"])
assert.Len(t, entry, 2)
}

0 comments on commit 81ecabf

Please sign in to comment.