diff --git a/examples/server_simulator.py b/examples/server_simulator.py index 679a0b632..7ea31a05c 100755 --- a/examples/server_simulator.py +++ b/examples/server_simulator.py @@ -3,7 +3,7 @@ An example of using simulator datastore with json interface. -usage: server_simulator.py [-h] [--path JSON_FILE] +usage: server_simulator.py [-h] [--log {critical,error,warning,info,debug}] [--port PORT] @@ -11,7 +11,6 @@ options: -h, --help show this help message and exit - --json JSON_FILE path to json device configuration file --log {critical,error,warning,info,debug} "critical", "error", "warning", "info" or "debug" --port PORT the port to use @@ -31,6 +30,86 @@ _logger = logging.getLogger() +demo_config = { + "setup": { + "co size": 100, + "di size": 150, + "hr size": 200, + "ir size": 250, + "shared blocks": True, + "type exception": False, + "defaults": { + "value": { + "bits": 0x0708, + "uint16": 1, + "uint32": 45000, + "float32": 127.4, + "string": "X", + }, + "action": { + "bits": None, + "uint16": None, + "uint32": None, + "float32": None, + "string": None, + }, + }, + }, + "invalid": [ + 1, + [3, 4], + ], + "write": [ + 5, + [7, 8], + [16, 18], + [21, 26], + [31, 36], + ], + "bits": [ + 5, + [7, 8], + {"addr": 10, "value": 0x81}, + {"addr": [11, 12], "value": 0x04342}, + {"addr": 13, "action": "reset"}, + {"addr": 14, "value": 15, "action": "reset"}, + ], + "uint16": [ + {"addr": 16, "value": 3124}, + {"addr": [17, 18], "value": 5678}, + {"addr": [19, 20], "value": 14661, "action": "increment"}, + ], + "uint32": [ + {"addr": 21, "value": 3124}, + {"addr": [23, 25], "value": 5678}, + {"addr": [27, 29], "value": 345000, "action": "increment"}, + ], + "float32": [ + {"addr": 31, "value": 3124.17}, + {"addr": [33, 35], "value": 5678.19}, + {"addr": [37, 39], "value": 345000.18, "action": "increment"}, + ], + "string": [ + {"addr": [41, 42], "value": "Str"}, + {"addr": [43, 44], "value": "Strxyz"}, + ], + "repeat": [{"addr": [0, 45], "to": [46, 138]}], +} + + +def custom_action1(_inx, _cell): + """Test action.""" + + +def custom_action2(_inx, _cell): + """Test action.""" + + +demo_actions = { + "custom1": custom_action1, + "custom2": custom_action2, +} + def get_commandline(): """Read and validate command line arguments""" @@ -43,12 +122,6 @@ def get_commandline(): type=str, ) parser.add_argument("--port", help="set port", type=str, default="5020") - parser.add_argument( - "--json", - help="path to json device configuration file", - default=None, - type=str, - ) args = parser.parse_args() pymodbus_apply_logging_config() @@ -58,15 +131,14 @@ def get_commandline(): return args -def setup_simulator(args, json_dict=None): +def setup_simulator(args, setup=None, actions=None): """Run server setup.""" _logger.info("### Create datastore") - context = ModbusSimulatorContext() - - if args.json: - context.load_file(args.json, None) - else: - context.load_dict(json_dict, None) + if not setup: + setup = demo_config + if not actions: + actions = demo_actions + context = ModbusSimulatorContext(setup, actions) args.context = ModbusServerContext(slaves=context, single=True) return args diff --git a/pymodbus/datastore/simulator.py b/pymodbus/datastore/simulator.py index 98dd155b5..b01e1c392 100755 --- a/pymodbus/datastore/simulator.py +++ b/pymodbus/datastore/simulator.py @@ -1,346 +1,687 @@ """Pymodbus ModbusSimulatorContext.""" import dataclasses -import json import logging +import random +import struct import sys -from typing import Callable - -from pymodbus.interfaces import IModbusSlaveContext +from datetime import datetime +from typing import Callable, Dict _logger = logging.getLogger() -CELL_ACCESS_RO = "R" -CELL_ACCESS_RW = "W" -CELL_ACCESS_INVALID = "I" - -CELL_TYPE_UINT16 = "H" +CELL_TYPE_NONE = " " +CELL_TYPE_BIT = "B" +CELL_TYPE_UINT16 = "i" CELL_TYPE_UINT32 = "I" -CELL_TYPE_UINT32_NEXT = "i" -CELL_TYPE_STRING = "C" -CELL_TYPE_STRING_NEXT = "c" -CELL_TYPE_BITS = "B" +CELL_TYPE_FLOAT32 = "F" +CELL_TYPE_STRING = "S" +CELL_TYPE_NEXT = "n" +CELL_TYPE_ILLEGAL = "X" + +WORD_SIZE = 16 @dataclasses.dataclass class Cell: """Handle a single cell.""" - type: int = CELL_TYPE_UINT16 - access: str = CELL_ACCESS_RO + type: int = CELL_TYPE_NONE + access: bool = False value: int = 0 action: Callable = None -class ModbusSimulatorContext(IModbusSlaveContext): - """ModbuSimulatorContext +@dataclasses.dataclass +class Label: # pylint: disable=too-many-instance-attributes + """Defines all dict values. + + :meta private: + """ + + action: str = "action" + addr: str = "addr" + co_size: str = "co size" + defaults: str = "defaults" + di_size: str = "di size" + hr_size: str = "hr size" + increment: str = "increment" + invalid: str = "invalid" + ir_size: str = "ir size" + method: str = "method" + next: str = "next" + random: str = "random" + repeat: str = "repeat" + reset: str = "reset" + setup: str = "setup" + shared_blocks: str = "shared blocks" + timestamp: str = "timestamp" + repeat_to: str = "to" + type: str = "type" + type_bits = "bits" + type_exception: str = "type exception" + type_none: str = "none" + type_uint16: str = "uint16" + type_uint32: str = "uint32" + type_float32: str = "float32" + type_string: str = "string" + value: str = "value" + write: str = "write" + + @classmethod + def try_get(cls, key, config_part): + """Check if entry is present in config.""" + if key not in config_part: + txt = f"ERROR Configuration invalid, missing {key} in {config_part}" + raise RuntimeError(txt) + return config_part[key] + + +class Setup: + """Setup simulator. + + :meta private: + """ + + def __init__(self): + """Initialize.""" + self.config_types = { + Label.type_bits: { + Label.type: CELL_TYPE_BIT, + Label.next: None, + Label.value: 0, + Label.action: None, + Label.method: self.handle_type_bits, + }, + Label.type_uint16: { + Label.type: CELL_TYPE_UINT16, + Label.next: None, + Label.value: 0, + Label.action: None, + Label.method: self.handle_type_uint16, + }, + Label.type_uint32: { + Label.type: CELL_TYPE_UINT32, + Label.next: CELL_TYPE_NEXT, + Label.value: 0, + Label.action: None, + Label.method: self.handle_type_uint32, + }, + Label.type_float32: { + Label.type: CELL_TYPE_FLOAT32, + Label.next: CELL_TYPE_NEXT, + Label.value: 0, + Label.action: None, + Label.method: self.handle_type_float32, + }, + Label.type_string: { + Label.type: CELL_TYPE_STRING, + Label.next: CELL_TYPE_NEXT, + Label.value: 0, + Label.action: None, + Label.method: self.handle_type_string, + }, + } + + def handle_type_bits(self, registers, reg_count, start, stop, value, action): + """Handle type bits. + + :meta private: + """ + for i in range(start, stop): + if i >= reg_count: + raise RuntimeError( + f'Error section "{Label.type_bits}" addr {start}, {stop} out of range' + ) + if registers[i].type != CELL_TYPE_NONE: + txt = f'ERROR Configuration invalid in section "{Label.type_bits}" register {i} already defined' + raise RuntimeError(txt) + registers[i].value = value + registers[i].type = CELL_TYPE_BIT + registers[i].action = action + + def handle_type_uint16(self, registers, reg_count, start, stop, value, action): + """Handle type uint16. + + :meta private: + """ + for i in range(start, stop): + if i >= reg_count: + raise RuntimeError( + f'Error section "{Label.type_uint16}" addr {start}, {stop} out of range' + ) + if registers[i].type != CELL_TYPE_NONE: + txt = f'ERROR Configuration invalid in section "{Label.type_uint16}" register {i} already defined' + raise RuntimeError(txt) + registers[i].value = value + registers[i].type = CELL_TYPE_UINT16 + registers[i].action = action + + def handle_type_uint32(self, registers, reg_count, start, stop, value, action): + """Handle type uint32. + + :meta private: + """ + regs = ModbusSimulatorContext.build_registers_from_value(value, True) + for i in range(start, stop, 2): + if i + 1 >= reg_count: + raise RuntimeError( + f'Error section "{Label.type_uint32}" addr {start}, {stop} out of range' + ) + if registers[i].type != CELL_TYPE_NONE: + txt = f'ERROR Configuration invalid in section "{Label.type_uint32}" register {i} already defined' + raise RuntimeError(txt) + registers[i].value = regs[0] + registers[i].type = CELL_TYPE_UINT32 + registers[i].action = action + j = i + 1 + if registers[j].type != CELL_TYPE_NONE: + txt = f'ERROR Configuration invalid in section "{Label.type_uint32}" register {j} already defined' + raise RuntimeError(txt) + registers[j].value = regs[1] + registers[j].type = CELL_TYPE_NEXT + + def handle_type_float32(self, registers, reg_count, start, stop, value, action): + """Handle type uint32. + + :meta private: + """ + regs = ModbusSimulatorContext.build_registers_from_value(value, False) + for i in range(start, stop, 2): + if i + 1 >= reg_count: + raise RuntimeError( + f'Error section "{Label.type_float32}" addr {start}, {stop} out of range' + ) + if registers[i].type != CELL_TYPE_NONE: + txt = f'ERROR Configuration invalid in section "{Label.type_float32}" register {i} already defined' + raise RuntimeError(txt) + registers[i].value = regs[0] + registers[i].type = CELL_TYPE_FLOAT32 + registers[i].action = action + j = i + 1 + if registers[j].type != CELL_TYPE_NONE: + txt = f'ERROR Configuration invalid in section "{Label.type_float32}" register {j} already defined' + raise RuntimeError(txt) + registers[j].value = regs[1] + registers[j].type = CELL_TYPE_NEXT + + def handle_type_string(self, registers, reg_count, start, stop, value, action): + """Handle type string. + + :meta private: + """ + regs = stop - start + reg_len = regs * 2 + if len(value) > reg_len: + value = value[:reg_len] + else: + value = value.ljust(reg_len) + for i in range(stop - start): + inx = start + i + if i + 1 >= reg_count: + raise RuntimeError( + f'Error section "{Label.type_string}" addr {start}, {stop} out of range' + ) + if registers[inx].type != CELL_TYPE_NONE: + txt = f'ERROR Configuration invalid in section "{Label.type_string}" register {inx} already defined' + raise RuntimeError(txt) + registers[inx].value = value[i * 2 : (i + 1) * 2] + registers[inx].type = CELL_TYPE_STRING + registers[inx].action = action + + def handle_setup_section(self, config, actions): + """Load setup section""" + layout = Label.try_get(Label.setup, config) + offset = { + 1: 0, + 2: 0, + 3: 0, + 4: 0, + 5: 0, + 6: 0, + 15: 0, + 16: 0, + 22: 0, + 23: 0, + } + size_co = Label.try_get(Label.co_size, layout) + size_di = Label.try_get(Label.di_size, layout) + size_hr = Label.try_get(Label.hr_size, layout) + size_ir = Label.try_get(Label.ir_size, layout) + if Label.try_get(Label.shared_blocks, layout): + total_size = 0 + for i in (size_co, size_di, size_hr, size_ir): + if i > total_size: + total_size = i + else: + offset[1] = 0 + offset[5] = 0 + offset[15] = 0 + total_size = size_co + offset[2] = total_size + total_size += size_di + offset[4] = total_size + total_size = size_ir + offset[3] = total_size + offset[6] = total_size + offset[16] = total_size + offset[22] = total_size + offset[23] = total_size + total_size += size_hr + first_cell = Cell() + registers = [dataclasses.replace(first_cell) for i in range(total_size)] + defaults = Label.try_get(Label.defaults, layout) + type_exception = Label.try_get(Label.type_exception, layout) + defaults_value = Label.try_get(Label.value, defaults) + defaults_action = Label.try_get(Label.action, defaults) + for key, entry in self.config_types.items(): + entry[Label.value] = Label.try_get(key, defaults_value) + if (action := Label.try_get(key, defaults_action)) not in actions: + txt = f"ERROR Configuration invalid, illegal action {key} in {defaults_action}" + raise RuntimeError(txt) + entry[Label.action] = action + return registers, offset, type_exception + + def handle_invalid_address(self, registers, reg_count, config): + """Handle invalid address""" + for entry in Label.try_get(Label.invalid, config): + if isinstance(entry, int): + entry = [entry, entry] + for i in range(entry[0], entry[1] + 1): + if i >= reg_count: + raise RuntimeError( + f'Error section "{Label.invalid}" addr {entry} out of range' + ) + if registers[i].type != CELL_TYPE_NONE: + txt = f'ERROR Configuration invalid in section "invalid" register {i} already defined' + raise RuntimeError(txt) + registers[i].type = CELL_TYPE_ILLEGAL + + def handle_write_allowed(self, registers, reg_count, config): + """Handle write allowed""" + for entry in Label.try_get(Label.write, config): + if isinstance(entry, int): + entry = [entry, entry] + for i in range(entry[0], entry[1] + 1): + if i >= reg_count: + raise RuntimeError( + f'Error section "{Label.write}" addr {entry} out of range' + ) + registers[i].access = True + + def handle_types(self, registers, actions, reg_count, config): + """Handle the different types""" + for section, type_entry in self.config_types.items(): + layout = Label.try_get(section, config) + for entry in layout: + if not isinstance(entry, dict): + entry = {Label.addr: entry} + if not isinstance(Label.try_get(Label.addr, entry), list): + entry[Label.addr] = [entry[Label.addr], entry[Label.addr]] + type_entry[Label.method]( + registers, + reg_count, + entry[Label.addr][0], + entry[Label.addr][1] + 1, + entry.get(Label.value, type_entry[Label.value]), + actions[entry.get("action", type_entry[Label.action])], + ) + + def handle_repeat(self, registers, reg_count, config): + """Handle repeat. + + :meta private: + """ + for entry in Label.try_get(Label.repeat, config): + addr = Label.try_get(Label.addr, entry) + copy_start = addr[0] + copy_end = addr[1] + copy_inx = copy_start - 1 + addr_to = Label.try_get(Label.repeat_to, entry) + for inx in range(addr_to[0], addr_to[1] + 1): + copy_inx = copy_start if copy_inx >= copy_end else copy_inx + 1 + if inx >= reg_count: + raise RuntimeError( + f'Error section "{Label.repeat}" entry {entry} out of range' + ) + registers[inx] = dataclasses.replace(registers[copy_inx]) + + def setup(self, config, actions, custom_actions) -> None: + """Load layout from dict with json structure. + + :meta private: + """ + actions[""] = None + actions[None] = None + if custom_actions: + actions.update(custom_actions) + + registers, offset, typ_exc = self.handle_setup_section(config, actions) + reg_count = len(registers) + self.handle_invalid_address(registers, reg_count, config) + self.handle_write_allowed(registers, reg_count, config) + self.handle_types(registers, actions, reg_count, config) + self.handle_repeat(registers, reg_count, config) + for i in range(reg_count): + if registers[i].type == CELL_TYPE_NONE: + registers[i].type = CELL_TYPE_ILLEGAL + + return (registers, offset, typ_exc, reg_count) + - loads a memory configuration from a json file - (see examples/simulator.py for details) and prepares a - simulation of a device. +class ModbusSimulatorContext: + """Modbus simulator - The integration is simple:: + :param config: A dict with structure as shown below. + :param actions: A dict with "": structure. + :raises RuntimeError: if json contains errors (msg explains what) - store = ModbusSimulatorContext() + It builds and maintains a virtual copy of a device, with simulation of + device specific functions. - store.load_file(, ) - # or - store.load_dict(, ) + The device is described in a dict, user supplied actions will + be added to the builtin actions. - StartAsyncTcpServer(context=store) + It is used in conjunction with a pymodbus server. + + Example:: + + store = ModbusSimulatorContext(, ) + StartAsyncTcpServer(, context=store) Now the server will simulate the defined device with features like: - invalid addresses - write protected addresses - optional control of access for string, uint32, bit/bits - - optional automatic value increment by each read - - builtin functions for e.g. reset/datetime - - custom functions invoked by read/write to a specific address + - builtin actions for e.g. reset/datetime, value increment by read + - custom actions Description of the json file or dict to be supplied:: { - "registers": 200, - --> Total number of registers - "invalid_address": { - --> List of invalid addresses, Read/Write causes invalid address response. - "registers": [ - [78, 99], - --> start, end register, repeated as needed - ]}, - "write_allowed": { - --> default is ReadOnly, allow write (other addresses causes invalid address response) - "registers": [ - [5, 5] - --> start, end register, repeated as needed - [61, 76], - ]}, - "type_uint32": { - --> Define 32 bit integers 2 registers - "value": 0, - --> Default value of uint32 - "action": "random", - --> Default action to use, need to be used with care ! - "registers": [ - [1, 2], - --> start, end register, repeated as needed - [3, 6], - --> start, end register can be a group of int32 - {"registers": [7, 8], "value": 300}, - --> Override default value - {"registers": [14, 20], "action": "increment"}, - --> Override default action - {"registers": [14, 20], "value": 117, "action": "increment"}, - --> Override default value and action - ]}, - "type_string": { - --> Define strings, variable number of registers (2 bytes) - "value": " ", - --> Default value of string ONLY 1 register, expanded automatically - "action": "", - --> Default action to use, need to be used with care ! - "registers": [ - [21, 22], - --> start, end register, define 1 string - {"registers": 23, 25], "value": "String"}, - --> start, end register, define 1 string, with value - {"registers": 26, 27], "action": ""}, - --> start, end register, define 1 string, with action - {"registers": 28, 29], "action": "", "value": "String"} - --> start, end register, define 1 string, with action and value - ]}, - "type_bits": { - --> Define 16 bit registers - "value": "0x00", - --> Default value of register in hex - "action": "increment", - --> Default action to use, need to be used with care ! - "registers": [ - [30, 31], - --> start, end register, repeated as needed - {"registers": [32, 34], "value": "0xF1F2F3"}, - --> start, end register, with value - {"registers": [35, 36], "action": "increment"}, - --> start, end register, with action - {"registers": [37, 38], "action": "increment", "value": "0xF1F2F3"} - --> start, end register, with action and value - ]}, - "type_uint16": { - --> Define uint16 (1 register), This is automatically defined - "value": 0, - --> Default value of register - "action": "random", - --> Default action to use, need to be used with care ! - "registers": [ - {"registers": [40, 46], "action": "timestamp"}, - --> start, end register, with action - {"registers": [47, 90], "value": 17}, - --> start, end register, with value - {"registers": [91, 91], "value": 15, "action": "increment"} - --> start, end register, with action and value - ]}, - "repeat_address": { - --> allows to repeat section e.g. for n devices - "registers": [ - {"registers": [100, 200], "repeat": [50, 275]} - --> Repeat registers 100-200 to 50+ until register 275 - ]}} + "setup": { + "di size": 0, --> Size of discrete input block (8 bit) + "co size": 0, --> Size of coils block (8 bit) + "ir size": 0, --> Size of input registers block (16 bit) + "hr size": 0, --> Size of holding registers block (16 bit) + "shared blocks": True, --> share memory for all blocks (largest size wins) + "defaults": { + "value": { --> Initial values (can be overwritten) + "bits": 0x01, + "uint16": 122, + "uint32": 67000, + "float32": 127.4, + "string": " ", + }, + "action": { --> default action (can be overwritten) + "bits": None, + "uint16": None, + "uint32": None, + "float32": None, + "string": None, + }, + }, + "type exception": False, --> return IO exception if read/write on non boundary + }, + "invalid": [ --> List of invalid addresses, IO exception returned + 51, --> single register + [78, 99], --> start, end registers, repeated as needed + ], + "write": [ --> allow write, efault is ReadOnly + [5, 5] --> start, end bytes, repeated as needed + ], + "bits": [ --> Define bits (1 register == 1 byte) + [30, 31], --> start, end registers, repeated as needed + {"addr": [32, 34], "value": 0xF1}, --> with value + {"addr": [35, 36], "action": "increment"}, --> with action + {"addr": [37, 38], "action": "increment", "value": 0xF1} --> with action and value + ], + "uint16": [ --> Define uint16 (1 register == 2 bytes) + --> same as type_bits + ], + "uint32": [ --> Define 32 bit integers (2 registers == 4 bytes) + --> same as type_bits + ], + "float32": [ --> Define 32 bit floats (2 registers == 4 bytes) + --> same as type_bits + ], + "string": [ --> Define strings (variable number of registers (each 2 bytes)) + [21, 22], --> start, end registers, define 1 string + {"addr": 23, 25], "value": "ups"}, --> with value + {"addr": 26, 27], "action": "user"}, --> with action + {"addr": 28, 29], "action": "", "value": "user"} --> with action and value + ], + "repeat": [ --> allows to repeat section e.g. for n devices + {"addr": [100, 200], "to": [50, 275]} --> Repeat registers 100-200 to 50+ until 275 + ] + } """ # -------------------------------------------- # External interfaces # -------------------------------------------- - def __init__(self) -> None: + def __init__(self, config: Dict[str, any], actions: Dict[str, Callable]) -> None: """Initialize.""" - super().__init__() - self.last_register = 0 - self.registers = [] - self.endian_convert = sys.byteorder == "little" - - def load_file(self, json_path: str, actions_dict: dict) -> None: - """Load layout from json file. - - :param json_path: A qualified path to the json file. - :param actions_dict: A dict with "": structure. - :raises FileNotFound: if the file cannot be opened. - :raises RuntimeError: if json contains errors (msg explains what) - """ - with open(json_path, encoding="latin-1") as json_file: - rules = json.load(json_file) - self.load_dict(rules, actions_dict) - - def load_dict(self, json_dict: str, actions_dict: dict) -> None: - """Load layout from dict with json structure. - - :param json_dict: A dict with same structure as json file. - :param actions_dict: A dict with "": structure. - :raises RuntimeError: if dict contains errors (msg explains what) - """ - - sections = ( - ("invalid_address", self.handle_invalid_address), - ("write_allowed", self.handle_write_allowed), - ("type_uint32", self.handle_type_uint32), - ("type_string", self.handle_type_string), - ("type_bits", self.handle_type_bits), - ("type_uint16", self.handle_type_uint16), - ("repeat_address", self.handle_repeat_address), - ) - actions = { - "random": self.action_random, - "increment": self.action_increment, - "timestamp": self.action_timestamp, - "reset": self.action_reset, - "": None, - None: None, + builtin_actions = { + Label.increment: self.action_increment, + Label.random: self.action_random, + Label.reset: self.action_reset, + Label.timestamp: self.action_timestamp, } - if actions_dict: - actions.update(actions_dict) - - entry_registers = "registers" - entry_value = "value" - entry_action = "action" - - self.last_register = json_dict[entry_registers] - self.registers = [Cell() for i in range(self.last_register + 1)] - self.handle_invalid_address([0, 0], None, None, None) - for section, method in sections: - layout = json_dict[section] - default_value = layout.get(entry_value, None) - default_action = layout.get(entry_action, None) - for entry in layout[entry_registers]: - if not isinstance(entry, dict): - entry = {entry_registers: entry} - if (action := entry.get(entry_action, default_action)) not in actions: - raise RuntimeError(f"Action {action} not defined.") - action_call = actions[action] if action else None - method( - entry[entry_registers], - entry.get(entry_value, default_value), - action_call, - entry, - ) + res = Setup().setup(config, builtin_actions, actions) + self.registers = res[0] + self.offset = res[1] + self.type_exception = res[2] + self.register_count = res[3] # -------------------------------------------- # Modbus server interface # -------------------------------------------- - _write_fx = (5, 6, 15, 22, 23) + _write_func_code = (5, 6, 15, 16, 22, 23) + _bits_func_code = (1, 2, 5, 15) - def validate(self, fx, address, count=1): + def validate(self, func_code, address, count=1): """Check to see if the request is in range. :meta private: """ - if address <= 0 or address + count - 1 > self.last_register: + if func_code in self._bits_func_code: + # Bit count, correct to register count + count = int((count + WORD_SIZE - 1) / WORD_SIZE) + address = int(address / 16) + real_address = self.offset[func_code] + address + if real_address <= 0 or real_address > self.register_count: return False - fx_write = fx in self._write_fx - for i in range(address, address + count): + fx_write = func_code in self._write_func_code + for i in range(real_address, real_address + count): reg = self.registers[i] - if reg.access == CELL_ACCESS_INVALID: + if reg.type == CELL_TYPE_ILLEGAL: return False - if fx_write and not reg.access == CELL_ACCESS_RW: + if fx_write and not reg.access: return False + if self.type_exception: + return self.validate_type(func_code, real_address, count) return True - def getValues(self, fx, address, count=1): + def getValues(self, func_code, address, count=1): # pylint: disable=invalid-name """Return the requested values of the datastore. :meta private: """ result = [] - for i in range(address, address + count): - if action := self.registers[i].action: - action(self.registers, i) - result.append(self.registers[i].value) + if func_code not in self._bits_func_code: + real_address = self.offset[func_code] + address + for i in range(real_address, real_address + count): + reg = self.registers[i] + if reg.action: + reg.action(self.registers, i, reg) + result.append(reg.value) + else: + # bit access + real_address = self.offset[func_code] + int(address / 16) + bit_index = address % 16 + reg_count = int((count + bit_index + 15) / 16) + for i in range(real_address, real_address + reg_count): + reg = self.registers[i] + if reg.action: + reg.action(i, reg) + while count and bit_index < 16: + result.append(bool(reg.value & (2**bit_index))) + count -= 1 + bit_index += 1 + bit_index = 0 return result - def setValues(self, fx, address, values): + def setValues(self, func_code, address, values): # pylint: disable=invalid-name """Set the requested values of the datastore. :meta private: """ - for i, value in enumerate(values): - if action := self.registers[address + i].action: - action(self.registers, address + i, values=values[i:]) - self.registers[address + i].value = value + if func_code not in self._bits_func_code: + real_address = self.offset[func_code] + address + for value in values: + self.registers[real_address].value = value + real_address += 1 + return + + # bit access + real_address = self.offset[func_code] + int(address / 16) + bit_index = address % 16 + for value in values: + bit_mask = 2**bit_index + if bool(value): + self.registers[real_address].value |= bit_mask + else: + self.registers[real_address].value &= ~bit_mask + bit_index += 1 + if bit_index == 16: + bit_index = 0 + real_address += 1 + return + + # -------------------------------------------- + # Internal action methods + # -------------------------------------------- + + @classmethod + def action_random(cls, registers, inx, cell): + """Update with random value. + + :meta private: + """ + if cell.type == CELL_TYPE_BIT: + registers[inx].value = random.randint(0, 65536) + elif cell.type == CELL_TYPE_FLOAT32: + regs = cls.build_registers_from_value(random.uniform(0.0, 100.0), False) + registers[inx].value = regs[0] + registers[inx + 1].value = regs[1] + elif cell.type == CELL_TYPE_UINT16: + registers[inx].value = random.randint(0, 65536) + elif cell.type == CELL_TYPE_UINT32: + regs = cls.build_registers_from_value(random.uniform(0.0, 100.0), True) + registers[inx].value = regs[0] + registers[inx + 1].value = regs[1] + + @classmethod + def action_increment(cls, registers, inx, cell): + """Increment value reset with overflow. + + :meta private: + """ + if cell.type == CELL_TYPE_BIT: + registers[inx].value += 1 + elif cell.type == CELL_TYPE_FLOAT32: + value = cls.build_value_from_registers(registers[inx : inx + 2], False) + value += 1.0 + regs = cls.build_registers_from_value(value, False) + registers[inx].value = regs[0] + registers[inx + 1].value = regs[1] + elif cell.type == CELL_TYPE_UINT16: + registers[inx].value += 1 + elif cell.type == CELL_TYPE_UINT32: + value = cls.build_value_from_registers(registers[inx : inx + 2], True) + value += 1 + regs = cls.build_registers_from_value(value, True) + registers[inx].value = regs[0] + registers[inx + 1].value = regs[1] + + @classmethod + def action_timestamp(cls, registers, inx, _cell): + """Set current time. + + :meta private: + """ + system_time = datetime.now() + registers[inx].value = system_time.tm_year - 1900 + registers[inx + 1].value = system_time.tm_mon + registers[inx + 2].value = system_time.tm_mday + registers[inx + 3].value = system_time.tm_wday + registers[inx + 4].value = system_time.tm_hour + registers[inx + 5].value = system_time.tm_min + registers[inx + 1].value = system_time.tm_sec + + @classmethod + def action_reset(cls, _registers, _inx, _cell): + """Reboot server. + + :meta private: + """ + raise RuntimeError("RESET server") # -------------------------------------------- # Internal helper methods # -------------------------------------------- - def action_random(self, _registers, address, values=None): - """Update with random value.""" - print(f"JAN -> {address}, {values}") - - def action_increment(self, _registers, _address, _values=None): - """Increment value reset with overflow.""" - - def action_timestamp(self, _registers, _address, _values=None): - """Set current time.""" - - def action_reset(self, _registers, _address, _values=None): - """Reboot server.""" - - def handle_invalid_address(self, registers, _value, _action, _entry): - """Handle invalid address.""" - for i in range(registers[0], registers[1] + 1): - self.registers[i].access = CELL_ACCESS_INVALID - self.registers[i].value = None - self.registers[i].action = None - self.registers[i].type = None - - def handle_write_allowed(self, registers, _value, _action, _entry): - """Handle write allowed.""" - for i in range(registers[0], registers[1] + 1): - self.registers[i].access = CELL_ACCESS_RW - - def handle_type_uint32(self, registers, value, action, _entry): - """Handle type uint32.""" - value_bytes = value.to_bytes(4, "big") - value_reg1 = int.from_bytes(value_bytes[:2], "big") - value_reg2 = int.from_bytes(value_bytes[2:4], "big") - for i in range(registers[0], registers[1] + 1, 2): - self.registers[i].type = CELL_TYPE_UINT32 - self.registers[i].action = action - self.registers[i].value = value_reg1 - self.registers[i + 1].type = CELL_TYPE_UINT32_NEXT - self.registers[i + 1].value = value_reg2 - - def handle_type_string(self, registers, value, action, _entry): - """Handle type string.""" - j = 0 - for i in range(registers[0], registers[1] + 1): - self.registers[i].type = CELL_TYPE_STRING_NEXT - self.registers[i].value = value[j : j + 2] - j += 2 - self.registers[registers[0]].type = CELL_TYPE_STRING - self.registers[registers[0]].action = action - - def handle_type_bits(self, registers, _value, action, _entry): - """Handle type bits.""" - value_int = int(_value, 16) - for i in range(registers[0], registers[1] + 1): - self.registers[i].type = CELL_TYPE_BITS - self.registers[i].value = value_int - self.registers[i].action = action - - def handle_type_uint16(self, registers, value, action, _entry): - """Handle type uint16.""" - self.registers[registers[0]].action = action - for i in range(registers[0], registers[1] + 1): - self.registers[i].type = CELL_TYPE_UINT16 - self.registers[i].value = value - - def handle_repeat_address(self, registers, _value, _action, entry): - """Handle repeat address.""" - copy_start = registers[0] - copy_end = registers[1] - i = copy_start - 1 - for repeat in range(entry["repeat"][0], entry["repeat"][1] + 1): - i = copy_start if i > copy_end else i + 1 - self.registers[repeat] = dataclasses.replace(self.registers[i]) + def validate_type(self, func_code, real_address, count): + """Check if request is done against correct type + + :meta private: + """ + + if func_code in self._bits_func_code: + # Bit access + check = CELL_TYPE_BIT + reg_step = 1 + elif count % 2: + # 16 bit access + check = (CELL_TYPE_UINT16, CELL_TYPE_STRING) + reg_step = 1 + else: + check = (CELL_TYPE_UINT32, CELL_TYPE_FLOAT32, CELL_TYPE_STRING) + reg_step = 2 + + for i in range( # pylint: disable=consider-using-any-or-all + real_address, real_address + count, reg_step + ): + if self.registers[i].type not in check: + return False + return True + + @classmethod + def build_registers_from_value(cls, value, is_int): + """Build registers from int32 or float32""" + regs = [0, 0] + if is_int: + value_bytes = int.to_bytes(value, 4, sys.byteorder) + else: + value_bytes = struct.pack("f", value) + regs[0] = int.from_bytes(value_bytes[:2], sys.byteorder) + regs[1] = int.from_bytes(value_bytes[-2:], sys.byteorder) + return regs + + @classmethod + def build_value_from_registers(cls, registers, is_int): + """Build registers from int32 or float32""" + value_bytes = int.to_bytes(registers[0], 2, sys.byteorder) + int.to_bytes( + registers[1], 2, sys.byteorder + ) + if is_int: + value = int.from_bytes(value_bytes, sys.byteorder) + else: + value = struct.unpack("f", value_bytes)[0] + return value diff --git a/test/test_datastore_simulator.py b/test/test_datastore_simulator.py deleted file mode 100644 index 345aefc37..000000000 --- a/test/test_datastore_simulator.py +++ /dev/null @@ -1,201 +0,0 @@ -"""Test datastore.""" -import asyncio -import json - -import pytest - -from examples.client_async import setup_async_client -from examples.helper import Commandline -from examples.server_simulator import run_server_simulator, setup_simulator -from pymodbus import pymodbus_apply_logging_config -from pymodbus.datastore import ModbusSimulatorContext -from pymodbus.datastore.simulator import ( - CELL_ACCESS_INVALID, - CELL_ACCESS_RO, - CELL_ACCESS_RW, - CELL_TYPE_BITS, - CELL_TYPE_STRING, - CELL_TYPE_STRING_NEXT, - CELL_TYPE_UINT16, - CELL_TYPE_UINT32, - CELL_TYPE_UINT32_NEXT, - Cell, -) -from pymodbus.server import ServerAsyncStop -from pymodbus.transaction import ModbusSocketFramer - - -FX_READ = 1 -FX_WRITE = 5 - - -class TestSimulator: - """Unittest for the pymodbus.Simutor module.""" - - simulator = None - json_dict = None - - def setup_method(self): - """Do simulator test setup.""" - self.json_dict = { - "registers": 63, - "invalid_address": { - "registers": [ - [2, 2], - ] - }, - "write_allowed": { - "registers": [ - [3, 3], - ] - }, - "type_uint16": { - "value": 0, - "registers": [ - {"registers": [3, 3], "value": 4660}, - {"registers": [4, 4], "action": "reset"}, - {"registers": [5, 11], "value": 0, "action": "timestamp"}, - ], - }, - "type_uint32": { - "value": 5, - "registers": [ - [12, 13], - {"registers": [14, 15], "value": 19088743, "action": "increment"}, - ], - }, - "type_bits": { - "value": "0102", - "action": "", - "registers": [ - [16, 16], - {"registers": [17, 18], "value": "F1F2F3F4", "action": "random"}, - ], - }, - "type_string": { - "value": " ", - "registers": [ - {"registers": [19, 20], "value": "Str"}, - ], - }, - "repeat_address": { - "registers": [{"registers": [0, 20], "repeat": [21, 63]}] - }, - } - self.simulator = ModbusSimulatorContext() - self.simulator.load_dict(self.json_dict, None) - - def test_simulator_load(self, tmp_path): - """Test load from file and from dict.""" - filepath = f"{tmp_path}/test_load.json" - with open(filepath, "w", encoding="latin-1") as json_file: - json.dump(self.json_dict, json_file) - sim_file = ModbusSimulatorContext() - sim_file.load_file(filepath, None) - sim_dict = ModbusSimulatorContext() - sim_dict.load_dict(self.json_dict, None) - len_sim_file = len(sim_file.registers) - assert len_sim_file == len(sim_dict.registers) - for i in range(len_sim_file): - entry = sim_dict.registers[i] - if entry.action: - sim_dict.registers[i].action = entry.action.__name__ - entry = sim_file.registers[i] - if entry.action: - sim_file.registers[i].action = entry.action.__name__ - assert sim_file.registers == sim_dict.registers - - def test_simulator_registers(self): - """Test validate method.""" - test_registers = [ - # register 0 - Cell(None, CELL_ACCESS_INVALID, None, None), - Cell(CELL_TYPE_UINT16, CELL_ACCESS_RO, 0, None), - Cell(None, CELL_ACCESS_INVALID, None, None), - Cell(CELL_TYPE_UINT16, CELL_ACCESS_RW, 4660, None), - Cell(CELL_TYPE_UINT16, CELL_ACCESS_RO, 0, self.simulator.action_reset), - Cell(CELL_TYPE_UINT16, CELL_ACCESS_RO, 0, self.simulator.action_timestamp), - Cell(CELL_TYPE_UINT16, CELL_ACCESS_RO, 0, None), - Cell(CELL_TYPE_UINT16, CELL_ACCESS_RO, 0, None), - Cell(CELL_TYPE_UINT16, CELL_ACCESS_RO, 0, None), - Cell(CELL_TYPE_UINT16, CELL_ACCESS_RO, 0, None), - Cell(CELL_TYPE_UINT16, CELL_ACCESS_RO, 0, None), - Cell(CELL_TYPE_UINT16, CELL_ACCESS_RO, 0, None), - # register 12 - Cell(CELL_TYPE_UINT32, CELL_ACCESS_RO, 0, None), - Cell(CELL_TYPE_UINT32_NEXT, CELL_ACCESS_RO, 5), - Cell( - CELL_TYPE_UINT32, - CELL_ACCESS_RO, - 291, - self.simulator.action_increment, - ), # --> 291 - Cell(CELL_TYPE_UINT32_NEXT, CELL_ACCESS_RO, 17767, None), # --> 19088743 - # register 16 - Cell(CELL_TYPE_BITS, CELL_ACCESS_RO, 258, None), - Cell( - CELL_TYPE_BITS, CELL_ACCESS_RO, 4059231220, self.simulator.action_random - ), # --> 61938 - Cell( - CELL_TYPE_BITS, CELL_ACCESS_RO, 4059231220, self.simulator.action_random - ), # --> 62452 - # register 19 - Cell(CELL_TYPE_STRING, CELL_ACCESS_RO, "St", None), - Cell(CELL_TYPE_STRING_NEXT, CELL_ACCESS_RO, "r", None), - ] - - for repetition in range(0, 2): - for i, test_cell in enumerate(test_registers): - assert ( - test_cell == self.simulator.registers[i + 21 * repetition] - ), f"failed in repeat {repetition} register {i}" - - @pytest.mark.parametrize( - "func_code,address,count,expected", - [ - (FX_READ, 0, 1, False), - (FX_READ, 1, 2, False), - (FX_READ, 2, 1, False), - (FX_WRITE, 2, 1, False), - (FX_WRITE, 3, 2, False), - (FX_READ, 200, 1, False), - (FX_READ, 1, 1, True), - (FX_WRITE, 3, 1, True), - (FX_READ, 5, 10, True), - ], - ) - def test_simulator_validate(self, func_code, address, count, expected): - """Test validate call.""" - assert self.simulator.validate(func_code, address, count) == expected - - def test_simulator_get_values(self): - """Test simulator get values.""" - assert self.simulator.getValues(FX_READ, 3) == [4660] - assert self.simulator.getValues(FX_READ, 3, count=2) == [4660, 0] - - def test_simulator_set_values(self): - """Test simulator set values.""" - - self.simulator.setValues(FX_WRITE, 3, values=[5]) - assert self.simulator.getValues(FX_READ, 3) == [5] - - async def test_simulator_example(self): - """Test datastore simulator example.""" - pymodbus_apply_logging_config() - - args = Commandline.copy() - args.comm = "tcp" - args.framer = ModbusSocketFramer - args.port = 5021 - args.json = None - run_args = setup_simulator(args, json_dict=self.json_dict) - asyncio.create_task(run_server_simulator(run_args)) - await asyncio.sleep(0.1) - client = setup_async_client(args) - await client.connect() - assert client.protocol - - rr = await client.read_holding_registers(3, 1, slave=1) - assert rr.registers - await client.close() - await ServerAsyncStop() diff --git a/test/test_simulator.py b/test/test_simulator.py new file mode 100644 index 000000000..7ff8a5483 --- /dev/null +++ b/test/test_simulator.py @@ -0,0 +1,329 @@ +"""Test datastore.""" +import asyncio +import copy + +from examples.client_async import setup_async_client +from examples.helper import Commandline +from examples.server_simulator import run_server_simulator, setup_simulator +from pymodbus import pymodbus_apply_logging_config +from pymodbus.datastore import ModbusSimulatorContext +from pymodbus.datastore.simulator import ( + CELL_TYPE_BIT, + CELL_TYPE_FLOAT32, + CELL_TYPE_ILLEGAL, + CELL_TYPE_NEXT, + CELL_TYPE_STRING, + CELL_TYPE_UINT16, + CELL_TYPE_UINT32, + Cell, +) +from pymodbus.server import ServerAsyncStop +from pymodbus.transaction import ModbusSocketFramer + + +FX_READ_BIT = 1 +FX_READ_REG = 3 +FX_WRITE_BIT = 5 +FX_WRITE_REG = 6 + + +class TestSimulator: + """Unittest for the pymodbus.Simutor module.""" + + simulator = None + default_config = { + "setup": { + "co size": 100, + "di size": 150, + "hr size": 200, + "ir size": 250, + "shared blocks": True, + "type exception": False, + "defaults": { + "value": { + "bits": 0x0708, + "uint16": 1, + "uint32": 45000, + "float32": 127.4, + "string": "X", + }, + "action": { + "bits": None, + "uint16": None, + "uint32": None, + "float32": None, + "string": None, + }, + }, + }, + "invalid": [ + 1, + [3, 4], + ], + "write": [ + 5, + [7, 8], + [16, 18], + [21, 26], + [31, 36], + ], + "bits": [ + 5, + [7, 8], + {"addr": 10, "value": 0x81}, + {"addr": [11, 12], "value": 0x04342}, + {"addr": 13, "action": "reset"}, + {"addr": 14, "value": 15, "action": "reset"}, + ], + "uint16": [ + {"addr": 16, "value": 3124}, + {"addr": [17, 18], "value": 5678}, + {"addr": [19, 20], "value": 14661, "action": "increment"}, + ], + "uint32": [ + {"addr": 21, "value": 3124}, + {"addr": [23, 25], "value": 5678}, + {"addr": [27, 29], "value": 345000, "action": "increment"}, + ], + "float32": [ + {"addr": 31, "value": 3124.17}, + {"addr": [33, 35], "value": 5678.19}, + {"addr": [37, 39], "value": 345000.18, "action": "increment"}, + ], + "string": [ + {"addr": [41, 42], "value": "Str"}, + {"addr": [43, 44], "value": "Strxyz"}, + ], + "repeat": [{"addr": [0, 45], "to": [46, 138]}], + } + + test_registers = [ + Cell(CELL_TYPE_ILLEGAL, False, 0, None), + Cell(CELL_TYPE_ILLEGAL, False, 0, None), + Cell(CELL_TYPE_ILLEGAL, False, 0, None), + Cell(CELL_TYPE_ILLEGAL, False, 0, None), + Cell(CELL_TYPE_ILLEGAL, False, 0, None), + Cell(CELL_TYPE_BIT, True, 1800, None), + Cell(CELL_TYPE_ILLEGAL, False, 0, None), + Cell(CELL_TYPE_BIT, True, 1800, None), + Cell(CELL_TYPE_BIT, True, 1800, None), + Cell(CELL_TYPE_ILLEGAL, False, 0, None), + Cell(CELL_TYPE_BIT, False, 0x81, None), # 10 + Cell(CELL_TYPE_BIT, False, 0x4342, None), + Cell(CELL_TYPE_BIT, False, 0x4342, None), + Cell(CELL_TYPE_BIT, False, 1800, True), + Cell(CELL_TYPE_BIT, False, 15, True), + Cell(CELL_TYPE_ILLEGAL, False, 0, None), + Cell(CELL_TYPE_UINT16, True, 3124, None), + Cell(CELL_TYPE_UINT16, True, 5678, None), + Cell(CELL_TYPE_UINT16, True, 5678, None), + Cell(CELL_TYPE_UINT16, False, 14661, True), + Cell(CELL_TYPE_UINT16, False, 14661, True), # 20 + Cell(CELL_TYPE_UINT32, True, 3124, None), + Cell(CELL_TYPE_NEXT, True, 0, None), + Cell(CELL_TYPE_UINT32, True, 5678, None), + Cell(CELL_TYPE_NEXT, True, 0, None), + Cell(CELL_TYPE_UINT32, True, 5678, None), + Cell(CELL_TYPE_NEXT, True, 0, None), + Cell(CELL_TYPE_UINT32, False, 17320, True), + Cell(CELL_TYPE_NEXT, False, 5, None), + Cell(CELL_TYPE_UINT32, False, 17320, True), + Cell(CELL_TYPE_NEXT, False, 5, None), # 30 + Cell(CELL_TYPE_FLOAT32, True, 17080, None), + Cell(CELL_TYPE_NEXT, True, 17731, None), + Cell(CELL_TYPE_FLOAT32, True, 29061, None), + Cell(CELL_TYPE_NEXT, True, 17841, None), + Cell(CELL_TYPE_FLOAT32, True, 29061, None), + Cell(CELL_TYPE_NEXT, True, 17841, None), + Cell(CELL_TYPE_FLOAT32, False, 29958, True), + Cell(CELL_TYPE_NEXT, False, 18600, None), + Cell(CELL_TYPE_FLOAT32, False, 29958, True), + Cell(CELL_TYPE_NEXT, False, 18600, None), + Cell(CELL_TYPE_STRING, False, "St", None), + Cell(CELL_TYPE_STRING, False, "r ", None), + Cell(CELL_TYPE_STRING, False, "St", None), + Cell(CELL_TYPE_STRING, False, "rx", None), # 29 MAX before repeat + ] + + @classmethod + def custom_action1(cls, _inx, _cell): + """Test action.""" + + @classmethod + def custom_action2(cls, _inx, _cell): + """Test action.""" + + custom_actions = { + "custom1": custom_action1, + "custom2": custom_action2, + } + + def setup_method(self): + """Do simulator test setup.""" + self.simulator = ModbusSimulatorContext( + self.default_config, self.custom_actions + ) + + def test_pack_unpack_values(self): + """Test the pack unpack methods.""" + value = 32145678 + regs = ModbusSimulatorContext.build_registers_from_value(value, True) + test_value = ModbusSimulatorContext.build_value_from_registers(regs, True) + assert value == test_value + + value = 3.14159265358979 + regs = ModbusSimulatorContext.build_registers_from_value(value, False) + test_value = ModbusSimulatorContext.build_value_from_registers(regs, False) + assert round(value, 6) == round(test_value, 6) + + def test_simulator_config_verify(self): + """Test basic configuration.""" + # Manually build expected memory image and then compare. + assert self.simulator.register_count == 250 + for offset in (0, 46, 92): + for i, test_cell in enumerate(self.test_registers): + assert ( + self.simulator.registers[i + offset].type == test_cell.type + ), f"at index {i} - {offset}" + assert ( + self.simulator.registers[i + offset].access == test_cell.access + ), f"at index {i} - {offset}" + assert ( + self.simulator.registers[i + offset].value == test_cell.value + ), f"at index {i} - {offset}" + assert (self.simulator.registers[i + offset].action is None) == ( + test_cell.action is None + ), f"at index {i} - {offset}" + assert self.simulator.registers[138] == self.test_registers[0] + + def test_simulator_validate_illegal(self): + """Test validation without exceptions""" + illegal_cell_list = (0, 1, 2, 3, 4, 6, 9, 15) + write_cell_list = ( + 5, + 7, + 8, + 16, + 17, + 18, + 21, + 22, + 23, + 24, + 25, + 26, + 31, + 32, + 33, + 34, + 35, + 36, + ) + # for func_code in (FX_READ_BIT, FX_READ_REG, FX_WRITE_BIT, FX_WRITE_REG): + for func_code in (FX_READ_BIT,): + for addr in range(len(self.test_registers) - 1): + exp1 = self.simulator.validate(func_code, addr * 16, 1) + exp2 = self.simulator.validate(func_code, addr * 16, 20) + # Illegal cell and no write + if addr in illegal_cell_list: + assert not exp1, f"wrong illegal at index {addr}" + continue + if addr + 1 in illegal_cell_list: + assert not exp2, f"wrong legal at second index {addr+1}" + continue + if func_code in (FX_WRITE_BIT, FX_WRITE_REG): + if addr in write_cell_list: + assert not exp1, f"missing write at index {addr}" + continue + if addr + 1 in illegal_cell_list: + assert not exp2, f"missing write at second index {addr+1}" + continue + assert exp1, f"wrong legal at index {addr}" + assert exp2, f"wrong legal at second index {addr+1}" + + def test_simulator_validate_type(self): + """Test validate call.""" + exc_setup = copy.deepcopy(self.default_config) + exc_setup["setup"]["type exception"] = True + exc_simulator = ModbusSimulatorContext(exc_setup, None) + + for entry in ( + (FX_READ_BIT, 80, 1, True), + (FX_READ_BIT, 116, 16, True), + (FX_READ_BIT, 112, 32, True), + (FX_READ_BIT, 128, 17, False), + (FX_READ_BIT, 256, 1, False), + (FX_READ_REG, 16, 1, True), + (FX_READ_REG, 41, 1, True), + (FX_READ_REG, 21, 1, False), + (FX_READ_REG, 21, 2, True), + (FX_READ_REG, 41, 2, True), + ): + validated = exc_simulator.validate(entry[0], entry[1], entry[2]) + assert entry[3] == validated, f"at entry {entry}" + + def test_simulator_get_values(self): + """Test simulator get values.""" + for entry in ( + (FX_READ_BIT, 80, 1, [False]), + (FX_READ_BIT, 83, 1, [True]), + (FX_READ_BIT, 87, 5, [False] + [True] * 3 + [False]), + (FX_READ_BIT, 190, 4, [True, False, False, True]), + (FX_READ_REG, 16, 1, [3124]), + (FX_READ_REG, 16, 2, [3124, 5678]), + ): + values = self.simulator.getValues(entry[0], entry[1], entry[2]) + assert entry[3] == values, f"at entry {entry}" + + def test_simulator_set_values(self): + """Test simulator set values.""" + exc_setup = copy.deepcopy(self.default_config) + exc_simulator = ModbusSimulatorContext(exc_setup, None) + + value = [31234] + exc_simulator.setValues(FX_WRITE_REG, 16, value) + result = exc_simulator.getValues(FX_READ_REG, 16, 1) + assert value == result + value = [31234, 189] + exc_simulator.setValues(FX_WRITE_REG, 16, value) + result = exc_simulator.getValues(FX_READ_REG, 16, 2) + assert value == result + + exc_simulator.registers[5].value = 0 + exc_simulator.setValues(FX_WRITE_BIT, 80, [True]) + exc_simulator.setValues(FX_WRITE_BIT, 82, [True]) + exc_simulator.setValues(FX_WRITE_BIT, 84, [True]) + exc_simulator.setValues(FX_WRITE_BIT, 86, [True, False, True]) + result = exc_simulator.getValues(FX_READ_BIT, 80, 8) + assert [True, False] * 4 == result + exc_simulator.setValues(FX_WRITE_BIT, 88, [False]) + result = exc_simulator.getValues(FX_READ_BIT, 86, 3) + assert [True, False, False] == result + + def test_simulator_action_timestamp(self): + """Test action random""" + + def test_simulator_action_reset(self): + """Test action random""" + + async def test_simulator_example(self): + """Test datastore simulator example.""" + pymodbus_apply_logging_config() + + args = Commandline.copy() + args.comm = "tcp" + args.framer = ModbusSocketFramer + args.port = 5021 + run_args = setup_simulator( + args, setup=self.default_config, actions=self.custom_actions + ) + asyncio.create_task(run_server_simulator(run_args)) + await asyncio.sleep(0.1) + client = setup_async_client(args) + await client.connect() + assert client.protocol + + rr = await client.read_holding_registers(16, 1, slave=1) + assert rr.registers + await client.close() + await ServerAsyncStop()