From e89b06e57b135a2182441b83f9bb6261616293ef Mon Sep 17 00:00:00 2001 From: Martin Regen Date: Wed, 14 Jun 2023 09:47:21 +0200 Subject: [PATCH] Add metadata to fast data and event callbacks notification. (#2176) - if `DisableMonitoredItemCache` is set, the sequencenumber and publish time are unavailable. Add helpers to NotificationData to get the information in the fast data and event callbacks. - new fast callback for keep alive messages --- .../ConsoleReferenceClient/ClientSamples.cs | 22 +++++- Libraries/Opc.Ua.Client/Subscription.cs | 68 ++++++++++++++++--- .../Types/BuiltIn/NotificationData.cs | 41 +++++++++++ 3 files changed, 122 insertions(+), 9 deletions(-) create mode 100644 Stack/Opc.Ua.Core/Types/BuiltIn/NotificationData.cs diff --git a/Applications/ConsoleReferenceClient/ClientSamples.cs b/Applications/ConsoleReferenceClient/ClientSamples.cs index 67a9e3413..f9ad77ad8 100644 --- a/Applications/ConsoleReferenceClient/ClientSamples.cs +++ b/Applications/ConsoleReferenceClient/ClientSamples.cs @@ -878,9 +878,11 @@ public async Task SubscribeAllValuesAsync( KeepAliveCount = keepAliveCount, SequentialPublishing = true, RepublishAfterTransfer = true, + DisableMonitoredItemCache = true, MaxNotificationsPerPublish = 1000, MinLifetimeInterval = (uint)session.SessionTimeout, FastDataChangeCallback = FastDataChangeNotification, + FastKeepAliveCallback = FastKeepAliveNotification, }; session.AddSubscription(subscription); @@ -959,6 +961,22 @@ public static string FormatValueAsJson( #endregion #region Private Methods + /// + /// The fast keep alive notification callback. + /// + private void FastKeepAliveNotification(Subscription subscription, NotificationData notification) + { + try + { + m_output.WriteLine("Keep Alive : Id={0} PublishTime={1} SequenceNumber={2}.", + subscription.Id, notification.PublishTime, notification.SequenceNumber); + } + catch (Exception ex) + { + m_output.WriteLine("FastKeepAliveNotification error: {0}", ex.Message); + } + } + /// /// The fast data change notification callback. /// @@ -966,7 +984,9 @@ private void FastDataChangeNotification(Subscription subscription, DataChangeNot { try { - m_output.WriteLine("Notification: Id={0} Items={1}.", subscription.Id, notification.MonitoredItems.Count); + m_output.WriteLine("Notification: Id={0} PublishTime={1} SequenceNumber={2} Items={3}.", + subscription.Id, notification.PublishTime, + notification.SequenceNumber, notification.MonitoredItems.Count); } catch (Exception ex) { diff --git a/Libraries/Opc.Ua.Client/Subscription.cs b/Libraries/Opc.Ua.Client/Subscription.cs index d0f8a48b3..989d6bbb1 100644 --- a/Libraries/Opc.Ua.Client/Subscription.cs +++ b/Libraries/Opc.Ua.Client/Subscription.cs @@ -114,6 +114,7 @@ public Subscription(Subscription template, bool copyEventHandlers) m_publishStatusChanged = template.m_publishStatusChanged; m_fastDataChangeCallback = template.m_fastDataChangeCallback; m_fastEventCallback = template.m_fastEventCallback; + m_fastKeepAliveCallback = template.m_fastKeepAliveCallback; } // copy the list of monitored items. @@ -454,6 +455,19 @@ public FastEventNotificationEventHandler FastEventCallback set => m_fastEventCallback = value; } + /// + /// Gets or sets the fast keep alive callback. + /// + /// The keep alive change callback. + /// + /// Only one callback is allowed at a time but it is more efficient to call than an event. + /// + public FastKeepAliveNotificationEventHandler FastKeepAliveCallback + { + get => m_fastKeepAliveCallback; + set => m_fastKeepAliveCallback = value; + } + /// /// The items to monitor. /// @@ -1903,6 +1917,9 @@ private async Task OnMessageReceived() // get list of new messages to process. List messagesToProcess = null; + // get list of new messages to process. + List keepAliveToProcess = null; + // get list of new messages to republish. List messagesToRepublish = null; @@ -1910,8 +1927,18 @@ private async Task OnMessageReceived() { for (LinkedListNode ii = m_incomingMessages.First; ii != null; ii = ii.Next) { + // process keep alive messages + if (ii.Value.Message == null && !ii.Value.Processed) + { + if (keepAliveToProcess == null) + { + keepAliveToProcess = new List(); + } + keepAliveToProcess.Add(ii.Value); + } + // update monitored items with unprocessed messages. - if (ii.Value.Message != null && !ii.Value.Processed && + else if (ii.Value.Message != null && !ii.Value.Processed && // If sequential publishing is enabled, only release messages in perfect sequence. (!m_sequentialPublishing || ii.Value.SequenceNumber <= m_lastSequenceNumberProcessed + 1)) { @@ -1971,25 +1998,41 @@ private async Task OnMessageReceived() } } + // process new keep alive messages. + FastKeepAliveNotificationEventHandler keepAliveCallback = m_fastKeepAliveCallback; + if (keepAliveToProcess != null && keepAliveCallback != null) + { + foreach (IncomingMessage message in keepAliveToProcess) + { + var keepAlive = new NotificationData { + PublishTime = message.Timestamp, + SequenceNumber = message.SequenceNumber + }; + keepAliveCallback(this, keepAlive); + } + } + // process new messages. if (messagesToProcess != null) { + int noNotificationsReceived; FastDataChangeNotificationEventHandler datachangeCallback = m_fastDataChangeCallback; FastEventNotificationEventHandler eventCallback = m_fastEventCallback; - int noNotificationsReceived = 0; - for (int ii = 0; ii < messagesToProcess.Count; ii++) + foreach (NotificationMessage message in messagesToProcess) { - NotificationMessage message = messagesToProcess[ii]; noNotificationsReceived = 0; try { - for (int jj = 0; jj < message.NotificationData.Count; jj++) + foreach (ExtensionObject notificationData in message.NotificationData) { - DataChangeNotification datachange = message.NotificationData[jj].Body as DataChangeNotification; + var datachange = notificationData.Body as DataChangeNotification; if (datachange != null) { + datachange.PublishTime = message.PublishTime; + datachange.SequenceNumber = message.SequenceNumber; + noNotificationsReceived += datachange.MonitoredItems.Count; if (!m_disableMonitoredItemCache) @@ -2003,10 +2046,13 @@ private async Task OnMessageReceived() } } - EventNotificationList events = message.NotificationData[jj].Body as EventNotificationList; + var events = notificationData.Body as EventNotificationList; if (events != null) { + events.PublishTime = message.PublishTime; + events.SequenceNumber = message.SequenceNumber; + noNotificationsReceived += events.Events.Count; if (!m_disableMonitoredItemCache) @@ -2020,7 +2066,7 @@ private async Task OnMessageReceived() } } - StatusChangeNotification statusChanged = message.NotificationData[jj].Body as StatusChangeNotification; + StatusChangeNotification statusChanged = notificationData.Body as StatusChangeNotification; if (statusChanged != null) { @@ -2476,6 +2522,7 @@ private IncomingMessage FindOrCreateEntry(DateTime utcNow, uint sequenceNumber) private bool m_disableMonitoredItemCache; private FastDataChangeNotificationEventHandler m_fastDataChangeCallback; private FastEventNotificationEventHandler m_fastEventCallback; + private FastKeepAliveNotificationEventHandler m_fastKeepAliveCallback; private int m_outstandingMessageWorkers; private SemaphoreSlim m_messageWorkersSemaphore; private bool m_sequentialPublishing; @@ -2569,6 +2616,11 @@ public enum SubscriptionChangeMask /// public delegate void FastEventNotificationEventHandler(Subscription subscription, EventNotificationList notification, IList stringTable); + /// + /// The delegate used to receive keep alive notifications via a direct function call instead of a .NET Event. + /// + public delegate void FastKeepAliveNotificationEventHandler(Subscription subscription, NotificationData notification); + #region SubscriptionStateChangedEventArgs Class /// /// The event arguments provided when the state of a subscription changes. diff --git a/Stack/Opc.Ua.Core/Types/BuiltIn/NotificationData.cs b/Stack/Opc.Ua.Core/Types/BuiltIn/NotificationData.cs new file mode 100644 index 000000000..89ae50a3d --- /dev/null +++ b/Stack/Opc.Ua.Core/Types/BuiltIn/NotificationData.cs @@ -0,0 +1,41 @@ +/* Copyright (c) 1996-2023 The OPC Foundation. All rights reserved. + The source code in this file is covered under a dual-license scenario: + - RCL: for OPC Foundation Corporate Members in good-standing + - GPL V2: everybody else + RCL license terms accompanied with this source code. See http://opcfoundation.org/License/RCL/1.00/ + GNU General Public License as published by the Free Software Foundation; + version 2 of the License are accompanied with this source code. See http://opcfoundation.org/License/GPLv2 + This source code is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. +*/ + +using System; + +namespace Opc.Ua +{ + public partial class NotificationData + { + /// + /// Helper variable for a client to pass the sequence number + /// of the publish response for the data and the event change notification + /// to a client application which subscribes to subscription notifications. + /// + /// + /// A value of 0 indicates that the sequence number is not known. + /// A KeepAlive notification contains the sequence number of the next + /// notification. + /// + public uint SequenceNumber { get; set; } + + /// + /// Helper variable for a client to pass the publish time + /// of the publish response for the data and the event change notification + /// to a client application which subscribes to subscription notifications. + /// + /// + /// A value of MinTime indicates that the time is not known. + /// + public DateTime PublishTime { get; set; } + } +}