Skip to content

Commit

Permalink
bump up version to 3.0.0. can run continuous background mapreduce que…
Browse files Browse the repository at this point in the history
…ries, useful for log file monitorig for example. breaking protocol change which allows to mapreduce aggreate messages containing the default field separator |. add of more unit tests. add logformat mapreduce query keyword. add set mapreduce clause support and support to evaluate built-in functions such as md5sum() and maskdigits().
  • Loading branch information
pbuetow committed Aug 13, 2020
1 parent 8f9f976 commit c5a0ba7
Show file tree
Hide file tree
Showing 32 changed files with 642 additions and 257 deletions.
4 changes: 3 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
GO ?= go
all: build
all: test build
build:
${GO} build -o dserver ./cmd/dserver/main.go
${GO} build -o dcat ./cmd/dcat/main.go
Expand Down Expand Up @@ -29,3 +29,5 @@ lint:
echo ${GOPATH}/bin/golint $$dir; \
${GOPATH}/bin/golint $$dir; \
done
test:
${GO} test ./... -v
7 changes: 5 additions & 2 deletions cmd/dtail/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,12 @@ func main() {
version.PrintAndExit()
}

ctx, _ := context.WithCancel(context.Background())
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

if shutdownAfter > 0 {
ctx, _ = context.WithTimeout(ctx, time.Duration(shutdownAfter)*time.Second)
ctx, cancel = context.WithTimeout(ctx, time.Duration(shutdownAfter)*time.Second)
defer cancel()
}

if checkHealth {
Expand Down
7 changes: 4 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
module github.com/mimecast/dtail

go 1.13
go 1.15

require (
github.com/DataDog/zstd v1.4.4
golang.org/x/crypto v0.0.0-20191227163750-53104e6ec876
github.com/DataDog/zstd v1.4.5
golang.org/x/crypto v0.0.0-20200728195943-123391ffb6de
golang.org/x/lint v0.0.0-20200302205851-738671d3881b // indirect
golang.org/x/sys v0.0.0-20200812155832-6a926be9bd1d // indirect
)
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
github.com/DataDog/zstd v1.4.4 h1:+IawcoXhCBylN7ccwdwf8LOH2jKq7NavGpEPanrlTzE=
github.com/DataDog/zstd v1.4.4/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo=
github.com/DataDog/zstd v1.4.5 h1:EndNeuB0l9syBZhut0wns3gV1hL8zX8LIu6ZiVHWLIQ=
github.com/DataDog/zstd v1.4.5/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20191227163750-53104e6ec876 h1:sKJQZMuxjOAR/Uo2LBfU90onWEf1dF4C+0hPJCc9Mpc=
golang.org/x/crypto v0.0.0-20191227163750-53104e6ec876/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200728195943-123391ffb6de h1:ikNHVSjEfnvz6sxdSPCaPt572qowuyMDMJLLm3Db3ig=
golang.org/x/crypto v0.0.0-20200728195943-123391ffb6de/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/lint v0.0.0-20200130185559-910be7a94367 h1:0IiAsCRByjO2QjX7ZPkw5oU9x+n1YqRL802rjC0c3Aw=
golang.org/x/lint v0.0.0-20200130185559-910be7a94367/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
golang.org/x/lint v0.0.0-20200302205851-738671d3881b h1:Wh+f8QHJXR411sJR8/vRBTZ7YapZaRvUcLFFJhusH0k=
Expand All @@ -15,6 +19,8 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d h1:+R4KGOnez64A81RvjARKc4UT5/tI9ujCIVX+P5KiHuI=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200812155832-6a926be9bd1d h1:QQrM/CCYEzTs91GZylDCQjGHudbPTxF/1fvXdVh5lMo=
golang.org/x/sys v0.0.0-20200812155832-6a926be9bd1d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/tools v0.0.0-20200130002326-2f3ba24bd6e7 h1:EBZoQjiKKPaLbPrbpssUfuHtwM6KV/vb4U85g/cigFY=
golang.org/x/tools v0.0.0-20200130002326-2f3ba24bd6e7/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
Expand Down
7 changes: 1 addition & 6 deletions internal/clients/handlers/maprhandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,16 +59,11 @@ func (h *MaprHandler) Write(p []byte) (n int, err error) {
// related data.
func (h *MaprHandler) handleAggregateMessage(message string) {
h.count++
parts := strings.Split(message, "|")
parts := strings.Split(message, "")

// Index 0 contains 'AGGREGATE', 1 contains server host.
// Aggregation data begins from index 2.
logger.Debug("Received aggregate data", h.server, h.count, parts)
/*
for k, v := range parts {
logger.Debug(k, v)
}
*/
h.aggregate.Aggregate(parts[2:])
logger.Debug("Aggregated aggregate data", h.server, h.count)
}
10 changes: 8 additions & 2 deletions internal/clients/maprclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,15 @@ import (
"github.com/mimecast/dtail/internal/omode"
)

// MaprClientMode determines whether to use cumulative mode or not.
type MaprClientMode int

const (
DefaultMode MaprClientMode = iota
CumulativeMode MaprClientMode = iota
// DefaultMode behaviour
DefaultMode MaprClientMode = iota
// CumulativeMode means results are added to prev interval
CumulativeMode MaprClientMode = iota
// NonCumulativeMode means results are from 0 for each interval
NonCumulativeMode MaprClientMode = iota
)

Expand Down Expand Up @@ -60,6 +64,8 @@ func NewMaprClient(args Args, queryStr string, maprClientMode MaprClientMode) (*
cumulative = args.Mode == omode.MapClient || query.HasOutfile()
}

logger.Debug("Cumulative mapreduce mode?", cumulative)

c := MaprClient{
baseClient: baseClient{
Args: args,
Expand Down
9 changes: 6 additions & 3 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,13 @@ import (
)

// ControlUser is used for various DTail specific operations.
const ControlUser string = "DTAIL-CONTROL-USER"
const ControlUser string = "DTAIL-CONTROL"

// BackgroundUser is used for non-interactive scheduled queries and log monitoring and such.
const BackgroundUser string = "DTAIL-BACKGROUND-USER"
// ScheduleUser is used for non-interactive scheduled mapreduce queries.
const ScheduleUser string = "DTAIL-SCHEDULE"

// ContinuousUser is used for non-interactive continuous mapreduce queries.
const ContinuousUser string = "DTAIL-CONTINUOUS"

// Client holds a DTail client configuration.
var Client *ClientConfig
Expand Down
6 changes: 3 additions & 3 deletions internal/config/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ type Permissions struct {
}

// JobCommons summarises common job fields
type JobCommons struct {
type jobCommons struct {
Name string
Enable bool
Files string
Expand All @@ -27,13 +27,13 @@ type JobCommons struct {

// Scheduled allows to configure scheduled mapreduce jobs.
type Scheduled struct {
JobCommons
jobCommons
TimeRange [2]int
}

// Continuous allows to configure continuous running mapreduce jobs.
type Continuous struct {
JobCommons
jobCommons
RestartOnDayChange bool `json:",omitempty"`
}

Expand Down
14 changes: 4 additions & 10 deletions internal/mapr/aggregateset.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package mapr

import (
"context"
"encoding/base64"
"fmt"
"strconv"
"strings"
Expand Down Expand Up @@ -71,25 +70,20 @@ func (s *AggregateSet) Serialize(ctx context.Context, groupKey string, ch chan<-
var sb strings.Builder

sb.WriteString(groupKey)
sb.WriteString("|")
sb.WriteString(fmt.Sprintf("%d|", s.Samples))
sb.WriteString("")
sb.WriteString(fmt.Sprintf("%d", s.Samples))

for k, v := range s.FValues {
sb.WriteString(k)
sb.WriteString("=")
sb.WriteString(fmt.Sprintf("%v|", v))
sb.WriteString(fmt.Sprintf("%v", v))
}

for k, v := range s.SValues {
sb.WriteString(k)
sb.WriteString("=")
if k == "$line" {
sb.WriteString(base64.StdEncoding.EncodeToString([]byte(v)))
sb.WriteString("|")
continue
}
sb.WriteString(v)
sb.WriteString("|")
sb.WriteString("")
}

select {
Expand Down
10 changes: 0 additions & 10 deletions internal/mapr/client/aggregate.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package client

import (
"encoding/base64"
"strconv"
"strings"

Expand Down Expand Up @@ -75,15 +74,6 @@ func (a *Aggregate) makeFields(parts []string) map[string]string {
if len(kv) < 2 {
continue
}
if kv[0] == "$line" {
decoded, err := base64.StdEncoding.DecodeString(kv[1])
if err != nil {
logger.Error("Unable to decode $line", kv[1], err)
continue
}
fields[kv[0]] = string(decoded)
continue
}
fields[kv[0]] = kv[1]
}

Expand Down
29 changes: 29 additions & 0 deletions internal/mapr/fieldtypes.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package mapr

import "fmt"

type fieldType int

// The possible field types.
const (
UndefFieldType fieldType = iota
Field fieldType = iota
String fieldType = iota
Float fieldType = iota
FunctionStack fieldType = iota
)

func (w fieldType) String() string {
switch w {
case Field:
return fmt.Sprintf("Field")
case String:
return fmt.Sprintf("String")
case Float:
return fmt.Sprintf("Float")
case FunctionStack:
return fmt.Sprintf("FunctionStack")
default:
return fmt.Sprintf("UndefFieldType")
}
}
66 changes: 66 additions & 0 deletions internal/mapr/funcs/function.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package funcs

import (
"fmt"
"strings"
)

// CallbackFunc is a function which can be executed by the mapreduce engine
type CallbackFunc func(text string) string

// Function embeddes the function name to the callback function
type Function struct {
// Name of the callback function
Name string
call CallbackFunc
}

// FunctionStack is a list of functions stacked each other
type FunctionStack []Function

// NewFunctionStack parses the input string, e.g. foo(bar("arg")) and returns a corresponding function stack.
func NewFunctionStack(in string) (FunctionStack, string, error) {
var fs FunctionStack

getCallback := func(name string) (CallbackFunc, error) {
var cb CallbackFunc

switch name {
case "md5sum":
return Md5Sum, nil
case "maskdigits":
return MaskDigits, nil
default:
return cb, fmt.Errorf("unknown function '%s'", name)
}
}

aux := in
for strings.HasSuffix(aux, ")") {
index := strings.Index(aux, "(")
if index <= 0 {
return fs, "", fmt.Errorf("unable to parse function '%s' at '%s'", in, aux)
}
name := aux[0:index]

call, err := getCallback(name)
if err != nil {
return fs, "", err
}
fs = append(fs, Function{name, call})
aux = aux[index+1 : len(aux)-1]
}

return fs, aux, nil
}

// Call the function stack.
func (fs FunctionStack) Call(str string) string {
for i := len(fs) - 1; i >= 0; i-- {
//logger.Debug("Call", fs[i].Name, str)
str = fs[i].call(str)
//logger.Debug("Call.result", fs[i].Name, str)
}

return str
}
45 changes: 45 additions & 0 deletions internal/mapr/funcs/function_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package funcs

import "testing"

func TestFunction(t *testing.T) {
input := "md5sum($line)"
fs, arg, err := NewFunctionStack(input)
if err != nil {
t.Errorf("error parsing function input '%s': %s (%v)\n", input, err.Error(), fs)
}
if arg != "$line" {
t.Errorf("error parsing function input '%s': expected argument '$line' but got '%s' (%v)\n", input, arg, fs)
}
t.Log(input, fs, arg)

result := fs.Call(input)
if result != "b38699013d79e50d9d122433753959c1" {
t.Errorf("error executing function stack '%s': expected result 'b38699013d79e50d9d122433753959c1' but got '%s' (%v)\n", input, result, fs)
}

input = "maskdigits(md5sum(maskdigits($line)))"
fs, arg, err = NewFunctionStack(input)
if err != nil {
t.Errorf("error parsing function input '%s': %s (%v)\n", input, err.Error(), fs)
}
if arg != "$line" {
t.Errorf("error parsing function input '%s': expected argument '$line' but got '%s' (%v)\n", input, arg, fs)
}
t.Log(input, fs, arg)

result = fs.Call(input)
if result != ".fac.bbe..bb.........d...a.c..b." {
t.Errorf("error executing function stack '%s': expected result '.fac.bbe..bb.........d...a.c..b.' but got '%s' (%v)\n", input, result, fs)
}

input = "md5sum$line)"
if fs, _, err := NewFunctionStack(input); err == nil {
t.Errorf("Expected error parsing function input '%s' (%v) but got no error\n", input, fs)
}

input = "md5sum(makedigits$line))"
if fs, _, err := NewFunctionStack(input); err == nil {
t.Errorf("Expected error parsing function input '%s' (%v) but got no error\n", input, fs)
}
}
14 changes: 14 additions & 0 deletions internal/mapr/funcs/maskdigits.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package funcs

// MaskDigits masks all digits (replaces them with .)
func MaskDigits(input string) string {
s := []byte(input)

for i, b := range s {
if '0' <= b && b <= '9' {
s[i] = '.'
}
}

return string(s)
}
12 changes: 12 additions & 0 deletions internal/mapr/funcs/md5sum.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package funcs

import (
"crypto/md5"
"encoding/hex"
)

// Md5Sum returns the hex encoded MD5 checksum of a given input string.
func Md5Sum(text string) string {
hash := md5.Sum([]byte(text))
return hex.EncodeToString(hash[:])
}
1 change: 1 addition & 0 deletions internal/mapr/logformat/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,6 @@ func (p *Parser) MakeFieldsDEFAULT(maprLine string) (map[string]string, error) {
}
fields[strings.ToLower(keyAndValue[0])] = keyAndValue[1]
}

return fields, nil
}
Loading

0 comments on commit c5a0ba7

Please sign in to comment.