diff --git a/src/SparkplugNet/Core/Application/SparkplugApplicationBase.cs b/src/SparkplugNet/Core/Application/SparkplugApplicationBase.cs
index c140274..2ad7f4a 100644
--- a/src/SparkplugNet/Core/Application/SparkplugApplicationBase.cs
+++ b/src/SparkplugNet/Core/Application/SparkplugApplicationBase.cs
@@ -47,14 +47,9 @@ public SparkplugApplicationBase(
}
///
- /// Gets the node states.
+ /// Get the group states.
///
- public ConcurrentDictionary> NodeStates { get; } = new();
-
- ///
- /// Gets the device states.
- ///
- public ConcurrentDictionary> DeviceStates { get; } = new();
+ public ConcurrentDictionary> GroupStates { get; } = new();
///
/// Gets the options.
@@ -85,8 +80,7 @@ public async Task Start(SparkplugApplicationOptions applicationOptions)
}
// Clear states.
- this.NodeStates.Clear();
- this.DeviceStates.Clear();
+ this.GroupStates.Clear();
// Add handlers.
this.AddEventHandlers();
@@ -364,7 +358,7 @@ private async Task ConnectInternal()
}
else
{
- builder.WithWebSocketServer(options =>
+ builder.WithWebSocketServer(options =>
options.WithCookieContainer(this.Options.MqttWebSocketOptions.CookieContainer)
.WithCookieContainer(this.Options.MqttWebSocketOptions.Credentials)
.WithProxyOptions(this.Options.MqttWebSocketOptions.ProxyOptions)
@@ -450,11 +444,12 @@ private async Task SubscribeInternal()
/// The metric state.
private void UpdateMetricState(SparkplugMetricStatus metricState)
{
- var keys = new List(this.NodeStates.Keys.ToList());
-
- foreach (string key in keys)
+ foreach (var group in this.GroupStates)
{
- this.NodeStates[key].MetricStatus = metricState;
+ foreach (var node in group.Value.NodeStates)
+ {
+ node.Value.MetricStatus = metricState;
+ }
}
}
diff --git a/src/SparkplugNet/Core/GroupState .cs b/src/SparkplugNet/Core/GroupState .cs
new file mode 100644
index 0000000..5088007
--- /dev/null
+++ b/src/SparkplugNet/Core/GroupState .cs
@@ -0,0 +1,22 @@
+// --------------------------------------------------------------------------------------------------------------------
+//
+// The project is licensed under the MIT license.
+//
+//
+// A state class for the metrics.
+//
+// --------------------------------------------------------------------------------------------------------------------
+
+namespace SparkplugNet.Core;
+
+///
+/// The group state class.
+///
+/// The type parameter.
+public sealed class GroupState where T : IMetric, new()
+{
+ ///
+ /// Get the device states.
+ ///
+ public ConcurrentDictionary> NodeStates { get; } = new();
+}
diff --git a/src/SparkplugNet/Core/Messages/SparkplugMessageGenerator.cs b/src/SparkplugNet/Core/Messages/SparkplugMessageGenerator.cs
index d96c248..18ca17f 100644
--- a/src/SparkplugNet/Core/Messages/SparkplugMessageGenerator.cs
+++ b/src/SparkplugNet/Core/Messages/SparkplugMessageGenerator.cs
@@ -672,7 +672,7 @@ private MqttApplicationMessage GetSparkplugNodeBirthB(
Timestamp = (ulong)dateTime.ToUnixTimeMilliseconds()
};
- var convertedPayload = VersionB.PayloadConverter.ConvertVersionBPayload(payload);
+ var convertedPayload = VersionB.PayloadConverter.ConvertVersionBPayload(payload, SparkplugMessageType.NodeBirth);
var serialized = PayloadHelper.Serialize(convertedPayload);
return new MqttApplicationMessageBuilder()
@@ -756,7 +756,7 @@ private MqttApplicationMessage GetSparkplugDeviceBirthB(
Timestamp = (ulong)dateTime.ToUnixTimeMilliseconds()
};
- var convertedPayload = VersionB.PayloadConverter.ConvertVersionBPayload(payload);
+ var convertedPayload = VersionB.PayloadConverter.ConvertVersionBPayload(payload, SparkplugMessageType.DeviceBirth);
var serialized = PayloadHelper.Serialize(convertedPayload);
return new MqttApplicationMessageBuilder()
@@ -827,7 +827,7 @@ private MqttApplicationMessage GetSparkplugNodeDeathB(
Metrics = metrics.ToList()
};
- var convertedPayload = VersionB.PayloadConverter.ConvertVersionBPayload(payload);
+ var convertedPayload = VersionB.PayloadConverter.ConvertVersionBPayload(payload, SparkplugMessageType.NodeDeath);
var serialized = PayloadHelper.Serialize(convertedPayload);
return new MqttApplicationMessageBuilder()
@@ -911,7 +911,7 @@ private MqttApplicationMessage GetSparkplugDeviceDeathB(
Timestamp = (ulong)dateTime.ToUnixTimeMilliseconds()
};
- var convertedPayload = VersionB.PayloadConverter.ConvertVersionBPayload(payload);
+ var convertedPayload = VersionB.PayloadConverter.ConvertVersionBPayload(payload, SparkplugMessageType.DeviceDeath);
var serialized = PayloadHelper.Serialize(convertedPayload);
return new MqttApplicationMessageBuilder()
@@ -991,7 +991,7 @@ private MqttApplicationMessage GetSparkplugNodeDataB(
Timestamp = (ulong)dateTime.ToUnixTimeMilliseconds()
};
- var convertedPayload = VersionB.PayloadConverter.ConvertVersionBPayload(payload);
+ var convertedPayload = VersionB.PayloadConverter.ConvertVersionBPayload(payload, SparkplugMessageType.NodeData);
var serialized = PayloadHelper.Serialize(convertedPayload);
return new MqttApplicationMessageBuilder()
@@ -1075,7 +1075,7 @@ private MqttApplicationMessage GetSparkplugDeviceDataB(
Timestamp = (ulong)dateTime.ToUnixTimeMilliseconds()
};
- var convertedPayload = VersionB.PayloadConverter.ConvertVersionBPayload(payload);
+ var convertedPayload = VersionB.PayloadConverter.ConvertVersionBPayload(payload, SparkplugMessageType.DeviceData);
var serialized = PayloadHelper.Serialize(convertedPayload);
return new MqttApplicationMessageBuilder()
@@ -1154,7 +1154,7 @@ private static MqttApplicationMessage GetSparkplugNodeCommandB(
Timestamp = (ulong)dateTime.ToUnixTimeMilliseconds()
};
- var convertedPayload = VersionB.PayloadConverter.ConvertVersionBPayload(payload);
+ var convertedPayload = VersionB.PayloadConverter.ConvertVersionBPayload(payload, SparkplugMessageType.NodeCommand);
var serialized = PayloadHelper.Serialize(convertedPayload);
return new MqttApplicationMessageBuilder()
@@ -1236,7 +1236,7 @@ private static MqttApplicationMessage GetSparkplugDeviceCommandB(
Timestamp = (ulong)dateTime.ToUnixTimeMilliseconds()
};
- var convertedPayload = VersionB.PayloadConverter.ConvertVersionBPayload(payload);
+ var convertedPayload = VersionB.PayloadConverter.ConvertVersionBPayload(payload, SparkplugMessageType.DeviceCommand);
var serialized = PayloadHelper.Serialize(convertedPayload);
return new MqttApplicationMessageBuilder()
diff --git a/src/SparkplugNet/Core/Messages/SparkplugTopicGenerator.cs b/src/SparkplugNet/Core/Messages/SparkplugTopicGenerator.cs
index 0621bd2..b4a7485 100644
--- a/src/SparkplugNet/Core/Messages/SparkplugTopicGenerator.cs
+++ b/src/SparkplugNet/Core/Messages/SparkplugTopicGenerator.cs
@@ -61,16 +61,6 @@ internal static string GetDeviceCommandSubscribeTopic(SparkplugNamespace nameSpa
return $"{nameSpace.GetDescription()}/{groupIdentifier}/{SparkplugMessageType.DeviceCommand.GetDescription()}/{edgeNodeIdentifier}/{deviceIdentifier}";
}
- ///
- /// Gets state subscription topic.
- ///
- /// The SCADA host identifier.
- /// The state subscription topic .
- internal static string GetStateSubscribeTopic(string scadaHostIdentifier)
- {
- return $"{SparkplugMessageType.StateMessage.GetDescription()}/{scadaHostIdentifier}";
- }
-
///
/// Gets the topic (Except STATE messages).
///
diff --git a/src/SparkplugNet/Core/MetricState.cs b/src/SparkplugNet/Core/MetricState.cs
index 104919d..b2722d5 100644
--- a/src/SparkplugNet/Core/MetricState.cs
+++ b/src/SparkplugNet/Core/MetricState.cs
@@ -13,7 +13,7 @@ namespace SparkplugNet.Core;
/// The metric state class.
///
/// The type parameter.
-public sealed class MetricState where T : IMetric, new()
+public class MetricState where T : IMetric, new()
{
///
/// Gets or sets the metric status.
diff --git a/src/SparkplugNet/Core/Node/SparkplugNodeBase.cs b/src/SparkplugNet/Core/Node/SparkplugNodeBase.cs
index 27df22d..76b345e 100644
--- a/src/SparkplugNet/Core/Node/SparkplugNodeBase.cs
+++ b/src/SparkplugNet/Core/Node/SparkplugNodeBase.cs
@@ -86,7 +86,11 @@ public async Task Start(SparkplugNodeOptions nodeOptions, KnownMetricStorage? kn
// Connect, subscribe to incoming messages and send a state message.
await this.ConnectInternal();
await this.SubscribeInternal();
- await this.PublishNodeAndDeviceBirthsInternal();
+
+ if (string.IsNullOrEmpty(this.Options.ScadaHostIdentifier))
+ {
+ await this.PublishNodeAndDeviceBirthsInternal();
+ }
}
///
@@ -124,14 +128,11 @@ public async Task PublishMetrics(IEnumerable metrics
}
///
- /// Does a node rebirth.
+ /// Does a node birth.
///
/// The new metrics.
- public async Task Rebirth(IEnumerable metrics)
+ public async Task Birth(IEnumerable metrics)
{
- // Send node death first.
- await this.SendNodeDeathMessage();
-
// Reset the known metrics.
this.knownMetrics = new KnownMetricStorage(metrics);
@@ -139,6 +140,18 @@ public async Task Rebirth(IEnumerable metrics)
await this.PublishNodeAndDeviceBirthsInternal();
}
+ ///
+ /// Does a node rebirth.
+ ///
+ /// The new metrics.
+ public async Task Rebirth(IEnumerable metrics)
+ {
+ // Send node death first.
+ await this.SendNodeDeathMessage();
+
+ await this.Birth(metrics);
+ }
+
///
/// Publishes metrics for a node.
///
@@ -403,7 +416,8 @@ private async Task SubscribeInternal()
await this.client.SubscribeAsync(deviceCommandSubscribeTopic, (MqttQualityOfServiceLevel)SparkplugQualityOfServiceLevel.AtLeastOnce);
// Subscribe to the state topic.
- var stateSubscribeTopic = SparkplugTopicGenerator.GetStateSubscribeTopic(this.Options.ScadaHostIdentifier);
+ //var stateSubscribeTopic = SparkplugTopicGenerator.GetStateSubscribeTopic(this.Options.ScadaHostIdentifier);
+ var stateSubscribeTopic = SparkplugTopicGenerator.GetSparkplugStateMessageTopic(this.Options.ScadaHostIdentifier, this.specificationVersion);
await this.client.SubscribeAsync(stateSubscribeTopic, (MqttQualityOfServiceLevel)SparkplugQualityOfServiceLevel.AtLeastOnce);
}
diff --git a/src/SparkplugNet/Core/NodeState .cs b/src/SparkplugNet/Core/NodeState .cs
new file mode 100644
index 0000000..e0816b9
--- /dev/null
+++ b/src/SparkplugNet/Core/NodeState .cs
@@ -0,0 +1,22 @@
+// --------------------------------------------------------------------------------------------------------------------
+//
+// The project is licensed under the MIT license.
+//
+//
+// A state class for the metrics.
+//
+// --------------------------------------------------------------------------------------------------------------------
+
+namespace SparkplugNet.Core;
+
+///
+/// The node state class.
+///
+/// The type parameter.
+public sealed class NodeState : MetricState where T : IMetric, new()
+{
+ ///
+ /// Get the device states.
+ ///
+ public ConcurrentDictionary> DeviceStates { get; set; } = new();
+}
diff --git a/src/SparkplugNet/Core/SparkplugBase.KnownMetricStorage.cs b/src/SparkplugNet/Core/SparkplugBase.KnownMetricStorage.cs
index 83590ed..9244f49 100644
--- a/src/SparkplugNet/Core/SparkplugBase.KnownMetricStorage.cs
+++ b/src/SparkplugNet/Core/SparkplugBase.KnownMetricStorage.cs
@@ -313,5 +313,11 @@ private void AddVersionBMetric(T metric, Metric versionBMetric)
this.knownMetricsByName[metric.Name] = metric;
}
}
+
+ ///
+ /// Return the known metrics by name.
+ ///
+ ///
+ public ConcurrentDictionary GetKnownMetricsByName() { return this.knownMetricsByName; }
}
}
diff --git a/src/SparkplugNet/Core/SparkplugBase.cs b/src/SparkplugNet/Core/SparkplugBase.cs
index 7e0b7da..a3aa994 100644
--- a/src/SparkplugNet/Core/SparkplugBase.cs
+++ b/src/SparkplugNet/Core/SparkplugBase.cs
@@ -17,6 +17,11 @@ namespace SparkplugNet.Core;
///
public partial class SparkplugBase : ISparkplugConnection where T : IMetric, new()
{
+ ///
+ /// The sparkplug specification version.
+ ///
+ internal readonly SparkplugSpecificationVersion specificationVersion;
+
///
/// The message generator.
///
@@ -54,6 +59,7 @@ public SparkplugBase(IEnumerable knownMetrics, SparkplugSpecificationVersion
///
public SparkplugBase(KnownMetricStorage knownMetricsStorage, SparkplugSpecificationVersion specificationVersion)
{
+ this.specificationVersion = specificationVersion;
this.knownMetrics = knownMetricsStorage;
if (typeof(T).IsAssignableFrom(typeof(VersionAData.KuraMetric)))
diff --git a/src/SparkplugNet/SparkplugNet.csproj b/src/SparkplugNet/SparkplugNet.csproj
index 4774d34..6220201 100644
--- a/src/SparkplugNet/SparkplugNet.csproj
+++ b/src/SparkplugNet/SparkplugNet.csproj
@@ -1,7 +1,7 @@
- net6.0;net8.0
+ net8.0
SparkplugNet
SparkplugNet
true
@@ -28,13 +28,10 @@
NU1803,CS0618,CS0809,NU1901,NU1902
true
all
+ Debug;Release;p1600sedac;p1000edge;p1600sedacedge;p1800epsi;p1000master
-
- all
- runtime; build; native; contentfiles; analyzers; buildtransitive
-
all
diff --git a/src/SparkplugNet/VersionA/SparkplugApplication.cs b/src/SparkplugNet/VersionA/SparkplugApplication.cs
index 1f76637..df6a564 100644
--- a/src/SparkplugNet/VersionA/SparkplugApplication.cs
+++ b/src/SparkplugNet/VersionA/SparkplugApplication.cs
@@ -222,30 +222,51 @@ await this.FireDeviceBirthReceived(topic.GroupIdentifier, topic.EdgeNodeIdentifi
/// The topic.
/// The metrics.
/// The metric status.
- /// Thrown if the edge node identifier is invalid.
+ /// Thrown if any identifier is invalid.
/// Thrown if the metric cast is invalid.
private IEnumerable ProcessPayload(
SparkplugMessageTopic topic,
List metrics,
SparkplugMetricStatus metricStatus)
{
- var metricState = new MetricState
+ // Check group id.
+ if (string.IsNullOrWhiteSpace(topic.GroupIdentifier))
+ {
+ throw new InvalidOperationException($"The group identifier is invalid {topic.GroupIdentifier}.");
+ }
+
+ if (!this.GroupStates.ContainsKey(topic.GroupIdentifier))
+ {
+ this.GroupStates[topic.GroupIdentifier] = new GroupState();
+ }
+
+ NodeState metricState = new()
{
MetricStatus = metricStatus
};
+ // Check node id.
+ if (string.IsNullOrWhiteSpace(topic.EdgeNodeIdentifier))
+ {
+ throw new InvalidOperationException($"The edge node identifier is invalid {topic.EdgeNodeIdentifier}.");
+ }
+
if (!string.IsNullOrWhiteSpace(topic.DeviceIdentifier))
{
- if (string.IsNullOrWhiteSpace(topic.EdgeNodeIdentifier))
+ if (!this.GroupStates[topic.GroupIdentifier].NodeStates.ContainsKey(topic.EdgeNodeIdentifier))
{
- throw new InvalidOperationException($"The edge node identifier is invalid for device {topic.DeviceIdentifier}.");
+ this.GroupStates[topic.GroupIdentifier]
+ .NodeStates[topic.EdgeNodeIdentifier] = metricState;
}
- this.DeviceStates[$"{topic.EdgeNodeIdentifier}/{topic.DeviceIdentifier}"] = metricState;
+ this.GroupStates[topic.GroupIdentifier]
+ .NodeStates[topic.EdgeNodeIdentifier]
+ .DeviceStates[topic.DeviceIdentifier] = metricState;
}
else
{
- this.NodeStates[topic.EdgeNodeIdentifier] = metricState;
+ this.GroupStates[topic.GroupIdentifier]
+ .NodeStates[topic.EdgeNodeIdentifier] = metricState;
}
foreach (var payloadMetric in metrics)
diff --git a/src/SparkplugNet/VersionB/PayloadConverter.cs b/src/SparkplugNet/VersionB/PayloadConverter.cs
index fcc2242..0229d7b 100644
--- a/src/SparkplugNet/VersionB/PayloadConverter.cs
+++ b/src/SparkplugNet/VersionB/PayloadConverter.cs
@@ -18,39 +18,56 @@ internal static class PayloadConverter
/// Gets the version B payload converted from the ProtoBuf payload.
///
/// The .
+ ///
/// The .
- public static Payload ConvertVersionBPayload(VersionBProtoBuf.ProtoBufPayload protoPayload)
+ public static Payload ConvertVersionBPayload(VersionBProtoBuf.ProtoBufPayload protoPayload, ConcurrentDictionary? metrics)
=> new()
{
Body = protoPayload.Body,
- Metrics = protoPayload.Metrics.Select(ConvertVersionBMetric).ToList(),
+ Metrics = protoPayload.Metrics.Select(m => ConvertVersionBMetric(m, metrics)).ToList(),
Seq = protoPayload.Seq,
Timestamp = protoPayload.Timestamp,
Uuid = protoPayload.Uuid ?? string.Empty
};
+
+ public static VersionBProtoBuf.ProtoBufPayload ConvertVersionBPayload(Payload payload)
+ => ConvertVersionBPayload(payload, null);
+
///
/// Gets the ProtoBuf payload converted from the version B payload.
///
/// The .
+ /// The .
/// The .
- public static VersionBProtoBuf.ProtoBufPayload ConvertVersionBPayload(Payload payload)
+ public static VersionBProtoBuf.ProtoBufPayload ConvertVersionBPayload(Payload payload, SparkplugMessageType? sparkplugMessageType)
=> new()
{
Body = payload.Body,
- Metrics = payload.Metrics.Select(ConvertVersionBMetric).ToList(),
+ Metrics = payload.Metrics.Select(m => ConvertVersionBMetric(m, sparkplugMessageType)).ToList(),
Seq = payload.Seq,
Timestamp = payload.Timestamp,
Uuid = payload.Uuid
};
+ ///
+ ///
+ ///
+ ///
+ ///
+ public static Metric ConvertVersionBMetric(VersionBProtoBuf.ProtoBufPayload.Metric protoMetric)
+ {
+ return ConvertVersionBMetric(protoMetric, null);
+ }
+
///
/// Gets the version B metric from the version B ProtoBuf metric.
///
/// The .
+ /// The .
/// Thrown if the metric data type is unknown.
/// The .
- public static Metric ConvertVersionBMetric(VersionBProtoBuf.ProtoBufPayload.Metric protoMetric)
+ public static Metric ConvertVersionBMetric(VersionBProtoBuf.ProtoBufPayload.Metric protoMetric, ConcurrentDictionary? metrics)
{
var metric = new Metric()
{
@@ -62,7 +79,34 @@ public static Metric ConvertVersionBMetric(VersionBProtoBuf.ProtoBufPayload.Metr
Timestamp = protoMetric.Timestamp
};
- var dataType = ConvertVersionBDataType((VersionBProtoBuf.DataType?)protoMetric.DataType);
+ // Get properties
+ if (protoMetric.PropertySetValue is not null)
+ {
+ PropertySet propertySet = new();
+ propertySet.Keys = protoMetric.PropertySetValue.Keys;
+
+ propertySet.Values = [];
+
+ foreach (var item in protoMetric.PropertySetValue.Values)
+ {
+ propertySet.Values.Add(ConvertVersionBPropertyValue(item));
+ }
+
+ metric.Properties = propertySet;
+ }
+
+ // [tck-id-payloads-metric-datatype-not-req]
+ // The datatype SHOULD NOT be included with metric definitions in NDATA, NCMD, DDATA, and DCMD messages.
+ VersionBDataTypeEnum dataType;
+
+ if (metrics is null)
+ {
+ dataType = ConvertVersionBDataType((VersionBProtoBuf.DataType?)protoMetric.DataType);
+ }
+ else
+ {
+ dataType = metrics.Where(o => o.Key == metric.Name).Select(o => o.Value.DataType).FirstOrDefault();
+ }
switch (dataType)
{
@@ -139,7 +183,7 @@ public static Metric ConvertVersionBMetric(VersionBProtoBuf.ProtoBufPayload.Metr
metric.SetValue(VersionBDataTypeEnum.Int32Array, int32Array);
break;
case VersionBDataTypeEnum.Int64Array:
- var int64Array = PayloadHelper.GetArrayOfTFromBytes(protoMetric.BytesValue, BinaryPrimitives.ReadInt64LittleEndian);
+ var int64Array = PayloadHelper.GetArrayOfTFromBytes(protoMetric.BytesValue, BinaryPrimitives.ReadInt64LittleEndian);
metric.SetValue(VersionBDataTypeEnum.Int64Array, int64Array);
break;
case VersionBDataTypeEnum.UInt8Array:
@@ -177,7 +221,7 @@ public static Metric ConvertVersionBMetric(VersionBProtoBuf.ProtoBufPayload.Metr
var dateTimeArray = PayloadHelper.GetArrayOfTFromBytes(protoMetric.BytesValue, BinaryPrimitives.ReadUInt64LittleEndian);
metric.SetValue(VersionBDataTypeEnum.DateTimeArray, dateTimeArray.Select(x => DateTimeOffset.FromUnixTimeMilliseconds((long)x)).ToArray());
break;
- // Todo: What to do here?
+ // Todo: What to do here?
case VersionBDataTypeEnum.PropertySetList:
case VersionBDataTypeEnum.Unknown:
default:
@@ -191,15 +235,35 @@ public static Metric ConvertVersionBMetric(VersionBProtoBuf.ProtoBufPayload.Metr
/// Gets the version B ProtoBuf metric from the version B metric.
///
/// The .
+ ///
+ public static VersionBProtoBuf.ProtoBufPayload.Metric ConvertVersionBMetric(Metric metric)
+ {
+ return ConvertVersionBMetric(metric, null);
+ }
+
+ ///
+ /// Gets the version B ProtoBuf metric from the version B metric.
+ ///
+ /// The .
+ /// The .
/// Thrown if the property set data type is set for a metric.
/// Thrown if the metric data type is unknown.
/// The .
- public static VersionBProtoBuf.ProtoBufPayload.Metric ConvertVersionBMetric(Metric metric)
+ public static VersionBProtoBuf.ProtoBufPayload.Metric ConvertVersionBMetric(Metric metric, SparkplugMessageType? sparkplugMessageType)
{
+ // [tck-id-payloads-metric-datatype-not-req]
+ // The datatype SHOULD NOT be included with metric definitions in NDATA, NCMD, DDATA, and DCMD messages.
+ uint? dataType = null;
+
+ if (sparkplugMessageType == null || sparkplugMessageType == SparkplugMessageType.NodeBirth || sparkplugMessageType == SparkplugMessageType.DeviceBirth)
+ {
+ dataType = (uint?)ConvertVersionBDataType(metric.DataType);
+ }
+
var protoMetric = new VersionBProtoBuf.ProtoBufPayload.Metric()
{
Alias = metric.Alias,
- DataType = (uint?)ConvertVersionBDataType(metric.DataType),
+ DataType = dataType,
IsHistorical = metric.IsHistorical,
IsNull = metric.IsNull,
IsTransient = metric.IsTransient,
diff --git a/src/SparkplugNet/VersionB/SparkplugApplication.cs b/src/SparkplugNet/VersionB/SparkplugApplication.cs
index 76f896b..38a8c9e 100644
--- a/src/SparkplugNet/VersionB/SparkplugApplication.cs
+++ b/src/SparkplugNet/VersionB/SparkplugApplication.cs
@@ -9,6 +9,8 @@
namespace SparkplugNet.VersionB;
+using System.Diagnostics.Metrics;
+
///
///
/// A class that handles a Sparkplug application.
@@ -126,17 +128,39 @@ protected override async Task OnMessageReceived(SparkplugMessageTopic topic, byt
{
var payloadVersionB = PayloadHelper.Deserialize(payload);
- if (payloadVersionB is not null)
+ if (payloadVersionB == null) { return; }
+
+ ConcurrentDictionary? metrics = null;
+
+ if (!(topic.MessageType == SparkplugMessageType.NodeBirth || topic.MessageType == SparkplugMessageType.DeviceBirth))
{
- var convertedPayload = PayloadConverter.ConvertVersionBPayload(payloadVersionB);
+ // Get known metrics
+ if (!this.GroupStates.TryGetValue(topic.GroupIdentifier, out var groupState)) { return; }
+
+ if (!groupState.NodeStates.TryGetValue(topic.EdgeNodeIdentifier, out var nodeState)) { return; }
- if (convertedPayload is not Payload _)
+ if (topic.DeviceIdentifier is null)
{
- throw new InvalidCastException("The metric cast didn't work properly.");
+ metrics = nodeState.Metrics;
+ }
+ else if (nodeState.DeviceStates.TryGetValue(topic.DeviceIdentifier, out var metricState))
+ {
+ metrics = metricState.Metrics;
}
+ else
+ {
+ return;
+ }
+ }
+
+ var convertedPayload = PayloadConverter.ConvertVersionBPayload(payloadVersionB, metrics);
- await this.HandleMessagesForVersionB(topic, convertedPayload);
+ if (convertedPayload is not Payload _)
+ {
+ throw new InvalidCastException("The metric cast didn't work properly.");
}
+
+ await this.HandleMessagesForVersionB(topic, convertedPayload);
}
///
@@ -149,12 +173,12 @@ private async Task HandleMessagesForVersionB(SparkplugMessageTopic topic, Payloa
{
// Filter out session number metric.
var sessionNumberMetric = payload.Metrics.FirstOrDefault(m => m.Name == Constants.SessionNumberMetricName);
- var metricsWithoutSequenceMetric = payload.Metrics.Where(m => m.Name != Constants.SessionNumberMetricName);
- var filteredMetrics = this.KnownMetricsStorage.FilterMetrics(metricsWithoutSequenceMetric, topic.MessageType).ToList();
+ var metrics = payload.Metrics.Where(m => m.Name != Constants.SessionNumberMetricName).ToList();
+ // var filteredMetrics = this.KnownMetricsStorage.FilterMetrics(metricsWithoutSequenceMetric, topic.MessageType).ToList();
if (sessionNumberMetric is not null)
{
- filteredMetrics.Add(sessionNumberMetric);
+ metrics.Add(sessionNumberMetric);
}
// Handle messages.
@@ -162,7 +186,7 @@ private async Task HandleMessagesForVersionB(SparkplugMessageTopic topic, Payloa
{
case SparkplugMessageType.NodeBirth:
await this.FireNodeBirthReceived(topic.GroupIdentifier, topic.EdgeNodeIdentifier,
- this.ProcessPayload(topic, filteredMetrics, SparkplugMetricStatus.Online));
+ this.ProcessPayload(topic, metrics, SparkplugMetricStatus.Online));
break;
case SparkplugMessageType.DeviceBirth:
if (string.IsNullOrWhiteSpace(topic.DeviceIdentifier))
@@ -171,10 +195,10 @@ await this.FireNodeBirthReceived(topic.GroupIdentifier, topic.EdgeNodeIdentifier
}
await this.FireDeviceBirthReceived(topic.GroupIdentifier, topic.EdgeNodeIdentifier, topic.DeviceIdentifier,
- this.ProcessPayload(topic, filteredMetrics, SparkplugMetricStatus.Online));
+ this.ProcessPayload(topic, metrics, SparkplugMetricStatus.Online));
break;
case SparkplugMessageType.NodeData:
- var nodeDataMetrics = this.ProcessPayload(topic, filteredMetrics, SparkplugMetricStatus.Online);
+ var nodeDataMetrics = this.ProcessPayload(topic, metrics, SparkplugMetricStatus.Online);
await this.FireNodeDataReceived(topic.GroupIdentifier, topic.EdgeNodeIdentifier, nodeDataMetrics);
break;
case SparkplugMessageType.DeviceData:
@@ -183,11 +207,11 @@ await this.FireDeviceBirthReceived(topic.GroupIdentifier, topic.EdgeNodeIdentifi
throw new InvalidOperationException($"Topic {topic} is invalid!");
}
- var deviceDataMetrics = this.ProcessPayload(topic, filteredMetrics, SparkplugMetricStatus.Online);
+ var deviceDataMetrics = this.ProcessPayload(topic, metrics, SparkplugMetricStatus.Online);
await this.FireDeviceDataReceived(topic.GroupIdentifier, topic.EdgeNodeIdentifier, topic.DeviceIdentifier, deviceDataMetrics);
break;
case SparkplugMessageType.NodeDeath:
- this.ProcessPayload(topic, filteredMetrics, SparkplugMetricStatus.Offline);
+ this.ProcessPayload(topic, metrics, SparkplugMetricStatus.Offline);
await this.FireNodeDeathReceived(topic.GroupIdentifier, topic.EdgeNodeIdentifier, sessionNumberMetric);
break;
case SparkplugMessageType.DeviceDeath:
@@ -196,7 +220,7 @@ await this.FireDeviceBirthReceived(topic.GroupIdentifier, topic.EdgeNodeIdentifi
throw new InvalidOperationException($"Topic {topic} is invalid!");
}
- this.ProcessPayload(topic, filteredMetrics, SparkplugMetricStatus.Offline);
+ this.ProcessPayload(topic, metrics, SparkplugMetricStatus.Offline);
await this.FireDeviceDeathReceived(topic.GroupIdentifier, topic.EdgeNodeIdentifier, topic.DeviceIdentifier);
break;
}
@@ -208,27 +232,63 @@ await this.FireDeviceBirthReceived(topic.GroupIdentifier, topic.EdgeNodeIdentifi
/// The topic.
/// The metrics.
/// The metric status.
- /// Thrown if the edge node identifier is invalid.
+ /// Thrown if any identifier is invalid.
/// Thrown if the metric cast is invalid.
private IEnumerable ProcessPayload(SparkplugMessageTopic topic, List metrics, SparkplugMetricStatus metricStatus)
{
- var metricState = new MetricState
+ // Check group id.
+ if (string.IsNullOrWhiteSpace(topic.GroupIdentifier))
+ {
+ throw new InvalidOperationException($"The group identifier is invalid {topic.GroupIdentifier}.");
+ }
+
+ if (!this.GroupStates.ContainsKey(topic.GroupIdentifier))
+ {
+ this.GroupStates[topic.GroupIdentifier] = new GroupState();
+ }
+
+ NodeState metricState = new()
{
MetricStatus = metricStatus
};
+ // Check node id.
+ if (string.IsNullOrWhiteSpace(topic.EdgeNodeIdentifier))
+ {
+ throw new InvalidOperationException($"The edge node identifier is invalid {topic.EdgeNodeIdentifier}.");
+ }
+
if (!string.IsNullOrWhiteSpace(topic.DeviceIdentifier))
{
- if (string.IsNullOrWhiteSpace(topic.EdgeNodeIdentifier))
+ // If the group doesn't contain the node, create a new node.
+ if (!this.GroupStates[topic.GroupIdentifier].NodeStates.ContainsKey(topic.EdgeNodeIdentifier))
+ {
+ this.GroupStates[topic.GroupIdentifier]
+ .NodeStates[topic.EdgeNodeIdentifier] = metricState;
+ }
+
+ if (this.GroupStates[topic.GroupIdentifier]
+ .NodeStates[topic.EdgeNodeIdentifier]
+ .DeviceStates.TryGetValue(topic.DeviceIdentifier, out var metric))
{
- throw new InvalidOperationException($"The edge node identifier is invalid for device {topic.DeviceIdentifier}.");
+ metricState.Metrics = metric.Metrics;
}
- this.DeviceStates[$"{topic.EdgeNodeIdentifier}/{topic.DeviceIdentifier}"] = metricState;
+ this.GroupStates[topic.GroupIdentifier]
+ .NodeStates[topic.EdgeNodeIdentifier]
+ .DeviceStates[topic.DeviceIdentifier] = metricState;
}
else
{
- this.NodeStates[topic.EdgeNodeIdentifier] = metricState;
+ if (this.GroupStates[topic.GroupIdentifier]
+ .NodeStates.TryGetValue(topic.EdgeNodeIdentifier, out var metric))
+ {
+ metricState.Metrics = metric.Metrics;
+ metricState.DeviceStates = metric.DeviceStates;
+ }
+
+ this.GroupStates[topic.GroupIdentifier]
+ .NodeStates[topic.EdgeNodeIdentifier] = metricState;
}
foreach (var payloadMetric in metrics)
diff --git a/src/SparkplugNet/VersionB/SparkplugNode.cs b/src/SparkplugNet/VersionB/SparkplugNode.cs
index 58ed39f..de76e06 100644
--- a/src/SparkplugNet/VersionB/SparkplugNode.cs
+++ b/src/SparkplugNet/VersionB/SparkplugNode.cs
@@ -91,17 +91,36 @@ protected override async Task OnMessageReceived(SparkplugMessageTopic topic, byt
{
var payloadVersionB = PayloadHelper.Deserialize(payload);
- if (payloadVersionB is not null)
- {
- var convertedPayload = PayloadConverter.ConvertVersionBPayload(payloadVersionB);
+ if (payloadVersionB == null) { return; }
+
+ ConcurrentDictionary? metrics = null;
- if (convertedPayload is not Payload _)
+ if (!(topic.MessageType == SparkplugMessageType.NodeBirth || topic.MessageType == SparkplugMessageType.DeviceBirth))
+ {
+ // Get known metrics
+ if (topic.DeviceIdentifier is null)
+ {
+ metrics = this.knownMetrics.GetKnownMetricsByName();
+ }
+ else if (this.KnownDevices.TryGetValue(topic.DeviceIdentifier, out var knownMetricStorage))
+ {
+ metrics = knownMetricStorage.GetKnownMetricsByName();
+ }
+ else
{
- throw new InvalidCastException("The metric cast didn't work properly.");
+ return;
}
+ }
+
+ var convertedPayload = PayloadConverter.ConvertVersionBPayload(payloadVersionB, metrics);
- await this.HandleMessagesForVersionB(topic, convertedPayload);
+ if (convertedPayload is not Payload _)
+ {
+ throw new InvalidCastException("The metric cast didn't work properly.");
}
+
+ await this.HandleMessagesForVersionB(topic, convertedPayload);
+
}
///
@@ -114,12 +133,12 @@ private async Task HandleMessagesForVersionB(SparkplugMessageTopic topic, Payloa
{
// Filter out session number metric.
var sessionNumberMetric = payload.Metrics.FirstOrDefault(m => m.Name == Constants.SessionNumberMetricName);
- var metricsWithoutSequenceMetric = payload.Metrics.Where(m => m.Name != Constants.SessionNumberMetricName);
- var filteredMetrics = this.KnownMetricsStorage.FilterMetrics(metricsWithoutSequenceMetric, topic.MessageType).ToList();
+ var metrics = payload.Metrics.Where(m => m.Name != Constants.SessionNumberMetricName).ToList();
+ // var filteredMetrics = this.KnownMetricsStorage.FilterMetrics(metricsWithoutSequenceMetric, topic.MessageType).ToList();
if (sessionNumberMetric is not null)
{
- filteredMetrics.Add(sessionNumberMetric);
+ metrics.Add(sessionNumberMetric);
}
// Handle messages.
@@ -131,11 +150,11 @@ private async Task HandleMessagesForVersionB(SparkplugMessageTopic topic, Payloa
throw new InvalidOperationException($"Topic {topic} is invalid!");
}
- await this.FireDeviceCommandReceived(topic.DeviceIdentifier, filteredMetrics);
+ await this.FireDeviceCommandReceived(topic.DeviceIdentifier, metrics);
break;
case SparkplugMessageType.NodeCommand:
- await this.FireNodeCommandReceived(filteredMetrics);
+ await this.FireNodeCommandReceived(metrics);
break;
}
}