Skip to content

Commit

Permalink
Support for traceparent correlation header (resgateio#4)
Browse files Browse the repository at this point in the history
* Support for traceparent correlation header

When receiving a NATS header containing a traceparent id then we relay
it to subsequent collection requests to allow the server application to
correlate them.
  • Loading branch information
lmendes86 authored Jul 12, 2023
1 parent 94c6c63 commit b448b78
Show file tree
Hide file tree
Showing 8 changed files with 80 additions and 59 deletions.
21 changes: 13 additions & 8 deletions nats/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,12 +186,12 @@ func (c *Client) onError(conn *nats.Conn, sub *nats.Subscription, err error) {
}

// SendRequest sends a request to the MQ.
func (c *Client) SendRequest(subj string, payload []byte, cb mq.Response) {
func (c *Client) SendRequest(subj string, payload []byte, cb mq.Response, requestHeaders map[string][]string) {
inbox := nats.NewInbox()

// Validate max control line size
if len(subj)+len(inbox) > nats.MAX_CONTROL_LINE_SIZE {
go cb("", nil, mq.ErrSubjectTooLong)
go cb("", nil, nil, mq.ErrSubjectTooLong)
return
}

Expand All @@ -200,15 +200,20 @@ func (c *Client) SendRequest(subj string, payload []byte, cb mq.Response) {

sub, err := c.mq.ChanSubscribe(inbox, c.mqCh)
if err != nil {
go cb("", nil, err)
go cb("", nil, nil, err)
return
}
c.Tracef("<== (%s) %s: %s", inboxSubstr(inbox), subj, payload)

err = c.mq.PublishRequest(subj, inbox, payload)
natsMsg := nats.NewMsg(subj)
natsMsg.Reply = inbox
natsMsg.Data = payload
natsMsg.Header = requestHeaders
err = c.mq.PublishMsg(natsMsg)

if err != nil {
sub.Unsubscribe()
go cb("", nil, err)
go cb("", nil, nil, err)
return
}

Expand Down Expand Up @@ -279,14 +284,14 @@ func (c *Client) listener(ch chan *nats.Msg, stopped chan struct{}) {
// Handle no responders header, if available
if len(msg.Data) == 0 && msg.Header.Get("Status") == "503" {
c.Tracef("x=> (%s) No responders", inboxSubstr(msg.Subject))
rc.f("", nil, mq.ErrNoResponders)
rc.f("", nil, nil, mq.ErrNoResponders)
continue
}
c.Tracef("==> (%s): %s", inboxSubstr(msg.Subject), msg.Data)
} else {
c.Tracef("=>> %s: %s", msg.Subject, msg.Data)
}
rc.f(msg.Subject, msg.Data, nil)
rc.f(msg.Subject, msg.Data, msg.Header, nil)
}
}

Expand Down Expand Up @@ -333,7 +338,7 @@ func (c *Client) onTimeout(v interface{}) {
sub.Unsubscribe()

c.Tracef("x=> (%s) Request timeout", inboxSubstr(sub.Subject))
rc.f("", nil, mq.ErrRequestTimeout)
rc.f("", nil, nil, mq.ErrRequestTimeout)
}

func inboxSubstr(s string) string {
Expand Down
4 changes: 2 additions & 2 deletions server/mq/mq.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package mq
import "github.com/resgateio/resgate/server/reserr"

// Response sends a response to the messaging system
type Response func(subj string, payload []byte, err error)
type Response func(subj string, payload []byte, responseHeaders map[string][]string, err error)

// Unsubscriber is the interface that wraps the basic Unsubscribe method
type Unsubscriber interface {
Expand All @@ -18,7 +18,7 @@ type Client interface {

// SendRequest sends an asynchronous request on a subject, expecting the Response
// callback to be called once on a separate go routine.
SendRequest(subject string, payload []byte, cb Response)
SendRequest(subject string, payload []byte, cb Response, requestHeaders map[string][]string)

// Subscribe to all events on a resource namespace.
// The namespace has the format "event."+resource
Expand Down
22 changes: 11 additions & 11 deletions server/rescache/eventSubscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func (e *EventSubscription) getResourceSubscription(q string) (rs *ResourceSubsc
return
}

func (e *EventSubscription) addSubscriber(sub Subscriber, t *Throttle) {
func (e *EventSubscription) addSubscriber(sub Subscriber, t *Throttle, requestHeaders map[string][]string) {
e.Enqueue(func() {
var rs *ResourceSubscription
q := sub.ResourceQuery()
Expand All @@ -88,15 +88,15 @@ func (e *EventSubscription) addSubscriber(sub Subscriber, t *Throttle) {
payload := codec.CreateGetRequest(q)
// Request directly if we don't throttle, or else add to throttle
if t == nil {
e.cache.mq.SendRequest(subj, payload, func(_ string, data []byte, err error) {
rs.enqueueGetResponse(data, err)
})
e.cache.mq.SendRequest(subj, payload, func(_ string, data []byte, responseHeaders map[string][]string, err error) {
rs.enqueueGetResponse(data, responseHeaders, err)
}, requestHeaders)
} else {
t.Add(func() {
e.cache.mq.SendRequest(subj, payload, func(_ string, data []byte, err error) {
rs.enqueueGetResponse(data, err)
e.cache.mq.SendRequest(subj, payload, func(_ string, data []byte, responseHeaders map[string][]string, err error) {
rs.enqueueGetResponse(data, responseHeaders, err)
t.Done()
})
}, requestHeaders)
})
}

Expand All @@ -112,13 +112,13 @@ func (e *EventSubscription) addSubscriber(sub Subscriber, t *Throttle) {
metrics.SubcriptionsCount.WithLabelValues(metrics.SanitizedString(e.ResourceName)).Dec()
e.mu.Unlock()
defer e.mu.Lock()
sub.Loaded(nil, rs.err)
sub.Loaded(nil, nil, rs.err)

// stateModel or stateCollection
default:
e.mu.Unlock()
defer e.mu.Lock()
sub.Loaded(rs, nil)
sub.Loaded(rs, nil, nil)
}
})
}
Expand Down Expand Up @@ -286,7 +286,7 @@ func (e *EventSubscription) handleQueryEvent(subj string, payload []byte) {
}
payload := codec.CreateEventQueryRequest(q)
rs := rs
e.cache.mq.SendRequest(qe.Subject, payload, func(subj string, data []byte, err error) {
e.cache.mq.SendRequest(qe.Subject, payload, func(subj string, data []byte, requestHeaders map[string][]string, err error) {
e.enqueueUnlock(func() {
if err != nil {
return
Expand Down Expand Up @@ -327,7 +327,7 @@ func (e *EventSubscription) handleQueryEvent(subj string, payload []byte) {
rs.processResetCollection(result.Collection)
}
})
})
}, nil)
}
}

Expand Down
28 changes: 14 additions & 14 deletions server/rescache/rescache.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type Cache struct {
// Subscriber interface represents a subscription made on a client connection
type Subscriber interface {
CID() string
Loaded(resourceSub *ResourceSubscription, err error)
Loaded(resourceSub *ResourceSubscription, responseHeaders map[string][]string, err error)
Event(event *ResourceEvent)
ResourceName() string
ResourceQuery() string
Expand Down Expand Up @@ -99,7 +99,7 @@ func (c *Cache) Start() error {
go c.startWorker(inCh)
}

resetSub, err := c.mq.Subscribe("system", func(subj string, payload []byte, _ error) {
resetSub, err := c.mq.Subscribe("system", func(subj string, payload []byte, responseHeaders map[string][]string, _ error) {
ev := subj[7:]
switch ev {
case "reset":
Expand Down Expand Up @@ -131,14 +131,14 @@ func (c *Cache) Errorf(format string, v ...interface{}) {

// Subscribe fetches a resource from the cache, and if it is
// not cached, starts subscribing to the resource and sends a get request
func (c *Cache) Subscribe(sub Subscriber, t *Throttle) {
func (c *Cache) Subscribe(sub Subscriber, t *Throttle, requestHeaders map[string][]string) {
eventSub, err := c.getSubscription(sub.ResourceName(), true)
if err != nil {
sub.Loaded(nil, err)
sub.Loaded(nil, nil, err)
return
}

eventSub.addSubscriber(sub, t)
eventSub.addSubscriber(sub, t, requestHeaders)
}

// Access sends an access request
Expand All @@ -154,7 +154,7 @@ func (c *Cache) Access(sub Subscriber, token interface{}, callback func(access *

access, rerr := codec.DecodeAccessResponse(data)
callback(&Access{AccessResult: access, Error: rerr})
})
}, nil)
}

// Call sends a method call request
Expand Down Expand Up @@ -183,7 +183,7 @@ func (c *Cache) Call(req codec.Requester, rname, query, action string, token, pa
}

callback(codec.DecodeCallResponse(data))
})
}, nil)
}

// Auth sends an auth method call
Expand All @@ -197,30 +197,30 @@ func (c *Cache) Auth(req codec.AuthRequester, rname, query, action string, token
}

callback(codec.DecodeCallResponse(data))
})
}, nil)
}

// CustomAuth sends an auth method call to a custom subject
func (c *Cache) CustomAuth(req codec.AuthRequester, subj, query string, token, params interface{}, callback func(result json.RawMessage, rid string, err error)) {
payload := codec.CreateAuthRequest(params, req, query, token)
c.mq.SendRequest(subj, payload, func(_ string, data []byte, err error) {
c.mq.SendRequest(subj, payload, func(_ string, data []byte, responseHeaders map[string][]string, err error) {
if err != nil {
callback(nil, "", err)
return
}

callback(codec.DecodeCallResponse(data))
})
}, nil)
}

func (c *Cache) sendRequest(rname, subj string, payload []byte, cb func(data []byte, err error)) {
func (c *Cache) sendRequest(rname, subj string, payload []byte, cb func(data []byte, err error), requestHeaders map[string][]string) {
eventSub, _ := c.getSubscription(rname, false)
c.mq.SendRequest(subj, payload, func(_ string, data []byte, err error) {
c.mq.SendRequest(subj, payload, func(_ string, data []byte, responseHeaders map[string][]string, err error) {
eventSub.Enqueue(func() {
cb(data, err)
eventSub.removeCount(1)
})
})
}, requestHeaders)
}

// AddConn adds a connection listening to events such as system token reset
Expand Down Expand Up @@ -263,7 +263,7 @@ func (c *Cache) getSubscription(name string, subscribe bool) (*EventSubscription
}

if subscribe && eventSub.mqSub == nil {
mqSub, err := c.mq.Subscribe("event."+name, func(subj string, payload []byte, _ error) {
mqSub, err := c.mq.Subscribe("event."+name, func(subj string, payload []byte, responseHeaders map[string][]string, _ error) {
eventSub.enqueueEvent(subj, payload)
})
if err != nil {
Expand Down
14 changes: 7 additions & 7 deletions server/rescache/resourceSubscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,19 +307,19 @@ func (rs *ResourceSubscription) handleEventDelete(r *ResourceEvent) {
rs.e.mu.Lock()
}

func (rs *ResourceSubscription) enqueueGetResponse(data []byte, err error) {
func (rs *ResourceSubscription) enqueueGetResponse(data []byte, responseHeaders map[string][]string, err error) {
rs.e.Enqueue(func() {
rs, sublist := rs.processGetResponse(data, err)

rs.e.mu.Unlock()
defer rs.e.mu.Lock()
if rs.state == stateError {
for _, sub := range sublist {
sub.Loaded(nil, rs.err)
sub.Loaded(nil, responseHeaders, rs.err)
}
} else {
for _, sub := range sublist {
sub.Loaded(rs, nil)
sub.Loaded(rs, responseHeaders, nil)
}
}
})
Expand Down Expand Up @@ -446,21 +446,21 @@ func (rs *ResourceSubscription) handleResetResource(t *Throttle) {

if t != nil {
t.Add(func() {
rs.e.cache.mq.SendRequest(subj, payload, func(_ string, data []byte, err error) {
rs.e.cache.mq.SendRequest(subj, payload, func(_ string, data []byte, responseHeaders map[string][]string, err error) {
rs.e.Enqueue(func() {
rs.resetting = false
rs.processResetGetResponse(data, err)
})
t.Done()
})
}, nil)
})
} else {
rs.e.cache.mq.SendRequest(subj, payload, func(_ string, data []byte, err error) {
rs.e.cache.mq.SendRequest(subj, payload, func(_ string, data []byte, responseHeaders map[string][]string, err error) {
rs.e.Enqueue(func() {
rs.resetting = false
rs.processResetGetResponse(data, err)
})
})
}, nil)
}
}

Expand Down
22 changes: 19 additions & 3 deletions server/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ type ConnSubscriber interface {
Errorf(format string, v ...interface{})
CID() string
Token() json.RawMessage
Subscribe(rid string, direct bool, throttle *rescache.Throttle) (*Subscription, error)
Subscribe(rid string, direct bool, throttle *rescache.Throttle, headers map[string][]string) (*Subscription, error)
Unsubscribe(sub *Subscription, direct bool, count int, tryDelete bool)
Access(sub *Subscription, callback func(*rescache.Access))
Send(data []byte)
Expand Down Expand Up @@ -54,6 +54,7 @@ type Subscription struct {
accessCallbacks []func(*rescache.Access)
flags uint8
throttle *rescache.Throttle
traceparent string

// Protected by conn
direct int // Number of direct subscriptions
Expand Down Expand Up @@ -185,7 +186,7 @@ func (s *Subscription) Ref(rid string) *Subscription {
// Loaded is called by rescache when the subscribed resource has been loaded.
// If the resource was successfully loaded, err will be nil. If an error occurred
// when loading the resource, resourceSub will be nil, and err will be the error.
func (s *Subscription) Loaded(resourceSub *rescache.ResourceSubscription, err error) {
func (s *Subscription) Loaded(resourceSub *rescache.ResourceSubscription, responseHeaders map[string][]string, err error) {
if !s.c.Enqueue(func() {
if err != nil {
s.err = err
Expand All @@ -198,6 +199,14 @@ func (s *Subscription) Loaded(resourceSub *rescache.ResourceSubscription, err er
return
}

// We check if we received a traceparent header in the response. If we did, we set it on the subscription.
if responseHeaders != nil {
val, ok := responseHeaders["traceparent"]
if ok && len(val) > 0 && s.traceparent == "" {
s.traceparent = val[0]
}
}

s.resourceSub = resourceSub
s.typ = resourceSub.GetResourceType()
s.state = stateLoaded
Expand Down Expand Up @@ -510,8 +519,15 @@ func (s *Subscription) addReference(rid string) (*Subscription, error) {
ref = refs[rid]
}

requestHeaders := make(map[string][]string)

// If the subscription has a traceparent, add it to the subsiquent requests as a request header.
if s.traceparent != "" {
requestHeaders["traceparent"] = []string{s.traceparent}
}

if ref == nil {
sub, err := s.c.Subscribe(rid, false, s.throttle)
sub, err := s.c.Subscribe(rid, false, s.throttle, requestHeaders)

if err != nil {
return nil, err
Expand Down
Loading

0 comments on commit b448b78

Please sign in to comment.