19 changes: 19 additions & 0 deletions test/stress_tests/run_stress_tests.sh
@@ -0,0 +1,19 @@
#!/usr/bin/env bash

# Cause the script to exit if a single command fails.
set -e

# Show explicitly which commands are currently running.
set -x

ROOT_DIR=$(cd "$(dirname "${BASH_SOURCE:-$0}")"; pwd)

# Start a large cluster using the autoscaler.
ray up -y "$ROOT_DIR/stress_testing_config.yaml"

# Run a bunch of stress tests.
ray submit "$ROOT_DIR/stress_testing_config.yaml" test_many_tasks_and_transfers.py
ray submit "$ROOT_DIR/stress_testing_config.yaml" test_dead_actors.py

# Tear down the cluster.
ray down -y "$ROOT_DIR/stress_testing_config.yaml"
115 changes: 115 additions & 0 deletions test/stress_tests/stress_testing_config.yaml
@@ -0,0 +1,115 @@
# A unique identifier for the head node and workers of this cluster.
cluster_name: stress-testing

# The minimum number of worker nodes to launch in addition to the head
# node. This number should be >= 0.
min_workers: 100

# The maximum number of worker nodes to launch in addition to the head
# node. This takes precedence over min_workers.
max_workers: 100

# The autoscaler will scale up the cluster to this target fraction of resource
# usage. For example, if a cluster of 10 nodes is 100% busy and
# target_utilization is 0.8, it would resize the cluster to 13. This fraction
# can be decreased to increase the aggressiveness of upscaling.
# This value must be less than 1.0 for scaling to happen.
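# (In the example above, 10 / 0.8 = 12.5, which rounds up to 13 nodes.)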
target_utilization_fraction: 0.8

# If a node is idle for this many minutes, it will be removed.
idle_timeout_minutes: 5

# Cloud-provider specific configuration.
provider:
    type: aws
    region: us-west-2
    availability_zone: us-west-2a

# How Ray will authenticate with newly launched nodes.
auth:
    ssh_user: ubuntu
# By default Ray creates a new private keypair, but you can also use your own.
# If you do so, make sure to also set "KeyName" in the head and worker node
# configurations below.
#    ssh_private_key: /path/to/your/key.pem

# Provider-specific config for the head node, e.g. instance type. By default
# Ray will auto-configure unspecified fields such as SubnetId and KeyName.
# For more documentation on available fields, see:
# http://boto3.readthedocs.io/en/latest/reference/services/ec2.html#EC2.ServiceResource.create_instances
head_node:
    InstanceType: m5.12xlarge
    ImageId: ami-0def3275  # Default Ubuntu 16.04 AMI.

    # Set primary volume to 50 GiB.
    BlockDeviceMappings:
        - DeviceName: /dev/sda1
          Ebs:
              VolumeSize: 50

    # Additional options in the boto docs.

# Provider-specific config for worker nodes, e.g. instance type. By default
# Ray will auto-configure unspecified fields such as SubnetId and KeyName.
# For more documentation on available fields, see:
# http://boto3.readthedocs.io/en/latest/reference/services/ec2.html#EC2.ServiceResource.create_instances
worker_nodes:
    InstanceType: m5.large
    ImageId: ami-0def3275  # Default Ubuntu 16.04 AMI.

    # Set primary volume to 50 GiB.
    BlockDeviceMappings:
        - DeviceName: /dev/sda1
          Ebs:
              VolumeSize: 50

    # Run workers on spot by default. Comment this out to use on-demand.
    InstanceMarketOptions:
        MarketType: spot
        # Additional options can be found in the boto docs, e.g.
        #   SpotOptions:
        #       MaxPrice: MAX_HOURLY_PRICE

    # Additional options in the boto docs.

# Files or directories to copy to the head and worker nodes. The format is a
# dictionary from REMOTE_PATH: LOCAL_PATH, e.g.
file_mounts: {
#    "/path1/on/remote/machine": "/path1/on/local/machine",
#    "/path2/on/remote/machine": "/path2/on/local/machine",
}

# List of shell commands to run to set up nodes.
setup_commands:
    # Consider uncommenting these if you run into dpkg locking issues.
    # - sudo pkill -9 apt-get || true
    # - sudo pkill -9 dpkg || true
    # - sudo dpkg --configure -a
    # Install basics.
    - sudo apt-get update
    - sudo apt-get install -y cmake pkg-config build-essential autoconf curl libtool unzip flex bison python
    # Install Anaconda.
    - wget https://repo.continuum.io/archive/Anaconda3-5.0.1-Linux-x86_64.sh || true
    - bash Anaconda3-5.0.1-Linux-x86_64.sh -b -p $HOME/anaconda3 || true
    - echo 'export PATH="$HOME/anaconda3/bin:$PATH"' >> ~/.bashrc
    # Build Ray from source (commented out; the prebuilt wheel below is installed instead).
    # - git clone https://github.com/ray-project/ray || true
    - pip install boto3==1.4.8 cython==0.27.3
    # - cd ray/python; git checkout master; git pull; pip install -e . --verbose
    - pip install https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-0.5.3-cp36-cp36m-manylinux1_x86_64.whl

# Custom commands that will be run on the head node after common setup.
head_setup_commands: []

# Custom commands that will be run on worker nodes after common setup.
worker_setup_commands: []

# Command to start ray on the head node. You don't need to change this.
head_start_ray_commands:
    - ray stop
    - ulimit -n 65536; ray start --head --num-redis-shards=5 --redis-port=6379 --autoscaling-config=~/ray_bootstrap_config.yaml

# Command to start ray on worker nodes. You don't need to change this.
worker_start_ray_commands:
    - ray stop
    - ulimit -n 65536; ray start --redis-address=$RAY_HEAD_IP:6379 --num-gpus=100
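
# Note that --num-gpus=100 advertises 100 (nonexistent) GPU resources on each
# worker node; the stress test tasks and actors request GPUs so that they are
# scheduled on the worker nodes rather than on the head node.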
72 changes: 72 additions & 0 deletions test/stress_tests/test_dead_actors.py
@@ -0,0 +1,72 @@
#!/usr/bin/env python

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import logging
import numpy as np
import sys

import ray

logger = logging.getLogger(__name__)
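# Configure basic logging so that the INFO-level progress messages below are
# actually emitted; basicConfig is a no-op if the root logger has already been
# configured elsewhere.
logging.basicConfig(level=logging.INFO)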

ray.init(redis_address="localhost:6379")


@ray.remote
class Child(object):
    def __init__(self, death_probability):
        self.death_probability = death_probability

    def ping(self):
        # Exit the process with probability 1 - death_probability.
        exit_chance = np.random.rand()
        if exit_chance > self.death_probability:
            sys.exit(-1)


@ray.remote
class Parent(object):
    def __init__(self, num_children, death_probability):
        self.death_probability = death_probability
        self.children = [
            Child.remote(death_probability) for _ in range(num_children)
        ]

    def ping(self, num_pings):
        children_outputs = []
        for _ in range(num_pings):
            children_outputs += [
                child.ping.remote() for child in self.children
            ]
        try:
            ray.get(children_outputs)
        except Exception:
            # Replace the children if one of them died.
            self.__init__(len(self.children), self.death_probability)

    def kill(self):
        # Clean up children.
        ray.get([child.__ray_terminate__.remote() for child in self.children])


num_parents = 10
num_children = 10
death_probability = 0.95

parents = [
    Parent.remote(num_children, death_probability) for _ in range(num_parents)
]
for i in range(100):
    ray.get([parent.ping.remote(10) for parent in parents])

    # Kill a parent actor with probability 1 - death_probability.
    exit_chance = np.random.rand()
    if exit_chance > death_probability:
        parent_index = np.random.randint(len(parents))
        parents[parent_index].kill.remote()
        parents[parent_index] = Parent.remote(num_children, death_probability)

    logger.info("Finished trial {}.".format(i))
84 changes: 84 additions & 0 deletions test/stress_tests/test_many_tasks_and_transfers.py
@@ -0,0 +1,84 @@
#!/usr/bin/env python

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import numpy as np
import logging
import time

import ray

logger = logging.getLogger(__name__)
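# Configure basic logging so that the INFO-level progress messages below are
# actually emitted; basicConfig is a no-op if the root logger has already been
# configured elsewhere.
logging.basicConfig(level=logging.INFO)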

ray.init(redis_address="localhost:6379")

# These numbers need to match the values in the autoscaler config file.
num_remote_nodes = 100
worker_node_cpus = 2  # vCPUs on each m5.large worker node.
num_remote_cpus = num_remote_nodes * worker_node_cpus

# Wait until the expected number of nodes have joined the cluster.
while True:
    if len(ray.global_state.client_table()) >= num_remote_nodes + 1:
        break
logger.info("Nodes have all joined. There are {} resources."
            .format(ray.global_state.cluster_resources()))


# Require 1 GPU to force the tasks to be on remote machines.
@ray.remote(num_gpus=1)
def f(size, *xs):
    return np.ones(size, dtype=np.uint8)


# Require 1 GPU to force the actors to be on remote machines.
@ray.remote(num_cpus=1, num_gpus=1)
class Actor(object):
    def method(self, size, *xs):
        return np.ones(size, dtype=np.uint8)


# Launch a bunch of tasks.
start_time = time.time()
logger.info("Submitting many tasks.")
for i in range(10):
    logger.info("Iteration {}".format(i))
    ray.get([f.remote(0) for _ in range(100000)])
logger.info("Finished after {} seconds.".format(time.time() - start_time))

# Launch a bunch of tasks, each with a bunch of dependencies.
start_time = time.time()
logger.info("Submitting tasks with many dependencies.")
x_ids = []
for i in range(5):
    logger.info("Iteration {}".format(i))
    x_ids = [f.remote(0, *x_ids) for _ in range(10000)]
ray.get(x_ids)
logger.info("Finished after {} seconds.".format(time.time() - start_time))

# Create a bunch of actors.
start_time = time.time()
logger.info("Creating {} actors.".format(num_remote_cpus))
actors = [Actor.remote() for _ in range(num_remote_cpus)]
logger.info("Finished after {} seconds.".format(time.time() - start_time))

# Submit a bunch of small tasks to each actor.
start_time = time.time()
logger.info("Submitting many small actor tasks.")
x_ids = []
for _ in range(100000):
    x_ids = [a.method.remote(0) for a in actors]
ray.get(x_ids)
logger.info("Finished after {} seconds.".format(time.time() - start_time))

# Submit a bunch of actor tasks with all-to-all communication.
start_time = time.time()
logger.info("Submitting actor tasks with all-to-all communication.")
x_ids = []
for _ in range(50):
    for size_exponent in [0, 1, 2, 3, 4, 5, 6]:
        x_ids = [a.method.remote(10**size_exponent, *x_ids) for a in actors]
ray.get(x_ids)
logger.info("Finished after {} seconds.".format(time.time() - start_time))