Skip to content

Commit

Permalink
Code coverage
Browse files Browse the repository at this point in the history
Signed-off-by: Peter Broadhurst <peter.broadhurst@kaleido.io>
  • Loading branch information
Peter Broadhurst authored and Peter Broadhurst committed Jun 26, 2018
1 parent 8fef95a commit 5b8d994
Show file tree
Hide file tree
Showing 2 changed files with 197 additions and 71 deletions.
138 changes: 71 additions & 67 deletions internal/kldkafka/kafkabridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,9 +329,7 @@ func (c *msgContext) Reply(pFullMsg interface{}) (err error) {
replyHeaders.Context = c.requestCommon.Headers.Context
replyHeaders.OrigID = c.requestCommon.Headers.ID
replyHeaders.OrigMsg = c.origMsg
if c.replyBytes, err = json.Marshal(pFullMsg); err != nil {
return
}
c.replyBytes, _ = json.Marshal(pFullMsg)
log.Infof("Sending reply: %s", c)
c.bridge.producer.Input() <- &sarama.ProducerMessage{
Topic: c.bridge.Conf.TopicOut,
Expand Down Expand Up @@ -423,6 +421,44 @@ func (k *KafkaBridge) connect() (err error) {
return
}

func (k *KafkaBridge) producerErrorLoop() {
for err := range k.producer.Errors() {
k.inFlightCond.L.Lock()
// If we fail to send a reply, this is significant. We have a request in flight
// and we have probably already sent the message.
// Currently we panic, on the basis that we will be restarted by Docker
// to drive retry logic. In the future we might consider recreating the
// producer and attempting to resend the message a number of times -
// keeping a retry counter on the msgContext object
origMsg := err.Msg.Metadata.(string)
ctx := k.inFlight[origMsg]
log.Errorf("Kafka producer failed for reply %s to origMsg %s: %s", ctx, origMsg, err)
panic(err)
// k.inFlightCond.L.Unlock() - unreachable while we have a panic
}
k.producerWG.Done()
}

func (k *KafkaBridge) producerSuccessLoop() {
for msg := range k.producer.Successes() {
k.inFlightCond.L.Lock()
origMsg := msg.Metadata.(string)
if ctx, ok := k.inFlight[origMsg]; ok {
log.Infof("Reply sent: %s", ctx)
// While still holding the lock, add this to the completed list
k.setInFlightComplete(ctx)
// We've reduced the in-flight count - wake any waiting consumer go func
k.inFlightCond.Broadcast()
} else {
// This should never happen. Represents a logic bug that must be diagnosed.
err := fmt.Errorf("Received confirmation for message not in in-flight map: %s", origMsg)
panic(err)
}
k.inFlightCond.L.Unlock()
}
k.producerWG.Done()
}

func (k *KafkaBridge) startProducer() (err error) {

log.Debugf("Kafka Producer Topic=%s", k.Conf.TopicOut)
Expand All @@ -433,48 +469,43 @@ func (k *KafkaBridge) startProducer() (err error) {

k.producerWG.Add(2)

go func() {
for err := range k.producer.Errors() {
k.inFlightCond.L.Lock()
// If we fail to send a reply, this is significant. We have a request in flight
// and we have probably already sent the message.
// Currently we panic, on the basis that we will be restarted by Docker
// to drive retry logic. In the future we might consider recreating the
// producer and attempting to resend the message a number of times -
// keeping a retry counter on the msgContext object
origMsg := err.Msg.Metadata.(string)
ctx := k.inFlight[origMsg]
log.Errorf("Kafka producer failed for reply %s to origMsg %s: %s", ctx, origMsg, err)
panic(err)
// k.inFlightCond.L.Unlock() - unreachable while we have a panic
}
k.producerWG.Done()
}()
go k.producerErrorLoop()

go func() {
for msg := range k.producer.Successes() {
k.inFlightCond.L.Lock()
origMsg := msg.Metadata.(string)
if ctx, ok := k.inFlight[origMsg]; ok {
log.Infof("Reply sent: %s", ctx)
// While still holding the lock, add this to the completed list
k.setInFlightComplete(ctx)
// We've reduced the in-flight count - wake any waiting consumer go func
k.inFlightCond.Broadcast()
} else {
// This should never happen. Represents a logic bug that must be diagnosed.
log.Errorf("Received confirmation for message not in in-flight map: %s", origMsg)
panic(err)
}
k.inFlightCond.L.Unlock()
}
k.producerWG.Done()
}()
go k.producerSuccessLoop()

log.Infof("Kafka Created producer")
return
}

func (k *KafkaBridge) consumerMessagesLoop() {
for msg := range k.consumer.Messages() {
k.inFlightCond.L.Lock()
log.Infof("Kafka consumer received message: Partition=%d Offset=%d", msg.Partition, msg.Offset)

// We cannot build up an infinite number of messages in memory
for len(k.inFlight) >= k.Conf.MaxInFlight {
log.Infof("Too many messages in-flight: In-flight=%d Max=%d", len(k.inFlight), k.Conf.MaxInFlight)
k.inFlightCond.Wait()
}
// addInflightMsg always adds the message, even if it cannot
// be parsed
msgCtx, err := k.addInflightMsg(msg)
// Unlock before any further processing
k.inFlightCond.L.Unlock()
if err == nil {
// Dispatch for processing if we parsed the message successfully
k.processor.OnMessage(msgCtx)
} else {
// Dispatch a generic 'bad data' reply
var errMsg kldmessages.ReplyCommon
errMsg.Headers.Status = 400
errMsg.Headers.ErrorMessage = err.Error()
msgCtx.Reply(&errMsg)
}
}
k.consumerWG.Done()
}

func (k *KafkaBridge) startConsumer() (err error) {

log.Debugf("Kafka Consumer Topic=%s ConsumerGroup=%s", k.Conf.TopicIn, k.Conf.ConsumerGroup)
Expand All @@ -496,34 +527,7 @@ func (k *KafkaBridge) startConsumer() (err error) {
}
k.consumerWG.Done()
}()
go func() {
for msg := range k.consumer.Messages() {
k.inFlightCond.L.Lock()
log.Infof("Kafka consumer received message: Partition=%d Offset=%d", msg.Partition, msg.Offset)

// We cannot build up an infinite number of messages in memory
for len(k.inFlight) >= k.Conf.MaxInFlight {
log.Infof("Too many messages in-flight: In-flight=%d Max=%d", len(k.inFlight), k.Conf.MaxInFlight)
k.inFlightCond.Wait()
}
// addInflightMsg always adds the message, even if it cannot
// be parsed
msgCtx, err := k.addInflightMsg(msg)
// Unlock before any further processing
k.inFlightCond.L.Unlock()
if err == nil {
// Dispatch for processing if we parsed the message successfully
k.processor.OnMessage(msgCtx)
} else {
// Dispatch a generic 'bad data' reply
var errMsg kldmessages.ReplyCommon
errMsg.Headers.Status = 400
errMsg.Headers.ErrorMessage = err.Error()
msgCtx.Reply(&errMsg)
}
}
k.consumerWG.Done()
}()
go k.consumerMessagesLoop()

log.Infof("Kafka Created consumer")
return
Expand Down
130 changes: 126 additions & 4 deletions internal/kldkafka/kafkabridge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"github.com/Shopify/sarama"
cluster "github.com/bsm/sarama-cluster"
"github.com/kaleido-io/ethconnect/internal/kldmessages"
"github.com/kaleido-io/ethconnect/internal/kldutils"
log "github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
)
Expand Down Expand Up @@ -58,7 +57,9 @@ func (f *testKafkaFactory) newClient(k *KafkaBridge, clientConf *cluster.Config)
}

func (f *testKafkaFactory) Brokers() []*sarama.Broker {
return []*sarama.Broker{}
return []*sarama.Broker{
&sarama.Broker{},
}
}

func (f *testKafkaFactory) newProducer(k *KafkaBridge) (kafkaProducer, error) {
Expand Down Expand Up @@ -245,6 +246,18 @@ func TestExecuteWithIncompleteArgs(t *testing.T) {
testArgs = append(testArgs, []string{"--sasl-password", "testpass"}...)
}

func TestDefIntWithBadEnvVar(t *testing.T) {
assert := assert.New(t)

os.Setenv("KAFKA_MAX_INFLIGHT", "badness")
defer os.Unsetenv("KAFKA_MAX_INFLIGHT")

k, err := execBridgeWithArgs(assert, minWorkingArgs, &testKafkaFactory{})

assert.Nil(err)
assert.Equal(10, k.Conf.MaxInFlight)
}

func TestExecuteWithBadRPCURL(t *testing.T) {
assert := assert.New(t)

Expand Down Expand Up @@ -479,13 +492,13 @@ func TestSingleMessageWithReply(t *testing.T) {
// Send a minimal test message
msg1 := kldmessages.RequestCommon{}
msg1.Headers.MsgType = "TestAddInflightMsg"
msg1.Headers.ID = kldutils.UUIDv4()
msg1Ctx := struct {
Some string `json:"some"`
}{
Some: "data",
}
msg1.Headers.Context = &msg1Ctx
msg1.Headers.Account = "0xAA983AD2a0e0eD8ac639277F37be42F2A5d2618c"
msg1bytes, err := json.Marshal(&msg1)
log.Infof("Sent message: %s", string(msg1bytes))
f.consumer.messages <- &sarama.ConsumerMessage{
Expand All @@ -496,9 +509,13 @@ func TestSingleMessageWithReply(t *testing.T) {

// Get the message via the processor
msgContext1 := <-f.processor.messages
assert.Equal(msg1.Headers.ID, msgContext1.Headers().ID)
assert.NotEmpty(msgContext1.Headers().ID) // Generated one as not supplied
assert.Equal(msg1.Headers.MsgType, msgContext1.Headers().MsgType)
assert.Equal("data", msgContext1.Headers().Context.(map[string]interface{})["some"])
assert.Equal(len(msgContext1.(*msgContext).replyBytes), msgContext1.(*msgContext).Length())
var msgUnmarshaled kldmessages.RequestCommon
msgContext1.Unmarshal(&msgUnmarshaled)
assert.Equal(msg1.Headers.MsgType, msgUnmarshaled.Headers.MsgType)

// Send the reply in a go routine
go func() {
Expand Down Expand Up @@ -623,3 +640,108 @@ func TestMoreMessagesThanMaxInFlight(t *testing.T) {
assert.Equal(int64(19), f.consumer.offsetsByPartition[0])

}

func TestAddInflightMessageBadMessage(t *testing.T) {
assert := assert.New(t)

f := testKafkaFactory{
processor: &testKafkaMsgProcessor{
messages: make(chan MsgContext),
},
}
k, wg, err := startTestBridge(assert, minWorkingArgs, &f)
if err != nil {
return
}

f.consumer.messages <- &sarama.ConsumerMessage{
Value: []byte("badness"),
Partition: 64,
Offset: int64(42),
}

// Drain the producer
msg := <-f.producer.input
f.producer.successes <- msg

// Shut down
k.signals <- os.Interrupt
wg.Wait()

// Check we acknowledge all offsets
assert.Equal(int64(42), f.consumer.offsetsByPartition[64])

}

func TestProducerErrorLoopPanics(t *testing.T) {
assert := assert.New(t)

k := NewKafkaBridge()
f := testKafkaFactory{}
producer, _ := f.newProducer(k)
k.producer = producer

go func() {
producer.(*testKafkaProducer).errors <- &sarama.ProducerError{
Err: fmt.Errorf("pop"),
Msg: &sarama.ProducerMessage{},
}
}()

assert.Panics(func() {
k.producerErrorLoop()
})

}

func TestProducerSuccessLoopPanicsMsgNotInflight(t *testing.T) {
assert := assert.New(t)

k := NewKafkaBridge()
f := testKafkaFactory{}
producer, _ := f.newProducer(k)
k.producer = producer

go func() {
producer.(*testKafkaProducer).successes <- &sarama.ProducerMessage{
Metadata: "badness",
}
}()

assert.Panics(func() {
k.producerSuccessLoop()
})

}

func TestConsumerErrorLoopLogsAndContinues(t *testing.T) {
assert := assert.New(t)

f := testKafkaFactory{}
k, wg, err := startTestBridge(assert, minWorkingArgs, &f)
if err != nil {
return
}

f.consumer.errors <- fmt.Errorf("fizzle")

// Shut down
k.signals <- os.Interrupt
wg.Wait()
}

func TestConsumerNotificationsLoop(t *testing.T) {
assert := assert.New(t)

f := testKafkaFactory{}
k, wg, err := startTestBridge(assert, minWorkingArgs, &f)
if err != nil {
return
}

f.consumer.notifications <- &cluster.Notification{}

// Shut down
k.signals <- os.Interrupt
wg.Wait()
}

0 comments on commit 5b8d994

Please sign in to comment.