Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions examples/vucm/ats3/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
## Setup

- Install Docker or Docker Desktop
- Create Standalone Device on your site in Enapter Cloud
- Generate config for your Standalone device
- Copy it to `env.txt` file as `ENAPTER_VUCM_BLOB` value
- Verify that your LabVIEW program sends data as JSON over TCP connection. Only one message per connection is allowed.
- Take number of TCP port, to which data is being sent from LabVIEW and set it to `LISTEN_TCP_PORT` variable in `env.list`.

## Run
- Copy `*_run.sh` script to directory with `env.txt`
- Open Terminal in Docker Desktop. Change working directory to the same as in previous step.
- `./*_run.sh`


## Development
1. Run `*_build.sh`
2. `docker push ...`
2. Notify colleagues to pull the latest image
13 changes: 13 additions & 0 deletions examples/vucm/ats3/ats3.Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
FROM python:3.11-alpine3.22

WORKDIR /app

RUN apk add build-base

RUN python -m venv .venv
COPY requirements.txt requirements.txt
RUN .venv/bin/pip install -r requirements.txt

COPY script.py script.py

CMD [".venv/bin/python", "script.py"]
14 changes: 14 additions & 0 deletions examples/vucm/ats3/backup.Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
FROM python:3.11-alpine3.22

WORKDIR /app

RUN apk add build-base

RUN python -m venv .venv
COPY requirements.txt requirements.txt
RUN .venv/bin/pip install -r requirements.txt

COPY backup.py backup.py
COPY files/ files/

CMD [".venv/bin/python", "backup.py"]
76 changes: 76 additions & 0 deletions examples/vucm/ats3/backup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import asyncio
import csv
import functools
import glob
import os
from datetime import datetime
from zoneinfo import ZoneInfo

import enapter


async def main():
csv_files = sorted(glob.glob("files/*.csv"))
print("files", csv_files)
device_factory = functools.partial(
CSVBackup, csv_files=csv_files, timezone=os.getenv("TIMEZONE", "Europe/Berlin"),)
await enapter.vucm.run(device_factory)


class CSVBackup(enapter.vucm.Device):
def __init__(self, csv_files, timezone, **kwargs):
super().__init__(**kwargs)
self.csv_files = csv_files
self.timezone = timezone

async def task_send_csv_telemetry(self):
"""
Read CSV file line by line, send each row as telemetry every second.
"""
read_csv_files = 0
while True:
if read_csv_files == len(self.csv_files):
break
for f in self.csv_files:
await self.log.info(f"reading file: {f}")
try:
with open(f, newline="") as csv_file:
reader = csv.DictReader(csv_file)
headers = reader.fieldnames or []
for row in reader:
telemetry = {}
telemetry["status"] = "ok"
for key in headers:
value = row.get(key)
if key in ("Date", "Time"):
telemetry[key] = value
else:
telemetry[key] = float(value) if value != "" else None
await self.log.info(f"read {key}: {telemetry[key]}")
self._add_timestamp_if_present(telemetry)
await self.send_telemetry(telemetry)
await asyncio.sleep(1)
except Exception as e:
await self.log.error(f"Failed to read CSV: {e}")
await asyncio.sleep(5)
finally:
read_csv_files += 1
await self.log.info(f"finished reading file: {f}")
break
await self.log.info(f"all read")

def _add_timestamp_if_present(self, telemetry):
"""If 'Date' and 'Time' are present, combine and convert to timestamp."""
date_str = telemetry.get("Date")
time_str = telemetry.get("Time")
if date_str and time_str:
dt_str = f"{date_str} {time_str}"
naive_dt = datetime.strptime(dt_str, "%d/%m/%Y %H:%M:%S")
tz_aware_dt = naive_dt.replace(tzinfo=ZoneInfo(self.timezone))
telemetry["timestamp"] = int(tz_aware_dt.timestamp())
telemetry.pop("Date")
telemetry.pop("Time")


if __name__ == "__main__":
asyncio.run(main())
9 changes: 9 additions & 0 deletions examples/vucm/ats3/backup_build.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
#!/bin/bash

set -euo pipefail
IFS=$'\n\t'

IMAGE_TAG="tyenap/ats3-backup:latest"
SCRIPT_DIR="$(realpath "$(dirname "$0")")"

docker build --file backup.Dockerfile --tag "$IMAGE_TAG" "$SCRIPT_DIR"
13 changes: 13 additions & 0 deletions examples/vucm/ats3/backup_run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
#!/bin/bash

set -euo pipefail
IFS=$'\n\t'

IMAGE_TAG="tyenap/ats3-backup:latest"

docker run --rm -it \
--name "ats3" \
--network host \
--env-file env.txt \
-e ENAPTER_LOG_LEVEL="${ENAPTER_LOG_LEVEL:-info}" \
"$IMAGE_TAG"
9 changes: 9 additions & 0 deletions examples/vucm/ats3/docker_build.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
#!/bin/bash

set -euo pipefail
IFS=$'\n\t'

IMAGE_TAG="tyenap/ats3:latest"
SCRIPT_DIR="$(realpath "$(dirname "$0")")"

docker build --file ats3.Dockerfile --tag "$IMAGE_TAG" "$SCRIPT_DIR"
13 changes: 13 additions & 0 deletions examples/vucm/ats3/docker_run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
#!/bin/bash

set -euo pipefail
IFS=$'\n\t'

IMAGE_TAG="tyenap/ats3:latest"

docker run --rm -it \
--name "ni-daq" \
--network host \
--env-file env.txt \
-e ENAPTER_LOG_LEVEL="${ENAPTER_LOG_LEVEL:-info}" \
"$IMAGE_TAG"
104 changes: 104 additions & 0 deletions examples/vucm/ats3/manifest.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
blueprint_spec: "device/1.0"

display_name: ATS3 stack

communication_module:
product: ENP-VIRTUAL

properties:
vendor:
display_name: Vendor
type: string
model:
display_name: Model
type: string

alerts:
parse_error:
display_name: Data processing failed
severity: error
telemetry:
status:
display_name: Status
type: string
enum:
- ok
- error
- no_data
T1:
display_name: T1
type: float
T2:
display_name: T2
type: float
T3:
display_name: T2
type: float
Current:
display_name: Current
type: float
PSU:
display_name: Current
type: float
P1:
display_name: P1
type: float
P2:
display_name: P2
type: float
P3:
display_name: P3
type: float
Flow:
display_name: Flow
type: float
Conductivity:
display_name: Conductivity
type: float
MFMH2:
display_name: MFMH2
type: float
Theoretical_h2:
display_name: MFMH2
type: float
MCM02:
display_name: MCM02
type: float
Refilling:
display_name: Refilling
type: float
PC:
display_name: PC
type: float
C1:
display_name: Cell 1
type: float
C2:
display_name: Cell 2
type: float
C3:
display_name: Cell 3
type: float
C4:
display_name: Cell 4
type: float
C5:
display_name: Cell 5
type: float
C6:
display_name: Cell 6
type: float
C7:
display_name: Cell 7
type: float
C8:
display_name: Cell 8
type: float
C9:
display_name: Cell 9
type: float
C10:
display_name: Cell 10
type: float

commands: {}
2 changes: 2 additions & 0 deletions examples/vucm/ats3/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
enapter==0.9.2
python-dateutil==2.8.2
100 changes: 100 additions & 0 deletions examples/vucm/ats3/script.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
import asyncio
import functools
import json
import os
import socket
from datetime import datetime
from zoneinfo import ZoneInfo

import enapter


async def main():
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
sock.bind(("127.0.0.1", int(os.environ["LISTEN_TCP_PORT"])))
sock.listen()
sock.setblocking(False)
device_factory = functools.partial(
ATS3Stack,
socket=sock,
timezone=os.getenv("TIMEZONE", "Europe/Berlin"),
)
await enapter.vucm.run(device_factory)


class ATS3Stack(enapter.vucm.Device):
def __init__(self, socket, timezone, **kwargs):
super().__init__(**kwargs)
self.socket = socket
self.timezone = timezone

async def task_accept_conns(self):
"""
Accept incoming TCP connections in a loop and spawn a handler task for each connection.
"""
async with asyncio.TaskGroup() as tg:
while True:
conn, addr = await asyncio.get_event_loop().sock_accept(self.socket)
tg.create_task(self.handle_conn(conn, addr))

async def handle_conn(self, conn, addr):
"""
Handle a single TCP connection: read data with a timeout, process and send telemetry.
"""
data = bytearray()
try:
while True:
try:
async with asyncio.timeout(5):
chunk = await asyncio.get_event_loop().sock_recv(conn, 1024)
except TimeoutError:
await self.log.error(f"{addr}: connection timeout", True)
return
if chunk:
data.extend(chunk)
continue
await self.log.debug(f"{addr}: read data: {data}")
await self._process_and_send_telemetry(data)
return
finally:
conn.close()

async def task_properties_sender(self):
"""Periodically send device properties."""
while True:
await self.send_properties(
{"vendor": "National Instruments", "model": "cDAQ 9178"}
)
await asyncio.sleep(10)

async def _process_and_send_telemetry(self, data):
"""Parse, enrich, and send telemetry data."""
telemetry = {}
status = "no_data"
try:
if data:
status = "ok"
telemetry = json.loads(data.decode())
self._add_timestamp_if_present(telemetry)
telemetry["status"] = status
await self.send_telemetry(telemetry)
self.alerts.clear()
except Exception as e:
self.alerts.add("parse_error")
await self.log.error(f"Failed to process data: {e}")

def _add_timestamp_if_present(self, telemetry):
"""If 'Date' and 'Time' are present, combine and convert to timestamp."""
date_str = telemetry.get("Date")
time_str = telemetry.get("Time")
if date_str and time_str:
dt_str = f"{date_str} {time_str}"
naive_dt = datetime.strptime(dt_str, "%d/%m/%Y %H:%M:%S")
tz_aware_dt = naive_dt.replace(tzinfo=ZoneInfo(self.timezone))
telemetry["timestamp"] = int(tz_aware_dt.timestamp())
telemetry.pop("Date")
telemetry.pop("Time")

if __name__ == "__main__":
asyncio.run(main())
Loading