Skip to content

Commit

Permalink
Adds go-kit Endoint functionality
Browse files Browse the repository at this point in the history
Now it is possible to use an Endpoint instead of implementing a BatchProcessor
or Processor.
  • Loading branch information
andremissaglia committed Aug 24, 2020
1 parent 3e09754 commit 706a394
Show file tree
Hide file tree
Showing 7 changed files with 332 additions and 0 deletions.
21 changes: 21 additions & 0 deletions engine/batchstreamengine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,14 @@ import (
"time"

"github.com/arquivei/goduck"
"github.com/arquivei/goduck/middleware/processormiddleware"

"github.com/arquivei/foundationkit/errors"
"github.com/go-kit/kit/endpoint"
)

// BatchStreamEngine is an engine that processes a batch of messages from
// a stream, with the order preserved.
type BatchStreamEngine struct {
streams []goduck.Stream
nWorkers int
Expand All @@ -23,6 +27,22 @@ type BatchStreamEngine struct {
processorError error
}

// NewFromEndpoint creates a BatchProcessor from a go-kit endpoint
func NewFromEndpoint(
processor endpoint.Endpoint,
maxBatchSize int,
maxBatchTimeout time.Duration,
streams []goduck.Stream,
) *BatchStreamEngine {
return New(
processormiddleware.WrapEndpointInProcessor(processor),
maxBatchSize,
maxBatchTimeout,
streams,
)
}

// New creates a new BackStreamEngine.
func New(
processor goduck.BatchProcessor,
maxBatchSize int,
Expand All @@ -42,6 +62,7 @@ func New(
return engine
}

// Run starts processing the messages, until @ctx is closed
func (e *BatchStreamEngine) Run(ctx context.Context) error {
ctx, e.cancelFn = context.WithCancel(ctx)

Expand Down
15 changes: 15 additions & 0 deletions engine/jobpoolengine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,14 @@ import (
"io"

"github.com/arquivei/goduck"
"github.com/arquivei/goduck/middleware/processormiddleware"
"github.com/go-kit/kit/endpoint"

"github.com/arquivei/foundationkit/errors"
)

// JobPoolEngine processes the messages from a JobPool in parallel, without any
// ordering guarantees
type JobPoolEngine struct {
queue goduck.MessagePool
nextMessage chan goduck.RawMessage
Expand All @@ -19,6 +23,16 @@ type JobPoolEngine struct {
processorError error
}

// NewFromEndpoint creates a JobPoolEngine from a go-kit endpoint
func NewFromEndpoint(queue goduck.MessagePool, processor endpoint.Endpoint, nWorkers int) *JobPoolEngine {
return New(
queue,
processormiddleware.WrapEndpointInProcessor(processor),
nWorkers,
)
}

// New creates a new JobPoolEngine
func New(queue goduck.MessagePool, processor goduck.Processor, nWorkers int) *JobPoolEngine {
engine := &JobPoolEngine{
queue: queue,
Expand All @@ -31,6 +45,7 @@ func New(queue goduck.MessagePool, processor goduck.Processor, nWorkers int) *Jo
return engine
}

// Run starts processing the messages, until @ctx is closed
func (e *JobPoolEngine) Run(ctx context.Context) error {
ctx, e.cancelFn = context.WithCancel(ctx)
for i := 0; i < e.nWorkers; i++ {
Expand Down
18 changes: 18 additions & 0 deletions engine/streamengine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,14 @@ import (
"sync"

"github.com/arquivei/goduck"
"github.com/arquivei/goduck/middleware/processormiddleware"
"github.com/go-kit/kit/endpoint"

"github.com/arquivei/foundationkit/errors"
)

// StreamEngine is an engine that processes a of messages from a stream, with
// the order preserved.
type StreamEngine struct {
streams []goduck.Stream
nWorkers int
Expand All @@ -20,6 +24,18 @@ type StreamEngine struct {
processorError error
}

// NewFromEndpoint creates a StreamEngine from a go-kit endpoint
func NewFromEndpoint(
processor endpoint.Endpoint,
streams []goduck.Stream,
) *StreamEngine {
return New(
processormiddleware.WrapEndpointInProcessor(processor),
streams,
)
}

// New creates a new StreamEngine
func New(processor goduck.Processor, streams []goduck.Stream) *StreamEngine {
engine := &StreamEngine{
streams: streams,
Expand All @@ -32,6 +48,7 @@ func New(processor goduck.Processor, streams []goduck.Stream) *StreamEngine {
return engine
}

// Run starts processing the messages, until @ctx is closed
func (e *StreamEngine) Run(ctx context.Context) error {
ctx, e.cancelFn = context.WithCancel(ctx)

Expand All @@ -56,6 +73,7 @@ func (e *StreamEngine) pollMessages(ctx context.Context, stream goduck.Stream) {
e.handleMessage(ctx, stream, msg)
}
}

func (e *StreamEngine) handleMessage(ctx context.Context, stream goduck.Stream, msg goduck.RawMessage) {
for {
err := e.processor.Process(context.Background(), msg.Bytes())
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
github.com/Shopify/sarama v1.26.1
github.com/arquivei/foundationkit v0.0.0-20200302211010-306c93c13d0d
github.com/confluentinc/confluent-kafka-go v1.4.2 // indirect
github.com/go-kit/kit v0.10.0
github.com/imkira/go-observer v1.0.3
github.com/rs/zerolog v1.14.3
github.com/segmentio/kafka-go v0.3.5
Expand Down
Loading

0 comments on commit 706a394

Please sign in to comment.