Skip to content

Commit

Permalink
Initial webhooks implementation (#3)
Browse files Browse the repository at this point in the history
* Refactor KafkaBridge -> KafkaCommon+KafkaBridge for webhooks

* Refactor common parts out of kafka bridge

* Skeleton of webhooks code

* JSON working, need YAML->JSON code

* Fix go deps

Signed-off-by: Peter Broadhurst <peter.broadhurst@kaleido.io>
  • Loading branch information
peterbroadhurst authored Jul 4, 2018
1 parent a676416 commit c154c61
Show file tree
Hide file tree
Showing 11 changed files with 1,302 additions and 787 deletions.
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ BINARY_WIN=$(BINARY_NAME)-win

all: deps build test
build:
$(GOBUILD) -o $(BINARY_NAME) -v
$(GOBUILD) -tags=prod -o $(BINARY_NAME) -v
test:
$(GOTEST) ./... -cover -coverprofile=coverage.txt -covermode=atomic
clean:
Expand All @@ -30,6 +30,7 @@ deps:
$(GOGET) github.com/nu7hatch/gouuid
$(GOGET) github.com/stretchr/testify/assert
$(GOGET) github.com/golang/mock/gomock
$(GOGET) gopkg.in/yaml.v2

build-linux:
GOOS=linux GOARCH=amd64 $(GOBUILD) -o $(BINARY_UNIX) -v
Expand Down
4 changes: 4 additions & 0 deletions cmd/ethconnect.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"fmt"

"github.com/kaleido-io/ethconnect/internal/kldkafka"
"github.com/kaleido-io/ethconnect/internal/kldwebhooks"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
)
Expand Down Expand Up @@ -57,6 +58,9 @@ func init() {

kafkaBridge := kldkafka.NewKafkaBridge()
rootCmd.AddCommand(kafkaBridge.CobraInit())

webhooksBridge := kldwebhooks.NewWebhooksBridge()
rootCmd.AddCommand(webhooksBridge.CobraInit())
}

// Execute is called by the main method of the package
Expand Down
51 changes: 32 additions & 19 deletions internal/kldkafka/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,42 +15,55 @@
package kldkafka

import (
"sync"

"github.com/Shopify/sarama"
cluster "github.com/bsm/sarama-cluster"
)

// We define minimal interfaces for the parts of the sarama/sarama-cluster
// package we use, to allow for stubbing in our unit tests

type kafkaFactory interface {
newClient(*KafkaBridge, *cluster.Config) (kafkaClient, error)
}

type kafkaClient interface {
newProducer(*KafkaBridge) (kafkaProducer, error)
newConsumer(*KafkaBridge) (kafkaConsumer, error)
Brokers() []*sarama.Broker
// KafkaGoRoutines defines goroutines for processing Kafka messages from KafkaCommon
type KafkaGoRoutines interface {
ConsumerMessagesLoop(consumer KafkaConsumer, producer KafkaProducer, wg *sync.WaitGroup)
ProducerErrorLoop(consumer KafkaConsumer, producer KafkaProducer, wg *sync.WaitGroup)
ProducerSuccessLoop(consumer KafkaConsumer, producer KafkaProducer, wg *sync.WaitGroup)
}

type kafkaProducer interface {
// KafkaProducer provides the interface passed from KafkaCommon to produce messages (subset of sarama)
type KafkaProducer interface {
AsyncClose()
Input() chan<- *sarama.ProducerMessage
Successes() <-chan *sarama.ProducerMessage
Errors() <-chan *sarama.ProducerError
}

type kafkaConsumer interface {
// KafkaConsumer provides the interface passed from KafkaCommon to consume messages (subset of sarama-cluster)
type KafkaConsumer interface {
Close() error
Messages() <-chan *sarama.ConsumerMessage
Notifications() <-chan *cluster.Notification
Errors() <-chan error
MarkOffset(*sarama.ConsumerMessage, string)
}

type saramaKafkaFactory struct{}
// KafkaFactory builds new clients
type KafkaFactory interface {
NewClient(KafkaCommon, *cluster.Config) (KafkaClient, error)
}

// KafkaClient is the kafka client
type KafkaClient interface {
NewProducer(KafkaCommon) (KafkaProducer, error)
NewConsumer(KafkaCommon) (KafkaConsumer, error)
Brokers() []*sarama.Broker
}

// SaramaKafkaFactory - uses sarama and sarama-cluster
type SaramaKafkaFactory struct{}

func (f saramaKafkaFactory) newClient(k *KafkaBridge, clientConf *cluster.Config) (c kafkaClient, err error) {
if client, err := cluster.NewClient(k.Conf.Brokers, clientConf); err == nil {
// NewClient - returns a new sarama-cluster client
func (f *SaramaKafkaFactory) NewClient(k KafkaCommon, clientConf *cluster.Config) (c KafkaClient, err error) {
var client *cluster.Client
if client, err = cluster.NewClient(k.Conf().Brokers, clientConf); err == nil {
c = &saramaKafkaClient{client: client}
}
return
Expand All @@ -64,10 +77,10 @@ func (c *saramaKafkaClient) Brokers() []*sarama.Broker {
return c.client.Brokers()
}

func (c *saramaKafkaClient) newProducer(k *KafkaBridge) (kafkaProducer, error) {
func (c *saramaKafkaClient) NewProducer(k KafkaCommon) (KafkaProducer, error) {
return sarama.NewAsyncProducerFromClient(c.client.Client)
}

func (c *saramaKafkaClient) newConsumer(k *KafkaBridge) (kafkaConsumer, error) {
return cluster.NewConsumerFromClient(c.client, k.Conf.ConsumerGroup, []string{k.Conf.TopicIn})
func (c *saramaKafkaClient) NewConsumer(k KafkaCommon) (KafkaConsumer, error) {
return cluster.NewConsumerFromClient(c.client, k.Conf().ConsumerGroup, []string{k.Conf().TopicIn})
}
156 changes: 156 additions & 0 deletions internal/kldkafka/clientmock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
// Copyright 2018 Kaleido, a ConsenSys business

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at

// http://www.apache.org/licenses/LICENSE-2.0

// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package kldkafka

import (
"github.com/Shopify/sarama"
cluster "github.com/bsm/sarama-cluster"
)

// MockKafkaFactory - mock implementation of KafkaFactory for unit tests.
// It records the cluster config passed to NewClient and returns itself as
// the KafkaClient, so tests can reach the mock producer/consumer it builds.
type MockKafkaFactory struct {
	ClientConf         *cluster.Config    // captured from NewClient for assertions
	ErrorOnNewClient   error              // returned by NewClient when non-nil
	ErrorOnNewProducer error              // returned by NewProducer when non-nil
	ErrorOnNewConsumer error              // returned by NewConsumer when non-nil
	Producer           *MockKafkaProducer // last producer created by NewProducer
	Consumer           *MockKafkaConsumer // last consumer created by NewConsumer
}

// NewMockKafkaFactory - mock factory with no errors configured
func NewMockKafkaFactory() *MockKafkaFactory {
	return &MockKafkaFactory{}
}

// NewErrorMockKafkaFactory - mock factory pre-loaded with the errors to
// return from NewClient/NewConsumer/NewProducer (pass nil for success)
func NewErrorMockKafkaFactory(errorOnNewClient error, errorOnNewConsumer error, errorOnNewProducer error) *MockKafkaFactory {
	f := NewMockKafkaFactory()
	f.ErrorOnNewClient = errorOnNewClient
	f.ErrorOnNewConsumer = errorOnNewConsumer
	f.ErrorOnNewProducer = errorOnNewProducer
	return f
}

// NewClient - records the supplied config, then returns the factory itself
// as the KafkaClient along with any configured error
func (f *MockKafkaFactory) NewClient(k KafkaCommon, clientConf *cluster.Config) (KafkaClient, error) {
	f.ClientConf = clientConf
	return f, f.ErrorOnNewClient
}

// Brokers - returns a single zero-value broker
func (f *MockKafkaFactory) Brokers() []*sarama.Broker {
	// gofmt -s form: the element type is implied by the slice literal,
	// so `&sarama.Broker{}` is spelled `{}`
	return []*sarama.Broker{{}}
}

// NewProducer - creates (and remembers) a mock producer with fresh
// unbuffered channels, returning any configured error alongside it
func (f *MockKafkaFactory) NewProducer(k KafkaCommon) (KafkaProducer, error) {
	f.Producer = &MockKafkaProducer{
		MockInput:     make(chan *sarama.ProducerMessage),
		MockSuccesses: make(chan *sarama.ProducerMessage),
		MockErrors:    make(chan *sarama.ProducerError),
	}
	return f.Producer, f.ErrorOnNewProducer
}

// NewConsumer - creates (and remembers) a mock consumer with fresh
// unbuffered channels and an empty offset map, returning any configured
// error alongside it
func (f *MockKafkaFactory) NewConsumer(k KafkaCommon) (KafkaConsumer, error) {
	f.Consumer = &MockKafkaConsumer{
		MockMessages:       make(chan *sarama.ConsumerMessage),
		MockNotifications:  make(chan *cluster.Notification),
		MockErrors:         make(chan error),
		OffsetsByPartition: make(map[int32]int64),
	}
	return f.Consumer, f.ErrorOnNewConsumer
}

// MockKafkaProducer is a test double for KafkaProducer; its channels are
// exported so tests can drive and observe message flow directly.
type MockKafkaProducer struct {
	MockInput     chan *sarama.ProducerMessage
	MockSuccesses chan *sarama.ProducerMessage
	MockErrors    chan *sarama.ProducerError
}

// AsyncClose closes whichever of the mock channels have been initialized.
func (mp *MockKafkaProducer) AsyncClose() {
	if in := mp.MockInput; in != nil {
		close(in)
	}
	if succ := mp.MockSuccesses; succ != nil {
		close(succ)
	}
	if errs := mp.MockErrors; errs != nil {
		close(errs)
	}
}

// Input returns the send side of the mock input channel.
func (mp *MockKafkaProducer) Input() chan<- *sarama.ProducerMessage {
	return mp.MockInput
}

// Successes returns the receive side of the mock successes channel.
func (mp *MockKafkaProducer) Successes() <-chan *sarama.ProducerMessage {
	return mp.MockSuccesses
}

// Errors returns the receive side of the mock errors channel.
func (mp *MockKafkaProducer) Errors() <-chan *sarama.ProducerError {
	return mp.MockErrors
}

// MockKafkaConsumer - mock implementation of KafkaConsumer for unit tests.
// Channels are exported so tests can inject messages/notifications/errors,
// and OffsetsByPartition records the offsets marked via MarkOffset.
type MockKafkaConsumer struct {
	MockMessages       chan *sarama.ConsumerMessage
	MockNotifications  chan *cluster.Notification
	MockErrors         chan error
	OffsetsByPartition map[int32]int64
}

// Close - closes any initialized mock channels; always returns nil
func (c *MockKafkaConsumer) Close() error {
	if c.MockMessages != nil {
		close(c.MockMessages)
	}
	if c.MockNotifications != nil {
		close(c.MockNotifications)
	}
	if c.MockErrors != nil {
		close(c.MockErrors)
	}
	return nil
}

// Messages - the mock message channel
func (c *MockKafkaConsumer) Messages() <-chan *sarama.ConsumerMessage {
	return c.MockMessages
}

// Notifications - the mock rebalance notification channel
func (c *MockKafkaConsumer) Notifications() <-chan *cluster.Notification {
	return c.MockNotifications
}

// Errors - the mock error channel
func (c *MockKafkaConsumer) Errors() <-chan error {
	return c.MockErrors
}

// MarkOffset - records the offset for the message's partition so tests can
// assert on it; the metadata argument is accepted but ignored by the mock.
// (Removed the redundant trailing bare return flagged by staticcheck S1023.)
func (c *MockKafkaConsumer) MarkOffset(msg *sarama.ConsumerMessage, metadata string) {
	c.OffsetsByPartition[msg.Partition] = msg.Offset
}
Loading

0 comments on commit c154c61

Please sign in to comment.