[sdk-metrics] Refactor and improve interlocking code for doubles in MetricPoint #5378

Merged

46 changes: 46 additions & 0 deletions src/OpenTelemetry/Internal/InterlockedHelper.cs
@@ -0,0 +1,46 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+using System.Runtime.CompilerServices;
+
+namespace OpenTelemetry.Internal;
+
+internal static class InterlockedHelper
+{
+    [MethodImpl(MethodImplOptions.AggressiveInlining)]
+    public static void Add(ref double location, double value)
+    {
+        // Note: Not calling InterlockedHelper.Read here on purpose because it
+        // is too expensive for fast/happy-path. If the first attempt fails
+        // we'll end up in an Interlocked.CompareExchange loop anyway.
+        double currentValue = Volatile.Read(ref location);
+
+        var returnedValue = Interlocked.CompareExchange(ref location, currentValue + value, currentValue);
+        if (returnedValue != currentValue)
+        {
+            AddRare(ref location, value, returnedValue);
+        }
+    }
+
+    [MethodImpl(MethodImplOptions.AggressiveInlining)]
+    public static double Read(ref double location)
+        => Interlocked.CompareExchange(ref location, double.NaN, double.NaN);
Comment on lines +26 to +27

Why do you need this to be a CompareExchange rather than just Volatile.Read? You need a full fence for some reason?

Member Author

The code before was...

this.snapshotValue.AsDouble = Interlocked.CompareExchange(ref this.runningValue.AsDouble, 0.0, double.NegativeInfinity);

Now it is calling this helper...

this.snapshotValue.AsDouble = InterlockedHelper.Read(ref this.runningValue.AsDouble);

What that code needs is no stale reads. Does Interlocked.CompareExchange (full fence) give us 100% guaranteed up-to-date values? My understanding with Volatile.Read is it could give us back something stale.

> What that code needs is no stale reads.

Define stale. Volatile.Read ensures you are not just using some enregistered value. But beyond that, I don't see how CompareExchange(ref location, sentinel, sentinel) would provide strong guarantees about "up-to-date". In the moment after you read the value, but before you do anything with it, it could be updated concurrently, at which point the value isn't "up to date".
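
For reference, the two read styles being compared can be put side by side. This is a minimal sketch rather than SDK code: ReadSketch and its member names are hypothetical and exist only for illustration. Both methods return the stored double without changing it; the difference under discussion is the cost and strength of the fence, not atomicity.

```csharp
using System.Threading;

// Hypothetical holder type used only for this illustration.
internal sealed class ReadSketch
{
    private double value;

    // Atomically publishes a new value.
    public void Set(double newValue) => Interlocked.Exchange(ref this.value, newValue);

    // Volatile.Read: a read with acquire semantics. The Volatile class is
    // documented as atomic for 64-bit types, even on 32-bit processors.
    public double ReadVolatile() => Volatile.Read(ref this.value);

    // The PR's approach: a CompareExchange used purely as a read. The call
    // always returns the value it found; with NaN as both the new value and
    // the comparand, a stored number is never replaced.
    public double ReadInterlocked()
        => Interlocked.CompareExchange(ref this.value, double.NaN, double.NaN);
}
```

Either read can be out of date by the time the caller uses the result, which is the point the reviewer raises.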

Member Author

In the OpenTelemetry SDK, many threads record values concurrently. For the double case, a recording thread does one of the following:

            case AggregationType.DoubleSumIncomingDelta:
                {
                    InterlockedHelper.Add(ref this.runningValue.AsDouble, number);
                    break;
                }

            case AggregationType.DoubleSumIncomingCumulative:
                {
                    Interlocked.Exchange(ref this.runningValue.AsDouble, number);
                    break;
                }

There is a dedicated thread which does a collection operation periodically. Its job is to grab a snapshot of the current/running state. That is this block:

            case AggregationType.DoubleSumIncomingDelta:
            case AggregationType.DoubleSumIncomingCumulative:
                {
                    if (outputDelta)
                    {
                        double initValue = InterlockedHelper.Read(ref this.runningValue.AsDouble);
                        this.snapshotValue.AsDouble = initValue - this.deltaLastValue.AsDouble;
                        this.deltaLastValue.AsDouble = initValue;
                        this.MetricPointStatus = MetricPointStatus.NoCollectPending;

                        // Check again if value got updated, if yes reset status.
                        // This ensures no Updates get Lost.
                        if (initValue != InterlockedHelper.Read(ref this.runningValue.AsDouble))
                        {
                            this.MetricPointStatus = MetricPointStatus.CollectPending;
                        }
                    }
                    else
                    {
                        this.snapshotValue.AsDouble = InterlockedHelper.Read(ref this.runningValue.AsDouble);
                    }

                    break;
                }

The delta case needs to compute the change since the last collect:

                        double initValue = InterlockedHelper.Read(ref this.runningValue.AsDouble);
                        this.snapshotValue.AsDouble = initValue - this.deltaLastValue.AsDouble;
                        this.deltaLastValue.AsDouble = initValue;

It's OK if the value changes right before or right after but we need to make sure whatever we got was true and not some cached or local thing. Does that help?
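
The collection flow described above can be reduced to a small stand-alone sketch. DeltaSnapshotSketch and its members are hypothetical names, not the SDK's MetricPoint, and the sketch assumes a single collection thread, as the comment states. It uses the cumulative recording path (Interlocked.Exchange) and the same NaN-comparand read as the PR.

```csharp
using System.Threading;

// Hypothetical, simplified stand-in for the double-sum path.
internal sealed class DeltaSnapshotSketch
{
    private double running;   // updated by recording threads
    private double lastSeen;  // touched only by the collection thread

    // True when a writer changed the value during the last collect.
    public bool CollectPending { get; private set; }

    // Cumulative-style recording: the newest reported total replaces the old one.
    public void Record(double total) => Interlocked.Exchange(ref this.running, total);

    // Returns the change since the previous call.
    public double TakeDelta()
    {
        double current = Read(ref this.running);
        double delta = current - this.lastSeen;
        this.lastSeen = current;

        // Read again: if the value moved after the first read, flag it so the
        // update is picked up by a later collect instead of being lost.
        this.CollectPending = current != Read(ref this.running);
        return delta;
    }

    // Same read form as InterlockedHelper.Read in the PR.
    private static double Read(ref double location)
        => Interlocked.CompareExchange(ref location, double.NaN, double.NaN);
}
```

With no concurrent writers, recording 3.5 and then 4.0 with a collect after each yields deltas of 3.5 and 0.5.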

stephentoub, Feb 21, 2024

> It's OK if the value changes right before or right after but we need to make sure whatever we got was true and not some cached or local thing.

I still don't understand. With a CompareExchange (that's not also updating anything atomically, just reading), you might read the value, then the thread might be context switched out, and 10 minutes later it might start running again and you continue with the 10 minute old value. What's the difference between that and "some cached" thing?

Member Author

@stephentoub

Here is our version for long measurements:

            case AggregationType.LongSumIncomingDelta:
            case AggregationType.LongSumIncomingCumulative:
                {
                    if (outputDelta)
                    {
                        long initValue = Interlocked.Read(ref this.runningValue.AsLong);
                        this.snapshotValue.AsLong = initValue - this.deltaLastValue.AsLong;
                        this.deltaLastValue.AsLong = initValue;
                        this.MetricPointStatus = MetricPointStatus.NoCollectPending;

                        // Check again if value got updated, if yes reset status.
                        // This ensures no Updates get Lost.
                        if (initValue != Interlocked.Read(ref this.runningValue.AsLong))
                        {
                            this.MetricPointStatus = MetricPointStatus.CollectPending;
                        }
                    }
                    else
                    {
                        this.snapshotValue.AsLong = Interlocked.Read(ref this.runningValue.AsLong);
                    }

                    break;
                }

This also uses Interlocked.Read. Do you think it would be better to use Volatile.Read there too?

> This is our main motivation behind doing our best to capture as many updates as possible before running collect. Any updates that we miss, would only be reported in the next collect cycle (which would be like a minute later).

Can you share the data you have that demonstrates using CompareExchange is actually improving this?

> Also uses Interlocked.Read, do you think it would be better to use Volatile.Read there too?

Interlocked.Read(ref value) is just CompareExchange(ref value, 0, 0), so it's effectively the same thing you're doing with the doubles. Personally, I'd just use Volatile.Read. I'm really interested in seeing the data that suggests there's any meaningful benefit to using CompareExchange instead in these scenarios; otherwise, I'd stick with the idiomatic answer that's also cheaper.

Contributor

> Can you share the data you have that demonstrates using CompareExchange is actually improving this?

We haven't tested this, and we most likely would not anytime soon. It would require elaborate testing setups with different kinds of hardware. That might not be the best use of our time. Ideally, we would like to rely on the official docs for this kind of information instead of running experiments ourselves and gathering this kind of data.

In this particular case, our understanding/assumption is in line with what you mentioned in one of your previous comments:

> Using an Interlocked.CompareExchange here may give you a value more synchronized with the other uses of those variables, but that's because it's actually synchronizing,

My guess is you'd find it makes essentially zero difference, other than slowing down the writing code.


+    [MethodImpl(MethodImplOptions.NoInlining)]
+    private static void AddRare(ref double location, double value, double currentValue)
+    {
+        var sw = default(SpinWait);
+        while (true)
+        {
+            sw.SpinOnce();
+
+            var returnedValue = Interlocked.CompareExchange(ref location, currentValue + value, currentValue);
+            if (returnedValue == currentValue)
+            {
+                break;
+            }
+
+            currentValue = returnedValue;
+        }
+    }
+}
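
As an illustration of what the compare-and-swap add in the file above is meant to guarantee, the sketch below hammers a shared double from many threads and checks that no increment is lost. It is a stand-alone approximation: AddDemo, Add and SumConcurrently are hypothetical names, the loop is folded into one method instead of the PR's fast path plus AddRare split, and it assumes the value never becomes NaN (a NaN would never compare equal and the loop would not terminate).

```csharp
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-alone demo; InterlockedHelper itself is internal to the SDK.
internal static class AddDemo
{
    // Lock-free add: retry the compare-and-swap until no other thread has
    // changed the value between our read and our write.
    public static void Add(ref double location, double value)
    {
        double current = Volatile.Read(ref location);
        var spin = default(SpinWait);
        while (true)
        {
            double seen = Interlocked.CompareExchange(ref location, current + value, current);
            if (seen == current)
            {
                return; // our write landed
            }

            current = seen;  // another thread won; retry from its value
            spin.SpinOnce(); // back off briefly under contention
        }
    }

    // Adds 1.0 from many threads and returns the final total.
    public static double SumConcurrently(int iterations)
    {
        double total = 0;
        Parallel.For(0, iterations, _ => Add(ref total, 1.0));
        return Volatile.Read(ref total);
    }
}
```

Because every increment is 1.0, the total stays exactly representable, so the result can be compared with the iteration count directly.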
46 changes: 7 additions & 39 deletions src/OpenTelemetry/Metrics/MetricPoint.cs
@@ -3,6 +3,7 @@

 using System.Diagnostics;
 using System.Runtime.CompilerServices;
+using OpenTelemetry.Internal;

 namespace OpenTelemetry.Metrics;

@@ -578,25 +579,7 @@ internal void Update(double number)
         {
             case AggregationType.DoubleSumIncomingDelta:
                 {
-                    double initValue, newValue;
-                    var sw = default(SpinWait);
-                    while (true)
-                    {
-                        initValue = this.runningValue.AsDouble;
-
-                        unchecked
-                        {
-                            newValue = initValue + number;
-                        }
-
-                        if (initValue == Interlocked.CompareExchange(ref this.runningValue.AsDouble, newValue, initValue))
-                        {
-                            break;
-                        }
-
-                        sw.SpinOnce();
-                    }
-
+                    InterlockedHelper.Add(ref this.runningValue.AsDouble, number);
                     break;
                 }

@@ -833,31 +816,21 @@ internal void TakeSnapshot(bool outputDelta)
                 {
                     if (outputDelta)
                     {
-                        // TODO:
-                        // Is this thread-safe way to read double?
-                        // As long as the value is not -ve infinity,
-                        // the exchange (to 0.0) will never occur,
-                        // but we get the original value atomically.
-                        double initValue = Interlocked.CompareExchange(ref this.runningValue.AsDouble, 0.0, double.NegativeInfinity);
+                        double initValue = InterlockedHelper.Read(ref this.runningValue.AsDouble);
                         this.snapshotValue.AsDouble = initValue - this.deltaLastValue.AsDouble;
                         this.deltaLastValue.AsDouble = initValue;
                         this.MetricPointStatus = MetricPointStatus.NoCollectPending;

                         // Check again if value got updated, if yes reset status.
                         // This ensures no Updates get Lost.
-                        if (initValue != Interlocked.CompareExchange(ref this.runningValue.AsDouble, 0.0, double.NegativeInfinity))
+                        if (initValue != InterlockedHelper.Read(ref this.runningValue.AsDouble))
                         {
                             this.MetricPointStatus = MetricPointStatus.CollectPending;
                         }
                     }
                     else
                     {
-                        // TODO:
-                        // Is this thread-safe way to read double?
-                        // As long as the value is not -ve infinity,
-                        // the exchange (to 0.0) will never occur,
-                        // but we get the original value atomically.
-                        this.snapshotValue.AsDouble = Interlocked.CompareExchange(ref this.runningValue.AsDouble, 0.0, double.NegativeInfinity);
+                        this.snapshotValue.AsDouble = InterlockedHelper.Read(ref this.runningValue.AsDouble);
                     }

                     break;
@@ -880,17 +853,12 @@ internal void TakeSnapshot(bool outputDelta)

             case AggregationType.DoubleGauge:
                 {
-                    // TODO:
-                    // Is this thread-safe way to read double?
-                    // As long as the value is not -ve infinity,
-                    // the exchange (to 0.0) will never occur,
-                    // but we get the original value atomically.
-                    this.snapshotValue.AsDouble = Interlocked.CompareExchange(ref this.runningValue.AsDouble, 0.0, double.NegativeInfinity);
+                    this.snapshotValue.AsDouble = InterlockedHelper.Read(ref this.runningValue.AsDouble);
                     this.MetricPointStatus = MetricPointStatus.NoCollectPending;

                     // Check again if value got updated, if yes reset status.
                     // This ensures no Updates get Lost.
-                    if (this.snapshotValue.AsDouble != Interlocked.CompareExchange(ref this.runningValue.AsDouble, 0.0, double.NegativeInfinity))
+                    if (this.snapshotValue.AsDouble != InterlockedHelper.Read(ref this.runningValue.AsDouble))
                     {
                         this.MetricPointStatus = MetricPointStatus.CollectPending;
                     }