diff --git a/API_changes.rst b/API_changes.rst index 53c039335..3ed04ef88 100644 --- a/API_changes.rst +++ b/API_changes.rst @@ -6,6 +6,9 @@ PyModbus - API changes. Version 3.1.0 ------------- Added --host to client_* examples, to allow easier use. +unit= in client calls are no longer converted to slave=, but raises a runtime exception. +Added missing client calls (all standard request are not available as methods). +client.mask_write_register() changed parameters. --------------------- Version 3.0.1 / 3.0.2 diff --git a/examples/client_calls.py b/examples/client_calls.py index 29d59be89..7a1135e07 100755 --- a/examples/client_calls.py +++ b/examples/client_calls.py @@ -180,7 +180,7 @@ async def _handle_holding_registers(client): "write_address": 1, "write_registers": [256, 128, 100, 50, 25, 10, 5, 1], } - _check_call(await client.readwrite_registers(unit=SLAVE, **arguments)) + _check_call(await client.readwrite_registers(slave=SLAVE, **arguments)) rr = _check_call(await client.read_holding_registers(1, 8, slave=SLAVE)) assert rr.registers == arguments["write_registers"] diff --git a/pymodbus/client/base.py b/pymodbus/client/base.py index 4d9daf182..6f205ae5e 100644 --- a/pymodbus/client/base.py +++ b/pymodbus/client/base.py @@ -60,10 +60,13 @@ def run(): rr = client.read_coils(0x01) client.close() - **Application methods, common to all clients**: """ + state = ModbusTransactionState.IDLE + last_frame_end = 0 + silent_interval = 0 + @dataclass class _params: # pylint: disable=too-many-instance-attributes """Parameter class.""" diff --git a/pymodbus/client/mixin.py b/pymodbus/client/mixin.py index 5cd99a462..2545a5762 100644 --- a/pymodbus/client/mixin.py +++ b/pymodbus/client/mixin.py @@ -1,16 +1,16 @@ """Modbus Client Common.""" import logging -from typing import List, Union +from typing import List, Tuple, Union import pymodbus.bit_read_message as pdu_bit_read import pymodbus.bit_write_message as pdu_bit_write import pymodbus.diag_message as pdu_diag +import pymodbus.file_message as pdu_file_msg +import pymodbus.mei_message as pdu_mei import pymodbus.other_message as pdu_other_msg import pymodbus.register_read_message as pdu_reg_read import pymodbus.register_write_message as pdu_req_write -from pymodbus.constants import Defaults from pymodbus.pdu import ModbusRequest, ModbusResponse -from pymodbus.utilities import ModbusTransactionState _logger = logging.getLogger(__name__) @@ -34,509 +34,523 @@ class ModbusClientMixin: # pylint: disable=too-many-public-methods response = await client.execute(request) .. tip:: - All methods can be used directly (synchronous) or with await - depending on the instantiated client. - """ + All methods can be used directly (synchronous) or + with await (asynchronous) depending on the client used. - state = ModbusTransactionState.IDLE - last_frame_end = 0 - silent_interval = 0 + jan + """ def __init__(self): """Initialize.""" def execute(self, request: ModbusRequest) -> ModbusResponse: - """Execute request. + """Execute request (code ???). :param request: Request to send + :returns: A deferred response handle :raises ModbusException: + + Call with custom function codes. + + .. tip:: + Response is not interpreted. """ return request def read_coils( - self, - address: int, - count: int = Defaults.Count, - slave: int = Defaults.Slave, - **kwargs: any + self, address: int, count: int = 1, slave: int = 0, **kwargs: any ) -> pdu_bit_read.ReadCoilsResponse: - """Read coils (function code 0x01). + """Read coils (code 0x01). :param address: Start address to read from :param count: (optional) Number of coils to read - :param slave: (optional) Modbus slave unit ID + :param slave: (optional) Modbus slave ID :param kwargs: (optional) Experimental parameters. + :returns: A deferred response handle :raises ModbusException: """ - if "unit" in kwargs: - _logger.error("Please do not use unit=, convert to slave=.") - slave = kwargs.pop("unit", slave) - request = pdu_bit_read.ReadCoilsRequest(address, count, slave, **kwargs) - return self.execute(request) + return self.execute( + pdu_bit_read.ReadCoilsRequest(address, count, slave, **kwargs) + ) def read_discrete_inputs( - self, - address: int, - count: int = Defaults.Count, - slave: int = Defaults.Slave, - **kwargs: any + self, address: int, count: int = 1, slave: int = 0, **kwargs: any ) -> pdu_bit_read.ReadDiscreteInputsResponse: - """Read discrete inputs (function code 0x02). + """Read discrete inputs (code 0x02). :param address: Start address to read from :param count: (optional) Number of coils to read - :param slave: (optional) Modbus slave unit ID + :param slave: (optional) Modbus slave ID :param kwargs: (optional) Experimental parameters. + :returns: A deferred response handle :raises ModbusException: """ - if "unit" in kwargs: - _logger.error("Please do not use unit=, convert to slave=.") - slave = kwargs.pop("unit", slave) - request = pdu_bit_read.ReadDiscreteInputsRequest( - address, count, slave, **kwargs + return self.execute( + pdu_bit_read.ReadDiscreteInputsRequest(address, count, slave, **kwargs) ) - return self.execute(request) def read_holding_registers( - self, - address: int, - count: int = Defaults.Count, - slave: int = Defaults.Slave, - **kwargs: any + self, address: int, count: int = 1, slave: int = 0, **kwargs: any ) -> pdu_reg_read.ReadHoldingRegistersResponse: - """Read holding registers (function code 0x03). + """Read holding registers (code 0x03). :param address: Start address to read from :param count: (optional) Number of coils to read - :param slave: (optional) Modbus slave unit ID + :param slave: (optional) Modbus slave ID :param kwargs: (optional) Experimental parameters. + :returns: A deferred response handle :raises ModbusException: """ - if "unit" in kwargs: - _logger.error("Please do not use unit=, convert to slave=.") - slave = kwargs.pop("unit", slave) - request = pdu_reg_read.ReadHoldingRegistersRequest( - address, count, slave, **kwargs + return self.execute( + pdu_reg_read.ReadHoldingRegistersRequest(address, count, slave, **kwargs) ) - return self.execute(request) def read_input_registers( - self, - address: int, - count: int = Defaults.Count, - slave: int = Defaults.Slave, - **kwargs: any + self, address: int, count: int = 1, slave: int = 0, **kwargs: any ) -> pdu_reg_read.ReadInputRegistersResponse: - """Read input registers (function code 0x04). + """Read input registers (code 0x04). :param address: Start address to read from :param count: (optional) Number of coils to read - :param slave: (optional) Modbus slave unit ID + :param slave: (optional) Modbus slave ID :param kwargs: (optional) Experimental parameters. + :returns: A deferred response handle :raises ModbusException: """ - if "unit" in kwargs: - _logger.error("Please do not use unit=, convert to slave=.") - slave = kwargs.pop("unit", slave) - request = pdu_reg_read.ReadInputRegistersRequest( - address, count, slave, **kwargs + return self.execute( + pdu_reg_read.ReadInputRegistersRequest(address, count, slave, **kwargs) ) - return self.execute(request) def write_coil( - self, address: int, value: bool, slave: int = Defaults.Slave, **kwargs: any + self, address: int, value: bool, slave: int = 0, **kwargs: any ) -> pdu_bit_write.WriteSingleCoilResponse: - """Write single coil (function code 0x05). + """Write single coil (code 0x05). :param address: Start address to read from :param value: Boolean to write - :param slave: (optional) Modbus slave unit ID + :param slave: (optional) Modbus slave ID :param kwargs: (optional) Experimental parameters. + :returns: A deferred response handle :raises ModbusException: """ - if "unit" in kwargs: - _logger.error("Please do not use unit=, convert to slave=.") - slave = kwargs.pop("unit", slave) - request = pdu_bit_write.WriteSingleCoilRequest(address, value, slave, **kwargs) - return self.execute(request) + return self.execute( + pdu_bit_write.WriteSingleCoilRequest(address, value, slave, **kwargs) + ) def write_register( - self, - address: int, - value: Union[int, float, str], - slave: int = Defaults.Slave, - **kwargs: any + self, address: int, value: Union[int, float, str], slave: int = 0, **kwargs: any ) -> pdu_req_write.WriteSingleRegisterResponse: - """Write register (function code 0x06). + """Write register (code 0x06). :param address: Start address to read from :param value: Value to write - :param slave: (optional) Modbus slave unit ID + :param slave: (optional) Modbus slave ID :param kwargs: (optional) Experimental parameters. + :returns: A deferred response handle :raises ModbusException: """ - if "unit" in kwargs: - _logger.error("Please do not use unit=, convert to slave=.") - slave = kwargs.pop("unit", slave) - request = pdu_req_write.WriteSingleRegisterRequest( - address, value, slave, **kwargs + return self.execute( + pdu_req_write.WriteSingleRegisterRequest(address, value, slave, **kwargs) ) - return self.execute(request) def read_exception_status( - self, slave: int = Defaults.Slave, **kwargs: any + self, slave: int = 0, **kwargs: any ) -> pdu_other_msg.ReadExceptionStatusResponse: - """Read Exception Status (function code 0x07). + """Read Exception Status (code 0x07). - :param slave: (optional) Modbus slave unit ID + :param slave: (optional) Modbus slave ID :param kwargs: (optional) Experimental parameters. + :returns: A deferred response handle :raises ModbusException: """ - if "unit" in kwargs: - _logger.error("Please do not use unit=, convert to slave=.") - slave = kwargs.pop("unit", slave) - request = pdu_other_msg.ReadExceptionStatusRequest(slave, **kwargs) - return self.execute(request) + return self.execute(pdu_other_msg.ReadExceptionStatusRequest(slave, **kwargs)) def diag_query_data( - self, msg: bytearray, slave: int = Defaults.Slave, **kwargs: any + self, msg: bytearray, slave: int = 0, **kwargs: any ) -> pdu_diag.ReturnQueryDataResponse: - """Diagnose query data (function code 0x08 - 0x00). + """Diagnose query data (code 0x08 sub 0x00). :param msg: Message to be returned - :param slave: (optional) Modbus slave unit ID + :param slave: (optional) Modbus slave ID :param kwargs: (optional) Experimental parameters. + :returns: A deferred response handle :raises ModbusException: """ - if "unit" in kwargs: - _logger.error("Please do not use unit=, convert to slave=.") - slave = kwargs.pop("unit", slave) - request = pdu_diag.ReturnQueryDataRequest(msg, slave, **kwargs) - return self.execute(request) + return self.execute(pdu_diag.ReturnQueryDataRequest(msg, slave, **kwargs)) def diag_restart_communication( - self, toggle: bool, slave: int = Defaults.Slave, **kwargs: any + self, toggle: bool, slave: int = 0, **kwargs: any ) -> pdu_diag.RestartCommunicationsOptionResponse: - """Diagnose restart communication (function code 0x08 - 0x01). + """Diagnose restart communication (code 0x08 sub 0x01). :param toggle: True if toogled. - :param slave: (optional) Modbus slave unit ID + :param slave: (optional) Modbus slave ID :param kwargs: (optional) Experimental parameters. + :returns: A deferred response handle :raises ModbusException: """ - if "unit" in kwargs: - _logger.error("Please do not use unit=, convert to slave=.") - slave = kwargs.pop("unit", slave) - request = pdu_diag.RestartCommunicationsOptionRequest(toggle, slave, **kwargs) - return self.execute(request) + return self.execute( + pdu_diag.RestartCommunicationsOptionRequest(toggle, slave, **kwargs) + ) def diag_read_diagnostic_register( - self, slave: int = Defaults.Slave, **kwargs: any + self, slave: int = 0, **kwargs: any ) -> pdu_diag.ReturnDiagnosticRegisterResponse: - """Diagnose read diagnostic register (function code 0x08 - 0x02). + """Diagnose read diagnostic register (code 0x08 sub 0x02). - :param slave: (optional) Modbus slave unit ID + :param slave: (optional) Modbus slave ID :param kwargs: (optional) Experimental parameters. + :returns: A deferred response handle :raises ModbusException: """ - if "unit" in kwargs: - _logger.error("Please do not use unit=, convert to slave=.") - slave = kwargs.pop("unit", slave) - request = pdu_diag.ReturnDiagnosticRegisterRequest(slave, **kwargs) - return self.execute(request) + return self.execute(pdu_diag.ReturnDiagnosticRegisterRequest(slave, **kwargs)) def diag_change_ascii_input_delimeter( - self, slave: int = Defaults.Slave, **kwargs: any + self, slave: int = 0, **kwargs: any ) -> pdu_diag.ChangeAsciiInputDelimiterResponse: - """Diagnose change ASCII input delimiter (function code 0x08 - 0x03). + """Diagnose change ASCII input delimiter (code 0x08 sub 0x03). - :param slave: (optional) Modbus slave unit ID + :param slave: (optional) Modbus slave ID :param kwargs: (optional) Experimental parameters. + :returns: A deferred response handle :raises ModbusException: """ - if "unit" in kwargs: - _logger.error("Please do not use unit=, convert to slave=.") - slave = kwargs.pop("unit", slave) - request = pdu_diag.ChangeAsciiInputDelimiterRequest(slave, **kwargs) - return self.execute(request) + return self.execute(pdu_diag.ChangeAsciiInputDelimiterRequest(slave, **kwargs)) def diag_force_listen_only( - self, slave: int = Defaults.Slave, **kwargs: any + self, slave: int = 0, **kwargs: any ) -> pdu_diag.ForceListenOnlyModeResponse: - """Diagnose force listen only (function code 0x08 - 0x04). + """Diagnose force listen only (code 0x08 sub 0x04). - :param slave: (optional) Modbus slave unit ID + :param slave: (optional) Modbus slave ID :param kwargs: (optional) Experimental parameters. + :returns: A deferred response handle :raises ModbusException: """ - if "unit" in kwargs: - _logger.error("Please do not use unit=, convert to slave=.") - slave = kwargs.pop("unit", slave) - request = pdu_diag.ForceListenOnlyModeRequest(slave, **kwargs) - return self.execute(request) + return self.execute(pdu_diag.ForceListenOnlyModeRequest(slave, **kwargs)) def diag_clear_counters( - self, slave: int = Defaults.Slave, **kwargs: any + self, slave: int = 0, **kwargs: any ) -> pdu_diag.ClearCountersResponse: - """Diagnose clear counters (function code 0x08 - 0x0A). + """Diagnose clear counters (code 0x08 sub 0x0A). - :param slave: (optional) Modbus slave unit ID + :param slave: (optional) Modbus slave ID :param kwargs: (optional) Experimental parameters. + :returns: A deferred response handle :raises ModbusException: """ - if "unit" in kwargs: - _logger.error("Please do not use unit=, convert to slave=.") - slave = kwargs.pop("unit", slave) - request = pdu_diag.ClearCountersRequest(slave, **kwargs) - return self.execute(request) + return self.execute(pdu_diag.ClearCountersRequest(slave, **kwargs)) def diag_read_bus_message_count( - self, slave: int = Defaults.Slave, **kwargs: any + self, slave: int = 0, **kwargs: any ) -> pdu_diag.ReturnBusMessageCountResponse: - """Diagnose read bus message count (function code 0x08 - 0x0B). + """Diagnose read bus message count (code 0x08 sub 0x0B). - :param slave: (optional) Modbus slave unit ID + :param slave: (optional) Modbus slave ID :param kwargs: (optional) Experimental parameters. + :returns: A deferred response handle :raises ModbusException: """ - if "unit" in kwargs: - _logger.error("Please do not use unit=, convert to slave=.") - slave = kwargs.pop("unit", slave) - request = pdu_diag.ReturnBusMessageCountRequest(slave, **kwargs) - return self.execute(request) + return self.execute(pdu_diag.ReturnBusMessageCountRequest(slave, **kwargs)) def diag_read_bus_comm_error_count( - self, slave: int = Defaults.Slave, **kwargs: any + self, slave: int = 0, **kwargs: any ) -> pdu_diag.ReturnBusCommunicationErrorCountResponse: - """Diagnose read Bus Communication Error Count (function code 0x08 - 0x0C). + """Diagnose read Bus Communication Error Count (code 0x08 sub 0x0C). - :param slave: (optional) Modbus slave unit ID + :param slave: (optional) Modbus slave ID :param kwargs: (optional) Experimental parameters. + :returns: A deferred response handle :raises ModbusException: """ - if "unit" in kwargs: - _logger.error("Please do not use unit=, convert to slave=.") - slave = kwargs.pop("unit", slave) - request = pdu_diag.ReturnBusCommunicationErrorCountRequest(slave, **kwargs) - return self.execute(request) + return self.execute( + pdu_diag.ReturnBusCommunicationErrorCountRequest(slave, **kwargs) + ) def diag_read_bus_exception_error_count( - self, slave: int = Defaults.Slave, **kwargs: any + self, slave: int = 0, **kwargs: any ) -> pdu_diag.ReturnBusExceptionErrorCountResponse: - """Diagnose read Bus Exception Error Count (function code 0x08 - 0x0D). + """Diagnose read Bus Exception Error Count (code 0x08 sub 0x0D). - :param slave: (optional) Modbus slave unit ID + :param slave: (optional) Modbus slave ID :param kwargs: (optional) Experimental parameters. + :returns: A deferred response handle :raises ModbusException: """ - if "unit" in kwargs: - _logger.error("Please do not use unit=, convert to slave=.") - slave = kwargs.pop("unit", slave) - request = pdu_diag.ReturnBusExceptionErrorCountRequest(slave, **kwargs) - return self.execute(request) + return self.execute( + pdu_diag.ReturnBusExceptionErrorCountRequest(slave, **kwargs) + ) def diag_read_slave_message_count( - self, slave: int = Defaults.Slave, **kwargs: any + self, slave: int = 0, **kwargs: any ) -> pdu_diag.ReturnSlaveMessageCountResponse: - """Diagnose read Slave Message Count (function code 0x08 - 0x0E). + """Diagnose read Slave Message Count (code 0x08 sub 0x0E). - :param slave: (optional) Modbus slave unit ID + :param slave: (optional) Modbus slave ID :param kwargs: (optional) Experimental parameters. + :returns: A deferred response handle :raises ModbusException: """ - if "unit" in kwargs: - _logger.error("Please do not use unit=, convert to slave=.") - slave = kwargs.pop("unit", slave) - request = pdu_diag.ReturnSlaveMessageCountRequest(slave, **kwargs) - return self.execute(request) + return self.execute(pdu_diag.ReturnSlaveMessageCountRequest(slave, **kwargs)) def diag_read_slave_no_response_count( - self, slave: int = Defaults.Slave, **kwargs: any + self, slave: int = 0, **kwargs: any ) -> pdu_diag.ReturnSlaveNoReponseCountResponse: - """Diagnose read Slave No Response Count (function code 0x08 - 0x0F). + """Diagnose read Slave No Response Count (code 0x08 sub 0x0F). - :param slave: (optional) Modbus slave unit ID + :param slave: (optional) Modbus slave ID :param kwargs: (optional) Experimental parameters. + :returns: A deferred response handle :raises ModbusException: """ - if "unit" in kwargs: - _logger.error("Please do not use unit=, convert to slave=.") - slave = kwargs.pop("unit", slave) - request = pdu_diag.ReturnSlaveNoResponseCountRequest(slave, **kwargs) - return self.execute(request) + return self.execute(pdu_diag.ReturnSlaveNoResponseCountRequest(slave, **kwargs)) def diag_read_slave_nak_count( - self, slave: int = Defaults.Slave, **kwargs: any + self, slave: int = 0, **kwargs: any ) -> pdu_diag.ReturnSlaveNAKCountResponse: - """Diagnose read Slave NAK Count (function code 0x08 - 0x10). + """Diagnose read Slave NAK Count (code 0x08 sub 0x10). - :param slave: (optional) Modbus slave unit ID + :param slave: (optional) Modbus slave ID :param kwargs: (optional) Experimental parameters. + :returns: A deferred response handle :raises ModbusException: """ - if "unit" in kwargs: - _logger.error("Please do not use unit=, convert to slave=.") - slave = kwargs.pop("unit", slave) - request = pdu_diag.ReturnSlaveNAKCountRequest(slave, **kwargs) - return self.execute(request) + return self.execute(pdu_diag.ReturnSlaveNAKCountRequest(slave, **kwargs)) def diag_read_slave_busy_count( - self, slave: int = Defaults.Slave, **kwargs: any + self, slave: int = 0, **kwargs: any ) -> pdu_diag.ReturnSlaveBusyCountResponse: - """Diagnose read Slave Busy Count (function code 0x08 - 0x11). + """Diagnose read Slave Busy Count (code 0x08 sub 0x11). - :param slave: (optional) Modbus slave unit ID + :param slave: (optional) Modbus slave ID :param kwargs: (optional) Experimental parameters. + :returns: A deferred response handle :raises ModbusException: """ - if "unit" in kwargs: - _logger.error("Please do not use unit=, convert to slave=.") - slave = kwargs.pop("unit", slave) - request = pdu_diag.ReturnSlaveBusyCountRequest(slave, **kwargs) - return self.execute(request) + return self.execute(pdu_diag.ReturnSlaveBusyCountRequest(slave, **kwargs)) def diag_read_bus_char_overrun_count( - self, slave: int = Defaults.Slave, **kwargs: any + self, slave: int = 0, **kwargs: any ) -> pdu_diag.ReturnSlaveBusCharacterOverrunCountResponse: - """Diagnose read Bus Character Overrun Count (function code 0x08 - 0x12). + """Diagnose read Bus Character Overrun Count (code 0x08 sub 0x12). - :param slave: (optional) Modbus slave unit ID + :param slave: (optional) Modbus slave ID :param kwargs: (optional) Experimental parameters. + :returns: A deferred response handle :raises ModbusException: """ - if "unit" in kwargs: - _logger.error("Please do not use unit=, convert to slave=.") - slave = kwargs.pop("unit", slave) - request = pdu_diag.ReturnSlaveBusCharacterOverrunCountRequest(slave, **kwargs) - return self.execute(request) + return self.execute( + pdu_diag.ReturnSlaveBusCharacterOverrunCountRequest(slave, **kwargs) + ) def diag_read_iop_overrun_count( - self, slave: int = Defaults.Slave, **kwargs: any + self, slave: int = 0, **kwargs: any ) -> pdu_diag.ReturnIopOverrunCountResponse: - """Diagnose read Iop overrun count (function code 0x08 - 0x13). + """Diagnose read Iop overrun count (code 0x08 sub 0x13). - :param slave: (optional) Modbus slave unit ID + :param slave: (optional) Modbus slave ID :param kwargs: (optional) Experimental parameters. + :returns: A deferred response handle :raises ModbusException: """ - if "unit" in kwargs: - _logger.error("Please do not use unit=, convert to slave=.") - slave = kwargs.pop("unit", slave) - request = pdu_diag.ReturnIopOverrunCountRequest(slave, **kwargs) - return self.execute(request) + return self.execute(pdu_diag.ReturnIopOverrunCountRequest(slave, **kwargs)) def diag_clear_overrun_counter( - self, slave: int = Defaults.Slave, **kwargs: any + self, slave: int = 0, **kwargs: any ) -> pdu_diag.ClearOverrunCountResponse: - """Diagnose Clear Overrun Counter and Flag (function code 0x08 - 0x14). + """Diagnose Clear Overrun Counter and Flag (code 0x08 sub 0x14). - :param slave: (optional) Modbus slave unit ID + :param slave: (optional) Modbus slave ID :param kwargs: (optional) Experimental parameters. + :returns: A deferred response handle :raises ModbusException: """ - if "unit" in kwargs: - _logger.error("Please do not use unit=, convert to slave=.") - slave = kwargs.pop("unit", slave) - request = pdu_diag.ClearOverrunCountRequest(slave, **kwargs) - return self.execute(request) + return self.execute(pdu_diag.ClearOverrunCountRequest(slave, **kwargs)) def diag_getclear_modbus_response( - self, slave: int = Defaults.Slave, **kwargs: any + self, slave: int = 0, **kwargs: any ) -> pdu_diag.GetClearModbusPlusResponse: - """Diagnose Get/Clear modbus plus request (function code 0x08 - 0x15). + """Diagnose Get/Clear modbus plus (code 0x08 sub 0x15). - :param slave: (optional) Modbus slave unit ID + :param slave: (optional) Modbus slave ID :param kwargs: (optional) Experimental parameters. + :returns: A deferred response handle :raises ModbusException: """ - if "unit" in kwargs: - _logger.error("Please do not use unit=, convert to slave=.") - slave = kwargs.pop("unit", slave) - request = pdu_diag.GetClearModbusPlusRequest(slave, **kwargs) - return self.execute(request) + return self.execute(pdu_diag.GetClearModbusPlusRequest(slave, **kwargs)) - # TBD missing functions - # 0x0B Get Comm Event Counter (Serial Line only) - # 0x0C Get Comm Event Log (Serial Line only) + def diag_get_comm_event_counter( + self, **kwargs: any + ) -> pdu_other_msg.GetCommEventCounterResponse: + """Diagnose get event counter (code 0x0B). + + :param kwargs: (optional) Experimental parameters. + :returns: A deferred response handle + :raises ModbusException: + """ + return self.execute(pdu_other_msg.GetCommEventCounterRequest(**kwargs)) + + def diag_get_comm_event_log( + self, **kwargs: any + ) -> pdu_other_msg.GetCommEventLogResponse: + """Diagnose get event counter (code 0x0C). + + :param kwargs: (optional) Experimental parameters. + :returns: A deferred response handle + :raises ModbusException: + """ + return self.execute(pdu_other_msg.GetCommEventLogRequest(**kwargs)) def write_coils( - self, - address: int, - values: List[bool], - slave: int = Defaults.Slave, - **kwargs: any + self, address: int, values: List[bool], slave: int = 0, **kwargs: any ) -> pdu_bit_write.WriteMultipleCoilsResponse: - """Write coils (function code 0x0F). + """Write coils (code 0x0F). :param address: Start address to read from :param values: List of booleans to write - :param slave: (optional) Modbus slave unit ID + :param slave: (optional) Modbus slave ID :param kwargs: (optional) Experimental parameters. + :returns: A deferred response handle :raises ModbusException: """ - if "unit" in kwargs: - _logger.error("Please do not use unit=, convert to slave=.") - slave = kwargs.pop("unit", slave) - request = pdu_bit_write.WriteMultipleCoilsRequest( - address, values, slave, **kwargs + return self.execute( + pdu_bit_write.WriteMultipleCoilsRequest(address, values, slave, **kwargs) ) - return self.execute(request) def write_registers( self, address: int, values: List[Union[int, float, str]], - slave: int = Defaults.Slave, + slave: int = 0, **kwargs: any ) -> pdu_req_write.WriteMultipleRegistersResponse: - """Write registers (function code 0x10). + """Write registers (code 0x10). :param address: Start address to read from :param values: List of booleans to write :param slave: (optional) Modbus slave unit ID :param kwargs: (optional) Experimental parameters. + :returns: A deferred response handle + :raises ModbusException: + """ + return self.execute( + pdu_req_write.WriteMultipleRegistersRequest( + address, values, slave, **kwargs + ) + ) + + def report_slave_id( + self, slave: int = 0, **kwargs: any + ) -> pdu_other_msg.ReportSlaveIdResponse: + """Report slave ID (code 0x11). + + :param slave: (optional) Modbus slave unit ID + :param kwargs: (optional) Experimental parameters. + :returns: A deferred response handle + :raises ModbusException: + """ + return self.execute(pdu_other_msg.ReportSlaveIdRequest(slave, **kwargs)) + + def read_file_record( + self, records: List[Tuple], **kwargs: any + ) -> pdu_file_msg.ReadFileRecordResponse: + """Read file record (code 0x14). + + :param records: List of (Reference type, File number, Record Number, Record Length) + :param kwargs: (optional) Experimental parameters. + :returns: A deferred response handle + :raises ModbusException: + """ + return self.execute(pdu_file_msg.ReadFileRecordRequest(records, **kwargs)) + + def write_file_record( + self, records: List[Tuple], **kwargs: any + ) -> pdu_file_msg.ReadFileRecordResponse: + """Write file record (code 0x15). + + :param records: List of (Reference type, File number, Record Number, Record Length) + :param kwargs: (optional) Experimental parameters. + :returns: A deferred response handle + :raises ModbusException: + """ + return self.execute(pdu_file_msg.WriteFileRecordRequest(records, **kwargs)) + + def mask_write_register( + self, + address: int = 0x0000, + and_mask: int = 0xFFFF, + or_mask: int = 0x0000, + **kwargs: any + ) -> pdu_req_write.MaskWriteRegisterResponse: + """Mask write register (code 0x16). + + :param address: The mask pointer address (0x0000 to 0xffff) + :param and_mask: The and bitmask to apply to the register address + :param or_mask: The or bitmask to apply to the register address + :param kwargs: (optional) Experimental parameters. + :returns: A deferred response handle :raises ModbusException: """ - if "unit" in kwargs: - _logger.error("Please do not use unit=, convert to slave=.") - slave = kwargs.pop("unit", slave) - request = pdu_req_write.WriteMultipleRegistersRequest( - address, values, slave, **kwargs + return self.execute( + pdu_req_write.MaskWriteRegisterRequest(address, and_mask, or_mask, **kwargs) ) - return self.execute(request) - - # Function codes descriptions - # 0x11 Report Slave ID (Serial Line only) - # 0x14 Read File Record - # 0x15 Write File Record - # 0x16 Mask Write Register - # 0x17 Read/Write Multiple registers - # 0x18 Read FIFO Queue - # 0x2B Encapsulated Interface Transport - # 0x2B / 0x0D CANopen General Reference Request and Response - # PDU - # 0x2B / 0x0E Read Device Identification - # MODBUS Exception Responses def readwrite_registers( - self, *args, **kwargs + self, + read_address: int = 0, + read_count: int = 0, + write_address: int = 0, + values: int = 0, + slave: int = 0, + **kwargs ) -> pdu_reg_read.ReadWriteMultipleRegistersResponse: - """Read/Write registers + """Read/Write registers (code 0x17). - :param args: + :param read_address: The address to start reading from + :param read_count: The number of registers to read from address + :param write_address: The address to start writing to + :param values: The registers to write to the specified address + :param slave: (optional) Modbus slave unit ID :param kwargs: :returns: A deferred response handle + :raises ModbusException: """ - request = pdu_reg_read.ReadWriteMultipleRegistersRequest(*args, **kwargs) - return self.execute(request) + return self.execute( + pdu_reg_read.ReadWriteMultipleRegistersRequest( + read_address=read_address, + read_count=read_count, + write_address=write_address, + values=values, + unit=slave, + **kwargs + ) + ) - def mask_write_register( - self, *args, **kwargs - ) -> pdu_req_write.MaskWriteRegisterResponse: - """Mask write register. + def read_fifo_queue( + self, address: int = 0x0000, **kwargs: any + ) -> pdu_file_msg.ReadFifoQueueResponse: + """Read FIFO queue (code 0x18). - :args: + :param address: The address to start reading from + :param kwargs: :returns: A deferred response handle + :raises ModbusException: """ - request = pdu_req_write.MaskWriteRegisterRequest(*args, **kwargs) - return self.execute(request) + return self.execute(pdu_file_msg.ReadFifoQueueRequest(address, **kwargs)) + + # code 0x2B sub 0x0D: CANopen General Reference Request and Response, NOT IMPLEMENTED + + def read_device_information( + self, read_code: int = None, object_id: int = 0x00, **kwargs: any + ) -> pdu_mei.ReadDeviceInformationResponse: + """Read FIFO queue (code 0x2B sub 0x0E). + + :param read_code: The device information read code + :param object_id: The object to read from + :param kwargs: + :returns: A deferred response handle + :raises ModbusException: + """ + return self.execute( + pdu_mei.ReadDeviceInformationRequest(read_code, object_id, **kwargs) + )