Skip to content

Commit

Permalink
Add metadata to fast data and event callbacks notification. (#2176)
Browse files Browse the repository at this point in the history
- if `DisableMonitoredItemCache` is set, the sequencenumber and publish time are unavailable. Add helpers to NotificationData to get the information in the fast data and event callbacks.
- new fast callback for keep alive messages
  • Loading branch information
mregen authored Jun 14, 2023
1 parent a81a696 commit e89b06e
Show file tree
Hide file tree
Showing 3 changed files with 122 additions and 9 deletions.
22 changes: 21 additions & 1 deletion Applications/ConsoleReferenceClient/ClientSamples.cs
Original file line number Diff line number Diff line change
Expand Up @@ -878,9 +878,11 @@ public async Task SubscribeAllValuesAsync(
KeepAliveCount = keepAliveCount,
SequentialPublishing = true,
RepublishAfterTransfer = true,
DisableMonitoredItemCache = true,
MaxNotificationsPerPublish = 1000,
MinLifetimeInterval = (uint)session.SessionTimeout,
FastDataChangeCallback = FastDataChangeNotification,
FastKeepAliveCallback = FastKeepAliveNotification,
};
session.AddSubscription(subscription);

Expand Down Expand Up @@ -959,14 +961,32 @@ public static string FormatValueAsJson(
#endregion

#region Private Methods
/// <summary>
/// The fast keep alive notification callback.
/// </summary>
private void FastKeepAliveNotification(Subscription subscription, NotificationData notification)
{
try
{
m_output.WriteLine("Keep Alive : Id={0} PublishTime={1} SequenceNumber={2}.",
subscription.Id, notification.PublishTime, notification.SequenceNumber);
}
catch (Exception ex)
{
m_output.WriteLine("FastKeepAliveNotification error: {0}", ex.Message);
}
}

/// <summary>
/// The fast data change notification callback.
/// </summary>
private void FastDataChangeNotification(Subscription subscription, DataChangeNotification notification, IList<string> stringTable)
{
try
{
m_output.WriteLine("Notification: Id={0} Items={1}.", subscription.Id, notification.MonitoredItems.Count);
m_output.WriteLine("Notification: Id={0} PublishTime={1} SequenceNumber={2} Items={3}.",
subscription.Id, notification.PublishTime,
notification.SequenceNumber, notification.MonitoredItems.Count);
}
catch (Exception ex)
{
Expand Down
68 changes: 60 additions & 8 deletions Libraries/Opc.Ua.Client/Subscription.cs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ public Subscription(Subscription template, bool copyEventHandlers)
m_publishStatusChanged = template.m_publishStatusChanged;
m_fastDataChangeCallback = template.m_fastDataChangeCallback;
m_fastEventCallback = template.m_fastEventCallback;
m_fastKeepAliveCallback = template.m_fastKeepAliveCallback;
}

// copy the list of monitored items.
Expand Down Expand Up @@ -454,6 +455,19 @@ public FastEventNotificationEventHandler FastEventCallback
set => m_fastEventCallback = value;
}

/// <summary>
/// Gets or sets the fast keep alive callback.
/// </summary>
/// <value>The keep alive change callback.</value>
/// <remarks>
/// Only one callback is allowed at a time but it is more efficient to call than an event.
/// </remarks>
public FastKeepAliveNotificationEventHandler FastKeepAliveCallback
{
get => m_fastKeepAliveCallback;
set => m_fastKeepAliveCallback = value;
}

/// <summary>
/// The items to monitor.
/// </summary>
Expand Down Expand Up @@ -1903,15 +1917,28 @@ private async Task OnMessageReceived()
// get list of new messages to process.
List<NotificationMessage> messagesToProcess = null;

// get list of new messages to process.
List<IncomingMessage> keepAliveToProcess = null;

// get list of new messages to republish.
List<IncomingMessage> messagesToRepublish = null;

lock (m_cache)
{
for (LinkedListNode<IncomingMessage> ii = m_incomingMessages.First; ii != null; ii = ii.Next)
{
// process keep alive messages
if (ii.Value.Message == null && !ii.Value.Processed)
{
if (keepAliveToProcess == null)
{
keepAliveToProcess = new List<IncomingMessage>();
}
keepAliveToProcess.Add(ii.Value);
}

// update monitored items with unprocessed messages.
if (ii.Value.Message != null && !ii.Value.Processed &&
else if (ii.Value.Message != null && !ii.Value.Processed &&
// If sequential publishing is enabled, only release messages in perfect sequence.
(!m_sequentialPublishing || ii.Value.SequenceNumber <= m_lastSequenceNumberProcessed + 1))
{
Expand Down Expand Up @@ -1971,25 +1998,41 @@ private async Task OnMessageReceived()
}
}

// process new keep alive messages.
FastKeepAliveNotificationEventHandler keepAliveCallback = m_fastKeepAliveCallback;
if (keepAliveToProcess != null && keepAliveCallback != null)
{
foreach (IncomingMessage message in keepAliveToProcess)
{
var keepAlive = new NotificationData {
PublishTime = message.Timestamp,
SequenceNumber = message.SequenceNumber
};
keepAliveCallback(this, keepAlive);
}
}

// process new messages.
if (messagesToProcess != null)
{
int noNotificationsReceived;
FastDataChangeNotificationEventHandler datachangeCallback = m_fastDataChangeCallback;
FastEventNotificationEventHandler eventCallback = m_fastEventCallback;
int noNotificationsReceived = 0;

for (int ii = 0; ii < messagesToProcess.Count; ii++)
foreach (NotificationMessage message in messagesToProcess)
{
NotificationMessage message = messagesToProcess[ii];
noNotificationsReceived = 0;
try
{
for (int jj = 0; jj < message.NotificationData.Count; jj++)
foreach (ExtensionObject notificationData in message.NotificationData)
{
DataChangeNotification datachange = message.NotificationData[jj].Body as DataChangeNotification;
var datachange = notificationData.Body as DataChangeNotification;

if (datachange != null)
{
datachange.PublishTime = message.PublishTime;
datachange.SequenceNumber = message.SequenceNumber;

noNotificationsReceived += datachange.MonitoredItems.Count;

if (!m_disableMonitoredItemCache)
Expand All @@ -2003,10 +2046,13 @@ private async Task OnMessageReceived()
}
}

EventNotificationList events = message.NotificationData[jj].Body as EventNotificationList;
var events = notificationData.Body as EventNotificationList;

if (events != null)
{
events.PublishTime = message.PublishTime;
events.SequenceNumber = message.SequenceNumber;

noNotificationsReceived += events.Events.Count;

if (!m_disableMonitoredItemCache)
Expand All @@ -2020,7 +2066,7 @@ private async Task OnMessageReceived()
}
}

StatusChangeNotification statusChanged = message.NotificationData[jj].Body as StatusChangeNotification;
StatusChangeNotification statusChanged = notificationData.Body as StatusChangeNotification;

if (statusChanged != null)
{
Expand Down Expand Up @@ -2476,6 +2522,7 @@ private IncomingMessage FindOrCreateEntry(DateTime utcNow, uint sequenceNumber)
private bool m_disableMonitoredItemCache;
private FastDataChangeNotificationEventHandler m_fastDataChangeCallback;
private FastEventNotificationEventHandler m_fastEventCallback;
private FastKeepAliveNotificationEventHandler m_fastKeepAliveCallback;
private int m_outstandingMessageWorkers;
private SemaphoreSlim m_messageWorkersSemaphore;
private bool m_sequentialPublishing;
Expand Down Expand Up @@ -2569,6 +2616,11 @@ public enum SubscriptionChangeMask
/// </summary>
public delegate void FastEventNotificationEventHandler(Subscription subscription, EventNotificationList notification, IList<string> stringTable);

/// <summary>
/// The delegate used to receive keep alive notifications via a direct function call instead of a .NET Event.
/// </summary>
public delegate void FastKeepAliveNotificationEventHandler(Subscription subscription, NotificationData notification);

#region SubscriptionStateChangedEventArgs Class
/// <summary>
/// The event arguments provided when the state of a subscription changes.
Expand Down
41 changes: 41 additions & 0 deletions Stack/Opc.Ua.Core/Types/BuiltIn/NotificationData.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/* Copyright (c) 1996-2023 The OPC Foundation. All rights reserved.
The source code in this file is covered under a dual-license scenario:
- RCL: for OPC Foundation Corporate Members in good-standing
- GPL V2: everybody else
RCL license terms accompanied with this source code. See http://opcfoundation.org/License/RCL/1.00/
GNU General Public License as published by the Free Software Foundation;
version 2 of the License are accompanied with this source code. See http://opcfoundation.org/License/GPLv2
This source code is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
*/

using System;

namespace Opc.Ua
{
public partial class NotificationData
{
/// <summary>
/// Helper variable for a client to pass the sequence number
/// of the publish response for the data and the event change notification
/// to a client application which subscribes to subscription notifications.
/// </summary>
/// <remarks>
/// A value of 0 indicates that the sequence number is not known.
/// A KeepAlive notification contains the sequence number of the next
/// notification.
/// </remarks>
public uint SequenceNumber { get; set; }

/// <summary>
/// Helper variable for a client to pass the publish time
/// of the publish response for the data and the event change notification
/// to a client application which subscribes to subscription notifications.
/// </summary>
/// <remarks>
/// A value of MinTime indicates that the time is not known.
/// </remarks>
public DateTime PublishTime { get; set; }
}
}

0 comments on commit e89b06e

Please sign in to comment.