diff --git a/Applications/ConsoleReferenceClient/UAClient.cs b/Applications/ConsoleReferenceClient/UAClient.cs index aacb02474..51c49193e 100644 --- a/Applications/ConsoleReferenceClient/UAClient.cs +++ b/Applications/ConsoleReferenceClient/UAClient.cs @@ -99,7 +99,7 @@ public void Dispose() /// /// The reconnect period to be used in ms. /// - public int ReconnectPeriod { get; set; } = 5000; + public int ReconnectPeriod { get; set; } = 1000; /// /// The reconnect period exponential backoff to be used in ms. @@ -294,6 +294,9 @@ private void Session_KeepAlive(ISession session, KeepAliveEventArgs e) Utils.LogInfo("KeepAlive status {0}, reconnect status {1}.", e.Status, state); } + // cancel sending a new keep alive request, because reconnect is triggered. + e.CancelKeepAlive = true; + return; } } diff --git a/Libraries/Opc.Ua.Client/DefaultSessionFactory.cs b/Libraries/Opc.Ua.Client/DefaultSessionFactory.cs index beb87dafb..a498a8484 100644 --- a/Libraries/Opc.Ua.Client/DefaultSessionFactory.cs +++ b/Libraries/Opc.Ua.Client/DefaultSessionFactory.cs @@ -185,7 +185,7 @@ public virtual async Task RecreateAsync(ISession sessionTemplate, Canc { if (!(sessionTemplate is Session template)) { - throw new ArgumentOutOfRangeException(nameof(template), "The ISession provided is not of a supported type."); + throw new ArgumentOutOfRangeException(nameof(sessionTemplate), "The ISession provided is not of a supported type."); } return await Session.RecreateAsync(template, ct).ConfigureAwait(false); @@ -196,7 +196,7 @@ public virtual async Task RecreateAsync(ISession sessionTemplate, ITra { if (!(sessionTemplate is Session template)) { - throw new ArgumentOutOfRangeException(nameof(template), "The ISession provided is not of a supported type"); + throw new ArgumentOutOfRangeException(nameof(sessionTemplate), "The ISession provided is not of a supported type"); } return await Session.RecreateAsync(template, connection, ct).ConfigureAwait(false); diff --git a/Libraries/Opc.Ua.Client/ISession.cs b/Libraries/Opc.Ua.Client/ISession.cs index 3a1e2e86c..cd6c4cbff 100644 --- a/Libraries/Opc.Ua.Client/ISession.cs +++ b/Libraries/Opc.Ua.Client/ISession.cs @@ -314,6 +314,23 @@ public interface ISession : ISessionClient /// void Reconnect(ITransportChannel channel); +#if (CLIENT_ASYNC) + /// + /// Reconnects to the server after a network failure. + /// + Task ReconnectAsync(CancellationToken ct = default); + + /// + /// Reconnects to the server after a network failure using a waiting connection. + /// + Task ReconnectAsync(ITransportWaitingConnection connection, CancellationToken ct = default); + + /// + /// Reconnects to the server using a new channel. + /// + Task ReconnectAsync(ITransportChannel channel, CancellationToken ct = default); +#endif + /// /// Saves all the subscriptions of the session. /// @@ -934,6 +951,11 @@ ResponseHeader EndBrowseNext( bool ResendData(IEnumerable subscriptions, out IList errors); #if CLIENT_ASYNC + /// + /// Sends a republish request. + /// + Task RepublishAsync(uint subscriptionId, uint sequenceNumber, CancellationToken ct = default); + /// /// Call the ResendData method on the server for all subscriptions. /// diff --git a/Libraries/Opc.Ua.Client/ISessionFactory.cs b/Libraries/Opc.Ua.Client/ISessionFactory.cs index 93d1ede1d..ed7f917bd 100644 --- a/Libraries/Opc.Ua.Client/ISessionFactory.cs +++ b/Libraries/Opc.Ua.Client/ISessionFactory.cs @@ -175,27 +175,27 @@ Task CreateAsync( /// /// Recreates a session based on a specified template. /// - /// The ISession object to use as template + /// The ISession object to use as template /// The cancellation token. /// The new session object. - Task RecreateAsync(ISession template, CancellationToken ct = default); + Task RecreateAsync(ISession sessionTemplate, CancellationToken ct = default); /// /// Recreates a session based on a specified template. /// - /// The ISession object to use as template + /// The ISession object to use as template /// The waiting reverse connection. /// The cancellation token. /// The new session object. - Task RecreateAsync(ISession template, ITransportWaitingConnection connection, CancellationToken ct = default); + Task RecreateAsync(ISession sessionTemplate, ITransportWaitingConnection connection, CancellationToken ct = default); /// /// Recreates a session based on a specified template using the provided channel. /// - /// The Session object to use as template + /// The Session object to use as template /// The channel to use to recreate the session. /// The cancellation token. /// The new session object. - Task RecreateAsync(ISession template, ITransportChannel transportChannel, CancellationToken ct = default); + Task RecreateAsync(ISession sessionTemplate, ITransportChannel transportChannel, CancellationToken ct = default); } } diff --git a/Libraries/Opc.Ua.Client/Session.cs b/Libraries/Opc.Ua.Client/Session.cs index d1522ccc2..89c2cc336 100644 --- a/Libraries/Opc.Ua.Client/Session.cs +++ b/Libraries/Opc.Ua.Client/Session.cs @@ -374,8 +374,7 @@ protected override void Dispose(bool disposing) { if (disposing) { - Utils.SilentDispose(m_keepAliveTimer); - m_keepAliveTimer = null; + StopKeepAliveTimer(); Utils.SilentDispose(m_defaultSubscription); m_defaultSubscription = null; @@ -1395,9 +1394,7 @@ public SessionConfiguration SaveSessionConfiguration(Stream stream = null) /// public void Reconnect() - { - Reconnect(null, null); - } + => Reconnect(null, null); /// public void Reconnect(ITransportWaitingConnection connection) @@ -1415,130 +1412,31 @@ private void Reconnect(ITransportWaitingConnection connection, ITransportChannel bool resetReconnect = false; try { - // check if already connecting. - if (m_reconnecting) - { - Utils.LogWarning("Session is already attempting to reconnect."); - - throw ServiceResultException.Create( - StatusCodes.BadInvalidState, - "Session is already attempting to reconnect."); - } - - Utils.LogInfo("Session RECONNECT starting."); + Utils.LogInfo("Session RECONNECT {0} starting.", SessionId); m_reconnectLock.Wait(); + bool reconnecting = m_reconnecting; m_reconnecting = true; resetReconnect = true; m_reconnectLock.Release(); - lock (SyncRoot) - { - // stop keep alives. - Utils.SilentDispose(m_keepAliveTimer); - m_keepAliveTimer = null; - } - - // create the client signature. - byte[] dataToSign = Utils.Append(m_serverCertificate != null ? m_serverCertificate.RawData : null, m_serverNonce); - EndpointDescription endpoint = m_endpoint.Description; - SignatureData clientSignature = SecurityPolicies.Sign(m_instanceCertificate, endpoint.SecurityPolicyUri, dataToSign); - - // check that the user identity is supported by the endpoint. - UserTokenPolicy identityPolicy = endpoint.FindUserTokenPolicy(m_identity.TokenType, m_identity.IssuedTokenType); - - if (identityPolicy == null) + // check if already connecting. + if (reconnecting) { - Utils.LogError("Reconnect: Endpoint does not support the user identity type provided."); + Utils.LogWarning("Session is already attempting to reconnect."); throw ServiceResultException.Create( - StatusCodes.BadUserAccessDenied, - "Endpoint does not support the user identity type provided."); - } - - // select the security policy for the user token. - string securityPolicyUri = identityPolicy.SecurityPolicyUri; - - if (String.IsNullOrEmpty(securityPolicyUri)) - { - securityPolicyUri = endpoint.SecurityPolicyUri; - } - - // need to refresh the identity (reprompt for password, refresh token). - if (m_RenewUserIdentity != null) - { - m_identity = m_RenewUserIdentity(this, m_identity); + StatusCodes.BadInvalidState, + "Session is already attempting to reconnect."); } - // validate server nonce and security parameters for user identity. - ValidateServerNonce( - m_identity, - m_serverNonce, - securityPolicyUri, - m_previousServerNonce, - m_endpoint.Description.SecurityMode); - - // sign data with user token. - UserIdentityToken identityToken = m_identity.GetIdentityToken(); - identityToken.PolicyId = identityPolicy.PolicyId; - SignatureData userTokenSignature = identityToken.Sign(dataToSign, securityPolicyUri); - - // encrypt token. - identityToken.Encrypt(m_serverCertificate, m_serverNonce, securityPolicyUri); - - // send the software certificates assigned to the client. - SignedSoftwareCertificateCollection clientSoftwareCertificates = GetSoftwareCertificates(); - - Utils.LogInfo("Session REPLACING channel."); + IAsyncResult result = PrepareReconnectBeginActivate( + connection, + transportChannel); - if (connection != null) - { - // check if the channel supports reconnect. - if ((TransportChannel.SupportedFeatures & TransportChannelFeatures.Reconnect) != 0) - { - TransportChannel.Reconnect(connection); - } - else - { - // initialize the channel which will be created with the server. - ITransportChannel channel = SessionChannel.Create( - m_configuration, - connection, - m_endpoint.Description, - m_endpoint.Configuration, - m_instanceCertificate, - m_configuration.SecurityConfiguration.SendCertificateChain ? m_instanceCertificateChain : null, - MessageContext); - - // disposes the existing channel. - TransportChannel = channel; - } - } - else if (transportChannel != null) - { - TransportChannel = transportChannel; - } - else + if (!result.AsyncWaitHandle.WaitOne(kReconnectTimeout / 2)) { - // check if the channel supports reconnect. - if (TransportChannel != null && (TransportChannel.SupportedFeatures & TransportChannelFeatures.Reconnect) != 0) - { - TransportChannel.Reconnect(); - } - else - { - // initialize the channel which will be created with the server. - ITransportChannel channel = SessionChannel.Create( - m_configuration, - m_endpoint.Description, - m_endpoint.Configuration, - m_instanceCertificate, - m_configuration.SecurityConfiguration.SendCertificateChain ? m_instanceCertificateChain : null, - MessageContext); - - // disposes the existing channel. - TransportChannel = channel; - } + Utils.LogWarning("WARNING: ACTIVATE SESSION timed out. {0}/{1}", GoodPublishRequestCount, OutstandingRequestCount); } // reactivate session. @@ -1546,24 +1444,6 @@ private void Reconnect(ITransportWaitingConnection connection, ITransportChannel StatusCodeCollection certificateResults = null; DiagnosticInfoCollection certificateDiagnosticInfos = null; - Utils.LogInfo("Session RE-ACTIVATING session."); - - RequestHeader header = new RequestHeader() { TimeoutHint = kReconnectTimeout }; - IAsyncResult result = BeginActivateSession( - header, - clientSignature, - null, - m_preferredLocales, - new ExtensionObject(identityToken), - userTokenSignature, - null, - null); - - if (!result.AsyncWaitHandle.WaitOne(kReconnectTimeout / 2)) - { - Utils.LogWarning("WARNING: ACTIVATE SESSION timed out. {0}/{1}", GoodPublishRequestCount, OutstandingRequestCount); - } - EndActivateSession( result, out serverNonce, @@ -1572,9 +1452,10 @@ private void Reconnect(ITransportWaitingConnection connection, ITransportChannel int publishCount = 0; + Utils.LogInfo("Session RECONNECT {0} completed successfully.", SessionId); + lock (SyncRoot) { - Utils.LogInfo("Session RECONNECT completed successfully."); m_previousServerNonce = m_serverNonce; m_serverNonce = serverNonce; publishCount = GetMinPublishRequestCount(true); @@ -3074,7 +2955,7 @@ public override bool Equals(object obj) if (obj is ISession session) { if (!m_endpoint.Equals(session.Endpoint)) return false; - if (!m_sessionName.Equals(session.SessionName)) return false; + if (!m_sessionName.Equals(session.SessionName, StringComparison.Ordinal)) return false; if (!SessionId.Equals(session.SessionId)) return false; return true; @@ -3128,8 +3009,7 @@ public virtual StatusCode Close(int timeout, bool closeChannel) StatusCode result = StatusCodes.Good; // stop the keep alive timer. - Utils.SilentDispose(m_keepAliveTimer); - m_keepAliveTimer = null; + StopKeepAliveTimer(); // check if currectly connected. bool connected = Connected; @@ -3864,8 +3744,7 @@ private void StartKeepAliveTimer() // restart the publish timer. lock (SyncRoot) { - Utils.SilentDispose(m_keepAliveTimer); - m_keepAliveTimer = null; + StopKeepAliveTimer(); // start timer. m_keepAliveTimer = new Timer(OnKeepAlive, nodesToRead, keepAliveInterval, keepAliveInterval); @@ -3875,6 +3754,15 @@ private void StartKeepAliveTimer() OnKeepAlive(nodesToRead); } + /// + /// Stops the keep alive timer. + /// + private void StopKeepAliveTimer() + { + Utils.SilentDispose(m_keepAliveTimer); + m_keepAliveTimer = null; + } + /// /// Removes a completed async request. /// @@ -5176,6 +5064,8 @@ private void OnPublishComplete(IAsyncResult result) case StatusCodes.BadSessionIdInvalid: case StatusCodes.BadSecureChannelIdInvalid: case StatusCodes.BadSecureChannelClosed: + case StatusCodes.BadSecurityChecksFailed: + case StatusCodes.BadCertificateInvalid: case StatusCodes.BadServerHalted: return; @@ -5241,59 +5131,7 @@ public bool Republish(uint subscriptionId, uint sequenceNumber) } catch (Exception e) { - ServiceResult error = new ServiceResult(e); - - bool result = true; - switch (error.StatusCode.Code) - { - case StatusCodes.BadMessageNotAvailable: - Utils.LogWarning("Message {0}-{1} no longer available.", subscriptionId, sequenceNumber); - break; - // if encoding limits are exceeded, the issue is logged and - // the published data is acknowledged to prevent the endless republish loop. - case StatusCodes.BadEncodingLimitsExceeded: - Utils.LogError(e, "Message {0}-{1} exceeded size limits, ignored.", subscriptionId, sequenceNumber); - var ack = new SubscriptionAcknowledgement { - SubscriptionId = subscriptionId, - SequenceNumber = sequenceNumber - }; - lock (SyncRoot) - { - m_acknowledgementsToSend.Add(ack); - } - break; - default: - result = false; - Utils.LogError(e, "Unexpected error sending republish request."); - break; - } - - PublishErrorEventHandler callback = null; - - lock (m_eventLock) - { - callback = m_PublishError; - } - - // raise an error event. - if (callback != null) - { - try - { - PublishErrorEventArgs args = new PublishErrorEventArgs( - error, - subscriptionId, - sequenceNumber); - - callback(this, args); - } - catch (Exception e2) - { - Utils.LogError(e2, "Session: Unexpected error invoking PublishErrorCallback."); - } - } - - return result; + return ProcessRepublishResponseError(e, subscriptionId, sequenceNumber); } } @@ -5611,6 +5449,204 @@ private void ValidateServerEndpoints(EndpointDescriptionCollection serverEndpoin } } + /// + /// Helper to prepare the reconnect channel + /// and signature data before activate. + /// + private IAsyncResult PrepareReconnectBeginActivate( + ITransportWaitingConnection connection, + ITransportChannel transportChannel + ) + { + Utils.LogInfo("Session RECONNECT {0} starting.", SessionId); + + lock (SyncRoot) + { + // stop keep alives. + StopKeepAliveTimer(); + } + + // create the client signature. + byte[] dataToSign = Utils.Append(m_serverCertificate != null ? m_serverCertificate.RawData : null, m_serverNonce); + EndpointDescription endpoint = m_endpoint.Description; + SignatureData clientSignature = SecurityPolicies.Sign(m_instanceCertificate, endpoint.SecurityPolicyUri, dataToSign); + + // check that the user identity is supported by the endpoint. + UserTokenPolicy identityPolicy = endpoint.FindUserTokenPolicy(m_identity.TokenType, m_identity.IssuedTokenType); + + if (identityPolicy == null) + { + Utils.LogError("Reconnect: Endpoint does not support the user identity type provided."); + + throw ServiceResultException.Create( + StatusCodes.BadUserAccessDenied, + "Endpoint does not support the user identity type provided."); + } + + // select the security policy for the user token. + string securityPolicyUri = identityPolicy.SecurityPolicyUri; + + if (String.IsNullOrEmpty(securityPolicyUri)) + { + securityPolicyUri = endpoint.SecurityPolicyUri; + } + + // need to refresh the identity (reprompt for password, refresh token). + if (m_RenewUserIdentity != null) + { + m_identity = m_RenewUserIdentity(this, m_identity); + } + + // validate server nonce and security parameters for user identity. + ValidateServerNonce( + m_identity, + m_serverNonce, + securityPolicyUri, + m_previousServerNonce, + m_endpoint.Description.SecurityMode); + + // sign data with user token. + UserIdentityToken identityToken = m_identity.GetIdentityToken(); + identityToken.PolicyId = identityPolicy.PolicyId; + SignatureData userTokenSignature = identityToken.Sign(dataToSign, securityPolicyUri); + + // encrypt token. + identityToken.Encrypt(m_serverCertificate, m_serverNonce, securityPolicyUri); + + // send the software certificates assigned to the client. + SignedSoftwareCertificateCollection clientSoftwareCertificates = GetSoftwareCertificates(); + + Utils.LogInfo("Session REPLACING channel for {0}.", SessionId); + + if (connection != null) + { + // check if the channel supports reconnect. + if ((TransportChannel.SupportedFeatures & TransportChannelFeatures.Reconnect) != 0) + { + TransportChannel.Reconnect(connection); + } + else + { + // initialize the channel which will be created with the server. + ITransportChannel channel = SessionChannel.Create( + m_configuration, + connection, + m_endpoint.Description, + m_endpoint.Configuration, + m_instanceCertificate, + m_configuration.SecurityConfiguration.SendCertificateChain ? m_instanceCertificateChain : null, + MessageContext); + + // disposes the existing channel. + TransportChannel = channel; + } + } + else if (transportChannel != null) + { + TransportChannel = transportChannel; + } + else + { + // check if the channel supports reconnect. + if (TransportChannel != null && (TransportChannel.SupportedFeatures & TransportChannelFeatures.Reconnect) != 0) + { + TransportChannel.Reconnect(); + } + else + { + // initialize the channel which will be created with the server. + ITransportChannel channel = SessionChannel.Create( + m_configuration, + m_endpoint.Description, + m_endpoint.Configuration, + m_instanceCertificate, + m_configuration.SecurityConfiguration.SendCertificateChain ? m_instanceCertificateChain : null, + MessageContext); + + // disposes the existing channel. + TransportChannel = channel; + } + } + + Utils.LogInfo("Session RE-ACTIVATING {0}.", SessionId); + + RequestHeader header = new RequestHeader() { TimeoutHint = kReconnectTimeout }; + return BeginActivateSession( + header, + clientSignature, + null, + m_preferredLocales, + new ExtensionObject(identityToken), + userTokenSignature, + null, + null); + } + + /// + /// Process Republish error response. + /// + /// The exception that occurred during the republish operation. + /// The subscription Id for which the republish was requested. + /// The sequencenumber for which the republish was requested. + private bool ProcessRepublishResponseError(Exception e, uint subscriptionId, uint sequenceNumber) + { + + ServiceResult error = new ServiceResult(e); + + bool result = true; + switch (error.StatusCode.Code) + { + case StatusCodes.BadMessageNotAvailable: + Utils.LogWarning("Message {0}-{1} no longer available.", subscriptionId, sequenceNumber); + break; + + // if encoding limits are exceeded, the issue is logged and + // the published data is acknowledged to prevent the endless republish loop. + case StatusCodes.BadEncodingLimitsExceeded: + Utils.LogError(e, "Message {0}-{1} exceeded size limits, ignored.", subscriptionId, sequenceNumber); + var ack = new SubscriptionAcknowledgement { + SubscriptionId = subscriptionId, + SequenceNumber = sequenceNumber + }; + lock (SyncRoot) + { + m_acknowledgementsToSend.Add(ack); + } + break; + default: + result = false; + Utils.LogError(e, "Unexpected error sending republish request."); + break; + } + + PublishErrorEventHandler callback = null; + + lock (m_eventLock) + { + callback = m_PublishError; + } + + // raise an error event. + if (callback != null) + { + try + { + PublishErrorEventArgs args = new PublishErrorEventArgs( + error, + subscriptionId, + sequenceNumber); + + callback(this, args); + } + catch (Exception e2) + { + Utils.LogError(e2, "Session: Unexpected error invoking PublishErrorCallback."); + } + } + + return result; + } + /// /// Handles the validation of server software certificates and application callback. /// diff --git a/Libraries/Opc.Ua.Client/SessionAsync.cs b/Libraries/Opc.Ua.Client/SessionAsync.cs index 6d6aaeb17..583cb219e 100644 --- a/Libraries/Opc.Ua.Client/SessionAsync.cs +++ b/Libraries/Opc.Ua.Client/SessionAsync.cs @@ -36,6 +36,7 @@ using System.Security.Cryptography.X509Certificates; using System.Threading; using System.Threading.Tasks; +using Opc.Ua.Bindings; namespace Opc.Ua.Client { @@ -1236,44 +1237,44 @@ IList browseNextErrors /// /// Recreates a session based on a specified template. /// - /// The Session object to use as template + /// The Session object to use as template /// /// The new session object. - public static async Task RecreateAsync(Session template, CancellationToken ct = default) + public static async Task RecreateAsync(Session sessionTemplate, CancellationToken ct = default) { - ServiceMessageContext messageContext = template.m_configuration.CreateMessageContext(); - messageContext.Factory = template.Factory; + ServiceMessageContext messageContext = sessionTemplate.m_configuration.CreateMessageContext(); + messageContext.Factory = sessionTemplate.Factory; // create the channel object used to connect to the server. ITransportChannel channel = SessionChannel.Create( - template.m_configuration, - template.ConfiguredEndpoint.Description, - template.ConfiguredEndpoint.Configuration, - template.m_instanceCertificate, - template.m_configuration.SecurityConfiguration.SendCertificateChain ? - template.m_instanceCertificateChain : null, + sessionTemplate.m_configuration, + sessionTemplate.ConfiguredEndpoint.Description, + sessionTemplate.ConfiguredEndpoint.Configuration, + sessionTemplate.m_instanceCertificate, + sessionTemplate.m_configuration.SecurityConfiguration.SendCertificateChain ? + sessionTemplate.m_instanceCertificateChain : null, messageContext); // create the session object. - Session session = template.CloneSession(channel, true); + Session session = sessionTemplate.CloneSession(channel, true); try { // open the session. await session.OpenAsync( - template.SessionName, - (uint)template.SessionTimeout, - template.Identity, - template.PreferredLocales, - template.m_checkDomain, + sessionTemplate.SessionName, + (uint)sessionTemplate.SessionTimeout, + sessionTemplate.Identity, + sessionTemplate.PreferredLocales, + sessionTemplate.m_checkDomain, ct).ConfigureAwait(false); - await session.RecreateSubscriptionsAsync(template.Subscriptions, ct).ConfigureAwait(false); + await session.RecreateSubscriptionsAsync(sessionTemplate.Subscriptions, ct).ConfigureAwait(false); } catch (Exception e) { session.Dispose(); - throw ServiceResultException.Create(StatusCodes.BadCommunicationError, e, "Could not recreate session. {0}", template.SessionName); + throw ServiceResultException.Create(StatusCodes.BadCommunicationError, e, "Could not recreate session. {0}", sessionTemplate.SessionName); } return session; @@ -1282,46 +1283,46 @@ await session.OpenAsync( /// /// Recreates a session based on a specified template. /// - /// The Session object to use as template + /// The Session object to use as template /// The waiting reverse connection. /// /// The new session object. - public static async Task RecreateAsync(Session template, ITransportWaitingConnection connection, CancellationToken ct = default) + public static async Task RecreateAsync(Session sessionTemplate, ITransportWaitingConnection connection, CancellationToken ct = default) { - ServiceMessageContext messageContext = template.m_configuration.CreateMessageContext(); - messageContext.Factory = template.Factory; + ServiceMessageContext messageContext = sessionTemplate.m_configuration.CreateMessageContext(); + messageContext.Factory = sessionTemplate.Factory; // create the channel object used to connect to the server. ITransportChannel channel = SessionChannel.Create( - template.m_configuration, + sessionTemplate.m_configuration, connection, - template.m_endpoint.Description, - template.m_endpoint.Configuration, - template.m_instanceCertificate, - template.m_configuration.SecurityConfiguration.SendCertificateChain ? - template.m_instanceCertificateChain : null, + sessionTemplate.m_endpoint.Description, + sessionTemplate.m_endpoint.Configuration, + sessionTemplate.m_instanceCertificate, + sessionTemplate.m_configuration.SecurityConfiguration.SendCertificateChain ? + sessionTemplate.m_instanceCertificateChain : null, messageContext); // create the session object. - Session session = template.CloneSession(channel, true); + Session session = sessionTemplate.CloneSession(channel, true); try { // open the session. await session.OpenAsync( - template.m_sessionName, - (uint)template.m_sessionTimeout, - template.m_identity, - template.m_preferredLocales, - template.m_checkDomain, + sessionTemplate.m_sessionName, + (uint)sessionTemplate.m_sessionTimeout, + sessionTemplate.m_identity, + sessionTemplate.m_preferredLocales, + sessionTemplate.m_checkDomain, ct).ConfigureAwait(false); - await session.RecreateSubscriptionsAsync(template.Subscriptions, ct).ConfigureAwait(false); + await session.RecreateSubscriptionsAsync(sessionTemplate.Subscriptions, ct).ConfigureAwait(false); } catch (Exception e) { session.Dispose(); - throw ServiceResultException.Create(StatusCodes.BadCommunicationError, e, "Could not recreate session. {0}", template.m_sessionName); + throw ServiceResultException.Create(StatusCodes.BadCommunicationError, e, "Could not recreate session. {0}", sessionTemplate.m_sessionName); } return session; @@ -1330,27 +1331,27 @@ await session.OpenAsync( /// /// Recreates a session based on a specified template using the provided channel. /// - /// The Session object to use as template + /// The Session object to use as template /// The waiting reverse connection. /// /// The new session object. - public static async Task RecreateAsync(Session template, ITransportChannel transportChannel, CancellationToken ct = default) + public static async Task RecreateAsync(Session sessionTemplate, ITransportChannel transportChannel, CancellationToken ct = default) { - ServiceMessageContext messageContext = template.m_configuration.CreateMessageContext(); - messageContext.Factory = template.Factory; + ServiceMessageContext messageContext = sessionTemplate.m_configuration.CreateMessageContext(); + messageContext.Factory = sessionTemplate.Factory; // create the session object. - Session session = template.CloneSession(transportChannel, true); + Session session = sessionTemplate.CloneSession(transportChannel, true); try { // open the session. await session.OpenAsync( - template.m_sessionName, - (uint)template.m_sessionTimeout, - template.m_identity, - template.m_preferredLocales, - template.m_checkDomain, + sessionTemplate.m_sessionName, + (uint)sessionTemplate.m_sessionTimeout, + sessionTemplate.m_identity, + sessionTemplate.m_preferredLocales, + sessionTemplate.m_checkDomain, ct).ConfigureAwait(false); // create the subscriptions. @@ -1362,7 +1363,7 @@ await session.OpenAsync( catch (Exception e) { session.Dispose(); - throw ServiceResultException.Create(StatusCodes.BadCommunicationError, e, "Could not recreate session. {0}", template.m_sessionName); + throw ServiceResultException.Create(StatusCodes.BadCommunicationError, e, "Could not recreate session. {0}", sessionTemplate.m_sessionName); } return session; @@ -1471,6 +1472,147 @@ public virtual async Task CloseAsync(int timeout, bool closeChannel, } #endregion + #region Reconnect Async Methods + /// + public Task ReconnectAsync(CancellationToken ct) + => ReconnectAsync(null, null, ct); + + /// + public Task ReconnectAsync(ITransportWaitingConnection connection, CancellationToken ct) + => ReconnectAsync(connection, null, ct); + + /// + public Task ReconnectAsync(ITransportChannel channel, CancellationToken ct) + => ReconnectAsync(null, channel, ct); + + /// + /// Reconnects to the server after a network failure using a waiting connection. + /// + private async Task ReconnectAsync(ITransportWaitingConnection connection, ITransportChannel transportChannel, CancellationToken ct) + { + bool resetReconnect = false; + try + { + await m_reconnectLock.WaitAsync(ct).ConfigureAwait(false); + bool reconnecting = m_reconnecting; + m_reconnecting = true; + resetReconnect = true; + m_reconnectLock.Release(); + + // check if already connecting. + if (reconnecting) + { + Utils.LogWarning("Session is already attempting to reconnect."); + + throw ServiceResultException.Create( + StatusCodes.BadInvalidState, + "Session is already attempting to reconnect."); + } + + IAsyncResult result = PrepareReconnectBeginActivate( + connection, + transportChannel); + + if (!(result is ChannelAsyncOperation operation)) throw new ArgumentNullException(nameof(result)); + + try + { + _ = await operation.EndAsync(kReconnectTimeout / 2, true, ct).ConfigureAwait(false); + } + catch (ServiceResultException) + { + Utils.LogWarning("WARNING: ACTIVATE SESSION {0} timed out. {1}/{2}", SessionId, GoodPublishRequestCount, OutstandingRequestCount); + } + + // reactivate session. + byte[] serverNonce = null; + StatusCodeCollection certificateResults = null; + DiagnosticInfoCollection certificateDiagnosticInfos = null; + + EndActivateSession( + result, + out serverNonce, + out certificateResults, + out certificateDiagnosticInfos); + + int publishCount = 0; + + Utils.LogInfo("Session RECONNECT {0} completed successfully.", SessionId); + + lock (SyncRoot) + { + m_previousServerNonce = m_serverNonce; + m_serverNonce = serverNonce; + publishCount = GetMinPublishRequestCount(true); + } + + await m_reconnectLock.WaitAsync(ct).ConfigureAwait(false); + m_reconnecting = false; + resetReconnect = false; + m_reconnectLock.Release(); + + // refill pipeline. + for (int ii = 0; ii < publishCount; ii++) + { + BeginPublish(OperationTimeout); + } + + StartKeepAliveTimer(); + + IndicateSessionConfigurationChanged(); + } + finally + { + if (resetReconnect) + { + await m_reconnectLock.WaitAsync(ct).ConfigureAwait(false); + m_reconnecting = false; + m_reconnectLock.Release(); + } + } + } + + /// + public async Task RepublishAsync(uint subscriptionId, uint sequenceNumber, CancellationToken ct) + { + // send republish request. + RequestHeader requestHeader = new RequestHeader { + TimeoutHint = (uint)OperationTimeout, + ReturnDiagnostics = (uint)(int)ReturnDiagnostics, + RequestHandle = Utils.IncrementIdentifier(ref m_publishCounter) + }; + + try + { + Utils.LogInfo("Requesting RepublishAsync for {0}-{1}", subscriptionId, sequenceNumber); + + // request republish. + RepublishResponse response = await RepublishAsync( + requestHeader, + subscriptionId, + sequenceNumber, + ct).ConfigureAwait(false); + ResponseHeader responseHeader = response.ResponseHeader; + NotificationMessage notificationMessage = response.NotificationMessage; + + Utils.LogInfo("Received RepublishAsync for {0}-{1}-{2}", subscriptionId, sequenceNumber, responseHeader.ServiceResult); + + // process response. + ProcessPublishResponse( + responseHeader, + subscriptionId, + null, + false, + notificationMessage); + + return true; + } + catch (Exception e) + { + return ProcessRepublishResponseError(e, subscriptionId, sequenceNumber); + } + } + /// /// Recreate the subscriptions in a reconnected session. /// Uses Transfer service if is set to true. @@ -1516,6 +1658,7 @@ private async Task RecreateSubscriptionsAsync(IEnumerable subscrip } } } + #endregion } } #endif diff --git a/Libraries/Opc.Ua.Client/SessionReconnectHandler.cs b/Libraries/Opc.Ua.Client/SessionReconnectHandler.cs index 224cd34cc..e66204940 100644 --- a/Libraries/Opc.Ua.Client/SessionReconnectHandler.cs +++ b/Libraries/Opc.Ua.Client/SessionReconnectHandler.cs @@ -384,11 +384,11 @@ private async Task DoReconnectAsync() m_session.Endpoint.Server.ApplicationUri ).ConfigureAwait(false); - m_session.Reconnect(connection); + await m_session.ReconnectAsync(connection).ConfigureAwait(false); } else { - m_session.Reconnect(); + await m_session.ReconnectAsync().ConfigureAwait(false); } // monitored items should start updating on their own. @@ -418,12 +418,15 @@ private async Task DoReconnectAsync() } // check if the security configuration may have changed - if (sre.StatusCode == StatusCodes.BadSecurityChecksFailed) + if (sre.StatusCode == StatusCodes.BadSecurityChecksFailed || + sre.StatusCode == StatusCodes.BadCertificateInvalid) { m_updateFromServer = true; Utils.LogInfo("Reconnect failed due to security check. Request endpoint update from server. {0}", sre.Message); } - else + // wait for next scheduled reconnect if connection failed, + // otherwise recreate session immediately + else if (sre.StatusCode != StatusCodes.BadSessionIdInvalid) { // next attempt is to recreate session m_reconnectFailed = true; @@ -486,13 +489,15 @@ await endpoint.UpdateFromServerAsync( session = await m_session.SessionFactory.RecreateAsync(m_session).ConfigureAwait(false); } - m_session.Close(); + // note: the template session is not connected at this point + // and must be disposed by the owner m_session = session; return true; } catch (ServiceResultException sre) { - if (sre.InnerResult?.StatusCode == StatusCodes.BadSecurityChecksFailed) + if (sre.InnerResult?.StatusCode == StatusCodes.BadSecurityChecksFailed || + sre.InnerResult?.StatusCode == StatusCodes.BadCertificateInvalid) { // schedule endpoint update and retry m_updateFromServer = true; diff --git a/Libraries/Opc.Ua.Client/Subscription.cs b/Libraries/Opc.Ua.Client/Subscription.cs index 7a7f51d28..a8a3dde8c 100644 --- a/Libraries/Opc.Ua.Client/Subscription.cs +++ b/Libraries/Opc.Ua.Client/Subscription.cs @@ -804,9 +804,10 @@ public bool PublishingStopped { lock (m_cache) { - int keepAliveInterval = (int)(Math.Min(m_currentPublishingInterval * m_currentKeepAliveCount, Int32.MaxValue - 500)); + int keepAliveInterval = (int)(Math.Min(m_currentPublishingInterval * (m_currentKeepAliveCount + 1), Int32.MaxValue - 500)); + TimeSpan timeSinceLastNotification = DateTime.UtcNow - m_lastNotificationTime; - if (m_lastNotificationTime.AddMilliseconds(keepAliveInterval + 500) < DateTime.UtcNow) + if (timeSinceLastNotification.TotalMilliseconds > keepAliveInterval + 500) { return true; } @@ -1893,7 +1894,8 @@ private async Task PublishResponseMessageWorkerAsync() Utils.LogTrace("SubscriptionId {0} - Publish Thread {1:X8} Exited Normally.", m_id, Environment.CurrentManagedThreadId); break; } - OnMessageReceived(); + + await OnMessageReceivedAsync(CancellationToken.None).ConfigureAwait(false); } while (true); } @@ -2091,7 +2093,7 @@ private void AdjustCounts(ref uint keepAliveCount, ref uint lifetimeCount) /// /// Processes the incoming messages. /// - private void OnMessageReceived() + private async Task OnMessageReceivedAsync(CancellationToken ct) { try { @@ -2310,7 +2312,7 @@ private void OnMessageReceived() { for (int ii = 0; ii < messagesToRepublish.Count; ii++) { - if (!session.Republish(subscriptionId, messagesToRepublish[ii].SequenceNumber)) + if (!await session.RepublishAsync(subscriptionId, messagesToRepublish[ii].SequenceNumber, ct).ConfigureAwait(false)) { messagesToRepublish[ii].Republished = false; } diff --git a/Libraries/Opc.Ua.Client/TraceableSession.cs b/Libraries/Opc.Ua.Client/TraceableSession.cs index e94563281..0d2db4c1e 100644 --- a/Libraries/Opc.Ua.Client/TraceableSession.cs +++ b/Libraries/Opc.Ua.Client/TraceableSession.cs @@ -316,6 +316,33 @@ public void Reconnect(ITransportChannel channel) } } + /// + public async Task ReconnectAsync(CancellationToken ct = default) + { + using (Activity activity = ActivitySource.StartActivity(nameof(ReconnectAsync))) + { + await m_session.ReconnectAsync(ct).ConfigureAwait(false); + } + } + + /// + public async Task ReconnectAsync(ITransportWaitingConnection connection, CancellationToken ct = default) + { + using (Activity activity = ActivitySource.StartActivity(nameof(ReconnectAsync))) + { + await m_session.ReconnectAsync(connection, ct).ConfigureAwait(false); + } + } + + /// + public async Task ReconnectAsync(ITransportChannel channel, CancellationToken ct = default) + { + using (Activity activity = ActivitySource.StartActivity(nameof(ReconnectAsync))) + { + await m_session.ReconnectAsync(channel, ct).ConfigureAwait(false); + } + } + /// public void Save(string filePath, IEnumerable knownTypes = null) { @@ -905,6 +932,15 @@ public bool Republish(uint subscriptionId, uint sequenceNumber) } } + /// + public async Task RepublishAsync(uint subscriptionId, uint sequenceNumber, CancellationToken ct = default) + { + using (Activity activity = ActivitySource.StartActivity(nameof(RepublishAsync))) + { + return await m_session.RepublishAsync(subscriptionId, sequenceNumber, ct).ConfigureAwait(false); + } + } + /// public ResponseHeader CreateSession(RequestHeader requestHeader, ApplicationDescription clientDescription, string serverUri, string endpointUrl, string sessionName, byte[] clientNonce, byte[] clientCertificate, double requestedSessionTimeout, uint maxResponseMessageSize, out NodeId sessionId, out NodeId authenticationToken, out double revisedSessionTimeout, out byte[] serverNonce, out byte[] serverCertificate, out EndpointDescriptionCollection serverEndpoints, out SignedSoftwareCertificateCollection serverSoftwareCertificates, out SignatureData serverSignature, out uint maxRequestMessageSize) { diff --git a/Libraries/Opc.Ua.Server/Subscription/SubscriptionManager.cs b/Libraries/Opc.Ua.Server/Subscription/SubscriptionManager.cs index 5933c7995..0456e061e 100644 --- a/Libraries/Opc.Ua.Server/Subscription/SubscriptionManager.cs +++ b/Libraries/Opc.Ua.Server/Subscription/SubscriptionManager.cs @@ -66,6 +66,7 @@ public SubscriptionManager( m_subscriptions = new Dictionary(); m_publishQueues = new Dictionary(); m_statusMessages = new Dictionary>(); + m_lastSubscriptionId = BitConverter.ToInt64(Utils.Nonce.CreateNonce(sizeof(long)), 0); // create a event to signal shutdown. m_shutdownEvent = new ManualResetEvent(true); diff --git a/Stack/Opc.Ua.Core/Stack/Tcp/UaSCBinaryClientChannel.cs b/Stack/Opc.Ua.Core/Stack/Tcp/UaSCBinaryClientChannel.cs index 5b2412ce2..90159659b 100644 --- a/Stack/Opc.Ua.Core/Stack/Tcp/UaSCBinaryClientChannel.cs +++ b/Stack/Opc.Ua.Core/Stack/Tcp/UaSCBinaryClientChannel.cs @@ -224,7 +224,7 @@ public async Task CloseAsync(int timeout, CancellationToken ct = default) { try { - await operation.EndAsync(timeout, false, ct).ConfigureAwait(false); + await operation.EndAsync(timeout, true, ct).ConfigureAwait(false); } catch (ServiceResultException e) { @@ -294,47 +294,6 @@ public void Close(int timeout) Shutdown(StatusCodes.BadConnectionClosed); } - /// - /// Closes a connection with the server (async). - /// - public async Task CloseAsync(int timeout) - { - WriteOperation operation = InternalClose(timeout); - - // wait for the close to succeed. - if (operation != null) - { - try - { - await operation.EndAsync(timeout, false).ConfigureAwait(false); - } - catch (ServiceResultException e) - { - switch (e.StatusCode) - { - case StatusCodes.BadRequestInterrupted: - case StatusCodes.BadSecureChannelClosed: - { - break; - } - - default: - { - Utils.LogWarning(e, "ChannelId {0}: Could not gracefully close the channel. Reason={1}", ChannelId, e.Result.StatusCode); - break; - } - } - } - catch (Exception e) - { - Utils.LogError(e, "ChannelId {0}: Could not gracefully close the channel.", ChannelId); - } - } - - // shutdown. - Shutdown(StatusCodes.BadConnectionClosed); - } - /// /// Sends a request to the server. /// diff --git a/Tests/Opc.Ua.Client.Tests/NodeCacheAsyncTest.cs b/Tests/Opc.Ua.Client.Tests/NodeCacheAsyncTest.cs index 4a4c16685..bfe0dc13d 100644 --- a/Tests/Opc.Ua.Client.Tests/NodeCacheAsyncTest.cs +++ b/Tests/Opc.Ua.Client.Tests/NodeCacheAsyncTest.cs @@ -31,7 +31,6 @@ using System.Collections.Generic; using System.Linq; using System.Reflection; -using System.Runtime.CompilerServices; using System.Threading.Tasks; using BenchmarkDotNet.Attributes; using NUnit.Framework; diff --git a/Tests/Opc.Ua.Client.Tests/SubscriptionTest.cs b/Tests/Opc.Ua.Client.Tests/SubscriptionTest.cs index 2355baf91..46d0b87bf 100644 --- a/Tests/Opc.Ua.Client.Tests/SubscriptionTest.cs +++ b/Tests/Opc.Ua.Client.Tests/SubscriptionTest.cs @@ -589,7 +589,14 @@ public async Task ReconnectWithSavedSessionSecrets( }; // activate the session from saved sesson secrets on the new channel - session2.Reconnect(channel2); + if (asyncTest) + { + await session2.ReconnectAsync(channel2).ConfigureAwait(false); + } + else + { + session2.Reconnect(channel2); + } // reactivate restored subscriptions if (asyncTest) @@ -621,22 +628,23 @@ public async Task ReconnectWithSavedSessionSecrets( for (ii = 0; ii < kTestSubscriptions; ii++) { var monitoredItemCount = restoredSubscriptions[ii].MonitoredItemCount; + string errorText = $"Error in test subscription {ii}"; // the static subscription doesn't resend data until there is a data change if (ii == 0 && !sendInitialValues) { - Assert.AreEqual(0, targetSubscriptionCounters[ii]); - Assert.AreEqual(0, targetSubscriptionFastDataCounters[ii]); + Assert.AreEqual(0, targetSubscriptionCounters[ii], errorText); + Assert.AreEqual(0, targetSubscriptionFastDataCounters[ii], errorText); } else if (ii == 0) { - Assert.AreEqual(10, targetSubscriptionCounters[ii]); - Assert.AreEqual(1, targetSubscriptionFastDataCounters[ii]); + Assert.AreEqual(10, targetSubscriptionCounters[ii], errorText); + Assert.AreEqual(1, targetSubscriptionFastDataCounters[ii], errorText); } else { - Assert.LessOrEqual(monitoredItemCount, targetSubscriptionCounters[ii]); - Assert.LessOrEqual(1, targetSubscriptionFastDataCounters[ii]); + Assert.LessOrEqual(monitoredItemCount, targetSubscriptionCounters[ii], errorText); + Assert.LessOrEqual(1, targetSubscriptionFastDataCounters[ii], errorText); } }