Skip to content

Commit

Permalink
Add ConfluentKafka
Browse files Browse the repository at this point in the history
  • Loading branch information
g7ed6e committed Dec 12, 2023
1 parent ec811d8 commit eb51913
Show file tree
Hide file tree
Showing 23 changed files with 1,215 additions and 0 deletions.
1 change: 1 addition & 0 deletions build/Common.props
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
<OpenTelemetryCoreLatestVersion>[1.6.0,2.0)</OpenTelemetryCoreLatestVersion>
<OpenTelemetryCoreLatestPrereleaseVersion>[1.7.0-rc.1]</OpenTelemetryCoreLatestPrereleaseVersion>
<StackExchangeRedisPkgVer>[2.1.58,3.0)</StackExchangeRedisPkgVer>
<ConfluentKafkaPkgVer>[2.3.0,3.0)</ConfluentKafkaPkgVer>
<CassandraCSharpDriverPkgVer>[3.16.0,4.0)</CassandraCSharpDriverPkgVer>
<StyleCopAnalyzersPkgVer>[1.2.0-beta.507,2.0)</StyleCopAnalyzersPkgVer>
<SystemNetHttp>[4.3.4,)</SystemNetHttp>
Expand Down
33 changes: 33 additions & 0 deletions build/Projects/OpenTelemetry.Instrumentation.ConfluentKafka.proj
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
<Project>

<PropertyGroup>
<RepoRoot>$([System.IO.Directory]::GetParent($(MSBuildThisFileDirectory)).Parent.Parent.FullName)</RepoRoot>
</PropertyGroup>

<ItemGroup>
<SolutionProjects Include="$(RepoRoot)\src\OpenTelemetry.Instrumentation.ConfluentKafka\OpenTelemetry.Instrumentation.ConfluentKafka.csproj" />
<SolutionProjects Include="$(RepoRoot)\test\OpenTelemetry.Instrumentation.ConfluentKafka.Tests\OpenTelemetry.Instrumentation.ConfluentKafka.Tests.csproj" />
<SolutionProjects Include="$(RepoRoot)\examples\redis\**\*.csproj" />

<PackProjects Include="$(RepoRoot)\src\OpenTelemetry.Instrumentation.ConfluentKafka\OpenTelemetry.Instrumentation.ConfluentKafka.csproj" />

<TestProjects Include="$(RepoRoot)\test\OpenTelemetry.Instrumentation.ConfluentKafka.Tests\OpenTelemetry.Instrumentation.ConfluentKafka.Tests.csproj" />
</ItemGroup>

<Target Name="Build">
<MSBuild Projects="@(SolutionProjects)" Targets="Build" ContinueOnError="ErrorAndStop" />
</Target>

<Target Name="Restore">
<MSBuild Projects="@(SolutionProjects)" Targets="Restore" ContinueOnError="ErrorAndStop" />
</Target>

<Target Name="Pack">
<MSBuild Projects="@(PackProjects)" Targets="Pack" ContinueOnError="ErrorAndStop" />
</Target>

<Target Name="VSTest">
<MSBuild Projects="@(TestProjects)" Targets="VSTest" ContinueOnError="ErrorAndStop" />
</Target>

</Project>
14 changes: 14 additions & 0 deletions opentelemetry-dotnet-contrib.sln
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,10 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "OpenTelemetry.ResourceDetec
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "OpenTelemetry.ResourceDetectors.ProcessRuntime.Tests", "test\OpenTelemetry.ResourceDetectors.ProcessRuntime.Tests\OpenTelemetry.ResourceDetectors.ProcessRuntime.Tests.csproj", "{B6157646-8EBA-464C-99B9-C386D474CB12}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "OpenTelemetry.Instrumentation.ConfluentKafka", "src\OpenTelemetry.Instrumentation.ConfluentKafka\OpenTelemetry.Instrumentation.ConfluentKafka.csproj", "{96341E23-990E-4144-A7E3-9EF0DAFF3232}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "OpenTelemetry.Instrumentation.ConfluentKafka.Tests", "test\OpenTelemetry.Instrumentation.ConfluentKafka.Tests\OpenTelemetry.Instrumentation.ConfluentKafka.Tests.csproj", "{BE40900A-2859-471D-8802-21DFD73DDAA7}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -655,6 +659,14 @@ Global
{B6157646-8EBA-464C-99B9-C386D474CB12}.Debug|Any CPU.Build.0 = Debug|Any CPU
{B6157646-8EBA-464C-99B9-C386D474CB12}.Release|Any CPU.ActiveCfg = Release|Any CPU
{B6157646-8EBA-464C-99B9-C386D474CB12}.Release|Any CPU.Build.0 = Release|Any CPU
{96341E23-990E-4144-A7E3-9EF0DAFF3232}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{96341E23-990E-4144-A7E3-9EF0DAFF3232}.Debug|Any CPU.Build.0 = Debug|Any CPU
{96341E23-990E-4144-A7E3-9EF0DAFF3232}.Release|Any CPU.ActiveCfg = Release|Any CPU
{96341E23-990E-4144-A7E3-9EF0DAFF3232}.Release|Any CPU.Build.0 = Release|Any CPU
{BE40900A-2859-471D-8802-21DFD73DDAA7}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{BE40900A-2859-471D-8802-21DFD73DDAA7}.Debug|Any CPU.Build.0 = Debug|Any CPU
{BE40900A-2859-471D-8802-21DFD73DDAA7}.Release|Any CPU.ActiveCfg = Release|Any CPU
{BE40900A-2859-471D-8802-21DFD73DDAA7}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -753,6 +765,8 @@ Global
{048509D6-FB49-4B84-832A-90E55520B97B} = {824BD1DE-3FA8-4FE0-823A-FD365EAC78AF}
{95372E82-CA5B-4C61-BD6C-74E6AB1970D4} = {22DF5DC0-1290-4E83-A9D8-6BB7DE3B3E63}
{B6157646-8EBA-464C-99B9-C386D474CB12} = {2097345F-4DD3-477D-BC54-A922F9B2B402}
{96341E23-990E-4144-A7E3-9EF0DAFF3232} = {22DF5DC0-1290-4E83-A9D8-6BB7DE3B3E63}
{BE40900A-2859-471D-8802-21DFD73DDAA7} = {2097345F-4DD3-477D-BC54-A922F9B2B402}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {B0816796-CDB3-47D7-8C3C-946434DE3B66}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@

Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
OpenTelemetry.Instrumentation.ConfluentKafka.ConfluentKafkaInstrumentation
OpenTelemetry.Instrumentation.ConfluentKafka.ConfluentKafkaInstrumentation.AddProducer<TKey, TValue>(OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedProducerBuilder<TKey, TValue> producerBuilder) -> void
OpenTelemetry.Instrumentation.ConfluentKafka.ConfluentKafkaInstrumentation.AddProducer<TKey, TValue>(string name, OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedProducerBuilder<TKey, TValue> producerBuilder) -> void
OpenTelemetry.Instrumentation.ConfluentKafka.ConfluentKafkaInstrumentation.Dispose() -> void
OpenTelemetry.Instrumentation.ConfluentKafka.ConfluentKafkaInstrumentationOptions
OpenTelemetry.Instrumentation.ConfluentKafka.ConfluentKafkaInstrumentationOptions.ConfluentKafkaInstrumentationOptions() -> void
OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedProducerBuilder<TKey, TValue>
OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedProducerBuilder<TKey, TValue>.InstrumentedProducerBuilder(System.Collections.Generic.IEnumerable<System.Collections.Generic.KeyValuePair<string, string>> config) -> void
OpenTelemetry.Trace.TracerProviderBuilderExtensions
override OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedProducerBuilder<TKey, TValue>.Build() -> Confluent.Kafka.IProducer<TKey, TValue>
static OpenTelemetry.Trace.TracerProviderBuilderExtensions.AddKafkaProducerInstrumentation<TKey, TValue>(this OpenTelemetry.Trace.TracerProviderBuilder builder) -> OpenTelemetry.Trace.TracerProviderBuilder
static OpenTelemetry.Trace.TracerProviderBuilderExtensions.AddKafkaProducerInstrumentation<TKey, TValue>(this OpenTelemetry.Trace.TracerProviderBuilder builder, OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedProducerBuilder<TKey, TValue> producerBuilder) -> OpenTelemetry.Trace.TracerProviderBuilder
static OpenTelemetry.Trace.TracerProviderBuilderExtensions.AddKafkaProducerInstrumentation<TKey, TValue>(this OpenTelemetry.Trace.TracerProviderBuilder builder, OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedProducerBuilder<TKey, TValue> producerBuilder, System.Action<OpenTelemetry.Instrumentation.ConfluentKafka.ConfluentKafkaInstrumentationOptions> configure) -> OpenTelemetry.Trace.TracerProviderBuilder
static OpenTelemetry.Trace.TracerProviderBuilderExtensions.AddKafkaProducerInstrumentation<TKey, TValue>(this OpenTelemetry.Trace.TracerProviderBuilder builder, string name, OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedProducerBuilder<TKey, TValue> producerBuilder, System.Action<OpenTelemetry.Instrumentation.ConfluentKafka.ConfluentKafkaInstrumentationOptions> configure) -> OpenTelemetry.Trace.TracerProviderBuilder
static OpenTelemetry.Trace.TracerProviderBuilderExtensions.AddKafkaProducerInstrumentation<TKey, TValue>(this OpenTelemetry.Trace.TracerProviderBuilder builder, System.Action<OpenTelemetry.Instrumentation.ConfluentKafka.ConfluentKafkaInstrumentationOptions> configure) -> OpenTelemetry.Trace.TracerProviderBuilder
static OpenTelemetry.Trace.TracerProviderBuilderExtensions.ConfigureKafkaInstrumentation(this OpenTelemetry.Trace.TracerProviderBuilder builder, System.Action<OpenTelemetry.Instrumentation.ConfluentKafka.ConfluentKafkaInstrumentation> configure) -> OpenTelemetry.Trace.TracerProviderBuilder
static OpenTelemetry.Trace.TracerProviderBuilderExtensions.ConfigureKafkaInstrumentation(this OpenTelemetry.Trace.TracerProviderBuilder builder, System.Action<System.IServiceProvider, OpenTelemetry.Instrumentation.ConfluentKafka.ConfluentKafkaInstrumentation> configure) -> OpenTelemetry.Trace.TracerProviderBuilder
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@

Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
OpenTelemetry.Instrumentation.ConfluentKafka.ConfluentKafkaInstrumentation
OpenTelemetry.Instrumentation.ConfluentKafka.ConfluentKafkaInstrumentation.AddProducer<TKey, TValue>(OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedProducerBuilder<TKey, TValue> producerBuilder) -> void
OpenTelemetry.Instrumentation.ConfluentKafka.ConfluentKafkaInstrumentation.AddProducer<TKey, TValue>(string name, OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedProducerBuilder<TKey, TValue> producerBuilder) -> void
OpenTelemetry.Instrumentation.ConfluentKafka.ConfluentKafkaInstrumentation.Dispose() -> void
OpenTelemetry.Instrumentation.ConfluentKafka.ConfluentKafkaInstrumentationOptions
OpenTelemetry.Instrumentation.ConfluentKafka.ConfluentKafkaInstrumentationOptions.ConfluentKafkaInstrumentationOptions() -> void
OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedProducerBuilder<TKey, TValue>
OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedProducerBuilder<TKey, TValue>.InstrumentedProducerBuilder(System.Collections.Generic.IEnumerable<System.Collections.Generic.KeyValuePair<string, string>> config) -> void
OpenTelemetry.Trace.TracerProviderBuilderExtensions
override OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedProducerBuilder<TKey, TValue>.Build() -> Confluent.Kafka.IProducer<TKey, TValue>
static OpenTelemetry.Trace.TracerProviderBuilderExtensions.AddKafkaProducerInstrumentation<TKey, TValue>(this OpenTelemetry.Trace.TracerProviderBuilder builder) -> OpenTelemetry.Trace.TracerProviderBuilder
static OpenTelemetry.Trace.TracerProviderBuilderExtensions.AddKafkaProducerInstrumentation<TKey, TValue>(this OpenTelemetry.Trace.TracerProviderBuilder builder, OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedProducerBuilder<TKey, TValue> producerBuilder) -> OpenTelemetry.Trace.TracerProviderBuilder
static OpenTelemetry.Trace.TracerProviderBuilderExtensions.AddKafkaProducerInstrumentation<TKey, TValue>(this OpenTelemetry.Trace.TracerProviderBuilder builder, OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedProducerBuilder<TKey, TValue> producerBuilder, System.Action<OpenTelemetry.Instrumentation.ConfluentKafka.ConfluentKafkaInstrumentationOptions> configure) -> OpenTelemetry.Trace.TracerProviderBuilder
static OpenTelemetry.Trace.TracerProviderBuilderExtensions.AddKafkaProducerInstrumentation<TKey, TValue>(this OpenTelemetry.Trace.TracerProviderBuilder builder, string name, OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedProducerBuilder<TKey, TValue> producerBuilder, System.Action<OpenTelemetry.Instrumentation.ConfluentKafka.ConfluentKafkaInstrumentationOptions> configure) -> OpenTelemetry.Trace.TracerProviderBuilder
static OpenTelemetry.Trace.TracerProviderBuilderExtensions.AddKafkaProducerInstrumentation<TKey, TValue>(this OpenTelemetry.Trace.TracerProviderBuilder builder, System.Action<OpenTelemetry.Instrumentation.ConfluentKafka.ConfluentKafkaInstrumentationOptions> configure) -> OpenTelemetry.Trace.TracerProviderBuilder
static OpenTelemetry.Trace.TracerProviderBuilderExtensions.ConfigureKafkaInstrumentation(this OpenTelemetry.Trace.TracerProviderBuilder builder, System.Action<OpenTelemetry.Instrumentation.ConfluentKafka.ConfluentKafkaInstrumentation> configure) -> OpenTelemetry.Trace.TracerProviderBuilder
static OpenTelemetry.Trace.TracerProviderBuilderExtensions.ConfigureKafkaInstrumentation(this OpenTelemetry.Trace.TracerProviderBuilder builder, System.Action<System.IServiceProvider, OpenTelemetry.Instrumentation.ConfluentKafka.ConfluentKafkaInstrumentation> configure) -> OpenTelemetry.Trace.TracerProviderBuilder
34 changes: 34 additions & 0 deletions src/OpenTelemetry.Instrumentation.ConfluentKafka/AssemblyInfo.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// <copyright file="AssemblyInfo.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>

using System.Runtime.CompilerServices;

[assembly: InternalsVisibleTo("OpenTelemetry.Instrumentation.ConfluentKafka.Tests" + AssemblyInfo.PublicKey)]
[assembly: InternalsVisibleTo("DynamicProxyGenAssembly2" + AssemblyInfo.MoqPublicKey)]

#if SIGNED
internal static class AssemblyInfo
{
public const string PublicKey = ", PublicKey=002400000480000094000000060200000024000052534131000400000100010051C1562A090FB0C9F391012A32198B5E5D9A60E9B80FA2D7B434C9E5CCB7259BD606E66F9660676AFC6692B8CDC6793D190904551D2103B7B22FA636DCBB8208839785BA402EA08FC00C8F1500CCEF28BBF599AA64FFB1E1D5DC1BF3420A3777BADFE697856E9D52070A50C3EA5821C80BEF17CA3ACFFA28F89DD413F096F898";
public const string MoqPublicKey = ", PublicKey=0024000004800000940000000602000000240000525341310004000001000100c547cac37abd99c8db225ef2f6c8a3602f3b3606cc9891605d02baa56104f4cfc0734aa39b93bf7852f7d9266654753cc297e7d2edfe0bac1cdcf9f717241550e0a7b191195b7667bb4f64bcb8e2121380fd1d9d46ad2d92d2d15605093924cceaf74c4861eff62abf69b9291ed0a340e113be11e6a7d3113e92484cf7045cc7";
}
#else
internal static class AssemblyInfo
{
public const string PublicKey = "";
public const string MoqPublicKey = "";
}
#endif
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
// <copyright file="ConfluentKafkaInstrumentation.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>

using Confluent.Kafka;
using Microsoft.Extensions.Options;
using OpenTelemetry.Internal;

namespace OpenTelemetry.Instrumentation.ConfluentKafka;

/// <summary>
/// Confluent.Kafka instrumentation.
/// </summary>
public sealed class ConfluentKafkaInstrumentation : IDisposable
{
private readonly IOptionsMonitor<ConfluentKafkaInstrumentationOptions> options;
private readonly MetricsService metricsService;
private readonly MetricsChannel metricsChannel;
private readonly CancellationTokenSource cts = new();

internal ConfluentKafkaInstrumentation(
IOptionsMonitor<ConfluentKafkaInstrumentationOptions> options,
MetricsService metricsService,
MetricsChannel metricsChannel)
{
this.options = options;
this.metricsService = metricsService;
this.metricsChannel = metricsChannel;

Task.Factory.StartNew(() => this.metricsService.ExecuteAsync(this.cts.Token), this.cts.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default);
}

internal List<ConfluentKafkaProducerInstrumentation> InstrumentedProducers { get; } = new();

/// <summary>
/// Adds an <see cref="InstrumentedProducer{TKey, TValue}"/> to the instrumentation.
/// </summary>
/// <typeparam name="TKey">The type of the key.</typeparam>
/// <typeparam name="TValue">The type of the value.</typeparam>
/// <param name="producerBuilder"><see cref="InstrumentedProducerBuilder{TKey, TValue}"/>.</param>
public void AddProducer<TKey, TValue>(InstrumentedProducerBuilder<TKey, TValue> producerBuilder)
=> this.AddProducer(Options.DefaultName, producerBuilder);

/// <summary>
/// Adds an <see cref="InstrumentedProducer{TKey, TValue}"/> to the instrumentation.
/// </summary>
/// <typeparam name="TKey">The type of the key.</typeparam>
/// <typeparam name="TValue">The type of the value.</typeparam>
/// <param name="name">Name to use when retrieving options.</param>
/// <param name="producerBuilder"><see cref="InstrumentedProducerBuilder{TKey, TValue}"/>.</param>
public void AddProducer<TKey, TValue>(string name, InstrumentedProducerBuilder<TKey, TValue> producerBuilder)
{
Guard.ThrowIfNull(name);
Guard.ThrowIfNull(producerBuilder);

var options = this.options.Get(name);

producerBuilder.SetStatisticsHandler(this.OnStatistics);

lock (this.InstrumentedProducers)
{
var instrumentation = new ConfluentKafkaProducerInstrumentation<TKey, TValue>(producerBuilder, name, options);

this.InstrumentedProducers.Add(instrumentation);

lock (this.InstrumentedProducers)
{
if (this.InstrumentedProducers.Remove(instrumentation))
{
instrumentation.Dispose();
}
}
}
}

/// <inheritdoc/>
public void Dispose()
{
lock (this.InstrumentedProducers)
{
foreach (var instrumentation in this.InstrumentedProducers)
{
instrumentation.Dispose();
}

this.InstrumentedProducers.Clear();
}

this.cts.Dispose();
}

private void OnStatistics<TKey, TValue>(IProducer<TKey, TValue> producer, string json)
{
if (string.IsNullOrEmpty(json))
{
return;
}

this.metricsChannel.Writer.TryWrite(json);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// <copyright file="ConfluentKafkaInstrumentationOptions.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>

namespace OpenTelemetry.Instrumentation.ConfluentKafka;

/// <summary>
/// Options for <see cref="ConfluentKafkaInstrumentation"/>.
/// </summary>
public class ConfluentKafkaInstrumentationOptions
{
}
Loading

0 comments on commit eb51913

Please sign in to comment.