Skip to content

Commit

Permalink
adding all logging context data to worker and saga log entries (#215)
Browse files Browse the repository at this point in the history
* adding all logging context data to worker and saga log entries

* added logging with context when command or reply received for saga def but no saga correlation id found

* removing minor discrepancies and updating documentation
  • Loading branch information
Guy Baron authored Oct 20, 2019
1 parent 657037d commit f6ae8c5
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 81 deletions.
6 changes: 5 additions & 1 deletion docs/LOGGING.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,16 @@ annotated with the following contextual data (added as logrus fields to the log
allowing for a better debugging experience.

- _service: the service name
- correlation_id: the correlation id set for the message
- exchange: the exchange the message was published to
- handler_name: the name of the handler being invoked
- idempotency_key: the idempotency key set for the message
- message_id: the id of the processed message
- message_name: the type of the message that is being processed
- routing_key: the routing_key of the message
- saga_id: the id of the saga instance being invoked
- saga_def: the type of the saga that is being invoked
- saga_def: the type of the saga that is being invoked
- worker: the worker identifier that is processing the message

```go

Expand Down
15 changes: 15 additions & 0 deletions gbus/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/opentracing/opentracing-go/log"
"github.com/rs/xid"
"github.com/sirupsen/logrus"
"github.com/streadway/amqp"
)

Expand Down Expand Up @@ -109,6 +110,20 @@ func (bm *BusMessage) GetTraceLog() (fields []log.Field) {
}
}

func GetDeliveryLogEntries(delivery amqp.Delivery) logrus.Fields {

return logrus.Fields{
"message_name": castToString(delivery.Headers["x-msg-name"]),
"message_id": delivery.MessageId,
"routing_key": delivery.RoutingKey,
"exchange": delivery.Exchange,
"idempotency_key": castToString(delivery.Headers["x-idempotency-key"]),
"correlation_id": castToString(delivery.CorrelationId),
"rpc_id": castToString(delivery.Headers["x-grabbit-msg-rpc-id"]),
}

}

func castToString(i interface{}) string {
v, ok := i.(string)
if !ok {
Expand Down
34 changes: 20 additions & 14 deletions gbus/saga/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,24 +108,25 @@ func (imsm *Glue) handleNewSaga(def *Def, invocation gbus.Invocation, message *g
newInstance.StartedByRPCID = message.RPCID
newInstance.StartedByMessageID = message.ID

imsm.Log().
WithFields(logrus.Fields{"saga_def": def.String(), "saga_id": newInstance.ID}).
logInContext := invocation.Log().WithFields(logrus.Fields{"saga_def": def.String(), "saga_id": newInstance.ID})

logInContext.
Info("created new saga")
if invkErr := imsm.invokeSagaInstance(def, newInstance, invocation, message); invkErr != nil {
imsm.Log().WithError(invkErr).WithField("saga_id", newInstance.ID).Error("failed to invoke saga")
logInContext.Error("failed to invoke saga")
return invkErr
}

if !newInstance.isComplete() {
imsm.Log().WithField("saga_id", newInstance.ID).Info("saving new saga")
logInContext.Info("saving new saga")

if e := imsm.sagaStore.SaveNewSaga(invocation.Tx(), def.sagaType, newInstance); e != nil {
imsm.Log().WithError(e).WithField("saga_id", newInstance.ID).Error("saving new saga failed")
logInContext.Error("saving new saga failed")
return e
}

if requestsTimeout, duration := newInstance.requestsTimeout(); requestsTimeout {
imsm.Log().WithFields(logrus.Fields{"saga_id": newInstance.ID, "timeout_duration": duration}).Info("new saga requested timeout")
logInContext.WithField("timeout_duration", duration).Info("new saga requested timeout")
if tme := imsm.timeoutManager.RegisterTimeout(invocation.Tx(), newInstance.ID, duration); tme != nil {
return tme
}
Expand All @@ -152,15 +153,19 @@ func (imsm *Glue) SagaHandler(invocation gbus.Invocation, message *gbus.BusMessa
4) Else if message is not an event drop it (cmd messages should have 1 specific target)
5) Else iterate over all instances and invoke the needed handler
*/
logInContext := invocation.Log().WithFields(
logrus.Fields{"saga_def": def.String(),
"saga_type": def.sagaType})
startNew := def.shouldStartNewSaga(message)
if startNew {
return imsm.handleNewSaga(def, invocation, message)

} else if message.SagaCorrelationID != "" {
instance, getErr := imsm.sagaStore.GetSagaByID(invocation.Tx(), message.SagaCorrelationID)

logInContext = logInContext.WithField("saga_correlation_id", message.SagaCorrelationID)
if getErr != nil {
imsm.Log().WithError(getErr).WithField("saga_id", message.SagaCorrelationID).Error("failed to fetch saga by id")
logInContext.Error("failed to fetch saga by id")
return getErr
}
if instance == nil {
Expand All @@ -173,34 +178,35 @@ func (imsm *Glue) SagaHandler(invocation gbus.Invocation, message *gbus.BusMessa
https://github.com/wework/grabbit/issues/196
*/
imsm.Log().WithField("saga_correlation_id", message.SagaCorrelationID).Warn("message routed with SagaCorrelationID but no saga instance with the same id found")
logInContext.Warn("message routed with SagaCorrelationID but no saga instance with the same id found")
return nil
}
logInContext = logInContext.WithField("saga_id", instance.ID)
def.configureSaga(instance)
if invkErr := imsm.invokeSagaInstance(def, instance, invocation, message); invkErr != nil {
imsm.Log().WithError(invkErr).WithField("saga_id", instance.ID).Error("failed to invoke saga")
logInContext.WithError(invkErr).Error("failed to invoke saga")
return invkErr
}

return imsm.completeOrUpdateSaga(invocation.Tx(), instance)

} else if message.Semantics == gbus.CMD {
e := fmt.Errorf("Warning:Command or Reply message with no saga reference received. message will be dropped.\nmessage as of type:%v", reflect.TypeOf(message).Name())
return e
logInContext.Warn("command or reply message with no saga reference received")
return errors.New("can not resolve saga instance for message")
} else {

imsm.Log().WithFields(logrus.Fields{"saga_type": def.sagaType, "message": msgName}).Info("fetching saga instances by type")
logInContext.Info("fetching saga instances by type")
instances, e := imsm.sagaStore.GetSagasByType(invocation.Tx(), def.sagaType)

if e != nil {
return e
}
imsm.Log().WithFields(logrus.Fields{"message": msgName, "instances_fetched": len(instances)}).Info("fetched saga instances")
logInContext.WithFields(logrus.Fields{"instances_fetched": len(instances)}).Info("fetched saga instances")

for _, instance := range instances {
def.configureSaga(instance)
if invkErr := imsm.invokeSagaInstance(def, instance, invocation, message); invkErr != nil {
imsm.Log().WithError(invkErr).WithField("saga_id", instance.ID).Error("failed to invoke saga")
logInContext.WithError(invkErr).Error("failed to invoke saga")
return invkErr
}
e = imsm.completeOrUpdateSaga(invocation.Tx(), instance)
Expand Down
Loading

0 comments on commit f6ae8c5

Please sign in to comment.