diff --git a/examples/with-tracing/README.md b/examples/with-tracing/README.md index ea1269b..5f00199 100644 --- a/examples/with-tracing/README.md +++ b/examples/with-tracing/README.md @@ -32,6 +32,113 @@ You can access the jaeger dashboard as [jaeger dashboard](http://localhost:16686 You can run the demo as `go run main.go` +```go +package main + +import ( + "context" + "fmt" + "github.com/Trendyol/kafka-konsumer/v2" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/exporters/jaeger" + "go.opentelemetry.io/otel/propagation" + "go.opentelemetry.io/otel/sdk/resource" + "go.opentelemetry.io/otel/sdk/trace" + semconv "go.opentelemetry.io/otel/semconv/v1.19.0" + "os" + "os/signal" + "time" +) + +func main() { + jaegerUrl := "http://localhost:14268/api/traces" + tp := initJaegerTracer(jaegerUrl) + defer tp.Shutdown(context.Background()) + + otel.SetTracerProvider(tp) + otel.SetTextMapPropagator(propagation.TraceContext{}) + + // ===============SIMULATE PRODUCER=============== + producer, _ := kafka.NewProducer(&kafka.ProducerConfig{ + Writer: kafka.WriterConfig{ + Brokers: []string{"localhost:29092"}, + }, + DistributedTracingEnabled: true, + }) + + const topicName = "standart-topic" + producedMessage := kafka.Message{ + Topic: topicName, + Key: []byte("1"), + Value: []byte(`{ "foo": "bar" }`), + } + + tr := otel.Tracer("after producing") + parentCtx, span := tr.Start(context.Background(), "before producing work") + time.Sleep(100 * time.Millisecond) + span.End() + + _ = producer.Produce(parentCtx, producedMessage) + + // ===============SIMULATE CONSUMER=============== + consumerCfg := &kafka.ConsumerConfig{ + Reader: kafka.ReaderConfig{ + Brokers: []string{"localhost:29092"}, + Topic: topicName, + GroupID: "standart-cg", + }, + ConsumeFn: consumeFn, + DistributedTracingEnabled: true, + } + + consumer, _ := kafka.NewConsumer(consumerCfg) + defer consumer.Stop() + + consumer.Consume() + + fmt.Println("Consumer started...!") + + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt) + <-c +} + +func consumeFn(message *kafka.Message) error { + fmt.Printf("Message From %s with value %s", message.Topic, string(message.Value)) + + tr := otel.Tracer("consumer") + parentCtx, span := tr.Start(message.Context, "work") + time.Sleep(100 * time.Millisecond) + span.End() + + _, span = tr.Start(parentCtx, "another work") + time.Sleep(50 * time.Millisecond) + span.End() + + return nil +} + +func initJaegerTracer(url string) *trace.TracerProvider { + // Create the Jaeger exporter + exp, err := jaeger.New(jaeger.WithCollectorEndpoint(jaeger.WithEndpoint(url))) + if err != nil { + panic("Err initializing jaeger instance" + err.Error()) + } + + tp := trace.NewTracerProvider( + trace.WithBatcher(exp), + trace.WithResource(resource.NewWithAttributes( + semconv.SchemaURL, + semconv.ServiceName("kafka-konsumer-demo"), + attribute.String("environment", "prod"), + )), + ) + + return tp +} +``` + In the producing step, we open only two spans. In the consuming step, we open three spans. You can see their relationship via the jeager dashboard, as shown below. ![Demo Jeager](../../.github/images/jaeger-dashboard-example.png) \ No newline at end of file diff --git a/examples/with-tracing/main.go b/examples/with-tracing/main.go deleted file mode 100644 index 5cf5b92..0000000 --- a/examples/with-tracing/main.go +++ /dev/null @@ -1,104 +0,0 @@ -package main - -import ( - "context" - "fmt" - "github.com/Trendyol/kafka-konsumer/v2" - "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/exporters/jaeger" - "go.opentelemetry.io/otel/propagation" - "go.opentelemetry.io/otel/sdk/resource" - "go.opentelemetry.io/otel/sdk/trace" - semconv "go.opentelemetry.io/otel/semconv/v1.19.0" - "os" - "os/signal" - "time" -) - -func main() { - jaegerUrl := "http://localhost:14268/api/traces" - tp := initJaegerTracer(jaegerUrl) - defer tp.Shutdown(context.Background()) - - otel.SetTracerProvider(tp) - otel.SetTextMapPropagator(propagation.TraceContext{}) - - // ===============SIMULATE PRODUCER=============== - producer, _ := kafka.NewProducer(&kafka.ProducerConfig{ - Writer: kafka.WriterConfig{ - Brokers: []string{"localhost:29092"}, - }, - DistributedTracingEnabled: true, - }) - - const topicName = "standart-topic" - producedMessage := kafka.Message{ - Topic: topicName, - Key: []byte("1"), - Value: []byte(`{ "foo": "bar" }`), - } - - tr := otel.Tracer("after producing") - parentCtx, span := tr.Start(context.Background(), "before producing work") - time.Sleep(100 * time.Millisecond) - span.End() - - _ = producer.Produce(parentCtx, producedMessage) - - // ===============SIMULATE CONSUMER=============== - consumerCfg := &kafka.ConsumerConfig{ - Reader: kafka.ReaderConfig{ - Brokers: []string{"localhost:29092"}, - Topic: topicName, - GroupID: "standart-cg", - }, - ConsumeFn: consumeFn, - DistributedTracingEnabled: true, - } - - consumer, _ := kafka.NewConsumer(consumerCfg) - defer consumer.Stop() - - consumer.Consume() - - fmt.Println("Consumer started...!") - - c := make(chan os.Signal, 1) - signal.Notify(c, os.Interrupt) - <-c -} - -func consumeFn(message *kafka.Message) error { - fmt.Printf("Message From %s with value %s", message.Topic, string(message.Value)) - - tr := otel.Tracer("consumer") - parentCtx, span := tr.Start(message.Context, "work") - time.Sleep(100 * time.Millisecond) - span.End() - - _, span = tr.Start(parentCtx, "another work") - time.Sleep(50 * time.Millisecond) - span.End() - - return nil -} - -func initJaegerTracer(url string) *trace.TracerProvider { - // Create the Jaeger exporter - exp, err := jaeger.New(jaeger.WithCollectorEndpoint(jaeger.WithEndpoint(url))) - if err != nil { - panic("Err initializing jaeger instance" + err.Error()) - } - - tp := trace.NewTracerProvider( - trace.WithBatcher(exp), - trace.WithResource(resource.NewWithAttributes( - semconv.SchemaURL, - semconv.ServiceName("kafka-konsumer-demo"), - attribute.String("environment", "prod"), - )), - ) - - return tp -} diff --git a/go.mod b/go.mod index d186e2c..1c436ea 100644 --- a/go.mod +++ b/go.mod @@ -10,8 +10,6 @@ require ( github.com/prometheus/client_golang v1.16.0 github.com/segmentio/kafka-go v0.4.46 go.opentelemetry.io/otel v1.19.0 - go.opentelemetry.io/otel/exporters/jaeger v1.16.0 - go.opentelemetry.io/otel/sdk v1.19.0 go.opentelemetry.io/otel/trace v1.19.0 go.uber.org/zap v1.24.0 ) diff --git a/go.sum b/go.sum index 2044262..d1b12a1 100644 --- a/go.sum +++ b/go.sum @@ -66,7 +66,6 @@ github.com/segmentio/kafka-go v0.4.46 h1:Sx8/kvtY+/G8nM0roTNnFezSJj3bT2sW0Xy/YY3 github.com/segmentio/kafka-go v0.4.46/go.mod h1:HjF6XbOKh0Pjlkr5GVZxt6CsjjwnmhVOfURM5KMd8qg= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= -github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= @@ -85,12 +84,8 @@ github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gi github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= go.opentelemetry.io/otel v1.19.0 h1:MuS/TNf4/j4IXsZuJegVzI1cwut7Qc00344rgH7p8bs= go.opentelemetry.io/otel v1.19.0/go.mod h1:i0QyjOq3UPoTzff0PJB2N66fb4S0+rSbSB15/oyH9fY= -go.opentelemetry.io/otel/exporters/jaeger v1.16.0 h1:YhxxmXZ011C0aDZKoNw+juVWAmEfv/0W2XBOv9aHTaA= -go.opentelemetry.io/otel/exporters/jaeger v1.16.0/go.mod h1:grYbBo/5afWlPpdPZYhyn78Bk04hnvxn2+hvxQhKIQM= go.opentelemetry.io/otel/metric v1.19.0 h1:aTzpGtV0ar9wlV4Sna9sdJyII5jTVJEvKETPiOKwvpE= go.opentelemetry.io/otel/metric v1.19.0/go.mod h1:L5rUsV9kM1IxCj1MmSdS+JQAcVm319EUrDVLrt7jqt8= -go.opentelemetry.io/otel/sdk v1.19.0 h1:6USY6zH+L8uMH8L3t1enZPR3WFEmSTADlqldyHtJi3o= -go.opentelemetry.io/otel/sdk v1.19.0/go.mod h1:NedEbbS4w3C6zElbLdPJKOpJQOrGUJ+GfzpjUvI0v1A= go.opentelemetry.io/otel/trace v1.19.0 h1:DFVQmlVbfVeOuBRrwdtaehRrWiL1JoVs9CPIQ1Dzxpg= go.opentelemetry.io/otel/trace v1.19.0/go.mod h1:mfaSyvGyEJEI0nyV2I4qhNQnbBOUUmYZpYojqMnX2vo= go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE=