diff --git a/.gitignore b/.gitignore index c9e8333..bac8dd1 100644 --- a/.gitignore +++ b/.gitignore @@ -1,14 +1,10 @@ -*_proprietary.go -<<<<<<< HEAD -cache/ -log/ -tags -======= -/cache/ -/log/ +/cache +/log +/tags /dtail /dgrep /dcat /dmap /dserver ->>>>>>> 7ee0121afed3e7cab6457142f70e411020ab2b21 +/bin +serverlist.txt diff --git a/cmd/dgrep/main.go b/cmd/dgrep/main.go index 4da1bb3..123d061 100644 --- a/cmd/dgrep/main.go +++ b/cmd/dgrep/main.go @@ -36,15 +36,24 @@ func main() { flag.IntVar(&sshPort, "port", 2222, "SSH server port") flag.StringVar(&args.Discovery, "discovery", "", "Server discovery method") flag.StringVar(&args.PrivateKeyPathFile, "key", "", "Path to private key") - flag.StringVar(&args.RegexStr, "regex", ".", "Regular expression") flag.StringVar(&args.ServersStr, "servers", "", "Remote servers to connect") flag.StringVar(&args.UserName, "user", userName, "Your system user name") flag.StringVar(&args.What, "files", "", "File(s) to read") flag.StringVar(&cfgFile, "cfg", "", "Config file path") + + // Line context awareness. + flag.StringVar(&args.RegexStr, "regex", ".", "Regular expression") flag.StringVar(&grep, "grep", "", "Alias for -regex") + flag.IntVar(&args.LContext.BeforeContext, "before", 0, "Print lines of leading context before matching lines") + flag.IntVar(&args.LContext.AfterContext, "after", 0, "Print lines of trailing context after matching lines") + flag.IntVar(&args.LContext.MaxCount, "max", 0, "Stop reading file after NUM matching lines") flag.Parse() + if grep != "" { + args.RegexStr = grep + } + config.Read(cfgFile, sshPort) color.Colored = !noColor @@ -58,10 +67,6 @@ func main() { Quiet: args.Quiet, }) - if grep != "" { - args.RegexStr = grep - } - client, err := clients.NewGrepClient(args) if err != nil { panic(err) diff --git a/cmd/dtail/main.go b/cmd/dtail/main.go index f2a039f..2639b4b 100644 --- a/cmd/dtail/main.go +++ b/cmd/dtail/main.go @@ -50,14 +50,19 @@ func main() { flag.IntVar(&sshPort, "port", 2222, "SSH server port") flag.StringVar(&args.Discovery, "discovery", "", "Server discovery method") flag.StringVar(&args.PrivateKeyPathFile, "key", "", "Path to private key") - flag.StringVar(&args.RegexStr, "regex", ".", "Regular expression") flag.StringVar(&args.ServersStr, "servers", "", "Remote servers to connect") flag.StringVar(&args.UserName, "user", userName, "Your system user name") flag.StringVar(&args.What, "files", "", "File(s) to read") flag.StringVar(&cfgFile, "cfg", "", "Config file path") - flag.StringVar(&grep, "grep", "", "Alias for -regex") flag.StringVar(&queryStr, "query", "", "Map reduce query") + // Line context awareness. + flag.StringVar(&args.RegexStr, "regex", ".", "Regular expression") + flag.StringVar(&grep, "grep", "", "Alias for -regex") + flag.IntVar(&args.LContext.BeforeContext, "before", 0, "Print lines of leading context before matching lines") + flag.IntVar(&args.LContext.AfterContext, "after", 0, "Print lines of trailing context after matching lines") + flag.IntVar(&args.LContext.MaxCount, "max", 0, "Stop reading file after NUM matching lines") + flag.Parse() if grep != "" { diff --git a/docker/Dockerfile b/docker/Dockerfile index 8632832..61a1f7d 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -14,5 +14,6 @@ USER dserver WORKDIR /var/run/dserver EXPOSE 2222/tcp +EXPOSE 8080/tcp -CMD ["/usr/local/bin/dserver", "-relaxedAuth", "-cfg", "/etc/dserver/dtail.json"] +CMD ["/usr/local/bin/dserver", "-relaxedAuth", "-cfg", "/etc/dserver/dtail.json", "-pprof", "8080"] diff --git a/docker/Makefile b/docker/Makefile index 029adf6..f09d9e0 100644 --- a/docker/Makefile +++ b/docker/Makefile @@ -3,7 +3,8 @@ all: docker build . -t dserver:develop rm ./dserver run: - docker run -p 2222:2222 dserver:develop + # http://localhost:8080/debug/pprof/goroutines?debug=1 + docker run -p 2222:2222 -p 8080:8080 dserver:develop spinup: ./spinup.sh 10 spindown: diff --git a/docker/spindown.sh b/docker/spindown.sh index 73ed059..b3d3a8c 100755 --- a/docker/spindown.sh +++ b/docker/spindown.sh @@ -6,4 +6,5 @@ declare -i BASE_PORT=2222 for (( i=0; i < $NUM_INSTANCES; i++ )); do port=$[ BASE_PORT + i + 1 ] docker stop dserver-serv$i + docker rm dserver-serv$i done diff --git a/docker/spinup.sh b/docker/spinup.sh index 3890ce6..0625967 100755 --- a/docker/spinup.sh +++ b/docker/spinup.sh @@ -3,7 +3,10 @@ declare -i NUM_INSTANCES=$1 declare -i BASE_PORT=2222 +echo > serverlist.txt + for (( i=0; i < $NUM_INSTANCES; i++ )); do port=$[ BASE_PORT + i + 1 ] docker run -d --name dserver-serv$i --hostname serv$i -p $port:2222 dserver:develop + echo localhost:$port >> serverlist.txt done diff --git a/internal/clients/args.go b/internal/clients/args.go index 7f782f1..684dadd 100644 --- a/internal/clients/args.go +++ b/internal/clients/args.go @@ -1,6 +1,7 @@ package clients import ( + "github.com/mimecast/dtail/internal/lcontext" "github.com/mimecast/dtail/internal/omode" gossh "golang.org/x/crypto/ssh" @@ -8,12 +9,13 @@ import ( // Args is a helper struct to summarize common client arguments. type Args struct { + lcontext.LContext + RegexStr string Mode omode.Mode ServersStr string UserName string What string Arguments []string - RegexStr string RegexInvert bool TrustAllHosts bool Discovery string @@ -22,5 +24,5 @@ type Args struct { SSHAuthMethods []gossh.AuthMethod SSHHostKeyCallback gossh.HostKeyCallback PrivateKeyPathFile string - Quiet bool + Quiet bool } diff --git a/internal/clients/baseclient.go b/internal/clients/baseclient.go index f20156f..f83fcfd 100644 --- a/internal/clients/baseclient.go +++ b/internal/clients/baseclient.go @@ -2,6 +2,8 @@ package clients import ( "context" + "fmt" + "strings" "sync" "time" @@ -118,10 +120,44 @@ func (c *baseClient) start(ctx context.Context, active chan struct{}, i int, con } } +func (c *baseClient) makeCommandOptions() map[string]string { + options := make(map[string]string) + + if c.Args.Quiet { + options["quiet"] = fmt.Sprintf("%v", c.Args.Quiet) + } + if c.Args.LContext.MaxCount != 0 { + options["max"] = fmt.Sprintf("%d", c.Args.LContext.MaxCount) + } + if c.Args.LContext.BeforeContext != 0 { + options["before"] = fmt.Sprintf("%d", c.Args.LContext.BeforeContext) + } + if c.Args.LContext.AfterContext != 0 { + options["after"] = fmt.Sprintf("%d", c.Args.LContext.AfterContext) + } + + return options +} + +func (c *baseClient) commandOptionsToString(options map[string]string) string { + var sb strings.Builder + + count := 0 + for k, v := range options { + if count > 0 { + sb.WriteString(":") + } + sb.WriteString(fmt.Sprintf("%s=%s", k, v)) + count++ + } + + return sb.String() +} + func (c *baseClient) makeConnection(server string, sshAuthMethods []gossh.AuthMethod, hostKeyCallback client.HostKeyCallback) *remote.Connection { conn := remote.NewConnection(server, c.UserName, sshAuthMethods, hostKeyCallback) conn.Handler = c.maker.makeHandler(server) - conn.Commands = c.maker.makeCommands() + conn.Commands = c.maker.makeCommands(c.makeCommandOptions()) return conn } diff --git a/internal/clients/catclient.go b/internal/clients/catclient.go index b7b6131..db892f1 100644 --- a/internal/clients/catclient.go +++ b/internal/clients/catclient.go @@ -41,10 +41,10 @@ func (c CatClient) makeHandler(server string) handlers.Handler { return handlers.NewClientHandler(server) } -func (c CatClient) makeCommands() (commands []string) { - options := fmt.Sprintf("quiet=%v", c.Args.Quiet) +func (c CatClient) makeCommands(options map[string]string) (commands []string) { + optionsStr := c.commandOptionsToString(options) for _, file := range strings.Split(c.What, ",") { - commands = append(commands, fmt.Sprintf("%s:%s %s %s", c.Mode.String(), options, file, c.Regex.Serialize())) + commands = append(commands, fmt.Sprintf("%s:%s %s %s", c.Mode.String(), optionsStr, file, c.Regex.Serialize())) } return } diff --git a/internal/clients/grepclient.go b/internal/clients/grepclient.go index 652c31b..567193a 100644 --- a/internal/clients/grepclient.go +++ b/internal/clients/grepclient.go @@ -40,10 +40,10 @@ func (c GrepClient) makeHandler(server string) handlers.Handler { return handlers.NewClientHandler(server) } -func (c GrepClient) makeCommands() (commands []string) { - options := fmt.Sprintf("quiet=%v", c.Args.Quiet) +func (c GrepClient) makeCommands(options map[string]string) (commands []string) { + optionsStr := c.commandOptionsToString(options) for _, file := range strings.Split(c.What, ",") { - commands = append(commands, fmt.Sprintf("%s:%s %s %s", c.Mode.String(), options, file, c.Regex.Serialize())) + commands = append(commands, fmt.Sprintf("%s:%s %s %s", c.Mode.String(), optionsStr, file, c.Regex.Serialize())) } return diff --git a/internal/clients/handlers/basehandler.go b/internal/clients/handlers/basehandler.go index f07fd90..602a7ac 100644 --- a/internal/clients/handlers/basehandler.go +++ b/internal/clients/handlers/basehandler.go @@ -101,15 +101,7 @@ func (h *baseHandler) handleMessageType(message string) { // Handle messages received from server which are not meant to be displayed // to the end user. func (h *baseHandler) handleHiddenMessage(message string) { - switch { - case strings.HasPrefix(message, ".syn close connection"): + if strings.HasPrefix(message, ".syn close connection") { h.SendMessage(".ack close connection") - select { - case <-time.After(time.Second * 5): - logger.Debug("Shutting down client after timeout and sending ack to server") - h.Shutdown() - case <-h.Done(): - return - } } } diff --git a/internal/clients/maker.go b/internal/clients/maker.go index d5ffd8b..a1d6864 100644 --- a/internal/clients/maker.go +++ b/internal/clients/maker.go @@ -9,5 +9,5 @@ import ( // and send different commands to the DTail server. type maker interface { makeHandler(server string) handlers.Handler - makeCommands() (commands []string) + makeCommands(options map[string]string) (commands []string) } diff --git a/internal/clients/maprclient.go b/internal/clients/maprclient.go index 1c0c2cc..feb7e47 100644 --- a/internal/clients/maprclient.go +++ b/internal/clients/maprclient.go @@ -110,27 +110,31 @@ func (c MaprClient) makeHandler(server string) handlers.Handler { return handlers.NewMaprHandler(server, c.query, c.globalGroup) } -func (c MaprClient) makeCommands() (commands []string) { +func (c MaprClient) makeCommands(options map[string]string) (commands []string) { commands = append(commands, fmt.Sprintf("map %s", c.query.RawQuery)) - options := fmt.Sprintf("quiet=%v", c.Args.Quiet) modeStr := "cat" if c.Mode == omode.TailClient { modeStr = "tail" } + optionsStr := c.commandOptionsToString(options) for _, file := range strings.Split(c.What, ",") { if c.Timeout > 0 { commands = append(commands, fmt.Sprintf("timeout %d %s %s %s", c.Timeout, modeStr, file, c.Regex.Serialize())) continue } - commands = append(commands, fmt.Sprintf("%s:%s %s %s", modeStr, options, file, c.Regex.Serialize())) + commands = append(commands, fmt.Sprintf("%s:%s %s %s", modeStr, optionsStr, file, c.Regex.Serialize())) } return } func (c *MaprClient) periodicReportResults(ctx context.Context) { + rampUpSleep := c.query.Interval / 2 + logger.Debug("Ramp up sleeping before processing mapreduce results", rampUpSleep) + time.Sleep(rampUpSleep) + for { select { case <-time.After(c.query.Interval): diff --git a/internal/clients/tailclient.go b/internal/clients/tailclient.go index cefbaa7..853ef1d 100644 --- a/internal/clients/tailclient.go +++ b/internal/clients/tailclient.go @@ -37,10 +37,10 @@ func (c TailClient) makeHandler(server string) handlers.Handler { return handlers.NewClientHandler(server) } -func (c TailClient) makeCommands() (commands []string) { - options := fmt.Sprintf("quiet=%v", c.Args.Quiet) +func (c TailClient) makeCommands(options map[string]string) (commands []string) { + optionsStr := c.commandOptionsToString(options) for _, file := range strings.Split(c.What, ",") { - commands = append(commands, fmt.Sprintf("%s:%s %s %s", c.Mode.String(), options, file, c.Regex.Serialize())) + commands = append(commands, fmt.Sprintf("%s:%s %s %s", c.Mode.String(), optionsStr, file, c.Regex.Serialize())) } logger.Debug(commands) diff --git a/internal/datas/rbuffer.go b/internal/datas/rbuffer.go new file mode 100644 index 0000000..df8f622 --- /dev/null +++ b/internal/datas/rbuffer.go @@ -0,0 +1,49 @@ +package datas + +import "fmt" + +// RBuffer is a simple circular string ring buffer data structure. +type RBuffer struct { + Capacity int + size int + readPos int + writePos int + data []string +} + +// NewRBuffer creates a new string ring buffer. +func NewRBuffer(capacity int) (*RBuffer, error) { + if capacity < 1 { + return nil, fmt.Errorf("RBuffer capacity must not be less than 1") + } + + r := RBuffer{ + Capacity: capacity, + size: capacity + 1, + data: make([]string, capacity+1), + } + + return &r, nil +} + +// Add a value. +func (r *RBuffer) Add(value string) { + r.data[r.writePos] = value + r.writePos = (r.writePos + 1) % r.size + + if r.writePos == r.readPos { + r.readPos = (r.readPos + 1) % r.size + } +} + +// Get a value. +func (r *RBuffer) Get() (string, bool) { + if r.readPos == r.writePos { + // RBuffer is empty. + return "", false + } + + value := r.data[r.readPos] + r.readPos = (r.readPos + 1) % r.size + return value, true +} diff --git a/internal/datas/rbuffer_test.go b/internal/datas/rbuffer_test.go new file mode 100644 index 0000000..456511a --- /dev/null +++ b/internal/datas/rbuffer_test.go @@ -0,0 +1,106 @@ +package datas + +import ( + "fmt" + "math/rand" + "testing" + "time" +) + +func TestRBufferOneElement(t *testing.T) { + r, err := NewRBuffer(1) + if err != nil { + t.Errorf("Expected error creating ring buffer with capacity 1") + } + + testRBufferValues(t, r, []string{"Hello world"}) + testRBufferValues(t, r, []string{"Hello world", "Hello universe"}) +} + +func TestRBuffer(t *testing.T) { + if _, err := NewRBuffer(0); err == nil { + t.Errorf("Expected error creating ring buffer with capacity 0") + } + + r, err := NewRBuffer(10) + if err != nil { + t.Errorf("Error creating ring buffer with capacity 10: %v", err) + } + + fiveValues := []string{ + "42 is the answer!", + "Scroption: Get over here!", + "Have you swiped your nectar card?", + "Please mind the gap between the train and the platform!", + "Visit DTail at https://dtail.dev", + } + testRBufferValues(t, r, fiveValues) + + moreFiveValues := []string{ + "I love Golang", + "As a contrast, I also love Perl", + "Mimecast: Stop Bad Things From Happening to Good Organizations", + "We are the Buetow Brothers", + "London is calling", + } + tenValues := append(fiveValues, moreFiveValues...) + testRBufferValues(t, r, tenValues) +} + +func TestRandomRBuffer(t *testing.T) { + for i := 0; i < 100; i++ { + testRandomRBuffer(t) + } +} + +func testRandomRBuffer(t *testing.T) { + rand.Seed(time.Now().UnixNano()) + + maxCapacity := 1000 + minCapacity := 1 + capacity := rand.Intn(maxCapacity-minCapacity) + minCapacity + r, err := NewRBuffer(capacity) + if err != nil { + t.Errorf("Error creating ring buffer with capacity %d: %v", capacity, err) + } + + numValues := rand.Intn(capacity * 2) + values := make([]string, numValues) + for i := 0; i < numValues; i++ { + values = append(values, fmt.Sprintf("%d.%d", i, rand.Int())) + } + + testRBufferValues(t, r, values) +} + +func testRBufferValues(t *testing.T, r *RBuffer, values []string) { + value, ok := r.Get() + if ok { + t.Errorf("Expected not ok reading from empty ring buffer but got ok and value '%s'", value) + } + + for _, value := range values { + r.Add(value) + } + + expectedValues := values + overCapacity := len(values) - r.Capacity + if overCapacity > 0 { + expectedValues = values[overCapacity:] + } + + for _, expected := range expectedValues { + value, ok := r.Get() + if !ok { + t.Errorf("Expected value '%s' but got nothing", expected) + } + if value != expected { + t.Errorf("Expected value '%s' but got value '%v'", expected, value) + } + } + + value, ok = r.Get() + if ok { + t.Errorf("Expected not ok reading from empty ring buffer but got ok and value '%s'", value) + } +} diff --git a/internal/io/fs/filereader.go b/internal/io/fs/filereader.go index 0774837..efd410e 100644 --- a/internal/io/fs/filereader.go +++ b/internal/io/fs/filereader.go @@ -4,12 +4,13 @@ import ( "context" "github.com/mimecast/dtail/internal/io/line" + "github.com/mimecast/dtail/internal/lcontext" "github.com/mimecast/dtail/internal/regex" ) // FileReader is the interface used on the dtail server to read/cat/grep/mapr... a file. type FileReader interface { - Start(ctx context.Context, lines chan<- line.Line, re regex.Regex) error + Start(ctx context.Context, lContext lcontext.LContext, lines chan<- line.Line, re regex.Regex) error FilePath() string Retry() bool } diff --git a/internal/io/fs/filter.go b/internal/io/fs/filter.go new file mode 100644 index 0000000..c4f605e --- /dev/null +++ b/internal/io/fs/filter.go @@ -0,0 +1,167 @@ +package fs + +import ( + "context" + + "github.com/mimecast/dtail/internal/io/line" + "github.com/mimecast/dtail/internal/lcontext" + "github.com/mimecast/dtail/internal/regex" +) + +func (f readFile) filter(ctx context.Context, rawLines <-chan []byte, lines chan<- line.Line, re regex.Regex, lContext lcontext.LContext) { + // Do we have any kind of local context settings? If so then run the more complex + // filterWithLContext method. + if lContext.Has() { + // We can not skip transmitting any lines to the client with a local + // grep context specified. + f.canSkipLines = false + f.filterWithLContext(ctx, rawLines, lines, re, lContext) + return + } + + f.filterWithoutLContext(ctx, rawLines, lines, re) +} + +// Filter log lines matching a given regular expression, however with local grep context. +func (f readFile) filterWithLContext(ctx context.Context, rawLines <-chan []byte, lines chan<- line.Line, re regex.Regex, lContext lcontext.LContext) { + // Scenario 1: Finish once maxCount hits found + maxCount := lContext.MaxCount + processMaxCount := maxCount > 0 + maxReached := false + + // Scenario 2: Print prev. N lines when current line matches. + before := lContext.BeforeContext + processBefore := before > 0 + var beforeBuf chan []byte + if processBefore { + beforeBuf = make(chan []byte, before) + } + + // Screnario 3: Print next N lines when current line matches. + after := 0 + processAfter := lContext.AfterContext > 0 + + for rawLine := range rawLines { + // logger.Debug("rawLine", string(rawLine)) + f.updatePosition() + + if !re.Match(rawLine) { + f.updateLineNotMatched() + + if processAfter && after > 0 { + after-- + myLine := line.Line{Content: rawLine, SourceID: f.globID, Count: f.totalLineCount(), TransmittedPerc: 100} + select { + case lines <- myLine: + case <-ctx.Done(): + return + } + + } else if processBefore { + // Keep last num BeforeContext raw messages. + select { + case beforeBuf <- rawLine: + default: + <-beforeBuf + beforeBuf <- rawLine + } + } + continue + } + + f.updateLineMatched() + + if processAfter { + if maxReached { + return + } + after = lContext.AfterContext + } + + if processBefore { + i := uint64(len(beforeBuf)) + for { + select { + case myRawLine := <-beforeBuf: + myLine := line.Line{Content: myRawLine, SourceID: f.globID, Count: f.totalLineCount() - i, TransmittedPerc: 100} + i-- + select { + case lines <- myLine: + case <-ctx.Done(): + return + } + default: + // beforeBuf is now empty. + } + if len(beforeBuf) == 0 { + break + } + } + } + + line := line.Line{Content: rawLine, SourceID: f.globID, Count: f.totalLineCount(), TransmittedPerc: 100} + + select { + case lines <- line: + if processMaxCount { + maxCount-- + if maxCount == 0 { + if !processAfter || after == 0 { + return + } + // Unfortunatley we have to continue filter, as there might be more lines to print + maxReached = true + } + } + case <-ctx.Done(): + return + } + } +} + +// Filter log lines matching a given regular expression, there is no local grep context specified. +func (f readFile) filterWithoutLContext(ctx context.Context, rawLines <-chan []byte, lines chan<- line.Line, re regex.Regex) { + for { + select { + case rawLine, ok := <-rawLines: + f.updatePosition() + if !ok { + return + } + + if f.lineUntransmittable(rawLine, len(lines), cap(lines), re) { + continue + } + + line := line.Line{Content: rawLine, SourceID: f.globID, Count: f.totalLineCount(), TransmittedPerc: f.transmittedPerc()} + + select { + case lines <- line: + continue + case <-ctx.Done(): + return + } + } + } +} + +func (f readFile) lineUntransmittable(rawLine []byte, length, capacity int, re regex.Regex) bool { + if !re.Match(rawLine) { + f.updateLineNotMatched() + f.updateLineNotTransmitted() + // Regex dosn't match, so not interested in it. + return true + } + f.updateLineMatched() + + // Can we actually send more messages, channel capacity reached? + if f.canSkipLines && length >= capacity { + f.updateLineNotTransmitted() + // Matching, not transmittable + return true + } + f.updateLineTransmitted() + + // Matching, transmittable + return false +} diff --git a/internal/io/fs/readfile.go b/internal/io/fs/readfile.go index 6757bd6..161e3f0 100644 --- a/internal/io/fs/readfile.go +++ b/internal/io/fs/readfile.go @@ -4,16 +4,15 @@ import ( "bufio" "compress/gzip" "context" - "errors" "fmt" "io" "os" "strings" - "sync" "time" "github.com/mimecast/dtail/internal/io/line" "github.com/mimecast/dtail/internal/io/logger" + "github.com/mimecast/dtail/internal/lcontext" "github.com/mimecast/dtail/internal/regex" "github.com/DataDog/zstd" @@ -38,6 +37,28 @@ type readFile struct { limiter chan struct{} } +func (f readFile) makeReader(fd *os.File) (reader *bufio.Reader, err error) { + switch { + case strings.HasSuffix(f.FilePath(), ".gz"): + fallthrough + case strings.HasSuffix(f.FilePath(), ".gzip"): + logger.Info(f.FilePath(), "Detected gzip compression format") + var gzipReader *gzip.Reader + gzipReader, err = gzip.NewReader(fd) + if err != nil { + return + } + reader = bufio.NewReader(gzipReader) + case strings.HasSuffix(f.FilePath(), ".zst"): + logger.Info(f.FilePath(), "Detected zstd compression format") + reader = bufio.NewReader(zstd.NewReader(fd)) + default: + reader = bufio.NewReader(fd) + } + + return +} + // String returns the string representation of the readFile func (f readFile) String() string { return fmt.Sprintf("readFile(filePath:%s,globID:%s,retry:%v,canSkipLines:%v,seekEOF:%v)", @@ -59,7 +80,7 @@ func (f readFile) Retry() bool { } // Start tailing a log file. -func (f readFile) Start(ctx context.Context, lines chan<- line.Line, re regex.Regex) error { +func (f readFile) Start(ctx context.Context, lContext lcontext.LContext, lines chan<- line.Line, re regex.Regex) error { logger.Debug("readFile", f) defer func() { select { @@ -90,58 +111,28 @@ func (f readFile) Start(ctx context.Context, lines chan<- line.Line, re regex.Re } rawLines := make(chan []byte, 100) - truncate := make(chan struct{}) - - var wg sync.WaitGroup - wg.Add(1) - - go f.periodicTruncateCheck(ctx, truncate) - go f.filter(ctx, &wg, rawLines, lines, re) + readCtx, readCancel := context.WithCancel(ctx) + + filterDone := make(chan struct{}) + go func() { + f.filter(ctx, rawLines, lines, re, lContext) + close(filterDone) + // If the filter stopped, make the reader stop too, no need to read + // more data if there is nothing more the filter wants to filter for! + // E.g. it could be that we only want to filter N matches but not more. + readCancel() + }() - err = f.read(ctx, fd, rawLines, truncate) + err = f.read(readCtx, fd, rawLines) close(rawLines) - wg.Wait() - - return err -} -func (f readFile) periodicTruncateCheck(ctx context.Context, truncate chan struct{}) { - for { - select { - case <-time.After(time.Second * 3): - select { - case truncate <- struct{}{}: - case <-ctx.Done(): - } - case <-ctx.Done(): - return - } - } -} - -func (f readFile) makeReader(fd *os.File) (reader *bufio.Reader, err error) { - switch { - case strings.HasSuffix(f.FilePath(), ".gz"): - fallthrough - case strings.HasSuffix(f.FilePath(), ".gzip"): - logger.Info(f.FilePath(), "Detected gzip compression format") - var gzipReader *gzip.Reader - gzipReader, err = gzip.NewReader(fd) - if err != nil { - return - } - reader = bufio.NewReader(gzipReader) - case strings.HasSuffix(f.FilePath(), ".zst"): - logger.Info(f.FilePath(), "Detected zstd compression format") - reader = bufio.NewReader(zstd.NewReader(fd)) - default: - reader = bufio.NewReader(fd) - } + // Filter may flushes some data still. So wait until it is done here. + <-filterDone - return + return err } -func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan []byte, truncate <-chan struct{}) error { +func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan []byte) error { var offset uint64 reader, err := f.makeReader(fd) @@ -153,6 +144,8 @@ func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan []byte, t lineLengthThreshold := 1024 * 1024 // 1mb longLineWarning := false + checkTruncate := f.truncateTimer(ctx) + for { select { case <-ctx.Done(): @@ -161,7 +154,7 @@ func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan []byte, t } select { - case <-truncate: + case <-checkTruncate: if isTruncated, err := f.truncated(fd); isTruncated { return err } @@ -225,82 +218,3 @@ func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan []byte, t } } } - -// Filter log lines matching a given regular expression. -func (f readFile) filter(ctx context.Context, wg *sync.WaitGroup, rawLines <-chan []byte, lines chan<- line.Line, re regex.Regex) { - defer wg.Done() - - for { - select { - case line, ok := <-rawLines: - f.updatePosition() - if !ok { - return - } - if filteredLine, ok := f.transmittable(line, len(lines), cap(lines), re); ok { - select { - case lines <- filteredLine: - case <-ctx.Done(): - return - } - } - } - } -} - -func (f readFile) transmittable(lineBytes []byte, length, capacity int, re regex.Regex) (line.Line, bool) { - var read line.Line - - if !re.Match(lineBytes) { - f.updateLineNotMatched() - f.updateLineNotTransmitted() - return read, false - } - f.updateLineMatched() - - // Can we actually send more messages, channel capacity reached? - if f.canSkipLines && length >= capacity { - f.updateLineNotTransmitted() - return read, false - } - f.updateLineTransmitted() - - read = line.Line{ - Content: lineBytes, - SourceID: f.globID, - Count: f.totalLineCount(), - TransmittedPerc: f.transmittedPerc(), - } - - return read, true -} - -// Check wether log file is truncated. Returns nil if not. -func (f readFile) truncated(fd *os.File) (bool, error) { - logger.Debug(f.filePath, "File truncation check") - - // Can not seek currently open FD. - curPos, err := fd.Seek(0, os.SEEK_CUR) - if err != nil { - return true, err - } - - // Can not open file at original path. - pathFd, err := os.Open(f.filePath) - if err != nil { - return true, err - } - defer pathFd.Close() - - // Can not seek file at original path. - pathPos, err := pathFd.Seek(0, io.SeekEnd) - if err != nil { - return true, err - } - - if curPos > pathPos { - return true, errors.New("File got truncated") - } - - return false, nil -} diff --git a/internal/io/fs/truncate.go b/internal/io/fs/truncate.go new file mode 100644 index 0000000..a8d59ac --- /dev/null +++ b/internal/io/fs/truncate.go @@ -0,0 +1,61 @@ +package fs + +import ( + "context" + "errors" + "io" + "os" + "time" + + "github.com/mimecast/dtail/internal/io/logger" +) + +func (f readFile) truncateTimer(ctx context.Context) (checkTruncate chan struct{}) { + checkTruncate = make(chan struct{}) + + go func() { + for { + select { + case <-time.After(time.Second * 3): + select { + case checkTruncate <- struct{}{}: + case <-ctx.Done(): + } + case <-ctx.Done(): + return + } + } + }() + + return +} + +// Check wether log file is truncated. Returns nil if not. +func (f readFile) truncated(fd *os.File) (bool, error) { + logger.Debug(f.filePath, "File truncation check") + + // Can not seek currently open FD. + curPos, err := fd.Seek(0, os.SEEK_CUR) + if err != nil { + return true, err + } + + // Can not open file at original path. + pathFd, err := os.Open(f.filePath) + if err != nil { + return true, err + } + defer pathFd.Close() + + // Can not seek file at original path. + pathPos, err := pathFd.Seek(0, io.SeekEnd) + if err != nil { + return true, err + } + + if curPos > pathPos { + return true, errors.New("File got truncated") + } + + return false, nil +} diff --git a/internal/lcontext/lcontext.go b/internal/lcontext/lcontext.go new file mode 100644 index 0000000..89cb7c3 --- /dev/null +++ b/internal/lcontext/lcontext.go @@ -0,0 +1,22 @@ +package lcontext + +// LContext stands for line context and is here to help filtering out only specific lines. +type LContext struct { + AfterContext int + BeforeContext int + MaxCount int +} + +// Has returns true if it has any parameter set. +func (c LContext) Has() bool { + if c.AfterContext > 0 { + return true + } + if c.BeforeContext > 0 { + return true + } + if c.MaxCount > 0 { + return true + } + return false +} diff --git a/internal/mapr/groupset.go b/internal/mapr/groupset.go index 50155f8..b5c8a48 100644 --- a/internal/mapr/groupset.go +++ b/internal/mapr/groupset.go @@ -68,15 +68,11 @@ func (g *GroupSet) WriteResult(query *Query) error { } // -1: Don't limit the result, include all data sets - result, count, err := g.limitedResult(query, query.Limit, "", ",", true) + result, _, err := g.limitedResult(query, query.Limit, "", ",", true) if err != nil { return err } - if count == 0 { - logger.Warn("Not writing outfile this time as empty result set", query.Outfile) - } - logger.Info("Writing outfile", query.Outfile) tmpOutfile := fmt.Sprintf("%s.tmp", query.Outfile) diff --git a/internal/options/options.go b/internal/options/options.go new file mode 100644 index 0000000..816ddc9 --- /dev/null +++ b/internal/options/options.go @@ -0,0 +1,3 @@ +package options + +type Options map[string]string diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go index 5bab26f..b659c06 100644 --- a/internal/server/handlers/readcommand.go +++ b/internal/server/handlers/readcommand.go @@ -9,6 +9,7 @@ import ( "github.com/mimecast/dtail/internal/io/fs" "github.com/mimecast/dtail/internal/io/logger" + "github.com/mimecast/dtail/internal/lcontext" "github.com/mimecast/dtail/internal/omode" "github.com/mimecast/dtail/internal/regex" ) @@ -25,7 +26,7 @@ func newReadCommand(server *ServerHandler, mode omode.Mode) *readCommand { } } -func (r *readCommand) Start(ctx context.Context, argc int, args []string, retries int) { +func (r *readCommand) Start(ctx context.Context, lContext lcontext.LContext, argc int, args []string, retries int) { re := regex.NewNoop() if argc >= 4 { @@ -40,10 +41,10 @@ func (r *readCommand) Start(ctx context.Context, argc int, args []string, retrie r.server.sendServerWarnMessage(logger.Warn(r.server.user, commandParseWarning, args, argc)) return } - r.readGlob(ctx, args[1], re, retries) + r.readGlob(ctx, lContext, args[1], re, retries) } -func (r *readCommand) readGlob(ctx context.Context, glob string, re regex.Regex, retries int) { +func (r *readCommand) readGlob(ctx context.Context, lContext lcontext.LContext, glob string, re regex.Regex, retries int) { retryInterval := time.Second * 5 glob = filepath.Clean(glob) @@ -67,7 +68,7 @@ func (r *readCommand) readGlob(ctx context.Context, glob string, re regex.Regex, continue } - r.readFiles(ctx, paths, glob, re, retryInterval) + r.readFiles(ctx, lContext, paths, glob, re, retryInterval) return } @@ -75,18 +76,18 @@ func (r *readCommand) readGlob(ctx context.Context, glob string, re regex.Regex, return } -func (r *readCommand) readFiles(ctx context.Context, paths []string, glob string, re regex.Regex, retryInterval time.Duration) { +func (r *readCommand) readFiles(ctx context.Context, lContext lcontext.LContext, paths []string, glob string, re regex.Regex, retryInterval time.Duration) { var wg sync.WaitGroup wg.Add(len(paths)) for _, path := range paths { - go r.readFileIfPermissions(ctx, &wg, path, glob, re) + go r.readFileIfPermissions(ctx, lContext, &wg, path, glob, re) } wg.Wait() } -func (r *readCommand) readFileIfPermissions(ctx context.Context, wg *sync.WaitGroup, path, glob string, re regex.Regex) { +func (r *readCommand) readFileIfPermissions(ctx context.Context, lContext lcontext.LContext, wg *sync.WaitGroup, path, glob string, re regex.Regex) { defer wg.Done() globID := r.makeGlobID(path, glob) @@ -96,10 +97,10 @@ func (r *readCommand) readFileIfPermissions(ctx context.Context, wg *sync.WaitGr return } - r.readFile(ctx, path, globID, re) + r.readFile(ctx, lContext, path, globID, re) } -func (r *readCommand) readFile(ctx context.Context, path, globID string, re regex.Regex) { +func (r *readCommand) readFile(ctx context.Context, lContext lcontext.LContext, path, globID string, re regex.Regex) { logger.Info(r.server.user, "Start reading file", path, globID) var reader fs.FileReader @@ -120,7 +121,7 @@ func (r *readCommand) readFile(ctx context.Context, path, globID string, re rege } for { - if err := reader.Start(ctx, lines, re); err != nil { + if err := reader.Start(ctx, lContext, lines, re); err != nil { logger.Error(r.server.user, path, globID, err) } diff --git a/internal/server/handlers/serverhandler.go b/internal/server/handlers/serverhandler.go index 185e7c2..7da6012 100644 --- a/internal/server/handlers/serverhandler.go +++ b/internal/server/handlers/serverhandler.go @@ -7,6 +7,7 @@ import ( "fmt" "io" "os" + "strconv" "strings" "sync/atomic" "time" @@ -15,6 +16,7 @@ import ( "github.com/mimecast/dtail/internal/config" "github.com/mimecast/dtail/internal/io/line" "github.com/mimecast/dtail/internal/io/logger" + "github.com/mimecast/dtail/internal/lcontext" "github.com/mimecast/dtail/internal/mapr/server" "github.com/mimecast/dtail/internal/omode" user "github.com/mimecast/dtail/internal/user/server" @@ -240,7 +242,7 @@ func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, args [] splitted := strings.Split(args[0], ":") commandName := splitted[0] - options, err := readOptions(splitted[1:]) + options, lContext, err := readOptions(splitted[1:]) if err != nil { h.sendServerMessage(logger.Error(h.user, err)) commandFinished() @@ -258,7 +260,7 @@ func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, args [] command := newReadCommand(h, omode.CatClient) go func() { h.incrementActiveReaders() - command.Start(ctx, argc, args, 1) + command.Start(ctx, lContext, argc, args, 1) readerFinished() commandFinished() }() @@ -267,7 +269,7 @@ func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, args [] command := newReadCommand(h, omode.TailClient) go func() { h.incrementActiveReaders() - command.Start(ctx, argc, args, 10) + command.Start(ctx, lContext, argc, args, 10) readerFinished() commandFinished() }() @@ -390,13 +392,16 @@ func (h *ServerHandler) decrementActiveReaders() int32 { return atomic.LoadInt32(&h.activeReaders) } -func readOptions(opts []string) (map[string]string, error) { +// TODO: All options related code should be in its own package (client + server) +func readOptions(opts []string) (map[string]string, lcontext.LContext, error) { options := make(map[string]string, len(opts)) + // Local search context + var lContext lcontext.LContext for _, o := range opts { kv := strings.SplitN(o, "=", 2) if len(kv) != 2 { - return options, fmt.Errorf("Unable to parse options: %v", kv) + continue } key := kv[0] val := kv[1] @@ -405,13 +410,37 @@ func readOptions(opts []string) (map[string]string, error) { s := strings.SplitN(val, "%", 2) decoded, err := base64.StdEncoding.DecodeString(s[1]) if err != nil { - return options, err + return options, lContext, err } val = string(decoded) } - options[key] = val + switch key { + case "before": + iVal, err := strconv.Atoi(val) + if err != nil { + logger.Error(err) + continue + } + lContext.BeforeContext = iVal + case "after": + iVal, err := strconv.Atoi(val) + if err != nil { + logger.Error(err) + continue + } + lContext.AfterContext = iVal + case "max": + iVal, err := strconv.Atoi(val) + if err != nil { + logger.Error(err) + continue + } + lContext.MaxCount = iVal + default: + options[key] = val + } } - return options, nil + return options, lContext, nil } diff --git a/internal/server/server.go b/internal/server/server.go index a20737e..73822d5 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -141,7 +141,7 @@ func (s *Server) handleChannel(ctx context.Context, sshConn gossh.Conn, newChann } if err := s.handleRequests(ctx, sshConn, requests, channel, user); err != nil { - logger.Error(user, err) + logger.Error(user, "While handling request", err) sshConn.Close() } } @@ -190,7 +190,8 @@ func (s *Server) handleRequests(ctx context.Context, sshConn gossh.Conn, in <-ch go func() { if err := sshConn.Wait(); err != nil && err != io.EOF { - logger.Error(user, err) + // Use of closed network connection. + logger.Debug(user, "While waiting for ssh connection", err) } s.stats.decrementConnections() logger.Info(user, "Good bye Mister!") diff --git a/internal/ssh/client/authmethods.go b/internal/ssh/client/authmethods.go index 2ff80b2..bbfb7be 100644 --- a/internal/ssh/client/authmethods.go +++ b/internal/ssh/client/authmethods.go @@ -77,6 +77,15 @@ func initKnownHostsAuthMethods(trustAllHosts bool, throttleCh chan struct{}, pri } logger.Debug("initKnownHostsAuthMethods", "Unable to use private key", privateKeyPath, err) + privateKeyPath = os.Getenv("HOME") + "/.ssh/id_ecdsa" + authMethod, err = ssh.PrivateKey(privateKeyPath) + if err == nil { + sshAuthMethods = append(sshAuthMethods, authMethod) + logger.Debug("initKnownHostsAuthmethods", "Added path to list of auth methods, not adding further methods", privateKeyPath) + return sshAuthMethods, knownHostsCallback + } + logger.Debug("initKnownHostsAuthMethods", "Unable to use private key", privateKeyPath, err) + logger.FatalExit("Unable to find private SSH key information") // Never reach this point. diff --git a/internal/version/version.go b/internal/version/version.go index 1d974f7..f9acd56 100644 --- a/internal/version/version.go +++ b/internal/version/version.go @@ -11,7 +11,7 @@ const ( // Name of DTail. Name string = "DTail" // Version of DTail. - Version string = "3.2.1" + Version string = "3.3.0" // Additional information for DTail Additional string = "develop" // ProtocolCompat -ibility version.