Skip to content

Commit

Permalink
Add optional buffer to JSON scraper (#121)
Browse files Browse the repository at this point in the history
Scraping logs from stdout may sometimes block the
application that is sending logs, because the executor cannot keep up with
sending logs during temporary network delays. The executor should
drop the logs if there is a risk that the writes to stdout will become
blocking.
  • Loading branch information
medzin authored Jul 17, 2018
1 parent 17e6340 commit d057858
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 1 deletion.
1 change: 1 addition & 0 deletions executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,7 @@ func (e *Executor) createOptionsForLogstashServiceLogScrapping(taskInfo mesos.Ta
filter := scraper.ValueFilter{Values: values}
scr := &scraper.JSON{
KeyFilter: filter,
BufferSize: 2000,
ScrapUnmarshallableLogs: utilTaskInfo.GetLabelValue("log-scraping-all") != "",
}
apr, err := appender.LogstashAppenderFromEnv()
Expand Down
7 changes: 6 additions & 1 deletion servicelog/scraper/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ var json = jsoniter.ConfigFastest
type JSON struct {
// InvalidLogsWriter is an io.Writer; presumably the destination for log
// lines that cannot be parsed — TODO confirm against scanLoop.
InvalidLogsWriter io.Writer
// KeyFilter is the Filter applied while scraping; presumably it selects
// which JSON keys are kept — verify against the Filter implementation.
KeyFilter Filter
// BufferSize is the capacity of the channel returned by StartScraping.
// When it is greater than zero, entries are dropped (with a warning)
// instead of blocking once the channel already holds BufferSize entries.
// Zero yields an unbuffered channel and a blocking send.
BufferSize uint
// ScrapUnmarshallableLogs presumably enables forwarding of lines that
// are not valid JSON — NOTE(review): confirm against scanLoop.
ScrapUnmarshallableLogs bool
}

Expand All @@ -33,7 +34,7 @@ type JSON struct {
func (j *JSON) StartScraping(reader io.Reader) <-chan servicelog.Entry {
scanner := bufio.NewScanner(reader)
scanner.Buffer(make([]byte, 64*kilobyte), megabyte)
logEntries := make(chan servicelog.Entry)
logEntries := make(chan servicelog.Entry, j.BufferSize)

go func() {
for {
Expand Down Expand Up @@ -69,6 +70,10 @@ func (j *JSON) scanLoop(reader io.Reader, logEntries chan<- servicelog.Entry) er
}
}
}
// Buffering enabled: drop the entry rather than block the scanning loop
// when the channel already holds BufferSize entries. With BufferSize == 0
// this check is skipped and the send below blocks until a receiver is ready.
if j.BufferSize > 0 && len(logEntries) >= int(j.BufferSize) {
	// BufferSize is a uint, so it must be formatted with %d (not %s).
	log.Warnf("Dropping logs because of a buffer overflow (buffer size %d)", j.BufferSize)
	continue
}
logEntries <- logEntry
}
return scanner.Err()
Expand Down
13 changes: 13 additions & 0 deletions servicelog/scraper/json_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,19 @@ func TestIfIgnoresEmptyLogLines(t *testing.T) {
assert.NoError(t, err2)
}

// TestIfReadingIsNotBlockingAndLogsAreDroppedWhenBufferedWithOverflow writes
// three log lines into a scraper configured with BufferSize 1 while nothing
// receives from the returned channel. The writes must not block, and only one
// entry may be left in the channel afterwards; the others are expected to be
// dropped.
func TestIfReadingIsNotBlockingAndLogsAreDroppedWhenBufferedWithOverflow(t *testing.T) {
	reader, writer := io.Pipe()
	jsonScraper := JSON{
		BufferSize: 1,
	}
	line := []byte(`{"a":"b", "c":"d"}` + "\n")

	entries := jsonScraper.StartScraping(reader)

	// The first write should fill the buffer; the following ones should not
	// block and their entries should be dropped. Each write error is checked
	// so that a pipe failure is reported as such instead of as a confusing
	// length mismatch below.
	for i := 0; i < 3; i++ {
		_, err := writer.Write(line)
		assert.NoError(t, err)
	}

	assert.Len(t, entries, 1)
}
func BenchmarkJSONScraping(b *testing.B) {
exampleLog, err := ioutil.ReadFile("testdata/log.json")
if err != nil {
Expand Down

0 comments on commit d057858

Please sign in to comment.