diff --git a/.github/workflows/nightly.yml b/.github/workflows/nightly.yml
new file mode 100644
index 000000000..7e1e30d68
--- /dev/null
+++ b/.github/workflows/nightly.yml
@@ -0,0 +1,10 @@
+name: Nightly
+
+on:
+  schedule:
+    # 12 AM Pacific Daylight Time (07:00 UTC)
+    - cron: "00 07 * * *"
+
+jobs:
+  nightly:
+    uses: ./.github/workflows/run-bench.yml
diff --git a/.github/workflows/run-bench.yml b/.github/workflows/run-bench.yml
new file mode 100644
index 000000000..ae68006fd
--- /dev/null
+++ b/.github/workflows/run-bench.yml
@@ -0,0 +1,65 @@
+name: Run Bench
+on:
+  workflow_call:
+    inputs:
+      sandbox-arg:
+        description: "Sandbox argument"
+        required: false
+        default: "--sandbox"
+        type: string
+  workflow_dispatch:
+    inputs:
+      sandbox-arg:
+        description: "Sandbox argument"
+        required: false
+        default: "--sandbox"
+        type: choice
+        options:
+          - "--sandbox"
+          - "--no-sandbox"
+
+jobs:
+  run-bench:
+    strategy:
+      matrix:
+        os: [ubuntu-latest-4-cores, windows-latest]
+    runs-on: ${{ matrix.os }}
+    steps:
+      # Prepare
+      - uses: actions/checkout@v2
+        with:
+          submodules: recursive
+      - uses: actions-rs/toolchain@v1
+        with:
+          toolchain: stable
+      - uses: Swatinem/rust-cache@v1
+        with:
+          working-directory: temporalio/bridge
+      - uses: actions/setup-python@v4
+        with:
+          python-version: "3.11"
+
+      # Build
+      - run: python -m pip install --upgrade wheel poetry poethepoet
+      - run: poetry install --no-root -E opentelemetry
+      - run: poe build-develop-with-release
+
+      # Run a bunch of bench tests. We run multiple times since results vary.
+
+      - run: poe run-bench --workflow-count 100 --max-cached-workflows 100 --max-concurrent 100 ${{ inputs.sandbox-arg }}
+      - run: poe run-bench --workflow-count 100 --max-cached-workflows 100 --max-concurrent 100 ${{ inputs.sandbox-arg }}
+      - run: poe run-bench --workflow-count 100 --max-cached-workflows 100 --max-concurrent 100 ${{ inputs.sandbox-arg }}
+
+      - run: poe run-bench --workflow-count 1000 --max-cached-workflows 1000 --max-concurrent 1000 ${{ inputs.sandbox-arg }}
+      - run: poe run-bench --workflow-count 1000 --max-cached-workflows 1000 --max-concurrent 1000 ${{ inputs.sandbox-arg }}
+      - run: poe run-bench --workflow-count 1000 --max-cached-workflows 1000 --max-concurrent 1000 ${{ inputs.sandbox-arg }}
+
+      - run: poe run-bench --workflow-count 1000 --max-cached-workflows 100 --max-concurrent 100 ${{ inputs.sandbox-arg }}
+      - run: poe run-bench --workflow-count 1000 --max-cached-workflows 100 --max-concurrent 100 ${{ inputs.sandbox-arg }}
+      - run: poe run-bench --workflow-count 1000 --max-cached-workflows 100 --max-concurrent 100 ${{ inputs.sandbox-arg }}
+
+      - run: poe run-bench --workflow-count 10000 --max-cached-workflows 10000 --max-concurrent 10000 ${{ inputs.sandbox-arg }}
+      - run: poe run-bench --workflow-count 10000 --max-cached-workflows 10000 --max-concurrent 10000 ${{ inputs.sandbox-arg }}
+
+      - run: poe run-bench --workflow-count 10000 --max-cached-workflows 1000 --max-concurrent 1000 ${{ inputs.sandbox-arg }}
+      - run: poe run-bench --workflow-count 10000 --max-cached-workflows 1000 --max-concurrent 1000 ${{ inputs.sandbox-arg }}
\ No newline at end of file
diff --git a/README.md b/README.md
index f90e1971d..da0b9343f 100644
--- a/README.md
+++ b/README.md
@@ -297,7 +297,8 @@ async def create_greeting_activity(info: GreetingInfo) -> str:
 Some things to note about the above code:
 
 * Workflows run in a sandbox by default. Users are encouraged to define workflows in files with no side effects or other
-  complicated code. See the [Workflow Sandbox](#workflow-sandbox) section for more details.
+  complicated code or unnecessary imports of third party libraries. See the [Workflow Sandbox](#workflow-sandbox)
+  section for more details.
 * This workflow continually updates the queryable current greeting when signalled and can complete with the greeting
   on a different signal
 * Workflows are always classes and must have a single `@workflow.run` which is an `async def` function
@@ -642,7 +643,7 @@ is immutable and contains three fields that can be customized, but only two have
 
 ###### Passthrough Modules
 
-To make the sandbox quicker when importing known third party libraries, they can be added to the
+To make the sandbox quicker and use less memory when importing known third party libraries, they can be added to the
 `SandboxRestrictions.passthrough_modules` set like so:
 
 ```python
@@ -708,7 +709,17 @@ The sandbox is only a helper, it does not provide full protection.
 
 ###### Sandbox Performance
 
-TODO: This is actively being measured; results to come soon
+The sandbox does not add significant CPU or memory overhead for workflows in files that only import standard library
+modules, because standard library modules are passed through from outside the sandbox. However, every
+non-standard-library import at the top of the file a workflow is in adds CPU overhead (the module is re-imported on
+every workflow run) and memory overhead (each module is independently cached per workflow run for isolation reasons).
+This becomes more apparent for large numbers of workflow runs.
+
+To mitigate this, users should:
+
+* Define workflows in files that have as few non-standard-library imports as possible
+* Alter the max workflow cache and/or max concurrent workflows settings if memory grows too large
+* Set third-party libraries as passthrough modules if they are known to be side-effect free
 
 ###### Extending Restricted Classes
 
diff --git a/poetry.lock b/poetry.lock
index 9bd138d36..e18a9357e 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -612,6 +612,17 @@ category = "dev"
 optional = false
 python-versions = "*"
 
+[[package]]
+name = "psutil"
+version = "5.9.3"
+description = "Cross-platform lib for process and system monitoring in Python."
+category = "dev"
+optional = false
+python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*"
+
+[package.extras]
+test = ["enum34", "ipaddress", "mock", "pywin32", "wmi"]
+
 [[package]]
 name = "py"
 version = "1.11.0"
@@ -1071,7 +1082,7 @@ opentelemetry = ["opentelemetry-api", "opentelemetry-sdk"]
 [metadata]
 lock-version = "1.1"
 python-versions = "^3.7"
-content-hash = "b7760614decf80c88ae7969affe1d734497ff51249f140bc7bdd6641ab0f39ff"
+content-hash = "770d1cc471f32260e40ad39d2729c2176ade30a889ccc9972f3937df0f773a68"
 
 [metadata.files]
 appdirs = [
@@ -1558,6 +1569,44 @@ protoc-wheel-0 = [
     {file = "protoc_wheel_0-21.5-py2.py3-none-win32.whl", hash = "sha256:19173b492c93f0e95dc0df5edc53c285b804ab141edfea122693aa6bc009ffbe"},
     {file = "protoc_wheel_0-21.5-py2.py3-none-win_amd64.whl", hash = "sha256:ae766f84e3ce3e34d7e936f625f1a5ab081aa8d5add23dc98b81cf771844ea0e"},
 ]
+psutil = [
+    {file = "psutil-5.9.3-cp27-cp27m-macosx_10_9_x86_64.whl", hash = "sha256:b4a247cd3feaae39bb6085fcebf35b3b8ecd9b022db796d89c8f05067ca28e71"},
+    {file = "psutil-5.9.3-cp27-cp27m-manylinux2010_i686.whl", hash = "sha256:5fa88e3d5d0b480602553d362c4b33a63e0c40bfea7312a7bf78799e01e0810b"},
+    {file = "psutil-5.9.3-cp27-cp27m-manylinux2010_x86_64.whl", hash = "sha256:767ef4fa33acda16703725c0473a91e1832d296c37c63896c7153ba81698f1ab"},
+    {file = "psutil-5.9.3-cp27-cp27m-win32.whl", hash = "sha256:9a4af6ed1094f867834f5f07acd1250605a0874169a5fcadbcec864aec2496a6"},
+    {file = "psutil-5.9.3-cp27-cp27m-win_amd64.whl", hash = "sha256:fa5e32c7d9b60b2528108ade2929b115167fe98d59f89555574715054f50fa31"},
+    {file = "psutil-5.9.3-cp27-cp27mu-manylinux2010_i686.whl", hash = "sha256:fe79b4ad4836e3da6c4650cb85a663b3a51aef22e1a829c384e18fae87e5e727"},
+    {file = "psutil-5.9.3-cp27-cp27mu-manylinux2010_x86_64.whl", hash = "sha256:db8e62016add2235cc87fb7ea000ede9e4ca0aa1f221b40cef049d02d5d2593d"},
+    {file = "psutil-5.9.3-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:941a6c2c591da455d760121b44097781bc970be40e0e43081b9139da485ad5b7"},
+    {file = "psutil-5.9.3-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:71b1206e7909792d16933a0d2c1c7f04ae196186c51ba8567abae1d041f06dcb"},
+    {file = "psutil-5.9.3-cp310-cp310-manylinux_2_12_i686.manylinux2010_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:f57d63a2b5beaf797b87024d018772439f9d3103a395627b77d17a8d72009543"},
+    {file = "psutil-5.9.3-cp310-cp310-manylinux_2_12_x86_64.manylinux2010_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:e7507f6c7b0262d3e7b0eeda15045bf5881f4ada70473b87bc7b7c93b992a7d7"},
+    {file = "psutil-5.9.3-cp310-cp310-win32.whl", hash = "sha256:1b540599481c73408f6b392cdffef5b01e8ff7a2ac8caae0a91b8222e88e8f1e"},
+    {file = "psutil-5.9.3-cp310-cp310-win_amd64.whl", hash = "sha256:547ebb02031fdada635452250ff39942db8310b5c4a8102dfe9384ee5791e650"},
+    {file = "psutil-5.9.3-cp36-cp36m-macosx_10_9_x86_64.whl", hash = "sha256:d8c3cc6bb76492133474e130a12351a325336c01c96a24aae731abf5a47fe088"},
+    {file = "psutil-5.9.3-cp36-cp36m-manylinux_2_12_i686.manylinux2010_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:07d880053c6461c9b89cd5d4808f3b8336665fa3acdefd6777662c5ed73a851a"},
+    {file = "psutil-5.9.3-cp36-cp36m-manylinux_2_12_x86_64.manylinux2010_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:5e8b50241dd3c2ed498507f87a6602825073c07f3b7e9560c58411c14fe1e1c9"},
+    {file = "psutil-5.9.3-cp36-cp36m-win32.whl", hash = "sha256:828c9dc9478b34ab96be75c81942d8df0c2bb49edbb481f597314d92b6441d89"},
+    {file = "psutil-5.9.3-cp36-cp36m-win_amd64.whl", hash = "sha256:ed15edb14f52925869250b1375f0ff58ca5c4fa8adefe4883cfb0737d32f5c02"},
+    {file = "psutil-5.9.3-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:d266cd05bd4a95ca1c2b9b5aac50d249cf7c94a542f47e0b22928ddf8b80d1ef"},
+    {file = "psutil-5.9.3-cp37-cp37m-manylinux_2_12_i686.manylinux2010_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:7e4939ff75149b67aef77980409f156f0082fa36accc475d45c705bb00c6c16a"},
+    {file = "psutil-5.9.3-cp37-cp37m-manylinux_2_12_x86_64.manylinux2010_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:68fa227c32240c52982cb931801c5707a7f96dd8927f9102d6c7771ea1ff5698"},
+    {file = "psutil-5.9.3-cp37-cp37m-win32.whl", hash = "sha256:beb57d8a1ca0ae0eb3d08ccaceb77e1a6d93606f0e1754f0d60a6ebd5c288837"},
+    {file = "psutil-5.9.3-cp37-cp37m-win_amd64.whl", hash = "sha256:12500d761ac091f2426567f19f95fd3f15a197d96befb44a5c1e3cbe6db5752c"},
+    {file = "psutil-5.9.3-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:ba38cf9984d5462b506e239cf4bc24e84ead4b1d71a3be35e66dad0d13ded7c1"},
+    {file = "psutil-5.9.3-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:46907fa62acaac364fff0b8a9da7b360265d217e4fdeaca0a2397a6883dffba2"},
+    {file = "psutil-5.9.3-cp38-cp38-manylinux_2_12_i686.manylinux2010_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:a04a1836894c8279e5e0a0127c0db8e198ca133d28be8a2a72b4db16f6cf99c1"},
+    {file = "psutil-5.9.3-cp38-cp38-manylinux_2_12_x86_64.manylinux2010_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:8a4e07611997acf178ad13b842377e3d8e9d0a5bac43ece9bfc22a96735d9a4f"},
+    {file = "psutil-5.9.3-cp38-cp38-win32.whl", hash = "sha256:6ced1ad823ecfa7d3ce26fe8aa4996e2e53fb49b7fed8ad81c80958501ec0619"},
+    {file = "psutil-5.9.3-cp38-cp38-win_amd64.whl", hash = "sha256:35feafe232d1aaf35d51bd42790cbccb882456f9f18cdc411532902370d660df"},
+    {file = "psutil-5.9.3-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:538fcf6ae856b5e12d13d7da25ad67f02113c96f5989e6ad44422cb5994ca7fc"},
+    {file = "psutil-5.9.3-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:a3d81165b8474087bb90ec4f333a638ccfd1d69d34a9b4a1a7eaac06648f9fbe"},
+    {file = "psutil-5.9.3-cp39-cp39-manylinux_2_12_i686.manylinux2010_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:3a7826e68b0cf4ce2c1ee385d64eab7d70e3133171376cac53d7c1790357ec8f"},
+    {file = "psutil-5.9.3-cp39-cp39-manylinux_2_12_x86_64.manylinux2010_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:9ec296f565191f89c48f33d9544d8d82b0d2af7dd7d2d4e6319f27a818f8d1cc"},
+    {file = "psutil-5.9.3-cp39-cp39-win32.whl", hash = "sha256:9ec95df684583b5596c82bb380c53a603bb051cf019d5c849c47e117c5064395"},
+    {file = "psutil-5.9.3-cp39-cp39-win_amd64.whl", hash = "sha256:4bd4854f0c83aa84a5a40d3b5d0eb1f3c128f4146371e03baed4589fe4f3c931"},
+    {file = "psutil-5.9.3.tar.gz", hash = "sha256:7ccfcdfea4fc4b0a02ca2c31de7fcd186beb9cff8207800e14ab66f79c773af6"},
+]
 py = [
     {file = "py-1.11.0-py2.py3-none-any.whl", hash = "sha256:607c53218732647dff4acdfcd50cb62615cedf612e72d1724fb1a0cc6405b378"},
     {file = "py-1.11.0.tar.gz", hash = "sha256:51c75c4126074b472f746a24399ad32f6053d1b34b68d2fa41e558e6f4a98719"},
diff --git a/pyproject.toml b/pyproject.toml
index 607cf5446..280bc817b 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -41,6 +41,7 @@ isort = "^5.10.1"
 mypy = "^0.971"
 mypy-protobuf = "^3.3.0"
 protoc-wheel-0 = "^21.1"
+psutil = "^5.9.3"
 pydantic = "^1.9.1"
 pydocstyle = "^6.1.1"
 # TODO(cretz): Update when https://github.com/twisted/pydoctor/pull/595 released
@@ -59,8 +60,8 @@ opentelemetry = ["opentelemetry-api", "opentelemetry-sdk"]
 grpc = ["grpc"]
 
 [tool.poe.tasks]
-build-develop = ["build-bridge-develop"]
-build-bridge-develop = "python scripts/setup_bridge.py develop"
+build-develop = "python scripts/setup_bridge.py develop"
+build-develop-with-release = { cmd = "python scripts/setup_bridge.py develop", env = { TEMPORAL_BUILD_RELEASE = "1" }}
 fix-wheel = "python scripts/fix_wheel.py"
 format = [{cmd = "black ."}, {cmd = "isort ."}]
 gen-docs = "pydoctor"
@@ -75,6 +76,7 @@ lint = [
 # https://github.com/PyCQA/pydocstyle/pull/511?
 lint-docs = "pydocstyle --ignore-decorators=overload"
 lint-types = "mypy --namespace-packages ."
+run-bench = "python scripts/run_bench.py"
 test = "pytest"
 
 # Install local, run single pytest with env var, uninstall local
diff --git a/scripts/run_bench.py b/scripts/run_bench.py
new file mode 100644
index 000000000..9c024257f
--- /dev/null
+++ b/scripts/run_bench.py
@@ -0,0 +1,137 @@
+import argparse
+import asyncio
+import json
+import logging
+import sys
+import time
+import uuid
+from contextlib import asynccontextmanager
+from datetime import timedelta
+from typing import AsyncIterator
+
+from temporalio import activity, workflow
+from temporalio.testing import WorkflowEnvironment
+from temporalio.worker import UnsandboxedWorkflowRunner, Worker
+from temporalio.worker.workflow_sandbox import SandboxedWorkflowRunner
+
+
+@workflow.defn
+class BenchWorkflow:
+    @workflow.run
+    async def run(self, name: str) -> str:
+        return await workflow.execute_activity(
+            bench_activity, name, start_to_close_timeout=timedelta(seconds=30)
+        )
+
+
+@activity.defn
+async def bench_activity(name: str) -> str:
+    return f"Hello, {name}!"
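+
+# Example invocation, via the `run-bench` poe task added in pyproject.toml
+# (these flags mirror the smallest CI run in run-bench.yml above):
+#
+#   poe run-bench --workflow-count 100 --max-cached-workflows 100 --max-concurrent 100 --sandbox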
+
+
+async def main():
+    logging.basicConfig(
+        format="%(asctime)s.%(msecs)03d %(levelname)-8s %(message)s",
+        level=logging.WARN,
+        datefmt="%Y-%m-%d %H:%M:%S",
+    )
+
+    logger = logging.getLogger(__name__)
+    max_mem = -1
+
+    parser = argparse.ArgumentParser(description="Run bench")
+    parser.add_argument("--workflow-count", type=int, required=True)
+    parser.add_argument("--sandbox", action=argparse.BooleanOptionalAction)
+    parser.add_argument("--max-cached-workflows", type=int, required=True)
+    parser.add_argument("--max-concurrent", type=int, required=True)
+    args = parser.parse_args()
+
+    @asynccontextmanager
+    async def track_mem() -> AsyncIterator[None]:
+        # We intentionally import in here so the sandbox doesn't grow huge with
+        # this import
+        import psutil
+
+        # Get mem every 800ms
+        process = psutil.Process()
+
+        async def report_mem():
+            nonlocal max_mem
+            while True:
+                try:
+                    await asyncio.sleep(0.8)
+                finally:
+                    # TODO(cretz): "vms" appears more accurate on Windows, but
+                    # rss is more accurate on Linux
+                    used_mem = process.memory_info().rss
+                    if used_mem > max_mem:
+                        max_mem = used_mem
+
+        report_mem_task = asyncio.create_task(report_mem())
+        try:
+            yield None
+        finally:
+            report_mem_task.cancel()
+
+    logger.info("Running %s workflows", args.workflow_count)
+    async with track_mem():
+        # Run with a local workflow environment
+        logger.debug("Starting local environment")
+        async with await WorkflowEnvironment.start_local() as env:
+            task_queue = f"task-queue-{uuid.uuid4()}"
+
+            # Create a bunch of workflows
+            logger.debug("Starting %s workflows", args.workflow_count)
+            pre_start_seconds = time.monotonic()
+            handles = [
+                await env.client.start_workflow(
+                    BenchWorkflow.run,
+                    f"user-{i}",
+                    id=f"workflow-{i}-{uuid.uuid4()}",
+                    task_queue=task_queue,
+                )
+                for i in range(args.workflow_count)
+            ]
+            start_seconds = time.monotonic() - pre_start_seconds
+
+            # Start a worker to run them
+            logger.debug("Starting worker")
+            async with Worker(
+                env.client,
+                task_queue=task_queue,
+                workflows=[BenchWorkflow],
+                activities=[bench_activity],
+                workflow_runner=SandboxedWorkflowRunner()
+                if args.sandbox
+                else UnsandboxedWorkflowRunner(),
+                max_cached_workflows=args.max_cached_workflows,
+                max_concurrent_workflow_tasks=args.max_concurrent,
+                max_concurrent_activities=args.max_concurrent,
+            ):
+                logger.debug("Worker started")
+                # Wait for them all
+                pre_result_seconds = time.monotonic()
+                for h in handles:
+                    await h.result()
+                result_seconds = time.monotonic() - pre_result_seconds
+                logger.debug("All workflows complete")
+
+    # Print results
+    json.dump(
+        {
+            "workflow_count": args.workflow_count,
+            "sandbox": args.sandbox or False,
+            "max_cached_workflows": args.max_cached_workflows,
+            "max_concurrent": args.max_concurrent,
+            "max_mem_mib": round(max_mem / 1024**2, 1),
+            "start_seconds": round(start_seconds, 1),
+            "result_seconds": round(result_seconds, 1),
+            "workflows_per_second": round(args.workflow_count / result_seconds, 1),
+        },
+        sys.stdout,
+        indent=2,
+    )
+
+
+if __name__ == "__main__":
+    asyncio.run(main())
diff --git a/scripts/setup_bridge.py b/scripts/setup_bridge.py
index 53544502f..c59cdb12c 100644
--- a/scripts/setup_bridge.py
+++ b/scripts/setup_bridge.py
@@ -1,3 +1,4 @@
+import os
 import shutil
 
 from setuptools import setup
@@ -11,6 +12,8 @@
         binding=Binding.PyO3,
         py_limited_api=True,
         features=["pyo3/abi3-py37"],
+        # Allow local release builds if requested
+        debug=False if os.environ.get("TEMPORAL_BUILD_RELEASE") == "1" else None,
     )
 ]
diff --git a/temporalio/worker/workflow_sandbox/importer.py b/temporalio/worker/workflow_sandbox/importer.py
index 8f2e35e59..6882c5e23 100644
--- a/temporalio/worker/workflow_sandbox/importer.py
+++ b/temporalio/worker/workflow_sandbox/importer.py
@@ -9,6 +9,7 @@
 import builtins
 import functools
 import importlib
+import importlib.util
 import logging
 import sys
 import threading
@@ -175,6 +176,25 @@ def _import(
             if parent:
                 setattr(sys.modules[parent], child, sys.modules[name])
 
+            # If the module is __temporal_main__ and not already in sys.modules,
+            # we load it from whatever file __main__ was originally in
+            if name == "__temporal_main__":
+                orig_mod = _thread_local_sys_modules.orig["__main__"]
+                new_spec = importlib.util.spec_from_file_location(
+                    name, orig_mod.__file__
+                )
+                if not new_spec:
+                    raise ImportError(
+                        f"No spec for __main__ file at {orig_mod.__file__}"
+                    )
+                elif not new_spec.loader:
+                    raise ImportError(
+                        f"Spec for __main__ file at {orig_mod.__file__} has no loader"
+                    )
+                new_mod = importlib.util.module_from_spec(new_spec)
+                sys.modules[name] = new_mod
+                new_spec.loader.exec_module(new_mod)
+
         mod = importlib.__import__(name, globals, locals, fromlist, level)
         # Check for restrictions if necessary and apply
         if mod.__name__ not in self.modules_checked_for_restrictions:
diff --git a/temporalio/worker/workflow_sandbox/runner.py b/temporalio/worker/workflow_sandbox/runner.py
index 621fea533..25bbd5cf2 100644
--- a/temporalio/worker/workflow_sandbox/runner.py
+++ b/temporalio/worker/workflow_sandbox/runner.py
@@ -105,12 +105,18 @@ def __init__(
         self._create_instance()
 
     def _create_instance(self) -> None:
+        module_name = self.instance_details.defn.cls.__module__
+        # If the module name is __main__ then we change to __temporal_main__ so
+        # we don't trigger top-level execution that happens in __main__. We do
+        # not support importing __main__.
+        if module_name == "__main__":
+            module_name = "__temporal_main__"
         try:
             # Import user code
             self._run_code(
                 "with __temporal_importer.applied():\n"
                 # Import the workflow code
-                f"  from {self.instance_details.defn.cls.__module__} import {self.instance_details.defn.cls.__name__} as __temporal_workflow_class\n"
+                f"  from {module_name} import {self.instance_details.defn.cls.__name__} as __temporal_workflow_class\n"
                 f"  from {self.runner_class.__module__} import {self.runner_class.__name__} as __temporal_runner_class\n",
                 __temporal_importer=self.importer,
             )
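Note on the `__temporal_main__` handling above: the remap only matters when a workflow class is defined in a script that is executed directly, so the class's `__module__` is `"__main__"`. A minimal sketch of that scenario (the file name, workflow name, task queue, and server address here are illustrative, not from this PR):

```python
# bench_main.py -- run directly as `python bench_main.py`
import asyncio

from temporalio import workflow
from temporalio.client import Client
from temporalio.worker import Worker


@workflow.defn
class MainFileWorkflow:
    @workflow.run
    async def run(self, name: str) -> str:
        return f"Hello, {name}!"


async def main() -> None:
    client = await Client.connect("localhost:7233")
    # MainFileWorkflow.__module__ == "__main__" here, so the sandboxed runner
    # rewrites the import to "__temporal_main__" and the importer re-loads this
    # file from its original path under that name. The module-level code runs
    # again inside the sandbox, but the guard below does not, so asyncio.run
    # is never re-entered.
    async with Worker(client, task_queue="tq", workflows=[MainFileWorkflow]):
        await asyncio.Event().wait()


if __name__ == "__main__":
    asyncio.run(main())
```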
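And a sketch of the passthrough-module mitigation the README change recommends. This assumes the `SandboxRestrictions.default` and `passthrough_modules_default` attributes described in the SDK docs (check the installed version for exact names); `pydantic` and the task queue name are illustrative placeholders:

```python
import dataclasses

from temporalio import workflow
from temporalio.client import Client
from temporalio.worker import Worker
from temporalio.worker.workflow_sandbox import (
    SandboxedWorkflowRunner,
    SandboxRestrictions,
)


@workflow.defn
class PassthroughExampleWorkflow:
    @workflow.run
    async def run(self) -> str:
        return "done"


def make_worker(client: Client) -> Worker:
    # Pass a known side-effect-free third party module through from outside
    # the sandbox so it is imported once rather than re-imported (and
    # re-cached) on every workflow run. "pydantic" is an illustrative choice.
    restrictions = dataclasses.replace(
        SandboxRestrictions.default,
        # `passthrough_modules_default` is assumed from the SDK docs; the
        # exact constant may differ by SDK version.
        passthrough_modules=SandboxRestrictions.passthrough_modules_default
        | {"pydantic"},
    )
    return Worker(
        client,
        task_queue="bench-task-queue",  # illustrative task queue name
        workflows=[PassthroughExampleWorkflow],
        workflow_runner=SandboxedWorkflowRunner(restrictions=restrictions),
    )
```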