Skip to content

Commit

Permalink
Merge branch 'release/v0.4.7'
Browse files Browse the repository at this point in the history
  • Loading branch information
jirenius committed Sep 1, 2023
2 parents d5ebd39 + 2fbb849 commit 372a82d
Show file tree
Hide file tree
Showing 17 changed files with 140 additions and 79 deletions.
4 changes: 2 additions & 2 deletions doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@ https://github.com/resgateio/resgate
The implementation provides structs and methods for creating services that
listen to requests and send events over NATS server.
Concurrency
# Concurrency
Requests are handled concurrently for multiple resources, but the package
guarantees that only one goroutine is executing handlers for any unique resource
at any one time. This allows handlers to modify models and collections without
additional synchronization such as mutexes.
Usage
# Usage
Create a new service:
Expand Down
2 changes: 1 addition & 1 deletion examples/04-book-collection-store/bookstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
// for storage, but any other database can be used. What is needed is a wrapper
// that implements the Store and QueryStore interfaces found in package:
//
// github.com/jirenius/go-res/store
// github.com/jirenius/go-res/store
type BookStore struct {
*badgerstore.Store
BooksByTitle *badgerstore.QueryStore
Expand Down
3 changes: 1 addition & 2 deletions examples/05-search-query/customerstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@ import (
// customer is a document. What is needed is a wrapper that implements the Store
// and QueryStore interfaces found in package:
//
// github.com/jirenius/go-res/store
//
// github.com/jirenius/go-res/store
type CustomerStore struct {
*badgerstore.Store
CustomersQuery *badgerstore.QueryStore
Expand Down
3 changes: 1 addition & 2 deletions middleware/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,12 @@ to perform tasks such as:
Currently, only the BadgerDB middleware is created, to demonstrate
database persistence.
Usage
# Usage
Add middleware to a resource:
s.Handle("user.$id",
middlware.BadgerDB{DB: db},
)
*/
package middleware
4 changes: 3 additions & 1 deletion middleware/resbadger/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ import (
//
// When used on Model resource, an index entry will be added for each model entry.
// An index entry will have no value (nil), and the key will have the following structure:
// <Name>:<Key>?<RID>
//
// <Name>:<Key>?<RID>
//
// Where:
// * <Name> is the name of the Index (so keep it rather short)
// * <Key> is the index value as returned from the Key callback
Expand Down
5 changes: 3 additions & 2 deletions middleware/resbadger/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,9 @@ func (o Model) WithMap(m func(interface{}) (interface{}, error)) Model {
//
// The resource pattern should be the full pattern, including
// any service name. It may contain $tags, or end with a full wildcard (>).
// test.model.$id
// test.resource.>
//
// test.model.$id
// test.resource.>
func (o Model) RebuildIndexes(pattern string) error {
// Quick exit in case no index exists
if o.IndexSet == nil || len(o.IndexSet.Indexes) == 0 {
Expand Down
11 changes: 8 additions & 3 deletions mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,11 +176,16 @@ func (m *Mux) callOnRegister() {
// A pattern may contain placeholders that acts as wildcards, and will be
// parsed and stored in the request.PathParams map.
// A placeholder is a resource name part starting with a dollar ($) character:
// s.Handle("user.$id", handler) // Will match "user.10", "user.foo", etc.
//
// s.Handle("user.$id", handler) // Will match "user.10", "user.foo", etc.
//
// An anonymous placeholder is a resource name part using an asterisk (*) character:
// s.Handle("user.*", handler) // Will match "user.10", "user.foo", etc.
//
// s.Handle("user.*", handler) // Will match "user.10", "user.foo", etc.
//
// A full wildcard can be used as last part using a greather than (>) character:
// s.Handle("data.>", handler) // Will match "data.foo", "data.foo.bar", etc.
//
// s.Handle("data.>", handler) // Will match "data.foo", "data.foo.bar", etc.
//
// If the pattern is already registered, or if there are conflicts among
// the handlers, Handle panics.
Expand Down
7 changes: 4 additions & 3 deletions pattern.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package res

// Pattern is a resource pattern that may contain wildcards and tags.
// Pattern("example.resource.>") // Full wild card (>) matches anything that follows
// Pattern("example.item.*") // Wild card (*) matches a single part
// Pattern("example.model.$id") // Tag (starting with $) matches a single part
//
// Pattern("example.resource.>") // Full wild card (>) matches anything that follows
// Pattern("example.item.*") // Wild card (*) matches a single part
// Pattern("example.model.$id") // Tag (starting with $) matches a single part
type Pattern string

// IsValid returns true if the pattern is valid, otherwise false.
Expand Down
3 changes: 2 additions & 1 deletion request.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,8 @@ func (r *Request) AccessDenied() {

// AccessGranted a successful response granting full access to the resource.
// Same as calling:
// Access(true, "*");
//
// Access(true, "*");
//
// Only valid for access requests.
func (r *Request) AccessGranted() {
Expand Down
4 changes: 1 addition & 3 deletions resprot/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@ services using the RES Service Protocol over NATS server.
https://github.com/resgateio/resgate/blob/master/docs/res-service-protocol.md
Usage
# Usage
Make a request:
Expand All @@ -32,6 +31,5 @@ Call a method:
Sum float64 `json:"sum"`
}
err := response.ParseResult(&result)
*/
package resprot
32 changes: 16 additions & 16 deletions resprot/resprot.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,18 +418,18 @@ func SendRequest(nc res.Conn, subject string, req interface{}, timeout time.Dura
//
// If the JSON data start with an array, UnmarshalDataValue will return an error.
//
// UnmarshalDataValue([]byte(`42`), v) // sets v to 42
// UnmarshalDataValue([]byte(`"foo"`), v) // sets v to "foo"
// UnmarshalDataValue([]byte(`{"data":true}`), v) // sets v to true
// UnmarshalDataValue([]byte(`{"data":["foo","bar"]}`), v) // sets v to []string{"foo", "bar"}
// UnmarshalDataValue([]byte(`{"foo":"bar"}`), v) // returns error
// UnmarshalDataValue([]byte(`[1,2,3]`), v) // returns error
// UnmarshalDataValue([]byte(`42`), v) // sets v to 42
// UnmarshalDataValue([]byte(`"foo"`), v) // sets v to "foo"
// UnmarshalDataValue([]byte(`{"data":true}`), v) // sets v to true
// UnmarshalDataValue([]byte(`{"data":["foo","bar"]}`), v) // sets v to []string{"foo", "bar"}
// UnmarshalDataValue([]byte(`{"foo":"bar"}`), v) // returns error
// UnmarshalDataValue([]byte(`[1,2,3]`), v) // returns error
//
// UnmarshalDataValue can be used to implement the json.Unmarshaler interface:
//
// func (t *T) UnmarshalJSON([]byte) error
// return UnmarshalDataValue(data, t)
// }
// func (t *T) UnmarshalJSON([]byte) error
// return UnmarshalDataValue(data, t)
// }
//
// See:
// https://github.com/resgateio/resgate/blob/master/docs/res-protocol.md#data-values
Expand Down Expand Up @@ -475,16 +475,16 @@ func UnmarshalDataValue(data []byte, v interface{}) error {
//
// If v encodes into a JSON object or array, MarshalDataValue will wrap the value in a data object, where the value is stored under the key "data".
//
// MarshalDataValue(42) // Returns []byte(`42`)
// MarshalDataValue("foo"), v) // Returns []byte(`"foo"`)
// MarshalDataValue([]string{"foo", "bar"}) // Returns []byte(`{"data":["foo","bar"]}`)
// MarshalDataValue(map[string]int{"foo": 42}) // Returns []byte(`{"data":{"foo":42}}`)
// MarshalDataValue(42) // Returns []byte(`42`)
// MarshalDataValue("foo"), v) // Returns []byte(`"foo"`)
// MarshalDataValue([]string{"foo", "bar"}) // Returns []byte(`{"data":["foo","bar"]}`)
// MarshalDataValue(map[string]int{"foo": 42}) // Returns []byte(`{"data":{"foo":42}}`)
//
// MarshalDataValue can be used to implement the json.Marshaler interface:
//
// func (t T) MarshalJSON() ([]byte, error)
// return MarshalDataValue(t)
// }
// func (t T) MarshalJSON() ([]byte, error)
// return MarshalDataValue(t)
// }
//
// See:
// https://github.com/resgateio/resgate/blob/master/docs/res-protocol.md#data-values
Expand Down
12 changes: 8 additions & 4 deletions restest/natsrequest.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ func (nr *NATSRequest) Response() *Msg {
// Get sends a get request to the service.
//
// The resource ID, rid, may contain a query part:
// test.model?q=foo
//
// test.model?q=foo
func (c *MockConn) Get(rid string) *NATSRequest {
rname, q := parseRID(rid)
return c.Request("get."+rname, Request{Query: q})
Expand All @@ -33,7 +34,8 @@ func (c *MockConn) Get(rid string) *NATSRequest {
// A nil req value sends a DefaultCallRequest.
//
// The resource ID, rid, may contain a query part:
// test.model?q=foo
//
// test.model?q=foo
func (c *MockConn) Call(rid string, method string, req *Request) *NATSRequest {
if req == nil {
req = DefaultCallRequest()
Expand All @@ -51,7 +53,8 @@ func (c *MockConn) Call(rid string, method string, req *Request) *NATSRequest {
// A nil req value sends a DefaultAuthRequest.
//
// The resource ID, rid, may contain a query part:
// test.model?q=foo
//
// test.model?q=foo
func (c *MockConn) Auth(rid string, method string, req *Request) *NATSRequest {
if req == nil {
req = DefaultAuthRequest()
Expand All @@ -69,7 +72,8 @@ func (c *MockConn) Auth(rid string, method string, req *Request) *NATSRequest {
// A nil req value sends a DefaultAccessRequest.
//
// The resource ID, rid, may contain a query part:
// test.model?q=foo
//
// test.model?q=foo
func (c *MockConn) Access(rid string, req *Request) *NATSRequest {
if req == nil {
req = DefaultAccessRequest()
Expand Down
6 changes: 4 additions & 2 deletions restest/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,14 @@ type SessionConfig struct {
//
// A service logger will by default be set to a new MemLogger. To set any other
// logger, add the option:
// WithLogger(logger)
//
// WithLogger(logger)
//
// If the tests sends any query event, a real NATS instance is required, which
// is slower than using the default mock connection. To use a real NATS
// instance, add the option:
// WithGnatsd
//
// WithGnatsd
func NewSession(t *testing.T, service *res.Service, opts ...func(*SessionConfig)) *Session {
cfg := &SessionConfig{
MockConnConfig: MockConnConfig{TimeoutDuration: DefaultTimeoutDuration},
Expand Down
74 changes: 58 additions & 16 deletions service.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,21 @@ import (
// Supported RES protocol version.
const protocolVersion = "1.2.2"

// The size of the in channel receiving messages from NATS Server.
const inChannelSize = 256
// The default size of the in channel receiving messages from NATS Server.
const defaultInChannelSize = 1024

// The number of default workers handling resource requests.
const workerCount = 32
// The default number of workers handling resource requests.
const defaultWorkerCount = 32

// The default duration for which the service will listen for query requests
// sent on a query event
const defaultQueryEventDuration = time.Second * 3

// Common panic messages
const (
serviceAlreadyStarted = "res: service already started"
)

var (
errNotStopped = errors.New("res: service is not stopped")
errNotStarted = errors.New("res: service is not started")
Expand Down Expand Up @@ -198,6 +205,8 @@ type Service struct {
resetAccess []string // List of resource name patterns used system.reset for access. Defaults to serviceName+">"
queryTQ *timerqueue.Queue // Timer queue for query events duration
queryDuration time.Duration // Duration to listen for query requests on a query event
workerCount int // Number of workers handling resource requests
inChannelSize int // Size of the in channel receiving messages from NATS Server
onServe func(*Service) // Handler called after the starting to serve prior to calling system.reset
onDisconnect func(*Service) // Handler called after the service has been disconnected from NATS server.
onReconnect func(*Service) // Handler called after the service has reconnected to NATS server and sent a system reset event.
Expand All @@ -216,6 +225,8 @@ func NewService(name string) *Service {
queueGroup: name,
logger: logger.NewStdLogger(),
queryDuration: defaultQueryEventDuration,
workerCount: defaultWorkerCount,
inChannelSize: defaultInChannelSize,
}
s.Mux.Register(s)
return s
Expand All @@ -224,7 +235,7 @@ func NewService(name string) *Service {
// SetLogger sets the logger. Panics if service is already started.
func (s *Service) SetLogger(l logger.Logger) *Service {
if s.nc != nil {
panic("res: service already started")
panic(serviceAlreadyStarted)
}
s.logger = l
return s
Expand All @@ -234,12 +245,42 @@ func (s *Service) SetLogger(l logger.Logger) *Service {
// query requests sent on a query event. Default is 3 seconds
func (s *Service) SetQueryEventDuration(d time.Duration) *Service {
if s.nc != nil {
panic("res: service already started")
panic(serviceAlreadyStarted)
}
s.queryDuration = d
return s
}

// SetWorkerCount sets the number of workers handling incoming requests. Default
// is 32 workers.
//
// If count is less or equal to zero, the default value is used.
func (s *Service) SetWorkerCount(count int) *Service {
if s.nc != nil {
panic(serviceAlreadyStarted)
}
if count <= 0 {
count = defaultWorkerCount
}
s.workerCount = count
return s
}

// SetInChannelSize sets the size of the in channel receiving messages from NATS
// Server. Default is 1024.
//
// If size is less or equal to zero, the default value is used.
func (s *Service) SetInChannelSize(size int) *Service {
if s.nc != nil {
panic(serviceAlreadyStarted)
}
if size <= 0 {
size = defaultInChannelSize
}
s.inChannelSize = size
return s
}

// SetQueueGroup sets the queue group to use when subscribing to resources. By
// default it will be the same as the service name.
//
Expand Down Expand Up @@ -290,7 +331,8 @@ func (s *Service) ProtocolVersion() string {
//
// If the service was started using ListenAndServe, the connection will be of
// type *nats.Conn:
// nc := service.Conn().(*nats.Conn)
//
// nc := service.Conn().(*nats.Conn)
func (s *Service) Conn() Conn {
return s.nc
}
Expand Down Expand Up @@ -506,12 +548,12 @@ func (s *Service) SetReset(resources, access []string) *Service {
// requests. The access slice patterns will be listened to for access requests.
// These patterns will be used when a ResetAll is made.
//
// // Handle all requests for resources prefixed "library."
// service.SetOwnedResources([]string{"library.>"}, []string{"library.>"})
// // Handle access requests for any resource
// service.SetOwnedResources([]string{}, []string{">"})
// // Handle non-access requests for a subset of resources
// service.SetOwnedResources([]string{"library.book", "library.books.*"}, []string{})
// // Handle all requests for resources prefixed "library."
// service.SetOwnedResources([]string{"library.>"}, []string{"library.>"})
// // Handle access requests for any resource
// service.SetOwnedResources([]string{}, []string{">"})
// // Handle non-access requests for a subset of resources
// service.SetOwnedResources([]string{"library.book", "library.books.*"}, []string{})
//
// If set to nil (default), the service will default to set ownership of all
// resources prefixed with its own path if one was provided when creating the
Expand Down Expand Up @@ -602,7 +644,7 @@ func (s *Service) serve(nc Conn) error {
}

// Initialize fields
inCh := make(chan *nats.Msg, inChannelSize)
inCh := make(chan *nats.Msg, s.inChannelSize)
workCh := make(chan *work, 1)
s.nc = nc
s.inCh = inCh
Expand All @@ -611,8 +653,8 @@ func (s *Service) serve(nc Conn) error {
s.queryTQ = timerqueue.New(s.queryEventExpire, s.queryDuration)

// Start workers
s.wg.Add(workerCount)
for i := 0; i < workerCount; i++ {
s.wg.Add(s.workerCount)
for i := 0; i < s.workerCount; i++ {
go s.startWorker(s.workCh)
}

Expand Down
Loading

0 comments on commit 372a82d

Please sign in to comment.