diff --git a/tests/micro/zephyr/test_utils.py b/tests/micro/zephyr/test_utils.py index e4a22d2be647..e3a52dc79ab1 100644 --- a/tests/micro/zephyr/test_utils.py +++ b/tests/micro/zephyr/test_utils.py @@ -20,7 +20,7 @@ import pathlib import tarfile import tempfile -from typing import Union +import logging import numpy as np @@ -31,12 +31,19 @@ import tvm.micro from tvm.micro import export_model_library_format -from tvm.micro.testing import mlf_extract_workspace_size_bytes +from tvm.micro.model_library_format import generate_c_interface_header +from tvm.micro.testing import ( + mlf_extract_workspace_size_bytes, + aot_transport_init_wait, + aot_transport_find_message, +) TEMPLATE_PROJECT_DIR = pathlib.Path(tvm.micro.get_microtvm_template_projects("zephyr")) BOARDS = TEMPLATE_PROJECT_DIR / "boards.json" +_LOG = logging.getLogger(__name__) + def zephyr_boards() -> dict: """Returns a dict mapping board to target model""" @@ -68,7 +75,9 @@ def has_fpu(board: str): return board in fpu_boards -def build_project(temp_dir, zephyr_board, west_cmd, mod, build_config, extra_files_tar=None): +def build_project( + temp_dir, zephyr_board, west_cmd, mod, build_config, simd=False, extra_files_tar=None +): project_dir = temp_dir / "project" with tempfile.TemporaryDirectory() as tar_temp_dir: @@ -76,21 +85,22 @@ def build_project(temp_dir, zephyr_board, west_cmd, mod, build_config, extra_fil export_model_library_format(mod, model_tar_path) workspace_size = mlf_extract_workspace_size_bytes(model_tar_path) + project_options = { + "extra_files_tar": extra_files_tar, + "project_type": "aot_demo", + "west_cmd": west_cmd, + "verbose": bool(build_config.get("debug")), + "zephyr_board": zephyr_board, + "compile_definitions": [ + # TODO(mehrdadh): It fails without offset. + f"-DWORKSPACE_SIZE={workspace_size + 128}", + ], + } + if simd: + project_options["config_main_stack_size"] = 1536 + project = tvm.micro.project.generate_project_from_mlf( - str(TEMPLATE_PROJECT_DIR), - project_dir, - model_tar_path, - { - "extra_files_tar": extra_files_tar, - "project_type": "aot_demo", - "west_cmd": west_cmd, - "verbose": bool(build_config.get("debug")), - "zephyr_board": zephyr_board, - "compile_definitions": [ - # TODO(mehrdadh): It fails without offset. - f"-DWORKSPACE_SIZE={workspace_size + 128}", - ], - }, + str(TEMPLATE_PROJECT_DIR), project_dir, model_tar_path, project_options ) project.build() return project, project_dir @@ -167,3 +177,56 @@ def loadCMSIS(temp_dir): urlretrieve(file_url, f"{temp_path}/{file_name}") except HTTPError as e: print(f"Failed to download {file_url}: {e}") + + +def run_model(project): + project.flash() + + with project.transport() as transport: + aot_transport_init_wait(transport) + transport.write(b"infer%", timeout_sec=5) + result_line = aot_transport_find_message(transport, "result", timeout_sec=60) + + result_line = result_line.strip("\n") + result_line = result_line.split(":") + result = int(result_line[1]) + time = int(result_line[2]) + _LOG.info(f"Result: {result}\ttime: {time} ms") + + return result, time + + +def generate_project( + temp_dir, board, west_cmd, lowered, build_config, sample, output_shape, output_type, load_cmsis +): + with tempfile.NamedTemporaryFile() as tar_temp_file: + with tarfile.open(tar_temp_file.name, "w:gz") as tf: + with tempfile.TemporaryDirectory() as tar_temp_dir: + model_files_path = os.path.join(tar_temp_dir, "include") + os.mkdir(model_files_path) + if load_cmsis: + loadCMSIS(model_files_path) + tf.add( + model_files_path, arcname=os.path.relpath(model_files_path, tar_temp_dir) + ) + header_path = generate_c_interface_header( + lowered.libmod_name, ["input_1"], ["output"], [], 0, model_files_path + ) + tf.add(header_path, arcname=os.path.relpath(header_path, tar_temp_dir)) + + create_header_file("input_data", sample, "include", tf) + create_header_file( + "output_data", np.zeros(shape=output_shape, dtype=output_type), "include", tf + ) + + project, project_dir = build_project( + temp_dir, + board, + west_cmd, + lowered, + build_config, + simd=load_cmsis, + extra_files_tar=tar_temp_file.name, + ) + + return project, project_dir diff --git a/tests/micro/zephyr/test_zephyr_aot.py b/tests/micro/zephyr/test_zephyr_aot.py index 1827a43aae65..856be340e4ac 100644 --- a/tests/micro/zephyr/test_zephyr_aot.py +++ b/tests/micro/zephyr/test_zephyr_aot.py @@ -33,7 +33,6 @@ from tvm.relay.backend import Executor, Runtime from tvm.contrib.download import download_testdata -from tvm.micro.model_library_format import generate_c_interface_header from tvm.micro.testing import aot_transport_init_wait, aot_transport_find_message import test_utils @@ -78,41 +77,19 @@ def test_tflite(temp_dir, board, west_cmd, tvm_debug): sample_path = download_testdata(sample_url, "keyword_spotting_int8_6.pyc.npy", module="data") sample = np.load(sample_path) - with tempfile.NamedTemporaryFile() as tar_temp_file: - with tarfile.open(tar_temp_file.name, "w:gz") as tf: - with tempfile.TemporaryDirectory() as tar_temp_dir: - model_files_path = os.path.join(tar_temp_dir, "include") - os.mkdir(model_files_path) - header_path = generate_c_interface_header( - lowered.libmod_name, ["input_1"], ["output"], [], 0, model_files_path - ) - tf.add(header_path, arcname=os.path.relpath(header_path, tar_temp_dir)) - - test_utils.create_header_file("input_data", sample, "include", tf) - test_utils.create_header_file( - "output_data", np.zeros(shape=output_shape, dtype="int8"), "include", tf - ) - - project, _ = test_utils.build_project( - temp_dir, - board, - west_cmd, - lowered, - build_config, - extra_files_tar=tar_temp_file.name, - ) + project, _ = test_utils.generate_project( + temp_dir, + board, + west_cmd, + lowered, + build_config, + sample, + output_shape, + "int8", + load_cmsis=False, + ) - project.flash() - with project.transport() as transport: - aot_transport_init_wait(transport) - transport.write(b"infer%", timeout_sec=5) - result_line = aot_transport_find_message(transport, "result", timeout_sec=60) - - result_line = result_line.strip("\n") - result_line = result_line.split(":") - result = int(result_line[1]) - time = int(result_line[2]) - logging.info(f"Result: {result}\ttime: {time} ms") + result, time = test_utils.run_model(project) assert result == 6 @@ -140,31 +117,10 @@ def test_qemu_make_fail(temp_dir, board, west_cmd, tvm_debug): with tvm.transform.PassContext(opt_level=3, config={"tir.disable_vectorize": True}): lowered = relay.build(ir_mod, target, executor=executor, runtime=runtime) - # Generate input/output header files - with tempfile.NamedTemporaryFile() as tar_temp_file: - with tarfile.open(tar_temp_file.name, "w:gz") as tf: - with tempfile.TemporaryDirectory() as tar_temp_dir: - model_files_path = os.path.join(tar_temp_dir, "include") - os.mkdir(model_files_path) - header_path = generate_c_interface_header( - lowered.libmod_name, ["input_1"], ["output"], [], 0, model_files_path - ) - tf.add(header_path, arcname=os.path.relpath(header_path, tar_temp_dir)) - test_utils.create_header_file( - "input_data", np.zeros(shape=shape, dtype=dtype), "include", tf - ) - test_utils.create_header_file( - "output_data", np.zeros(shape=shape, dtype=dtype), "include", tf - ) - - project, project_dir = test_utils.build_project( - temp_dir, - board, - west_cmd, - lowered, - build_config, - extra_files_tar=tar_temp_file.name, - ) + sample = np.zeros(shape=shape, dtype=dtype) + project, project_dir = test_utils.generate_project( + temp_dir, board, west_cmd, lowered, build_config, sample, shape, dtype, load_cmsis=False + ) file_path = ( pathlib.Path(project_dir) / "build" / "zephyr" / "CMakeFiles" / "run.dir" / "build.make" diff --git a/tests/micro/zephyr/test_zephyr_armv7m.py b/tests/micro/zephyr/test_zephyr_armv7m.py index 53ffb1e43961..78ce1a11e247 100644 --- a/tests/micro/zephyr/test_zephyr_armv7m.py +++ b/tests/micro/zephyr/test_zephyr_armv7m.py @@ -15,6 +15,7 @@ # specific language governing permissions and limitations # under the License. +from json import load import logging import os import pathlib @@ -32,8 +33,6 @@ from tvm import relay from tvm.contrib.download import download_testdata -from tvm.micro.model_library_format import generate_c_interface_header -from tvm.micro.testing import aot_transport_init_wait, aot_transport_find_message from tvm.relay.backend import Executor, Runtime import test_utils @@ -103,59 +102,6 @@ def _apply_desired_layout_no_simd(relay_mod): return seq(relay_mod) -def _generate_project(temp_dir, board, west_cmd, lowered, build_config, sample, output_shape): - - with tempfile.NamedTemporaryFile() as tar_temp_file: - with tarfile.open(tar_temp_file.name, "w:gz") as tf: - with tempfile.TemporaryDirectory() as tar_temp_dir: - model_files_path = os.path.join(tar_temp_dir, "include") - os.mkdir(model_files_path) - test_utils.loadCMSIS(model_files_path) - tf.add(model_files_path, arcname=os.path.relpath(model_files_path, tar_temp_dir)) - header_path = generate_c_interface_header( - lowered.libmod_name, ["input_1"], ["output"], [], model_files_path - ) - tf.add(header_path, arcname=os.path.relpath(header_path, tar_temp_dir)) - - test_utils.create_header_file("input_data", sample, "include", tf) - test_utils.create_header_file( - "output_data", np.zeros(shape=output_shape, dtype="float32"), "include", tf - ) - - project, _ = test_utils.build_project( - temp_dir, - board, - west_cmd, - lowered, - build_config, - extra_files_tar=tar_temp_file.name, - ) - - return project - - -def _run_model(temp_dir, board, west_cmd, lowered, build_config, sample, output_shape): - - project = _generate_project( - temp_dir, board, west_cmd, lowered, build_config, sample, output_shape - ) - - project.flash() - - with project.transport() as transport: - aot_transport_init_wait(transport) - transport.write(b"infer%", timeout_sec=5) - result_line = aot_transport_find_message(transport, "result", timeout_sec=60) - - result_line = result_line.strip("\n") - result_line = result_line.split(":") - result = int(result_line[1]) - time = int(result_line[2]) - _LOG.info(f"Result: {result}\ttime: {time} ms") - - return result, time - - @tvm.testing.requires_micro def test_armv7m_intrinsic(temp_dir, board, west_cmd, tvm_debug): """Testing a ARM v7m SIMD extension.""" @@ -165,6 +111,7 @@ def test_armv7m_intrinsic(temp_dir, board, west_cmd, tvm_debug): "stm32f746xx_disco", "nucleo_f746zg", "nucleo_l4r5zi", + "nrf5340dk_nrf5340_cpuapp", ]: pytest.skip(msg="Platform does not support ARM v7m SIMD extenion.") @@ -196,16 +143,38 @@ def test_armv7m_intrinsic(temp_dir, board, west_cmd, tvm_debug): os.makedirs(temp_dir_no_simd, exist_ok=True) with tvm.transform.PassContext(opt_level=3, config={"tir.disable_vectorize": True}): - lowered_simd = relay.build(relay_mod_simd, target_simd, params=params) + lowered_simd = relay.build( + relay_mod_simd, target_simd, params=params, runtime=runtime, executor=executor + ) lowered_no_simd = relay.build( relay_mod_no_simd, target, params=params, runtime=runtime, executor=executor ) - result_simd, time_simd = _run_model( - temp_dir_simd, board, west_cmd, lowered_simd, build_config, sample, output_shape + + simd_project, _ = test_utils.generate_project( + temp_dir_simd, + board, + west_cmd, + lowered_simd, + build_config, + sample, + output_shape, + "float32", + load_cmsis=True, ) - result_no_simd, time_no_simd = _run_model( - temp_dir_no_simd, board, west_cmd, lowered_no_simd, build_config, sample, output_shape + result_simd, time_simd = test_utils.run_model(simd_project) + + no_simd_project, _ = test_utils.generate_project( + temp_dir_no_simd, + board, + west_cmd, + lowered_no_simd, + build_config, + sample, + output_shape, + "float32", + load_cmsis=False, ) + result_no_simd, time_no_simd = test_utils.run_model(no_simd_project) assert result_no_simd == result_simd == 2