diff --git a/examples/vucm/ats3/README.md b/examples/vucm/ats3/README.md new file mode 100644 index 0000000..bed2178 --- /dev/null +++ b/examples/vucm/ats3/README.md @@ -0,0 +1,19 @@ +## Setup + +- Install Docker or Docker Desktop +- Create Standalone Device on your site in Enapter Cloud +- Generate config for your Standalone device +- Copy it to `env.txt` file as `ENAPTER_VUCM_BLOB` value +- Verify that your LabVIEW program sends data as JSON over TCP connection. Only one message per connection is allowed. +- Take number of TCP port, to which data is being sent from LabVIEW and set it to `LISTEN_TCP_PORT` variable in `env.list`. + +## Run +- Copy `*_run.sh` script to directory with `env.txt` +- Open Terminal in Docker Desktop. Change working directory to the same as in previous step. +- `./*_run.sh` + + +## Development +1. Run `*_build.sh` +2. `docker push ...` +2. Notify colleagues to pull the latest image \ No newline at end of file diff --git a/examples/vucm/ats3/ats3.Dockerfile b/examples/vucm/ats3/ats3.Dockerfile new file mode 100644 index 0000000..296cfb0 --- /dev/null +++ b/examples/vucm/ats3/ats3.Dockerfile @@ -0,0 +1,13 @@ +FROM python:3.11-alpine3.22 + +WORKDIR /app + +RUN apk add build-base + +RUN python -m venv .venv +COPY requirements.txt requirements.txt +RUN .venv/bin/pip install -r requirements.txt + +COPY script.py script.py + +CMD [".venv/bin/python", "script.py"] diff --git a/examples/vucm/ats3/backup.Dockerfile b/examples/vucm/ats3/backup.Dockerfile new file mode 100644 index 0000000..7b1639a --- /dev/null +++ b/examples/vucm/ats3/backup.Dockerfile @@ -0,0 +1,14 @@ +FROM python:3.11-alpine3.22 + +WORKDIR /app + +RUN apk add build-base + +RUN python -m venv .venv +COPY requirements.txt requirements.txt +RUN .venv/bin/pip install -r requirements.txt + +COPY backup.py backup.py +COPY files/ files/ + +CMD [".venv/bin/python", "backup.py"] diff --git a/examples/vucm/ats3/backup.py b/examples/vucm/ats3/backup.py new file mode 100644 index 0000000..d58d615 --- /dev/null +++ b/examples/vucm/ats3/backup.py @@ -0,0 +1,76 @@ +import asyncio +import csv +import functools +import glob +import os +from datetime import datetime +from zoneinfo import ZoneInfo + +import enapter + + +async def main(): + csv_files = sorted(glob.glob("files/*.csv")) + print("files", csv_files) + device_factory = functools.partial( + CSVBackup, csv_files=csv_files, timezone=os.getenv("TIMEZONE", "Europe/Berlin"),) + await enapter.vucm.run(device_factory) + + +class CSVBackup(enapter.vucm.Device): + def __init__(self, csv_files, timezone, **kwargs): + super().__init__(**kwargs) + self.csv_files = csv_files + self.timezone = timezone + + async def task_send_csv_telemetry(self): + """ + Read CSV file line by line, send each row as telemetry every second. + """ + read_csv_files = 0 + while True: + if read_csv_files == len(self.csv_files): + break + for f in self.csv_files: + await self.log.info(f"reading file: {f}") + try: + with open(f, newline="") as csv_file: + reader = csv.DictReader(csv_file) + headers = reader.fieldnames or [] + for row in reader: + telemetry = {} + telemetry["status"] = "ok" + for key in headers: + value = row.get(key) + if key in ("Date", "Time"): + telemetry[key] = value + else: + telemetry[key] = float(value) if value != "" else None + await self.log.info(f"read {key}: {telemetry[key]}") + self._add_timestamp_if_present(telemetry) + await self.send_telemetry(telemetry) + await asyncio.sleep(1) + except Exception as e: + await self.log.error(f"Failed to read CSV: {e}") + await asyncio.sleep(5) + finally: + read_csv_files += 1 + await self.log.info(f"finished reading file: {f}") + break + await self.log.info(f"all read") + + def _add_timestamp_if_present(self, telemetry): + """If 'Date' and 'Time' are present, combine and convert to timestamp.""" + date_str = telemetry.get("Date") + time_str = telemetry.get("Time") + if date_str and time_str: + dt_str = f"{date_str} {time_str}" + naive_dt = datetime.strptime(dt_str, "%d/%m/%Y %H:%M:%S") + tz_aware_dt = naive_dt.replace(tzinfo=ZoneInfo(self.timezone)) + telemetry["timestamp"] = int(tz_aware_dt.timestamp()) + telemetry.pop("Date") + telemetry.pop("Time") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/vucm/ats3/backup_build.sh b/examples/vucm/ats3/backup_build.sh new file mode 100755 index 0000000..4479646 --- /dev/null +++ b/examples/vucm/ats3/backup_build.sh @@ -0,0 +1,9 @@ +#!/bin/bash + +set -euo pipefail +IFS=$'\n\t' + +IMAGE_TAG="tyenap/ats3-backup:latest" +SCRIPT_DIR="$(realpath "$(dirname "$0")")" + +docker build --file backup.Dockerfile --tag "$IMAGE_TAG" "$SCRIPT_DIR" diff --git a/examples/vucm/ats3/backup_run.sh b/examples/vucm/ats3/backup_run.sh new file mode 100755 index 0000000..479c0d8 --- /dev/null +++ b/examples/vucm/ats3/backup_run.sh @@ -0,0 +1,13 @@ +#!/bin/bash + +set -euo pipefail +IFS=$'\n\t' + +IMAGE_TAG="tyenap/ats3-backup:latest" + +docker run --rm -it \ + --name "ats3" \ + --network host \ + --env-file env.txt \ + -e ENAPTER_LOG_LEVEL="${ENAPTER_LOG_LEVEL:-info}" \ + "$IMAGE_TAG" diff --git a/examples/vucm/ats3/docker_build.sh b/examples/vucm/ats3/docker_build.sh new file mode 100755 index 0000000..9147cc3 --- /dev/null +++ b/examples/vucm/ats3/docker_build.sh @@ -0,0 +1,9 @@ +#!/bin/bash + +set -euo pipefail +IFS=$'\n\t' + +IMAGE_TAG="tyenap/ats3:latest" +SCRIPT_DIR="$(realpath "$(dirname "$0")")" + +docker build --file ats3.Dockerfile --tag "$IMAGE_TAG" "$SCRIPT_DIR" diff --git a/examples/vucm/ats3/docker_run.sh b/examples/vucm/ats3/docker_run.sh new file mode 100755 index 0000000..6d107e9 --- /dev/null +++ b/examples/vucm/ats3/docker_run.sh @@ -0,0 +1,13 @@ +#!/bin/bash + +set -euo pipefail +IFS=$'\n\t' + +IMAGE_TAG="tyenap/ats3:latest" + +docker run --rm -it \ + --name "ni-daq" \ + --network host \ + --env-file env.txt \ + -e ENAPTER_LOG_LEVEL="${ENAPTER_LOG_LEVEL:-info}" \ + "$IMAGE_TAG" diff --git a/examples/vucm/ats3/manifest.yml b/examples/vucm/ats3/manifest.yml new file mode 100644 index 0000000..ecdb487 --- /dev/null +++ b/examples/vucm/ats3/manifest.yml @@ -0,0 +1,104 @@ +blueprint_spec: "device/1.0" + +display_name: ATS3 stack + +communication_module: + product: ENP-VIRTUAL + +properties: + vendor: + display_name: Vendor + type: string + model: + display_name: Model + type: string + +alerts: + parse_error: + display_name: Data processing failed + severity: error +telemetry: + status: + display_name: Status + type: string + enum: + - ok + - error + - no_data + T1: + display_name: T1 + type: float + T2: + display_name: T2 + type: float + T3: + display_name: T2 + type: float + Current: + display_name: Current + type: float + PSU: + display_name: Current + type: float + P1: + display_name: P1 + type: float + P2: + display_name: P2 + type: float + P3: + display_name: P3 + type: float + Flow: + display_name: Flow + type: float + Conductivity: + display_name: Conductivity + type: float + MFMH2: + display_name: MFMH2 + type: float + Theoretical_h2: + display_name: MFMH2 + type: float + MCM02: + display_name: MCM02 + type: float + Refilling: + display_name: Refilling + type: float + PC: + display_name: PC + type: float + C1: + display_name: Cell 1 + type: float + C2: + display_name: Cell 2 + type: float + C3: + display_name: Cell 3 + type: float + C4: + display_name: Cell 4 + type: float + C5: + display_name: Cell 5 + type: float + C6: + display_name: Cell 6 + type: float + C7: + display_name: Cell 7 + type: float + C8: + display_name: Cell 8 + type: float + C9: + display_name: Cell 9 + type: float + C10: + display_name: Cell 10 + type: float + +commands: {} diff --git a/examples/vucm/ats3/requirements.txt b/examples/vucm/ats3/requirements.txt new file mode 100644 index 0000000..9d11155 --- /dev/null +++ b/examples/vucm/ats3/requirements.txt @@ -0,0 +1,2 @@ +enapter==0.9.2 +python-dateutil==2.8.2 diff --git a/examples/vucm/ats3/script.py b/examples/vucm/ats3/script.py new file mode 100644 index 0000000..e353d54 --- /dev/null +++ b/examples/vucm/ats3/script.py @@ -0,0 +1,100 @@ +import asyncio +import functools +import json +import os +import socket +from datetime import datetime +from zoneinfo import ZoneInfo + +import enapter + + +async def main(): + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True) + sock.bind(("127.0.0.1", int(os.environ["LISTEN_TCP_PORT"]))) + sock.listen() + sock.setblocking(False) + device_factory = functools.partial( + ATS3Stack, + socket=sock, + timezone=os.getenv("TIMEZONE", "Europe/Berlin"), + ) + await enapter.vucm.run(device_factory) + + +class ATS3Stack(enapter.vucm.Device): + def __init__(self, socket, timezone, **kwargs): + super().__init__(**kwargs) + self.socket = socket + self.timezone = timezone + + async def task_accept_conns(self): + """ + Accept incoming TCP connections in a loop and spawn a handler task for each connection. + """ + async with asyncio.TaskGroup() as tg: + while True: + conn, addr = await asyncio.get_event_loop().sock_accept(self.socket) + tg.create_task(self.handle_conn(conn, addr)) + + async def handle_conn(self, conn, addr): + """ + Handle a single TCP connection: read data with a timeout, process and send telemetry. + """ + data = bytearray() + try: + while True: + try: + async with asyncio.timeout(5): + chunk = await asyncio.get_event_loop().sock_recv(conn, 1024) + except TimeoutError: + await self.log.error(f"{addr}: connection timeout", True) + return + if chunk: + data.extend(chunk) + continue + await self.log.debug(f"{addr}: read data: {data}") + await self._process_and_send_telemetry(data) + return + finally: + conn.close() + + async def task_properties_sender(self): + """Periodically send device properties.""" + while True: + await self.send_properties( + {"vendor": "National Instruments", "model": "cDAQ 9178"} + ) + await asyncio.sleep(10) + + async def _process_and_send_telemetry(self, data): + """Parse, enrich, and send telemetry data.""" + telemetry = {} + status = "no_data" + try: + if data: + status = "ok" + telemetry = json.loads(data.decode()) + self._add_timestamp_if_present(telemetry) + telemetry["status"] = status + await self.send_telemetry(telemetry) + self.alerts.clear() + except Exception as e: + self.alerts.add("parse_error") + await self.log.error(f"Failed to process data: {e}") + + def _add_timestamp_if_present(self, telemetry): + """If 'Date' and 'Time' are present, combine and convert to timestamp.""" + date_str = telemetry.get("Date") + time_str = telemetry.get("Time") + if date_str and time_str: + dt_str = f"{date_str} {time_str}" + naive_dt = datetime.strptime(dt_str, "%d/%m/%Y %H:%M:%S") + tz_aware_dt = naive_dt.replace(tzinfo=ZoneInfo(self.timezone)) + telemetry["timestamp"] = int(tz_aware_dt.timestamp()) + telemetry.pop("Date") + telemetry.pop("Time") + +if __name__ == "__main__": + asyncio.run(main())