Skip to content
This repository has been archived by the owner on Sep 15, 2023. It is now read-only.

Commit

Permalink
Merge pull request #144 from askuy/feature/kafkatrace
Browse files Browse the repository at this point in the history
kafka custom trace
  • Loading branch information
askuy authored Aug 3, 2021
2 parents f657ecd + 9841728 commit 0d238d2
Show file tree
Hide file tree
Showing 9 changed files with 173 additions and 53 deletions.
90 changes: 46 additions & 44 deletions ekafka/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"time"

"github.com/gotomicro/ego/core/etrace"
"github.com/gotomicro/ego/core/transport"
"github.com/opentracing/opentracing-go"
"github.com/segmentio/kafka-go"
)
Expand All @@ -25,22 +26,33 @@ type logMessage struct {
HighWaterMark int64
Key string
Value string
Headers []kafka.Header
Headers []logHeader

// If not set at the creation, Time will be automatically set when
// writing the message.
Time time.Time
}
// logHeader is a log-friendly copy of a kafka header: the raw []byte
// header value is decoded to string so it JSON-encodes as readable text
// in access logs instead of base64.
type logHeader struct {
	Key   string
	Value string
}

// messageToLog converts a Message into its loggable representation,
// copying byte-slice fields (Key, Value, header values) into strings so
// the result serializes as readable JSON in access logs.
func messageToLog(value Message) logMessage {
	// Decode each kafka header's []byte value into a string-valued logHeader.
	headers := make([]logHeader, 0, len(value.Headers))
	for _, val := range value.Headers {
		headers = append(headers, logHeader{
			Key:   val.Key,
			Value: string(val.Value),
		})
	}
	return logMessage{
		Topic:         value.Topic,
		Partition:     value.Partition,
		Offset:        value.Offset,
		HighWaterMark: value.HighWaterMark,
		Key:           string(value.Key),
		Value:         string(value.Value),
		Headers:       headers,
		Time:          value.Time,
	}
}
Expand All @@ -58,16 +70,7 @@ func (m Messages) ToNoPointer() []Message {
// ToLog converts a batch of messages into their loggable representations
// via messageToLog, keeping the conversion logic in one place.
func (m Messages) ToLog() []logMessage {
	// Pre-size the output to avoid repeated slice growth.
	output := make([]logMessage, 0, len(m))
	for _, value := range m {
		output = append(output, messageToLog(*value))
	}
	return output
}
Expand All @@ -93,22 +96,7 @@ func (r *Consumer) CommitMessages(ctx context.Context, msgs ...*Message) (err er
func (r *Consumer) FetchMessage(ctx context.Context) (msg Message, ctxOutput context.Context, err error) {
err = r.processor(func(ctx context.Context, msgs Messages, c *cmd) error {
msg, err = r.r.FetchMessage(ctx)
// 我也不想这么处理trace。奈何协议头在用户数据里,无能为力。。。
if opentracing.IsGlobalTracerRegistered() {
mds := make(map[string][]string)
for _, value := range msg.Headers {
mds[value.Key] = []string{string(value.Value)}
}
md := etrace.MetadataReaderWriter{MD: mds}
sc, _ := opentracing.GlobalTracer().Extract(opentracing.HTTPHeaders, md)

// 重新赋值ctx
_, ctxOutput = etrace.StartSpanFromContext(
ctx,
"kafka",
opentracing.ChildOf(sc),
)
}
ctxOutput = getCtx(ctx, msg)
logCmd(r.logMode, c, "FetchMessage", cmdWithMsg(msg))
return err
})(ctx, nil, &cmd{})
Expand All @@ -135,22 +123,7 @@ func (r *Consumer) ReadLag(ctx context.Context) (lag int64, err error) {
func (r *Consumer) ReadMessage(ctx context.Context) (msg Message, ctxOutput context.Context, err error) {
err = r.processor(func(ctx context.Context, msgs Messages, c *cmd) error {
msg, err = r.r.ReadMessage(ctx)
// 我也不想这么处理trace。奈何协议头在用户数据里,无能为力。。。
if opentracing.IsGlobalTracerRegistered() {
mds := make(map[string][]string)
for _, value := range msg.Headers {
mds[value.Key] = []string{string(value.Value)}
}
md := etrace.MetadataReaderWriter{MD: mds}
sc, _ := opentracing.GlobalTracer().Extract(opentracing.HTTPHeaders, md)

// 重新赋值ctx
_, ctxOutput = etrace.StartSpanFromContext(
ctx,
"kafka",
opentracing.ChildOf(sc),
)
}
ctxOutput = getCtx(ctx, msg)
logCmd(r.logMode, c, "ReadMessage", cmdWithRes(msg), cmdWithMsg(msg))
return err
})(ctx, nil, &cmd{})
Expand All @@ -170,3 +143,32 @@ func (r *Consumer) SetOffsetAt(ctx context.Context, t time.Time) (err error) {
return r.r.SetOffsetAt(ctx, t)
})(ctx, nil, &cmd{})
}

// getCtx derives a per-message context from ctx: when a global
// opentracing tracer is registered it extracts the span context carried
// in the message headers, starts a child "kafka" span, and copies any
// headers whose keys are in transport.CustomContextKeys() into the
// returned context. The trace headers travel inside the user-visible
// kafka headers, so they have to be pulled out here rather than by a
// dedicated transport layer.
func getCtx(ctx context.Context, msg Message) context.Context {
	if !opentracing.IsGlobalTracerRegistered() {
		return ctx
	}

	// Build an HTTP-headers-style carrier from the kafka headers.
	carrier := make(map[string][]string, len(msg.Headers))
	for _, h := range msg.Headers {
		carrier[h.Key] = []string{string(h.Value)}
	}
	parent, _ := opentracing.GlobalTracer().Extract(
		opentracing.HTTPHeaders,
		etrace.MetadataReaderWriter{MD: carrier},
	)

	// Re-derive the context so downstream code sees the new span.
	_, out := etrace.StartSpanFromContext(ctx, "kafka", opentracing.ChildOf(parent))

	// Propagate configured custom keys carried in the headers.
	keys := transport.CustomContextKeys()
	for _, h := range msg.Headers {
		for _, key := range keys {
			if h.Key == key {
				out = transport.WithValue(out, h.Key, string(h.Value))
			}
		}
	}
	return out
}
5 changes: 5 additions & 0 deletions ekafka/examples/custometrace/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
run:export EGO_DEBUG=true
run:export EGO_LOG_EXTRA_KEYS=X-Ego-Uid
run:
go run main.go --config=config.toml

13 changes: 13 additions & 0 deletions ekafka/examples/custometrace/config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
[kafka]
debug = true
EnableAccessInterceptor = true
EnableAccessInterceptorReq = true
EnableAccessInterceptorRes = true
brokers = ["10.8.0.1:9092"]
[kafka.client]
timeout = "3s"
[kafka.producers.p1] # defines a producer named p1
topic = "test" # topic to produce messages to
[kafka.consumers.c1] # defines a consumer named c1
topic = "test" # topic to consume from
groupID = "group-1" # if groupID is set, it is initialized as a consumerGroup
62 changes: 62 additions & 0 deletions ekafka/examples/custometrace/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package main

import (
"context"
"fmt"
"log"

"github.com/gotomicro/ego"
"github.com/gotomicro/ego-component/ekafka"
"github.com/gotomicro/ego/core/transport"
)

// main wires up an ego application that produces and then consumes kafka
// messages, demonstrating how a custom context key ("X-Ego-Uid") is
// propagated through kafka headers by the interceptors.
// Run with: export EGO_DEBUG=true
func main() {
	ego.New().Invoker(func() error {
		ctx := context.Background()
		// Attach a custom key; presumably the client interceptor copies it
		// into the kafka headers — see ekafka's access interceptor.
		ctx = transport.WithValue(ctx, "X-Ego-Uid", 9527)
		// Initialize the ekafka component from the "kafka" config section.
		cmp := ekafka.Load("kafka").Build()
		// Produce messages with producer p1.
		produce(ctx, cmp.Producer("p1"))
		// Consume messages with consumer c1.
		consume(cmp.Consumer("c1"))
		return nil
	}).Run()

}

// produce writes a single demo message through the given producer and
// then closes it, exiting the process on any failure.
func produce(ctx context.Context, w *ekafka.Producer) {
	ctx = context.WithValue(ctx, "hello", "world")
	msg := &ekafka.Message{
		Key:   []byte("Key-A"),
		Value: []byte("Hellohahah World!22222"),
	}
	if err := w.WriteMessages(ctx, msg); err != nil {
		log.Fatal("failed to write messages:", err)
	}
	if err := w.Close(); err != nil {
		log.Fatal("failed to close writer:", err)
	}
}

// consume reads messages in an endless loop using the consumer (or
// consumer group, depending on configuration) and commits each message
// after printing it.
func consume(r *ekafka.Consumer) {
	background := context.Background()
	for {
		// ReadMessage blocks until the next message arrives.
		msg, msgCtx, err := r.ReadMessage(background)
		if err != nil {
			panic("could not read message " + err.Error())
		}

		// Print the received message.
		fmt.Println("received headers: ", msg.Headers)
		fmt.Println("received: ", string(msg.Value))
		if err = r.CommitMessages(msgCtx, &msg); err != nil {
			log.Printf("fail to commit msg:%v", err)
		}
	}
}
3 changes: 2 additions & 1 deletion ekafka/examples/traceinterceptor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/gotomicro/ego"
"github.com/gotomicro/ego-component/ekafka"
"github.com/gotomicro/ego/core/etrace"
)

func main() {
Expand Down Expand Up @@ -54,7 +55,7 @@ func consume(r *ekafka.Consumer) {
if err != nil {
panic("could not read message " + err.Error())
}

fmt.Printf("etrace.ExtractTraceID(ctxOutput)--------------->"+"%+v\n", etrace.ExtractTraceID(ctxOutput))
// 打印消息
fmt.Println("received headers: ", msg.Headers)
fmt.Println("received: ", string(msg.Value))
Expand Down
1 change: 1 addition & 0 deletions ekafka/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ require (
github.com/klauspost/compress v1.12.1 // indirect
github.com/opentracing/opentracing-go v1.1.0 // indirect
github.com/segmentio/kafka-go v0.4.17
github.com/spf13/cast v1.3.1 // indirect
github.com/stretchr/testify v1.7.0
go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee // indirect
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 // indirect
Expand Down
1 change: 1 addition & 0 deletions ekafka/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ github.com/shirou/gopsutil v3.21.3+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88=
github.com/spf13/cast v1.3.1 h1:nFm6S0SMdyzrzcmThSipiEubIDy8WEXKNZ0UOgiRpng=
github.com/spf13/cast v1.3.1/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
Expand Down
35 changes: 29 additions & 6 deletions ekafka/interceptor_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,16 @@ import (
"strings"
"time"

"github.com/gotomicro/ego/core/eapp"
"github.com/gotomicro/ego/core/elog"
"github.com/gotomicro/ego/core/emetric"
"github.com/gotomicro/ego/core/etrace"
"github.com/gotomicro/ego/core/transport"
"github.com/gotomicro/ego/core/util/xdebug"
"github.com/gotomicro/ego/core/util/xstring"
"github.com/opentracing/opentracing-go"
"github.com/segmentio/kafka-go"

"github.com/gotomicro/ego/core/eapp"
"github.com/gotomicro/ego/core/util/xdebug"
"github.com/spf13/cast"
)

type ctxStartTimeKey struct{}
Expand Down Expand Up @@ -62,6 +63,8 @@ func traceClientInterceptor(compName string, c *config) ClientInterceptor {
_, ctx = etrace.StartSpanFromContext(
ctx,
"kafka",
etrace.TagSpanKind("client"),
etrace.TagComponent("kafka"),
)
md := etrace.MetadataReaderWriter{MD: map[string][]string{}}
span := opentracing.SpanFromContext(ctx)
Expand All @@ -75,7 +78,7 @@ func traceClientInterceptor(compName string, c *config) ClientInterceptor {
return nil
})
for _, value := range msgs {
value.Headers = headers
value.Headers = append(value.Headers, headers...)
value.Time = time.Now()
}
err := next(ctx, msgs, cmd)
Expand All @@ -87,11 +90,30 @@ func traceClientInterceptor(compName string, c *config) ClientInterceptor {
func accessClientInterceptor(compName string, c *config) ClientInterceptor {
return func(next clientProcessFn) clientProcessFn {
return func(ctx context.Context, msgs Messages, cmd *cmd) error {
loggerKeys := transport.CustomContextKeys()
fields := make([]elog.Field, 0, 10+len(loggerKeys))

if c.EnableAccessInterceptor {

headers := make([]kafka.Header, 0)
for _, key := range loggerKeys {
if value := cast.ToString(transport.Value(ctx, key)); value != "" {
fields = append(fields, elog.FieldCustomKeyValue(key, value))
headers = append(headers, kafka.Header{
Key: key,
Value: []byte(value),
})
}
}
for _, value := range msgs {
value.Headers = append(value.Headers, headers...)
value.Time = time.Now()
}
}

err := next(ctx, msgs, cmd)
cost := time.Since(ctx.Value(ctxStartTimeKey{}).(time.Time))
if c.EnableAccessInterceptor {
var fields = make([]elog.Field, 0, 10)

fields = append(fields,
elog.FieldMethod(cmd.name),
elog.FieldCost(cost),
Expand All @@ -107,6 +129,7 @@ func accessClientInterceptor(compName string, c *config) ClientInterceptor {
if c.EnableAccessInterceptorRes {
fields = append(fields, elog.Any("res", json.RawMessage(xstring.JSON(cmd.res))))
}

elog.Info("access", fields...)
}

Expand Down
16 changes: 14 additions & 2 deletions ekafka/interceptor_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/gotomicro/ego/core/elog"
"github.com/gotomicro/ego/core/emetric"
"github.com/gotomicro/ego/core/etrace"
"github.com/gotomicro/ego/core/transport"
"github.com/gotomicro/ego/core/util/xdebug"
"github.com/gotomicro/ego/core/util/xstring"
"github.com/opentracing/opentracing-go"
Expand Down Expand Up @@ -79,8 +80,11 @@ func accessServerInterceptor(compName string, c *config) ServerInterceptor {
return func(next serverProcessFn) serverProcessFn {
return func(ctx context.Context, msgs Messages, cmd *cmd) error {
err := next(ctx, msgs, cmd)
// 为了性能考虑,如果要加日志字段,需要改变slice大小
loggerKeys := transport.CustomContextKeys()
// kafka 比较坑爹,合在一起处理链路
if c.EnableTraceInterceptor {

mds := make(map[string][]string)
for _, value := range cmd.msg.Headers {
mds[value.Key] = []string{string(value.Value)}
Expand All @@ -98,7 +102,7 @@ func accessServerInterceptor(compName string, c *config) ServerInterceptor {

cost := time.Since(ctx.Value(ctxStartTimeKey{}).(time.Time))
if c.EnableAccessInterceptor {
var fields = make([]elog.Field, 0, 10)
var fields = make([]elog.Field, 0, 10+len(loggerKeys))

fields = append(fields,
elog.FieldMethod(cmd.name),
Expand All @@ -108,6 +112,14 @@ func accessServerInterceptor(compName string, c *config) ServerInterceptor {
// 开启了链路,那么就记录链路id
if c.EnableTraceInterceptor && opentracing.IsGlobalTracerRegistered() {
fields = append(fields, elog.FieldTid(etrace.ExtractTraceID(ctx)))

for _, key := range loggerKeys {
for _, value := range cmd.msg.Headers {
if value.Key == key {
fields = append(fields, elog.FieldCustomKeyValue(key, string(value.Value)))
}
}
}
}
if c.EnableAccessInterceptorReq {
fields = append(fields, elog.Any("req", json.RawMessage(xstring.JSON(msgs.ToLog()))))
Expand All @@ -127,7 +139,7 @@ func accessServerInterceptor(compName string, c *config) ServerInterceptor {
)
} else {
log.Println("[ekafka.response]", xdebug.MakeReqResInfo(compName,
fmt.Sprintf("%v", c.Brokers), cost, fmt.Sprintf("%s %v", cmd.name, xstring.JSON(msgs)), xstring.JSON(messageToLog(cmd.msg))),
fmt.Sprintf("%v", c.Brokers), cost, fmt.Sprintf("%s %v", cmd.name, xstring.JSON(msgs.ToLog())), xstring.JSON(messageToLog(cmd.msg))),
)
}
return err
Expand Down

0 comments on commit 0d238d2

Please sign in to comment.