diff --git a/sttp/transport/CompactMeasurement.go b/sttp/transport/CompactMeasurement.go index ab48e2a..c8fc592 100644 --- a/sttp/transport/CompactMeasurement.go +++ b/sttp/transport/CompactMeasurement.go @@ -62,8 +62,6 @@ const ( systemIssueMask StateFlagsEnum = 0xE0000000 calculatedValueMask StateFlagsEnum = 0x00001000 discardedValueMask StateFlagsEnum = 0x00400000 - - fixedLength = 9 ) func (compactFlags compactStateFlagsEnum) mapToFullFlags() StateFlagsEnum { @@ -128,126 +126,18 @@ func (fullFlags StateFlagsEnum) mapToCompactFlags() compactStateFlagsEnum { // CompactMeasurement defines a measured value, in simple compact format, for transmission or reception in STTP. type CompactMeasurement struct { - Measurement - signalIndexCache *SignalIndexCache - baseTimeOffsets *[2]int64 - includeTime bool - useMillisecondResolution bool - timeIndex int32 - usingBaseTimeOffset bool -} - -// NewCompactMeasurement creates a new CompactMeasurement -func NewCompactMeasurement(signalIndexCache *SignalIndexCache, includeTime, useMillisecondResolution bool, baseTimeOffsets *[2]int64) CompactMeasurement { - return CompactMeasurement{ - Measurement: Measurement{}, - signalIndexCache: signalIndexCache, - baseTimeOffsets: baseTimeOffsets, - includeTime: includeTime, - useMillisecondResolution: useMillisecondResolution, - timeIndex: 0, - usingBaseTimeOffset: false, - } -} - -// GetBinaryLength gets the binary byte length of a CompactMeasurement -func (cm *CompactMeasurement) GetBinaryLength() uint32 { - var length uint32 = fixedLength - - if !cm.includeTime { - return length - } - - baseTimeOffset := cm.baseTimeOffsets[cm.timeIndex] - - if baseTimeOffset > 0 { - // See if timestamp will fit within space allowed for active base offset. We cache result so that post call - // to GetBinaryLength, result will speed other subsequent parsing operations by not having to reevaluate. - difference := cm.TimestampValue() - baseTimeOffset - - if difference > 0 { - if cm.useMillisecondResolution { - cm.usingBaseTimeOffset = difference/int64(ticks.PerMillisecond) < math.MaxUint16 - } else { - cm.usingBaseTimeOffset = difference < math.MaxUint32 - } - } else { - cm.usingBaseTimeOffset = false - } - - if cm.usingBaseTimeOffset { - if cm.useMillisecondResolution { - length += 2 // Use two bytes for millisecond resolution timestamp with valid offset - } else { - length += 4 // Use four bytes for tick resolution timestamp with valid offset - } - } else { - length += 8 // Use eight bytes for full fidelity time - } - } else { - // Use eight bytes for full fidelity time - length += 8 - } - - return length -} - -// GetTimestampC2 gets offset compressed millisecond-resolution 2-byte timestamp. -func (cm *CompactMeasurement) GetTimestampC2() uint16 { - return uint16((cm.TimestampValue() - cm.baseTimeOffsets[cm.timeIndex]) / int64(ticks.PerMillisecond)) -} - -// GetTimestampC4 gets offset compressed tick-resolution 4-byte timestamp. -func (cm *CompactMeasurement) GetTimestampC4() uint32 { - return uint32(cm.TimestampValue() - cm.baseTimeOffsets[cm.timeIndex]) -} - -// GetCompactStateFlags gets byte level compact state flags with encoded time index and base time offset bits. -func (cm *CompactMeasurement) GetCompactStateFlags() byte { - // Encode compact state flags - flags := cm.Flags.mapToCompactFlags() - - if cm.timeIndex != 0 { - flags |= compactStateFlags.TimeIndex - } - - if cm.usingBaseTimeOffset { - flags |= compactStateFlags.BaseTimeOffset - } - - return byte(flags) -} - -// SetCompactStateFlags sets byte level compact state flags with encoded time index and base time offset bits. -func (cm *CompactMeasurement) SetCompactStateFlags(value byte) { - // Decode compact state flags - flags := compactStateFlagsEnum(value) - - cm.Flags = flags.mapToFullFlags() - - if (flags & compactStateFlags.TimeIndex) > 0 { - cm.timeIndex = 1 - } else { - cm.timeIndex = 0 - } - - cm.usingBaseTimeOffset = (flags & compactStateFlags.BaseTimeOffset) > 0 + Value float32 + Timestamp ticks.Ticks + SignalIndex uint32 + Flags compactStateFlagsEnum } -// GetRuntimeID gets the 4-byte run-time signal index for this measurement. -func (cm *CompactMeasurement) GetRuntimeID() int32 { - return cm.signalIndexCache.SignalIndex(cm.SignalID) -} - -// SetRuntimeID assigns CompactMeasurement SignalID (UUID) from the specified signalIndex. -func (cm *CompactMeasurement) SetRuntimeID(signalIndex int32) { - cm.SignalID = cm.signalIndexCache.SignalID(signalIndex) -} +// Constructs a CompactMeasurement from the specified byte buffer; returns the measurement and the number of bytes occupied by this measurement. +func NewCompactMeasurement(includeTime, useMillisecondResolution bool, baseTimeOffsets *[2]int64, buffer []byte) (CompactMeasurement, int, error) { + var cm CompactMeasurement -// Decode parses a CompactMeasurement from the specified byte buffer. -func (cm *CompactMeasurement) Decode(buffer []byte) (int, error) { - if len(buffer) < fixedLength { - return 0, errors.New("not enough buffer available to deserialize compact measurement") + if len(buffer) < 9 { + return cm, 0, errors.New("not enough buffer available to deserialize compact measurement") } // Basic Compact Measurement Format: @@ -257,51 +147,53 @@ func (cm *CompactMeasurement) Decode(buffer []byte) (int, error) { // ID 4 // Value 4 // [Time] 0/2/4/8 - var index int - // Decode state flags - cm.SetCompactStateFlags(buffer[0]) - index++ + cm.Flags = compactStateFlagsEnum(buffer[0]) + cm.SignalIndex = binary.BigEndian.Uint32(buffer[1:5]) + cm.Value = math.Float32frombits(binary.BigEndian.Uint32(buffer[5:9])) - // Decode runtime ID - cm.SetRuntimeID(int32(binary.BigEndian.Uint32(buffer[index:]))) - index += 4 - - // Decode value - cm.Value = float64(math.Float32frombits(binary.BigEndian.Uint32(buffer[index:]))) - index += 4 - - if !cm.includeTime { - return index, nil + if !includeTime { + return cm, 9, nil } - if cm.usingBaseTimeOffset { - baseTimeOffset := cm.baseTimeOffsets[cm.timeIndex] - - if cm.useMillisecondResolution { + if (cm.Flags & compactStateFlags.BaseTimeOffset) != 0 { + timeIndex := (cm.Flags & compactStateFlags.TimeIndex) >> 7 + baseTimeOffset := baseTimeOffsets[timeIndex] + if useMillisecondResolution { // Decode 2-byte millisecond offset timestamp if baseTimeOffset > 0 { - cm.Timestamp = ticks.Ticks(baseTimeOffset + int64(binary.BigEndian.Uint16(buffer[index:]))*int64(ticks.PerMillisecond)) + cm.Timestamp = ticks.Ticks(baseTimeOffset + int64(binary.BigEndian.Uint16(buffer[9:11]))*int64(ticks.PerMillisecond)) } - index += 2 + return cm, 11, nil } else { // Decode 4-byte tick offset timestamp if baseTimeOffset > 0 { - cm.Timestamp = ticks.Ticks(baseTimeOffset + int64(binary.BigEndian.Uint32(buffer[index:]))) + cm.Timestamp = ticks.Ticks(baseTimeOffset + int64(binary.BigEndian.Uint32(buffer[9:13]))) } - index += 4 + return cm, 13, nil } } else { // Decode 8-byte full fidelity timestamp // Note that only a full fidelity timestamp can carry leap second flags - cm.Timestamp = ticks.Ticks(binary.BigEndian.Uint64(buffer[index:])) - index += 8 + cm.Timestamp = ticks.Ticks(binary.BigEndian.Uint64(buffer[9:17])) + return cm, 17, nil } +} - return index, nil +// Compute the full measurement from the compact representation +func (cm *CompactMeasurement) Expand(signalIndexCache *SignalIndexCache) Measurement { + return Measurement{ + SignalID: signalIndexCache.SignalID(int32(cm.SignalIndex)), + Timestamp: cm.Timestamp, + Value: float64(cm.Value), + Flags: cm.Flags.mapToFullFlags(), + } } -//// Encode serializes a CompactMeasurement to a byte buffer for publication to a DataSubscriber. -//func (cm *CompactMeasurement) Encode(buffer []byte) { -// // TODO: This will be needed by DataPublisher implementation -//} +// // Serializes a CompactMeasurement to a byte buffer for publication to a DataSubscriber. +func (cm *CompactMeasurement) Marshal(b []byte) { + b[0] = byte(cm.Flags) + binary.BigEndian.PutUint32(b[1:], cm.SignalIndex) + binary.BigEndian.PutUint32(b[5:], math.Float32bits(float32(cm.Value))) + binary.BigEndian.PutUint64(b[9:], uint64(cm.Timestamp)) +} diff --git a/sttp/transport/DataSubscriber.go b/sttp/transport/DataSubscriber.go index 89e94f5..f0691b7 100644 --- a/sttp/transport/DataSubscriber.go +++ b/sttp/transport/DataSubscriber.go @@ -1353,12 +1353,10 @@ func (ds *DataSubscriber) parseCompactMeasurements(signalIndexCache *SignalIndex useMillisecondResolution := ds.subscription.UseMillisecondResolution includeTime := ds.subscription.IncludeTime - index := 0 for i := 0; i < len(measurements); i++ { // Deserialize compact measurement format - compactMeasurement := NewCompactMeasurement(signalIndexCache, includeTime, useMillisecondResolution, &ds.baseTimeOffsets) - bytesDecoded, err := compactMeasurement.Decode(data[index:]) + cm, n, err := NewCompactMeasurement(includeTime, useMillisecondResolution, &ds.baseTimeOffsets, data) if err != nil { ds.dispatchErrorMessage("Failed to parse compact measurements - disconnecting: " + err.Error()) @@ -1366,8 +1364,8 @@ func (ds *DataSubscriber) parseCompactMeasurements(signalIndexCache *SignalIndex return } - index += bytesDecoded - measurements[i] = compactMeasurement.Measurement + data = data[n:] + measurements[i] = cm.Expand(signalIndexCache) } } diff --git a/sttp/transport/SignalIndexCache.go b/sttp/transport/SignalIndexCache.go index 305d584..6d5d824 100644 --- a/sttp/transport/SignalIndexCache.go +++ b/sttp/transport/SignalIndexCache.go @@ -36,13 +36,13 @@ import ( // SignalIndexCache maps 32-bit runtime IDs to 128-bit globally unique Measurement IDs. The structure // additionally provides reverse lookup and an extra mapping to human-readable measurement keys. type SignalIndexCache struct { - reference map[int32]uint32 - signalIDList []guid.Guid - sourceList []string - idList []uint64 - signalIDCache map[guid.Guid]int32 - binaryLength uint32 - tsscDecoder *tssc.Decoder + reference map[int32]uint32 + signalIDList []guid.Guid + sourceList []string + idList []uint64 + signalIDCache map[guid.Guid]int32 + binaryLength uint32 + tsscDecoder *tssc.Decoder } // NewSignalIndexCache makes a new SignalIndexCache