Skip to content

Commit

Permalink
init (#1)
Browse files Browse the repository at this point in the history
* init
  • Loading branch information
alexeyxo authored Jul 17, 2024
1 parent 0969305 commit 19ee56f
Show file tree
Hide file tree
Showing 41 changed files with 3,829 additions and 4 deletions.
8 changes: 4 additions & 4 deletions .github/workflows/lint.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
name: golangci-lint
name: Go Lint
on:
push:
branches:
Expand All @@ -18,9 +18,9 @@ jobs:
- uses: actions/checkout@v3
- uses: actions/setup-go@v4
with:
go-version: '1.21'
go-version: '1.22'
cache: false
- name: golangci-lint
- name: Golang lint
uses: golangci/golangci-lint-action@v3
with:
# Require: The version of golangci-lint to use.
Expand All @@ -35,7 +35,7 @@ jobs:
#
# Note: By default, the `.golangci.yml` file should be at the root of the repository.
# The location of the configuration file can be changed by using `--config=`
# args: --timeout=30m --config=/my/path/.golangci.yml --issues-exit-code=0
# args: --timeout=30m --config=/my/path/.golangci.yml --issues-exit-code=0

# Optional: show only new issues if it's a pull request. The default value is `false`.
# only-new-issues: true
Expand Down
23 changes: 23 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
name: Go Test

on: [ push ]

permissions:
contents: read

jobs:
build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Setup Go
uses: actions/setup-go@v4
with:
go-version: '1.21.x'
- name: Install dependencies
run: |
go mod tidy
- name: Testing
run: |
go test -v ./
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,5 @@

# Go workspace file
go.work
.DS_Store
.idea
18 changes: 18 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
.PHONY: install-linter
install-linter:
@go install github.com/golangci/golangci-lint/cmd/golangci-lint@latest

.PHONY: lint
lint:
@golangci-lint run --enable=gocritic,gocyclo,gofmt,gosec,misspell,unparam,asciicheck --timeout=30m

.PHONY: test
test:
@go test -count=1 -v ./

.PHONY: proto
proto:
@rm -rf service/proto/*
@docker run -v $(PWD):/defs namely/protoc-all:1.51_2 -i proto -d proto -o go -l go && \
mv go/github.com/pipelane/pipelaner/internal/service/* internal/service/ && \
rm -rf go
49 changes: 49 additions & 0 deletions agent.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright (c) 2023 Alexey Khokhlov
*/

package pipelaner

import (
"context"
"os"
"os/signal"
"syscall"
)

type Agent struct {
tree *TreeLanes
ctx context.Context
cancel context.CancelFunc
}

func NewAgent(
file string,
) (*Agent, error) {

ctx, stop := signal.NotifyContext(
context.Background(),
os.Interrupt,
syscall.SIGTERM,
)
t, err := NewTreeFrom(
ctx,
file,
)
if err != nil {
return nil, err
}
return &Agent{
tree: t,
ctx: ctx,
cancel: stop,
}, err
}

func (a *Agent) Serve() {
<-a.ctx.Done()
}

func (a *Agent) Stop() {
a.cancel()
}
44 changes: 44 additions & 0 deletions channels.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright (c) 2023 Alexey Khokhlov
*/

package pipelaner

import (
"context"
"sync"
)

func mergeInputs[T any](ctx context.Context, chs ...chan T) chan T {
if len(chs) == 1 {
return chs[0]
}
lens := 0
for i := range chs {
lens += cap(chs[i])
}
res := make(chan T, lens)
gr := sync.WaitGroup{}
gr.Add(len(chs))
go func() {
gr.Wait()
close(res)
}()
for _, ch := range chs {
go func(c chan T) {
defer gr.Done()
for {
select {
case <-ctx.Done():
break
case v, ok := <-c:
if !ok {
break
}
res <- v
}
}
}(ch)
}
return res
}
53 changes: 53 additions & 0 deletions channels_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright (c) 2023 Alexey Khokhlov
*/

package pipelaner

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
)

func Test_mergeInputs(t *testing.T) {
type args[T any] struct {
ctx context.Context
chs []chan T
}
type testCase[T any] struct {
name string
args args[T]
want int
}
tests := []testCase[int]{
{
name: "test 1:2 len = 3",
args: args[int]{
ctx: context.Background(),
chs: []chan int{
make(chan int, 1),
make(chan int, 2),
},
},
want: 3,
},
{
name: "test 1:1 len = 2",
args: args[int]{
ctx: context.Background(),
chs: []chan int{
make(chan int, 1),
make(chan int, 1),
},
},
want: 2,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
assert.Equal(t, tt.want, cap(mergeInputs(tt.args.ctx, tt.args.chs...)))
})
}
}
142 changes: 142 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
/*
* Copyright (c) 2023 Alexey Khokhlov
*/

package pipelaner

import (
"errors"

"github.com/BurntSushi/toml"
"github.com/mitchellh/mapstructure"
)

var (
ErrNameNotBeEmptyString = errors.New("ErrNameNotBeEmptyString")
)

type LaneTypes string

const (
InputType LaneTypes = "input"
MapType LaneTypes = "map"
SinkType LaneTypes = "sink"
)

type config struct {
Input map[string]any `pipeline:"input"`
Map map[string]any `pipeline:"map"`
Sink map[string]any `pipeline:"sink"`
}

type Internal struct {
Name string `pipelane:"-"`
LaneType LaneTypes `pipelane:"-"`
Extended any `pipelane:"-"`
_extended map[string]any `pipelane:"-"`
}

type BaseLaneConfig struct {
BufferSize int64 `pipelane:"buffer"`
Threads *int64 `pipelane:"threads"`
SourceName string `pipelane:"source_name"`
Inputs []string `pipelane:"inputs"`
Internal
}

func newConfig(c map[string]any) (*config, error) {
cfg := &config{}
dC := &mapstructure.DecoderConfig{
TagName: "pipelane",
Result: &cfg,
}
dec, err := mapstructure.NewDecoder(dC)
if err != nil {
return nil, err
}
err = dec.Decode(c)
if err != nil {
return nil, err
}
return cfg, nil
}

func NewBaseConfigWithTypeAndExtended(
itemType LaneTypes,
name string,
extended map[string]any,
) (*BaseLaneConfig, error) {
if name == "" {
return nil, ErrNameNotBeEmptyString
}
c := BaseLaneConfig{
Internal: Internal{
LaneType: itemType,
Name: name,
_extended: extended,
},
}
err := decode(extended, &c)
if err != nil {
return nil, err
}
if itemType == InputType {
c.Inputs = nil
}
if c.BufferSize == 0 {
c.BufferSize = 1
}
return &c, nil
}

func readToml(file string) (map[string]any, error) {
var c map[string]any
_, err := toml.DecodeFile(file, &c)
if err != nil {
return nil, err
}
return c, nil
}

func decodeTomlString(str string) (map[string]any, error) {
var c map[string]any
_, err := toml.Decode(str, &c)
if err != nil {
return nil, err
}
return c, nil
}

func NewBaseConfig(val map[string]any) (*BaseLaneConfig, error) {
var cfg BaseLaneConfig
err := decode(val, &cfg)
if err != nil {
return nil, err
}
return &cfg, nil
}

func (c *BaseLaneConfig) ParseExtended(v any) error {
err := decode(c._extended, v)
if err != nil {
return err
}
c.Extended = v
return nil
}

func decode(input map[string]any, output any) error {
dC := &mapstructure.DecoderConfig{
TagName: "pipelane",
Result: output,
}
dec, err := mapstructure.NewDecoder(dC)
if err != nil {
return err
}
err = dec.Decode(input)
if err != nil {
return err
}
return nil
}
Loading

0 comments on commit 19ee56f

Please sign in to comment.