Skip to content

Commit

Permalink
Use TimeProvider
Browse files Browse the repository at this point in the history
  • Loading branch information
mayuki committed Sep 6, 2024
1 parent f70c2d0 commit 0fbd6ba
Show file tree
Hide file tree
Showing 4 changed files with 9 additions and 29 deletions.
2 changes: 1 addition & 1 deletion src/MagicOnion.Server/Diagnostics/MagicOnionMetrics.cs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public void StreamingHubMethodCompleted(in MetricsContext context, StreamingHubH
var tags = InitializeTagListForStreamingHub(handler.HubName);
tags.Add("rpc.method", handler.MethodInfo.Name);
tags.Add("magiconion.streaminghub.is_error", isErrorOrInterrupted ? BoxedTrue : BoxedFalse);
streamingHubMethodDuration.Record((long)StopwatchHelper.GetElapsedTime(startingTimestamp, endingTimestamp).TotalMilliseconds, tags);
streamingHubMethodDuration.Record((long)TimeProvider.System.GetElapsedTime(startingTimestamp, endingTimestamp).TotalMilliseconds, tags);
streamingHubMethodCompletedCounter.Add(1, tags);
}
}
Expand Down
14 changes: 7 additions & 7 deletions src/MagicOnion.Server/Hubs/StreamingHub.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
using System.Buffers;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Threading.Channels;
using Cysharp.Runtime.Multicast;
using Cysharp.Runtime.Multicast.Remoting;
using Grpc.Core;
using MagicOnion.Internal;
Expand All @@ -23,6 +21,7 @@ public abstract class StreamingHubBase<THubInterface, TReceiver> : ServiceBase<T
{
IRemoteClientResultPendingTaskRegistry remoteClientResultPendingTasks = default!;
StreamingHubHeartbeatHandle heartbeatHandle = default!;
TimeProvider timeProvider = default!;

protected static readonly Task<Nil> NilTask = Task.FromResult(Nil.Default);
protected static readonly ValueTask CompletedTask = new ValueTask();
Expand Down Expand Up @@ -88,10 +87,11 @@ internal async Task<DuplexStreamingResult<StreamingHubPayload, StreamingHubPaylo

var features = this.Context.CallContext.GetHttpContext().Features;
var magicOnionOptions = serviceProvider.GetRequiredService<IOptions<MagicOnionOptions>>().Value;
timeProvider = magicOnionOptions.TimeProvider ?? TimeProvider.System;

var remoteProxyFactory = serviceProvider.GetRequiredService<IRemoteProxyFactory>();
var remoteSerializer = serviceProvider.GetRequiredService<IRemoteSerializer>();
this.remoteClientResultPendingTasks = new RemoteClientResultPendingTaskRegistry(magicOnionOptions.ClientResultsDefaultTimeout, magicOnionOptions.TimeProvider ?? TimeProvider.System);
this.remoteClientResultPendingTasks = new RemoteClientResultPendingTaskRegistry(magicOnionOptions.ClientResultsDefaultTimeout, timeProvider);
this.Client = remoteProxyFactory.CreateDirect<TReceiver>(new MagicOnionRemoteReceiverWriter(StreamingServiceContext), remoteSerializer, remoteClientResultPendingTasks);

var handlerRepository = serviceProvider.GetRequiredService<StreamingHubHandlerRepository>();
Expand Down Expand Up @@ -270,10 +270,10 @@ async ValueTask ProcessRequestAsync(UniqueHashDictionary<StreamingHubHandler> ha
hubInstance: this,
request: body,
messageId: messageId,
timestamp: DateTime.UtcNow
timestamp: timeProvider.GetUtcNow().UtcDateTime
);

var methodStartingTimestamp = Stopwatch.GetTimestamp();
var methodStartingTimestamp = timeProvider.GetTimestamp();
var isErrorOrInterrupted = false;
MagicOnionServerLog.BeginInvokeHubMethod(Context.MethodHandler.Logger, context, context.Request, handler.RequestType);
try
Expand All @@ -300,8 +300,8 @@ async ValueTask ProcessRequestAsync(UniqueHashDictionary<StreamingHubHandler> ha
}
finally
{
var methodEndingTimestamp = Stopwatch.GetTimestamp();
MagicOnionServerLog.EndInvokeHubMethod(Context.MethodHandler.Logger, context, context.ResponseSize, context.ResponseType, StopwatchHelper.GetElapsedTime(methodStartingTimestamp, methodEndingTimestamp).TotalMilliseconds, isErrorOrInterrupted);
var methodEndingTimestamp = timeProvider.GetTimestamp();
MagicOnionServerLog.EndInvokeHubMethod(Context.MethodHandler.Logger, context, context.ResponseSize, context.ResponseType, timeProvider.GetElapsedTime(methodStartingTimestamp, methodEndingTimestamp).TotalMilliseconds, isErrorOrInterrupted);
Metrics.StreamingHubMethodCompleted(Context.Metrics, handler, methodStartingTimestamp, methodEndingTimestamp, isErrorOrInterrupted);

StreamingHubContextPool.Shared.Return(context);
Expand Down
2 changes: 1 addition & 1 deletion src/MagicOnion.Server/Hubs/StreamingHubHeartbeatManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public void Ack(short sequence)
lock (gate)
{
var receivedAtTimestamp = manager.TimeProvider.GetTimestamp();
var elapsed = StopwatchHelper.GetElapsedTime(lastSentAtTimestamp, receivedAtTimestamp);
var elapsed = manager.TimeProvider.GetElapsedTime(lastSentAtTimestamp, receivedAtTimestamp);

LastReceivedAt = lastSentAt.Add(elapsed);
Latency = elapsed;
Expand Down
20 changes: 0 additions & 20 deletions src/MagicOnion.Server/Internal/StopwatchHelper.cs

This file was deleted.

0 comments on commit 0fbd6ba

Please sign in to comment.