|
| 1 | +"""Modbus client async unix domain socket communication.""" |
| 2 | +import asyncio |
| 3 | +import logging |
| 4 | +import select |
| 5 | +import socket |
| 6 | +import time |
| 7 | +import typing |
| 8 | + |
| 9 | +from pymodbus.client.base import ModbusBaseClient, ModbusClientProtocol |
| 10 | +from pymodbus.constants import Defaults |
| 11 | +from pymodbus.exceptions import ConnectionException |
| 12 | +from pymodbus.framer import ModbusFramer |
| 13 | +from pymodbus.framer.socket_framer import ModbusSocketFramer |
| 14 | +from pymodbus.utilities import ModbusTransactionState |
| 15 | + |
| 16 | + |
| 17 | +_logger = logging.getLogger(__name__) |
| 18 | + |
| 19 | + |
| 20 | +class AsyncModbusUnixClient(ModbusBaseClient): |
| 21 | + """**AsyncModbusUnixClient**. |
| 22 | +
|
| 23 | + :param path: unix domain socket path |
| 24 | + :param framer: (optional) Framer class |
| 25 | + :param kwargs: (optional) Experimental parameters |
| 26 | +
|
| 27 | + Example:: |
| 28 | +
|
| 29 | + from pymodbus.client import AsyncModbusUnixClient |
| 30 | +
|
| 31 | + async def run(): |
| 32 | + client = AsyncModbusTcpClient("/tmp/unix_socket") |
| 33 | +
|
| 34 | + await client.connect() |
| 35 | + ... |
| 36 | + await client.close() |
| 37 | + """ |
| 38 | + |
| 39 | + def __init__( |
| 40 | + self, |
| 41 | + path: str, |
| 42 | + framer: ModbusFramer = ModbusSocketFramer, |
| 43 | + **kwargs: any, |
| 44 | + ) -> None: |
| 45 | + """Initialize Asyncio Modbus TCP Client.""" |
| 46 | + self.protocol = None |
| 47 | + super().__init__(framer=framer, **kwargs) |
| 48 | + self.params.host = path |
| 49 | + self.loop = None |
| 50 | + self.connected = False |
| 51 | + self.delay_ms = self.params.reconnect_delay |
| 52 | + |
| 53 | + async def connect(self): # pylint: disable=invalid-overridden-method |
| 54 | + """Initiate connection to start client.""" |
| 55 | + # force reconnect if required: |
| 56 | + self.loop = asyncio.get_running_loop() |
| 57 | + |
| 58 | + txt = f"Connecting to {self.params.host}:{self.params.port}." |
| 59 | + _logger.debug(txt) |
| 60 | + return await self._connect() |
| 61 | + |
| 62 | + async def close(self): # pylint: disable=invalid-overridden-method |
| 63 | + """Stop client.""" |
| 64 | + |
| 65 | + # prevent reconnect: |
| 66 | + self.delay_ms = 0 |
| 67 | + if self.connected: |
| 68 | + if self.protocol.transport: |
| 69 | + self.protocol.transport.close() |
| 70 | + if self.protocol: |
| 71 | + await self.protocol.close() |
| 72 | + await asyncio.sleep(0.1) |
| 73 | + |
| 74 | + def _create_protocol(self): |
| 75 | + """Create initialized protocol instance with factory function.""" |
| 76 | + protocol = ModbusClientProtocol( |
| 77 | + framer=self.params.framer, |
| 78 | + xframer=self.framer, |
| 79 | + timeout=self.params.timeout, |
| 80 | + retries=self.params.retries, |
| 81 | + retry_on_empty=self.params.retry_on_empty, |
| 82 | + close_comm_on_error=self.params.close_comm_on_error, |
| 83 | + strict=self.params.strict, |
| 84 | + broadcast_enable=self.params.broadcast_enable, |
| 85 | + reconnect_delay=self.params.reconnect_delay, |
| 86 | + reconnect_delay_max=self.params.reconnect_delay_max, |
| 87 | + **self.params.kwargs, |
| 88 | + ) |
| 89 | + protocol.factory = self |
| 90 | + return protocol |
| 91 | + |
| 92 | + async def _connect(self): |
| 93 | + """Connect.""" |
| 94 | + _logger.debug("Connecting.") |
| 95 | + try: |
| 96 | + transport, protocol = await asyncio.wait_for( |
| 97 | + self.loop.create_unix_connection( |
| 98 | + self._create_protocol, path=self.params.host), |
| 99 | + timeout=self.params.timeout, |
| 100 | + ) |
| 101 | + except Exception as exc: # pylint: disable=broad-except |
| 102 | + txt = f"Failed to connect: {exc}" |
| 103 | + _logger.warning(txt) |
| 104 | + if self.delay_ms > 0: |
| 105 | + asyncio.ensure_future(self._reconnect()) |
| 106 | + else: |
| 107 | + txt = f"Connected to {self.params.host}:{self.params.port}." |
| 108 | + _logger.info(txt) |
| 109 | + self.reset_delay() |
| 110 | + return transport, protocol |
| 111 | + |
| 112 | + def protocol_made_connection(self, protocol): |
| 113 | + """Notify successful connection.""" |
| 114 | + _logger.info("Protocol made connection.") |
| 115 | + if not self.connected: |
| 116 | + self.connected = True |
| 117 | + self.protocol = protocol |
| 118 | + else: |
| 119 | + _logger.error("Factory protocol connect callback called while connected.") |
| 120 | + |
| 121 | + def protocol_lost_connection(self, protocol): |
| 122 | + """Notify lost connection.""" |
| 123 | + _logger.info("Protocol lost connection.") |
| 124 | + if protocol is not self.protocol: |
| 125 | + _logger.error( |
| 126 | + "Factory protocol callback called from unexpected protocol instance." |
| 127 | + ) |
| 128 | + |
| 129 | + self.connected = False |
| 130 | + if self.protocol is not None: |
| 131 | + del self.protocol |
| 132 | + self.protocol = None |
| 133 | + if self.delay_ms > 0: |
| 134 | + asyncio.ensure_future(self._reconnect()) |
| 135 | + |
| 136 | + async def _reconnect(self): |
| 137 | + """Reconnect.""" |
| 138 | + txt = f"Waiting {self.delay_ms} ms before next connection attempt." |
| 139 | + _logger.debug(txt) |
| 140 | + await asyncio.sleep(self.delay_ms / 1000) |
| 141 | + self.delay_ms = min(2 * self.delay_ms, self.params.reconnect_delay_max) |
| 142 | + |
| 143 | + return await self._connect() |
0 commit comments