Skip to content

Commit

Permalink
Fix peqeditorsql
Browse files Browse the repository at this point in the history
  • Loading branch information
xackery committed Feb 27, 2024
1 parent c8dfe42 commit 746a2d1
Show file tree
Hide file tree
Showing 4 changed files with 188 additions and 105 deletions.
6 changes: 6 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"go.testEnvVars": {
"SINGLE_TEST": "1"
},
"files.trimTrailingWhitespace": true,
}
186 changes: 96 additions & 90 deletions peqeditorsql/peqeditorsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,6 @@ func (t *PEQEditorSQL) Connect(ctx context.Context) error {
return nil
}

tlog.Infof("[peqeditorsql] tailing %s...", t.config.Path)

t.Disconnect(ctx)

t.ctx, t.cancel = context.WithCancel(ctx)
Expand All @@ -90,9 +88,12 @@ func (t *PEQEditorSQL) Connect(ctx context.Context) error {
}

func (t *PEQEditorSQL) loop(ctx context.Context) {
msgChan := make(chan string, 100)
tail1, err := newTailWatch(t.ctx, &tailReq{
id: 1,
msgCurrentChan := make(chan string, 100)
tailCtx, cancel := context.WithCancel(ctx)
defer cancel()

tailCurrent, err := newTailWatch(tailCtx, &tailReq{
id: "Current",
filePattern: t.config.FilePattern,
basePath: t.config.Path,
cfg: tail.Config{
Expand All @@ -101,23 +102,16 @@ func (t *PEQEditorSQL) loop(ctx context.Context) {
Poll: true,
Logger: tail.DiscardingLogger,
},
isNextMonth: false,
}, msgChan)
}, msgCurrentChan)
if err != nil {
tlog.Warnf("[peqeditorsql] tail1 creation failed: %s", err)
tlog.Warnf("[peqeditorsql] tailCurrent creation failed: %s", err)
t.Disconnect(ctx)
return
}

err = tail1.restart(msgChan)
if err != nil {
tlog.Warnf("[peqeditorsql] tail1 start failed: %s", err)
t.Disconnect(ctx)
return
}

tail2, err := newTailWatch(t.ctx, &tailReq{
id: 2,
msgNextChan := make(chan string, 100)
tailNextMonth, err := newTailWatch(tailCtx, &tailReq{
id: "Next",
filePattern: t.config.FilePattern,
basePath: t.config.Path,
cfg: tail.Config{
Expand All @@ -126,82 +120,32 @@ func (t *PEQEditorSQL) loop(ctx context.Context) {
Poll: true,
Logger: tail.DiscardingLogger,
},
isNextMonth: true,
}, msgChan)
}, msgNextChan)
if err != nil {
tlog.Warnf("[peqeditorsql] tail2 creation failed: %s", err)
tlog.Warnf("[peqeditorsql] tailNext creation failed: %s", err)
t.Disconnect(ctx)
return
}

err = tail2.restart(msgChan)
if err != nil {
tlog.Warnf("[peqeditorsql] tail2 start failed: %s", err)
t.Disconnect(ctx)
return
}

ticker := time.NewTicker(12 * time.Hour)
select {
case <-t.ctx.Done():
return
case <-ticker.C:
tail1.restart(msgChan)
tail2.restart(msgChan)
case line := <-msgChan:
for routeIndex, route := range t.config.Routes {
if !route.IsEnabled {
continue
}
pattern, err := regexp.Compile(route.Trigger.Regex)
if err != nil {
tlog.Debugf("[peqeditorsql] compile route %d skipped: %s", routeIndex, err)
continue
}
matches := pattern.FindAllStringSubmatch(line, -1)
if len(matches) == 0 {
continue
}

name := ""
message := ""
if route.Trigger.MessageIndex > 0 && route.Trigger.MessageIndex <= len(matches[0]) {
message = matches[0][route.Trigger.MessageIndex]
}
if route.Trigger.NameIndex > 0 && route.Trigger.NameIndex <= len(matches[0]) {
name = matches[0][route.Trigger.NameIndex]
}

buf := new(bytes.Buffer)
if err := route.MessagePatternTemplate().Execute(buf, struct {
Name string
Message string
}{
name,
message,
}); err != nil {
tlog.Warnf("[peqeditorsql] execute route %d skipped: %s", routeIndex, err)
continue
}
switch route.Target {
case "discord":
req := request.DiscordSend{
Ctx: ctx,
ChannelID: route.ChannelID,
Message: buf.String(),
}
for i, s := range t.subscribers {
err = s(req)
if err != nil {
tlog.Warnf("[peqeditorsql->discord subscriber %d] channel %s message %s failed: %s", i, route.ChannelID, req.Message)
continue
}
tlog.Infof("[peqeditorsql->discord subscribe %d] channel %s message: %s", i, route.ChannelID, req.Message)
}
default:
tlog.Warnf("[peqeditorsql] unsupported target type: %s", route.Target)
continue
}
for {
//ticker := time.NewTicker(12 * time.Hour)
select {
case <-t.ctx.Done():
return
// case <-ticker.C:
// tailCurrent.restart(msgCurrentChan)
// tailNextMonth.restart(msgCurrentChan)
case line := <-msgCurrentChan:
t.handleMessage(ctx, line)
case line := <-msgNextChan:
t.handleMessage(ctx, line)
tlog.Infof("[peqeditorsql] new month file activity detected, rotating log parsing")
tailCurrent.cancel()
tailNextMonth.cancel()
tailCtx.Done()

time.Sleep(500 * time.Millisecond)
go t.loop(ctx)
return
}
}
}
Expand All @@ -214,7 +158,7 @@ func (t *PEQEditorSQL) Disconnect(ctx context.Context) error {
return nil
}
if !t.isConnected {
tlog.Debugf("[peqeditorsql] already disconnected, skipping disconnect")
//tlog.Debugf("[peqeditorsql] already disconnected, skipping disconnect")
return nil
}
t.cancel()
Expand All @@ -234,3 +178,65 @@ func (t *PEQEditorSQL) Subscribe(ctx context.Context, onMessage func(interface{}
t.subscribers = append(t.subscribers, onMessage)
return nil
}

func (t *PEQEditorSQL) handleMessage(ctx context.Context, line string) {
isSent := false
for routeIndex, route := range t.config.Routes {
if !route.IsEnabled {
continue
}
pattern, err := regexp.Compile(route.Trigger.Regex)
if err != nil {
tlog.Debugf("[peqeditorsql] compile route %d skipped: %s", routeIndex, err)
continue
}
matches := pattern.FindAllStringSubmatch(line, -1)
if len(matches) == 0 {
continue
}

name := ""
message := ""
if route.Trigger.MessageIndex > 0 && route.Trigger.MessageIndex <= len(matches[0]) {
message = matches[0][route.Trigger.MessageIndex]
}
if route.Trigger.NameIndex > 0 && route.Trigger.NameIndex <= len(matches[0]) {
name = matches[0][route.Trigger.NameIndex]
}

buf := new(bytes.Buffer)
if err := route.MessagePatternTemplate().Execute(buf, struct {
Name string
Message string
}{
name,
message,
}); err != nil {
tlog.Warnf("[peqeditorsql] execute route %d skipped: %s", routeIndex, err)
continue
}
switch route.Target {
case "discord":
req := request.DiscordSend{
Ctx: ctx,
ChannelID: route.ChannelID,
Message: buf.String(),
}
for i, s := range t.subscribers {
err = s(req)
if err != nil {
tlog.Warnf("[peqeditorsql->discord subscriber %d] channel %s message %s failed: %s", i, route.ChannelID, req.Message)
continue
}
tlog.Infof("[peqeditorsql->discord subscribe %d] channel %s message: %s", i, route.ChannelID, req.Message)
}
isSent = true
default:
tlog.Warnf("[peqeditorsql] unsupported target type: %s", route.Target)
continue
}
}
if !isSent {
tlog.Debugf("[peqeditorsql] message '%s' was not sent (no route enabled)", line)
}
}
71 changes: 71 additions & 0 deletions peqeditorsql/peqeditorsql_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package peqeditorsql

import (
"context"
"fmt"
"os"
"testing"
"time"

"github.com/xackery/talkeq/config"
)

func TestLogRotation(t *testing.T) {
if os.Getenv("SINGLE_TEST") != "1" {
t.Skip("skipping test; SINGLE_TEST not set")
}
client, err := New(context.Background(), config.PEQEditorSQL{
IsEnabled: true,
Path: ".",
FilePattern: "sql_log_{{.Month}}-{{.Year}}.sql",
})
if err != nil {
t.Fatalf("new client: %s", err)
}

path1 := fmt.Sprintf("sql_log_%s-%d.sql", time.Now().Format("01"), time.Now().Year())

fmt.Println("test priming 'test'", path1)
w1, err := os.Create(path1)
if err != nil {
t.Fatalf("create: %s", err)
}
defer func() {
w1.Close()
os.Remove(path1)
}()
w1.WriteString("test\n")
err = client.Connect(context.Background())
if err != nil {
t.Fatalf("connect: %s", err)
}

time.Sleep(1 * time.Second)

fmt.Println("test update 'test2'", path1)
w1.WriteString("test2\n")

time.Sleep(1 * time.Second)

path2 := fmt.Sprintf("sql_log_%s-%d.sql", time.Now().AddDate(0, 1, 0).Format("01"), time.Now().Year())

w2, err := os.Create(path2)
if err != nil {
t.Fatalf("create: %s", err)
}
defer func() {
w2.Close()
os.Remove(path2)
}()

fmt.Println("test writing 'test3'", path2)
w2.WriteString("test3\n")

time.Sleep(1 * time.Second)

fmt.Println("test cleaning up")
client.Disconnect(context.Background())
//time.Sleep(1 * time.Second)
fmt.Println("test finished")

}
30 changes: 15 additions & 15 deletions peqeditorsql/tail.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,13 @@ type tailWatch struct {
}

type tailReq struct {
id int
id string
filePattern string
basePath string
cfg tail.Config
isNextMonth bool
}

func newTailWatch(rootCtx context.Context, req *tailReq, msgChan chan string) (*tailWatch, error) {

e := &tailWatch{
rootCtx: rootCtx,
req: req,
Expand All @@ -50,14 +48,13 @@ func (e *tailWatch) restart(msgChan chan string) error {
if e.cancel != nil {
e.cancel()
}
time.Sleep(1 * time.Second)
e.ctx, e.cancel = context.WithCancel(context.Background())
buf := new(bytes.Buffer)
tmpl := template.New("filePattern")
tmpl.Parse(e.req.filePattern)

month := time.Now().Format("01")
if e.req.isNextMonth {
if e.req.id == "Next" {
month = time.Now().AddDate(0, 1, 0).Format("01")
}

Expand All @@ -79,26 +76,29 @@ func (e *tailWatch) restart(msgChan chan string) error {
if err != nil {
return fmt.Errorf("tail: %w", err)
}
tlog.Infof("[peqeditorsql] tail%s watching %s", e.req.id, finalPath)
go e.loop(msgChan)
return nil
}

func (e *tailWatch) loop(msgChan chan string) {
defer func() {
tlog.Debugf("[peqeditorsql] tail%d loop exiting for %s", e.req.id, e.tailer.Filename)
tlog.Debugf("[peqeditorsql] tail%s loop exiting for %s", e.req.id, e.tailer.Filename)
e.tailer.Cleanup()
}()

select {
case <-e.rootCtx.Done():
return
case <-e.ctx.Done():
return
case line := <-e.tailer.Lines:
if line.Err != nil {
tlog.Warnf("[peqeditorsql] tail%d error: %s", e.req.id, line.Err)
for {
select {
case <-e.rootCtx.Done():
return
case <-e.ctx.Done():
return
case line := <-e.tailer.Lines:
if line.Err != nil {
tlog.Warnf("[peqeditorsql] tail%s error: %s", e.req.id, line.Err)
return
}
msgChan <- line.Text
}
msgChan <- line.Text
}
}

0 comments on commit 746a2d1

Please sign in to comment.