From 8b264a9edc00557760e61639ed72f00dd152d2a0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Samuel=20Jir=C3=A9nius?= Date: Thu, 21 Oct 2021 14:58:37 +0200 Subject: [PATCH 1/5] GH-215: Prevented writing superfluous HTTP header on resource response. --- server/apiHandler.go | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/server/apiHandler.go b/server/apiHandler.go index d9606f8..2f44cf6 100644 --- a/server/apiHandler.go +++ b/server/apiHandler.go @@ -95,13 +95,14 @@ func (s *Service) apiHandler(w http.ResponseWriter, r *http.Request) { return } - s.temporaryConn(w, r, func(c *wsConn, cb func([]byte, error)) { + s.temporaryConn(w, r, func(c *wsConn, cb func([]byte, error, bool)) { c.GetSubscription(rid, func(sub *Subscription, err error) { if err != nil { - cb(nil, err) + cb(nil, err, false) return } - cb(s.enc.EncodeGET(sub)) + b, err := s.enc.EncodeGET(sub) + cb(b, err, false) }) }) return @@ -164,22 +165,23 @@ func (s *Service) handleCall(w http.ResponseWriter, r *http.Request, rid string, } } - s.temporaryConn(w, r, func(c *wsConn, cb func([]byte, error)) { + s.temporaryConn(w, r, func(c *wsConn, cb func([]byte, error, bool)) { c.CallHTTPResource(rid, s.cfg.APIPath, action, params, func(r json.RawMessage, href string, err error) { if err != nil { - cb(nil, err) + cb(nil, err, false) } else if href != "" { w.Header().Set("Location", href) w.WriteHeader(http.StatusOK) - cb(nil, nil) + cb(nil, nil, true) } else { - cb(s.enc.EncodePOST(r)) + b, err := s.enc.EncodePOST(r) + cb(b, err, false) } }) }) } -func (s *Service) temporaryConn(w http.ResponseWriter, r *http.Request, cb func(*wsConn, func([]byte, error))) { +func (s *Service) temporaryConn(w http.ResponseWriter, r *http.Request, cb func(*wsConn, func([]byte, error, bool))) { c := s.newWSConn(nil, r, versionLatest) if c == nil { httpError(w, reserr.ErrServiceUnavailable, s.enc) @@ -187,7 +189,7 @@ func (s *Service) temporaryConn(w http.ResponseWriter, r *http.Request, cb func( } done := make(chan struct{}) - rs := func(out []byte, err error) { + rs := func(out []byte, err error, headerWritten bool) { defer c.dispose() defer close(done) @@ -209,7 +211,9 @@ func (s *Service) temporaryConn(w http.ResponseWriter, r *http.Request, cb func( return } - w.WriteHeader(http.StatusNoContent) + if !headerWritten { + w.WriteHeader(http.StatusNoContent) + } } c.Enqueue(func() { if s.cfg.HeaderAuth != nil { From 5711fd7fc60a53ce37d9c74043ef397bfd1290ef Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Samuel=20Jir=C3=A9nius?= Date: Thu, 21 Oct 2021 15:31:15 +0200 Subject: [PATCH 2/5] GH-217: Added test to replicate the deadlock bug. --- test/50bug_test.go | 67 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 67 insertions(+) diff --git a/test/50bug_test.go b/test/50bug_test.go index b87f8b9..bebb8b4 100644 --- a/test/50bug_test.go +++ b/test/50bug_test.go @@ -4,6 +4,8 @@ import ( "encoding/json" "fmt" "testing" + + "github.com/resgateio/resgate/server" ) // Test to replicate the bug about possible client resource inconsistency. @@ -50,3 +52,68 @@ func TestBug_PossibleClientResourceInconsistency(t *testing.T) { }) } } + +// Test to replicate the bug: Deadlock on throttled access requests to same resource +// +// See: https://github.com/resgateio/resgate/issues/217 +func TestBug_DeadlockOnThrottledAccessRequestsToSameResource(t *testing.T) { + const connectionCount = 4 + const resetThrottle = 3 + rid := "test.model" + model := resources[rid].data + runTest(t, func(s *Session) { + // Create a set of connections subscribing to the same resource + conns := make([]*Conn, 0, connectionCount) + for i := 0; i < connectionCount; i++ { + c := s.Connect() + + creq := c.Request("subscribe."+rid, nil) + reqCount := 1 + if i == 0 { + reqCount = 2 + } + // Handle access request (and model request for the first connection) + mreqs := s.GetParallelRequests(t, reqCount) + // Handle access + mreqs.GetRequest(t, "access."+rid). + RespondSuccess(json.RawMessage(`{"get":true}`)) + if i == 0 { + // Handle get + mreqs.GetRequest(t, "get."+rid). + RespondSuccess(json.RawMessage(fmt.Sprintf(`{"model":%s}`, model))) + } + creq.GetResponse(t) + + conns = append(conns, c) + } + + // Send system reset + s.SystemEvent("reset", json.RawMessage(`{"resources":null,"access":["test.>"]}`)) + // Get throttled number of requests + mreqs := s.GetParallelRequests(t, resetThrottle) + requestCount := resetThrottle + + // Respond to requests one by one + for len(mreqs) > 0 { + r := mreqs[0] + mreqs = mreqs[1:] + r.RespondSuccess(json.RawMessage(`{"get":true}`)) + + // If we still have remaining get or access requests not yet received + if requestCount < connectionCount { + // For each response, a new request should be sent. + req := s.GetRequest(t) + mreqs = append(mreqs, req) + requestCount++ + } + } + + // Assert no other requests are sent + for _, c := range conns { + c.AssertNoNATSRequest(t, rid) + } + + }, func(c *server.Config) { + c.ResetThrottle = resetThrottle + }) +} From 66ff726fc318e209b071f7a325e0813980446ad0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Samuel=20Jir=C3=A9nius?= Date: Thu, 21 Oct 2021 15:33:22 +0200 Subject: [PATCH 3/5] GH-217: Throttled callbacks are started in separate go routine. --- server/rescache/throttle.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/rescache/throttle.go b/server/rescache/throttle.go index cf9e90c..aa7a615 100644 --- a/server/rescache/throttle.go +++ b/server/rescache/throttle.go @@ -58,5 +58,5 @@ func (t *Throttle) Done() { cb := t.queue[0] t.queue = t.queue[1:] t.mu.Unlock() - cb() + go cb() } From 18456aa45bbda69ddd6d88b17eac6e1363c89b4f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Samuel=20Jir=C3=A9nius?= Date: Thu, 21 Oct 2021 15:38:04 +0200 Subject: [PATCH 4/5] GH-217: Added the number of connections in bug test. --- test/50bug_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/50bug_test.go b/test/50bug_test.go index bebb8b4..877d78e 100644 --- a/test/50bug_test.go +++ b/test/50bug_test.go @@ -57,7 +57,7 @@ func TestBug_PossibleClientResourceInconsistency(t *testing.T) { // // See: https://github.com/resgateio/resgate/issues/217 func TestBug_DeadlockOnThrottledAccessRequestsToSameResource(t *testing.T) { - const connectionCount = 4 + const connectionCount = 32 const resetThrottle = 3 rid := "test.model" model := resources[rid].data From 597d0362a3a7b9b260b65583a57f5e013b9b89f8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Samuel=20Jir=C3=A9nius?= Date: Mon, 25 Oct 2021 09:34:00 +0200 Subject: [PATCH 5/5] Prepare release v1.7.3 --- server/const.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/const.go b/server/const.go index e743a68..37594a8 100644 --- a/server/const.go +++ b/server/const.go @@ -4,7 +4,7 @@ import "time" const ( // Version is the current version for the server. - Version = "1.7.2" + Version = "1.7.3" // ProtocolVersion is the implemented RES protocol version. ProtocolVersion = "1.2.2"