Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add context awareness to dgrep #22

Merged
merged 17 commits into from
Mar 29, 2021
Merged
14 changes: 5 additions & 9 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
*_proprietary.go
<<<<<<< HEAD
cache/
log/
tags
=======
/cache/
/log/
/cache
/log
/tags
/dtail
/dgrep
/dcat
/dmap
/dserver
>>>>>>> 7ee0121afed3e7cab6457142f70e411020ab2b21
/bin
serverlist.txt
15 changes: 10 additions & 5 deletions cmd/dgrep/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,24 @@ func main() {
flag.IntVar(&sshPort, "port", 2222, "SSH server port")
flag.StringVar(&args.Discovery, "discovery", "", "Server discovery method")
flag.StringVar(&args.PrivateKeyPathFile, "key", "", "Path to private key")
flag.StringVar(&args.RegexStr, "regex", ".", "Regular expression")
flag.StringVar(&args.ServersStr, "servers", "", "Remote servers to connect")
flag.StringVar(&args.UserName, "user", userName, "Your system user name")
flag.StringVar(&args.What, "files", "", "File(s) to read")
flag.StringVar(&cfgFile, "cfg", "", "Config file path")

// Line context awareness.
flag.StringVar(&args.RegexStr, "regex", ".", "Regular expression")
flag.StringVar(&grep, "grep", "", "Alias for -regex")
flag.IntVar(&args.LContext.BeforeContext, "before", 0, "Print lines of leading context before matching lines")
flag.IntVar(&args.LContext.AfterContext, "after", 0, "Print lines of trailing context after matching lines")
flag.IntVar(&args.LContext.MaxCount, "max", 0, "Stop reading file after NUM matching lines")

flag.Parse()

if grep != "" {
args.RegexStr = grep
}

config.Read(cfgFile, sshPort)
color.Colored = !noColor

Expand All @@ -58,10 +67,6 @@ func main() {
Quiet: args.Quiet,
})

if grep != "" {
args.RegexStr = grep
}

client, err := clients.NewGrepClient(args)
if err != nil {
panic(err)
Expand Down
9 changes: 7 additions & 2 deletions cmd/dtail/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,19 @@ func main() {
flag.IntVar(&sshPort, "port", 2222, "SSH server port")
flag.StringVar(&args.Discovery, "discovery", "", "Server discovery method")
flag.StringVar(&args.PrivateKeyPathFile, "key", "", "Path to private key")
flag.StringVar(&args.RegexStr, "regex", ".", "Regular expression")
flag.StringVar(&args.ServersStr, "servers", "", "Remote servers to connect")
flag.StringVar(&args.UserName, "user", userName, "Your system user name")
flag.StringVar(&args.What, "files", "", "File(s) to read")
flag.StringVar(&cfgFile, "cfg", "", "Config file path")
flag.StringVar(&grep, "grep", "", "Alias for -regex")
flag.StringVar(&queryStr, "query", "", "Map reduce query")

// Line context awareness.
flag.StringVar(&args.RegexStr, "regex", ".", "Regular expression")
flag.StringVar(&grep, "grep", "", "Alias for -regex")
flag.IntVar(&args.LContext.BeforeContext, "before", 0, "Print lines of leading context before matching lines")
flag.IntVar(&args.LContext.AfterContext, "after", 0, "Print lines of trailing context after matching lines")
flag.IntVar(&args.LContext.MaxCount, "max", 0, "Stop reading file after NUM matching lines")

flag.Parse()

if grep != "" {
Expand Down
3 changes: 2 additions & 1 deletion docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,6 @@ USER dserver

WORKDIR /var/run/dserver
EXPOSE 2222/tcp
EXPOSE 8080/tcp

CMD ["/usr/local/bin/dserver", "-relaxedAuth", "-cfg", "/etc/dserver/dtail.json"]
CMD ["/usr/local/bin/dserver", "-relaxedAuth", "-cfg", "/etc/dserver/dtail.json", "-pprof", "8080"]
3 changes: 2 additions & 1 deletion docker/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ all:
docker build . -t dserver:develop
rm ./dserver
run:
docker run -p 2222:2222 dserver:develop
# http://localhost:8080/debug/pprof/goroutines?debug=1
docker run -p 2222:2222 -p 8080:8080 dserver:develop
spinup:
./spinup.sh 10
spindown:
Expand Down
1 change: 1 addition & 0 deletions docker/spindown.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@ declare -i BASE_PORT=2222
for (( i=0; i < $NUM_INSTANCES; i++ )); do
port=$[ BASE_PORT + i + 1 ]
docker stop dserver-serv$i
docker rm dserver-serv$i
done
3 changes: 3 additions & 0 deletions docker/spinup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@
declare -i NUM_INSTANCES=$1
declare -i BASE_PORT=2222

echo > serverlist.txt

for (( i=0; i < $NUM_INSTANCES; i++ )); do
port=$[ BASE_PORT + i + 1 ]
docker run -d --name dserver-serv$i --hostname serv$i -p $port:2222 dserver:develop
echo localhost:$port >> serverlist.txt
done
6 changes: 4 additions & 2 deletions internal/clients/args.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,21 @@
package clients

import (
"github.com/mimecast/dtail/internal/lcontext"
"github.com/mimecast/dtail/internal/omode"

gossh "golang.org/x/crypto/ssh"
)

// Args is a helper struct to summarize common client arguments.
type Args struct {
lcontext.LContext
RegexStr string
Mode omode.Mode
ServersStr string
UserName string
What string
Arguments []string
RegexStr string
RegexInvert bool
TrustAllHosts bool
Discovery string
Expand All @@ -22,5 +24,5 @@ type Args struct {
SSHAuthMethods []gossh.AuthMethod
SSHHostKeyCallback gossh.HostKeyCallback
PrivateKeyPathFile string
Quiet bool
Quiet bool
}
38 changes: 37 additions & 1 deletion internal/clients/baseclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package clients

import (
"context"
"fmt"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -118,10 +120,44 @@ func (c *baseClient) start(ctx context.Context, active chan struct{}, i int, con
}
}

func (c *baseClient) makeCommandOptions() map[string]string {
options := make(map[string]string)

if c.Args.Quiet {
options["quiet"] = fmt.Sprintf("%v", c.Args.Quiet)
}
if c.Args.LContext.MaxCount != 0 {
options["max"] = fmt.Sprintf("%d", c.Args.LContext.MaxCount)
}
if c.Args.LContext.BeforeContext != 0 {
options["before"] = fmt.Sprintf("%d", c.Args.LContext.BeforeContext)
}
if c.Args.LContext.AfterContext != 0 {
options["after"] = fmt.Sprintf("%d", c.Args.LContext.AfterContext)
}

return options
}

func (c *baseClient) commandOptionsToString(options map[string]string) string {
var sb strings.Builder

count := 0
for k, v := range options {
if count > 0 {
sb.WriteString(":")
}
sb.WriteString(fmt.Sprintf("%s=%s", k, v))
count++
}

return sb.String()
}

func (c *baseClient) makeConnection(server string, sshAuthMethods []gossh.AuthMethod, hostKeyCallback client.HostKeyCallback) *remote.Connection {
conn := remote.NewConnection(server, c.UserName, sshAuthMethods, hostKeyCallback)
conn.Handler = c.maker.makeHandler(server)
conn.Commands = c.maker.makeCommands()
conn.Commands = c.maker.makeCommands(c.makeCommandOptions())

return conn
}
Expand Down
6 changes: 3 additions & 3 deletions internal/clients/catclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,10 @@ func (c CatClient) makeHandler(server string) handlers.Handler {
return handlers.NewClientHandler(server)
}

func (c CatClient) makeCommands() (commands []string) {
options := fmt.Sprintf("quiet=%v", c.Args.Quiet)
func (c CatClient) makeCommands(options map[string]string) (commands []string) {
optionsStr := c.commandOptionsToString(options)
for _, file := range strings.Split(c.What, ",") {
commands = append(commands, fmt.Sprintf("%s:%s %s %s", c.Mode.String(), options, file, c.Regex.Serialize()))
commands = append(commands, fmt.Sprintf("%s:%s %s %s", c.Mode.String(), optionsStr, file, c.Regex.Serialize()))
}
return
}
6 changes: 3 additions & 3 deletions internal/clients/grepclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@ func (c GrepClient) makeHandler(server string) handlers.Handler {
return handlers.NewClientHandler(server)
}

func (c GrepClient) makeCommands() (commands []string) {
options := fmt.Sprintf("quiet=%v", c.Args.Quiet)
func (c GrepClient) makeCommands(options map[string]string) (commands []string) {
optionsStr := c.commandOptionsToString(options)
for _, file := range strings.Split(c.What, ",") {
commands = append(commands, fmt.Sprintf("%s:%s %s %s", c.Mode.String(), options, file, c.Regex.Serialize()))
commands = append(commands, fmt.Sprintf("%s:%s %s %s", c.Mode.String(), optionsStr, file, c.Regex.Serialize()))
}

return
Expand Down
10 changes: 1 addition & 9 deletions internal/clients/handlers/basehandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,15 +101,7 @@ func (h *baseHandler) handleMessageType(message string) {
// Handle messages received from server which are not meant to be displayed
// to the end user.
func (h *baseHandler) handleHiddenMessage(message string) {
switch {
case strings.HasPrefix(message, ".syn close connection"):
if strings.HasPrefix(message, ".syn close connection") {
h.SendMessage(".ack close connection")
select {
case <-time.After(time.Second * 5):
logger.Debug("Shutting down client after timeout and sending ack to server")
h.Shutdown()
case <-h.Done():
return
}
}
}
2 changes: 1 addition & 1 deletion internal/clients/maker.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,5 @@ import (
// and send different commands to the DTail server.
type maker interface {
makeHandler(server string) handlers.Handler
makeCommands() (commands []string)
makeCommands(options map[string]string) (commands []string)
}
10 changes: 7 additions & 3 deletions internal/clients/maprclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,27 +110,31 @@ func (c MaprClient) makeHandler(server string) handlers.Handler {
return handlers.NewMaprHandler(server, c.query, c.globalGroup)
}

func (c MaprClient) makeCommands() (commands []string) {
func (c MaprClient) makeCommands(options map[string]string) (commands []string) {
commands = append(commands, fmt.Sprintf("map %s", c.query.RawQuery))
options := fmt.Sprintf("quiet=%v", c.Args.Quiet)

modeStr := "cat"
if c.Mode == omode.TailClient {
modeStr = "tail"
}
optionsStr := c.commandOptionsToString(options)

for _, file := range strings.Split(c.What, ",") {
if c.Timeout > 0 {
commands = append(commands, fmt.Sprintf("timeout %d %s %s %s", c.Timeout, modeStr, file, c.Regex.Serialize()))
continue
}
commands = append(commands, fmt.Sprintf("%s:%s %s %s", modeStr, options, file, c.Regex.Serialize()))
commands = append(commands, fmt.Sprintf("%s:%s %s %s", modeStr, optionsStr, file, c.Regex.Serialize()))
}

return
}

func (c *MaprClient) periodicReportResults(ctx context.Context) {
rampUpSleep := c.query.Interval / 2
logger.Debug("Ramp up sleeping before processing mapreduce results", rampUpSleep)
time.Sleep(rampUpSleep)

for {
select {
case <-time.After(c.query.Interval):
Expand Down
6 changes: 3 additions & 3 deletions internal/clients/tailclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@ func (c TailClient) makeHandler(server string) handlers.Handler {
return handlers.NewClientHandler(server)
}

func (c TailClient) makeCommands() (commands []string) {
options := fmt.Sprintf("quiet=%v", c.Args.Quiet)
func (c TailClient) makeCommands(options map[string]string) (commands []string) {
optionsStr := c.commandOptionsToString(options)
for _, file := range strings.Split(c.What, ",") {
commands = append(commands, fmt.Sprintf("%s:%s %s %s", c.Mode.String(), options, file, c.Regex.Serialize()))
commands = append(commands, fmt.Sprintf("%s:%s %s %s", c.Mode.String(), optionsStr, file, c.Regex.Serialize()))
}
logger.Debug(commands)

Expand Down
49 changes: 49 additions & 0 deletions internal/datas/rbuffer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package datas

import "fmt"

// RBuffer is a simple circular string ring buffer data structure.
type RBuffer struct {
Capacity int
size int
readPos int
writePos int
data []string
}

// NewRBuffer creates a new string ring buffer.
func NewRBuffer(capacity int) (*RBuffer, error) {
if capacity < 1 {
return nil, fmt.Errorf("RBuffer capacity must not be less than 1")
}

r := RBuffer{
Capacity: capacity,
size: capacity + 1,
data: make([]string, capacity+1),
}

return &r, nil
}

// Add a value.
func (r *RBuffer) Add(value string) {
r.data[r.writePos] = value
r.writePos = (r.writePos + 1) % r.size

if r.writePos == r.readPos {
r.readPos = (r.readPos + 1) % r.size
}
}

// Get a value.
func (r *RBuffer) Get() (string, bool) {
if r.readPos == r.writePos {
// RBuffer is empty.
return "", false
}

value := r.data[r.readPos]
r.readPos = (r.readPos + 1) % r.size
return value, true
}
Loading