Skip to content

Commit

Permalink
Webhooks - YAML payload support and code coverage (#4)
Browse files Browse the repository at this point in the history
* Initialization tests

* More init tests

* YAML support and code coverage

* Improve logging of OpErrors for Travis build

* Timing issue on test in Travis

* Travis failed requiring --allow-unauthenticated on solc

Signed-off-by: Peter Broadhurst <peter.broadhurst@kaleido.io>
  • Loading branch information
peterbroadhurst authored Jul 5, 2018
1 parent c154c61 commit bff4588
Show file tree
Hide file tree
Showing 4 changed files with 523 additions and 29 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ language: go
before_install:
- sudo add-apt-repository -y ppa:ethereum/ethereum
- sudo apt-get update
- sudo apt-get install -y solc
- sudo apt-get install --allow-unauthenticated -y solc
after_success:
- bash <(curl -s https://codecov.io/bash)
go:
Expand Down
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ deps:
$(GOGET) github.com/nu7hatch/gouuid
$(GOGET) github.com/stretchr/testify/assert
$(GOGET) github.com/golang/mock/gomock
$(GOGET) github.com/icza/dyno
$(GOGET) gopkg.in/yaml.v2

build-linux:
Expand Down
97 changes: 69 additions & 28 deletions internal/kldwebhooks/webhooksbridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@
package kldwebhooks

import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"os"
"reflect"
"strings"
"sync"
Expand All @@ -27,10 +29,11 @@ import (
"gopkg.in/yaml.v2"

"github.com/Shopify/sarama"
log "github.com/sirupsen/logrus"

"github.com/icza/dyno"
"github.com/kaleido-io/ethconnect/internal/kldkafka"
"github.com/kaleido-io/ethconnect/internal/kldmessages"
"github.com/kaleido-io/ethconnect/internal/kldutils"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
)

Expand All @@ -43,9 +46,12 @@ const (

// WebhooksBridge receives messages over HTTP POST and sends them to Kafka
type WebhooksBridge struct {
Conf struct {
conf struct {
LocalAddr string
Port int
}
kafka kldkafka.KafkaCommon
srv *http.Server
}

// NewWebhooksBridge constructor
Expand Down Expand Up @@ -73,6 +79,8 @@ func (w *WebhooksBridge) CobraInit() (cmd *cobra.Command) {
},
}
w.kafka.CobraInit(cmd)
cmd.Flags().StringVarP(&w.conf.LocalAddr, "listen-addr", "L", os.Getenv("WEBHOOKS_LISTEN_ADDR"), "Local address to listen on")
cmd.Flags().IntVarP(&w.conf.Port, "listen-port", "l", kldutils.DefInt("WEBHOOKS_LISTEN_PORT", 8080), "Port to listen on")
return
}

Expand Down Expand Up @@ -100,14 +108,25 @@ func (w *WebhooksBridge) ProducerSuccessLoop(consumer kldkafka.KafkaConsumer, pr
wg.Done()
}

type httpError struct {
type errMsg struct {
Message string `json:"error"`
}

func errReply(res http.ResponseWriter, err error, status int) {
reply, _ := json.Marshal(&httpError{Message: err.Error()})
res.Write(reply)
reply, _ := json.Marshal(&errMsg{Message: err.Error()})
res.WriteHeader(400)
res.Write(reply)
return
}

type okMsg struct {
OK bool `json:"ok"`
}

func okReply(res http.ResponseWriter) {
reply, _ := json.Marshal(&okMsg{OK: true})
res.WriteHeader(200)
res.Write(reply)
return
}

Expand All @@ -117,23 +136,36 @@ func (w *WebhooksBridge) webhookHandler(res http.ResponseWriter, req *http.Reque
errReply(res, fmt.Errorf("Message exceeds maximum allowable size"), 400)
return
}
originalPayload, err := ioutil.ReadAll(req.Body)
payloadToForward, err := ioutil.ReadAll(req.Body)
if err != nil {
errReply(res, fmt.Errorf("Unable to read input dataL: %s", err), 400)
errReply(res, fmt.Errorf("Unable to read input data: %s", err), 400)
return
}

genericPayload := make(map[string]interface{})
// We support both YAML and JSON input.
// We parse the message into a generic string->interface map, that lets
// us check a couple of routing fields needed to dispatch the messages
// to Kafka (always in JSON). However, we do not perform full parsing.
var genericPayload map[string]interface{}
contentType := strings.ToLower(req.Header.Get("Content-type"))
log.Infof("Received message 'Content-Type: %s' Length: %d", contentType, req.ContentLength)
if contentType == "application/x-yaml" || contentType == "text/yaml" {
err := yaml.Unmarshal(originalPayload, genericPayload)
yamlGenericPayload := make(map[interface{}]interface{})
err := yaml.Unmarshal(payloadToForward, &yamlGenericPayload)
if err != nil {
errReply(res, fmt.Errorf("Unable to parse YAML: %s", err), 400)
return
}
genericPayload = dyno.ConvertMapI2MapS(yamlGenericPayload).(map[string]interface{})
// Reseialize back to JSON
payloadToForward, err = json.Marshal(&genericPayload)
if err != nil {
errReply(res, fmt.Errorf("Unable to reserialize YAML payload as JSON: %s", err), 500)
return
}
} else {
err := json.Unmarshal(originalPayload, genericPayload)
genericPayload = make(map[string]interface{})
err := json.Unmarshal(payloadToForward, &genericPayload)
if err != nil {
errReply(res, fmt.Errorf("Unable to parse JSON: %s", err), 400)
return
Expand All @@ -147,51 +179,54 @@ func (w *WebhooksBridge) webhookHandler(res http.ResponseWriter, req *http.Reque
errReply(res, fmt.Errorf("Invalid message - missing 'headers' (or not an object)"), 400)
return
}
msgType, exists := headers.(map[interface{}]interface{})["type"]
msgType, exists := headers.(map[string]interface{})["type"]
if !exists || reflect.TypeOf(msgType).Kind() != reflect.String {
errReply(res, fmt.Errorf("Invalid message - missing 'headers.type' (or not a string)"), 400)
return
}
var key string
switch msgType {
case kldmessages.MsgTypeDeployContract:
case kldmessages.MsgTypeSendTransaction:
to, exists := genericPayload["to"]
if !exists || reflect.TypeOf(to).Kind() != reflect.String {
errReply(res, fmt.Errorf("Invalid message - missing 'to' (or not a string)"), 400)
case kldmessages.MsgTypeDeployContract, kldmessages.MsgTypeSendTransaction:
from, exists := genericPayload["from"]
if !exists || reflect.TypeOf(from).Kind() != reflect.String {
errReply(res, fmt.Errorf("Invalid message - missing 'from' (or not a string)"), 400)
return
}
key = to.(string)
key = from.(string)
break
default:
errReply(res, fmt.Errorf("Invalid message type: %s", msgType), 400)
return
}

log.Debugf("Forwarding message to Kafka bridge: %s", originalPayload)
log.Debugf("Forwarding message to Kafka bridge: %s", payloadToForward)
w.kafka.Producer().Input() <- &sarama.ProducerMessage{
Topic: w.kafka.Conf().TopicOut,
Key: sarama.StringEncoder(key),
Value: sarama.ByteEncoder(originalPayload),
Value: sarama.ByteEncoder(payloadToForward),
}

res.WriteHeader(204)
okReply(res)
}

// Start kicks off the bridge
func (w *WebhooksBridge) statusHandler(res http.ResponseWriter, req *http.Request) {
okReply(res)
}

// Start kicks off the HTTP and Kafka listeners
func (w *WebhooksBridge) Start() (err error) {

mux := http.NewServeMux()
th := http.HandlerFunc(w.webhookHandler)
mux.Handle("/message", th)
mux.Handle("/message", http.HandlerFunc(w.webhookHandler))
mux.Handle("/status", http.HandlerFunc(w.statusHandler))

tlsConfig, err := w.kafka.CreateTLSConfiguration()
if err != nil {
return
}

srv := &http.Server{
Addr: ":8080",
w.srv = &http.Server{
Addr: fmt.Sprintf("%s:%d", w.conf.LocalAddr, w.conf.Port),
TLSConfig: tlsConfig,
Handler: mux,
MaxHeaderBytes: MaxHeaderSize,
Expand All @@ -202,13 +237,19 @@ func (w *WebhooksBridge) Start() (err error) {
for w.kafka.Producer() == nil {
time.Sleep(500)
}
log.Printf("Listening on :8080")
if err = srv.ListenAndServe(); err != nil {
log.Printf("Listening on %s", w.srv.Addr)
if err = w.srv.ListenAndServe(); err != nil {
return
}
}()

// Defer to KafkaCommon processing
err = w.kafka.Start()

// Ensure we shutdown the server
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
w.srv.Shutdown(ctx)
defer cancel()

return
}
Loading

0 comments on commit bff4588

Please sign in to comment.