Merged
10 changes: 10 additions & 0 deletions .github/workflows/nightly.yml
@@ -0,0 +1,10 @@
name: Nightly

on:
  schedule:
    # (12 AM PST)
    - cron: "00 07 * * *"

jobs:
  nightly:
    uses: ./.github/workflows/run-bench.yml
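The nightly job calls the reusable bench workflow with its default inputs. Purely for illustration (this job is hypothetical and not part of the PR), a caller could also override the `sandbox-arg` input that run-bench.yml declares below:

```yaml
# Hypothetical additional caller: run the nightly bench without the sandbox
# by overriding the reusable workflow's input
jobs:
  nightly-no-sandbox:
    uses: ./.github/workflows/run-bench.yml
    with:
      sandbox-arg: "--no-sandbox"
```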
65 changes: 65 additions & 0 deletions .github/workflows/run-bench.yml
@@ -0,0 +1,65 @@
name: Run Bench
on:
  workflow_call:
    inputs:
      sandbox-arg:
        description: "Sandbox argument"
        required: false
        default: "--sandbox"
        type: string
  workflow_dispatch:
    inputs:
      sandbox-arg:
        description: "Sandbox argument"
        required: false
        default: "--sandbox"
        type: choice
        options:
          - "--sandbox"
          - "--no-sandbox"

jobs:
  run-bench:
    strategy:
      matrix:
        os: [ubuntu-latest-4-cores, windows-latest]
    runs-on: ${{ matrix.os }}
    steps:
      # Prepare
      - uses: actions/checkout@v2
        with:
          submodules: recursive
      - uses: actions-rs/toolchain@v1
        with:
          toolchain: stable
      - uses: Swatinem/rust-cache@v1
        with:
          working-directory: temporalio/bridge
      - uses: actions/setup-python@v4
        with:
          python-version: "3.11"

      # Build
      - run: python -m pip install --upgrade wheel poetry poethepoet
      - run: poetry install --no-root -E opentelemetry
      - run: poe build-develop-with-release

      # Run a bunch of bench tests. We run multiple times since results vary.

      - run: poe run-bench --workflow-count 100 --max-cached-workflows 100 --max-concurrent 100 ${{ inputs.sandbox-arg }}
      - run: poe run-bench --workflow-count 100 --max-cached-workflows 100 --max-concurrent 100 ${{ inputs.sandbox-arg }}
      - run: poe run-bench --workflow-count 100 --max-cached-workflows 100 --max-concurrent 100 ${{ inputs.sandbox-arg }}

      - run: poe run-bench --workflow-count 1000 --max-cached-workflows 1000 --max-concurrent 1000 ${{ inputs.sandbox-arg }}
      - run: poe run-bench --workflow-count 1000 --max-cached-workflows 1000 --max-concurrent 1000 ${{ inputs.sandbox-arg }}
      - run: poe run-bench --workflow-count 1000 --max-cached-workflows 1000 --max-concurrent 1000 ${{ inputs.sandbox-arg }}

      - run: poe run-bench --workflow-count 1000 --max-cached-workflows 100 --max-concurrent 100 ${{ inputs.sandbox-arg }}
      - run: poe run-bench --workflow-count 1000 --max-cached-workflows 100 --max-concurrent 100 ${{ inputs.sandbox-arg }}
      - run: poe run-bench --workflow-count 1000 --max-cached-workflows 100 --max-concurrent 100 ${{ inputs.sandbox-arg }}

      - run: poe run-bench --workflow-count 10000 --max-cached-workflows 10000 --max-concurrent 10000 ${{ inputs.sandbox-arg }}
      - run: poe run-bench --workflow-count 10000 --max-cached-workflows 10000 --max-concurrent 10000 ${{ inputs.sandbox-arg }}

      - run: poe run-bench --workflow-count 10000 --max-cached-workflows 1000 --max-concurrent 1000 ${{ inputs.sandbox-arg }}
      - run: poe run-bench --workflow-count 10000 --max-cached-workflows 1000 --max-concurrent 1000 ${{ inputs.sandbox-arg }}
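Because the workflow also declares a `workflow_dispatch` trigger, it can be started by hand as well; a sketch using the GitHub CLI (assuming it is run from a checkout of this repository):

```sh
# Manually dispatch the bench with the non-sandbox choice
gh workflow run run-bench.yml -f sandbox-arg=--no-sandbox
```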
17 changes: 14 additions & 3 deletions README.md
@@ -297,7 +297,8 @@ async def create_greeting_activity(info: GreetingInfo) -> str:
 Some things to note about the above code:

 * Workflows run in a sandbox by default. Users are encouraged to define workflows in files with no side effects or other
-  complicated code. See the [Workflow Sandbox](#workflow-sandbox) section for more details.
+  complicated code or unnecessary imports to other third party libraries. See the [Workflow Sandbox](#workflow-sandbox)
+  section for more details.
 * This workflow continually updates the queryable current greeting when signalled and can complete with the greeting on
   a different signal
 * Workflows are always classes and must have a single `@workflow.run` which is an `async def` function
@@ -642,7 +643,7 @@ is immutable and contains three fields that can be customized, but only two have

 ###### Passthrough Modules

-To make the sandbox quicker when importing known third party libraries, they can be added to the
+To make the sandbox quicker and use less memory when importing known third party libraries, they can be added to the
 `SandboxRestrictions.passthrough_modules` set like so:

 ```python
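# The diff viewer collapses this snippet, so this body is a reconstruction
# sketch, assuming the SDK's SandboxRestrictions.default.with_passthrough_modules
# helper; my_client, MyWorkflow, and the task queue name are placeholders
my_worker = Worker(
    my_client,
    task_queue="my-task-queue",
    workflows=[MyWorkflow],
    workflow_runner=SandboxedWorkflowRunner(
        restrictions=SandboxRestrictions.default.with_passthrough_modules("pydantic")
    ),
)
```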
@@ -708,7 +709,17 @@ The sandbox is only a helper, it does not provide full protection.

 ###### Sandbox Performance

-TODO: This is actively being measured; results to come soon
+The sandbox does not add significant CPU or memory overhead for workflows defined in files that import only
+standard-library modules, because those modules are passed through from outside the sandbox. However, every
+non-standard-library import at the top of the file containing the workflow adds CPU overhead (the module is
+re-imported on every workflow run) and memory overhead (each module is independently cached as part of the workflow
+run for isolation reasons). This becomes more apparent with large numbers of workflow runs.
+
+To mitigate this, users should:
+
+* Define workflows in files that have as few non-standard-library imports as possible
+* Alter the max workflow cache and/or max concurrent workflows settings if memory grows too large
+* Set third-party libraries as passthrough modules if they are known to be side-effect free
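For example, a workflow file following the first and third suggestions might look like the sketch below (assuming the SDK's `workflow.unsafe.imports_passed_through` context manager; `pydantic` stands in for any known side-effect-free third-party library):

```python
from datetime import timedelta

from temporalio import workflow

# Mark known side-effect-free third-party imports as passed through so they
# are not re-imported and re-cached on every workflow run
with workflow.unsafe.imports_passed_through():
    import pydantic


@workflow.defn
class MyWorkflow:
    @workflow.run
    async def run(self, name: str) -> str:
        # Workflow logic elided
        ...
```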

###### Extending Restricted Classes

51 changes: 50 additions & 1 deletion poetry.lock

Some generated files are not rendered by default.

6 changes: 4 additions & 2 deletions pyproject.toml
@@ -41,6 +41,7 @@ isort = "^5.10.1"
 mypy = "^0.971"
 mypy-protobuf = "^3.3.0"
 protoc-wheel-0 = "^21.1"
+psutil = "^5.9.3"
 pydantic = "^1.9.1"
 pydocstyle = "^6.1.1"
 # TODO(cretz): Update when https://github.com/twisted/pydoctor/pull/595 released
@@ -59,8 +60,8 @@ opentelemetry = ["opentelemetry-api", "opentelemetry-sdk"]
 grpc = ["grpc"]

 [tool.poe.tasks]
-build-develop = ["build-bridge-develop"]
-build-bridge-develop = "python scripts/setup_bridge.py develop"
+build-develop = "python scripts/setup_bridge.py develop"
+build-develop-with-release = { cmd = "python scripts/setup_bridge.py develop", env = { TEMPORAL_BUILD_RELEASE = "1" }}
 fix-wheel = "python scripts/fix_wheel.py"
 format = [{cmd = "black ."}, {cmd = "isort ."}]
 gen-docs = "pydoctor"
@@ -75,6 +76,7 @@ lint = [
 # https://github.com/PyCQA/pydocstyle/pull/511?
 lint-docs = "pydocstyle --ignore-decorators=overload"
 lint-types = "mypy --namespace-packages ."
+run-bench = "python scripts/run_bench.py"
 test = "pytest"

 # Install local, run single pytest with env var, uninstall local
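With these tasks in place, the local flow would presumably be (task names from `[tool.poe.tasks]` above; bench flags from `scripts/run_bench.py` below):

```sh
# Build the bridge in release mode, then run one bench configuration
poe build-develop-with-release
poe run-bench --workflow-count 100 --max-cached-workflows 100 --max-concurrent 100 --sandbox
```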
137 changes: 137 additions & 0 deletions scripts/run_bench.py
@@ -0,0 +1,137 @@
import argparse
import asyncio
import json
import logging
import sys
import time
import uuid
from contextlib import asynccontextmanager
from datetime import timedelta
from typing import AsyncIterator

from temporalio import activity, workflow
from temporalio.testing import WorkflowEnvironment
from temporalio.worker import UnsandboxedWorkflowRunner, Worker
from temporalio.worker.workflow_sandbox import SandboxedWorkflowRunner


@workflow.defn
class BenchWorkflow:
    @workflow.run
    async def run(self, name: str) -> str:
        return await workflow.execute_activity(
            bench_activity, name, start_to_close_timeout=timedelta(seconds=30)
        )


@activity.defn
async def bench_activity(name: str) -> str:
    return f"Hello, {name}!"


async def main():
    logging.basicConfig(
        format="%(asctime)s.%(msecs)03d %(levelname)-8s %(message)s",
        level=logging.WARN,
        datefmt="%Y-%m-%d %H:%M:%S",
    )

    logger = logging.getLogger(__name__)
    max_mem = -1

    parser = argparse.ArgumentParser(description="Run bench")
    parser.add_argument("--workflow-count", type=int, required=True)
    parser.add_argument("--sandbox", action=argparse.BooleanOptionalAction)
    parser.add_argument("--max-cached-workflows", type=int, required=True)
    parser.add_argument("--max-concurrent", type=int, required=True)
    args = parser.parse_args()

    @asynccontextmanager
    async def track_mem() -> AsyncIterator[None]:
        # We intentionally import in here so the sandbox doesn't grow huge with
        # this import
        import psutil

        # Get mem every 800ms
        process = psutil.Process()

        async def report_mem():
            nonlocal max_mem
            while True:
                try:
                    await asyncio.sleep(0.8)
                finally:
                    # TODO(cretz): "vms" appears more accurate on Windows, but
                    # rss is more accurate on Linux
                    used_mem = process.memory_info().rss
                    if used_mem > max_mem:
                        max_mem = used_mem

        report_mem_task = asyncio.create_task(report_mem())
        try:
            yield None
        finally:
            report_mem_task.cancel()
Review comment (Member), on `report_mem_task.cancel()`: Is it worth waiting for the task to be cancelled here?

Reply (Member, Author): Just interrupts a sleep, no value I don't think, but I guess I could

    logger.info("Running %s workflows", args.workflow_count)
    async with track_mem():
        # Run with a local workflow environment
        logger.debug("Starting local environment")
        async with await WorkflowEnvironment.start_local() as env:
            task_queue = f"task-queue-{uuid.uuid4()}"

            # Create a bunch of workflows
            logger.debug("Starting %s workflows", args.workflow_count)
            pre_start_seconds = time.monotonic()
            handles = [
                await env.client.start_workflow(
                    BenchWorkflow.run,
                    f"user-{i}",
                    id=f"workflow-{i}-{uuid.uuid4()}",
                    task_queue=task_queue,
                )
                for i in range(args.workflow_count)
            ]
            start_seconds = time.monotonic() - pre_start_seconds

            # Start a worker to run them
            logger.debug("Starting worker")
            async with Worker(
                env.client,
                task_queue=task_queue,
                workflows=[BenchWorkflow],
                activities=[bench_activity],
                workflow_runner=SandboxedWorkflowRunner()
                if args.sandbox
                else UnsandboxedWorkflowRunner(),
                max_cached_workflows=args.max_cached_workflows,
                max_concurrent_workflow_tasks=args.max_concurrent,
                max_concurrent_activities=args.max_concurrent,
            ):
                logger.debug("Worker started")
                # Wait for them all
                pre_result_seconds = time.monotonic()
                for h in handles:
                    await h.result()
                result_seconds = time.monotonic() - pre_result_seconds
                logger.debug("All workflows complete")

    # Print results
    json.dump(
        {
            "workflow_count": args.workflow_count,
            "sandbox": args.sandbox or False,
            "max_cached_workflows": args.max_cached_workflows,
            "max_concurrent": args.max_concurrent,
            "max_mem_mib": round(max_mem / 1024**2, 1),
            "start_seconds": round(start_seconds, 1),
            "result_seconds": round(result_seconds, 1),
            "workflows_per_second": round(args.workflow_count / result_seconds, 1),
        },
        sys.stdout,
        indent=2,
    )


if __name__ == "__main__":
    asyncio.run(main())
3 changes: 3 additions & 0 deletions scripts/setup_bridge.py
@@ -1,3 +1,4 @@
+import os
 import shutil

 from setuptools import setup
@@ -11,6 +12,8 @@
         binding=Binding.PyO3,
         py_limited_api=True,
         features=["pyo3/abi3-py37"],
+        # Allow local release builds if requested
+        debug=False if os.environ.get("TEMPORAL_BUILD_RELEASE") == "1" else None,
     )
 ]
