Skip to content

Commit

Permalink
Merge branch 'release/v1.7.4'
Browse files Browse the repository at this point in the history
  • Loading branch information
jirenius committed Nov 12, 2021
2 parents b2f15c4 + 8e67f3b commit 07aff00
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 7 deletions.
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,9 @@ Configuration is a JSON encoded file. If no config file is found at the given pa
// Timeout in milliseconds for NATS requests.
"requestTimeout": 3000,

// Size of message buffer for incoming NATS requests.
"bufferSize": 8192,

// Header authentication resource method for web resources.
// Prior to accessing the resource, this resource method will be
// called, allowing an auth service to set a token using
Expand Down
5 changes: 5 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ type Config struct {
NatsTLSKey string `json:"natsKey"`
NatsRootCAs []string `json:"natsRootCAs"`
RequestTimeout int `json:"requestTimeout"`
BufferSize int `json:"bufferSize"`
Debug bool `json:"debug"`
Trace bool `json:"trace"`
server.Config
Expand Down Expand Up @@ -110,6 +111,9 @@ func (c *Config) SetDefault() {
if c.NatsRootCAs == nil {
c.NatsRootCAs = []string{}
}
if c.BufferSize == 0 {
c.BufferSize = 8192
}
c.Config.SetDefault()
}

Expand Down Expand Up @@ -303,6 +307,7 @@ func main() {
ClientKey: cfg.NatsTLSKey,
RootCAs: cfg.NatsRootCAs,
RequestTimeout: time.Duration(cfg.RequestTimeout) * time.Millisecond,
BufferSize: cfg.BufferSize,
Logger: l,
}, cfg.Config)
if err != nil {
Expand Down
20 changes: 14 additions & 6 deletions nats/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,6 @@ import (
"github.com/resgateio/resgate/server/mq"
)

const (
natsChannelSize = 256
)

// Client holds a client connection to a nats server.
type Client struct {
RequestTimeout time.Duration
Expand All @@ -26,6 +22,7 @@ type Client struct {
ClientKey string
RootCAs []string
Logger logger.Logger
BufferSize int

mq *nats.Conn
mqCh chan *nats.Msg
Expand Down Expand Up @@ -75,7 +72,11 @@ func (c *Client) Connect() error {
c.Logf("Connecting to NATS at %s", c.URL)

// Create connection options
opts := []nats.Option{nats.NoReconnect(), nats.ClosedHandler(c.onClose)}
opts := []nats.Option{
nats.NoReconnect(),
nats.ClosedHandler(c.onClose),
nats.ErrorHandler(c.onError),
}
if c.Creds != "" {
opts = append(opts, nats.UserCredentials(c.Creds))
}
Expand All @@ -98,7 +99,7 @@ func (c *Client) Connect() error {
}

c.mq = nc
c.mqCh = make(chan *nats.Msg, natsChannelSize)
c.mqCh = make(chan *nats.Msg, c.BufferSize)
c.mqReqs = make(map[*nats.Subscription]*responseCont)
c.tq = timerqueue.New(c.onTimeout, c.RequestTimeout)
c.stopped = make(chan struct{})
Expand Down Expand Up @@ -173,6 +174,13 @@ func (c *Client) onClose(conn *nats.Conn) {
}
}

func (c *Client) onError(conn *nats.Conn, sub *nats.Subscription, err error) {
c.Logger.Error(err.Error())
if err == nats.ErrSlowConsumer {
c.Close()
}
}

// SendRequest sends a request to the MQ.
func (c *Client) SendRequest(subj string, payload []byte, cb mq.Response) {
inbox := nats.NewInbox()
Expand Down
2 changes: 1 addition & 1 deletion server/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import "time"

const (
// Version is the current version for the server.
Version = "1.7.3"
Version = "1.7.4"

// ProtocolVersion is the implemented RES protocol version.
ProtocolVersion = "1.2.2"
Expand Down

0 comments on commit 07aff00

Please sign in to comment.