From f530d4303dd7a06321889352b14eed20c4fe3265 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Samuel=20Jir=C3=A9nius?= Date: Fri, 1 Sep 2023 10:36:41 +0200 Subject: [PATCH 1/3] GH-98: Added SetWorkerCount and SetInChannelSize methods to Service. --- service.go | 74 ++++++++++++++++++++++++++++++++++++++++++------------ 1 file changed, 58 insertions(+), 16 deletions(-) diff --git a/service.go b/service.go index a86c998..549da03 100644 --- a/service.go +++ b/service.go @@ -17,14 +17,21 @@ import ( // Supported RES protocol version. const protocolVersion = "1.2.2" -// The size of the in channel receiving messages from NATS Server. -const inChannelSize = 256 +// The default size of the in channel receiving messages from NATS Server. +const defaultInChannelSize = 256 -// The number of default workers handling resource requests. -const workerCount = 32 +// The default number of workers handling resource requests. +const defaultWorkerCount = 32 +// The default duration for which the service will listen for query requests +// sent on a query event const defaultQueryEventDuration = time.Second * 3 +// Common panic messages +const ( + serviceAlreadyStarted = "res: service already started" +) + var ( errNotStopped = errors.New("res: service is not stopped") errNotStarted = errors.New("res: service is not started") @@ -198,6 +205,8 @@ type Service struct { resetAccess []string // List of resource name patterns used system.reset for access. Defaults to serviceName+">" queryTQ *timerqueue.Queue // Timer queue for query events duration queryDuration time.Duration // Duration to listen for query requests on a query event + workerCount int // Number of workers handling resource requests + inChannelSize int // Size of the in channel receiving messages from NATS Server onServe func(*Service) // Handler called after the starting to serve prior to calling system.reset onDisconnect func(*Service) // Handler called after the service has been disconnected from NATS server. onReconnect func(*Service) // Handler called after the service has reconnected to NATS server and sent a system reset event. @@ -216,6 +225,8 @@ func NewService(name string) *Service { queueGroup: name, logger: logger.NewStdLogger(), queryDuration: defaultQueryEventDuration, + workerCount: defaultWorkerCount, + inChannelSize: defaultInChannelSize, } s.Mux.Register(s) return s @@ -224,7 +235,7 @@ func NewService(name string) *Service { // SetLogger sets the logger. Panics if service is already started. func (s *Service) SetLogger(l logger.Logger) *Service { if s.nc != nil { - panic("res: service already started") + panic(serviceAlreadyStarted) } s.logger = l return s @@ -234,12 +245,42 @@ func (s *Service) SetLogger(l logger.Logger) *Service { // query requests sent on a query event. Default is 3 seconds func (s *Service) SetQueryEventDuration(d time.Duration) *Service { if s.nc != nil { - panic("res: service already started") + panic(serviceAlreadyStarted) } s.queryDuration = d return s } +// SetWorkerCount sets the number of workers handling incoming requests. Default +// is 32 workers. +// +// If count is less or equal to zero, the default value is used. +func (s *Service) SetWorkerCount(count int) *Service { + if s.nc != nil { + panic(serviceAlreadyStarted) + } + if count <= 0 { + count = defaultWorkerCount + } + s.workerCount = count + return s +} + +// SetInChannelSize sets the size of the in channel receiving messages from NATS +// Server. Default is 256. +// +// If size is less or equal to zero, the default value is used. +func (s *Service) SetInChannelSize(size int) *Service { + if s.nc != nil { + panic(serviceAlreadyStarted) + } + if size <= 0 { + size = defaultInChannelSize + } + s.inChannelSize = size + return s +} + // SetQueueGroup sets the queue group to use when subscribing to resources. By // default it will be the same as the service name. // @@ -290,7 +331,8 @@ func (s *Service) ProtocolVersion() string { // // If the service was started using ListenAndServe, the connection will be of // type *nats.Conn: -// nc := service.Conn().(*nats.Conn) +// +// nc := service.Conn().(*nats.Conn) func (s *Service) Conn() Conn { return s.nc } @@ -506,12 +548,12 @@ func (s *Service) SetReset(resources, access []string) *Service { // requests. The access slice patterns will be listened to for access requests. // These patterns will be used when a ResetAll is made. // -// // Handle all requests for resources prefixed "library." -// service.SetOwnedResources([]string{"library.>"}, []string{"library.>"}) -// // Handle access requests for any resource -// service.SetOwnedResources([]string{}, []string{">"}) -// // Handle non-access requests for a subset of resources -// service.SetOwnedResources([]string{"library.book", "library.books.*"}, []string{}) +// // Handle all requests for resources prefixed "library." +// service.SetOwnedResources([]string{"library.>"}, []string{"library.>"}) +// // Handle access requests for any resource +// service.SetOwnedResources([]string{}, []string{">"}) +// // Handle non-access requests for a subset of resources +// service.SetOwnedResources([]string{"library.book", "library.books.*"}, []string{}) // // If set to nil (default), the service will default to set ownership of all // resources prefixed with its own path if one was provided when creating the @@ -602,7 +644,7 @@ func (s *Service) serve(nc Conn) error { } // Initialize fields - inCh := make(chan *nats.Msg, inChannelSize) + inCh := make(chan *nats.Msg, s.inChannelSize) workCh := make(chan *work, 1) s.nc = nc s.inCh = inCh @@ -611,8 +653,8 @@ func (s *Service) serve(nc Conn) error { s.queryTQ = timerqueue.New(s.queryEventExpire, s.queryDuration) // Start workers - s.wg.Add(workerCount) - for i := 0; i < workerCount; i++ { + s.wg.Add(s.workerCount) + for i := 0; i < s.workerCount; i++ { go s.startWorker(s.workCh) } From 8e4f9d994c905b792b1be84b99f9ce3f80a5dcc0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Samuel=20Jir=C3=A9nius?= Date: Fri, 1 Sep 2023 10:38:53 +0200 Subject: [PATCH 2/3] GH-98: Increased defaultInChannelSize from 256 to 1024. --- service.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/service.go b/service.go index 549da03..bdfa852 100644 --- a/service.go +++ b/service.go @@ -18,7 +18,7 @@ import ( const protocolVersion = "1.2.2" // The default size of the in channel receiving messages from NATS Server. -const defaultInChannelSize = 256 +const defaultInChannelSize = 1024 // The default number of workers handling resource requests. const defaultWorkerCount = 32 @@ -267,7 +267,7 @@ func (s *Service) SetWorkerCount(count int) *Service { } // SetInChannelSize sets the size of the in channel receiving messages from NATS -// Server. Default is 256. +// Server. Default is 1024. // // If size is less or equal to zero, the default value is used. func (s *Service) SetInChannelSize(size int) *Service { From 2fbb849825e2a7e1c3c5c062a701264f89348a16 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Samuel=20Jir=C3=A9nius?= Date: Fri, 1 Sep 2023 10:51:07 +0200 Subject: [PATCH 3/3] Updated formatting of comments: gofmt -s -w . --- doc.go | 4 +-- .../04-book-collection-store/bookstore.go | 2 +- examples/05-search-query/customerstore.go | 3 +- middleware/doc.go | 3 +- middleware/resbadger/index.go | 4 ++- middleware/resbadger/model.go | 5 +-- mux.go | 11 +++++-- pattern.go | 7 ++-- request.go | 3 +- resprot/doc.go | 4 +-- resprot/resprot.go | 32 +++++++++--------- restest/natsrequest.go | 12 ++++--- restest/session.go | 6 ++-- store/badgerstore/index.go | 4 ++- store/transformer.go | 12 +++---- types.go | 33 +++++++++++-------- 16 files changed, 82 insertions(+), 63 deletions(-) diff --git a/doc.go b/doc.go index 98a58e3..e822558 100644 --- a/doc.go +++ b/doc.go @@ -7,14 +7,14 @@ https://github.com/resgateio/resgate The implementation provides structs and methods for creating services that listen to requests and send events over NATS server. -Concurrency +# Concurrency Requests are handled concurrently for multiple resources, but the package guarantees that only one goroutine is executing handlers for any unique resource at any one time. This allows handlers to modify models and collections without additional synchronization such as mutexes. -Usage +# Usage Create a new service: diff --git a/examples/04-book-collection-store/bookstore.go b/examples/04-book-collection-store/bookstore.go index 9bf23fe..060ea17 100644 --- a/examples/04-book-collection-store/bookstore.go +++ b/examples/04-book-collection-store/bookstore.go @@ -13,7 +13,7 @@ import ( // for storage, but any other database can be used. What is needed is a wrapper // that implements the Store and QueryStore interfaces found in package: // -// github.com/jirenius/go-res/store +// github.com/jirenius/go-res/store type BookStore struct { *badgerstore.Store BooksByTitle *badgerstore.QueryStore diff --git a/examples/05-search-query/customerstore.go b/examples/05-search-query/customerstore.go index e19455f..507034a 100644 --- a/examples/05-search-query/customerstore.go +++ b/examples/05-search-query/customerstore.go @@ -25,8 +25,7 @@ import ( // customer is a document. What is needed is a wrapper that implements the Store // and QueryStore interfaces found in package: // -// github.com/jirenius/go-res/store -// +// github.com/jirenius/go-res/store type CustomerStore struct { *badgerstore.Store CustomersQuery *badgerstore.QueryStore diff --git a/middleware/doc.go b/middleware/doc.go index aa6da89..05d8cec 100644 --- a/middleware/doc.go +++ b/middleware/doc.go @@ -14,13 +14,12 @@ to perform tasks such as: Currently, only the BadgerDB middleware is created, to demonstrate database persistence. -Usage +# Usage Add middleware to a resource: s.Handle("user.$id", middlware.BadgerDB{DB: db}, ) - */ package middleware diff --git a/middleware/resbadger/index.go b/middleware/resbadger/index.go index b3bd92e..f639604 100644 --- a/middleware/resbadger/index.go +++ b/middleware/resbadger/index.go @@ -12,7 +12,9 @@ import ( // // When used on Model resource, an index entry will be added for each model entry. // An index entry will have no value (nil), and the key will have the following structure: -// :? +// +// :? +// // Where: // * is the name of the Index (so keep it rather short) // * is the index value as returned from the Key callback diff --git a/middleware/resbadger/model.go b/middleware/resbadger/model.go index 1f42e19..61ee3ea 100644 --- a/middleware/resbadger/model.go +++ b/middleware/resbadger/model.go @@ -58,8 +58,9 @@ func (o Model) WithMap(m func(interface{}) (interface{}, error)) Model { // // The resource pattern should be the full pattern, including // any service name. It may contain $tags, or end with a full wildcard (>). -// test.model.$id -// test.resource.> +// +// test.model.$id +// test.resource.> func (o Model) RebuildIndexes(pattern string) error { // Quick exit in case no index exists if o.IndexSet == nil || len(o.IndexSet.Indexes) == 0 { diff --git a/mux.go b/mux.go index e6a782c..8eebf65 100644 --- a/mux.go +++ b/mux.go @@ -176,11 +176,16 @@ func (m *Mux) callOnRegister() { // A pattern may contain placeholders that acts as wildcards, and will be // parsed and stored in the request.PathParams map. // A placeholder is a resource name part starting with a dollar ($) character: -// s.Handle("user.$id", handler) // Will match "user.10", "user.foo", etc. +// +// s.Handle("user.$id", handler) // Will match "user.10", "user.foo", etc. +// // An anonymous placeholder is a resource name part using an asterisk (*) character: -// s.Handle("user.*", handler) // Will match "user.10", "user.foo", etc. +// +// s.Handle("user.*", handler) // Will match "user.10", "user.foo", etc. +// // A full wildcard can be used as last part using a greather than (>) character: -// s.Handle("data.>", handler) // Will match "data.foo", "data.foo.bar", etc. +// +// s.Handle("data.>", handler) // Will match "data.foo", "data.foo.bar", etc. // // If the pattern is already registered, or if there are conflicts among // the handlers, Handle panics. diff --git a/pattern.go b/pattern.go index 689b0c4..126e815 100644 --- a/pattern.go +++ b/pattern.go @@ -1,9 +1,10 @@ package res // Pattern is a resource pattern that may contain wildcards and tags. -// Pattern("example.resource.>") // Full wild card (>) matches anything that follows -// Pattern("example.item.*") // Wild card (*) matches a single part -// Pattern("example.model.$id") // Tag (starting with $) matches a single part +// +// Pattern("example.resource.>") // Full wild card (>) matches anything that follows +// Pattern("example.item.*") // Wild card (*) matches a single part +// Pattern("example.model.$id") // Tag (starting with $) matches a single part type Pattern string // IsValid returns true if the pattern is valid, otherwise false. diff --git a/request.go b/request.go index dca4518..90e4f3b 100644 --- a/request.go +++ b/request.go @@ -332,7 +332,8 @@ func (r *Request) AccessDenied() { // AccessGranted a successful response granting full access to the resource. // Same as calling: -// Access(true, "*"); +// +// Access(true, "*"); // // Only valid for access requests. func (r *Request) AccessGranted() { diff --git a/resprot/doc.go b/resprot/doc.go index 818f806..bea6589 100644 --- a/resprot/doc.go +++ b/resprot/doc.go @@ -4,8 +4,7 @@ services using the RES Service Protocol over NATS server. https://github.com/resgateio/resgate/blob/master/docs/res-service-protocol.md - -Usage +# Usage Make a request: @@ -32,6 +31,5 @@ Call a method: Sum float64 `json:"sum"` } err := response.ParseResult(&result) - */ package resprot diff --git a/resprot/resprot.go b/resprot/resprot.go index 59e7849..cb4d945 100644 --- a/resprot/resprot.go +++ b/resprot/resprot.go @@ -418,18 +418,18 @@ func SendRequest(nc res.Conn, subject string, req interface{}, timeout time.Dura // // If the JSON data start with an array, UnmarshalDataValue will return an error. // -// UnmarshalDataValue([]byte(`42`), v) // sets v to 42 -// UnmarshalDataValue([]byte(`"foo"`), v) // sets v to "foo" -// UnmarshalDataValue([]byte(`{"data":true}`), v) // sets v to true -// UnmarshalDataValue([]byte(`{"data":["foo","bar"]}`), v) // sets v to []string{"foo", "bar"} -// UnmarshalDataValue([]byte(`{"foo":"bar"}`), v) // returns error -// UnmarshalDataValue([]byte(`[1,2,3]`), v) // returns error +// UnmarshalDataValue([]byte(`42`), v) // sets v to 42 +// UnmarshalDataValue([]byte(`"foo"`), v) // sets v to "foo" +// UnmarshalDataValue([]byte(`{"data":true}`), v) // sets v to true +// UnmarshalDataValue([]byte(`{"data":["foo","bar"]}`), v) // sets v to []string{"foo", "bar"} +// UnmarshalDataValue([]byte(`{"foo":"bar"}`), v) // returns error +// UnmarshalDataValue([]byte(`[1,2,3]`), v) // returns error // // UnmarshalDataValue can be used to implement the json.Unmarshaler interface: // -// func (t *T) UnmarshalJSON([]byte) error -// return UnmarshalDataValue(data, t) -// } +// func (t *T) UnmarshalJSON([]byte) error +// return UnmarshalDataValue(data, t) +// } // // See: // https://github.com/resgateio/resgate/blob/master/docs/res-protocol.md#data-values @@ -475,16 +475,16 @@ func UnmarshalDataValue(data []byte, v interface{}) error { // // If v encodes into a JSON object or array, MarshalDataValue will wrap the value in a data object, where the value is stored under the key "data". // -// MarshalDataValue(42) // Returns []byte(`42`) -// MarshalDataValue("foo"), v) // Returns []byte(`"foo"`) -// MarshalDataValue([]string{"foo", "bar"}) // Returns []byte(`{"data":["foo","bar"]}`) -// MarshalDataValue(map[string]int{"foo": 42}) // Returns []byte(`{"data":{"foo":42}}`) +// MarshalDataValue(42) // Returns []byte(`42`) +// MarshalDataValue("foo"), v) // Returns []byte(`"foo"`) +// MarshalDataValue([]string{"foo", "bar"}) // Returns []byte(`{"data":["foo","bar"]}`) +// MarshalDataValue(map[string]int{"foo": 42}) // Returns []byte(`{"data":{"foo":42}}`) // // MarshalDataValue can be used to implement the json.Marshaler interface: // -// func (t T) MarshalJSON() ([]byte, error) -// return MarshalDataValue(t) -// } +// func (t T) MarshalJSON() ([]byte, error) +// return MarshalDataValue(t) +// } // // See: // https://github.com/resgateio/resgate/blob/master/docs/res-protocol.md#data-values diff --git a/restest/natsrequest.go b/restest/natsrequest.go index e8903d3..bf5b272 100644 --- a/restest/natsrequest.go +++ b/restest/natsrequest.go @@ -22,7 +22,8 @@ func (nr *NATSRequest) Response() *Msg { // Get sends a get request to the service. // // The resource ID, rid, may contain a query part: -// test.model?q=foo +// +// test.model?q=foo func (c *MockConn) Get(rid string) *NATSRequest { rname, q := parseRID(rid) return c.Request("get."+rname, Request{Query: q}) @@ -33,7 +34,8 @@ func (c *MockConn) Get(rid string) *NATSRequest { // A nil req value sends a DefaultCallRequest. // // The resource ID, rid, may contain a query part: -// test.model?q=foo +// +// test.model?q=foo func (c *MockConn) Call(rid string, method string, req *Request) *NATSRequest { if req == nil { req = DefaultCallRequest() @@ -51,7 +53,8 @@ func (c *MockConn) Call(rid string, method string, req *Request) *NATSRequest { // A nil req value sends a DefaultAuthRequest. // // The resource ID, rid, may contain a query part: -// test.model?q=foo +// +// test.model?q=foo func (c *MockConn) Auth(rid string, method string, req *Request) *NATSRequest { if req == nil { req = DefaultAuthRequest() @@ -69,7 +72,8 @@ func (c *MockConn) Auth(rid string, method string, req *Request) *NATSRequest { // A nil req value sends a DefaultAccessRequest. // // The resource ID, rid, may contain a query part: -// test.model?q=foo +// +// test.model?q=foo func (c *MockConn) Access(rid string, req *Request) *NATSRequest { if req == nil { req = DefaultAccessRequest() diff --git a/restest/session.go b/restest/session.go index 9c17f27..ba9a335 100644 --- a/restest/session.go +++ b/restest/session.go @@ -39,12 +39,14 @@ type SessionConfig struct { // // A service logger will by default be set to a new MemLogger. To set any other // logger, add the option: -// WithLogger(logger) +// +// WithLogger(logger) // // If the tests sends any query event, a real NATS instance is required, which // is slower than using the default mock connection. To use a real NATS // instance, add the option: -// WithGnatsd +// +// WithGnatsd func NewSession(t *testing.T, service *res.Service, opts ...func(*SessionConfig)) *Session { cfg := &SessionConfig{ MockConnConfig: MockConnConfig{TimeoutDuration: DefaultTimeoutDuration}, diff --git a/store/badgerstore/index.go b/store/badgerstore/index.go index 91ac412..7c0df3f 100644 --- a/store/badgerstore/index.go +++ b/store/badgerstore/index.go @@ -11,7 +11,9 @@ import ( // // When used on Model resource, an index entry will be added for each model entry. // An index entry will have no value (nil), and the key will have the following structure: -// :\x00 +// +// :\x00 +// // Where: // * is the name of the Index (so keep it rather short) // * is the index value as returned from the Key callback diff --git a/store/transformer.go b/store/transformer.go index 03730c0..e3e27af 100644 --- a/store/transformer.go +++ b/store/transformer.go @@ -51,8 +51,8 @@ func (t transformer) Transform(id string, v interface{}) (interface{}, error) { // IDTransformer returns a transformer where the resource ID contains a single // tag that is the internal ID. // -// // Assuming pattern is "library.book.$bookid" -// IDTransformer("bookId", nil) // transforms "library.book.42" <=> "42" +// // Assuming pattern is "library.book.$bookid" +// IDTransformer("bookId", nil) // transforms "library.book.42" <=> "42" func IDTransformer(tagName string, transform func(id string, v interface{}) (interface{}, error)) Transformer { return TransformFuncs( func(_ string, pathParams map[string]string) string { @@ -68,11 +68,11 @@ func IDTransformer(tagName string, transform func(id string, v interface{}) (int // IDToRIDCollectionTransformer is a QueryTransformer that handles the common // case of transforming a slice of id strings: // -// []string{"1", "2"} +// []string{"1", "2"} // // into slice of resource references: // -// []res.Ref{"library.book.1", "library.book.2"} +// []res.Ref{"library.book.1", "library.book.2"} // // The function converts a single ID returned by a the store into an external // resource ID. @@ -110,11 +110,11 @@ func (t IDToRIDCollectionTransformer) TransformEvents(evs []ResultEvent) ([]Resu // IDToRIDModelTransformer is a QueryTransformer that handles the common case of // transforming a slice of unique id strings: // -// []string{"1", "2"} +// []string{"1", "2"} // // into a map of resource references with id as key: // -// map[string]res.Ref{"1": "library.book.1", "2": "library.book.2"} +// map[string]res.Ref{"1": "library.book.1", "2": "library.book.2"} // // The function converts a single ID returned by a the store into an external // resource ID. diff --git a/types.go b/types.go index ed3e630..1640a67 100644 --- a/types.go +++ b/types.go @@ -5,14 +5,16 @@ import "encoding/json" // Ref is a resource reference to another resource ID. // // It marshals into a reference object, eg.: -// {"rid":"userService.user.42"} +// +// {"rid":"userService.user.42"} type Ref string // SoftRef is a soft resource reference to another resource ID which will not // automatically be followed by Resgate. // // It marshals into a soft reference object, eg.: -// {"rid":"userService.user.42","soft":true} +// +// {"rid":"userService.user.42","soft":true} type SoftRef string // DataValue is a wrapper for values that may marshal into any type of json @@ -22,22 +24,25 @@ type SoftRef string // DataValue or similar, or else the value will be considered invalid. // // Example: -// s.Handle("timezones", res.GetCollection(func(r res.CollectionRequest) { -// type tz struct { -// Abbr string `json:"abbr"` -// Offset int `json:"offset"` -// } -// r.Collection([]res.DataValue{ -// res.DataValue{tz{"GMT", 0}}, -// res.DataValue{tz{"CET", 1}}, -// }) -// })) +// +// s.Handle("timezones", res.GetCollection(func(r res.CollectionRequest) { +// type tz struct { +// Abbr string `json:"abbr"` +// Offset int `json:"offset"` +// } +// r.Collection([]res.DataValue{ +// res.DataValue{tz{"GMT", 0}}, +// res.DataValue{tz{"CET", 1}}, +// }) +// })) // // For objects and arrays, it marshals into a data value object, eg.: -// json.Marshal(res.DataValue{[]int{1, 2, 3}}) // Result: {"data":[1,2,3]} +// +// json.Marshal(res.DataValue{[]int{1, 2, 3}}) // Result: {"data":[1,2,3]} // // For strings, numbers, booleans, and null values, it marshals into a primitive value, eg.: -// json.Marshal(res.DataValue{nil}) // Result: null +// +// json.Marshal(res.DataValue{nil}) // Result: null type DataValue struct { Data interface{} `json:"data"` }