GH-107: Added Parallel flag to Handler. #109

Merged 2 commits on Oct 25, 2024
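This PR adds a Parallel flag to Handler, with a matching Parallel option, so requests to a handler may be processed concurrently on different worker goroutines instead of being serialized on a per-resource or per-group work queue. A minimal usage sketch, assuming the usual go-res service setup (the service name, resource pattern, and NATS URL are placeholders; only the Parallel option itself comes from this PR):

package main

import (
	"log"

	res "github.com/jirenius/go-res"
)

func main() {
	s := res.NewService("example")
	// With Parallel(true), get requests for "example.model" may be handled
	// concurrently by several workers; any Group setting is ignored.
	s.Handle("model",
		res.Parallel(true),
		res.GetResource(func(r res.GetRequest) {
			r.NotFound()
		}),
	)
	if err := s.ListenAndServe("nats://127.0.0.1:4222"); err != nil {
		log.Fatal(err)
	}
}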
group.go (5 changes: 4 additions & 1 deletion)

@@ -86,9 +86,12 @@ UnexpectedEnd:
}

func (g group) toString(rname string, tokens []string) string {
if g == nil {
return rname
}
l := len(g)
if l == 0 {
- return rname
return ""
}
if l == 1 && g[0].str != "" {
return g[0].str
mux.go (8 changes: 7 additions & 1 deletion)

@@ -200,9 +200,15 @@ func (m *Mux) Handle(pattern string, hf ...Option) {
// AddHandler registers a handler for the given resource pattern.
// The pattern used is the same as described for Handle.
func (m *Mux) AddHandler(pattern string, hs Handler) {
var g group
if hs.Parallel {
g = []gpart{}
} else {
g = parseGroup(hs.Group, pattern)
}
h := regHandler{
Handler: hs,
- group: parseGroup(hs.Group, pattern),
group: g,
}

m.add(pattern, &h)
restest/assert.go (6 changes: 3 additions & 3 deletions)

@@ -12,8 +12,8 @@ import (
// AssertEqualJSON expects that a and b json marshals into equal values, and
// returns true if they do, otherwise logs a fatal error and returns false.
func AssertEqualJSON(t *testing.T, name string, result, expected interface{}, ctx ...interface{}) bool {
- aa, aj := jsonMap(t, result)
- bb, bj := jsonMap(t, expected)
aa, aj := jsonMap(result)
bb, bj := jsonMap(expected)

if !reflect.DeepEqual(aa, bb) {
t.Fatalf("expected %s to be:\n\t%s\nbut got:\n\t%s%s", name, bj, aj, ctxString(ctx))
@@ -117,7 +117,7 @@ func ctxString(ctx []interface{}) string {
return "\nin " + fmt.Sprint(ctx...)
}

- func jsonMap(t *testing.T, v interface{}) (interface{}, []byte) {
func jsonMap(v interface{}) (interface{}, []byte) {
var err error
j, err := json.Marshal(v)
if err != nil {
restest/natsrequest.go (24 changes: 24 additions & 0 deletions)

@@ -8,6 +8,10 @@ type NATSRequest struct {
inb string
}

// NATSRequests represents a slice of requests sent over NATS to the service,
// whose responses may arrive in an undetermined order.
type NATSRequests []*NATSRequest

// Response gets the next pending message that is published to NATS by the
// service.
//
@@ -19,6 +23,26 @@ func (nr *NATSRequest) Response() *Msg {
return m
}

// Response gets the next pending message that is published to NATS by the
// service, and matches it to one of the requests.
//
// If no message is received within a set amount of time, or if the message is
// not a response to one of the requests, it will log it as a fatal error.
//
// The matching request will be set to nil.
func (nrs NATSRequests) Response(c *MockConn) *Msg {
m := c.GetMsg()
for i := 0; i < len(nrs); i++ {
nr := nrs[i]
if nr != nil && nr.inb == m.Subject {
nrs[i] = nil
return m
}
}
c.t.Fatalf("expected to find request matching response %s, but found none", m.Subject)
return nil
}

// Get sends a get request to the service.
//
// The resource ID, rid, may contain a query part:
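Because a parallel handler may answer requests out of order, the new NATSRequests helper lets a test accept the responses in whatever order they are published. A rough fragment of the intended usage inside a restest session callback, mirroring the test added to 00service_test.go further down (s is the *restest.Session):

// Send several requests for the same resource, then match each published
// response to whichever request it belongs to, regardless of order.
reqs := restest.NATSRequests{
	s.Get("test.model"),
	s.Get("test.model"),
}
for range reqs {
	reqs.Response(s.MockConn).AssertError(res.ErrNotFound)
}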
service.go (53 changes: 42 additions & 11 deletions)

@@ -137,6 +137,11 @@ type Handler struct {
// resource name will be used as identifier.
Group string

// Parallel is a flag indicating that all requests to the handler may be
// handled in parallel on different goroutines. If set to true, any value in
// Group will be ignored.
Parallel bool

// OnRegister is a callback that is to be called when the handler has been
// registered to a service.
//
@@ -196,7 +201,9 @@ type Service struct {
nc Conn // NATS Server connection
inCh chan *nats.Msg // Channel for incoming nats messages
rwork map[string]*work // map of resource work
- workCh chan *work // Resource work channel, listened to by the workers
workqueue []*work // Resource work queue.
workbuf []*work // Underlying buffer of the workqueue
workcond sync.Cond // Cond waited on by workers and signaled when work is added to workqueue
wg sync.WaitGroup // WaitGroup for all workers
mu sync.Mutex // Mutex to protect rwork map
logger logger.Logger // Logger
@@ -517,6 +524,16 @@ func Group(group string) Option {
})
}

// Parallel sets the parallel flag. All requests for the handler may be handled
// in parallel on different worker goroutines.
//
// If set to true, any value in Group will be ignored.
func Parallel(parallel bool) Option {
return OptionFunc(func(hs *Handler) {
hs.Parallel = parallel
})
}

// OnRegister sets a callback to be called when the handler is registered to a
// service.
//
@@ -648,14 +665,16 @@ func (s *Service) serve(nc Conn) error {
- workCh := make(chan *work, 1)
s.nc = nc
s.inCh = inCh
- s.workCh = workCh
- s.rwork = make(map[string]*work)
s.workcond = sync.Cond{L: &s.mu}
s.workbuf = make([]*work, s.inChannelSize)
s.workqueue = s.workbuf[:0]
s.rwork = make(map[string]*work, s.inChannelSize)
s.queryTQ = timerqueue.New(s.queryEventExpire, s.queryDuration)

// Start workers
s.wg.Add(s.workerCount)
for i := 0; i < s.workerCount; i++ {
- go s.startWorker(s.workCh)
go s.startWorker()
}

atomic.StoreInt32(&s.state, stateStarted)
@@ -699,7 +718,6 @@ func (s *Service) Shutdown() error {

s.inCh = nil
s.nc = nil
- s.workCh = nil

atomic.StoreInt32(&s.state, stateStopped)


// close calls Close on the NATS connection, and closes the incoming channel
func (s *Service) close() {
s.mu.Lock()
s.workqueue = nil
s.mu.Unlock()
s.workcond.Broadcast()

s.nc.Close()
close(s.inCh)
}
@@ -956,17 +979,25 @@ func (s *Service) runWith(wid string, cb func()) {

s.mu.Lock()
// Get current work queue for the resource
- w, ok := s.rwork[wid]
var w *work
var ok bool
if wid != "" {
w, ok = s.rwork[wid]
}
if !ok {
// Create a new work queue and pass it to a worker
w = &work{
- s: s,
- wid: wid,
- queue: []func(){cb},
s: s,
wid: wid,
single: [1]func(){cb},
}
w.queue = w.single[:1]
if wid != "" {
s.rwork[wid] = w
}
- s.rwork[wid] = w
s.workqueue = append(s.workqueue, w)
s.mu.Unlock()
- s.workCh <- w
s.workcond.Signal()
} else {
// Append callback to existing work queue
w.queue = append(w.queue, cb)
test/00service_test.go (50 changes: 41 additions & 9 deletions)

@@ -203,9 +203,7 @@ func TestServiceSetOnServe_ValidCallback_IsCalledOnServe(t *testing.T) {
select {
case <-ch:
case <-time.After(timeoutDuration):
- if t == nil {
- t.Fatal("expected OnServe callback to be called, but it wasn't")
- }
t.Fatal("expected OnServe callback to be called, but it wasn't")
}
})
}
@@ -257,9 +255,7 @@ func TestServiceWithResource_WithMatchingResource_CallsCallback(t *testing.T) {
select {
case <-ch:
case <-time.After(timeoutDuration):
- if t == nil {
- t.Fatal("expected WithResource callback to be called, but it wasn't")
- }
t.Fatal("expected WithResource callback to be called, but it wasn't")
}
})
}
@@ -276,9 +272,7 @@ func TestServiceWithGroup_WithMatchingResource_CallsCallback(t *testing.T) {
select {
case <-ch:
case <-time.After(timeoutDuration):
- if t == nil {
- t.Fatal("expected WithGroup callback to be called, but it wasn't")
- }
t.Fatal("expected WithGroup callback to be called, but it wasn't")
}
})
}
@@ -380,3 +374,41 @@ func TestServiceSetInChannelSize_GreaterThanZero_DoesNotPanic(t *testing.T) {
s.SetInChannelSize(10)
}, nil, restest.WithoutReset)
}

func TestServiceWithParallel_WithMultipleCallsOnSameResource_CallsCallbacksInParallel(t *testing.T) {
ch := make(chan bool)
done := make(chan bool)
runTest(t, func(s *res.Service) {
s.Handle("model",
res.Parallel(true),
res.GetResource(func(r res.GetRequest) {
ch <- true
<-done
r.NotFound()
}),
)
}, func(s *restest.Session) {
// Test getting the same model multiple times in parallel
reqs := restest.NATSRequests{
s.Get("test.model"),
s.Get("test.model"),
s.Get("test.model"),
}

for i := 0; i < len(reqs); i++ {
select {
case <-ch:
case <-time.After(timeoutDuration):
t.Fatal("expected get handler to be called twice in parallel, but it wasn't")
}
}

close(done)

for i := len(reqs); i > 0; i-- {
reqs.
Response(s.MockConn).
AssertError(res.ErrNotFound)
}
})
}
test/22query_event_test.go (4 changes: 1 addition & 3 deletions)

@@ -580,9 +580,7 @@ func TestInvalidQueryResponse(t *testing.T) {
select {
case <-ch:
case <-time.After(timeoutDuration):
- if t == nil {
- t.Fatal("expected query request to get a query response, but it timed out")
- }
t.Fatal("expected query request to get a query response, but it timed out")
}
if t.Failed() {
t.Logf("failed on test idx %d", i)
worker.go (36 changes: 26 additions & 10 deletions)

@@ -1,33 +1,49 @@
package res

type work struct {
- s *Service
- wid string // Worker ID for the work queue
- queue []func() // Callback queue
s *Service
wid string // Worker ID for the work queue
single [1]func()
queue []func() // Callback queue
}

// startWorker starts a new resource worker that will listen for resources to
// process requests on.
- func (s *Service) startWorker(ch chan *work) {
- for w := range ch {
func (s *Service) startWorker() {
s.mu.Lock()
defer s.mu.Unlock()
defer s.wg.Done()
// workqueue being nil signals that the service is closing
for s.workqueue != nil {
for len(s.workqueue) == 0 {
s.workcond.Wait()
if s.workqueue == nil {
return
}
}
w := s.workqueue[0]
if len(s.workqueue) == 1 {
s.workqueue = s.workbuf[:0]
} else {
s.workqueue = s.workqueue[1:]
}
w.processQueue()
}
- s.wg.Done()
}

func (w *work) processQueue() {
var f func()
idx := 0

- w.s.mu.Lock()
for len(w.queue) > idx {
f = w.queue[idx]
w.s.mu.Unlock()
idx++
f()
w.s.mu.Lock()
}
- // Work complete
- delete(w.s.rwork, w.wid)
- w.s.mu.Unlock()
// Work complete. Delete if it has a work ID.
if w.wid != "" {
delete(w.s.rwork, w.wid)
}
}
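The worker rewrite above replaces the buffered work channel with a slice-based queue guarded by a sync.Cond, so any number of work items can be queued without blocking and idle workers are woken as items arrive. A standalone sketch of that cond-guarded queue pattern, using illustrative names rather than the library's own types:

package main

import "sync"

// queue is a minimal cond-guarded work queue, analogous in shape to the
// workqueue and workcond fields added to Service above.
type queue struct {
	mu     sync.Mutex
	cond   *sync.Cond
	work   []func()
	closed bool
}

func newQueue() *queue {
	q := &queue{}
	q.cond = sync.NewCond(&q.mu)
	return q
}

// push appends a work item and wakes one waiting worker.
func (q *queue) push(f func()) {
	q.mu.Lock()
	q.work = append(q.work, f)
	q.mu.Unlock()
	q.cond.Signal()
}

// close marks the queue closed and wakes all workers so they can exit.
func (q *queue) close() {
	q.mu.Lock()
	q.closed = true
	q.mu.Unlock()
	q.cond.Broadcast()
}

// worker pops items until the queue is closed, running each callback
// outside the lock, much as Service.startWorker does with processQueue.
func (q *queue) worker(wg *sync.WaitGroup) {
	defer wg.Done()
	q.mu.Lock()
	defer q.mu.Unlock()
	for {
		for len(q.work) == 0 && !q.closed {
			q.cond.Wait()
		}
		if q.closed {
			return
		}
		f := q.work[0]
		q.work = q.work[1:]
		q.mu.Unlock()
		f() // run the callback without holding the lock
		q.mu.Lock()
	}
}

func main() {
	q := newQueue()
	var wg sync.WaitGroup
	wg.Add(2)
	go q.worker(&wg)
	go q.worker(&wg)
	for i := 0; i < 4; i++ {
		q.push(func() {})
	}
	q.close()
	wg.Wait()
}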