diff --git a/executor.go b/executor.go index 76ae488b..795c27da 100644 --- a/executor.go +++ b/executor.go @@ -460,6 +460,7 @@ func (e *Executor) createOptionsForLogstashServiceLogScrapping(taskInfo mesos.Ta filter := scraper.ValueFilter{Values: values} scr := &scraper.JSON{ KeyFilter: filter, + BufferSize: 2000, ScrapUnmarshallableLogs: utilTaskInfo.GetLabelValue("log-scraping-all") != "", } apr, err := appender.LogstashAppenderFromEnv() diff --git a/servicelog/scraper/json.go b/servicelog/scraper/json.go index 253884f7..f76ad8b8 100644 --- a/servicelog/scraper/json.go +++ b/servicelog/scraper/json.go @@ -24,6 +24,7 @@ var json = jsoniter.ConfigFastest type JSON struct { InvalidLogsWriter io.Writer KeyFilter Filter + BufferSize uint ScrapUnmarshallableLogs bool } @@ -33,7 +34,7 @@ type JSON struct { func (j *JSON) StartScraping(reader io.Reader) <-chan servicelog.Entry { scanner := bufio.NewScanner(reader) scanner.Buffer(make([]byte, 64*kilobyte), megabyte) - logEntries := make(chan servicelog.Entry) + logEntries := make(chan servicelog.Entry, j.BufferSize) go func() { for { @@ -69,6 +70,10 @@ func (j *JSON) scanLoop(reader io.Reader, logEntries chan<- servicelog.Entry) er } } } + if j.BufferSize > 0 && len(logEntries) >= int(j.BufferSize) { + log.Warnf("Dropping logs because of a buffer overflow (buffer size %s)", j.BufferSize) + continue + } logEntries <- logEntry } return scanner.Err() diff --git a/servicelog/scraper/json_test.go b/servicelog/scraper/json_test.go index 83d47184..b076a010 100644 --- a/servicelog/scraper/json_test.go +++ b/servicelog/scraper/json_test.go @@ -110,6 +110,19 @@ func TestIfIgnoresEmptyLogLines(t *testing.T) { assert.NoError(t, err2) } +func TestIfReadingIsNotBlockingAndLogsAreDroppedWhenBufferedWithOverflow(t *testing.T) { + reader, writer := io.Pipe() + scraper := JSON{ + BufferSize: 1, + } + + entries := scraper.StartScraping(reader) + writer.Write([]byte("{\"a\":\"b\", \"c\":\"d\"}\n")) // should not block + writer.Write([]byte("{\"a\":\"b\", \"c\":\"d\"}\n")) // should not block and should be dropped + writer.Write([]byte("{\"a\":\"b\", \"c\":\"d\"}\n")) // should not block and should be dropped + + assert.Len(t, entries, 1) +} func BenchmarkJSONScraping(b *testing.B) { exampleLog, err := ioutil.ReadFile("testdata/log.json") if err != nil {