Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[sdk-metrics] Exemplar concurrency updates #5465

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions OpenTelemetry.sln
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,7 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Shared", "Shared", "{A49299
src\Shared\StatusHelper.cs = src\Shared\StatusHelper.cs
src\Shared\TagTransformer.cs = src\Shared\TagTransformer.cs
src\Shared\TagTransformerJsonHelper.cs = src\Shared\TagTransformerJsonHelper.cs
src\Shared\ThreadSafeRandom.cs = src\Shared\ThreadSafeRandom.cs
EndProjectSection
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "DiagnosticSourceInstrumentation", "DiagnosticSourceInstrumentation", "{28F3EC79-660C-4659-8B73-F90DC1173316}"
Expand Down
47 changes: 47 additions & 0 deletions src/OpenTelemetry/Metrics/Exemplar/Exemplar.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ struct Exemplar
private int tagCount;
private KeyValuePair<string, object?>[]? tagStorage;
private MetricPointValueStorage valueStorage;
private int isCriticalSectionOccupied;

/// <summary>
/// Gets the timestamp.
Expand Down Expand Up @@ -103,6 +104,17 @@ public readonly ReadOnlyFilteredTagCollection FilteredTags
internal void Update<T>(in ExemplarMeasurement<T> measurement)
where T : struct
{
if (Interlocked.Exchange(ref this.isCriticalSectionOccupied, 1) != 0)
{
// Note: If we reached here it means some other thread is already
// updating the exemplar. Instead of spinning, we abort. The idea is
// for two exemplars offered at more or less the same time there
// really isn't a difference which one is stored so it is an
// optimization to let the losing thread(s) get back to work instead
// of spinning.
return;
}

this.Timestamp = DateTimeOffset.UtcNow;

if (typeof(T) == typeof(long))
Expand Down Expand Up @@ -135,6 +147,8 @@ internal void Update<T>(in ExemplarMeasurement<T> measurement)
{
this.StoreRawTags(measurement.Tags);
}

Interlocked.Exchange(ref this.isCriticalSectionOccupied, 0);
}

internal void Reset()
Expand All @@ -147,6 +161,29 @@ internal readonly bool IsUpdated()
return this.Timestamp != default;
}

internal void Collect(ref Exemplar destination, bool reset)
{
if (Interlocked.Exchange(ref this.isCriticalSectionOccupied, 1) != 0)
{
this.AcquireLockRare();
}

if (this.IsUpdated())
{
this.Copy(ref destination);
if (reset)
{
this.Reset();
}
}
else
{
destination.Reset();
}

Interlocked.Exchange(ref this.isCriticalSectionOccupied, 0);
}

internal readonly void Copy(ref Exemplar destination)
{
destination.Timestamp = this.Timestamp;
Expand Down Expand Up @@ -179,4 +216,14 @@ private void StoreRawTags(ReadOnlySpan<KeyValuePair<string, object?>> tags)

tags.CopyTo(this.tagStorage);
}

private void AcquireLockRare()
{
SpinWait spinWait = default;
do
{
spinWait.SpinOnce();
}
while (Interlocked.Exchange(ref this.isCriticalSectionOccupied, 1) != 0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,36 +29,13 @@ public sealed override ReadOnlyExemplarCollection Collect()
{
var runningExemplars = this.runningExemplars;

if (this.ResetOnCollect)
for (int i = 0; i < runningExemplars.Length; i++)
{
for (int i = 0; i < runningExemplars.Length; i++)
{
ref var running = ref runningExemplars[i];
if (running.IsUpdated())
{
running.Copy(ref this.snapshotExemplars[i]);
running.Reset();
}
else
{
this.snapshotExemplars[i].Reset();
}
}
}
else
{
for (int i = 0; i < runningExemplars.Length; i++)
{
ref var running = ref runningExemplars[i];
if (running.IsUpdated())
{
running.Copy(ref this.snapshotExemplars[i]);
}
else
{
this.snapshotExemplars[i].Reset();
}
}
ref var running = ref runningExemplars[i];

running.Collect(
ref this.snapshotExemplars[i],
reset: this.ResetOnCollect);
}

this.OnCollected();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

using OpenTelemetry.Internal;

namespace OpenTelemetry.Metrics;

/// <summary>
Expand All @@ -12,9 +14,8 @@ namespace OpenTelemetry.Metrics;
/// </remarks>
internal sealed class SimpleFixedSizeExemplarReservoir : FixedSizeExemplarReservoir
{
private readonly Random random = new();

private int measurementsSeen;
private const int DefaultMeasurementState = -1;
private int measurementState = DefaultMeasurementState;

public SimpleFixedSizeExemplarReservoir(int poolSize)
: base(poolSize)
Expand All @@ -36,21 +37,21 @@ protected override void OnCollected()
// Reset internal state irrespective of temporality.
// This ensures incoming measurements have fair chance
// of making it to the reservoir.
this.measurementsSeen = 0;
this.measurementState = DefaultMeasurementState;
}

private void Offer<T>(in ExemplarMeasurement<T> measurement)
where T : struct
{
var measurementNumber = this.measurementsSeen++;
var measurementState = Interlocked.Increment(ref this.measurementState);

if (measurementNumber < this.Capacity)
if (measurementState < this.Capacity)
{
this.UpdateExemplar(measurementNumber, in measurement);
this.UpdateExemplar(measurementState, in measurement);
}
else
{
var index = this.random.Next(0, measurementNumber);
int index = ThreadSafeRandom.Next(0, measurementState);
if (index < this.Capacity)
{
this.UpdateExemplar(index, in measurement);
Expand Down
1 change: 1 addition & 0 deletions src/OpenTelemetry/OpenTelemetry.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
<Compile Include="$(RepoRoot)\src\Shared\Options\*.cs" Link="Includes\Options\%(Filename).cs" />
<Compile Include="$(RepoRoot)\src\Shared\ResourceSemanticConventions.cs" Link="Includes\ResourceSemanticConventions.cs" />
<Compile Include="$(RepoRoot)\src\Shared\Shims\NullableAttributes.cs" Link="Includes\Shims\NullableAttributes.cs" />
<Compile Include="$(RepoRoot)\src\Shared\ThreadSafeRandom.cs" Link="Includes\ThreadSafeRandom.cs" />
</ItemGroup>

</Project>
37 changes: 37 additions & 0 deletions src/Shared/ThreadSafeRandom.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

namespace OpenTelemetry.Internal;

// Note: Inspired by https://devblogs.microsoft.com/pfxteam/getting-random-numbers-in-a-thread-safe-way/
internal static class ThreadSafeRandom
{
#if NET6_0_OR_GREATER
public static int Next(int min, int max)
{
return Random.Shared.Next(min, max);
}
#else
private static readonly Random GlobalRandom = new();

[ThreadStatic]
private static Random? localRandom;

public static int Next(int min, int max)
{
var local = localRandom;
if (local == null)
{
int seed;
lock (GlobalRandom)
{
seed = GlobalRandom.Next();
}

localRandom = local = new Random(seed);
}

return local.Next(min, max);
}
#endif
}