Skip to content

Commit

Permalink
The mediator pattern implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
Ivan Sushkov committed May 29, 2024
1 parent ced92b6 commit d61b353
Show file tree
Hide file tree
Showing 6 changed files with 724 additions and 4 deletions.
90 changes: 90 additions & 0 deletions grade/internal/infrastructure/seedwork/mediator/global.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package mediator

import (
"context"
"reflect"
"sync"

"github.com/emacsway/grade/grade/internal/domain/seedwork/disposable"
"github.com/hashicorp/go-multierror"
)

var (
hLock = sync.RWMutex{}
sLock = sync.RWMutex{}

handlers = map[reflect.Type]reflect.Value{}
subscribers = map[reflect.Type]map[reflect.Value]struct{}{}
)

func Send[T any](ctx context.Context, command T) error {
hLock.RLock()
defer hLock.RUnlock()

commandType := reflect.TypeOf(command)
if handler, found := handlers[commandType]; found {
return call(handler, ctx, command)
}

return nil
}

func Register[T any](command T, handler func(context.Context, T) error) disposable.Disposable {
hLock.Lock()
defer hLock.Unlock()

commandType := reflect.TypeOf(command)
handlers[commandType] = reflect.ValueOf(handler)

return disposable.NewDisposable(func() {
Unregister(command)
})
}

func Unregister[T any](command T) {
hLock.Lock()
defer hLock.Unlock()

commandType := reflect.TypeOf(command)
delete(handlers, commandType)
}

func Subscribe[E any](event E, handler func(context.Context, E) error) disposable.Disposable {
sLock.Lock()
defer sLock.Unlock()

eventType := reflect.TypeOf(event)
if _, found := subscribers[eventType]; !found {
subscribers[eventType] = map[reflect.Value]struct{}{}
}

handlerValue := reflect.ValueOf(handler)
subscribers[eventType][handlerValue] = struct{}{}

return disposable.NewDisposable(func() {
Unsubscribe(event, handler)
})
}

func Unsubscribe[E any](event E, handler func(context.Context, E) error) {
sLock.Lock()
defer sLock.Unlock()

eventType := reflect.TypeOf(event)
handlerValue := reflect.ValueOf(handler)

delete(subscribers[eventType], handlerValue)
}

func Publish[E any](ctx context.Context, event E) error {
sLock.RLock()
defer sLock.RUnlock()

var errs error
eventType := reflect.TypeOf(event)
for handler := range subscribers[eventType] {
errs = multierror.Append(errs, call(handler, ctx, event))
}

return errs
}
217 changes: 217 additions & 0 deletions grade/internal/infrastructure/seedwork/mediator/global_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
package mediator

import (
"context"
"errors"
"reflect"
"testing"

"github.com/hashicorp/go-multierror"
"github.com/stretchr/testify/assert"
)

type eventHandler struct {
counter int
}

func (e *eventHandler) Handle(ctx context.Context, t Event) error {
e.counter += 1
return nil
}

func TestGlobalMediator(t *testing.T) {

tests := []struct {
name string

assertion func(t *testing.T)
}{
{
name: "test_publish",

assertion: func(t *testing.T) {
times := 0
handler := func(ctx context.Context, t Event) error {
times++
return nil
}

times2 := 0
handler2 := func(ctx context.Context, t Event) error {
times2++
return nil
}

Subscribe(Event{}, handler)
Subscribe(Event{}, handler2)

_ = Publish(context.Background(), Event{})
assert.Equal(t, 1, times)
assert.Equal(t, 1, times2)
},
},

{
name: "test_struct_subscriber",

assertion: func(t *testing.T) {

handler := eventHandler{}
Subscribe(Event{}, handler.Handle)

_ = Publish(context.Background(), Event{})
assert.Equal(t, 1, handler.counter)
},
},

{
name: "test_unsubscribe",
assertion: func(t *testing.T) {

times := 0
handler := func(ctx context.Context, e Event) error {
times++

return nil
}

times2 := 0
handler2 := func(ctx context.Context, e Event) error {
times2++

return nil
}

Subscribe(Event{}, handler)
Subscribe(Event{}, handler2)
Unsubscribe(Event{}, handler)

_ = Publish(context.Background(), Event{})

assert.Equal(t, 0, times)
assert.Equal(t, 1, times2)
},
},

{
name: "test_disposable_event",
assertion: func(t *testing.T) {

times := 0
handler := func(ctx context.Context, e Event) error {
times++
return nil
}

times2 := 0
handler2 := func(ctx context.Context, e Event) error {
times2++
return nil
}

Subscribe(Event{}, handler2)

disposable := Subscribe(Event{}, handler)
disposable.Dispose()

_ = Publish(context.Background(), Event{})

assert.Equal(t, 0, times)
assert.Equal(t, 1, times2)
},
},

{
name: "test_send",
assertion: func(t *testing.T) {
times := 0
handler := func(ctx context.Context, e Command) error {
times++
return nil
}

Register(Command{}, handler)
_ = Send(context.Background(), Command{})

assert.Equal(t, 1, times)
},
},

{
name: "test_unregister",
assertion: func(t *testing.T) {
times := 0
handler := func(ctx context.Context, e Command) error {
times++
return nil
}

Register(Command{}, handler)
Unregister(Command{})

_ = Send(context.Background(), Command{})

assert.Equal(t, 0, times)
},
},

{
name: "test_disposable_command",
assertion: func(t *testing.T) {
times := 0
handler := func(ctx context.Context, e Command) error {
times++
return nil
}

disposable := Register[Command](Command{}, handler)
disposable.Dispose()

_ = Send[Command](context.Background(), Command{})

assert.Equal(t, 0, times)
},
},

{
name: "test_returning_errors",
assertion: func(t *testing.T) {
handlerError := errors.New("")

handler := func(ctx context.Context, e Command) error {
return handlerError
}

handler2 := func(ctx context.Context, e Event) error {
return handlerError
}

handler3 := func(ctx context.Context, e Event) error {
return handlerError
}

Register(Command{}, handler)

Subscribe(Event{}, handler2)
Subscribe(Event{}, handler3)

var errs error
errs = multierror.Append(errs, handlerError, handlerError)
assert.Equal(t, errs, Publish(context.Background(), Event{}))

assert.Equal(t, handlerError, Send(context.Background(), Command{}))
},
},
}

for _, tt := range tests {
tt := tt

t.Run(tt.name, func(t *testing.T) {
handlers = map[reflect.Type]reflect.Value{}
subscribers = map[reflect.Type]map[reflect.Value]struct{}{}

tt.assertion(t)
})
}
}
4 changes: 0 additions & 4 deletions grade/internal/infrastructure/seedwork/mediator/mediator.go

This file was deleted.

52 changes: 52 additions & 0 deletions grade/internal/infrastructure/seedwork/mediator/reflection.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package mediator

import (
"context"
"reflect"
)

var ctxInterface = reflect.TypeOf((*context.Context)(nil)).Elem()

// call - вызывает обработчик
func call(callable reflect.Value, args ...any) error {
in := make([]reflect.Value, 0, len(args))

for _, arg := range args {
in = append(in, reflect.ValueOf(arg))
}

result := callable.Call(in)

if len(result) != 1 {
return nil
}

if err, ok := result[0].Interface().(error); ok {
return err
}

return nil
}

// compareWithHandlerSignature - проверяет, что handler имеет правильную сигнатуру
func compareWithHandlerSignature(initiator any, handler any) error {

Check failure on line 32 in grade/internal/infrastructure/seedwork/mediator/reflection.go

View workflow job for this annotation

GitHub Actions / lint

paramTypeCombine: func(initiator any, handler any) error could be replaced with func(initiator, handler any) error (gocritic)

Check failure on line 32 in grade/internal/infrastructure/seedwork/mediator/reflection.go

View workflow job for this annotation

GitHub Actions / lint

paramTypeCombine: func(initiator any, handler any) error could be replaced with func(initiator, handler any) error (gocritic)
handlerType := reflect.TypeOf(handler)
if handlerType.Kind() != reflect.Func {
return ErrNonCallableHandler
}

if handlerType.NumIn() < 2 {
return ErrUnsuitableHandlerSignature
}

if !handlerType.In(0).Implements(ctxInterface) {
return ErrUnsuitableHandlerSignature
}

initiatorType := reflect.TypeOf(initiator)
if handlerType.In(1) != initiatorType {
return ErrUnsuitableHandlerSignature
}

return nil
}
Loading

0 comments on commit d61b353

Please sign in to comment.