Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
188 changes: 40 additions & 148 deletions sttp/transport/CompactMeasurement.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,6 @@ const (
systemIssueMask StateFlagsEnum = 0xE0000000
calculatedValueMask StateFlagsEnum = 0x00001000
discardedValueMask StateFlagsEnum = 0x00400000

fixedLength = 9
)

func (compactFlags compactStateFlagsEnum) mapToFullFlags() StateFlagsEnum {
Expand Down Expand Up @@ -128,126 +126,18 @@ func (fullFlags StateFlagsEnum) mapToCompactFlags() compactStateFlagsEnum {

// CompactMeasurement defines a measured value, in simple compact format, for transmission or reception in STTP.
type CompactMeasurement struct {
Measurement
signalIndexCache *SignalIndexCache
baseTimeOffsets *[2]int64
includeTime bool
useMillisecondResolution bool
timeIndex int32
usingBaseTimeOffset bool
}

// NewCompactMeasurement creates a new CompactMeasurement
func NewCompactMeasurement(signalIndexCache *SignalIndexCache, includeTime, useMillisecondResolution bool, baseTimeOffsets *[2]int64) CompactMeasurement {
return CompactMeasurement{
Measurement: Measurement{},
signalIndexCache: signalIndexCache,
baseTimeOffsets: baseTimeOffsets,
includeTime: includeTime,
useMillisecondResolution: useMillisecondResolution,
timeIndex: 0,
usingBaseTimeOffset: false,
}
}

// GetBinaryLength gets the binary byte length of a CompactMeasurement
func (cm *CompactMeasurement) GetBinaryLength() uint32 {
var length uint32 = fixedLength

if !cm.includeTime {
return length
}

baseTimeOffset := cm.baseTimeOffsets[cm.timeIndex]

if baseTimeOffset > 0 {
// See if timestamp will fit within space allowed for active base offset. We cache result so that post call
// to GetBinaryLength, result will speed other subsequent parsing operations by not having to reevaluate.
difference := cm.TimestampValue() - baseTimeOffset

if difference > 0 {
if cm.useMillisecondResolution {
cm.usingBaseTimeOffset = difference/int64(ticks.PerMillisecond) < math.MaxUint16
} else {
cm.usingBaseTimeOffset = difference < math.MaxUint32
}
} else {
cm.usingBaseTimeOffset = false
}

if cm.usingBaseTimeOffset {
if cm.useMillisecondResolution {
length += 2 // Use two bytes for millisecond resolution timestamp with valid offset
} else {
length += 4 // Use four bytes for tick resolution timestamp with valid offset
}
} else {
length += 8 // Use eight bytes for full fidelity time
}
} else {
// Use eight bytes for full fidelity time
length += 8
}

return length
}

// GetTimestampC2 gets offset compressed millisecond-resolution 2-byte timestamp.
func (cm *CompactMeasurement) GetTimestampC2() uint16 {
return uint16((cm.TimestampValue() - cm.baseTimeOffsets[cm.timeIndex]) / int64(ticks.PerMillisecond))
}

// GetTimestampC4 gets offset compressed tick-resolution 4-byte timestamp.
func (cm *CompactMeasurement) GetTimestampC4() uint32 {
return uint32(cm.TimestampValue() - cm.baseTimeOffsets[cm.timeIndex])
}

// GetCompactStateFlags gets byte level compact state flags with encoded time index and base time offset bits.
func (cm *CompactMeasurement) GetCompactStateFlags() byte {
// Encode compact state flags
flags := cm.Flags.mapToCompactFlags()

if cm.timeIndex != 0 {
flags |= compactStateFlags.TimeIndex
}

if cm.usingBaseTimeOffset {
flags |= compactStateFlags.BaseTimeOffset
}

return byte(flags)
}

// SetCompactStateFlags sets byte level compact state flags with encoded time index and base time offset bits.
func (cm *CompactMeasurement) SetCompactStateFlags(value byte) {
// Decode compact state flags
flags := compactStateFlagsEnum(value)

cm.Flags = flags.mapToFullFlags()

if (flags & compactStateFlags.TimeIndex) > 0 {
cm.timeIndex = 1
} else {
cm.timeIndex = 0
}

cm.usingBaseTimeOffset = (flags & compactStateFlags.BaseTimeOffset) > 0
Value float32
Timestamp ticks.Ticks
SignalIndex uint32
Flags compactStateFlagsEnum
}

// GetRuntimeID gets the 4-byte run-time signal index for this measurement.
func (cm *CompactMeasurement) GetRuntimeID() int32 {
return cm.signalIndexCache.SignalIndex(cm.SignalID)
}

// SetRuntimeID assigns CompactMeasurement SignalID (UUID) from the specified signalIndex.
func (cm *CompactMeasurement) SetRuntimeID(signalIndex int32) {
cm.SignalID = cm.signalIndexCache.SignalID(signalIndex)
}
// Constructs a CompactMeasurement from the specified byte buffer; returns the measurement and the number of bytes occupied by this measurement.
func NewCompactMeasurement(includeTime, useMillisecondResolution bool, baseTimeOffsets *[2]int64, buffer []byte) (CompactMeasurement, int, error) {
var cm CompactMeasurement

// Decode parses a CompactMeasurement from the specified byte buffer.
func (cm *CompactMeasurement) Decode(buffer []byte) (int, error) {
if len(buffer) < fixedLength {
return 0, errors.New("not enough buffer available to deserialize compact measurement")
if len(buffer) < 9 {
return cm, 0, errors.New("not enough buffer available to deserialize compact measurement")
}

// Basic Compact Measurement Format:
Expand All @@ -257,51 +147,53 @@ func (cm *CompactMeasurement) Decode(buffer []byte) (int, error) {
// ID 4
// Value 4
// [Time] 0/2/4/8
var index int

// Decode state flags
cm.SetCompactStateFlags(buffer[0])
index++
cm.Flags = compactStateFlagsEnum(buffer[0])
cm.SignalIndex = binary.BigEndian.Uint32(buffer[1:5])
cm.Value = math.Float32frombits(binary.BigEndian.Uint32(buffer[5:9]))

// Decode runtime ID
cm.SetRuntimeID(int32(binary.BigEndian.Uint32(buffer[index:])))
index += 4

// Decode value
cm.Value = float64(math.Float32frombits(binary.BigEndian.Uint32(buffer[index:])))
index += 4

if !cm.includeTime {
return index, nil
if !includeTime {
return cm, 9, nil
}

if cm.usingBaseTimeOffset {
baseTimeOffset := cm.baseTimeOffsets[cm.timeIndex]

if cm.useMillisecondResolution {
if (cm.Flags & compactStateFlags.BaseTimeOffset) != 0 {
timeIndex := (cm.Flags & compactStateFlags.TimeIndex) >> 7
baseTimeOffset := baseTimeOffsets[timeIndex]
if useMillisecondResolution {
// Decode 2-byte millisecond offset timestamp
if baseTimeOffset > 0 {
cm.Timestamp = ticks.Ticks(baseTimeOffset + int64(binary.BigEndian.Uint16(buffer[index:]))*int64(ticks.PerMillisecond))
cm.Timestamp = ticks.Ticks(baseTimeOffset + int64(binary.BigEndian.Uint16(buffer[9:11]))*int64(ticks.PerMillisecond))
}
index += 2
return cm, 11, nil
} else {
// Decode 4-byte tick offset timestamp
if baseTimeOffset > 0 {
cm.Timestamp = ticks.Ticks(baseTimeOffset + int64(binary.BigEndian.Uint32(buffer[index:])))
cm.Timestamp = ticks.Ticks(baseTimeOffset + int64(binary.BigEndian.Uint32(buffer[9:13])))
}
index += 4
return cm, 13, nil
}
} else {
// Decode 8-byte full fidelity timestamp
// Note that only a full fidelity timestamp can carry leap second flags
cm.Timestamp = ticks.Ticks(binary.BigEndian.Uint64(buffer[index:]))
index += 8
cm.Timestamp = ticks.Ticks(binary.BigEndian.Uint64(buffer[9:17]))
return cm, 17, nil
}
}

return index, nil
// Compute the full measurement from the compact representation
func (cm *CompactMeasurement) Expand(signalIndexCache *SignalIndexCache) Measurement {
return Measurement{
SignalID: signalIndexCache.SignalID(int32(cm.SignalIndex)),
Timestamp: cm.Timestamp,
Value: float64(cm.Value),
Flags: cm.Flags.mapToFullFlags(),
}
}

//// Encode serializes a CompactMeasurement to a byte buffer for publication to a DataSubscriber.
//func (cm *CompactMeasurement) Encode(buffer []byte) {
// // TODO: This will be needed by DataPublisher implementation
//}
// // Serializes a CompactMeasurement to a byte buffer for publication to a DataSubscriber.
func (cm *CompactMeasurement) Marshal(b []byte) {
b[0] = byte(cm.Flags)
binary.BigEndian.PutUint32(b[1:], cm.SignalIndex)
binary.BigEndian.PutUint32(b[5:], math.Float32bits(float32(cm.Value)))
binary.BigEndian.PutUint64(b[9:], uint64(cm.Timestamp))
}
8 changes: 3 additions & 5 deletions sttp/transport/DataSubscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -1353,21 +1353,19 @@ func (ds *DataSubscriber) parseCompactMeasurements(signalIndexCache *SignalIndex

useMillisecondResolution := ds.subscription.UseMillisecondResolution
includeTime := ds.subscription.IncludeTime
index := 0

for i := 0; i < len(measurements); i++ {
// Deserialize compact measurement format
compactMeasurement := NewCompactMeasurement(signalIndexCache, includeTime, useMillisecondResolution, &ds.baseTimeOffsets)
bytesDecoded, err := compactMeasurement.Decode(data[index:])
cm, n, err := NewCompactMeasurement(includeTime, useMillisecondResolution, &ds.baseTimeOffsets, data)

if err != nil {
ds.dispatchErrorMessage("Failed to parse compact measurements - disconnecting: " + err.Error())
ds.dispatchConnectionTerminated()
return
}

index += bytesDecoded
measurements[i] = compactMeasurement.Measurement
data = data[n:]
measurements[i] = cm.Expand(signalIndexCache)
}
}

Expand Down
14 changes: 7 additions & 7 deletions sttp/transport/SignalIndexCache.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,13 @@ import (
// SignalIndexCache maps 32-bit runtime IDs to 128-bit globally unique Measurement IDs. The structure
// additionally provides reverse lookup and an extra mapping to human-readable measurement keys.
type SignalIndexCache struct {
reference map[int32]uint32
signalIDList []guid.Guid
sourceList []string
idList []uint64
signalIDCache map[guid.Guid]int32
binaryLength uint32
tsscDecoder *tssc.Decoder
reference map[int32]uint32
signalIDList []guid.Guid
sourceList []string
idList []uint64
signalIDCache map[guid.Guid]int32
binaryLength uint32
tsscDecoder *tssc.Decoder
}

// NewSignalIndexCache makes a new SignalIndexCache
Expand Down