Skip to content

Commit

Permalink
Implement async reconnect (#2399)
Browse files Browse the repository at this point in the history
- Implement the reconnect and republish with async service calls. 

bug fixes:
- subscription Ids should start from a random number
- the keep alive interval calculation is too short, adding +1 for the timeout
- on BadSessionIdInvalid recreate session immediately 
- on a restarted server with new certificate, also reload endpoints when BadCertificateInvalid is returned
  • Loading branch information
mregen authored Nov 30, 2023
1 parent 1437138 commit 65803c4
Show file tree
Hide file tree
Showing 13 changed files with 524 additions and 310 deletions.
5 changes: 4 additions & 1 deletion Applications/ConsoleReferenceClient/UAClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public void Dispose()
/// <summary>
/// The reconnect period to be used in ms.
/// </summary>
public int ReconnectPeriod { get; set; } = 5000;
public int ReconnectPeriod { get; set; } = 1000;

/// <summary>
/// The reconnect period exponential backoff to be used in ms.
Expand Down Expand Up @@ -294,6 +294,9 @@ private void Session_KeepAlive(ISession session, KeepAliveEventArgs e)
Utils.LogInfo("KeepAlive status {0}, reconnect status {1}.", e.Status, state);
}

// cancel sending a new keep alive request, because reconnect is triggered.
e.CancelKeepAlive = true;

return;
}
}
Expand Down
4 changes: 2 additions & 2 deletions Libraries/Opc.Ua.Client/DefaultSessionFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ public virtual async Task<ISession> RecreateAsync(ISession sessionTemplate, Canc
{
if (!(sessionTemplate is Session template))
{
throw new ArgumentOutOfRangeException(nameof(template), "The ISession provided is not of a supported type.");
throw new ArgumentOutOfRangeException(nameof(sessionTemplate), "The ISession provided is not of a supported type.");
}

return await Session.RecreateAsync(template, ct).ConfigureAwait(false);
Expand All @@ -196,7 +196,7 @@ public virtual async Task<ISession> RecreateAsync(ISession sessionTemplate, ITra
{
if (!(sessionTemplate is Session template))
{
throw new ArgumentOutOfRangeException(nameof(template), "The ISession provided is not of a supported type");
throw new ArgumentOutOfRangeException(nameof(sessionTemplate), "The ISession provided is not of a supported type");
}

return await Session.RecreateAsync(template, connection, ct).ConfigureAwait(false);
Expand Down
22 changes: 22 additions & 0 deletions Libraries/Opc.Ua.Client/ISession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,23 @@ public interface ISession : ISessionClient
/// </summary>
void Reconnect(ITransportChannel channel);

#if (CLIENT_ASYNC)
/// <summary>
/// Reconnects to the server after a network failure.
/// </summary>
Task ReconnectAsync(CancellationToken ct = default);

/// <summary>
/// Reconnects to the server after a network failure using a waiting connection.
/// </summary>
Task ReconnectAsync(ITransportWaitingConnection connection, CancellationToken ct = default);

/// <summary>
/// Reconnects to the server using a new channel.
/// </summary>
Task ReconnectAsync(ITransportChannel channel, CancellationToken ct = default);
#endif

/// <summary>
/// Saves all the subscriptions of the session.
/// </summary>
Expand Down Expand Up @@ -934,6 +951,11 @@ ResponseHeader EndBrowseNext(
bool ResendData(IEnumerable<Subscription> subscriptions, out IList<ServiceResult> errors);

#if CLIENT_ASYNC
/// <summary>
/// Sends a republish request.
/// </summary>
Task<bool> RepublishAsync(uint subscriptionId, uint sequenceNumber, CancellationToken ct = default);

/// <summary>
/// Call the ResendData method on the server for all subscriptions.
/// </summary>
Expand Down
12 changes: 6 additions & 6 deletions Libraries/Opc.Ua.Client/ISessionFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -175,27 +175,27 @@ Task<ISession> CreateAsync(
/// <summary>
/// Recreates a session based on a specified template.
/// </summary>
/// <param name="template">The ISession object to use as template</param>
/// <param name="sessionTemplate">The ISession object to use as template</param>
/// <param name="ct">The cancellation token.</param>
/// <returns>The new session object.</returns>
Task<ISession> RecreateAsync(ISession template, CancellationToken ct = default);
Task<ISession> RecreateAsync(ISession sessionTemplate, CancellationToken ct = default);

/// <summary>
/// Recreates a session based on a specified template.
/// </summary>
/// <param name="template">The ISession object to use as template</param>
/// <param name="sessionTemplate">The ISession object to use as template</param>
/// <param name="connection">The waiting reverse connection.</param>
/// <param name="ct">The cancellation token.</param>
/// <returns>The new session object.</returns>
Task<ISession> RecreateAsync(ISession template, ITransportWaitingConnection connection, CancellationToken ct = default);
Task<ISession> RecreateAsync(ISession sessionTemplate, ITransportWaitingConnection connection, CancellationToken ct = default);

/// <summary>
/// Recreates a session based on a specified template using the provided channel.
/// </summary>
/// <param name="template">The Session object to use as template</param>
/// <param name="sessionTemplate">The Session object to use as template</param>
/// <param name="transportChannel">The channel to use to recreate the session.</param>
/// <param name="ct">The cancellation token.</param>
/// <returns>The new session object.</returns>
Task<ISession> RecreateAsync(ISession template, ITransportChannel transportChannel, CancellationToken ct = default);
Task<ISession> RecreateAsync(ISession sessionTemplate, ITransportChannel transportChannel, CancellationToken ct = default);
}
}
Loading

0 comments on commit 65803c4

Please sign in to comment.