diff --git a/test/stress_tests/run_stress_tests.sh b/test/stress_tests/run_stress_tests.sh
new file mode 100755
index 000000000000..ba0886037c9e
--- /dev/null
+++ b/test/stress_tests/run_stress_tests.sh
@@ -0,0 +1,19 @@
+#!/usr/bin/env bash
+
+# Cause the script to exit if a single command fails.
+set -e
+
+# Show explicitly which commands are currently running.
+set -x
+
+ROOT_DIR=$(cd "$(dirname "${BASH_SOURCE:-$0}")"; pwd)
+
+# Start a large cluster using the autoscaler.
+ray up -y $ROOT_DIR/stress_testing_config.yaml
+
+# Run a bunch of stress tests.
+ray submit $ROOT_DIR/stress_testing_config.yaml test_many_tasks_and_transfers.py
+ray submit $ROOT_DIR/stress_testing_config.yaml test_dead_actors.py
+
+# Tear down the cluster.
+ray down -y $ROOT_DIR/stress_testing_config.yaml
diff --git a/test/stress_tests/stress_testing_config.yaml b/test/stress_tests/stress_testing_config.yaml
new file mode 100644
index 000000000000..7126366b53fd
--- /dev/null
+++ b/test/stress_tests/stress_testing_config.yaml
@@ -0,0 +1,115 @@
+# A unique identifier for the head node and workers of this cluster.
+cluster_name: stress-testing
+
+# The minimum number of worker nodes to launch in addition to the head
+# node. This number should be >= 0.
+min_workers: 100
+
+# The maximum number of worker nodes to launch in addition to the head
+# node. This takes precedence over min_workers.
+max_workers: 100
+
+# The autoscaler will scale up the cluster to this target fraction of resource
+# usage. For example, if a cluster of 10 nodes is 100% busy and
+# target_utilization_fraction is 0.8, it would resize the cluster to 13. This
+# fraction can be decreased to increase the aggressiveness of upscaling.
+# This value must be less than 1.0 for scaling to happen.
+target_utilization_fraction: 0.8
+
+# If a node is idle for this many minutes, it will be removed.
+idle_timeout_minutes: 5
+
+# Cloud-provider specific configuration.
+provider:
+    type: aws
+    region: us-west-2
+    availability_zone: us-west-2a
+
+# How Ray will authenticate with newly launched nodes.
+auth:
+    ssh_user: ubuntu
+# By default Ray creates a new private keypair, but you can also use your own.
+# If you do so, make sure to also set "KeyName" in the head and worker node
+# configurations below.
+# ssh_private_key: /path/to/your/key.pem
+
+# Provider-specific config for the head node, e.g. instance type. By default
+# Ray will auto-configure unspecified fields such as SubnetId and KeyName.
+# For more documentation on available fields, see:
+# http://boto3.readthedocs.io/en/latest/reference/services/ec2.html#EC2.ServiceResource.create_instances
+head_node:
+    InstanceType: m5.12xlarge
+    ImageId: ami-0def3275  # Default Ubuntu 16.04 AMI.
+
+    # Set primary volume to 50 GiB.
+    BlockDeviceMappings:
+        - DeviceName: /dev/sda1
+          Ebs:
+              VolumeSize: 50
+
+    # Additional options in the boto docs.
+
+# Provider-specific config for worker nodes, e.g. instance type. By default
+# Ray will auto-configure unspecified fields such as SubnetId and KeyName.
+# For more documentation on available fields, see:
+# http://boto3.readthedocs.io/en/latest/reference/services/ec2.html#EC2.ServiceResource.create_instances
+worker_nodes:
+    InstanceType: m5.large
+    ImageId: ami-0def3275  # Default Ubuntu 16.04 AMI.
+
+    # Set primary volume to 50 GiB.
+    BlockDeviceMappings:
+        - DeviceName: /dev/sda1
+          Ebs:
+              VolumeSize: 50
+
+    # Run workers on spot by default. Comment this out to use on-demand.
+    InstanceMarketOptions:
+        MarketType: spot
+        # Additional options can be found in the boto docs, e.g.
+        # SpotOptions:
+        #     MaxPrice: MAX_HOURLY_PRICE
+
+    # Additional options in the boto docs.
+
+# Files or directories to copy to the head and worker nodes. The format is a
+# dictionary from REMOTE_PATH: LOCAL_PATH, e.g.
+file_mounts: {
+# "/path1/on/remote/machine": "/path1/on/local/machine",
+# "/path2/on/remote/machine": "/path2/on/local/machine",
+}
+
+# List of shell commands to run to set up nodes.
+setup_commands:
+    # Consider uncommenting these if you run into dpkg locking issues.
+    # - sudo pkill -9 apt-get || true
+    # - sudo pkill -9 dpkg || true
+    # - sudo dpkg --configure -a
+    # Install basics.
+    - sudo apt-get update
+    - sudo apt-get install -y cmake pkg-config build-essential autoconf curl libtool unzip flex bison python
+    # Install Anaconda.
+    - wget https://repo.continuum.io/archive/Anaconda3-5.0.1-Linux-x86_64.sh || true
+    - bash Anaconda3-5.0.1-Linux-x86_64.sh -b -p $HOME/anaconda3 || true
+    - echo 'export PATH="$HOME/anaconda3/bin:$PATH"' >> ~/.bashrc
+    # # Build Ray.
+    # - git clone https://github.com/ray-project/ray || true
+    - pip install boto3==1.4.8 cython==0.27.3
+    # - cd ray/python; git checkout master; git pull; pip install -e . --verbose
+    - pip install https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-0.5.3-cp36-cp36m-manylinux1_x86_64.whl
+
+# Custom commands that will be run on the head node after common setup.
+head_setup_commands: []
+
+# Custom commands that will be run on worker nodes after common setup.
+worker_setup_commands: []
+
+# Command to start ray on the head node. You don't need to change this.
+head_start_ray_commands:
+    - ray stop
+    - ulimit -n 65536; ray start --head --num-redis-shards=5 --redis-port=6379 --autoscaling-config=~/ray_bootstrap_config.yaml
+
+# Command to start ray on worker nodes. You don't need to change this.
+worker_start_ray_commands:
+    - ray stop
+    - ulimit -n 65536; ray start --redis-address=$RAY_HEAD_IP:6379 --num-gpus=100
diff --git a/test/stress_tests/test_dead_actors.py b/test/stress_tests/test_dead_actors.py
new file mode 100644
index 000000000000..72b801142635
--- /dev/null
+++ b/test/stress_tests/test_dead_actors.py
@@ -0,0 +1,72 @@
+#!/usr/bin/env python
+
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+
+import logging
+import numpy as np
+import sys
+
+import ray
+
+logger = logging.getLogger(__name__)
+
+ray.init(redis_address="localhost:6379")
+
+
+@ray.remote
+class Child(object):
+    def __init__(self, death_probability):
+        self.death_probability = death_probability
+
+    def ping(self):
+        # Exit the process with probability 1 - death_probability.
+        exit_chance = np.random.rand()
+        if exit_chance > self.death_probability:
+            sys.exit(-1)
+
+
+@ray.remote
+class Parent(object):
+    def __init__(self, num_children, death_probability):
+        self.death_probability = death_probability
+        self.children = [
+            Child.remote(death_probability) for _ in range(num_children)
+        ]
+
+    def ping(self, num_pings):
+        children_outputs = []
+        for _ in range(num_pings):
+            children_outputs += [
+                child.ping.remote() for child in self.children
+            ]
+        try:
+            ray.get(children_outputs)
+        except Exception:
+            # Replace the children if one of them died.
+            self.__init__(len(self.children), self.death_probability)
+
+    def kill(self):
+        # Clean up children.
+        ray.get([child.__ray_terminate__.remote() for child in self.children])
+
+
+num_parents = 10
+num_children = 10
+death_probability = 0.95
+
+parents = [
+    Parent.remote(num_children, death_probability) for _ in range(num_parents)
+]
+for i in range(100):
+    ray.get([parent.ping.remote(10) for parent in parents])
+
+    # Kill a parent actor with some probability.
+    exit_chance = np.random.rand()
+    if exit_chance > death_probability:
+        parent_index = np.random.randint(len(parents))
+        parents[parent_index].kill.remote()
+        parents[parent_index] = Parent.remote(num_children, death_probability)
+
+    logger.info("Finished trial %d", i)
diff --git a/test/stress_tests/test_many_tasks_and_transfers.py b/test/stress_tests/test_many_tasks_and_transfers.py
new file mode 100644
index 000000000000..87b8239a08fe
--- /dev/null
+++ b/test/stress_tests/test_many_tasks_and_transfers.py
@@ -0,0 +1,84 @@
+#!/usr/bin/env python
+
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+
+import numpy as np
+import logging
+import time
+
+import ray
+
+logger = logging.getLogger(__name__)
+
+ray.init(redis_address="localhost:6379")
+
+# These numbers need to match the values in the autoscaler config file.
+num_remote_nodes = 100
+num_cpus_per_worker = 2
+num_remote_cpus = num_remote_nodes * num_cpus_per_worker
+
+# Wait until the expected number of nodes have joined the cluster.
+while True:
+    if len(ray.global_state.client_table()) >= num_remote_nodes + 1:
+        break
+logger.info("Nodes have all joined. There are {} resources."
+            .format(ray.global_state.cluster_resources()))
+
+
+# Require 1 GPU to force the tasks to be on remote machines.
+@ray.remote(num_gpus=1)
+def f(size, *xs):
+    return np.ones(size, dtype=np.uint8)
+
+
+# Require 1 GPU to force the actors to be on remote machines.
+@ray.remote(num_cpus=1, num_gpus=1)
+class Actor(object):
+    def method(self, size, *xs):
+        return np.ones(size, dtype=np.uint8)
+
+
+# Launch a bunch of tasks.
+start_time = time.time()
+logger.info("Submitting many tasks.")
+for i in range(10):
+    logger.info("Iteration {}".format(i))
+    ray.get([f.remote(0) for _ in range(100000)])
+logger.info("Finished after {} seconds.".format(time.time() - start_time))
+
+# Launch a bunch of tasks, each with a bunch of dependencies.
+start_time = time.time()
+logger.info("Submitting tasks with many dependencies.")
+x_ids = []
+for i in range(5):
+    logger.info("Iteration {}".format(i))
+    x_ids = [f.remote(0, *x_ids) for _ in range(10000)]
+ray.get(x_ids)
+logger.info("Finished after {} seconds.".format(time.time() - start_time))
+
+# Create a bunch of actors.
+start_time = time.time()
+logger.info("Creating {} actors.".format(num_remote_cpus))
+actors = [Actor.remote() for _ in range(num_remote_cpus)]
+logger.info("Finished after {} seconds.".format(time.time() - start_time))
+
+# Submit a bunch of small tasks to each actor.
+start_time = time.time()
+logger.info("Submitting many small actor tasks.")
+x_ids = []
+for _ in range(100000):
+    x_ids = [a.method.remote(0) for a in actors]
+ray.get(x_ids)
+logger.info("Finished after {} seconds.".format(time.time() - start_time))
+
+# Submit a bunch of actor tasks with all-to-all communication.
+start_time = time.time()
+logger.info("Submitting actor tasks with all-to-all communication.")
+x_ids = []
+for _ in range(50):
+    for size_exponent in [0, 1, 2, 3, 4, 5, 6]:
+        x_ids = [a.method.remote(10**size_exponent, *x_ids) for a in actors]
+ray.get(x_ids)
+logger.info("Finished after {} seconds.".format(time.time() - start_time))
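
Note (not part of the patch): before launching the full 100-node cluster, a driver
script can be smoke-tested against a small local Ray instance. A minimal sketch,
assuming a local Ray 0.5.x install and a free port 6379; the inflated --num-cpus
value is only there so that all of the lightweight actors in test_dead_actors.py
can be scheduled on a single machine:

    # Start a one-node "cluster" on localhost with generous resource counts.
    ray start --head --redis-port=6379 --num-cpus=256
    # Run one driver directly; it connects to localhost:6379 as written.
    python test/stress_tests/test_dead_actors.py
    ray stop

test_many_tasks_and_transfers.py waits for all 101 nodes to register, so it cannot
be exercised this way without first lowering num_remote_nodes.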