Skip to content

Commit

Permalink
Merge pull request #22 from snonux/develop
Browse files Browse the repository at this point in the history
Add context awareness to dgrep
  • Loading branch information
pbuetow authored Mar 29, 2021
2 parents e811d17 + 93fce24 commit 9a467da
Show file tree
Hide file tree
Showing 29 changed files with 604 additions and 199 deletions.
14 changes: 5 additions & 9 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
*_proprietary.go
<<<<<<< HEAD
cache/
log/
tags
=======
/cache/
/log/
/cache
/log
/tags
/dtail
/dgrep
/dcat
/dmap
/dserver
>>>>>>> 7ee0121afed3e7cab6457142f70e411020ab2b21
/bin
serverlist.txt
15 changes: 10 additions & 5 deletions cmd/dgrep/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,24 @@ func main() {
flag.IntVar(&sshPort, "port", 2222, "SSH server port")
flag.StringVar(&args.Discovery, "discovery", "", "Server discovery method")
flag.StringVar(&args.PrivateKeyPathFile, "key", "", "Path to private key")
flag.StringVar(&args.RegexStr, "regex", ".", "Regular expression")
flag.StringVar(&args.ServersStr, "servers", "", "Remote servers to connect")
flag.StringVar(&args.UserName, "user", userName, "Your system user name")
flag.StringVar(&args.What, "files", "", "File(s) to read")
flag.StringVar(&cfgFile, "cfg", "", "Config file path")

// Line context awareness.
flag.StringVar(&args.RegexStr, "regex", ".", "Regular expression")
flag.StringVar(&grep, "grep", "", "Alias for -regex")
flag.IntVar(&args.LContext.BeforeContext, "before", 0, "Print lines of leading context before matching lines")
flag.IntVar(&args.LContext.AfterContext, "after", 0, "Print lines of trailing context after matching lines")
flag.IntVar(&args.LContext.MaxCount, "max", 0, "Stop reading file after NUM matching lines")

flag.Parse()

if grep != "" {
args.RegexStr = grep
}

config.Read(cfgFile, sshPort)
color.Colored = !noColor

Expand All @@ -58,10 +67,6 @@ func main() {
Quiet: args.Quiet,
})

if grep != "" {
args.RegexStr = grep
}

client, err := clients.NewGrepClient(args)
if err != nil {
panic(err)
Expand Down
9 changes: 7 additions & 2 deletions cmd/dtail/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,19 @@ func main() {
flag.IntVar(&sshPort, "port", 2222, "SSH server port")
flag.StringVar(&args.Discovery, "discovery", "", "Server discovery method")
flag.StringVar(&args.PrivateKeyPathFile, "key", "", "Path to private key")
flag.StringVar(&args.RegexStr, "regex", ".", "Regular expression")
flag.StringVar(&args.ServersStr, "servers", "", "Remote servers to connect")
flag.StringVar(&args.UserName, "user", userName, "Your system user name")
flag.StringVar(&args.What, "files", "", "File(s) to read")
flag.StringVar(&cfgFile, "cfg", "", "Config file path")
flag.StringVar(&grep, "grep", "", "Alias for -regex")
flag.StringVar(&queryStr, "query", "", "Map reduce query")

// Line context awareness.
flag.StringVar(&args.RegexStr, "regex", ".", "Regular expression")
flag.StringVar(&grep, "grep", "", "Alias for -regex")
flag.IntVar(&args.LContext.BeforeContext, "before", 0, "Print lines of leading context before matching lines")
flag.IntVar(&args.LContext.AfterContext, "after", 0, "Print lines of trailing context after matching lines")
flag.IntVar(&args.LContext.MaxCount, "max", 0, "Stop reading file after NUM matching lines")

flag.Parse()

if grep != "" {
Expand Down
3 changes: 2 additions & 1 deletion docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,6 @@ USER dserver

WORKDIR /var/run/dserver
EXPOSE 2222/tcp
EXPOSE 8080/tcp

CMD ["/usr/local/bin/dserver", "-relaxedAuth", "-cfg", "/etc/dserver/dtail.json"]
CMD ["/usr/local/bin/dserver", "-relaxedAuth", "-cfg", "/etc/dserver/dtail.json", "-pprof", "8080"]
3 changes: 2 additions & 1 deletion docker/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ all:
docker build . -t dserver:develop
rm ./dserver
run:
docker run -p 2222:2222 dserver:develop
# http://localhost:8080/debug/pprof/goroutines?debug=1
docker run -p 2222:2222 -p 8080:8080 dserver:develop
spinup:
./spinup.sh 10
spindown:
Expand Down
1 change: 1 addition & 0 deletions docker/spindown.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@ declare -i BASE_PORT=2222
for (( i=0; i < $NUM_INSTANCES; i++ )); do
port=$[ BASE_PORT + i + 1 ]
docker stop dserver-serv$i
docker rm dserver-serv$i
done
3 changes: 3 additions & 0 deletions docker/spinup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@
declare -i NUM_INSTANCES=$1
declare -i BASE_PORT=2222

echo > serverlist.txt

for (( i=0; i < $NUM_INSTANCES; i++ )); do
port=$[ BASE_PORT + i + 1 ]
docker run -d --name dserver-serv$i --hostname serv$i -p $port:2222 dserver:develop
echo localhost:$port >> serverlist.txt
done
6 changes: 4 additions & 2 deletions internal/clients/args.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,21 @@
package clients

import (
"github.com/mimecast/dtail/internal/lcontext"
"github.com/mimecast/dtail/internal/omode"

gossh "golang.org/x/crypto/ssh"
)

// Args is a helper struct to summarize common client arguments.
type Args struct {
lcontext.LContext
RegexStr string
Mode omode.Mode
ServersStr string
UserName string
What string
Arguments []string
RegexStr string
RegexInvert bool
TrustAllHosts bool
Discovery string
Expand All @@ -22,5 +24,5 @@ type Args struct {
SSHAuthMethods []gossh.AuthMethod
SSHHostKeyCallback gossh.HostKeyCallback
PrivateKeyPathFile string
Quiet bool
Quiet bool
}
38 changes: 37 additions & 1 deletion internal/clients/baseclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package clients

import (
"context"
"fmt"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -118,10 +120,44 @@ func (c *baseClient) start(ctx context.Context, active chan struct{}, i int, con
}
}

func (c *baseClient) makeCommandOptions() map[string]string {
options := make(map[string]string)

if c.Args.Quiet {
options["quiet"] = fmt.Sprintf("%v", c.Args.Quiet)
}
if c.Args.LContext.MaxCount != 0 {
options["max"] = fmt.Sprintf("%d", c.Args.LContext.MaxCount)
}
if c.Args.LContext.BeforeContext != 0 {
options["before"] = fmt.Sprintf("%d", c.Args.LContext.BeforeContext)
}
if c.Args.LContext.AfterContext != 0 {
options["after"] = fmt.Sprintf("%d", c.Args.LContext.AfterContext)
}

return options
}

func (c *baseClient) commandOptionsToString(options map[string]string) string {
var sb strings.Builder

count := 0
for k, v := range options {
if count > 0 {
sb.WriteString(":")
}
sb.WriteString(fmt.Sprintf("%s=%s", k, v))
count++
}

return sb.String()
}

func (c *baseClient) makeConnection(server string, sshAuthMethods []gossh.AuthMethod, hostKeyCallback client.HostKeyCallback) *remote.Connection {
conn := remote.NewConnection(server, c.UserName, sshAuthMethods, hostKeyCallback)
conn.Handler = c.maker.makeHandler(server)
conn.Commands = c.maker.makeCommands()
conn.Commands = c.maker.makeCommands(c.makeCommandOptions())

return conn
}
Expand Down
6 changes: 3 additions & 3 deletions internal/clients/catclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,10 @@ func (c CatClient) makeHandler(server string) handlers.Handler {
return handlers.NewClientHandler(server)
}

func (c CatClient) makeCommands() (commands []string) {
options := fmt.Sprintf("quiet=%v", c.Args.Quiet)
func (c CatClient) makeCommands(options map[string]string) (commands []string) {
optionsStr := c.commandOptionsToString(options)
for _, file := range strings.Split(c.What, ",") {
commands = append(commands, fmt.Sprintf("%s:%s %s %s", c.Mode.String(), options, file, c.Regex.Serialize()))
commands = append(commands, fmt.Sprintf("%s:%s %s %s", c.Mode.String(), optionsStr, file, c.Regex.Serialize()))
}
return
}
6 changes: 3 additions & 3 deletions internal/clients/grepclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@ func (c GrepClient) makeHandler(server string) handlers.Handler {
return handlers.NewClientHandler(server)
}

func (c GrepClient) makeCommands() (commands []string) {
options := fmt.Sprintf("quiet=%v", c.Args.Quiet)
func (c GrepClient) makeCommands(options map[string]string) (commands []string) {
optionsStr := c.commandOptionsToString(options)
for _, file := range strings.Split(c.What, ",") {
commands = append(commands, fmt.Sprintf("%s:%s %s %s", c.Mode.String(), options, file, c.Regex.Serialize()))
commands = append(commands, fmt.Sprintf("%s:%s %s %s", c.Mode.String(), optionsStr, file, c.Regex.Serialize()))
}

return
Expand Down
10 changes: 1 addition & 9 deletions internal/clients/handlers/basehandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,15 +101,7 @@ func (h *baseHandler) handleMessageType(message string) {
// Handle messages received from server which are not meant to be displayed
// to the end user.
func (h *baseHandler) handleHiddenMessage(message string) {
switch {
case strings.HasPrefix(message, ".syn close connection"):
if strings.HasPrefix(message, ".syn close connection") {
h.SendMessage(".ack close connection")
select {
case <-time.After(time.Second * 5):
logger.Debug("Shutting down client after timeout and sending ack to server")
h.Shutdown()
case <-h.Done():
return
}
}
}
2 changes: 1 addition & 1 deletion internal/clients/maker.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,5 @@ import (
// and send different commands to the DTail server.
type maker interface {
makeHandler(server string) handlers.Handler
makeCommands() (commands []string)
makeCommands(options map[string]string) (commands []string)
}
10 changes: 7 additions & 3 deletions internal/clients/maprclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,27 +110,31 @@ func (c MaprClient) makeHandler(server string) handlers.Handler {
return handlers.NewMaprHandler(server, c.query, c.globalGroup)
}

func (c MaprClient) makeCommands() (commands []string) {
func (c MaprClient) makeCommands(options map[string]string) (commands []string) {
commands = append(commands, fmt.Sprintf("map %s", c.query.RawQuery))
options := fmt.Sprintf("quiet=%v", c.Args.Quiet)

modeStr := "cat"
if c.Mode == omode.TailClient {
modeStr = "tail"
}
optionsStr := c.commandOptionsToString(options)

for _, file := range strings.Split(c.What, ",") {
if c.Timeout > 0 {
commands = append(commands, fmt.Sprintf("timeout %d %s %s %s", c.Timeout, modeStr, file, c.Regex.Serialize()))
continue
}
commands = append(commands, fmt.Sprintf("%s:%s %s %s", modeStr, options, file, c.Regex.Serialize()))
commands = append(commands, fmt.Sprintf("%s:%s %s %s", modeStr, optionsStr, file, c.Regex.Serialize()))
}

return
}

func (c *MaprClient) periodicReportResults(ctx context.Context) {
rampUpSleep := c.query.Interval / 2
logger.Debug("Ramp up sleeping before processing mapreduce results", rampUpSleep)
time.Sleep(rampUpSleep)

for {
select {
case <-time.After(c.query.Interval):
Expand Down
6 changes: 3 additions & 3 deletions internal/clients/tailclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@ func (c TailClient) makeHandler(server string) handlers.Handler {
return handlers.NewClientHandler(server)
}

func (c TailClient) makeCommands() (commands []string) {
options := fmt.Sprintf("quiet=%v", c.Args.Quiet)
func (c TailClient) makeCommands(options map[string]string) (commands []string) {
optionsStr := c.commandOptionsToString(options)
for _, file := range strings.Split(c.What, ",") {
commands = append(commands, fmt.Sprintf("%s:%s %s %s", c.Mode.String(), options, file, c.Regex.Serialize()))
commands = append(commands, fmt.Sprintf("%s:%s %s %s", c.Mode.String(), optionsStr, file, c.Regex.Serialize()))
}
logger.Debug(commands)

Expand Down
49 changes: 49 additions & 0 deletions internal/datas/rbuffer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package datas

import "fmt"

// RBuffer is a simple circular string ring buffer data structure.
type RBuffer struct {
Capacity int
size int
readPos int
writePos int
data []string
}

// NewRBuffer creates a new string ring buffer.
func NewRBuffer(capacity int) (*RBuffer, error) {
if capacity < 1 {
return nil, fmt.Errorf("RBuffer capacity must not be less than 1")
}

r := RBuffer{
Capacity: capacity,
size: capacity + 1,
data: make([]string, capacity+1),
}

return &r, nil
}

// Add a value.
func (r *RBuffer) Add(value string) {
r.data[r.writePos] = value
r.writePos = (r.writePos + 1) % r.size

if r.writePos == r.readPos {
r.readPos = (r.readPos + 1) % r.size
}
}

// Get a value.
func (r *RBuffer) Get() (string, bool) {
if r.readPos == r.writePos {
// RBuffer is empty.
return "", false
}

value := r.data[r.readPos]
r.readPos = (r.readPos + 1) % r.size
return value, true
}
Loading

0 comments on commit 9a467da

Please sign in to comment.