Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[sdk-metrics] Exemplar concurrency updates #5465

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions OpenTelemetry.sln
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,7 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Shared", "Shared", "{A49299
src\Shared\TagAndValueTransformer.cs = src\Shared\TagAndValueTransformer.cs
src\Shared\TagTransformer.cs = src\Shared\TagTransformer.cs
src\Shared\TagTransformerJsonHelper.cs = src\Shared\TagTransformerJsonHelper.cs
src\Shared\ThreadSafeRandom.cs = src\Shared\ThreadSafeRandom.cs
EndProjectSection
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "DiagnosticSourceInstrumentation", "DiagnosticSourceInstrumentation", "{28F3EC79-660C-4659-8B73-F90DC1173316}"
Expand Down
42 changes: 42 additions & 0 deletions src/OpenTelemetry/Metrics/Exemplar/Exemplar.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ struct Exemplar
private int tagCount;
private KeyValuePair<string, object?>[]? tagStorage;
private MetricPointValueStorage valueStorage;
private int isCriticalSectionOccupied;

/// <summary>
/// Gets the timestamp.
Expand Down Expand Up @@ -103,6 +104,12 @@ public readonly ReadOnlyFilteredTagCollection FilteredTags
internal void Update<T>(in ExemplarMeasurement<T> measurement)
where T : struct
{
if (Interlocked.Exchange(ref this.isCriticalSectionOccupied, 1) != 0)
{
// Some other thread is already writing, abort.
vishweshbankwar marked this conversation as resolved.
Show resolved Hide resolved
return;
}

this.Timestamp = DateTimeOffset.UtcNow;

if (typeof(T) == typeof(long))
Expand Down Expand Up @@ -135,6 +142,8 @@ internal void Update<T>(in ExemplarMeasurement<T> measurement)
{
this.StoreRawTags(measurement.Tags);
}

Interlocked.Exchange(ref this.isCriticalSectionOccupied, 0);
}

internal void Reset()
Expand All @@ -147,6 +156,29 @@ internal readonly bool IsUpdated()
return this.Timestamp != default;
}

internal void Collect(ref Exemplar destination, bool reset)
{
if (Interlocked.Exchange(ref this.isCriticalSectionOccupied, 1) != 0)
{
this.AcquireLockRare();
}

if (this.IsUpdated())
{
this.Copy(ref destination);
if (reset)
{
this.Reset();
}
}
else
{
destination.Reset();
}

Interlocked.Exchange(ref this.isCriticalSectionOccupied, 0);
}

internal readonly void Copy(ref Exemplar destination)
{
destination.Timestamp = this.Timestamp;
Expand Down Expand Up @@ -179,4 +211,14 @@ private void StoreRawTags(ReadOnlySpan<KeyValuePair<string, object?>> tags)

tags.CopyTo(this.tagStorage);
}

private void AcquireLockRare()
{
SpinWait spinWait = default;
do
{
spinWait.SpinOnce();
}
while (Interlocked.Exchange(ref this.isCriticalSectionOccupied, 1) != 0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,36 +29,13 @@ public sealed override ReadOnlyExemplarCollection Collect()
{
var runningExemplars = this.runningExemplars;

if (this.ResetOnCollect)
for (int i = 0; i < runningExemplars.Length; i++)
{
for (int i = 0; i < runningExemplars.Length; i++)
{
ref var running = ref runningExemplars[i];
if (running.IsUpdated())
{
running.Copy(ref this.snapshotExemplars[i]);
running.Reset();
}
else
{
this.snapshotExemplars[i].Reset();
}
}
}
else
{
for (int i = 0; i < runningExemplars.Length; i++)
{
ref var running = ref runningExemplars[i];
if (running.IsUpdated())
{
running.Copy(ref this.snapshotExemplars[i]);
}
else
{
this.snapshotExemplars[i].Reset();
}
}
ref var running = ref runningExemplars[i];

running.Collect(
ref this.snapshotExemplars[i],
reset: this.ResetOnCollect);
}

this.OnCollected();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

using OpenTelemetry.Internal;

namespace OpenTelemetry.Metrics;

/// <summary>
Expand All @@ -12,8 +14,6 @@ namespace OpenTelemetry.Metrics;
/// </remarks>
internal sealed class SimpleFixedSizeExemplarReservoir : FixedSizeExemplarReservoir
{
private readonly Random random = new();

private int measurementsSeen;

public SimpleFixedSizeExemplarReservoir(int poolSize)
Expand Down Expand Up @@ -42,15 +42,15 @@ protected override void OnCollected()
private void Offer<T>(in ExemplarMeasurement<T> measurement)
where T : struct
{
var measurementNumber = this.measurementsSeen++;
var measurementNumber = Interlocked.Increment(ref this.measurementsSeen) - 1;
vishweshbankwar marked this conversation as resolved.
Show resolved Hide resolved

if (measurementNumber < this.Capacity)
{
this.UpdateExemplar(measurementNumber, in measurement);
}
else
{
var index = this.random.Next(0, measurementNumber);
int index = ThreadSafeRandom.Next(0, measurementNumber);
if (index < this.Capacity)
{
this.UpdateExemplar(index, in measurement);
Expand Down
1 change: 1 addition & 0 deletions src/OpenTelemetry/OpenTelemetry.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
<Compile Include="$(RepoRoot)\src\Shared\Options\*.cs" Link="Includes\Options\%(Filename).cs" />
<Compile Include="$(RepoRoot)\src\Shared\ResourceSemanticConventions.cs" Link="Includes\ResourceSemanticConventions.cs" />
<Compile Include="$(RepoRoot)\src\Shared\Shims\NullableAttributes.cs" Link="Includes\Shims\NullableAttributes.cs" />
<Compile Include="$(RepoRoot)\src\Shared\ThreadSafeRandom.cs" Link="Includes\ThreadSafeRandom.cs" />
</ItemGroup>

</Project>
37 changes: 37 additions & 0 deletions src/Shared/ThreadSafeRandom.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

namespace OpenTelemetry.Internal;

// Note: Inspired by https://devblogs.microsoft.com/pfxteam/getting-random-numbers-in-a-thread-safe-way/
internal static class ThreadSafeRandom
{
#if NET6_0_OR_GREATER
public static int Next(int min, int max)
{
return Random.Shared.Next(min, max);
}
#else
private static readonly Random GlobalRandom = new();

[ThreadStatic]
private static Random? localRandom;

public static int Next(int min, int max)
{
var local = localRandom;
if (local == null)
{
int seed;
lock (GlobalRandom)
{
seed = GlobalRandom.Next();
}

localRandom = local = new Random(seed);
}

return local.Next(min, max);
}
#endif
}