Skip to content

Commit

Permalink
Update EventStream (http PATCH) (#73)
Browse files Browse the repository at this point in the history
* Add command to gen html for coverage

* Update inflight count only if we processed event

processing is skipped if we are paused/stopped. Hence check that we
processed the event in the previous section before decrementing the
in-flight counter

* Adds a PATCH route to update event streams

* Event streams could be updated while batches of events are currently
being processed. The existing logic around concurrent handling of batch
processing of events and any pause/stopping of event streams ensures
that batch processing exits as soon as a pause/stop event is detected.
This PR adds an additional context (and a cancel function) to the event
stream spec so that an update to the event stream can signal all the
batch processing related go routines to invoke the cancel function
on the context (via select)
* All fields in the event stream except the ID are allowed to be updated

Addresses one of the items in: #68

* remove spurious entry added in Makefile

* Use a empty struct channel to signal a cancelation, instead of ctx

* Use an empty struct channel that can be closed to signal that the
update of the stream is underway. The handler go routines select on the
updateInterrupt ch so that they are woken up when the update is
underway and can exit
* The following go routines & functions react to the updateInterrupt
channel - event poller, batch processor, batch dispatcher, individual
process batch (Blocking and sleeps until timeout),
perform webhook action (blocking and sleeps until timeout)
* adds pre/post update helpers notify routines waiting on the condition
variable and to reset flags respectively
* adds a helper function to kick off a fresh set of go routines (used
by resume and at the end of update)

* Check error returned from subMgr updateStream call

* Remove unused dispatcherDone bool flag

* Move some calls to a defer and other PR comments

adds a comment for updateInterrupt as a empty struct{} channel

Signed-off-by: Vinod Damle <vinod.damle@kaleido.io>
  • Loading branch information
vdamle authored Jul 8, 2020
1 parent 5321835 commit 5fc3c2a
Show file tree
Hide file tree
Showing 7 changed files with 464 additions and 53 deletions.
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@ build:
$(VGO) build -ldflags "-X main.buildDate=`date -u +\"%Y-%m-%dT%H:%M:%SZ\"` -X main.buildVersion=$(BUILD_VERSION)" -tags=prod -o $(BINARY_NAME) -v
coverage.txt: $(GOFILES)
$(VGO) test ./... -cover -coverprofile=coverage.txt -covermode=atomic
coverage.html:
$(VGO) tool cover -html=coverage.txt
test: coverage.txt
coverage: coverage.txt coverage.html
clean:
$(VGO) clean
rm -f coverage.txt
Expand Down
20 changes: 12 additions & 8 deletions internal/kldcontracts/rest2eth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,20 +127,24 @@ func (m *mockRPC) CallContext(ctx context.Context, result interface{}, method st
}

type mockSubMgr struct {
err error
sub *kldevents.SubscriptionInfo
stream *kldevents.StreamInfo
subs []*kldevents.SubscriptionInfo
streams []*kldevents.StreamInfo
suspended bool
resumed bool
capturedAddr *kldbind.Address
err error
updateStreamErr error
sub *kldevents.SubscriptionInfo
stream *kldevents.StreamInfo
subs []*kldevents.SubscriptionInfo
streams []*kldevents.StreamInfo
suspended bool
resumed bool
capturedAddr *kldbind.Address
}

func (m *mockSubMgr) Init() error { return m.err }
func (m *mockSubMgr) AddStream(ctx context.Context, spec *kldevents.StreamInfo) (*kldevents.StreamInfo, error) {
return spec, m.err
}
func (m *mockSubMgr) UpdateStream(ctx context.Context, id string, spec *kldevents.StreamInfo) (*kldevents.StreamInfo, error) {
return m.stream, m.updateStreamErr
}
func (m *mockSubMgr) Streams(ctx context.Context) []*kldevents.StreamInfo { return m.streams }
func (m *mockSubMgr) StreamByID(ctx context.Context, id string) (*kldevents.StreamInfo, error) {
return m.stream, m.err
Expand Down
36 changes: 36 additions & 0 deletions internal/kldcontracts/smartcontractgw.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ func (g *smartContractGW) AddRoutes(router *httprouter.Router) {
router.GET("/gateways/:gateway_lookup", g.getRemoteRegistrySwaggerOrABI)
router.GET("/g/:gateway_lookup", g.getRemoteRegistrySwaggerOrABI)
router.POST(kldevents.StreamPathPrefix, g.withEventsAuth(g.createStream))
router.PATCH(kldevents.StreamPathPrefix, g.withEventsAuth(g.updateStream))
router.GET(kldevents.StreamPathPrefix, g.withEventsAuth(g.listStreamsOrSubs))
router.GET(kldevents.SubPathPrefix, g.withEventsAuth(g.listStreamsOrSubs))
router.GET(kldevents.StreamPathPrefix+"/:id", g.withEventsAuth(g.getStreamOrSub))
Expand Down Expand Up @@ -654,6 +655,41 @@ func (g *smartContractGW) createStream(res http.ResponseWriter, req *http.Reques
enc.Encode(&newSpec)
}

// updateStream updates a stream
func (g *smartContractGW) updateStream(res http.ResponseWriter, req *http.Request, params httprouter.Params) {
log.Infof("--> %s %s", req.Method, req.URL)

if g.sm == nil {
g.gatewayErrReply(res, req, errors.New(errEventSupportMissing), 405)
return
}

streamID := params.ByName("id")
_, err := g.sm.StreamByID(req.Context(), streamID)
if err != nil {
g.gatewayErrReply(res, req, err, 404)
return
}
var spec kldevents.StreamInfo
if err := json.NewDecoder(req.Body).Decode(&spec); err != nil {
g.gatewayErrReply(res, req, klderrors.Errorf(klderrors.RESTGatewayEventStreamInvalid, err), 400)
return
}
newSpec, err := g.sm.UpdateStream(req.Context(), streamID, &spec)
if err != nil {
g.gatewayErrReply(res, req, err, 500)
return
}

status := 200
log.Infof("<-- %s %s [%d]", req.Method, req.URL, status)
res.Header().Set("Content-Type", "application/json")
res.WriteHeader(status)
enc := json.NewEncoder(res)
enc.SetIndent("", " ")
enc.Encode(&newSpec)
}

// listStreamsOrSubs sorts by Title then Address and returns an array
func (g *smartContractGW) listStreamsOrSubs(res http.ResponseWriter, req *http.Request, params httprouter.Params) {
log.Infof("--> %s %s", req.Method, req.URL)
Expand Down
79 changes: 77 additions & 2 deletions internal/kldcontracts/smartcontractgw_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1643,6 +1643,81 @@ func TestAddStreamSubMgrError(t *testing.T) {
assert.Regexp("pop", resError.Message)
}

func TestUpdateStreamNoSubMgr(t *testing.T) {
assert := assert.New(t)
res := testGWPath("PATCH", kldevents.StreamPathPrefix, nil, nil)
assert.Equal(405, res.Result().StatusCode)
}

func TestUpdateStreamOK(t *testing.T) {
assert := assert.New(t)
spec := &kldevents.StreamInfo{Type: "webhook", Name: "stream-new-name"}
b, _ := json.Marshal(spec)
req := httptest.NewRequest("PATCH", kldevents.StreamPathPrefix, bytes.NewReader(b))
res := httptest.NewRecorder()
s := &smartContractGW{}
s.sm = &mockSubMgr{}
r := &httprouter.Router{}
s.AddRoutes(r)
r.ServeHTTP(res, req)
var newSpec kldevents.StreamInfo
json.NewDecoder(res.Body).Decode(&newSpec)
assert.Equal(200, res.Result().StatusCode)
s.Shutdown()
}

func TestUpdateStreamBadData(t *testing.T) {
assert := assert.New(t)
req := httptest.NewRequest("PATCH", kldevents.StreamPathPrefix, bytes.NewReader([]byte(":bad json")))
res := httptest.NewRecorder()
s := &smartContractGW{}
s.sm = &mockSubMgr{}
r := &httprouter.Router{}
s.AddRoutes(r)
r.ServeHTTP(res, req)
var resError restErrMsg
json.NewDecoder(res.Body).Decode(&resError)
assert.Equal(400, res.Result().StatusCode)
assert.Regexp("Invalid event stream specification", resError.Message)
}

func TestUpdateStreamNotFoundError(t *testing.T) {
assert := assert.New(t)
spec := &kldevents.StreamInfo{Type: "webhook"}
b, _ := json.Marshal(spec)
req := httptest.NewRequest("PATCH", kldevents.StreamPathPrefix, bytes.NewReader(b))
res := httptest.NewRecorder()
s := &smartContractGW{}
s.sm = &mockSubMgr{err: fmt.Errorf("pop")}
r := &httprouter.Router{}
s.AddRoutes(r)
r.ServeHTTP(res, req)
var resError restErrMsg
json.NewDecoder(res.Body).Decode(&resError)
assert.Equal(404, res.Result().StatusCode)
assert.Regexp("pop", resError.Message)
}

func TestUpdateStreamSubMgrError(t *testing.T) {
assert := assert.New(t)
spec := &kldevents.StreamInfo{Type: "webhook"}
b, _ := json.Marshal(spec)
req := httptest.NewRequest("PATCH", kldevents.StreamPathPrefix, bytes.NewReader(b))
res := httptest.NewRecorder()
s := &smartContractGW{}
s.sm = &mockSubMgr{
updateStreamErr: fmt.Errorf("pop"),
err: nil,
}
r := &httprouter.Router{}
s.AddRoutes(r)
r.ServeHTTP(res, req)
var resError restErrMsg
json.NewDecoder(res.Body).Decode(&resError)
assert.Equal(500, res.Result().StatusCode)
assert.Regexp("pop", resError.Message)
}

func TestListStreamsNoSubMgr(t *testing.T) {
assert := assert.New(t)
res := testGWPath("GET", kldevents.StreamPathPrefix, nil, nil)
Expand Down Expand Up @@ -1681,12 +1756,12 @@ func TestListSubs(t *testing.T) {

mockSubMgr := &mockSubMgr{
subs: []*kldevents.SubscriptionInfo{
&kldevents.SubscriptionInfo{
{
TimeSorted: kldmessages.TimeSorted{
CreatedISO8601: time.Now().UTC().Format(time.RFC3339),
}, ID: "earlier",
},
&kldevents.SubscriptionInfo{
{
TimeSorted: kldmessages.TimeSorted{
CreatedISO8601: time.Now().UTC().Add(1 * time.Hour).Format(time.RFC3339),
}, ID: "later",
Expand Down
Loading

0 comments on commit 5fc3c2a

Please sign in to comment.