Skip to content
This repository has been archived by the owner on Sep 15, 2023. It is now read-only.

Commit

Permalink
Merge pull request #145 from askuy/feature/kafkatrace
Browse files Browse the repository at this point in the history
kafka custom trace
  • Loading branch information
askuy authored Aug 3, 2021
2 parents 0d238d2 + b227a6e commit 978e07f
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 7 deletions.
4 changes: 2 additions & 2 deletions ekafka/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,14 @@ func (c *Container) Build(options ...Option) *Component {
// 放第一个时间才准确
options = append(options, WithClientInterceptor(fixedClientInterceptor(c.name, c.config)))
options = append(options, WithClientInterceptor(traceClientInterceptor(c.name, c.config)))
options = append(options, WithClientInterceptor(accessClientInterceptor(c.name, c.config)))
options = append(options, WithClientInterceptor(accessClientInterceptor(c.name, c.config, c.logger)))
if c.config.EnableMetricInterceptor {
options = append(options, WithClientInterceptor(metricClientInterceptor(c.name, c.config)))
}

options = append(options, WithServerInterceptor(fixedServerInterceptor(c.name, c.config)))
options = append(options, WithServerInterceptor(traceServerInterceptor(c.name, c.config)))
options = append(options, WithServerInterceptor(accessServerInterceptor(c.name, c.config)))
options = append(options, WithServerInterceptor(accessServerInterceptor(c.name, c.config, c.logger)))
if c.config.EnableMetricInterceptor {
options = append(options, WithServerInterceptor(metricServerInterceptor(c.name, c.config)))
}
Expand Down
5 changes: 2 additions & 3 deletions ekafka/interceptor_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func traceClientInterceptor(compName string, c *config) ClientInterceptor {
}
}

func accessClientInterceptor(compName string, c *config) ClientInterceptor {
func accessClientInterceptor(compName string, c *config, logger *elog.Component) ClientInterceptor {
return func(next clientProcessFn) clientProcessFn {
return func(ctx context.Context, msgs Messages, cmd *cmd) error {
loggerKeys := transport.CustomContextKeys()
Expand Down Expand Up @@ -129,8 +129,7 @@ func accessClientInterceptor(compName string, c *config) ClientInterceptor {
if c.EnableAccessInterceptorRes {
fields = append(fields, elog.Any("res", json.RawMessage(xstring.JSON(cmd.res))))
}

elog.Info("access", fields...)
logger.Info("access", fields...)
}

if !eapp.IsDevelopmentMode() {
Expand Down
4 changes: 2 additions & 2 deletions ekafka/interceptor_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func traceServerInterceptor(compName string, c *config) ServerInterceptor {
}
}

func accessServerInterceptor(compName string, c *config) ServerInterceptor {
func accessServerInterceptor(compName string, c *config, logger *elog.Component) ServerInterceptor {
return func(next serverProcessFn) serverProcessFn {
return func(ctx context.Context, msgs Messages, cmd *cmd) error {
err := next(ctx, msgs, cmd)
Expand Down Expand Up @@ -127,7 +127,7 @@ func accessServerInterceptor(compName string, c *config) ServerInterceptor {
if c.EnableAccessInterceptorRes {
fields = append(fields, elog.Any("res", json.RawMessage(xstring.JSON(messageToLog(cmd.msg)))))
}
elog.Info("access", fields...)
logger.Info("access", fields...)
}

if !eapp.IsDevelopmentMode() {
Expand Down

0 comments on commit 978e07f

Please sign in to comment.