Skip to content

Commit

Permalink
add remap transformation (#11)
Browse files Browse the repository at this point in the history
* add remap transformation

* rename

* fix any arrays
  • Loading branch information
alexeyxo authored Aug 2, 2024
1 parent 11be100 commit dfcec8d
Show file tree
Hide file tree
Showing 6 changed files with 298 additions and 14 deletions.
1 change: 1 addition & 0 deletions source/embedded.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,6 @@ import (
_ "github.com/pipelane/pipelaner/source/transform/chunks"
_ "github.com/pipelane/pipelaner/source/transform/debounce"
_ "github.com/pipelane/pipelaner/source/transform/filter"
_ "github.com/pipelane/pipelaner/source/transform/remap"
_ "github.com/pipelane/pipelaner/source/transform/throttling"
)
10 changes: 4 additions & 6 deletions source/sink/console/console.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,15 @@ import (
)

type Console struct {
logger zerolog.Logger
logger *zerolog.Logger
}

func init() {
pipelaner.RegisterSink("console", NewConsole())
pipelaner.RegisterSink("console", &Console{})
}

func NewConsole() *Console {
return &Console{}
}
func (c *Console) Init(_ *pipelaner.Context) error {
func (c *Console) Init(ctx *pipelaner.Context) error {
c.logger = ctx.Logger()
return nil
}

Expand Down
4 changes: 2 additions & 2 deletions source/transform/filter/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
* Copyright (c) 2024 Alexey Khokhlov
*/

package filter
package remap

import (
"encoding/json"
Expand All @@ -17,7 +17,7 @@ import (
)

var (
ErrInvalidDataType = errors.New("Error invalid sendded data type")
ErrInvalidDataType = errors.New("error invalid data type")
)

type EnvMap struct {
Expand Down
12 changes: 6 additions & 6 deletions source/transform/filter/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
* Copyright (c) 2024 Alexey Khokhlov
*/

package filter
package remap

import (
"context"
Expand Down Expand Up @@ -44,7 +44,7 @@ func TestExprLanguage_Map(t *testing.T) {
name: "test filtering maps return nil",
args: args{
ctx: pipelaner.NewContext(context.Background(),
pipelaner.NewLaneItem(newCfg(pipelaner.SinkType,
pipelaner.NewLaneItem(newCfg(pipelaner.MapType,
map[string]any{
"code": "Data.count > 5",
}),
Expand Down Expand Up @@ -102,7 +102,7 @@ func TestExprLanguage_String(t *testing.T) {
name: "test filtering string return nil",
args: args{
ctx: pipelaner.NewContext(context.Background(),
pipelaner.NewLaneItem(newCfg(pipelaner.SinkType,
pipelaner.NewLaneItem(newCfg(pipelaner.MapType,
map[string]any{
"code": "Data.count > 5",
}),
Expand All @@ -116,7 +116,7 @@ func TestExprLanguage_String(t *testing.T) {
name: "test filtering string return 10",
args: args{
ctx: pipelaner.NewContext(context.Background(),
pipelaner.NewLaneItem(newCfg(pipelaner.SinkType,
pipelaner.NewLaneItem(newCfg(pipelaner.MapType,
map[string]any{
"code": "Data.count > 5",
}),
Expand Down Expand Up @@ -154,7 +154,7 @@ func TestExprLanguage_Bytes(t *testing.T) {
name: "test filtering string return nil",
args: args{
ctx: pipelaner.NewContext(context.Background(),
pipelaner.NewLaneItem(newCfg(pipelaner.SinkType,
pipelaner.NewLaneItem(newCfg(pipelaner.MapType,
map[string]any{
"code": "Data.count > 5",
}),
Expand All @@ -168,7 +168,7 @@ func TestExprLanguage_Bytes(t *testing.T) {
name: "test filtering string return 10",
args: args{
ctx: pipelaner.NewContext(context.Background(),
pipelaner.NewLaneItem(newCfg(pipelaner.SinkType,
pipelaner.NewLaneItem(newCfg(pipelaner.MapType,
map[string]any{
"code": "Data.count > 5",
}),
Expand Down
85 changes: 85 additions & 0 deletions source/transform/remap/remap.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Copyright (c) 2024 Alexey Khokhlov
*/

package remap

import (
"encoding/json"
"errors"

"github.com/expr-lang/expr/vm"
"github.com/rs/zerolog"

"github.com/expr-lang/expr"

"github.com/pipelane/pipelaner"
)

var (
ErrInvalidDataType = errors.New("error invalid data type")
)

type EnvMap struct {
Data any
}

type ExprConfig struct {
Code string `pipelane:"code"`
}

type Remap struct {
cfg *pipelaner.BaseLaneConfig
logger zerolog.Logger
program *vm.Program
}

func init() {
pipelaner.RegisterMap("remap", &Remap{})
}

func (e *Remap) Init(ctx *pipelaner.Context) error {
e.cfg = ctx.LaneItem().Config()
e.logger = pipelaner.NewLogger()
v := &ExprConfig{}
err := e.cfg.ParseExtended(v)
if err != nil {
return err
}

program, err := expr.Compile(v.Code, expr.Env(EnvMap{}))
if err != nil {
return err
}
e.program = program
return nil
}

func (e *Remap) Map(_ *pipelaner.Context, val any) any {
var v any
switch value := val.(type) {
case map[string]any:
v = value
case map[string][]any:
v = value
case string:
b := []byte(value)
err := json.Unmarshal(b, &v)
if err != nil {
return err
}
case []byte:
err := json.Unmarshal(value, &v)
if err != nil {
return err
}
default:
return ErrInvalidDataType
}
output, err := expr.Run(e.program, EnvMap{Data: v})
if err != nil {
e.logger.Err(err).Msg("Expr: output error")
return err
}
return output
}
200 changes: 200 additions & 0 deletions source/transform/remap/remap_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
/*
* Copyright (c) 2024 Alexey Khokhlov
*/

package remap

import (
"context"
"testing"

"github.com/rs/zerolog"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/pipelane/pipelaner"
)

func newCfg(
itemType pipelaner.LaneTypes,
extended map[string]any,
) *pipelaner.BaseLaneConfig {
c, err := pipelaner.NewBaseConfigWithTypeAndExtended(
itemType,
"test_maps_sinks",
extended,
)
if err != nil {
return nil
}
return c
}

func TestExprLanguage_Map(t *testing.T) {
type args struct {
val any
ctx *pipelaner.Context
}
tests := []struct {
name string
args args
want any
}{
{
name: "test expr maps return nil",
args: args{
ctx: pipelaner.NewContext(context.Background(),
pipelaner.NewLaneItem(newCfg(pipelaner.MapType,
map[string]any{
"code": "{ \"value_name\": Data.name, \"value_price\": Data.price}",
}),
),
),
val: map[string]any{
"id": 1,
"name": "iPhone 12",
"price": 999,
"quantity": 1,
},
},
want: map[string]any{
"value_name": "iPhone 12",
"value_price": 999,
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
e := &Remap{
logger: zerolog.Nop(),
}
err := e.Init(tt.args.ctx)
require.Nil(t, err)
got := e.Map(tt.args.ctx, tt.args.val)
assert.Equal(t, got, tt.want)
})
}
}

func TestExprLanguage_String(t *testing.T) {
type args struct {
val any
ctx *pipelaner.Context
}
tests := []struct {
name string
args args
want any
}{
{
name: "test remap string return nil",
args: args{
ctx: pipelaner.NewContext(context.Background(),
pipelaner.NewLaneItem(newCfg(pipelaner.MapType,
map[string]any{
"code": "{ \"value_name\": Data.name, \"value_price\": Data.price}",
}),
),
),
val: " {\"id\": 1,\"name\": \"iPhone 12\",\"price\": \"999\",\"quantity\": 1}",
},
want: map[string]any{
"value_name": "iPhone 12",
"value_price": "999",
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
e := &Remap{
logger: zerolog.Nop(),
}
err := e.Init(tt.args.ctx)
require.Nil(t, err)
got := e.Map(tt.args.ctx, tt.args.val)
assert.Equal(t, got, tt.want)
})
}
}

func TestExprLanguage_StringArray(t *testing.T) {
type args struct {
val any
ctx *pipelaner.Context
}
tests := []struct {
name string
args args
want any
}{
{
name: "test remap string array",
args: args{
ctx: pipelaner.NewContext(context.Background(),
pipelaner.NewLaneItem(newCfg(pipelaner.MapType,
map[string]any{
"code": "{ \"value_name\": Data[0].name, \"value_price\": Data[0].price}",
}),
),
),
val: "[{\"id\": 1,\"name\": \"iPhone 12\",\"price\": \"999\",\"quantity\": 1}, {\"id\": 2,\"name\": \"iPhone 13\",\"price\": \"999\",\"quantity\": 1}]",
},
want: map[string]any{
"value_name": "iPhone 12",
"value_price": "999",
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
e := &Remap{
logger: zerolog.Nop(),
}
err := e.Init(tt.args.ctx)
require.Nil(t, err)
got := e.Map(tt.args.ctx, tt.args.val)
assert.Equal(t, got, tt.want)
})
}
}

func TestExprLanguage_Bytes(t *testing.T) {
type args struct {
val any
ctx *pipelaner.Context
}
tests := []struct {
name string
args args
want any
}{
{
name: "test remap string return nil",
args: args{
ctx: pipelaner.NewContext(context.Background(),
pipelaner.NewLaneItem(newCfg(pipelaner.MapType,
map[string]any{
"code": "{ \"value_name\": Data.name, \"value_price\": Data.price}",
}),
),
),
val: []byte("{\"id\": 1,\"name\": \"iPhone 12\",\"price\": \"999\",\"quantity\": 1}"),
},
want: map[string]any{
"value_name": "iPhone 12",
"value_price": "999",
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
e := &Remap{
logger: zerolog.Nop(),
}
err := e.Init(tt.args.ctx)
require.Nil(t, err)
got := e.Map(tt.args.ctx, tt.args.val)
assert.Equal(t, got, tt.want)
})
}
}

0 comments on commit dfcec8d

Please sign in to comment.