From 9024dcd1da124acb7ced918379f923d8be85acb7 Mon Sep 17 00:00:00 2001 From: Robert Walton Date: Tue, 6 Jul 2021 10:35:21 +0100 Subject: [PATCH 01/13] htrun: Fix package name passed to setuptools We passed the package name `greentea` to setuptools to discover the current package version. This failed because the package name is actually `mbed-greentea`. --- src/htrun/host_tests_runner/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/htrun/host_tests_runner/__init__.py b/src/htrun/host_tests_runner/__init__.py index e71b0379..f78fda46 100644 --- a/src/htrun/host_tests_runner/__init__.py +++ b/src/htrun/host_tests_runner/__init__.py @@ -12,4 +12,4 @@ from pkg_resources import get_distribution -__version__ = get_distribution("greentea").version +__version__ = get_distribution("mbed-greentea").version From 24c7433134cde064337603d5d6a278c1a85563cd Mon Sep 17 00:00:00 2001 From: Robert Walton Date: Wed, 14 Jul 2021 22:53:48 +0100 Subject: [PATCH 02/13] ci: Update flake8 config Ignore "imported but unused" warnings for __init__.py files as it's a common pattern to use __init__.py to export the public API of a package. Ignore directories we don't want flake8 to traverse. Ignore docstring errors in the host tests. Docstrings in test classes are less important than docstrings in production code. --- .flake8 | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/.flake8 b/.flake8 index 1c1371f7..d2207007 100644 --- a/.flake8 +++ b/.flake8 @@ -2,10 +2,24 @@ # black, the super-great auto-formatter has decided that 88 is a good number. # I'm inclined to agree, or not to dissagree. max-line-length = 88 +docstring-convention = google exclude = + .git, + .tox, + .venv, + __pycache__, + dist, test/*, ignore = # W503: line break before binary operator (this is no longer PEP8 compliant) W503, # E203: whitespace before ':' (this is not PEP8 compliant) E203, +per-file-ignores = + # Package level __init__ files improve user experience by short cutting + # imports. + # F401: imported but unused + __init__.py:F401 + src/htrun/*/__init__.py:F401 + # We don't care about docstrings in test classes + src/htrun/host_tests/*: D From 9afc996135a046598afa05749bd73790c423a395 Mon Sep 17 00:00:00 2001 From: Robert Walton Date: Thu, 15 Jul 2021 12:11:00 +0100 Subject: [PATCH 03/13] style: Fix docstring flake8 errors Convert the docstrings to google style and fix the flake8 errors. --- src/greentea/gtea/greentea_log.py | 1 + src/greentea/gtea/target_info.py | 1 + src/greentea/gtea/tests_spec.py | 1 + src/htrun/__init__.py | 20 ++- src/htrun/host_tests/__init__.py | 2 +- src/htrun/host_tests/base_host_test.py | 135 ++++++++------- src/htrun/host_tests/default_auto.py | 7 +- src/htrun/host_tests/detect_auto.py | 5 + src/htrun/host_tests/dev_null_auto.py | 5 +- src/htrun/host_tests/echo.py | 6 +- src/htrun/host_tests/hello_auto.py | 7 +- src/htrun/host_tests/rtc_auto.py | 7 + src/htrun/host_tests/wait_us_auto.py | 11 +- src/htrun/host_tests_conn_proxy/__init__.py | 1 + .../host_tests_conn_proxy/conn_primitive.py | 69 +++++--- .../conn_primitive_fastmodel.py | 75 +++++++-- .../conn_primitive_remote.py | 60 +++++-- .../conn_primitive_serial.py | 61 +++++-- src/htrun/host_tests_conn_proxy/conn_proxy.py | 76 ++++++--- src/htrun/host_tests_logger/__init__.py | 1 + src/htrun/host_tests_logger/ht_logger.py | 5 +- src/htrun/host_tests_plugins/__init__.py | 40 +++-- .../host_tests_plugins/host_test_plugins.py | 157 +++++++++++------- .../host_tests_plugins/host_test_registry.py | 73 +++++--- .../host_tests_plugins/module_copy_jn51xx.py | 30 ++-- .../host_tests_plugins/module_copy_mps2.py | 41 +++-- .../host_tests_plugins/module_copy_pyocd.py | 29 +++- .../host_tests_plugins/module_copy_shell.py | 30 ++-- .../host_tests_plugins/module_copy_silabs.py | 26 +-- .../host_tests_plugins/module_copy_stlink.py | 30 ++-- .../module_copy_to_target.py | 34 ++-- .../host_tests_plugins/module_copy_ublox.py | 30 ++-- .../module_power_cycle_target.py | 52 +++--- .../host_tests_plugins/module_reset_jn51xx.py | 41 +++-- .../host_tests_plugins/module_reset_mps2.py | 27 +-- .../host_tests_plugins/module_reset_pyocd.py | 32 ++-- .../host_tests_plugins/module_reset_silabs.py | 38 +++-- .../host_tests_plugins/module_reset_stlink.py | 47 ++++-- .../host_tests_plugins/module_reset_target.py | 52 +++--- .../host_tests_plugins/module_reset_ublox.py | 29 ++-- src/htrun/host_tests_registry/__init__.py | 6 +- .../host_tests_registry/host_registry.py | 51 ++++-- src/htrun/host_tests_runner/__init__.py | 9 +- src/htrun/host_tests_runner/host_test.py | 65 +++++--- .../host_tests_runner/host_test_default.py | 137 +++++++++------ src/htrun/host_tests_runner/target_base.py | 61 ++++--- src/htrun/host_tests_toolbox/__init__.py | 1 + .../host_tests_toolbox/host_functional.py | 43 ++--- src/htrun/htrun.py | 9 +- test/host_tests/host_registry.py | 1 + 50 files changed, 1153 insertions(+), 624 deletions(-) diff --git a/src/greentea/gtea/greentea_log.py b/src/greentea/gtea/greentea_log.py index fd362dfb..dc3e2822 100644 --- a/src/greentea/gtea/greentea_log.py +++ b/src/greentea/gtea/greentea_log.py @@ -82,6 +82,7 @@ def gt_log(self, text, print_text=True): Args: text: String to be logged. print_text: Force log to be printed on screen. + Returns: String with message. """ diff --git a/src/greentea/gtea/target_info.py b/src/greentea/gtea/target_info.py index 43912fa7..3b204693 100644 --- a/src/greentea/gtea/target_info.py +++ b/src/greentea/gtea/target_info.py @@ -41,6 +41,7 @@ def _platform_property_from_targets_json(targets, platform, property): targets: Data structure from targets.json. platform: Name of the platform. property: Name of the property. + Returns: Property value, None if not found. diff --git a/src/greentea/gtea/tests_spec.py b/src/greentea/gtea/tests_spec.py index 35859fe0..4dc21a9d 100644 --- a/src/greentea/gtea/tests_spec.py +++ b/src/greentea/gtea/tests_spec.py @@ -119,6 +119,7 @@ def add_binary(self, path, binary_type, compare_log=None): Args: path: Path to the binary. binary_type: Type of binary being added. + compare_log: Log to match output from. """ self.__binaries_by_flash_method[binary_type] = TestBinary( path, binary_type, compare_log diff --git a/src/htrun/__init__.py b/src/htrun/__init__.py index 4a3731e5..87087b51 100644 --- a/src/htrun/__init__.py +++ b/src/htrun/__init__.py @@ -3,13 +3,11 @@ # SPDX-License-Identifier: Apache-2.0 # +"""The htrun package. -"""! @package htrun - -Flash, reset and perform host supervised tests on Mbed enabled platforms. -Write your own programs (import this package) or use 'htrun' -command line tool instead. - +Flash, reset and perform host supervised tests on Mbed enabled platforms. +Write your own programs (import this package) or use 'htrun' command line tool +instead. """ import imp @@ -29,6 +27,7 @@ def get_plugin_caps(methods=None): + """Return the capabilities of a plugin.""" if not methods: methods = ["CopyMethod", "ResetMethod"] result = {} @@ -38,9 +37,12 @@ def get_plugin_caps(methods=None): def init_host_test_cli_params(): - """! Function creates CLI parser object and returns populated options object. - @return Function returns 'options' object returned from OptionParser class - @details Options object later can be used to populate host test selector script. + """Create CLI parser object and return populated options object. + + Options object can be used to populate host test selector script. + + Returns: + 'options' object returned from OptionParser class. """ parser = OptionParser() diff --git a/src/htrun/host_tests/__init__.py b/src/htrun/host_tests/__init__.py index b8f10a05..ee1aeaf2 100644 --- a/src/htrun/host_tests/__init__.py +++ b/src/htrun/host_tests/__init__.py @@ -2,6 +2,6 @@ # Copyright (c) 2021 Arm Limited and Contributors. All rights reserved. # SPDX-License-Identifier: Apache-2.0 # +"""Base host test class.""" -# base host test class from .base_host_test import BaseHostTest, event_callback diff --git a/src/htrun/host_tests/base_host_test.py b/src/htrun/host_tests/base_host_test.py index 4fdd7019..8e3fd784 100644 --- a/src/htrun/host_tests/base_host_test.py +++ b/src/htrun/host_tests/base_host_test.py @@ -10,8 +10,13 @@ class BaseHostTestAbstract(object): - """Base class for each host-test test cases with standard - setup, test and teardown set of functions + """Base class for host-test test cases. + + Defines an interface of setup, test and teardown methods subclasses should + implement. + + This class also performs common 'housekeeping' tasks such as pushing/popping + messages on the event_queue and handling test config. """ name = "" # name of the host test (used for local registration) @@ -33,81 +38,99 @@ def __notify_sync_failed(self, text): self.__event_queue.put(("__notify_sync_failed", text, time())) def __notify_dut(self, key, value): - """! Send data over serial to DUT """ + """Send data over serial to DUT.""" if self.__dut_event_queue: self.__dut_event_queue.put((key, value, time())) def notify_complete(self, result=None): - """! Notify main even loop that host test finished processing - @param result True for success, False failure. If None - no action in main even loop + """Notify the main event loop that a host test is complete. + + Args: + result: True for success, False failure. """ if self.__event_queue: self.__event_queue.put(("__notify_complete", result, time())) def reset_dut(self, value): - """ - Reset device under test - :return: + """Reset the device under test. + + Args: + value: Value to send with the reset message. """ if self.__event_queue: self.__event_queue.put(("__reset_dut", value, time())) def reset(self): - """ - Reset the device under test and continue running the host test - :return: - """ + """Reset the device under test and continue running the host test.""" if self.__event_queue: self.__event_queue.put(("__reset", "0", time())) def notify_conn_lost(self, text): - """! Notify main even loop that there was a DUT-host test connection error - @param consume If True htrun will process (consume) all remaining events + """Notify main event loop of a DUT-host connection error. + + Args: + text: Additional text to send with the notification. """ self.__notify_conn_lost(text) def log(self, text): - """! Send log message to main event loop """ + """Send log message to main event loop. + + Args: + text: Additional text to send with the notification. + """ self.__notify_prn(text) def send_kv(self, key, value): - """! Send Key-Value data to DUT """ + """Send Key-Value pair to the DUT. + + Args: + key: Key part of KV pair. + value: Value part of KV pair. + """ self.__notify_dut(key, value) def setup_communication(self, event_queue, dut_event_queue, config={}): - """! Setup queues used for IPC """ + """Setup queues used for comms between DUT and host. + + Args: + event_queue: List of KV messages sent toward the host. + dut_event_queue: List of KV messages sent toward the DUT. + config: Test config. + """ self.__event_queue = event_queue # To main even loop self.__dut_event_queue = dut_event_queue # To DUT self.__config = config def get_config_item(self, name): - """ - Return test config + """Get an item from the config by name. + + Args: + name: Name of config parameter to get. - :param name: - :return: + Returns: + Value of the config parameter with the given name. None if not found. """ return self.__config.get(name, None) def setup(self): - """! Setup your tests and callbacks """ + """Setup tests and callbacks.""" raise NotImplementedError def result(self): - """! Returns host test result (True, False or None) """ + """Return host test result (True, False or None).""" raise NotImplementedError def teardown(self): - """! Blocking always guaranteed test teardown """ + """Test teardown.""" raise NotImplementedError def event_callback(key): - """ - Decorator for defining a event callback method. Adds a property attribute "event_key" with value as the passed key. + """Decorator for defining a event callback method. - :param key: - :return: + Adds an "event_key" attribute to the decorated function, which is set to the passed + key. """ def decorator(func): @@ -144,43 +167,36 @@ def __init__(self): self.__assign_decorated_callbacks() def __callback_default(self, key, value, timestamp): - """! Default callback """ + """Default callback.""" # self.log("CALLBACK: key=%s, value=%s, timestamp=%f"% (key, value, timestamp)) pass def __default_end_callback(self, key, value, timestamp): - """ - Default handler for event 'end' that gives test result from target. - This callback is not decorated as we don't know then in what order this - callback would be registered. We want to let users over write this callback. - Hence it should be registered before registering user defined callbacks. + """Default handler for event 'end' that gives test result from target. - :param key: - :param value: - :param timestamp: - :return: + This callback is not decorated as we don't know in what order this + callback will be registered. We want to let users override this callback. + Hence it should be registered before registering user defined callbacks. """ self.notify_complete(value == "success") def __assign_default_callbacks(self): - """! Assigns default callback handlers """ + """Assign default callback handlers.""" for key in self.__consume_by_default: self.__callbacks[key] = self.__callback_default - # Register default handler for event 'end' before assigning user defined callbacks to let users over write it. + # Register default handler for event 'end' before assigning user defined + # callbacks to let users over write it. self.register_callback("end", self.__default_end_callback) def __assign_decorated_callbacks(self): - """ - It looks for any callback methods decorated with @event_callback + """Look for any callback methods decorated with @event_callback Example: - Define a method with @event_callback decorator like: - - @event_callback('') - def event_handler(self, key, value, timestamp): - do something.. + Define a method with @event_callback decorator like: - :return: + @event_callback('') + def event_handler(self, key, value, timestamp): + do something.. """ for name, method in inspect.getmembers(self, inspect.ismethod): key = getattr(method, "event_key", None) @@ -188,10 +204,12 @@ def event_handler(self, key, value, timestamp): self.register_callback(key, method) def register_callback(self, key, callback, force=False): - """! Register callback for a specific event (key: event name) - @param key String with name of the event - @param callback Callable which will be registstered for event "key" - @param force God mode + """Register callback for a specific event (key: event name). + + Args: + key: Name of the event. + callback: Callable which will be registered for event "key". + force: God mode. """ # Non-string keys are not allowed @@ -203,7 +221,8 @@ def register_callback(self, key, callback, force=False): raise TypeError("event callback should be callable") # Check if callback has all three required parameters (key, value, timestamp) - # When callback is class method should have 4 arguments (self, key, value, timestamp) + # When callback is class method should have 4 arguments (self, key, value, + # timestamp) if ismethod(callback): arg_count = six.get_function_code(callback).co_argcount if arg_count != 4: @@ -218,7 +237,8 @@ def register_callback(self, key, callback, force=False): ) raise TypeError(err_msg) - # When callback is just a function should have 3 arguments func(key, value, timestamp) + # When callback is just a function should have 3 arguments func(key, value, + # timestamp) if isfunction(callback): arg_count = six.get_function_code(callback).co_argcount if arg_count != 3: @@ -264,10 +284,13 @@ class BaseHostTest(HostTestCallbackBase): __BaseHostTest_Called = False def base_host_test_inited(self): - """This function will check if BaseHostTest ctor was called + """Check if BaseHostTest ctor was called. + Call to BaseHostTest is required in order to force required interfaces implementation. - @return Returns True if ctor was called (ok behaviour) + + Returns: + True if ctor was called. """ return self.__BaseHostTest_Called diff --git a/src/htrun/host_tests/default_auto.py b/src/htrun/host_tests/default_auto.py index ee87c8f1..ab155e92 100644 --- a/src/htrun/host_tests/default_auto.py +++ b/src/htrun/host_tests/default_auto.py @@ -2,14 +2,15 @@ # Copyright (c) 2021 Arm Limited and Contributors. All rights reserved. # SPDX-License-Identifier: Apache-2.0 # - +"""Default host test.""" from .. import BaseHostTest class DefaultAuto(BaseHostTest): - """Simple, basic host test's test runner waiting for serial port - output from MUT, no supervision over test running in MUT is executed. + """Waits for serial port output from the DUT. + + Only recognises the test completion message from greentea-client. """ pass diff --git a/src/htrun/host_tests/detect_auto.py b/src/htrun/host_tests/detect_auto.py index 4c0ee22d..332553ac 100644 --- a/src/htrun/host_tests/detect_auto.py +++ b/src/htrun/host_tests/detect_auto.py @@ -2,19 +2,24 @@ # Copyright (c) 2021 Arm Limited and Contributors. All rights reserved. # SPDX-License-Identifier: Apache-2.0 # +"""Auto detection host test.""" import re from .. import BaseHostTest class DetectPlatformTest(BaseHostTest): + """Test to auto detect the platform.""" + PATTERN_MICRO_NAME = "Target '(\w+)'" re_detect_micro_name = re.compile(PATTERN_MICRO_NAME) def result(self): + """Not implemented.""" raise NotImplementedError def test(self, selftest): + """Run test.""" result = True c = selftest.mbed.serial_readline() # {{start}} preamble diff --git a/src/htrun/host_tests/dev_null_auto.py b/src/htrun/host_tests/dev_null_auto.py index 8fa1764c..8cb1a6fd 100644 --- a/src/htrun/host_tests/dev_null_auto.py +++ b/src/htrun/host_tests/dev_null_auto.py @@ -2,11 +2,12 @@ # Copyright (c) 2021 Arm Limited and Contributors. All rights reserved. # SPDX-License-Identifier: Apache-2.0 # - +"""Test dev null.""" from .. import BaseHostTest class DevNullTest(BaseHostTest): + """DevNullTest.""" __result = None @@ -19,9 +20,11 @@ def _callback_to_stdout(self, key, value, timestamp): self.log("_callback_to_stdout !") def setup(self): + """Set up test.""" self.register_callback("end", self._callback_result) self.register_callback("to_null", self._callback_result) self.register_callback("to_stdout", self._callback_to_stdout) def result(self): + """Return test result.""" return self.__result diff --git a/src/htrun/host_tests/echo.py b/src/htrun/host_tests/echo.py index 49748c48..ea9e99ca 100644 --- a/src/htrun/host_tests/echo.py +++ b/src/htrun/host_tests/echo.py @@ -2,13 +2,14 @@ # Copyright (c) 2021 Arm Limited and Contributors. All rights reserved. # SPDX-License-Identifier: Apache-2.0 # - +"""Test device echo.""" import uuid from .. import BaseHostTest class EchoTest(BaseHostTest): + """EchoTest.""" __result = None echo_count = 0 @@ -35,12 +36,15 @@ def _callback_echo_count(self, key, value, timestamp): self.__send_echo_uuid() def setup(self): + """Set up the test.""" self.register_callback("echo", self._callback_echo) self.register_callback("echo_count", self._callback_echo_count) def result(self): + """Report test result.""" self.__result = self.uuid_sent == self.uuid_recv return self.__result def teardown(self): + """Tear down test resources.""" pass diff --git a/src/htrun/host_tests/hello_auto.py b/src/htrun/host_tests/hello_auto.py index 7adac370..89d9292b 100644 --- a/src/htrun/host_tests/hello_auto.py +++ b/src/htrun/host_tests/hello_auto.py @@ -2,11 +2,13 @@ # Copyright (c) 2021 Arm Limited and Contributors. All rights reserved. # SPDX-License-Identifier: Apache-2.0 # - +"""'Hello world' test case.""" from .. import BaseHostTest class HelloTest(BaseHostTest): + """'Hello world' test case.""" + HELLO_WORLD = "Hello World" __result = None @@ -16,10 +18,13 @@ def _callback_hello_world(self, key, value, timestamp): self.notify_complete() def setup(self): + """Set up the test.""" self.register_callback("hello_world", self._callback_hello_world) def result(self): + """Return the test result.""" return self.__result def teardown(self): + """Tear down the test case.""" pass diff --git a/src/htrun/host_tests/rtc_auto.py b/src/htrun/host_tests/rtc_auto.py index b1f19654..a0375312 100644 --- a/src/htrun/host_tests/rtc_auto.py +++ b/src/htrun/host_tests/rtc_auto.py @@ -2,6 +2,7 @@ # Copyright (c) 2021 Arm Limited and Contributors. All rights reserved. # SPDX-License-Identifier: Apache-2.0 # +"""RTC auto test.""" import re from time import time, strftime, gmtime @@ -9,6 +10,8 @@ class RTCTest(BaseHostTest): + """Test RTC.""" + PATTERN_RTC_VALUE = "\[(\d+)\] \[(\d+-\d+-\d+ \d+:\d+:\d+ [AaPpMm]{2})\]" re_detect_rtc_value = re.compile(PATTERN_RTC_VALUE) @@ -26,11 +29,14 @@ def _callback_end(self, key, value, timestamp): self.notify_complete() def setup(self): + """Set up the test.""" self.register_callback("timestamp", self._callback_timestamp) self.register_callback("rtc", self._callback_rtc) self.register_callback("end", self._callback_end) def result(self): + """Report test result.""" + def check_strftimes_format(t): m = self.re_detect_rtc_value.search(t) if m and len(m.groups()): @@ -44,4 +50,5 @@ def check_strftimes_format(t): return self.__result def teardown(self): + """Tear down the test.""" pass diff --git a/src/htrun/host_tests/wait_us_auto.py b/src/htrun/host_tests/wait_us_auto.py index 63ea478d..3174e6ad 100644 --- a/src/htrun/host_tests/wait_us_auto.py +++ b/src/htrun/host_tests/wait_us_auto.py @@ -2,15 +2,14 @@ # Copyright (c) 2021 Arm Limited and Contributors. All rights reserved. # SPDX-License-Identifier: Apache-2.0 # +"""Test reads single characters from stdio and measures time between occurrences.""" from time import time from .. import BaseHostTest class WaitusTest(BaseHostTest): - """This test is reading single characters from stdio - and measures time between their occurrences. - """ + """Test ticker timing.""" __result = None DEVIATION = 0.10 # +/-10% @@ -20,15 +19,18 @@ def _callback_exit(self, key, value, timeout): self.notify_complete() def _callback_tick(self, key, value, timestamp): - """ {{tick;%d}}} """ + """{{tick;%d}}}.""" self.log("tick! " + str(timestamp)) self.ticks.append((key, value, timestamp)) def setup(self): + """Set up the test case.""" self.register_callback("exit", self._callback_exit) self.register_callback("tick", self._callback_tick) def result(self): + """Report test result.""" + def sub_timestamps(t1, t2): delta = t1 - t2 deviation = abs(delta - 1.0) @@ -48,4 +50,5 @@ def sub_timestamps(t1, t2): return self.__result def teardown(self): + """Tear down test.""" pass diff --git a/src/htrun/host_tests_conn_proxy/__init__.py b/src/htrun/host_tests_conn_proxy/__init__.py index bbea896f..71c5371c 100644 --- a/src/htrun/host_tests_conn_proxy/__init__.py +++ b/src/htrun/host_tests_conn_proxy/__init__.py @@ -2,5 +2,6 @@ # Copyright (c) 2021 Arm Limited and Contributors. All rights reserved. # SPDX-License-Identifier: Apache-2.0 # +"""conn_proxy package.""" from .conn_proxy import conn_process diff --git a/src/htrun/host_tests_conn_proxy/conn_primitive.py b/src/htrun/host_tests_conn_proxy/conn_primitive.py index f79e4427..27e9298a 100644 --- a/src/htrun/host_tests_conn_proxy/conn_primitive.py +++ b/src/htrun/host_tests_conn_proxy/conn_primitive.py @@ -2,29 +2,42 @@ # Copyright (c) 2021 Arm Limited and Contributors. All rights reserved. # SPDX-License-Identifier: Apache-2.0 # - +"""Module defines ConnectorPrimitive base class for device connection and comms.""" from ..host_tests_logger import HtrunLogger class ConnectorPrimitiveException(Exception): - """ - Exception in connector primitive module. - """ + """Exception in connector primitive module.""" pass class ConnectorPrimitive(object): + """Base class for communicating with DUT.""" + def __init__(self, name): + """Initialise object. + + Args: + name: Name to display in the log. + """ self.LAST_ERROR = None self.logger = HtrunLogger(name) self.polling_timeout = 60 def write_kv(self, key, value): - """! Forms and sends Key-Value protocol message. - @details On how to parse K-V sent from DUT see KiViBufferWalker::KIVI_REGEX - On how DUT sends K-V please see greentea_write_postamble() function in greentea-client - @return Returns buffer with K-V message sent to DUT on success, None on failure + """Write a Key-Value protocol message. + + A Key-Value protocol message is in the form '{{key;value}}'. The greentea tests + running on the DUT recognise messages in this format and act according to the + given commands. + + Args: + key: Key part of the Key-Value protocol message. + value: Value part of the Key-Value message. + + Returns: + Buffer containing the K-V message on success, None on failure. """ # All Key-Value messages ends with newline character kv_buff = "{{%s;%s}}" % (key, value) + "\n" @@ -36,40 +49,52 @@ def write_kv(self, key, value): return None def read(self, count): - """! Read data from DUT - @param count Number of bytes to read - @return Bytes read + """Read data from DUT. + + Args: + count: Number of bytes to read. + + Returns: + Bytes read. """ raise NotImplementedError def write(self, payload, log=False): - """! Read data from DUT - @param payload Buffer with data to send - @param log Set to True if you want to enable logging for this function - @return Payload (what was actually sent - if possible to establish that) + """Write data to the DUT. + + Args: + payload: Buffer with data to send. + log: Set to True to enable logging for this function. + + Returns: + Payload (what was actually sent - if possible to establish that). """ raise NotImplementedError def flush(self): - """! Flush read/write channels of DUT """ + """Flush read/write channels of the DUT.""" raise NotImplementedError def reset(self): - """! Reset the dut""" + """Reset the DUT.""" raise NotImplementedError def connected(self): - """! Check if there is a connection to DUT - @return True if there is conenction to DUT (read/write/flush API works) + """Check if there is a connection to the DUT. + + Returns: + True if there is connection to the DUT (read/write/flush API works). """ raise NotImplementedError def error(self): - """! Returns LAST_ERROR value - @return Value of self.LAST_ERROR + """LAST_ERROR value. + + Returns: + Value of self.LAST_ERROR """ return self.LAST_ERROR def finish(self): - """! Handle DUT dtor like (close resource) operations here""" + """Close the connection to the DUT and perform any clean up operations.""" raise NotImplementedError diff --git a/src/htrun/host_tests_conn_proxy/conn_primitive_fastmodel.py b/src/htrun/host_tests_conn_proxy/conn_primitive_fastmodel.py index 4916463b..03d577c5 100644 --- a/src/htrun/host_tests_conn_proxy/conn_primitive_fastmodel.py +++ b/src/htrun/host_tests_conn_proxy/conn_primitive_fastmodel.py @@ -2,6 +2,7 @@ # Copyright (c) 2021 Arm Limited and Contributors. All rights reserved. # SPDX-License-Identifier: Apache-2.0 # +"""Connect to fast models.""" import telnetlib import socket @@ -9,7 +10,18 @@ class FastmodelConnectorPrimitive(ConnectorPrimitive): + """ConnectorPrimitive for a FastModel. + + Wrapper around fm_agent module. + """ + def __init__(self, name, config): + """Initialise the FastModel. + + Args: + name: Name of the FastModel. + config: Map of config parameters describing the state of the FastModel. + """ ConnectorPrimitive.__init__(self, name) self.config = config self.fm_config = config.get("fm_config", None) @@ -24,20 +36,27 @@ def __init__(self, name, config): # Initialize FastModel if self.__fastmodel_init(): - # FastModel Launch load and run, equivalent to DUT connection, flashing and reset... + # FastModel Launch load and run, equivalent to DUT connection, flashing and + # reset... self.__fastmodel_launch() self.__fastmodel_load(self.image_path) self.__fastmodel_run() def __fastmodel_init(self): - """! Initialize models using fm_agent APIs """ + """Import the fm_agent module and set up the FastModel simulator. + + Raises: + ConnectorPrimitiveException: fm_agent import failed, or the FastModel setup + failed. + """ self.logger.prn_inf("Initializing FastModel...") try: self.fm_agent_module = __import__("fm_agent") except ImportError as e: self.logger.prn_err( - "unable to load mbed-fastmodel-agent module. Check if the module install correctly." + "unable to load mbed-fastmodel-agent module. Check if the module " + "install correctly." ) self.fm_agent_module = None self.logger.prn_err("Importing failed : %s" % str(e)) @@ -56,7 +75,11 @@ def __fastmodel_init(self): return True def __fastmodel_launch(self): - """! launch the FastModel""" + """Start the FastModel. + + Raises: + ConnectorPrimitiveException: Simulator start-up failed. + """ self.logger.prn_inf("Launching FastModel...") try: if not self.resource.start_simulator(): @@ -70,7 +93,11 @@ def __fastmodel_launch(self): ) def __fastmodel_run(self): - """! Use fm_agent API to run the FastModel """ + """Run the FastModel simulator. + + Raises: + ConnectorPrimitiveException: Failed to run the simulator. + """ self.logger.prn_inf("Running FastModel...") try: if not self.resource.run_simulator(): @@ -84,7 +111,13 @@ def __fastmodel_run(self): ) def __fastmodel_load(self, filename): - """! Use fm_agent API to load image to FastModel, this is functional equivalent to flashing DUT""" + """Load a firmware image to the FastModel. + + This is the functional equivalent of flashing a physical DUT. + + Args: + filename: Path to the image to load. + """ self.logger.prn_inf("loading FastModel with image '%s'..." % filename) try: if not self.resource.load_simulator(filename): @@ -98,8 +131,10 @@ def __fastmodel_load(self, filename): ) def __resource_allocated(self): - """! Check whether FastModel resource been allocated - @return True or throw an exception + """Check if the FastModel agent resource been 'allocated'. + + Returns: + True if the FastModel agent is available. """ if self.resource: return True @@ -108,7 +143,14 @@ def __resource_allocated(self): return False def read(self, count): - """! Read data from DUT, count is not used for FastModel""" + """Read data from the FastModel. + + Args: + count: Not used for FastModels. + + Returns: + The data from the FastModel if the read was successful, otherwise False. + """ date = str() if self.__resource_allocated(): try: @@ -123,7 +165,12 @@ def read(self, count): return False def write(self, payload, log=False): - """! Write 'payload' to DUT""" + """Send text to the FastModel. + + Args: + payload: Text to send to the FastModel. + log: Log the text payload if True. + """ if self.__resource_allocated(): if log: self.logger.prn_txd(payload) @@ -139,18 +186,18 @@ def write(self, payload, log=False): return False def flush(self): - """! flush not supported in FastModel_module""" + """Flush is not supported in the FastModel_module.""" pass def connected(self): - """! return whether FastModel is connected """ + """Check if the FastModel is running.""" if self.__resource_allocated(): return self.resource.is_simulator_alive else: return False def finish(self): - """! shutdown the FastModel and release the allocation """ + """Shut down the FastModel.""" if self.__resource_allocated(): try: self.resource.shutdown_simulator() @@ -161,6 +208,7 @@ def finish(self): ) def reset(self): + """Reset the FastModel.""" if self.__resource_allocated(): try: if not self.resource.reset_simulator(): @@ -173,4 +221,5 @@ def reset(self): ) def __del__(self): + """Shut down the FastModel when garbage collected.""" self.finish() diff --git a/src/htrun/host_tests_conn_proxy/conn_primitive_remote.py b/src/htrun/host_tests_conn_proxy/conn_primitive_remote.py index dd2215ad..64db3cb4 100644 --- a/src/htrun/host_tests_conn_proxy/conn_primitive_remote.py +++ b/src/htrun/host_tests_conn_proxy/conn_primitive_remote.py @@ -2,14 +2,22 @@ # Copyright (c) 2021 Arm Limited and Contributors. All rights reserved. # SPDX-License-Identifier: Apache-2.0 # - +"""ConnectorPrimitive enabling remote communication with a DUT.""" import time from .. import DEFAULT_BAUD_RATE from .conn_primitive import ConnectorPrimitive class RemoteConnectorPrimitive(ConnectorPrimitive): + """Connect to a remote device using a global resource manager (grm). + + This object will import an arbitrary python module it uses as a "remote client", to + connect to a device over IP. The object expects the remote client module name, IP + address and port to be specified in the `config` dictionary passed to __init__. + """ + def __init__(self, name, config, importer=__import__): + """Populate instance attributes with device and grm data.""" ConnectorPrimitive.__init__(self, name) self.config = config self.target_id = self.config.get("target_id", None) @@ -42,9 +50,13 @@ def __init__(self, name, config, importer=__import__): self.__remote_init(importer) def __remote_init(self, importer): - """! Initialize DUT using GRM APIs """ + """Import the "remote client" module, use it to connect to the DUT. - # We want to load global resource manager module by name from command line (switch --grm) + Args: + importer: Callable that will import the module by name. + """ + # We want to load global resource manager module by name from command line + # (switch --grm) try: self.remote_module = importer(self.grm_module) except ImportError as error: @@ -90,7 +102,11 @@ def __remote_init(self, importer): return True def __remote_connect(self, baudrate=DEFAULT_BAUD_RATE): - """! Open remote connection to DUT """ + """Open a remote connection to the DUT. + + Args: + baudrate: The baud rate the remote client uses to connect to the DUT. + """ self.logger.prn_inf( "opening connection to platform at baudrate='%s'" % baudrate ) @@ -104,6 +120,7 @@ def __remote_connect(self, baudrate=DEFAULT_BAUD_RATE): raise def __remote_disconnect(self): + """Close the connection to the selected DUT.""" if not self.selected_resource: raise Exception("remote resource not exists!") try: @@ -115,7 +132,11 @@ def __remote_disconnect(self): ) def __remote_reset(self, delay=0): - """! Use GRM remote API to reset DUT """ + """Reset the DUT remotely. + + Args: + delay: Time to wait after sending the reset command. + """ self.logger.prn_inf("remote resources reset...") if not self.selected_resource: raise Exception("remote resource not exists!") @@ -132,7 +153,12 @@ def __remote_reset(self, delay=0): time.sleep(delay) def __remote_flashing(self, filename, forceflash=False): - """! Use GRM remote API to flash DUT """ + """Flash the DUT remotely. + + Args: + filename: Path to the image to flash to the remote target. + forceflash: Force flashing, this is just forwarded to the remote client. + """ self.logger.prn_inf("remote resources flashing with '%s'..." % filename) if not self.selected_resource: raise Exception("remote resource not exists!") @@ -144,7 +170,11 @@ def __remote_flashing(self, filename, forceflash=False): raise def read(self, count): - """! Read 'count' bytes of data from DUT """ + """Read data from the DUT. + + Args: + count: Number of bytes to read. + """ if not self.connected(): raise Exception("remote resource not exists!") data = str() @@ -157,7 +187,12 @@ def read(self, count): return data def write(self, payload, log=False): - """! Write 'payload' to DUT """ + """Send some text to the DUT. + + Args: + payload: Text payload to send to the DUT. + log: Log the payload. + """ if self.connected(): try: self.selected_resource.write(payload) @@ -170,9 +205,11 @@ def write(self, payload, log=False): return False def flush(self): + """No-op.""" pass def allocated(self): + """Check if the selected resource is allocated.""" return ( self.remote_module and self.selected_resource @@ -180,9 +217,11 @@ def allocated(self): ) def connected(self): + """Check if the selected resource is connected.""" return self.allocated() and self.selected_resource.is_connected def __remote_release(self): + """Release the remote resource.""" try: if self.allocated(): self.selected_resource.release() @@ -193,14 +232,15 @@ def __remote_release(self): ) def finish(self): - # Finally once we're done with the resource - # we disconnect and release the allocation + """Disconnect the resource and release the allocation.""" if self.allocated(): self.__remote_disconnect() self.__remote_release() def reset(self): + """Reset the selected resource.""" self.__remote_reset(delay=self.forced_reset_timeout) def __del__(self): + """Disconnect from the remote client when object is garbage collected.""" self.finish() diff --git a/src/htrun/host_tests_conn_proxy/conn_primitive_serial.py b/src/htrun/host_tests_conn_proxy/conn_primitive_serial.py index a116d4d6..8ab449fd 100644 --- a/src/htrun/host_tests_conn_proxy/conn_primitive_serial.py +++ b/src/htrun/host_tests_conn_proxy/conn_primitive_serial.py @@ -2,7 +2,7 @@ # Copyright (c) 2021 Arm Limited and Contributors. All rights reserved. # SPDX-License-Identifier: Apache-2.0 # - +"""Connects to a device's serial port.""" import time from serial import Serial, SerialException @@ -13,7 +13,17 @@ class SerialConnectorPrimitive(ConnectorPrimitive): + """ConnectorPrimitive implementation using serial IO.""" + def __init__(self, name, port, baudrate, config): + """Initialise with serial params. + + Args: + name: Target name to display in the log. + port: Serial COM port. + baudrate: Baudrate to use for serial comms. + config: Map of config parameters describing the state of the DUT. + """ ConnectorPrimitive.__init__(self, name) self.port = port self.baudrate = int(baudrate) @@ -27,12 +37,12 @@ def __init__(self, name, port, baudrate, config): self.skip_reset = config.get("skip_reset", False) self.serial = None - # Check if serial port for given target_id changed - # If it does we will use new port to open connections and make sure reset plugin - # later can reuse opened already serial port + # Check if serial port for given target_id changed. If it does we will use new + # port to open connections and make sure reset plugin later can reuse opened + # already serial port # - # Note: This listener opens serial port and keeps connection so reset plugin uses - # serial port object not serial port name! + # Note: This listener opens serial port and keeps connection so reset plugin + # uses serial port object not serial port name! serial_port = HostTestPluginBase().check_serial_port_ready( self.port, target_id=self.target_id, timeout=self.polling_timeout ) @@ -53,9 +63,11 @@ def __init__(self, name, port, baudrate, config): ) while time.time() - startTime < self.polling_timeout: try: - # TIMEOUT: While creating Serial object timeout is delibrately passed as 0. Because blocking in Serial.read - # impacts thread and mutliprocess functioning in Python. Hence, instead in self.read() s delay (sleep()) is - # inserted to let serial buffer collect data and avoid spinning on non blocking read(). + # TIMEOUT: While creating Serial object timeout is delibrately passed as + # 0. Because blocking in Serial.read impacts thread and mutliprocess + # functioning in Python. Hence, instead in self.read() s delay (sleep()) + # is inserted to let serial buffer collect data and avoid spinning on + # non blocking read(). self.serial = Serial( self.port, baudrate=self.baudrate, @@ -85,7 +97,13 @@ def __init__(self, name, port, baudrate, config): time.sleep(1) def reset_dev_via_serial(self, delay=1): - """! Reset device using selected method, calls one of the reset plugins """ + """Reset device using selected method. + + Calls one of the reset plugins. + + Args: + delay: Time to wait after sending the reset command. + """ reset_type = self.config.get("reset_type", "default") if not reset_type: reset_type = "default" @@ -109,9 +127,14 @@ def reset_dev_via_serial(self, delay=1): return result def read(self, count): - """! Read data from serial port RX buffer """ - # TIMEOUT: Since read is called in a loop, wait for self.timeout period before calling serial.read(). See - # comment on serial.Serial() call above about timeout. + """Read data from the serial port RX buffer. + + Args: + count: Number of bytes to read. + """ + # TIMEOUT: Since read is called in a loop, wait for self.timeout period before + # calling serial.read(). See comment on serial.Serial() call above about + # timeout. time.sleep(self.read_timeout) c = str() try: @@ -124,7 +147,12 @@ def read(self, count): return c def write(self, payload, log=False): - """! Write data to serial port TX buffer """ + """Write data to serial port TX buffer. + + Args: + payload: Bytes to write to the serial port. + log: Log the payload. + """ try: if self.serial: self.serial.write(payload.encode("utf-8")) @@ -141,18 +169,23 @@ def write(self, payload, log=False): return False def flush(self): + """Flush the serial IO.""" if self.serial: self.serial.flush() def connected(self): + """Return True if connected to serial port.""" return bool(self.serial) def finish(self): + """Close the serial port.""" if self.serial: self.serial.close() def reset(self): + """Send serial break to reset the device.""" self.reset_dev_via_serial(self.forced_reset_timeout) def __del__(self): + """Release resources when garbage collected.""" self.finish() diff --git a/src/htrun/host_tests_conn_proxy/conn_proxy.py b/src/htrun/host_tests_conn_proxy/conn_proxy.py index 1aac55da..d0f3eb95 100644 --- a/src/htrun/host_tests_conn_proxy/conn_proxy.py +++ b/src/htrun/host_tests_conn_proxy/conn_proxy.py @@ -2,6 +2,7 @@ # Copyright (c) 2021 Arm Limited and Contributors. All rights reserved. # SPDX-License-Identifier: Apache-2.0 # +"""Create and manage connection to the DUT.""" import re import sys @@ -21,16 +22,31 @@ class KiViBufferWalker: - """! Simple auxiliary class used to walk through a buffer and search for KV tokens """ + """A collection-like object holding KV pairs in an internal buffer. + + Despite the name, this object is actually a buffer/collection of KV pairs, and not + an object to just parse them from another buffer. + """ def __init__(self): + """Initialise the object.""" self.KIVI_REGEX = r"\{\{([\w\d_-]+);([^\}]+)\}\}" self.buff = str() self.kvl = [] self.re_kv = re.compile(self.KIVI_REGEX) def append(self, payload): - """! Append stream buffer with payload and process. Returns non-KV strings""" + """Append a KV pair to the buffer. + + Scrapes the first KV pair found in 'payload' and appends it to the internal + buffer. + + Args: + payload: Text to scrape a KV pair from. + + Returns: + Non-KV strings in payload. + """ logger = HtrunLogger("CONN") try: self.buff += payload.decode("utf-8") @@ -64,23 +80,28 @@ def append(self, payload): return discarded def search(self): - """! Check if there is a KV value in buffer """ + """Check if there is a KV pair in the buffer.""" return len(self.kvl) > 0 def pop_kv(self): + """Pop a KV pair from the buffer.""" if len(self.kvl): return self.kvl.pop(0) return None, None, time() def conn_primitive_factory(conn_resource, config, event_queue, logger): - """! Factory producing connectors based on type and config - @param conn_resource Name of connection primitive (e.g. 'serial' for - local serial port connection or 'grm' for global resource manager) - @param event_queue Even queue of Key-Value protocol - @param config Global configuration for connection process - @param logger Host Test logger instance - @return Object of type or None if type of connection primitive unknown (conn_resource) + """Construct a ConnectorPrimitive subclass for the given name and config. + + Args: + conn_resource: Name of connection primitive (e.g. 'serial' for + local serial port connection or 'grm' for global resource manager). + event_queue: Event queue of Key-Value protocol messages. + config: Global configuration map describing the connection target. + logger: Host Test logger instance. + + Returns: + A ConnectorPrimitive, or None if conn_resource is not recognised. """ polling_timeout = int(config.get("polling_timeout", 60)) logger.prn_inf( @@ -118,6 +139,14 @@ def conn_primitive_factory(conn_resource, config, event_queue, logger): def conn_process(event_queue, dut_event_queue, config): + """Start the connection process to the DUT. + + Args: + event_queue: KV messages read by the host. + dut_event_queue: KV messages sent to the DUT. + config: Map of configuration settings describing the test env and the DUT. + """ + def __notify_conn_lost(): error_msg = connector.error() connector.finish() @@ -182,15 +211,16 @@ def __send_sync(timeout=None): __notify_conn_lost() return 0 - # Sync packet management allows us to manipulate the way htrun sends __sync packet(s) - # With current settings we can force on htrun to send __sync packets in this manner: + # Sync packet management allows us to manipulate the way htrun sends __sync + # packet(s) With current settings we can force on htrun to send __sync packets in + # this manner: # # * --sync=0 - No sync packets will be sent to target platform # * --sync=-10 - __sync packets will be sent unless we will reach - # timeout or proper response is sent from target platform - # * --sync=N - Send up to N __sync packets to target platform. Response - # is sent unless we get response from target platform or - # timeout occur + # timeout or proper response is sent from target platform + # * --sync=N - Send up to N __sync packets to target platform. + # Response is sent unless we get response from target platform or timeout + # occur if sync_behavior > 0: # Sending up to 'n' __sync packets @@ -255,7 +285,8 @@ def __send_sync(timeout=None): __notify_conn_lost() break - # Since read is done every 0.2 sec, with maximum baud rate we can receive 2304 bytes in one read in worst case. + # Since read is done every 0.2 sec, with maximum baud rate we can receive 2304 + # bytes in one read in worst case. data = connector.read(2304) if data: # Stream data stream KV parsing @@ -278,8 +309,8 @@ def __send_sync(timeout=None): event_queue.put((key, value, time())) idx = sync_uuid_list.index(value) logger.prn_inf( - "found SYNC in stream: {{%s;%s}} it is #%d sent, queued..." - % (key, value, idx) + "found SYNC in stream: {{%s;%s}} it is #%d sent, " + "queued..." % (key, value, idx) ) else: logger.prn_err( @@ -287,7 +318,8 @@ def __send_sync(timeout=None): % (key, value) ) logger.prn_inf( - "Resetting the part and sync timeout to clear out the buffer..." + "Resetting the part and sync timeout to clear out the" + " buffer..." ) connector.reset() loop_timer = time() @@ -313,8 +345,8 @@ def __send_sync(timeout=None): sync_uuid_list.append(sync_uuid) sync_behavior -= 1 loop_timer = time() - # Sync behavior will be zero and if last sync fails we should report connection - # lost + # Sync behavior will be zero and if last sync fails we should + # report connection lost if sync_behavior == 0: last_sync = True else: diff --git a/src/htrun/host_tests_logger/__init__.py b/src/htrun/host_tests_logger/__init__.py index 8dd8e8dd..762351d2 100644 --- a/src/htrun/host_tests_logger/__init__.py +++ b/src/htrun/host_tests_logger/__init__.py @@ -2,5 +2,6 @@ # Copyright (c) 2021 Arm Limited and Contributors. All rights reserved. # SPDX-License-Identifier: Apache-2.0 # +"""Logging package.""" from .ht_logger import HtrunLogger diff --git a/src/htrun/host_tests_logger/ht_logger.py b/src/htrun/host_tests_logger/ht_logger.py index 35a85382..4e880197 100644 --- a/src/htrun/host_tests_logger/ht_logger.py +++ b/src/htrun/host_tests_logger/ht_logger.py @@ -2,7 +2,7 @@ # Copyright (c) 2021 Arm Limited and Contributors. All rights reserved. # SPDX-License-Identifier: Apache-2.0 # - +"""Logger.""" import sys import logging @@ -10,9 +10,10 @@ class HtrunLogger(object): - """! Yet another logger flavour """ + """Yet another logger flavour.""" def __init__(self, name): + """Initialise logger to stdout.""" logging.basicConfig( stream=sys.stdout, format="[%(created).2f][%(name)s]%(message)s", diff --git a/src/htrun/host_tests_plugins/__init__.py b/src/htrun/host_tests_plugins/__init__.py index 75d3417a..249c4b11 100644 --- a/src/htrun/host_tests_plugins/__init__.py +++ b/src/htrun/host_tests_plugins/__init__.py @@ -3,11 +3,10 @@ # SPDX-License-Identifier: Apache-2.0 # -"""! @package greentea-host-test-plugins +"""greentea-host-test-plugins package. This package contains plugins used by host test to reset, flash devices etc. -This package can be extended with new packages to add more generic functionality - +This package can be extended with new packages to add more generic functionality. """ from . import host_test_registry @@ -63,31 +62,44 @@ ############################################################################### # Functional interface for host test plugin registry ############################################################################### + + def call_plugin(type, capability, *args, **kwargs): - """! Interface to call plugin registry functional way - @param capability Plugin capability we want to call - @param args Additional parameters passed to plugin - @param kwargs Additional parameters passed to plugin - @return Returns return value from call_plugin call + """Call a plugin from the HOST_TEST_PLUGIN_REGISTRY. + + Args: + capability: Plugin capability we want to call. + args: Additional parameters passed to plugin. + kwargs: Additional parameters passed to plugin. + + Returns: + True if the call succeeded, otherwise False. """ return HOST_TEST_PLUGIN_REGISTRY.call_plugin(type, capability, *args, **kwargs) def get_plugin_caps(type): - """! Get list of all capabilities for plugin family with the same type - @param type Type of a plugin - @return Returns list of all capabilities for plugin family with the same type. If there are no capabilities empty list is returned + """Get a list of all capabilities for a plugin type. + + Args: + type: Type of a plugin. + + Returns: + List of all capabilities for plugin family with the same type. If there are no + capabilities an empty list is returned. """ return HOST_TEST_PLUGIN_REGISTRY.get_plugin_caps(type) def get_plugin_info(): - """! Return plugins information - @return Dictionary HOST_TEST_PLUGIN_REGISTRY + """Get 'information' about the plugins currently in the registry. + + Returns: + Dictionary of 'information' about the plugins in HOST_TEST_PLUGIN_REGISTRY. """ return HOST_TEST_PLUGIN_REGISTRY.get_dict() def print_plugin_info(): - """! Prints plugins' information in user friendly way""" + """Print plugin information in a user friendly way.""" print(HOST_TEST_PLUGIN_REGISTRY) diff --git a/src/htrun/host_tests_plugins/host_test_plugins.py b/src/htrun/host_tests_plugins/host_test_plugins.py index 20bc0de4..75868095 100644 --- a/src/htrun/host_tests_plugins/host_test_plugins.py +++ b/src/htrun/host_tests_plugins/host_test_plugins.py @@ -2,7 +2,7 @@ # Copyright (c) 2021 Arm Limited and Contributors. All rights reserved. # SPDX-License-Identifier: Apache-2.0 # - +"""Base class for plugins.""" import os import sys import platform @@ -17,7 +17,7 @@ class HostTestPluginBase: - """! Base class for all plugins used with host tests""" + """Base class for all plugins used with host tests.""" ########################################################################### # Interface: @@ -36,7 +36,7 @@ class HostTestPluginBase: stable = False # Determine if plugin is stable and can be used def __init__(self): - """ctor""" + """Initialise the object.""" # Setting Host Test Logger instance ht_loggers = { "BasePlugin": HtrunLogger("PLGN"), @@ -50,27 +50,40 @@ def __init__(self): ########################################################################### def setup(self, *args, **kwargs): - """Configure plugin, this function should be called before plugin execute() method is used.""" + """Configure plugin. + + This function should be called before plugin execute() method is used. + """ return False def execute(self, capability, *args, **kwargs): - """! Executes capability by name - @param capability Capability name - @param args Additional arguments - @param kwargs Additional arguments - @details Each capability e.g. may directly just call some command line program or execute building pythonic function - @return Capability call return value + """Execute plugin 'capability' by name. + + Each capability may directly just call some command line program or execute a + function. + + Args: + capability: Capability name. + args: Additional arguments. + kwargs: Additional arguments. + + Returns: + Capability call return value. """ return False def is_os_supported(self, os_name=None): - """! - @return Returns true if plugin works (supportes) under certain OS - @os_name String describing OS. - See self.host_os_support() and self.host_os_info() - @details In some cases a plugin will not work under particular OS - mainly because command / software used to implement plugin - functionality is not available e.g. on MacOS or Linux. + """Check if the OS is supported by this plugin. + + In some cases a plugin will not work under a particular OS. Usually because the + command line tool used to implement the plugin functionality is not available. + + Args: + os_name: String describing OS. See self.host_os_support() and + self.host_os_info() + + Returns: + True if plugin works under certain OS. """ return True @@ -78,23 +91,26 @@ def is_os_supported(self, os_name=None): # Interface helper methods - overload only if you need to have custom behaviour ########################################################################### def print_plugin_error(self, text): - """! Function prints error in console and exits always with False - @param text Text to print + """Print error messages to the console. + + Args: + text: Text to print. """ self.plugin_logger.prn_err(text) return False def print_plugin_info(self, text, NL=True): - """! Function prints notification in console and exits always with True - @param text Text to print - @param NL Deprecated! Newline will be added behind text if this flag is True - """ + """Print notifications to the console. + Args: + text: Text to print. + NL: (Deprecated) Newline will be added behind text if this flag is True. + """ self.plugin_logger.prn_inf(text) return True def print_plugin_char(self, char): - """Function prints char on stdout""" + """Print a char to stdout.""" stdout.write(char) stdout.flush() return True @@ -107,18 +123,21 @@ def check_mount_point_ready( target_id=None, timeout=60, ): - """! Waits until destination_disk is ready and can be accessed by e.g. copy commands - @return True if mount point was ready in given time, False otherwise - @param destination_disk Mount point (disk) which will be checked for readiness - @param init_delay - Initial delay time before first access check - @param loop_delay - polling delay for access check - @param timeout Mount point pooling timeout in seconds - """ + """Wait until destination_disk is ready and can be accessed. + + Args: + destination_disk: Mount point (disk) which will be checked for readiness. + init_delay: Initial delay time before first access check. + loop_delay: Polling delay for access check. + timeout: Polling timeout in seconds. + Returns: + True if mount point was ready in given time, otherwise False. + """ if target_id: # Wait for mount point to appear with mbed-ls # and if it does check if mount point for target_id changed - # If mount point changed, use new mount point and check if its ready (os.access) + # If mount point changed, use new mount point and check if its ready. new_destination_disk = destination_disk # Sometimes OSes take a long time to mount devices (up to one minute). @@ -158,7 +177,8 @@ def check_mount_point_ready( destination_disk = new_destination_disk result = True - # Check if mount point we've promoted to be valid one (by optional target_id check above) + # Check if mount point we've promoted to be valid one (by optional target_id + # check above) # Let's wait for 30 * loop_delay + init_delay max if not access(destination_disk, F_OK): self.print_plugin_info( @@ -180,12 +200,17 @@ def check_mount_point_ready( return (result, destination_disk) def check_serial_port_ready(self, serial_port, target_id=None, timeout=60): - """! Function checks (using mbed-ls) and updates serial port name information for DUT with specified target_id. - If no target_id is specified function returns old serial port name. - @param serial_port Current serial port name - @param target_id Target ID of a device under test which serial port will be checked and updated if needed - @param timeout Serial port pooling timeout in seconds - @return Tuple with result (always True) and serial port read from mbed-ls + """Check and update serial port name information for DUT. + + If no target_id is specified return the old serial port name. + + Args: + serial_port: Current serial port name. + target_id: Target ID of a device under test. + timeout: Serial port pooling timeout in seconds. + + Returns: + Tuple with result (always True) and serial port read from mbed-ls. """ # If serial port changed (check using mbed-ls), use new serial port new_serial_port = None @@ -200,7 +225,8 @@ def check_serial_port_ready(self, serial_port, target_id=None, timeout=60): timeout_step = 0.5 timeout = int(timeout / timeout_step) for i in range(timeout): - # mbed_lstools.main.create() should be done inside the loop. Otherwise it will loop on same data. + # mbed_lstools.main.create() should be done inside the loop. Otherwise + # it will loop on same data. mbeds = create() mbed_list = mbeds.list_mbeds() # list of mbeds present # get first item in list with a matching target_id, if present @@ -216,7 +242,7 @@ def check_serial_port_ready(self, serial_port, target_id=None, timeout=60): ): new_serial_port = mbed_target["serial_port"] if new_serial_port != serial_port: - # Serial port changed, update to new serial port from mbed-ls + # Serial port changed, update to new serial port self.print_plugin_info( "Serial port for tid='%s' changed from '%s' to '%s'..." % (target_id, serial_port, new_serial_port) @@ -229,11 +255,17 @@ def check_serial_port_ready(self, serial_port, target_id=None, timeout=60): return new_serial_port def check_parameters(self, capability, *args, **kwargs): - """! This function should be ran each time we call execute() to check if none of the required parameters is missing - @param capability Capability name - @param args Additional parameters - @param kwargs Additional parameters - @return Returns True if all parameters are passed to plugin, else return False + """Check if required parameters are missing. + + This function should be called each time we call execute(). + + Args: + capability: Capability name. + args: Additional parameters. + kwargs: Additional parameters. + + Returns: + True if all required parameters are passed to plugin, otherwise False. """ missing_parameters = [] for parameter in self.required_parameters: @@ -247,12 +279,18 @@ def check_parameters(self, capability, *args, **kwargs): return True def run_command(self, cmd, shell=True, stdin=None): - """! Runs command from command line. - @param cmd Command to execute - @param shell True if shell command should be executed (eg. ls, ps) - @param stdin A custom stdin for the process running the command (defaults to None) - @details Function prints 'cmd' return code if execution failed - @return True if command successfully executed + """Run a shell command as a subprocess. + + Prints 'cmd' return code if execution failed. + + Args: + cmd: Command to execute. + shell: True if shell command should be executed (eg. ls, ps). + stdin: A custom stdin for the process running the command (defaults + to None). + + Returns: + True if command successfully executed, otherwise False. """ result = True try: @@ -267,8 +305,10 @@ def run_command(self, cmd, shell=True, stdin=None): return result def host_os_info(self): - """! Returns information about host OS - @return Returns tuple with information about OS and host platform + """Return information about host OS. + + Returns: + Tuple with information about OS and host platform. """ result = ( os.name, @@ -280,9 +320,12 @@ def host_os_info(self): return result def host_os_support(self): - """! Function used to determine host OS - @return Returns None if host OS is unknown, else string with name - @details This function should be ported for new OS support + """Determine host OS. + + This function should be ported for new OS support. + + Returns: + None if host OS is unknown, else string with name. """ result = None os_info = self.host_os_info() diff --git a/src/htrun/host_tests_plugins/host_test_registry.py b/src/htrun/host_tests_plugins/host_test_registry.py index 59406d05..df65610c 100644 --- a/src/htrun/host_tests_plugins/host_test_registry.py +++ b/src/htrun/host_tests_plugins/host_test_registry.py @@ -2,34 +2,34 @@ # Copyright (c) 2021 Arm Limited and Contributors. All rights reserved. # SPDX-License-Identifier: Apache-2.0 # +"""Registry of available host test plugins.""" class HostTestRegistry: - """Simple class used to register and store - host test plugins for further usage - """ + """Register and store host test plugins for further usage.""" # Here we actually store all the plugins PLUGINS = {} # 'Plugin Name' : Plugin Object def print_error(self, text): - """! Prints error directly on console + """Print an error message to the console. - @param text Error message text message + Args: + text: Error message reason. """ print("Plugin load failed. Reason: %s" % text) def register_plugin(self, plugin): - """! Registers and stores plugin inside registry for further use. + """Store a plugin in the registry. - @param plugin Plugin name + This method also calls the plugin's setup() method to configure the plugin. - @return True if plugin setup was successful and plugin can be registered, else False + Args: + plugin: Plugin instance. - @details Method also calls plugin's setup() function to configure plugin if needed. - Note: Different groups of plugins may demand different extra parameter. Plugins - should be at least for one type of plugin configured with the same parameters - because we do not know which of them will actually use particular parameter. + Returns: + True if plugin setup was successful and plugin can be registered, else + False. """ # TODO: # - check for unique caps for specified type @@ -44,12 +44,16 @@ def register_plugin(self, plugin): return False def call_plugin(self, type, capability, *args, **kwargs): - """! Execute plugin functionality respectively to its purpose - @param type Plugin type - @param capability Plugin capability name - @param args Additional plugin parameters - @param kwargs Additional plugin parameters - @return Returns result from plugin's execute() method + """Execute the first plugin found with a particular 'type' and 'capability'. + + Args: + type: Plugin type. + capability: Plugin capability name. + args: Additional plugin parameters. + kwargs: Additional plugin parameters. + + Returns: + True if a plugin was found and execution succeeded, otherwise False. """ for plugin_name in self.PLUGINS: plugin = self.PLUGINS[plugin_name] @@ -58,9 +62,14 @@ def call_plugin(self, type, capability, *args, **kwargs): return False def get_plugin_caps(self, type): - """! Returns list of all capabilities for plugin family with the same type - @param type Plugin type - @return Returns list of capabilities for plugin. If there are no capabilities empty list is returned + """List all capabilities for plugins with the specified type. + + Args: + type: Plugin type. + + Returns: + List of capabilities found. If there are no capabilities an empty + list is returned. """ result = [] for plugin_name in self.PLUGINS: @@ -70,16 +79,26 @@ def get_plugin_caps(self, type): return sorted(result) def load_plugin(self, name): - """! Used to load module from system (by import) - @param name name of the module to import - @return Returns result of __import__ operation + """Import a plugin module. + + Args: + name: Name of the module to import. + + Returns: + Imported module. + + Raises: + ImportError: The module with the given name was not found. """ mod = __import__("module_%s" % name) return mod def get_string(self): - """! User friendly printing method to show hooked plugins - @return Returns string formatted with PrettyTable + """User friendly printing method to show hooked plugins. + + Returns: + PrettyTable formatted string describing the contents of the plugin + registry. """ from prettytable import PrettyTable, HEADER @@ -115,6 +134,7 @@ def get_string(self): return pt.get_string() def get_dict(self): + """Return a dictionary of registered plugins.""" column_names = ["name", "type", "capabilities", "stable"] result = {} for plugin_name in sorted(self.PLUGINS.keys()): @@ -135,4 +155,5 @@ def get_dict(self): return result def __str__(self): + """Return str representation of object.""" return self.get_string() diff --git a/src/htrun/host_tests_plugins/module_copy_jn51xx.py b/src/htrun/host_tests_plugins/module_copy_jn51xx.py index 6dd010cc..1126c038 100644 --- a/src/htrun/host_tests_plugins/module_copy_jn51xx.py +++ b/src/htrun/host_tests_plugins/module_copy_jn51xx.py @@ -2,25 +2,26 @@ # Copyright (c) 2021 Arm Limited and Contributors. All rights reserved. # SPDX-License-Identifier: Apache-2.0 # +"""Copy to devices using JN51xxProgrammer.exe.""" import os from .host_test_plugins import HostTestPluginBase class HostTestPluginCopyMethod_JN51xx(HostTestPluginBase): + """Plugin interface adaptor for the JN51xxProgrammer tool.""" - # Plugin interface name = "HostTestPluginCopyMethod_JN51xx" type = "CopyMethod" capabilities = ["jn51xx"] required_parameters = ["image_path", "serial"] def __init__(self): - """ctor""" + """Initialise plugin.""" HostTestPluginBase.__init__(self) def is_os_supported(self, os_name=None): - """! In this implementation this plugin only is supporeted under Windows machines""" + """Plugin only supported on Windows.""" # If no OS name provided use host OS name if not os_name: os_name = self.host_os_support() @@ -31,18 +32,25 @@ def is_os_supported(self, os_name=None): return False def setup(self, *args, **kwargs): - """! Configure plugin, this function should be called before plugin execute() method is used.""" + """Configure plugin. + + This function should be called before plugin execute() method is used. + """ self.JN51XX_PROGRAMMER = "JN51xxProgrammer.exe" return True def execute(self, capability, *args, **kwargs): - """! Executes capability by name + """Copy a firmware image to a JN51xx target using JN51xxProgrammer.exe. + + If the "capability" name is not 'jn51xx' this method will just fail. + + Args: + capability: Capability name. + args: Additional arguments. + kwargs: Additional arguments. - @param capability Capability name - @param args Additional arguments - @param kwargs Additional arguments - @details Each capability e.g. may directly just call some command line program or execute building pythonic function - @return Capability call return value + Returns: + True if the copy succeeded, otherwise False. """ if not kwargs["image_path"]: self.print_plugin_error("Error: image path not specified") @@ -73,5 +81,5 @@ def execute(self, capability, *args, **kwargs): def load_plugin(): - """Returns plugin available in this module""" + """Return plugin available in this module.""" return HostTestPluginCopyMethod_JN51xx() diff --git a/src/htrun/host_tests_plugins/module_copy_mps2.py b/src/htrun/host_tests_plugins/module_copy_mps2.py index a407e1bf..672935d3 100644 --- a/src/htrun/host_tests_plugins/module_copy_mps2.py +++ b/src/htrun/host_tests_plugins/module_copy_mps2.py @@ -2,6 +2,7 @@ # Copyright (c) 2021 Arm Limited and Contributors. All rights reserved. # SPDX-License-Identifier: Apache-2.0 # +"""MPS2 specific flashing / binary setup functions.""" import os from shutil import copy @@ -9,7 +10,7 @@ class HostTestPluginCopyMethod_MPS2(HostTestPluginBase): - # MPS2 specific flashing / binary setup funcitons + """Plugin interface adapter for a shell 'copy' command.""" name = "HostTestPluginCopyMethod_MPS2" type = "CopyMethod" @@ -18,19 +19,21 @@ class HostTestPluginCopyMethod_MPS2(HostTestPluginBase): required_parameters = ["image_path", "destination_disk"] def __init__(self): - """ctor""" + """Initialise plugin.""" HostTestPluginBase.__init__(self) def mps2_copy(self, image_path, destination_disk): - """! mps2 copy method for "mbed enabled" devices. - This copies the file on the MPS2 keeping the same extension but - renaming it "mbed.extension" - @param image_path Path to file to be copied - @param destination_disk Path to destination (mbed mount point) + """mps2 copy method for "mbed enabled" devices. - @details this uses shutil copy to copy the file. + Copies the file to the MPS2, using shutil.copy. Prepends the file + extension with 'mbed'. - @return Returns True if copy (flashing) was successful + Args: + image_path: Path to file to be copied. + destination_disk: Path to destination (mbed mount point). + + Returns; + True if copy (flashing) was successful, otherwise False. """ result = True # Keep the same extension in the test spec and on the MPS2 @@ -51,15 +54,20 @@ def mps2_copy(self, image_path, destination_disk): return result def setup(self, *args, **kwargs): - """Configure plugin, this function should be called before plugin execute() method is used.""" + """Configure plugin. + + This function should be called before plugin execute() method is used. + """ return True def execute(self, capability, *args, **kwargs): - """! Executes capability by name. - @details Each capability may directly just call some command line program or execute building pythonic function - @return Returns True if 'capability' operation was successful - """ + """Copy a firmware image to a device using the mps2_copy method. + + If the "capability" name is not 'mps2' this method will just fail. + Returns: + True if copy operation succeeded, otherwise False. + """ if not kwargs["image_path"]: self.print_plugin_error("Error: image path not specified") return False @@ -81,7 +89,8 @@ def execute(self, capability, *args, **kwargs): destination_disk = os.path.normpath(kwargs["destination_disk"]) # Wait for mount point to be ready # if mount point changed according to target_id use new mount point - # available in result (_, destination_disk) of check_mount_point_ready + # available in result (_, destination_disk) of + # check_mount_point_ready result, destination_disk = self.check_mount_point_ready( destination_disk, target_id=target_id, timeout=pooling_timeout ) # Blocking @@ -91,5 +100,5 @@ def execute(self, capability, *args, **kwargs): def load_plugin(): - """Returns plugin available in this module""" + """Return plugin available in this module.""" return HostTestPluginCopyMethod_MPS2() diff --git a/src/htrun/host_tests_plugins/module_copy_pyocd.py b/src/htrun/host_tests_plugins/module_copy_pyocd.py index 415f66e4..c676ab4e 100644 --- a/src/htrun/host_tests_plugins/module_copy_pyocd.py +++ b/src/htrun/host_tests_plugins/module_copy_pyocd.py @@ -2,6 +2,7 @@ # Copyright (c) 2021 Arm Limited and Contributors. All rights reserved. # SPDX-License-Identifier: Apache-2.0 # +"""Flash a firmware image to a device using PyOCD.""" import os from .host_test_plugins import HostTestPluginBase @@ -16,7 +17,8 @@ class HostTestPluginCopyMethod_pyOCD(HostTestPluginBase): - # Plugin interface + """Plugin interface adaptor for pyOCD.""" + name = "HostTestPluginCopyMethod_pyOCD" type = "CopyMethod" stable = True @@ -24,19 +26,28 @@ class HostTestPluginCopyMethod_pyOCD(HostTestPluginBase): required_parameters = ["image_path", "target_id"] def __init__(self): - """ctor""" + """Initialise plugin.""" HostTestPluginBase.__init__(self) def setup(self, *args, **kwargs): - """Configure plugin, this function should be called before plugin execute() method is used.""" + """Configure plugin. + + This function should be called before plugin execute() method is used. + """ return True def execute(self, capability, *args, **kwargs): - """! Executes capability by name - @param capability Capability name - @param args Additional arguments - @param kwargs Additional arguments - @return Capability call return value + """Flash a firmware image to a device using pyOCD. + + In this implementation we don't seem to care what the capability name is. + + Args: + capability: Capability name. + args: Additional arguments. + kwargs: Additional arguments. + + Returns: + True if flashing succeeded, otherwise False. """ if not PYOCD_PRESENT: self.print_plugin_error( @@ -84,5 +95,5 @@ def execute(self, capability, *args, **kwargs): def load_plugin(): - """Returns plugin available in this module""" + """Return plugin available in this module.""" return HostTestPluginCopyMethod_pyOCD() diff --git a/src/htrun/host_tests_plugins/module_copy_shell.py b/src/htrun/host_tests_plugins/module_copy_shell.py index 5e3ce0a4..59ac88b6 100644 --- a/src/htrun/host_tests_plugins/module_copy_shell.py +++ b/src/htrun/host_tests_plugins/module_copy_shell.py @@ -2,13 +2,15 @@ # Copyright (c) 2021 Arm Limited and Contributors. All rights reserved. # SPDX-License-Identifier: Apache-2.0 # - +"""Wrapper around cp/xcopy/copy.""" import os from os.path import join, basename from .host_test_plugins import HostTestPluginBase class HostTestPluginCopyMethod_Shell(HostTestPluginBase): + """Plugin interface adaptor for shell copy commands.""" + # Plugin interface name = "HostTestPluginCopyMethod_Shell" type = "CopyMethod" @@ -17,21 +19,29 @@ class HostTestPluginCopyMethod_Shell(HostTestPluginBase): required_parameters = ["image_path", "destination_disk"] def __init__(self): - """ctor""" + """Initialise the plugin.""" HostTestPluginBase.__init__(self) def setup(self, *args, **kwargs): - """Configure plugin, this function should be called before plugin execute() method is used.""" + """Configure plugin. + + This function should be called before plugin execute() method is used. + """ return True def execute(self, capability, *args, **kwargs): - """! Executes capability by name + """Copy an image to a destination disk using a shell copy command. + + "capability" is used to select which command to invoke, valid + capabilities are "shell", "cp", "copy" and "xcopy". + + Args: + capability: Capability name. + args: Additional arguments. + kwargs: Additional arguments. - @param capability Capability name - @param args Additional arguments - @param kwargs Additional arguments - @details Each capability e.g. may directly just call some command line program or execute building pythonic function - @return Capability call return value + Returns: + True if the copy succeeded, otherwise False. """ if not kwargs["image_path"]: self.print_plugin_error("Error: image path not specified") @@ -83,5 +93,5 @@ def execute(self, capability, *args, **kwargs): def load_plugin(): - """Returns plugin available in this module""" + """Return plugin available in this module.""" return HostTestPluginCopyMethod_Shell() diff --git a/src/htrun/host_tests_plugins/module_copy_silabs.py b/src/htrun/host_tests_plugins/module_copy_silabs.py index 6689feef..95a59cef 100644 --- a/src/htrun/host_tests_plugins/module_copy_silabs.py +++ b/src/htrun/host_tests_plugins/module_copy_silabs.py @@ -2,14 +2,15 @@ # Copyright (c) 2021 Arm Limited and Contributors. All rights reserved. # SPDX-License-Identifier: Apache-2.0 # +"""Copy firmware images to silab devices using the eACommander.exe tool.""" import os from .host_test_plugins import HostTestPluginBase class HostTestPluginCopyMethod_Silabs(HostTestPluginBase): + """Plugin interface adapter for eACommander.exe.""" - # Plugin interface name = "HostTestPluginCopyMethod_Silabs" type = "CopyMethod" capabilities = ["eACommander", "eACommander-usb"] @@ -17,24 +18,29 @@ class HostTestPluginCopyMethod_Silabs(HostTestPluginBase): stable = True def __init__(self): - """ctor""" + """Initialise plugin.""" HostTestPluginBase.__init__(self) def setup(self, *args, **kwargs): - """Configure plugin, this function should be called before plugin execute() method is used.""" + """Configure plugin. + + This function should be called before plugin execute() method is used. + """ self.EACOMMANDER_CMD = "eACommander.exe" return True def execute(self, capability, *args, **kwargs): - """! Executes capability by name + """Copy a firmware image to a silab device using eACommander.exe. - @param capability Capability name - @param args Additional arguments - @param kwargs Additional arguments + The "capability" name must be eACommander or this method will just fail. - @details Each capability e.g. may directly just call some command line program or execute building pythonic function + Args: + capability: Capability name. + args: Additional arguments. + kwargs: Additional arguments. - @return Capability call return value + Returns: + True if the copy was successful, otherwise False. """ result = False if self.check_parameters(capability, *args, **kwargs) is True: @@ -65,5 +71,5 @@ def execute(self, capability, *args, **kwargs): def load_plugin(): - """Returns plugin available in this module""" + """Return plugin available in this module.""" return HostTestPluginCopyMethod_Silabs() diff --git a/src/htrun/host_tests_plugins/module_copy_stlink.py b/src/htrun/host_tests_plugins/module_copy_stlink.py index 3ff51726..f0b404ba 100644 --- a/src/htrun/host_tests_plugins/module_copy_stlink.py +++ b/src/htrun/host_tests_plugins/module_copy_stlink.py @@ -2,12 +2,14 @@ # Copyright (c) 2021 Arm Limited and Contributors. All rights reserved. # SPDX-License-Identifier: Apache-2.0 # +"""Implements a plugin to flash ST devices using ST-LINK-CLI.""" import os from .host_test_plugins import HostTestPluginBase class HostTestPluginCopyMethod_Stlink(HostTestPluginBase): + """Plugin interface adaptor for the ST-LINK-CLI.""" # Plugin interface name = "HostTestPluginCopyMethod_Stlink" @@ -16,11 +18,11 @@ class HostTestPluginCopyMethod_Stlink(HostTestPluginBase): required_parameters = ["image_path"] def __init__(self): - """ctor""" + """Initialise the object.""" HostTestPluginBase.__init__(self) def is_os_supported(self, os_name=None): - """! In this implementation this plugin only is supporeted under Windows machines""" + """Check if the OS is supported.""" # If no OS name provided use host OS name if not os_name: os_name = self.host_os_support() @@ -31,32 +33,38 @@ def is_os_supported(self, os_name=None): return False def setup(self, *args, **kwargs): - """! Configure plugin, this function should be called before plugin execute() method is used.""" + """Configure plugin. + + This function should be called before plugin execute() method is used. + """ self.ST_LINK_CLI = "ST-LINK_CLI.exe" return True def execute(self, capability, *args, **kwargs): - """! Executes capability by name + """Copy a firmware image to a deice using the ST-LINK-CLI. - @param capability Capability name - @param args Additional arguments - @param kwargs Additional arguments + If the "capability" name is not 'stlink' this method will just fail. - @details Each capability e.g. may directly just call some command line program or execute building pythonic function + Args: + capability: Capability name. + args: Additional arguments. + kwargs: Additional arguments. - @return Capability call return value + Returns: + True if the copy succeeded, otherwise False. """ result = False if self.check_parameters(capability, *args, **kwargs) is True: image_path = os.path.normpath(kwargs["image_path"]) if capability == "stlink": # Example: - # ST-LINK_CLI.exe -p "C:\Work\mbed\build\test\DISCO_F429ZI\GCC_ARM\MBED_A1\basic.bin" + # ST-LINK_CLI.exe -p \ + # "C:\Work\mbed\build\test\DISCO_F429ZI\GCC_ARM\MBED_A1\basic.bin" cmd = [self.ST_LINK_CLI, "-p", image_path, "0x08000000", "-V"] result = self.run_command(cmd) return result def load_plugin(): - """Returns plugin available in this module""" + """Return plugin available in this module.""" return HostTestPluginCopyMethod_Stlink() diff --git a/src/htrun/host_tests_plugins/module_copy_to_target.py b/src/htrun/host_tests_plugins/module_copy_to_target.py index 0aaff721..2f00869f 100644 --- a/src/htrun/host_tests_plugins/module_copy_to_target.py +++ b/src/htrun/host_tests_plugins/module_copy_to_target.py @@ -2,6 +2,7 @@ # Copyright (c) 2021 Arm Limited and Contributors. All rights reserved. # SPDX-License-Identifier: Apache-2.0 # +"""Copy to DAPLink enabled devices using file operations.""" import os from shutil import copy @@ -9,21 +10,21 @@ class HostTestPluginCopyMethod_Target(HostTestPluginBase): - """Generic flashing method for Mbed enabled platforms (by copy)""" + """Generic flashing method for DAPLink enabled platforms using file copy.""" def __init__(self): - """ctor""" + """Initialise the plugin.""" HostTestPluginBase.__init__(self) def generic_target_copy(self, image_path, destination_disk): - """! Generic target copy method for "Mbed enabled" devices. + """Target copy method for "Mbed enabled" devices. - @param image_path Path to binary file to be flashed - @param destination_disk Path to destination (target mount point) + Args: + image_path: Path to binary file to be flashed. + destination_disk: Path to destination (target mount point). - @details It uses standard python shutil function to copy image_file (target specific binary) to device's disk. - - @return Returns True if copy (flashing) was successful + Returns: + True if copy (flashing) was successful, otherwise False. """ result = True if not destination_disk.endswith("/") and not destination_disk.endswith("\\"): @@ -38,7 +39,6 @@ def generic_target_copy(self, image_path, destination_disk): result = False return result - # Plugin interface name = "HostTestPluginCopyMethod_Target" type = "CopyMethod" stable = True @@ -46,13 +46,16 @@ def generic_target_copy(self, image_path, destination_disk): required_parameters = ["image_path", "destination_disk"] def setup(self, *args, **kwargs): - """Configure plugin, this function should be called before plugin execute() method is used.""" + """Configure plugin.""" return True def execute(self, capability, *args, **kwargs): - """! Executes capability by name. - @details Each capability may directly just call some command line program or execute building pythonic function - @return Returns True if 'capability' operation was successful + """Copy a firmware image to a DAPLink compatible device's filesystem. + + The "capability" name must be "shutil" or this function will just fail. + + Returns: + True if copy was successful, otherwise False. """ if not kwargs["image_path"]: self.print_plugin_error("Error: image path not specified") @@ -75,7 +78,8 @@ def execute(self, capability, *args, **kwargs): destination_disk = os.path.normpath(kwargs["destination_disk"]) # Wait for mount point to be ready # if mount point changed according to target_id use new mount point - # available in result (_, destination_disk) of check_mount_point_ready + # available in result (_, destination_disk) of + # check_mount_point_ready mount_res, destination_disk = self.check_mount_point_ready( destination_disk, target_id=self.target_id, @@ -87,5 +91,5 @@ def execute(self, capability, *args, **kwargs): def load_plugin(): - """Returns plugin available in this module""" + """Return plugin available in this module.""" return HostTestPluginCopyMethod_Target() diff --git a/src/htrun/host_tests_plugins/module_copy_ublox.py b/src/htrun/host_tests_plugins/module_copy_ublox.py index bddffcf9..5dbf36b4 100644 --- a/src/htrun/host_tests_plugins/module_copy_ublox.py +++ b/src/htrun/host_tests_plugins/module_copy_ublox.py @@ -2,21 +2,26 @@ # Copyright (c) 2021 Arm Limited and Contributors. All rights reserved. # SPDX-License-Identifier: Apache-2.0 # +"""Copy images to ublox devices using FlashErase.exe.""" import os from .host_test_plugins import HostTestPluginBase class HostTestPluginCopyMethod_ublox(HostTestPluginBase): + """Plugin interface adaptor for the FlashErase.exe tool.""" - # Plugin interface name = "HostTestPluginCopyMethod_ublox" type = "CopyMethod" capabilities = ["ublox"] required_parameters = ["image_path"] def is_os_supported(self, os_name=None): - """! In this implementation this plugin only is supporeted under Windows machines""" + """Plugin only works on Windows. + + Args: + os_name: Name of the current OS. + """ # If no OS name provided use host OS name if not os_name: os_name = self.host_os_support() @@ -27,20 +32,25 @@ def is_os_supported(self, os_name=None): return False def setup(self, *args, **kwargs): - """! Configure plugin, this function should be called before plugin execute() method is used.""" + """Configure plugin. + + This function should be called before plugin execute() method is used. + """ self.FLASH_ERASE = "FlashErase.exe" return True def execute(self, capability, *args, **kwargs): - """! Executes capability by name + """Copy an image to a ublox device using FlashErase.exe. - @param capability Capability name - @param args Additional arguments - @param kwargs Additional arguments + The "capability" name must be "ublox" or this method will just fail. - @details Each capability e.g. may directly just call some command line program or execute building pythonic function + Args: + capability: Capability name. + args: Additional arguments. + kwargs: Additional arguments. - @return Capability call return value + Returns: + True if the copy succeeded, otherwise False. """ result = False if self.check_parameters(capability, *args, **kwargs) is True: @@ -64,5 +74,5 @@ def execute(self, capability, *args, **kwargs): def load_plugin(): - """Returns plugin available in this module""" + """Return plugin available in this module.""" return HostTestPluginCopyMethod_ublox() diff --git a/src/htrun/host_tests_plugins/module_power_cycle_target.py b/src/htrun/host_tests_plugins/module_power_cycle_target.py index 0028a68e..8b22bc05 100644 --- a/src/htrun/host_tests_plugins/module_power_cycle_target.py +++ b/src/htrun/host_tests_plugins/module_power_cycle_target.py @@ -2,6 +2,7 @@ # Copyright (c) 2021 Arm Limited and Contributors. All rights reserved. # SPDX-License-Identifier: Apache-2.0 # +"""Power cycle devices using the 'Mbed TAS RM REST API'.""" import os import json @@ -11,8 +12,8 @@ class HostTestPluginPowerCycleResetMethod(HostTestPluginBase): + """Plugin interface adaptor for Mbed TAS RM REST API.""" - # Plugin interface name = "HostTestPluginPowerCycleResetMethod" type = "ResetMethod" stable = True @@ -20,21 +21,28 @@ class HostTestPluginPowerCycleResetMethod(HostTestPluginBase): required_parameters = ["target_id", "device_info"] def __init__(self): - """ctor""" + """Initialise plugin.""" HostTestPluginBase.__init__(self) def setup(self, *args, **kwargs): - """! Configure plugin, this function should be called before plugin execute() method is used.""" + """Configure plugin. + + This function should be called before plugin execute() method is used. + """ return True def execute(self, capability, *args, **kwargs): - """! Executes capability by name + """Power cycle a device using the TAS RM API. + + If the "capability" name is not "power_cycle" this method will just fail. + + Args: + capability: Capability name. + args: Additional arguments. + kwargs: Additional arguments. - @param capability Capability name - @param args Additional arguments - @param kwargs Additional arguments - @details Each capability e.g. may directly just call some command line program or execute building pythonic function - @return Capability call return value + Returns: + True if the power cycle succeeded, otherwise False. """ if "target_id" not in kwargs or not kwargs["target_id"]: self.print_plugin_error("Error: This plugin requires unique target_id") @@ -42,7 +50,8 @@ def execute(self, capability, *args, **kwargs): if "device_info" not in kwargs or type(kwargs["device_info"]) is not dict: self.print_plugin_error( - "Error: This plugin requires dict parameter 'device_info' passed by the caller." + "Error: This plugin requires dict parameter 'device_info' passed by " + "the caller." ) return False @@ -58,10 +67,7 @@ def execute(self, capability, *args, **kwargs): return result def __get_mbed_tas_rm_addr(self): - """ - Get IP and Port of mbed tas rm service. - :return: - """ + """Get IP and Port of mbed tas rm service.""" try: ip = os.environ["MBED_TAS_RM_IP"] port = os.environ["MBED_TAS_RM_PORT"] @@ -76,16 +82,7 @@ def __get_mbed_tas_rm_addr(self): return None def __hw_reset(self, ip, port, target_id, device_info): - """ - Reset target device using TAS RM API - - :param ip: - :param port: - :param target_id: - :param device_info: - :return: - """ - + """Reset target device using TAS RM API.""" switch_off_req = { "name": "switchResource", "sub_requests": [ @@ -170,11 +167,6 @@ def poll_state(required_state): @staticmethod def __run_request(ip, port, request): - """ - - :param request: - :return: - """ headers = {"Content-type": "application/json", "Accept": "text/plain"} get_resp = requests.get( "http://%s:%s/" % (ip, port), data=json.dumps(request), headers=headers @@ -187,5 +179,5 @@ def __run_request(ip, port, request): def load_plugin(): - """! Returns plugin available in this module""" + """Return plugin available in this module.""" return HostTestPluginPowerCycleResetMethod() diff --git a/src/htrun/host_tests_plugins/module_reset_jn51xx.py b/src/htrun/host_tests_plugins/module_reset_jn51xx.py index 866ff02c..7a39db05 100644 --- a/src/htrun/host_tests_plugins/module_reset_jn51xx.py +++ b/src/htrun/host_tests_plugins/module_reset_jn51xx.py @@ -2,13 +2,14 @@ # Copyright (c) 2021 Arm Limited and Contributors. All rights reserved. # SPDX-License-Identifier: Apache-2.0 # +"""Reset method using the JN51xxProgrammer.exe.""" from .host_test_plugins import HostTestPluginBase class HostTestPluginResetMethod_JN51xx(HostTestPluginBase): + """Plugin interface adaptor for JN51xxProgrammer.exe.""" - # Plugin interface name = "HostTestPluginResetMethod_JN51xx" type = "ResetMethod" capabilities = ["jn51xx"] @@ -16,11 +17,11 @@ class HostTestPluginResetMethod_JN51xx(HostTestPluginBase): stable = False def __init__(self): - """ctor""" + """Initialise the plugin.""" HostTestPluginBase.__init__(self) def is_os_supported(self, os_name=None): - """! In this implementation this plugin only is supporeted under Windows machines""" + """Plugin is only supported on Windows.""" # If no OS name provided use host OS name if not os_name: os_name = self.host_os_support() @@ -31,19 +32,26 @@ def is_os_supported(self, os_name=None): return False def setup(self, *args, **kwargs): - """! Configure plugin, this function should be called before plugin execute() method is used.""" + """Configure plugin. + + This function should be called before plugin execute() method is used. + """ # Note you need to have eACommander.exe on your system path! self.JN51XX_PROGRAMMER = "JN51xxProgrammer.exe" return True def execute(self, capability, *args, **kwargs): - """! Executes capability by name + """Reset a device using the JN51xxProgrammer.exe. + + If the "capability" name is not 'jn51xx' this method will just fail. + + Args: + capability: Capability name. + args: Additional arguments. + kwargs: Additional arguments. - @param capability Capability name - @param args Additional arguments - @param kwargs Additional arguments - @details Each capability e.g. may directly just call some command line program or execute building pythonic function - @return Capability call return value + Returns: + True if the reset succeeded, otherwise False. """ if not kwargs["serial"]: self.print_plugin_error("Error: serial port not set (not opened?)") @@ -53,12 +61,11 @@ def execute(self, capability, *args, **kwargs): if self.check_parameters(capability, *args, **kwargs): if kwargs["serial"]: if capability == "jn51xx": - # Example: - # The device should be automatically reset before the programmer disconnects. - # Issuing a command with no file to program or read will put the device into - # programming mode and then reset it. E.g. - # $ JN51xxProgrammer.exe -s COM5 -V0 - # COM5: Detected JN5179 with MAC address 00:15:8D:00:01:24:E0:37 + # Example: The device should be automatically reset before the + # programmer disconnects. Issuing a command with no file to program + # or read will put the device into programming mode and then reset + # it. E.g. $ JN51xxProgrammer.exe -s COM5 -V0 COM5: Detected JN5179 + # with MAC address 00:15:8D:00:01:24:E0:37 serial_port = kwargs["serial"] cmd = [self.JN51XX_PROGRAMMER, "-s", serial_port, "-V0"] result = self.run_command(cmd) @@ -66,5 +73,5 @@ def execute(self, capability, *args, **kwargs): def load_plugin(): - """Returns plugin available in this module""" + """Return plugin available in this module.""" return HostTestPluginResetMethod_JN51xx() diff --git a/src/htrun/host_tests_plugins/module_reset_mps2.py b/src/htrun/host_tests_plugins/module_reset_mps2.py index e1a2717b..7b8d7fdb 100644 --- a/src/htrun/host_tests_plugins/module_reset_mps2.py +++ b/src/htrun/host_tests_plugins/module_reset_mps2.py @@ -2,6 +2,7 @@ # Copyright (c) 2021 Arm Limited and Contributors. All rights reserved. # SPDX-License-Identifier: Apache-2.0 # +"""Implements reset method for the ARM_MPS2 platform.""" import os import time @@ -12,10 +13,9 @@ class HostTestPluginResetMethod_MPS2(HostTestPluginBase): - """! Plugin used to reset ARM_MPS2 platform + """Plugin used to reset ARM_MPS2 platform. - @details Supports: - reboot.txt - startup from standby state, reboots when in run mode. + Supports reboot.txt startup from standby state, reboots when in run mode. """ # Plugin interface @@ -25,31 +25,34 @@ class HostTestPluginResetMethod_MPS2(HostTestPluginBase): required_parameters = ["disk"] def __init__(self): - """ctor""" + """Initialise the plugin.""" HostTestPluginBase.__init__(self) def touch_file(self, path): - """Touch file and set timestamp to items""" + """Touch file and set timestamp to items.""" with open(path, "a"): os.utime(path, None) def setup(self, *args, **kwargs): """Prepare / configure plugin to work. + This method can receive plugin specific parameters by kwargs and ignore other parameters which may affect other plugins. """ return True def execute(self, capability, *args, **kwargs): - """! Executes capability by name + """Reboot a device. - @param capability Capability name - @param args Additional arguments - @param kwargs Additional arguments + The "capability" name must be 'reboot.txt' or this method will just fail. - @details Each capability e.g. may directly just call some command line program or execute building pythonic function + Args: + capability: Capability name. + args: Additional arguments. + kwargs: Additional arguments. - @return Capability call return value + Returns: + True if the reset succeeded, otherwise False. """ result = False if not kwargs["disk"]: @@ -78,5 +81,5 @@ def execute(self, capability, *args, **kwargs): def load_plugin(): - """Returns plugin available in this module""" + """Return plugin available in this module.""" return HostTestPluginResetMethod_MPS2() diff --git a/src/htrun/host_tests_plugins/module_reset_pyocd.py b/src/htrun/host_tests_plugins/module_reset_pyocd.py index c90c6811..5023af03 100644 --- a/src/htrun/host_tests_plugins/module_reset_pyocd.py +++ b/src/htrun/host_tests_plugins/module_reset_pyocd.py @@ -2,6 +2,7 @@ # Copyright (c) 2021 Arm Limited and Contributors. All rights reserved. # SPDX-License-Identifier: Apache-2.0 # +"""Use PyOCD to reset a target.""" from .host_test_plugins import HostTestPluginBase @@ -14,8 +15,8 @@ class HostTestPluginResetMethod_pyOCD(HostTestPluginBase): + """Plugin interface.""" - # Plugin interface name = "HostTestPluginResetMethod_pyOCD" type = "ResetMethod" stable = True @@ -23,25 +24,26 @@ class HostTestPluginResetMethod_pyOCD(HostTestPluginBase): required_parameters = ["target_id"] def __init__(self): - """! ctor - @details We can check module version by referring to version attribute - import pkg_resources - print pkg_resources.require("htrun")[0].version - '2.7' - """ + """Initialise plugin.""" HostTestPluginBase.__init__(self) def setup(self, *args, **kwargs): - """! Configure plugin, this function should be called before plugin execute() method is used.""" + """Configure plugin. + + This is a no-op for this plugin. + """ return True def execute(self, capability, *args, **kwargs): - """! Executes capability by name - @param capability Capability name - @param args Additional arguments - @param kwargs Additional arguments - @details Each capability e.g. may directly just call some command line program or execute building pythonic function - @return Capability call return value + """Reset a target using pyOCD. + + The "capability" name must be "pyocd". If it isn't this method will just fail. + + Args: + capability: Capability name. + + Returns: + True if the reset was successful, otherwise False. """ if not PYOCD_PRESENT: self.print_plugin_error( @@ -69,5 +71,5 @@ def execute(self, capability, *args, **kwargs): def load_plugin(): - """! Returns plugin available in this module""" + """Return plugin available in this module.""" return HostTestPluginResetMethod_pyOCD() diff --git a/src/htrun/host_tests_plugins/module_reset_silabs.py b/src/htrun/host_tests_plugins/module_reset_silabs.py index 0d1c6205..50ca0460 100644 --- a/src/htrun/host_tests_plugins/module_reset_silabs.py +++ b/src/htrun/host_tests_plugins/module_reset_silabs.py @@ -2,11 +2,13 @@ # Copyright (c) 2021 Arm Limited and Contributors. All rights reserved. # SPDX-License-Identifier: Apache-2.0 # +"""Implements a reset method using the eACommander tool.""" from .host_test_plugins import HostTestPluginBase class HostTestPluginResetMethod_SiLabs(HostTestPluginBase): + """Plugin interface adaptor for the eACommander tool.""" # Plugin interface name = "HostTestPluginResetMethod_SiLabs" @@ -16,33 +18,41 @@ class HostTestPluginResetMethod_SiLabs(HostTestPluginBase): stable = True def __init__(self): - """ctor""" + """Initialise the plugin.""" HostTestPluginBase.__init__(self) def setup(self, *args, **kwargs): - """Configure plugin, this function should be called before plugin execute() method is used.""" + """Configure plugin. + + This function should be called before plugin execute() method is used. + """ # Note you need to have eACommander.exe on your system path! self.EACOMMANDER_CMD = "eACommander.exe" return True def execute(self, capability, *args, **kwargs): - """! Executes capability by name + """Reset a device using eACommander.exe. - @param capability Capability name - @param args Additional arguments - @param kwargs Additional arguments + "capability" is used to select the reset method used by eACommander, + either serial or USB. - @details Each capability e.g. may directly just call some command line program or execute building pythonic function + Args: + capability: Capability name. + args: Additional arguments. + kwargs: Additional arguments. - @return Capability call return value + Returns: + True if the reset succeeded, otherwise False. """ result = False if self.check_parameters(capability, *args, **kwargs) is True: disk = kwargs["disk"].rstrip("/\\") if capability == "eACommander": - # For this copy method 'disk' will be 'serialno' for eACommander command line parameters - # Note: Commands are executed in the order they are specified on the command line + # For this copy method 'disk' will be 'serialno' for eACommander command + # line parameters. + # Note: Commands are executed in the order they are specified on the + # command line cmd = [ self.EACOMMANDER_CMD, "--serialno", @@ -53,8 +63,10 @@ def execute(self, capability, *args, **kwargs): ] result = self.run_command(cmd) elif capability == "eACommander-usb": - # For this copy method 'disk' will be 'usb address' for eACommander command line parameters - # Note: Commands are executed in the order they are specified on the command line + # For this copy method 'disk' will be 'usb address' for eACommander + # command line parameters + # Note: Commands are executed in the order they are specified on the + # command line cmd = [ self.EACOMMANDER_CMD, "--usb", @@ -68,5 +80,5 @@ def execute(self, capability, *args, **kwargs): def load_plugin(): - """Returns plugin available in this module""" + """Return plugin available in this module.""" return HostTestPluginResetMethod_SiLabs() diff --git a/src/htrun/host_tests_plugins/module_reset_stlink.py b/src/htrun/host_tests_plugins/module_reset_stlink.py index 0174f68a..0c14816f 100644 --- a/src/htrun/host_tests_plugins/module_reset_stlink.py +++ b/src/htrun/host_tests_plugins/module_reset_stlink.py @@ -2,6 +2,7 @@ # Copyright (c) 2021 Arm Limited and Contributors. All rights reserved. # SPDX-License-Identifier: Apache-2.0 # +"""Implements STLINK_CLI copy method.""" import os import sys @@ -12,6 +13,7 @@ class HostTestPluginResetMethod_Stlink(HostTestPluginBase): + """Plugin interface adaptor for STLINK_CLI.""" # Plugin interface name = "HostTestPluginResetMethod_Stlink" @@ -21,11 +23,11 @@ class HostTestPluginResetMethod_Stlink(HostTestPluginBase): stable = False def __init__(self): - """ctor""" + """Initialise plugin.""" HostTestPluginBase.__init__(self) def is_os_supported(self, os_name=None): - """! In this implementation this plugin only is supporeted under Windows machines""" + """Plugin only supported on Windows.""" # If no OS name provided use host OS name if not os_name: os_name = self.host_os_support() @@ -36,16 +38,26 @@ def is_os_supported(self, os_name=None): return False def setup(self, *args, **kwargs): - """! Configure plugin, this function should be called before plugin execute() method is used.""" + """Configure plugin. + + This function should be called before plugin execute() method is used. + """ # Note you need to have eACommander.exe on your system path! self.ST_LINK_CLI = "ST-LINK_CLI.exe" return True def create_stlink_fix_file(self, file_path): - """! Creates a file with a line separator - This is to work around a bug in ST-LINK CLI that does not let the target run after burning it. + """Create a file with a line separator. + + This is to work around a bug in ST-LINK CLI that does not let the target run + after burning it. See https://github.com/ARMmbed/mbed-os-tools/issues/147 for the details. - @param file_path A path to write into this file + + Note: + This method will exit the python interpreter if it encounters an OSError. + + Args: + file_path: A path to write into this file. """ try: with open(file_path, "w") as fix_file: @@ -55,15 +67,19 @@ def create_stlink_fix_file(self, file_path): sys.exit(1) def execute(self, capability, *args, **kwargs): - """! Executes capability by name + """Reset a device using the ST-LINK-CLI. + + Note: + This method will exit the python interpreter if it encounters an OSError. - @param capability Capability name - @param args Additional arguments - @param kwargs Additional arguments + Args: + capability: Capability name. + args: Additional arguments. + kwargs: Additional arguments. - @details Each capability e.g. may directly just call some command line program or execute building pythonic function - @return Capability call return value + Returns: + True if the reset succeeded, otherwise False. """ result = False if self.check_parameters(capability, *args, **kwargs) is True: @@ -73,8 +89,9 @@ def execute(self, capability, *args, **kwargs): cmd = [self.ST_LINK_CLI, "-Rst", "-Run"] # Due to the ST-LINK bug, we must press enter after burning the target - # We do this here automatically by passing a file which contains an `ENTER` (line separator) - # to the ST-LINK CLI as `stdin` for the running process + # We do this here automatically by passing a file which contains an + # `ENTER` (line separator) to the ST-LINK CLI as `stdin` for the running + # process enter_file_path = os.path.join(tempfile.gettempdir(), FIX_FILE_NAME) self.create_stlink_fix_file(enter_file_path) try: @@ -89,5 +106,5 @@ def execute(self, capability, *args, **kwargs): def load_plugin(): - """Returns plugin available in this module""" + """Return plugin available in this module.""" return HostTestPluginResetMethod_Stlink() diff --git a/src/htrun/host_tests_plugins/module_reset_target.py b/src/htrun/host_tests_plugins/module_reset_target.py index 49ff7f08..b1b0f78f 100644 --- a/src/htrun/host_tests_plugins/module_reset_target.py +++ b/src/htrun/host_tests_plugins/module_reset_target.py @@ -2,6 +2,7 @@ # Copyright (c) 2021 Arm Limited and Contributors. All rights reserved. # SPDX-License-Identifier: Apache-2.0 # +"""Serial reset plugin.""" import re import pkg_resources @@ -9,6 +10,7 @@ class HostTestPluginResetMethod_Target(HostTestPluginBase): + """Plugin interface adapter for serial reset.""" # Plugin interface name = "HostTestPluginResetMethod_Target" @@ -18,12 +20,7 @@ class HostTestPluginResetMethod_Target(HostTestPluginBase): required_parameters = ["serial"] def __init__(self): - """! ctor - @details We can check module version by referring to version attribute - import pkg_resources - print pkg_resources.require("htrun")[0].version - '2.7' - """ + """Initialise plugin.""" HostTestPluginBase.__init__(self) self.re_float = re.compile("^\d+\.\d+") pyserial_version = pkg_resources.require("pyserial")[0].version @@ -31,8 +28,10 @@ def __init__(self): self.is_pyserial_v3 = float(self.pyserial_version) >= 3.0 def get_pyserial_version(self, pyserial_version): - """! Retrieve pyserial module version - @return Returns float with pyserial module number + """Retrieve pyserial module version. + + Returns: + Float with pyserial module number. """ version = 3.0 m = self.re_float.search(pyserial_version) @@ -44,14 +43,14 @@ def get_pyserial_version(self, pyserial_version): return version def safe_sendBreak(self, serial): - """! Closure for pyserial version dependant API calls""" + """Closure for pyserial version dependent API calls.""" if self.is_pyserial_v3: return self._safe_sendBreak_v3_0(serial) return self._safe_sendBreak_v2_7(serial) def _safe_sendBreak_v2_7(self, serial): - """! pyserial 2.7 API implementation of sendBreak/setBreak - @details + """Pyserial 2.7 API implementation of sendBreak/setBreak. + Below API is deprecated for pyserial 3.x versions! http://pyserial.readthedocs.org/en/latest/pyserial_api.html#serial.Serial.sendBreak http://pyserial.readthedocs.org/en/latest/pyserial_api.html#serial.Serial.setBreak @@ -61,7 +60,8 @@ def _safe_sendBreak_v2_7(self, serial): serial.sendBreak() except: # In Linux a termios.error is raised in sendBreak and in setBreak. - # The following setBreak() is needed to release the reset signal on the target mcu. + # The following setBreak() is needed to release the reset signal on the + # target mcu. try: serial.setBreak(False) except: @@ -69,8 +69,8 @@ def _safe_sendBreak_v2_7(self, serial): return result def _safe_sendBreak_v3_0(self, serial): - """! pyserial 3.x API implementation of send_brea / break_condition - @details + """Pyserial 3.x API implementation of send_break / break_condition. + http://pyserial.readthedocs.org/en/latest/pyserial_api.html#serial.Serial.send_break http://pyserial.readthedocs.org/en/latest/pyserial_api.html#serial.Serial.break_condition """ @@ -79,7 +79,8 @@ def _safe_sendBreak_v3_0(self, serial): serial.send_break() except: # In Linux a termios.error is raised in sendBreak and in setBreak. - # The following break_condition = False is needed to release the reset signal on the target mcu. + # The following break_condition = False is needed to release the reset + # signal on the target mcu. try: serial.break_condition = False except Exception as e: @@ -90,17 +91,22 @@ def _safe_sendBreak_v3_0(self, serial): return result def setup(self, *args, **kwargs): - """! Configure plugin, this function should be called before plugin execute() method is used.""" + """Configure plugin. + + This function should be called before plugin execute() method is used. + """ return True def execute(self, capability, *args, **kwargs): - """! Executes capability by name + """Reset a device using serial break. + + Args: + capability: Capability name. + args: Additional arguments. + kwargs: Additional arguments. - @param capability Capability name - @param args Additional arguments - @param kwargs Additional arguments - @details Each capability e.g. may directly just call some command line program or execute building pythonic function - @return Capability call return value + Returns: + True if the reset succeeded, otherwise False. """ if not kwargs["serial"]: self.print_plugin_error("Error: serial port not set (not opened?)") @@ -116,5 +122,5 @@ def execute(self, capability, *args, **kwargs): def load_plugin(): - """! Returns plugin available in this module""" + """Return plugin available in this module.""" return HostTestPluginResetMethod_Target() diff --git a/src/htrun/host_tests_plugins/module_reset_ublox.py b/src/htrun/host_tests_plugins/module_reset_ublox.py index 56ea24e6..add7fbb7 100644 --- a/src/htrun/host_tests_plugins/module_reset_ublox.py +++ b/src/htrun/host_tests_plugins/module_reset_ublox.py @@ -2,13 +2,14 @@ # Copyright (c) 2021 Arm Limited and Contributors. All rights reserved. # SPDX-License-Identifier: Apache-2.0 # +"""Reset ublox devices using the jlink.exe tool.""" from .host_test_plugins import HostTestPluginBase class HostTestPluginResetMethod_ublox(HostTestPluginBase): + """Plugin interface adapter for jlink.exe.""" - # Plugin interface name = "HostTestPluginResetMethod_ublox" type = "ResetMethod" capabilities = ["ublox"] @@ -16,7 +17,7 @@ class HostTestPluginResetMethod_ublox(HostTestPluginBase): stable = False def is_os_supported(self, os_name=None): - """! In this implementation this plugin only is supporeted under Windows machines""" + """Plugin is only supported on Windows.""" # If no OS name provided use host OS name if not os_name: os_name = self.host_os_support() @@ -27,21 +28,27 @@ def is_os_supported(self, os_name=None): return False def setup(self, *args, **kwargs): - """! Configure plugin, this function should be called before plugin execute() method is used.""" - # Note you need to have jlink.exe on your system path! + """Configure plugin. + + This function should be called before plugin execute() method is used. + + Note: you need to have jlink.exe on your system path! + """ self.JLINK = "jlink.exe" return True def execute(self, capability, *args, **kwargs): - """! Executes capability by name + """Reset a ublox device using jlink.exe. - @param capability Capability name - @param args Additional arguments - @param kwargs Additional arguments + The "capability" name must be "ublox" or this method will just fail. - @details Each capability e.g. may directly just call some command line program or execute building pythonic function + Args: + capability: Capability name. + args: Additional arguments. + kwargs: Additional arguments. - @return Capability call return value + Returns: + True if the reset was successful, otherwise False. """ result = False if self.check_parameters(capability, *args, **kwargs) is True: @@ -54,5 +61,5 @@ def execute(self, capability, *args, **kwargs): def load_plugin(): - """Returns plugin available in this module""" + """Return plugin available in this module.""" return HostTestPluginResetMethod_ublox() diff --git a/src/htrun/host_tests_registry/__init__.py b/src/htrun/host_tests_registry/__init__.py index 06b6d76e..4abdd7d7 100644 --- a/src/htrun/host_tests_registry/__init__.py +++ b/src/htrun/host_tests_registry/__init__.py @@ -3,10 +3,10 @@ # SPDX-License-Identifier: Apache-2.0 # -"""! @package host_registry - -Host registry is used to store all host tests (by id) which can be called from test framework +"""host_registry package. +Host registry is used to store all host tests (by id) which can be called from the test +framework. """ from .host_registry import HostRegistry diff --git a/src/htrun/host_tests_registry/host_registry.py b/src/htrun/host_tests_registry/host_registry.py index b43d156b..cc822103 100644 --- a/src/htrun/host_tests_registry/host_registry.py +++ b/src/htrun/host_tests_registry/host_registry.py @@ -2,6 +2,7 @@ # Copyright (c) 2021 Arm Limited and Contributors. All rights reserved. # SPDX-License-Identifier: Apache-2.0 # +"""Registry of available host tests.""" try: from imp import load_source @@ -10,6 +11,12 @@ import sys def load_source(module_name, file_path): + """Dynamically import a plugin module. + + Args: + module_name: Name of the module to load. + file_path: Path to the module. + """ spec = importlib.util.spec_from_file_location(module_name, file_path) module = importlib.util.module_from_spec(spec) spec.loader.exec_module(module) @@ -25,48 +32,55 @@ def load_source(module_name, file_path): class HostRegistry: - """Class stores registry with host tests and objects representing them""" + """Class stores registry with host tests and objects representing them.""" HOST_TESTS = {} # Map between host_test_name -> host_test_object def register_host_test(self, ht_name, ht_object): - """! Registers host test object by name + """Register host test object by name. - @param ht_name Host test unique name - @param ht_object Host test class object + Args: + ht_name: Host test unique name. + ht_object: Host test class object. """ if ht_name not in self.HOST_TESTS: self.HOST_TESTS[ht_name] = ht_object def unregister_host_test(self, ht_name): - """! Unregisters host test object by name + """Unregister host test object by name. - @param ht_name Host test unique name + Args: + ht_name: Host test unique name. """ if ht_name in self.HOST_TESTS: del self.HOST_TESTS[ht_name] def get_host_test(self, ht_name): - """! Fetches host test object by name + """Fetch host test object by name. - @param ht_name Host test unique name + Args: + ht_name: Host test unique name. - @return Host test callable object or None if object is not found + Returns: + Host test callable object or None if object is not found. """ return self.HOST_TESTS[ht_name] if ht_name in self.HOST_TESTS else None def is_host_test(self, ht_name): - """! Checks (by name) if host test object is registered already + """Check (by name) if host test object is registered already. - @param ht_name Host test unique name + Args: + ht_name: Host test unique name. - @return True if ht_name is registered (available), else False + Returns: + True if ht_name is registered (available), else False. """ return ht_name in self.HOST_TESTS and self.HOST_TESTS[ht_name] is not None def table(self, verbose=False): - """! Prints list of registered host test classes (by name) - @Detail For devel & debug purposes + """Print list of registered host test classes (by name). + + For dev & debug purposes. """ from prettytable import PrettyTable, HEADER @@ -85,8 +99,13 @@ def table(self, verbose=False): return pt.get_string() def register_from_path(self, path, verbose=False): - """Enumerates and registers locally stored host tests + """Enumerate and register locally stored host tests. + Host test are derived from htrun.BaseHostTest classes + + Args: + path: Path to the host tests directory. + verbose: Enable verbose logging. """ if path: path = path.strip('"') @@ -107,7 +126,7 @@ def _add_module_to_registry(self, path, module_file, verbose): mod = load_source(module_name, abspath(join(path, module_file))) except Exception as e: print( - "HOST: Error! While loading local host test module '%s'" + "HOST: Error while loading local host test module '%s'" % join(path, module_file) ) print("HOST: %s" % str(e)) diff --git a/src/htrun/host_tests_runner/__init__.py b/src/htrun/host_tests_runner/__init__.py index f78fda46..01f8f9fb 100644 --- a/src/htrun/host_tests_runner/__init__.py +++ b/src/htrun/host_tests_runner/__init__.py @@ -2,12 +2,11 @@ # Copyright (c) 2021 Arm Limited and Contributors. All rights reserved. # SPDX-License-Identifier: Apache-2.0 # +"""greentea-host-test-runner. -"""! @package greentea-host-test-runner - -This package contains basic host test implementation with algorithms to flash and reset device. -Functionality can be overridden by set of plugins which can provide specialised flashing and reset implementations. - +This package contains basic host test implementation with algorithms to flash +and reset devices. Functionality can be overridden by set of plugins which can +provide specialised flashing and reset implementations. """ from pkg_resources import get_distribution diff --git a/src/htrun/host_tests_runner/host_test.py b/src/htrun/host_tests_runner/host_test.py index 8f5e9025..795e2157 100644 --- a/src/htrun/host_tests_runner/host_test.py +++ b/src/htrun/host_tests_runner/host_test.py @@ -2,6 +2,7 @@ # Copyright (c) 2021 Arm Limited and Contributors. All rights reserved. # SPDX-License-Identifier: Apache-2.0 # +"""Classes performing test selection, test execution and reporting of test results.""" from sys import stdout from .target_base import TargetBase @@ -9,12 +10,18 @@ class HostTestResults(object): - """! Test results set by host tests """ + """Test results set by host tests.""" def enum(self, **enums): + """Return a new base type. + + Args: + enums: Dictionary of namespaces for the type. + """ return type("Enum", (), enums) def __init__(self): + """Initialise the test result type.""" self.TestResults = self.enum( RESULT_SUCCESS="success", RESULT_FAILURE="failure", @@ -60,63 +67,83 @@ def __init__(self): ] def get_test_result_int(self, test_result_str): - """! Maps test result string to unique integer """ + """Map test result string to unique integer. + + Args: + test_result_str: Test results as a string. + """ if test_result_str in self.TestResultsList: return self.TestResultsList.index(test_result_str) return -1 def __getitem__(self, test_result_str): - """! Returns numerical result code """ + """Return integer test result code. + + Args: + test_result_str: Test results as a string. + """ return self.get_test_result_int(test_result_str) class Test(HostTestResults): - """Base class for host test's test runner""" + """Base class for host test's test runner.""" def __init__(self, options): - """ctor""" + """Initialise the test runner. + + Args: + options: Options instance describing the target. + """ HostTestResults.__init__(self) self.target = TargetBase(options) def run(self): - """Test runner for host test. This function will start executing - test and forward test result via serial port to test suite - """ + """Run a host test.""" pass def setup(self): - """! Setup and check if configuration for test is correct. - @details This function can for example check if serial port is already opened - """ + """Set up and check if configuration for test is correct.""" pass def notify(self, msg): - """! On screen notification function - @param msg Text message sent to stdout directly + """Write a message to stdout. + + Flush immediately so the buffered data is immediately written to stdout. + + Args: + msg: Text to write to stdout. """ stdout.write(msg) stdout.flush() def print_result(self, result): - """! Test result unified printing function - @param result Should be a member of HostTestResults.RESULT_* enums + """Print test results in "KV" format packets. + + Args: + result: A member of HostTestResults.RESULT_* enums. """ self.notify("{{%s}}\n" % result) self.notify("{{%s}}\n" % self.RESULT_END) def finish(self): - """dctor for this class, finishes tasks and closes resources""" + """Finishes tasks and closes resources.""" pass def get_hello_string(self): - """Hello string used as first print""" + """Hello string used as first print.""" return "host test executor ver. " + __version__ class DefaultTestSelectorBase(Test): - """! Test class with serial port initialization - @details This is a base for other test selectors, initializes + """Test class with serial port initialization. + + This is a base for other test selectors. """ def __init__(self, options): + """Initialise test selector. + + Args: + options: Options instance describing the target. + """ Test.__init__(self, options=options) diff --git a/src/htrun/host_tests_runner/host_test_default.py b/src/htrun/host_tests_runner/host_test_default.py index bdfaca96..74f782ff 100644 --- a/src/htrun/host_tests_runner/host_test_default.py +++ b/src/htrun/host_tests_runner/host_test_default.py @@ -2,7 +2,7 @@ # Copyright (c) 2021 Arm Limited and Contributors. All rights reserved. # SPDX-License-Identifier: Apache-2.0 # - +"""Default host test.""" import re import sys @@ -35,13 +35,13 @@ class DefaultTestSelector(DefaultTestSelectorBase): - """! Select default host_test supervision (replaced after auto detection) """ + """Select default host_test supervision (replaced after auto detection).""" RESET_TYPE_SW_RST = "software_reset" RESET_TYPE_HW_RST = "hardware_reset" def __init__(self, options): - """! ctor""" + """Initialise plugin.""" self.options = options self.logger = HtrunLogger("HTST") @@ -71,7 +71,8 @@ def __init__(self, options): sys.exit(0) if options.global_resource_mgr or options.fast_model_connection: - # If Global/Simulator Resource Mgr is working it will handle reset/flashing workflow + # If Global/Simulator Resource Mgr is working it will handle + # reset/flashing workflow # So local plugins are offline self.options.skip_reset = True self.options.skip_flashing = True @@ -87,13 +88,17 @@ def __init__(self, options): DefaultTestSelectorBase.__init__(self, options) def is_host_test_obj_compatible(self, obj_instance): - """! Check if host test object loaded is actually host test class - derived from 'htrun.BaseHostTest()' - Additionaly if host test class implements custom ctor it should - call BaseHostTest().__Init__() - @param obj_instance Instance of host test derived class - @return True if obj_instance is derived from htrun.BaseHostTest() - and BaseHostTest.__init__() was called, else return False + """Check if host test object is derived from 'htrun.BaseHostTest()'. + + Additionaly if host test class implements custom ctor it should + call BaseHostTest().__Init__() + + Args: + obj_instance: Instance of host test derived class. + + Returns: + True if obj_instance is derived from htrun.BaseHostTest() and + BaseHostTest.__init__() was called, else False. """ result = False if obj_instance: @@ -102,14 +107,15 @@ def is_host_test_obj_compatible(self, obj_instance): # Check if host test (obj_instance) is derived from htrun.BaseHostTest() if not isinstance(obj_instance, BaseHostTest): - # In theory we should always get host test objects inheriting from BaseHostTest() - # because loader will only load those. + # In theory we should always get host test objects inheriting from + # BaseHostTest() because loader will only load those. self.logger.prn_err( "host test must inherit from htrun.BaseHostTest() class" ) result = False - # Check if BaseHostTest.__init__() was called when custom host test is created + # Check if BaseHostTest.__init__() was called when custom host test is + # created if not obj_instance.base_host_test_inited(): self.logger.prn_err( "custom host test __init__() must call BaseHostTest.__init__(self)" @@ -119,9 +125,12 @@ def is_host_test_obj_compatible(self, obj_instance): return result def run_test(self): - """! This function implements key-value protocol state-machine. - Handling of all events and connector are handled here. - @return Return self.TestResults.RESULT_* enum + """Implement key-value protocol state-machine. + + Handling of all events and connector are handled here. + + Returns: + self.TestResults.RESULT_* enum. """ result = None timeout_duration = 10 # Default test case timeout @@ -130,7 +139,10 @@ def run_test(self): dut_event_queue = Queue() # Events from host to DUT {k;v} def callback__notify_prn(key, value, timestamp): - """! Handles __norify_prn. Prints all lines in separate log line """ + """Handle __notify_prn. + + Prints all lines in a separate log line. + """ for line in value.splitlines(): self.logger.prn_inf(line) @@ -192,14 +204,19 @@ def start_conn_process(): return p def process_code_coverage(key, value, timestamp): - """! Process the found coverage key value and perform an idle - loop checking for more timeing out if there is no response from - the target within the idle timeout. - @param key The key from the first coverage event - @param value The value from the first coverage event - @param timestamp The timestamp from the first coverage event - @return The elapsed time taken by the processing of code coverage, - and the (key, value, and timestamp) of the next event + """Process the found coverage key value. + + Perform an idle loop checking for more timeing out if there is no response + from the target within the idle timeout. + + Args: + key: The key from the first coverage event. + value: The value from the first coverage event. + timestamp: The timestamp from the first coverage event. + + Returns: + The elapsed time taken by the processing of code coverage, and + the (key, value, and timestamp) of the next event. """ original_start_time = time() start_time = time() @@ -242,8 +259,8 @@ def process_code_coverage(key, value, timestamp): conn_process_started = True else: self.logger.prn_err( - "First expected event was '__conn_process_start', received '%s' instead" - % key + "First expected event was '__conn_process_start', received '%s' " + "instead" % key ) except QueueEmpty: @@ -299,8 +316,8 @@ def process_code_coverage(key, value, timestamp): # Check if host test object loaded is actually host test class # derived from 'htrun.BaseHostTest()' - # Additionaly if host test class implements custom ctor it should - # call BaseHostTest().__Init__() + # Additionaly if host test class implements custom ctor it + # should call BaseHostTest().__Init__() if self.test_supervisor and self.is_host_test_obj_compatible( self.test_supervisor ): @@ -309,10 +326,12 @@ def process_code_coverage(key, value, timestamp): event_queue, dut_event_queue, config ) try: - # After setup() user should already register all callbacks + # After setup() user should already register all + # callbacks self.test_supervisor.setup() except (TypeError, ValueError): - # setup() can throw in normal circumstances TypeError and ValueError + # setup() can throw in normal circumstances TypeError + # and ValueError self.logger.prn_err("host test setup() failed, reason:") self.logger.prn_inf("==== Traceback start ====") for line in traceback.format_exc().splitlines(): @@ -361,7 +380,8 @@ def process_code_coverage(key, value, timestamp): result = self.RESULT_IO_SERIAL event_queue.put(("__exit_event_queue", 0, time())) elif key == "__exit_event_queue": - # This event is sent by the host test indicating no more events expected + # This event is sent by the host test indicating no more events + # expected self.logger.prn_inf("%s received" % (key)) callbacks__exit_event_queue = True break @@ -390,7 +410,8 @@ def process_code_coverage(key, value, timestamp): if key == "__notify_complete": # This event is sent by Host Test, test result is in value - # or if value is None, value will be retrieved from HostTest.result() method + # or if value is None, value will be retrieved from + # HostTest.result() method self.logger.prn_inf("%s(%s)" % (key, str(value))) result = value event_queue.put(("__exit_event_queue", 0, time())) @@ -404,7 +425,8 @@ def process_code_coverage(key, value, timestamp): if value == DefaultTestSelector.RESET_TYPE_SW_RST: self.logger.prn_inf("Performing software reset.") - # Just disconnecting and re-connecting comm process will soft reset DUT + # Just disconnecting and re-connecting comm process will + # soft reset DUT elif value == DefaultTestSelector.RESET_TYPE_HW_RST: self.logger.prn_inf("Performing hard reset.") # request hardware reset @@ -441,7 +463,8 @@ def process_code_coverage(key, value, timestamp): callbacks__exit = True event_queue.put(("__exit_event_queue", 0, time())) elif key == "__exit_event_queue": - # This event is sent by the host test indicating no more events expected + # This event is sent by the host test indicating no more events + # expected self.logger.prn_inf("%s received" % (key)) callbacks__exit_event_queue = True break @@ -498,7 +521,8 @@ def process_code_coverage(key, value, timestamp): # NOTE: with the introduction of the '__exit_event_queue' event, there # should never be left events assuming the DUT has stopped sending data - # over the serial data. Leaving this for now to catch anything that slips through. + # over the serial data. Leaving this for now to catch anything that slips + # through. if callbacks_consume: # We are consuming all remaining events if requested @@ -510,7 +534,8 @@ def process_code_coverage(key, value, timestamp): if key == "__notify_complete": # This event is sent by Host Test, test result is in value - # or if value is None, value will be retrieved from HostTest.result() method + # or if value is None, value will be retrieved from + # HostTest.result() method self.logger.prn_inf("%s(%s)" % (key, str(value))) result = value elif key.startswith("__"): @@ -557,16 +582,21 @@ def process_code_coverage(key, value, timestamp): return result def execute(self): - """! Test runner for host test. - - @details This function will start executing test and forward test result via serial port - to test suite. This function is sensitive to work-flow flags such as --skip-flashing, - --skip-reset etc. - First function will flash device with binary, initialize serial port for communication, - reset target. On serial port handshake with test case will be performed. It is when host - test reads property data from serial port (sent over serial port). - At the end of the procedure proper host test (defined in set properties) will be executed - and test execution timeout will be measured. + """Test runner for host test. + + This function will execute the test and forward the test result via + serial port to the test suite. This function is sensitive to work-flow + flags such as --skip-flashing, --skip-reset etc... + + This function will: + Flash a firmware image to the DUT. + Initialize the serial port for communication. + Reset the device. + + A handshake with the device will be performed when the host test reads + property data from the serial port. At the end of the procedure the + appropriate host test (as defined in set properties) will be executed + and test execution timeout will be measured. """ result = self.RESULT_UNDEF @@ -604,12 +634,13 @@ def execute(self): return -3 # Keyboard interrupt def match_log(self, line): - """ - Matches lines from compare log with the target serial output. Compare log lines are matched in seq using index - self.compare_log_idx. Lines can be strings to be matched as is or regular expressions. + """Match lines from compare log with the target serial output. + + Compare log lines are matched in seq using index self.compare_log_idx. Lines can + be strings to be matched as is or regular expressions. - :param line: - :return: + Args: + line: Line of text to search. """ if self.compare_log_idx < len(self.compare_log): regex = self.compare_log[self.compare_log_idx] diff --git a/src/htrun/host_tests_runner/target_base.py b/src/htrun/host_tests_runner/target_base.py index a9bf20c3..bdf35e01 100644 --- a/src/htrun/host_tests_runner/target_base.py +++ b/src/htrun/host_tests_runner/target_base.py @@ -2,6 +2,7 @@ # Copyright (c) 2021 Arm Limited and Contributors. All rights reserved. # SPDX-License-Identifier: Apache-2.0 # +"""Base class for targets.""" import json import os @@ -13,13 +14,15 @@ class TargetBase: - """! TargetBase class for a host driven test - @details This class stores information about things like disk, port, serial speed etc. - Class is also responsible for manipulation of serial port between host and mbed device + """TargetBase class for a host driven test. + + This class stores information necessary to communicate with the device + under test. It is responsible for managing serial port communication + between the host and the device. """ def __init__(self, options): - """ctor""" + """Initialise common target attributes.""" self.options = options self.logger = HtrunLogger("Greentea") # Options related to copy / reset the connected target device @@ -45,8 +48,9 @@ def __init__(self, options): self.serial_baud = DEFAULT_BAUD_RATE self.serial_timeout = 1 - # Users can use command to pass port speeds together with port name. E.g. COM4:115200:1 - # Format if PORT:SPEED:TIMEOUT + # Users can use command to pass port speeds together with port name. E.g. + # COM4:115200:1 + # Format is PORT:SPEED:TIMEOUT port_config = self.port.split(":") if self.port else "" if len(port_config) == 2: # -p COM4:115200 @@ -96,15 +100,18 @@ def copy_image( mcu=None, retry_copy=5, ): - """! Closure for copy_image_raw() method. - @return Returns result from copy plugin + """Copy an image to a target. + + Returns: + True if the copy succeeded, otherwise False. """ def get_remount_count(disk_path, tries=2): - """! Get the remount count from 'DETAILS.TXT' file - @return Returns count, None if not-available - """ + """Get the remount count from 'DETAILS.TXT' file. + Returns: + Remount count, or None if not available. + """ # In case of no disk path, nothing to do if disk_path is None: return None @@ -137,8 +144,10 @@ def get_remount_count(disk_path, tries=2): return None def check_flash_error(target_id, disk, initial_remount_count): - """! Check for flash errors - @return Returns false if FAIL.TXT present, else true + """Check for flash errors. + + Returns: + False if FAIL.TXT present, else True. """ if not target_id: self.logger.prn_wrn( @@ -149,7 +158,8 @@ def check_flash_error(target_id, disk, initial_remount_count): bad_files = set(["FAIL.TXT"]) # Re-try at max 5 times with 0.5 sec in delay for i in range(5): - # mbed_lstools.main.create() should be done inside the loop. Otherwise it will loop on same data. + # mbed_lstools.main.create() should be done inside the loop. Otherwise + # it will loop on same data. mbeds = create() mbed_list = mbeds.list_mbeds() # list of mbeds present # get first item in list with a matching target_id, if present @@ -244,10 +254,19 @@ def check_flash_error(target_id, disk, initial_remount_count): def copy_image_raw( self, image_path=None, disk=None, copy_method=None, port=None, mcu=None ): - """! Copy file depending on method you want to use. Handles exception - and return code from shell copy commands. - @return Returns result from copy plugin - @details Method which is actually copying image to connected target + """Copy a firmware image to disk with the given copy_method. + + Handles exception and return code from shell copy commands. + + Args: + image_path: Path to the firmware image to copy/flash. + disk: Destination path forr the firmware image. + copy_method: Copy plugin name to use. + port: Serial COM port. + mcu: Name of the MCU being targeted. + + Returns: + True if copy succeeded, otherwise False. """ # image_path - Where is binary with target's firmware @@ -271,10 +290,10 @@ def copy_image_raw( return result def hw_reset(self): - """ - Performs hardware reset of target ned device. + """Perform hardware reset of target device. - :return: + Returns: + True if the reset succeeded, otherwise False. """ device_info = {} result = ht_plugins.call_plugin( diff --git a/src/htrun/host_tests_toolbox/__init__.py b/src/htrun/host_tests_toolbox/__init__.py index 436ba526..36323078 100644 --- a/src/htrun/host_tests_toolbox/__init__.py +++ b/src/htrun/host_tests_toolbox/__init__.py @@ -2,6 +2,7 @@ # Copyright (c) 2021 Arm Limited and Contributors. All rights reserved. # SPDX-License-Identifier: Apache-2.0 # +"""host_tests_toolbox package.""" from .host_functional import reset_dev from .host_functional import flash_dev diff --git a/src/htrun/host_tests_toolbox/host_functional.py b/src/htrun/host_tests_toolbox/host_functional.py index c26ffd70..80ece36b 100644 --- a/src/htrun/host_tests_toolbox/host_functional.py +++ b/src/htrun/host_tests_toolbox/host_functional.py @@ -2,7 +2,7 @@ # Copyright (c) 2021 Arm Limited and Contributors. All rights reserved. # SPDX-License-Identifier: Apache-2.0 # - +"""API to flash and reset devices using plugin methods.""" import sys import json from time import sleep @@ -13,11 +13,14 @@ def flash_dev( disk=None, image_path=None, copy_method="default", port=None, program_cycle_s=4 ): - """! Flash device using pythonic interface - @param disk Switch -d - @param image_path Switch -f - @param copy_method Switch -c (default: shell) - @param port Switch -p + """Flash a firmware image to a device. + + Args: + disk: Switch -d . + image_path: Switch -f . + copy_method: Switch -c (default: shell). + port: Switch -p . + program_cycle_s: Sleep time. """ if copy_method == "default": copy_method = "shell" @@ -43,18 +46,19 @@ def reset_dev( timeout=1, verbose=False, ): - """! Reset device using pythonic interface - @param port Switch -p - @param disk Switch -d - @param reset_type Switch -r - @param reset_timeout Switch -R - @param serial_port Serial port handler, set to None if you want this function to open serial - - @param baudrate Serial port baudrate - @param timeout Serial port timeout - @param verbose Verbose mode + """Reset a device. + + Args: + port: Switch -p . + disk: Switch -d . + reset_type: Switch -r . + reset_timeout: Switch -R . + serial_port: Serial port handler, set to None if you want this function + to open serial. + baudrate: Serial port baudrate. + timeout: Serial port timeout. + verbose: Verbose mode. """ - result = False if not serial_port: try: @@ -73,8 +77,9 @@ def reset_dev( def handle_send_break_cmd( port, disk, reset_type=None, baudrate=None, timeout=1, verbose=False ): - """! Resets platforms and prints serial port output - @detail Mix with switch -r RESET_TYPE and -p PORT for versatility + """Reset platform and print serial port output. + + Mix with switch -r RESET_TYPE and -p PORT for versatility. """ if not reset_type: reset_type = "default" diff --git a/src/htrun/htrun.py b/src/htrun/htrun.py index 9802d24f..c8de7d69 100644 --- a/src/htrun/htrun.py +++ b/src/htrun/htrun.py @@ -2,8 +2,8 @@ # Copyright (c) 2021 Arm Limited and Contributors. All rights reserved. # SPDX-License-Identifier: Apache-2.0 # +"""Greentea Host Tests Runner.""" -# Greentea Host Tests Runner from multiprocessing import freeze_support from htrun import init_host_test_cli_params from htrun.host_tests_runner.host_test_default import DefaultTestSelector @@ -11,9 +11,10 @@ def main(): - """! This function drives command line tool 'htrun' which is using DefaultTestSelector - @details 1. Create DefaultTestSelector object and pass command line parameters - 2. Call default test execution function run() to start test instrumentation + """Drive command line tool 'htrun' which is using DefaultTestSelector. + + 1. Create DefaultTestSelector object and pass command line parameters. + 2. Call default test execution function run() to start test instrumentation. """ freeze_support() result = 0 diff --git a/test/host_tests/host_registry.py b/test/host_tests/host_registry.py index 8d042014..fa7bd9f2 100644 --- a/test/host_tests/host_registry.py +++ b/test/host_tests/host_registry.py @@ -2,6 +2,7 @@ # Copyright (c) 2021 Arm Limited and Contributors. All rights reserved. # SPDX-License-Identifier: Apache-2.0 # +"""Tests for the HostRegistry class.""" import unittest from htrun.host_tests_registry import HostRegistry From 097afe64f94a675567648e4b5b69a92331573e31 Mon Sep 17 00:00:00 2001 From: Robert Walton Date: Thu, 15 Jul 2021 12:16:33 +0100 Subject: [PATCH 04/13] Remove unused imports Remove unused imports flagged by flake8. --- src/htrun/host_tests/rtc_auto.py | 2 +- src/htrun/host_tests/wait_us_auto.py | 1 - src/htrun/host_tests_conn_proxy/conn_primitive_fastmodel.py | 2 -- src/htrun/host_tests_runner/host_test_default.py | 1 - 4 files changed, 1 insertion(+), 5 deletions(-) diff --git a/src/htrun/host_tests/rtc_auto.py b/src/htrun/host_tests/rtc_auto.py index a0375312..022c23ab 100644 --- a/src/htrun/host_tests/rtc_auto.py +++ b/src/htrun/host_tests/rtc_auto.py @@ -5,7 +5,7 @@ """RTC auto test.""" import re -from time import time, strftime, gmtime +from time import strftime, gmtime from .. import BaseHostTest diff --git a/src/htrun/host_tests/wait_us_auto.py b/src/htrun/host_tests/wait_us_auto.py index 3174e6ad..b40a4f92 100644 --- a/src/htrun/host_tests/wait_us_auto.py +++ b/src/htrun/host_tests/wait_us_auto.py @@ -4,7 +4,6 @@ # """Test reads single characters from stdio and measures time between occurrences.""" -from time import time from .. import BaseHostTest diff --git a/src/htrun/host_tests_conn_proxy/conn_primitive_fastmodel.py b/src/htrun/host_tests_conn_proxy/conn_primitive_fastmodel.py index 03d577c5..5a6d26b1 100644 --- a/src/htrun/host_tests_conn_proxy/conn_primitive_fastmodel.py +++ b/src/htrun/host_tests_conn_proxy/conn_primitive_fastmodel.py @@ -4,8 +4,6 @@ # """Connect to fast models.""" -import telnetlib -import socket from .conn_primitive import ConnectorPrimitive, ConnectorPrimitiveException diff --git a/src/htrun/host_tests_runner/host_test_default.py b/src/htrun/host_tests_runner/host_test_default.py index 74f782ff..fdcc3124 100644 --- a/src/htrun/host_tests_runner/host_test_default.py +++ b/src/htrun/host_tests_runner/host_test_default.py @@ -26,7 +26,6 @@ from .host_test import DefaultTestSelectorBase from ..host_tests_logger import HtrunLogger from ..host_tests_conn_proxy import conn_process -from ..host_tests_toolbox.host_functional import handle_send_break_cmd if sys.version_info > (3, 0): from queue import Empty as QueueEmpty From e1dfddb6a24b288f86643ac19a88fede8bc6ba4d Mon Sep 17 00:00:00 2001 From: Robert Walton Date: Thu, 15 Jul 2021 12:20:22 +0100 Subject: [PATCH 05/13] FastModelConnectorPrimitive::read: Remove unused variable Simplify the logic and remove an unused variable. --- .../conn_primitive_fastmodel.py | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/src/htrun/host_tests_conn_proxy/conn_primitive_fastmodel.py b/src/htrun/host_tests_conn_proxy/conn_primitive_fastmodel.py index 5a6d26b1..e0ab3d15 100644 --- a/src/htrun/host_tests_conn_proxy/conn_primitive_fastmodel.py +++ b/src/htrun/host_tests_conn_proxy/conn_primitive_fastmodel.py @@ -149,19 +149,16 @@ def read(self, count): Returns: The data from the FastModel if the read was successful, otherwise False. """ - date = str() - if self.__resource_allocated(): - try: - data = self.resource.read() - except self.fm_agent_module.SimulatorError as e: - self.logger.prn_err( - "FastmodelConnectorPrimitive.read() failed: %s" % str(e) - ) - else: - return data - else: + if not self.__resource_allocated(): return False + try: + return self.resource.read() + except self.fm_agent_module.SimulatorError as e: + self.logger.prn_err( + "FastmodelConnectorPrimitive.read() failed: %s" % str(e) + ) + def write(self, payload, log=False): """Send text to the FastModel. From c4e94eff1c1647d1f677c9248f46e31aafde9075 Mon Sep 17 00:00:00 2001 From: Robert Walton Date: Thu, 15 Jul 2021 12:24:48 +0100 Subject: [PATCH 06/13] TargetBase: Fix unused and undefined variables Previously we tried to print an undefined variable during exception handling. We used a bare `except` statement which would suppress all possible exceptions including SystemExit and KeyboardInterrupt, print an "Unexpected error" message and forward the exception further up the stack. This does not seem like desirable behaviour, when a user hits CTRL-C they probably don't need to see an "Unexpected error" message in the log. Instead we now catch any exceptions derived from `Exception` (still not ideal) and print the error message from it. In another case we defined a local variable for an exception but it wasn't used, so remove it. --- src/htrun/host_tests_runner/target_base.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/htrun/host_tests_runner/target_base.py b/src/htrun/host_tests_runner/target_base.py index bdf35e01..49fda7ed 100644 --- a/src/htrun/host_tests_runner/target_base.py +++ b/src/htrun/host_tests_runner/target_base.py @@ -87,7 +87,7 @@ def __init__(self, options): json_test_configuration_path, e.errno, e.strerror ) ) - except: + except Exception as e: self.logger.prn_err("Test configuration JSON Unexpected error:", str(e)) raise @@ -190,7 +190,7 @@ def check_flash_error(target_id, disk, initial_remount_count): ] ) common_items = bad_files.intersection(items) - except OSError as e: + except OSError: print("Failed to enumerate disk files, retrying") continue From 3bb30cb7b26be23443c787c3fdb19803ba36502b Mon Sep 17 00:00:00 2001 From: Robert Walton Date: Thu, 15 Jul 2021 12:34:40 +0100 Subject: [PATCH 07/13] HostTestRegistry::get_dict: Remove unused variable --- src/htrun/host_tests_plugins/host_test_registry.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/htrun/host_tests_plugins/host_test_registry.py b/src/htrun/host_tests_plugins/host_test_registry.py index df65610c..d8dcee92 100644 --- a/src/htrun/host_tests_plugins/host_test_registry.py +++ b/src/htrun/host_tests_plugins/host_test_registry.py @@ -135,7 +135,6 @@ def get_string(self): def get_dict(self): """Return a dictionary of registered plugins.""" - column_names = ["name", "type", "capabilities", "stable"] result = {} for plugin_name in sorted(self.PLUGINS.keys()): name = self.PLUGINS[plugin_name].name From 3edd0a010d2958209a45807dd97345c872572cd6 Mon Sep 17 00:00:00 2001 From: Robert Walton Date: Thu, 15 Jul 2021 12:36:49 +0100 Subject: [PATCH 08/13] HostTestPluginCopyMethod_Target::execute: Remove unused variable --- src/htrun/host_tests_plugins/module_copy_to_target.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/htrun/host_tests_plugins/module_copy_to_target.py b/src/htrun/host_tests_plugins/module_copy_to_target.py index 2f00869f..6066056a 100644 --- a/src/htrun/host_tests_plugins/module_copy_to_target.py +++ b/src/htrun/host_tests_plugins/module_copy_to_target.py @@ -65,8 +65,6 @@ def execute(self, capability, *args, **kwargs): self.print_plugin_error("Error: destination disk not specified") return False - # This optional parameter can be used if TargetID is provided (-t switch) - target_id = kwargs.get("target_id", None) pooling_timeout = kwargs.get("polling_timeout", 60) result = False From 2c6b7d2579d49119b3efac5e6705e13a905b8b5d Mon Sep 17 00:00:00 2001 From: Robert Walton Date: Thu, 15 Jul 2021 12:39:57 +0100 Subject: [PATCH 09/13] Fix invalid escape sequences in regex patterns Use raw strings which treat backslashes as literal characters to pass to the re module. --- src/htrun/host_tests/detect_auto.py | 2 +- src/htrun/host_tests/rtc_auto.py | 2 +- src/htrun/host_tests_plugins/module_reset_target.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/htrun/host_tests/detect_auto.py b/src/htrun/host_tests/detect_auto.py index 332553ac..19b9463d 100644 --- a/src/htrun/host_tests/detect_auto.py +++ b/src/htrun/host_tests/detect_auto.py @@ -11,7 +11,7 @@ class DetectPlatformTest(BaseHostTest): """Test to auto detect the platform.""" - PATTERN_MICRO_NAME = "Target '(\w+)'" + PATTERN_MICRO_NAME = r"Target '(\w+)'" re_detect_micro_name = re.compile(PATTERN_MICRO_NAME) def result(self): diff --git a/src/htrun/host_tests/rtc_auto.py b/src/htrun/host_tests/rtc_auto.py index 022c23ab..15c12905 100644 --- a/src/htrun/host_tests/rtc_auto.py +++ b/src/htrun/host_tests/rtc_auto.py @@ -12,7 +12,7 @@ class RTCTest(BaseHostTest): """Test RTC.""" - PATTERN_RTC_VALUE = "\[(\d+)\] \[(\d+-\d+-\d+ \d+:\d+:\d+ [AaPpMm]{2})\]" + PATTERN_RTC_VALUE = r"\[(\d+)\] \[(\d+-\d+-\d+ \d+:\d+:\d+ [AaPpMm]{2})\]" re_detect_rtc_value = re.compile(PATTERN_RTC_VALUE) __result = None diff --git a/src/htrun/host_tests_plugins/module_reset_target.py b/src/htrun/host_tests_plugins/module_reset_target.py index b1b0f78f..250de7b6 100644 --- a/src/htrun/host_tests_plugins/module_reset_target.py +++ b/src/htrun/host_tests_plugins/module_reset_target.py @@ -22,7 +22,7 @@ class HostTestPluginResetMethod_Target(HostTestPluginBase): def __init__(self): """Initialise plugin.""" HostTestPluginBase.__init__(self) - self.re_float = re.compile("^\d+\.\d+") + self.re_float = re.compile(r"^\d+\.\d+") pyserial_version = pkg_resources.require("pyserial")[0].version self.pyserial_version = self.get_pyserial_version(pyserial_version) self.is_pyserial_v3 = float(self.pyserial_version) >= 3.0 From 4aa25823c52c4dfc6e463b945502e856e5bdc452 Mon Sep 17 00:00:00 2001 From: Robert Walton Date: Thu, 15 Jul 2021 12:44:11 +0100 Subject: [PATCH 10/13] Replace bare except with Exception Catch the "base" non-interpreter exception instead of using bare except. We need to rework the error handling to only handle errors we care about. Instead of trying to trap all possible errors, which is an antipattern in exception based error handling. --- src/htrun/host_tests_plugins/module_reset_target.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/htrun/host_tests_plugins/module_reset_target.py b/src/htrun/host_tests_plugins/module_reset_target.py index 250de7b6..688d1b23 100644 --- a/src/htrun/host_tests_plugins/module_reset_target.py +++ b/src/htrun/host_tests_plugins/module_reset_target.py @@ -58,13 +58,13 @@ def _safe_sendBreak_v2_7(self, serial): result = True try: serial.sendBreak() - except: + except Exception: # In Linux a termios.error is raised in sendBreak and in setBreak. # The following setBreak() is needed to release the reset signal on the # target mcu. try: serial.setBreak(False) - except: + except Exception: result = False return result @@ -77,7 +77,7 @@ def _safe_sendBreak_v3_0(self, serial): result = True try: serial.send_break() - except: + except Exception: # In Linux a termios.error is raised in sendBreak and in setBreak. # The following break_condition = False is needed to release the reset # signal on the target mcu. From eba03bcebdceedb8a80625d2ea95d4cd0fc2d61b Mon Sep 17 00:00:00 2001 From: Robert Walton Date: Thu, 15 Jul 2021 13:00:02 +0100 Subject: [PATCH 11/13] Use instance checks for True and False objects Checking True or False by value makes no sense, they are not values. Instead we should check if we have an "instance" of the True or False objects. --- src/htrun/host_tests_conn_proxy/conn_proxy.py | 4 ++-- src/htrun/host_tests_runner/host_test_default.py | 4 ++-- src/htrun/host_tests_runner/target_base.py | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/htrun/host_tests_conn_proxy/conn_proxy.py b/src/htrun/host_tests_conn_proxy/conn_proxy.py index d0f3eb95..f29fb576 100644 --- a/src/htrun/host_tests_conn_proxy/conn_proxy.py +++ b/src/htrun/host_tests_conn_proxy/conn_proxy.py @@ -270,7 +270,7 @@ def __send_sync(timeout=None): pass # Check if target sent something else: # Return if state machine in host_test_default has finished to end process - if key == "__host_test_finished" and value == True: + if key == "__host_test_finished" and value is True: logger.prn_inf( "received special event '%s' value='%s', finishing" % (key, value) ) @@ -352,7 +352,7 @@ def __send_sync(timeout=None): else: __notify_conn_lost() break - elif last_sync == True: + elif last_sync is True: # SYNC lost connection event : Device not responding, send sync failed __notify_sync_failed() break diff --git a/src/htrun/host_tests_runner/host_test_default.py b/src/htrun/host_tests_runner/host_test_default.py index fdcc3124..1a2d9de1 100644 --- a/src/htrun/host_tests_runner/host_test_default.py +++ b/src/htrun/host_tests_runner/host_test_default.py @@ -616,9 +616,9 @@ def execute(self): # Execute test if flashing was successful or skipped test_result = self.run_test() - if test_result == True: + if test_result is True: result = self.RESULT_SUCCESS - elif test_result == False: + elif test_result is False: result = self.RESULT_FAILURE elif test_result is None: result = self.RESULT_ERROR diff --git a/src/htrun/host_tests_runner/target_base.py b/src/htrun/host_tests_runner/target_base.py index 49fda7ed..db239fa8 100644 --- a/src/htrun/host_tests_runner/target_base.py +++ b/src/htrun/host_tests_runner/target_base.py @@ -172,10 +172,10 @@ def check_flash_error(target_id, disk, initial_remount_count): "mount_point" in mbed_target and mbed_target["mount_point"] is not None ): - if not initial_remount_count is None: + if initial_remount_count is not None: new_remount_count = get_remount_count(disk) if ( - not new_remount_count is None + new_remount_count is not None and new_remount_count == initial_remount_count ): sleep(0.5) From 65ce3c1349beabe1aa8791e0da4af5e26f1504de Mon Sep 17 00:00:00 2001 From: Robert Walton Date: Fri, 16 Jul 2021 11:11:41 +0100 Subject: [PATCH 12/13] Format file with black --- src/htrun/host_tests_plugins/module_copy_pyocd.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/htrun/host_tests_plugins/module_copy_pyocd.py b/src/htrun/host_tests_plugins/module_copy_pyocd.py index c676ab4e..275c8049 100644 --- a/src/htrun/host_tests_plugins/module_copy_pyocd.py +++ b/src/htrun/host_tests_plugins/module_copy_pyocd.py @@ -89,7 +89,7 @@ def execute(self, capability, *args, **kwargs): # Program the file programmer = FileProgrammer(session) - programmer.program(image_path, format=kwargs['format']) + programmer.program(image_path, format=kwargs["format"]) return True From bbb41198cf2b688adbed19b7288dfb7ea566bf7e Mon Sep 17 00:00:00 2001 From: Robert Walton Date: Tue, 6 Jul 2021 10:36:52 +0100 Subject: [PATCH 13/13] ci: Add github actions workflow The workflow contains two matrices to run on PRs: - lint: code formatting and static analysis - test: unit tests and code coverage --- .github/workflows/pr-check.yml | 60 ++++++++++++++++++++++++++++++++++ 1 file changed, 60 insertions(+) create mode 100644 .github/workflows/pr-check.yml diff --git a/.github/workflows/pr-check.yml b/.github/workflows/pr-check.yml new file mode 100644 index 00000000..9e0331cd --- /dev/null +++ b/.github/workflows/pr-check.yml @@ -0,0 +1,60 @@ +name: Test greentea tools + +on: [push, pull_request] + +jobs: + lint: + runs-on: ubuntu-latest + strategy: + matrix: + python-version: [3.6, 3.9] + + steps: + - uses: actions/checkout@v2 + + - name: Setup Python + uses: actions/setup-python@v2 + with: + python-version: ${{ matrix.python-version }} + + - name: Install tox + run: pip install tox + + - name: Code Formatting and Static Analysis + run: tox -e linting + + + test: + runs-on: ${{ matrix.os }} + strategy: + matrix: + os: [ubuntu-latest, macos-latest, windows-latest] + python-version: [3.6, 3.9] + + steps: + - uses: actions/checkout@v2 + with: + fetch-depth: 0 # Fetch history so setuptools-scm can calculate the version correctly + - name: Setup Python + uses: actions/setup-python@v2 + with: + python-version: ${{ matrix.python-version }} + + - name: Install tox + run: pip install tox + + - name: Run tests on ${{ matrix.os }} py ${{ matrix.python-version }} + run: tox -e py + + - name: Create Coverage Report + run: | + set -xe + python -m pip install coverage[toml] + python -m coverage xml + if: matrix.os == 'ubuntu-latest' && matrix.python-version == 3.9 + + - name: Upload Coverage Report + if: matrix.os == 'ubuntu-latest' && matrix.python-version == 3.9 + uses: "codecov/codecov-action@v1" + with: + fail_ci_if_error: true