Skip to content

Commit

Permalink
Merge branch 'release/v1.7.3'
Browse files Browse the repository at this point in the history
  • Loading branch information
jirenius committed Oct 25, 2021
2 parents 2865843 + 597d036 commit b2f15c4
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 12 deletions.
24 changes: 14 additions & 10 deletions server/apiHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,13 +95,14 @@ func (s *Service) apiHandler(w http.ResponseWriter, r *http.Request) {
return
}

s.temporaryConn(w, r, func(c *wsConn, cb func([]byte, error)) {
s.temporaryConn(w, r, func(c *wsConn, cb func([]byte, error, bool)) {
c.GetSubscription(rid, func(sub *Subscription, err error) {
if err != nil {
cb(nil, err)
cb(nil, err, false)
return
}
cb(s.enc.EncodeGET(sub))
b, err := s.enc.EncodeGET(sub)
cb(b, err, false)
})
})
return
Expand Down Expand Up @@ -164,30 +165,31 @@ func (s *Service) handleCall(w http.ResponseWriter, r *http.Request, rid string,
}
}

s.temporaryConn(w, r, func(c *wsConn, cb func([]byte, error)) {
s.temporaryConn(w, r, func(c *wsConn, cb func([]byte, error, bool)) {
c.CallHTTPResource(rid, s.cfg.APIPath, action, params, func(r json.RawMessage, href string, err error) {
if err != nil {
cb(nil, err)
cb(nil, err, false)
} else if href != "" {
w.Header().Set("Location", href)
w.WriteHeader(http.StatusOK)
cb(nil, nil)
cb(nil, nil, true)
} else {
cb(s.enc.EncodePOST(r))
b, err := s.enc.EncodePOST(r)
cb(b, err, false)
}
})
})
}

func (s *Service) temporaryConn(w http.ResponseWriter, r *http.Request, cb func(*wsConn, func([]byte, error))) {
func (s *Service) temporaryConn(w http.ResponseWriter, r *http.Request, cb func(*wsConn, func([]byte, error, bool))) {
c := s.newWSConn(nil, r, versionLatest)
if c == nil {
httpError(w, reserr.ErrServiceUnavailable, s.enc)
return
}

done := make(chan struct{})
rs := func(out []byte, err error) {
rs := func(out []byte, err error, headerWritten bool) {
defer c.dispose()
defer close(done)

Expand All @@ -209,7 +211,9 @@ func (s *Service) temporaryConn(w http.ResponseWriter, r *http.Request, cb func(
return
}

w.WriteHeader(http.StatusNoContent)
if !headerWritten {
w.WriteHeader(http.StatusNoContent)
}
}
c.Enqueue(func() {
if s.cfg.HeaderAuth != nil {
Expand Down
2 changes: 1 addition & 1 deletion server/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import "time"

const (
// Version is the current version for the server.
Version = "1.7.2"
Version = "1.7.3"

// ProtocolVersion is the implemented RES protocol version.
ProtocolVersion = "1.2.2"
Expand Down
2 changes: 1 addition & 1 deletion server/rescache/throttle.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,5 +58,5 @@ func (t *Throttle) Done() {
cb := t.queue[0]
t.queue = t.queue[1:]
t.mu.Unlock()
cb()
go cb()
}
67 changes: 67 additions & 0 deletions test/50bug_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"encoding/json"
"fmt"
"testing"

"github.com/resgateio/resgate/server"
)

// Test to replicate the bug about possible client resource inconsistency.
Expand Down Expand Up @@ -50,3 +52,68 @@ func TestBug_PossibleClientResourceInconsistency(t *testing.T) {
})
}
}

// Test to replicate the bug: Deadlock on throttled access requests to same resource
//
// See: https://github.com/resgateio/resgate/issues/217
func TestBug_DeadlockOnThrottledAccessRequestsToSameResource(t *testing.T) {
const connectionCount = 32
const resetThrottle = 3
rid := "test.model"
model := resources[rid].data
runTest(t, func(s *Session) {
// Create a set of connections subscribing to the same resource
conns := make([]*Conn, 0, connectionCount)
for i := 0; i < connectionCount; i++ {
c := s.Connect()

creq := c.Request("subscribe."+rid, nil)
reqCount := 1
if i == 0 {
reqCount = 2
}
// Handle access request (and model request for the first connection)
mreqs := s.GetParallelRequests(t, reqCount)
// Handle access
mreqs.GetRequest(t, "access."+rid).
RespondSuccess(json.RawMessage(`{"get":true}`))
if i == 0 {
// Handle get
mreqs.GetRequest(t, "get."+rid).
RespondSuccess(json.RawMessage(fmt.Sprintf(`{"model":%s}`, model)))
}
creq.GetResponse(t)

conns = append(conns, c)
}

// Send system reset
s.SystemEvent("reset", json.RawMessage(`{"resources":null,"access":["test.>"]}`))
// Get throttled number of requests
mreqs := s.GetParallelRequests(t, resetThrottle)
requestCount := resetThrottle

// Respond to requests one by one
for len(mreqs) > 0 {
r := mreqs[0]
mreqs = mreqs[1:]
r.RespondSuccess(json.RawMessage(`{"get":true}`))

// If we still have remaining get or access requests not yet received
if requestCount < connectionCount {
// For each response, a new request should be sent.
req := s.GetRequest(t)
mreqs = append(mreqs, req)
requestCount++
}
}

// Assert no other requests are sent
for _, c := range conns {
c.AssertNoNATSRequest(t, rid)
}

}, func(c *server.Config) {
c.ResetThrottle = resetThrottle
})
}

0 comments on commit b2f15c4

Please sign in to comment.