-
-
Notifications
You must be signed in to change notification settings - Fork 9
/
Copy pathqueryevent.go
263 lines (230 loc) · 6.42 KB
/
queryevent.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
package res
import (
"encoding/json"
"errors"
"fmt"
"strconv"
"time"
nats "github.com/nats-io/nats.go"
)
// queryEventChannelSize is a size constant for query event channels.
// NOTE(review): presumably the buffer capacity used when creating
// queryEvent.ch; the channel is created outside this part of the file,
// so confirm at the creation site.
const queryEventChannelSize = 10
// QueryRequest has methods for responding to query requests.
//
// It embeds Resource and adds the responses a query event callback can
// send. With the queryRequest implementation in this file, only the first
// response is published; any further attempt is logged as an error.
type QueryRequest interface {
Resource
// Model responds with the current state of the query model.
// Only valid for a query model resource.
Model(model interface{})
// Collection responds with the current state of the query collection.
// Only valid for a query collection resource.
Collection(collection interface{})
// NotFound responds with a system.notFound error.
NotFound()
// InvalidQuery responds with a system.invalidQuery error.
// An empty message selects the default message.
InvalidQuery(message string)
// Error responds with a custom error.
Error(err error)
// Timeout attempts to set the timeout duration of the query request.
Timeout(d time.Duration)
}
// queryRequest is the QueryRequest value passed to a query event callback
// for a single incoming query message.
type queryRequest struct {
resource
// msg is the incoming NATS message; its Reply subject is where the
// response and any timeout event are sent.
msg *nats.Msg
// events collects the events added through ChangeEvent, AddEvent and
// RemoveEvent. They are sent as the response when the callback returns
// without having replied itself.
events []resEvent
replied bool // Flag telling if a reply has been made
}
// queryEvent holds the state used to receive and handle query requests
// for a resource.
type queryEvent struct {
// r is the resource the query requests concern; it is copied into each
// queryRequest that is created.
r resource
// sub is a NATS subscription.
// NOTE(review): presumably the subscription that feeds ch; it is not
// used in this part of the file, so confirm where it is set up.
sub *nats.Subscription
// ch delivers incoming query request messages to startQueryListener.
ch chan *nats.Msg
// cb is the callback invoked for each query request carrying a query.
cb func(r QueryRequest)
}
// Model replies to the query request with a model response.
// The given model should describe the current state of the query model
// for the requested query.
// It panics when called on a query collection resource.
func (qr *queryRequest) Model(model interface{}) {
	if qr.h.Type != TypeCollection {
		qr.success(modelResponse{Model: model})
		return
	}
	panic("res: model response not allowed on query collections")
}
// Collection replies to the query request with a collection response.
// The given collection should describe the current state of the query
// collection for the requested query.
// It panics when called on a query model resource.
func (qr *queryRequest) Collection(collection interface{}) {
	if qr.h.Type != TypeModel {
		qr.success(collectionResponse{Collection: collection})
		return
	}
	panic("res: collection response not allowed on query models")
}
// ChangeEvent queues a change event, carrying the values in ev, to be
// included in the query response.
// An empty or nil ev queues nothing.
// It panics when called on a query collection resource.
func (qr *queryRequest) ChangeEvent(ev map[string]interface{}) {
	if qr.h.Type == TypeCollection {
		panic("res: change event not allowed on query collections")
	}
	if len(ev) > 0 {
		change := resEvent{Event: "change", Data: changeEvent{Values: ev}}
		qr.events = append(qr.events, change)
	}
}
// AddEvent queues an add event, inserting value v at index idx, to be
// included in the query response.
// It panics when called on a query model resource, or when idx is negative.
func (qr *queryRequest) AddEvent(v interface{}, idx int) {
	// The resource type is validated before the index, so a call on a
	// model always reports the type error.
	switch {
	case qr.h.Type == TypeModel:
		panic("res: add event not allowed on query models")
	case idx < 0:
		panic("res: add event idx less than zero")
	}
	added := resEvent{Event: "add", Data: addEvent{Value: v, Idx: idx}}
	qr.events = append(qr.events, added)
}
// RemoveEvent queues a remove event, removing the value at index idx, to
// be included in the query response.
// It panics when called on a query model resource, or when idx is negative.
func (qr *queryRequest) RemoveEvent(idx int) {
	// The resource type is validated before the index, so a call on a
	// model always reports the type error.
	switch {
	case qr.h.Type == TypeModel:
		panic("res: remove event not allowed on query models")
	case idx < 0:
		panic("res: remove event idx less than zero")
	}
	removed := resEvent{Event: "remove", Data: removeEvent{Idx: idx}}
	qr.events = append(qr.events, removed)
}
// NotFound sends a system.notFound response for the query request.
// The predefined responseNotFound payload is passed to reply, which only
// publishes it if no reply has been made yet.
func (qr *queryRequest) NotFound() {
qr.reply(responseNotFound)
}
// InvalidQuery replies to the query request with a system.invalidQuery
// error. A non-empty message is used as the error message; an empty one
// selects the predefined response, which defaults to "Invalid query".
func (qr *queryRequest) InvalidQuery(message string) {
	if message != "" {
		qr.error(&Error{Code: CodeInvalidQuery, Message: message})
		return
	}
	qr.reply(responseInvalidQuery)
}
// Error replies to the query request with a custom error response.
// The error is first converted with ToError.
func (qr *queryRequest) Error(err error) {
	resErr := ToError(err)
	qr.error(resErr)
}
// Timeout attempts to set the timeout duration of the query request by
// sending a raw timeout event, with d truncated to whole milliseconds, on
// the request's reply subject.
// The call has no effect if the requester has already timed out the request.
// It panics if d is negative.
func (qr *queryRequest) Timeout(d time.Duration) {
	if d < 0 {
		panic("res: negative timeout duration")
	}
	ms := int64(d / time.Millisecond)
	// Builds the payload timeout:"<ms>" directly as bytes.
	payload := strconv.AppendInt([]byte(`timeout:"`), ms, 10)
	payload = append(payload, '"')
	qr.s.rawEvent(qr.msg.Reply, payload)
}
// startQueryListener receives query requests from the event's channel and
// hands each one to runWith, using the resource's group, for handling.
// It returns once the channel is closed.
func (qe *queryEvent) startQueryListener() {
	for {
		// msg is declared anew on every iteration, so each closure below
		// captures its own message.
		msg, open := <-qe.ch
		if !open {
			return
		}
		qe.r.s.runWith(qe.r.Group(), func() {
			qe.handleQueryRequest(msg)
		})
	}
}
// handleQueryRequest is called by the query listener on incoming query requests.
//
// It decodes the request, invokes the query event callback, and makes sure
// exactly one reply is sent on the request:
//   - a payload that fails to decode gets an error reply,
//   - a request without a query gets the missing-query reply,
//   - if the callback replied itself, nothing more is sent,
//   - otherwise the events collected during the callback are sent, or the
//     no-events reply when none were added.
func (qe *queryEvent) handleQueryRequest(m *nats.Msg) {
	s := qe.r.s
	s.tracef("Q=> %s: %s", qe.r.rname, m.Data)

	qr := &queryRequest{
		resource: qe.r,
		msg:      m,
	}

	// An empty payload is not decoded; it falls through to the
	// missing-query check below.
	var rqr resQueryRequest
	if len(m.Data) > 0 {
		if err := json.Unmarshal(m.Data, &rqr); err != nil {
			s.errorf("Error unmarshaling incoming query request: %s", err)
			qr.error(ToError(err))
			return
		}
	}

	if rqr.Query == "" {
		s.errorf("Missing query on incoming query request: %s", m.Data)
		qr.reply(responseMissingQuery)
		return
	}

	qr.query = rqr.Query
	qr.executeCallback(qe.cb)
	// The callback may already have replied with a model, collection or error.
	if qr.replied {
		return
	}

	if len(qr.events) == 0 {
		qr.reply(responseNoQueryEvents)
		return
	}

	data, err := json.Marshal(successResponse{Result: queryResponse{Events: qr.events}})
	if err != nil {
		// Event values are supplied by the callback and may not be
		// serializable. Log the cause so the failure can be diagnosed
		// instead of only surfacing as an internal error to the requester.
		s.errorf("Error marshaling query events for %s: %s", qe.r.rname, err)
		data = responseInternalError
	}
	qr.reply(data)
}
// executeCallback invokes the query event callback with qr and recovers
// from any panic raised inside it.
//
// A panic with an *Error value, raised before any reply was made, is sent
// as the error response and is not logged. Any other recovered panic is
// logged, and is also sent as an error response if no reply was made yet.
func (qr *queryRequest) executeCallback(cb func(QueryRequest)) {
	defer func() {
		recovered := recover()
		if recovered == nil {
			return
		}

		var (
			msg   string // text written to the error log
			cause error  // error to respond with, if still unanswered
		)
		switch p := recovered.(type) {
		case *Error:
			if !qr.replied {
				// Panicking with an *Error is considered a valid way of
				// sending an error response, so nothing is logged.
				qr.error(p)
				return
			}
			msg = p.Message
		case error:
			msg, cause = p.Error(), p
		case string:
			msg, cause = p, errors.New(p)
		default:
			msg = fmt.Sprintf("%v", p)
			cause = errors.New(msg)
		}

		if cause != nil && !qr.replied {
			qr.error(ToError(cause))
		}
		qr.s.errorf("Error handling query request %s: %s", qr.rname, msg)
	}()

	cb(qr)
}
// error sends an error response as a reply.
// If e cannot be encoded as JSON, the failure is logged and the predefined
// internal error response is sent instead.
func (qr *queryRequest) error(e *Error) {
	data, err := json.Marshal(errorResponse{Error: e})
	if err != nil {
		// Log the cause before falling back; otherwise the requester only
		// sees an internal error and the reason for it is lost.
		qr.s.errorf("Error marshaling error response for query request %s: %s", qr.rname, err)
		data = responseInternalError
	}
	qr.reply(data)
}
// success encodes result as a successful response and sends it as a reply.
// If the result cannot be encoded, the encoding error is sent as an error
// response instead.
func (qr *queryRequest) success(result interface{}) {
	if payload, err := json.Marshal(successResponse{Result: result}); err != nil {
		qr.error(ToError(err))
	} else {
		qr.reply(payload)
	}
}
// reply publishes an encoded payload on the request's reply subject.
// Only the first call publishes anything: if a reply has already been
// made, an error is logged and the payload is dropped.
func (qr *queryRequest) reply(payload []byte) {
	s := qr.s
	if qr.replied {
		s.errorf("Response already sent on query request %s", qr.rname)
		return
	}

	// Mark as replied before publishing, so the request counts as
	// answered even if the publish itself fails.
	qr.replied = true
	s.tracef("<=Q %s: %s", qr.rname, payload)
	if err := s.nc.Publish(qr.msg.Reply, payload); err != nil {
		s.errorf("Error sending query reply %s: %s", qr.rname, err)
	}
}