From 697cf6e9506ab7b527391e8744eb7f4f7316a743 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Samuel=20Jir=C3=A9nius?= Date: Fri, 12 Nov 2021 13:40:30 +0100 Subject: [PATCH 1/3] GH-220: Added channel buffer size as config property. Increased default buffer size from 256 to 8192. Closing server on nats.ErrSlowConsumer error. --- main.go | 5 +++++ nats/nats.go | 20 ++++++++++++++------ 2 files changed, 19 insertions(+), 6 deletions(-) diff --git a/main.go b/main.go index 3bcb02b..abcb709 100644 --- a/main.go +++ b/main.go @@ -78,6 +78,7 @@ type Config struct { NatsTLSKey string `json:"natsKey"` NatsRootCAs []string `json:"natsRootCAs"` RequestTimeout int `json:"requestTimeout"` + BufferSize int `json:"bufferSize"` Debug bool `json:"debug"` Trace bool `json:"trace"` server.Config @@ -110,6 +111,9 @@ func (c *Config) SetDefault() { if c.NatsRootCAs == nil { c.NatsRootCAs = []string{} } + if c.BufferSize == 0 { + c.BufferSize = 8192 + } c.Config.SetDefault() } @@ -303,6 +307,7 @@ func main() { ClientKey: cfg.NatsTLSKey, RootCAs: cfg.NatsRootCAs, RequestTimeout: time.Duration(cfg.RequestTimeout) * time.Millisecond, + BufferSize: cfg.BufferSize, Logger: l, }, cfg.Config) if err != nil { diff --git a/nats/nats.go b/nats/nats.go index c7e155d..b5a8b9e 100644 --- a/nats/nats.go +++ b/nats/nats.go @@ -13,10 +13,6 @@ import ( "github.com/resgateio/resgate/server/mq" ) -const ( - natsChannelSize = 256 -) - // Client holds a client connection to a nats server. type Client struct { RequestTimeout time.Duration @@ -26,6 +22,7 @@ type Client struct { ClientKey string RootCAs []string Logger logger.Logger + BufferSize int mq *nats.Conn mqCh chan *nats.Msg @@ -75,7 +72,11 @@ func (c *Client) Connect() error { c.Logf("Connecting to NATS at %s", c.URL) // Create connection options - opts := []nats.Option{nats.NoReconnect(), nats.ClosedHandler(c.onClose)} + opts := []nats.Option{ + nats.NoReconnect(), + nats.ClosedHandler(c.onClose), + nats.ErrorHandler(c.onError), + } if c.Creds != "" { opts = append(opts, nats.UserCredentials(c.Creds)) } @@ -98,7 +99,7 @@ func (c *Client) Connect() error { } c.mq = nc - c.mqCh = make(chan *nats.Msg, natsChannelSize) + c.mqCh = make(chan *nats.Msg, c.BufferSize) c.mqReqs = make(map[*nats.Subscription]*responseCont) c.tq = timerqueue.New(c.onTimeout, c.RequestTimeout) c.stopped = make(chan struct{}) @@ -173,6 +174,13 @@ func (c *Client) onClose(conn *nats.Conn) { } } +func (c *Client) onError(conn *nats.Conn, sub *nats.Subscription, err error) { + c.Logger.Error(err.Error()) + if err == nats.ErrSlowConsumer { + c.Close() + } +} + // SendRequest sends a request to the MQ. func (c *Client) SendRequest(subj string, payload []byte, cb mq.Response) { inbox := nats.NewInbox() From d5bc0866306493eb07476a5fb50009cf6b2aa7fe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Samuel=20Jir=C3=A9nius?= Date: Fri, 12 Nov 2021 13:53:20 +0100 Subject: [PATCH 2/3] Added bufferSize configuration to README.md --- README.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/README.md b/README.md index c4d4cc0..0a95291 100644 --- a/README.md +++ b/README.md @@ -156,6 +156,9 @@ Configuration is a JSON encoded file. If no config file is found at the given pa // Timeout in milliseconds for NATS requests. "requestTimeout": 3000, + // Size of message buffer for incoming NATS requests. + "bufferSize": 8192, + // Header authentication resource method for web resources. // Prior to accessing the resource, this resource method will be // called, allowing an auth service to set a token using From 8e67f3b8550248f864e1ac7f8861fc000a2a58cc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Samuel=20Jir=C3=A9nius?= Date: Fri, 12 Nov 2021 14:02:38 +0100 Subject: [PATCH 3/3] Prepare release v1.7.4 --- server/const.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/const.go b/server/const.go index 37594a8..508220b 100644 --- a/server/const.go +++ b/server/const.go @@ -4,7 +4,7 @@ import "time" const ( // Version is the current version for the server. - Version = "1.7.3" + Version = "1.7.4" // ProtocolVersion is the implemented RES protocol version. ProtocolVersion = "1.2.2"