Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ matrix:

install:
- eval `python $TRAVIS_BUILD_DIR/ci/travis/determine_tests_to_run.py`
- if [ $RAY_CI_TUNE_AFFECTED != "1" ] && [ $RAY_CI_RLLIB_AFFECTED != "1" ] && [ $RAY_CI_PYTHON_AFFECTED != "1" ]; then exit; fi
- if [ $RAY_CI_TUNE_AFFECTED != "1" ] && [ $RAY_CI_RLLIB_AFFECTED != "1" ] && [ $RAY_CI_SERVE_AFFECTED != "1" ] && [ $RAY_CI_PYTHON_AFFECTED != "1" ]; then exit; fi

- ./ci/suppress_output ./ci/travis/install-bazel.sh
- ./ci/suppress_output ./ci/travis/install-dependencies.sh
Expand Down Expand Up @@ -173,6 +173,11 @@ script:
- if [ $RAY_CI_RLLIB_AFFECTED == "1" ]; then ./ci/suppress_output python python/ray/rllib/tests/test_optimizers.py; fi
- if [ $RAY_CI_RLLIB_AFFECTED == "1" ]; then ./ci/suppress_output python python/ray/rllib/tests/test_evaluators.py; fi

# ray serve tests
- if [ $RAY_CI_SERVE_AFFECTED == "1" ] && [ $RAY_CI_PY3 == "1" ]; then ./ci/suppress_output python python/ray/experimental/serve/tests/test_actors.py; fi
- if [ $RAY_CI_SERVE_AFFECTED == "1" ] && [ $RAY_CI_PY3 == "1" ]; then ./ci/suppress_output python python/ray/experimental/serve/tests/test_deadline_router.py; fi
- if [ $RAY_CI_SERVE_AFFECTED == "1" ] && [ $RAY_CI_PY3 == "1" ]; then ./ci/suppress_output python python/ray/experimental/serve/tests/test_default_app.py; fi

# ray tests
# Python3.5+ only. Otherwise we will get `SyntaxError` regardless of how we set the tester.
- if [ $RAY_CI_PYTHON_AFFECTED == "1" ]; then python -c 'import sys;exit(sys.version_info>=(3,5))' || python -m pytest -v --durations=5 --timeout=300 python/ray/experimental/test/async_test.py; fi
Expand Down
44 changes: 35 additions & 9 deletions ci/travis/determine_tests_to_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

import os
import subprocess
import sys
from pprint import pformat


def list_changed_files(commit_range):
Expand All @@ -26,24 +28,37 @@ def list_changed_files(commit_range):
return [s.strip() for s in out.decode().splitlines() if s is not None]


def print_and_log(s):
print(s)
sys.stderr.write(s)
sys.stderr.write("\n")


if __name__ == "__main__":

RAY_CI_TUNE_AFFECTED = 0
RAY_CI_RLLIB_AFFECTED = 0
RAY_CI_SERVE_AFFECTED = 0
RAY_CI_JAVA_AFFECTED = 0
RAY_CI_PYTHON_AFFECTED = 0
RAY_CI_LINUX_WHEELS_AFFECTED = 0
RAY_CI_MACOS_WHEELS_AFFECTED = 0

RAY_CI_PY3 = 1 if sys.version_info >= (3, 5) else 0

if os.environ["TRAVIS_EVENT_TYPE"] == "pull_request":

files = list_changed_files(os.environ["TRAVIS_COMMIT_RANGE"].replace(
"...", ".."))

skip_prefix_list = [
"doc/", "examples/", "dev/", "docker/", "kubernetes/", "site/"
"doc/", "examples/", "dev/", "docker/", "kubernetes/", "site/",
"ci/", ".travis.yml"
]

sys.stderr.write("Files Changed\n")
sys.stderr.write(pformat(files))

for changed_file in files:
if changed_file.startswith("python/ray/tune/"):
RAY_CI_TUNE_AFFECTED = 1
Expand All @@ -54,6 +69,10 @@ def list_changed_files(commit_range):
RAY_CI_RLLIB_AFFECTED = 1
RAY_CI_LINUX_WHEELS_AFFECTED = 1
RAY_CI_MACOS_WHEELS_AFFECTED = 1
elif changed_file.startswith("python/ray/experimental/serve"):
RAY_CI_SERVE_AFFECTED = 1
RAY_CI_LINUX_WHEELS_AFFECTED = 1
RAY_CI_MACOS_WHEELS_AFFECTED = 1
elif changed_file.startswith("python/"):
RAY_CI_TUNE_AFFECTED = 1
RAY_CI_RLLIB_AFFECTED = 1
Expand Down Expand Up @@ -89,11 +108,18 @@ def list_changed_files(commit_range):
RAY_CI_LINUX_WHEELS_AFFECTED = 1
RAY_CI_MACOS_WHEELS_AFFECTED = 1

print("export RAY_CI_TUNE_AFFECTED={}".format(RAY_CI_TUNE_AFFECTED))
print("export RAY_CI_RLLIB_AFFECTED={}".format(RAY_CI_RLLIB_AFFECTED))
print("export RAY_CI_JAVA_AFFECTED={}".format(RAY_CI_JAVA_AFFECTED))
print("export RAY_CI_PYTHON_AFFECTED={}".format(RAY_CI_PYTHON_AFFECTED))
print("export RAY_CI_LINUX_WHEELS_AFFECTED={}"
.format(RAY_CI_LINUX_WHEELS_AFFECTED))
print("export RAY_CI_MACOS_WHEELS_AFFECTED={}"
.format(RAY_CI_MACOS_WHEELS_AFFECTED))
print_and_log("export RAY_CI_PY3={}".format(RAY_CI_PY3))
print_and_log(
"export RAY_CI_TUNE_AFFECTED={}".format(RAY_CI_TUNE_AFFECTED))
print_and_log(
"export RAY_CI_RLLIB_AFFECTED={}".format(RAY_CI_RLLIB_AFFECTED))
print_and_log(
"export RAY_CI_SERVE_AFFECTED={}".format(RAY_CI_SERVE_AFFECTED))
print_and_log(
"export RAY_CI_JAVA_AFFECTED={}".format(RAY_CI_JAVA_AFFECTED))
print_and_log(
"export RAY_CI_PYTHON_AFFECTED={}".format(RAY_CI_PYTHON_AFFECTED))
print_and_log("export RAY_CI_LINUX_WHEELS_AFFECTED={}".format(
RAY_CI_LINUX_WHEELS_AFFECTED))
print_and_log("export RAY_CI_MACOS_WHEELS_AFFECTED={}".format(
RAY_CI_MACOS_WHEELS_AFFECTED))
6 changes: 4 additions & 2 deletions ci/travis/install-dependencies.sh
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ elif [[ "$PYTHON" == "3.5" ]] && [[ "$platform" == "linux" ]]; then
bash miniconda.sh -b -p $HOME/miniconda
export PATH="$HOME/miniconda/bin:$PATH"
pip install -q scipy tensorflow cython==0.29.0 gym opencv-python-headless pyyaml pandas==0.24.2 requests \
feather-format lxml openpyxl xlrd py-spy setproctitle pytest-timeout flaky networkx tabulate psutil aiohttp
feather-format lxml openpyxl xlrd py-spy setproctitle pytest-timeout flaky networkx tabulate psutil aiohttp \
flask
elif [[ "$PYTHON" == "2.7" ]] && [[ "$platform" == "macosx" ]]; then
# Install miniconda.
wget https://repo.continuum.io/miniconda/Miniconda2-4.5.4-MacOSX-x86_64.sh -O miniconda.sh -nv
Expand All @@ -48,7 +49,8 @@ elif [[ "$PYTHON" == "3.5" ]] && [[ "$platform" == "macosx" ]]; then
bash miniconda.sh -b -p $HOME/miniconda
export PATH="$HOME/miniconda/bin:$PATH"
pip install -q cython==0.29.0 tensorflow gym opencv-python-headless pyyaml pandas==0.24.2 requests \
feather-format lxml openpyxl xlrd py-spy setproctitle pytest-timeout flaky networkx tabulate psutil aiohttp
feather-format lxml openpyxl xlrd py-spy setproctitle pytest-timeout flaky networkx tabulate psutil aiohttp \
flask
elif [[ "$LINT" == "1" ]]; then
sudo apt-get update
sudo apt-get install -y build-essential curl unzip
Expand Down
12 changes: 0 additions & 12 deletions python/ray/experimental/serve/README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,6 @@ Ray Serve Module
``ray.experimental.serve`` is a module for publishing your actors to
interact with outside world.

Use Case
--------

Serve machine learning model
~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Scalable anayltics query
~~~~~~~~~~~~~~~~~~~~~~~~

Composible pipelines
~~~~~~~~~~~~~~~~~~~~

Architecture
------------

Expand Down
22 changes: 16 additions & 6 deletions python/ray/experimental/serve/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,29 @@

import sys

assert sys.version_info >= (3, ), (
"ray.experimental.serve is a python3 only library")
assert sys.version_info >= (3, 5), "ray.experimental.serve is a python3 only library"

from ray.experimental.serve.router import (DeadlineAwareRouter,
SingleQuery) # noqa: E402
from ray.experimental.serve.router import (
DeadlineAwareRouter,
SingleQuery,
start_router,
) # noqa: E402
from ray.experimental.serve.frontend import HTTPFrontendActor # noqa: E402
from ray.experimental.serve.mixin import (RayServeMixin,
batched_input) # noqa: E402
from ray.experimental.serve.mixin import (
RayServeMixin,
RayServable,
batched_input,
) # noqa: E402
from ray.experimental.serve.defaults import enqueue, call, register_actor, set_replica

__all__ = [
"DeadlineAwareRouter",
"SingleQuery",
"HTTPFrontendActor",
"RayServeMixin",
"batched_input",
"enqueue",
"call",
"register_actor",
"set_replica",
]
87 changes: 87 additions & 0 deletions python/ray/experimental/serve/defaults.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
from ray.experimental.serve.frontend import HTTPFrontendActor
from ray.experimental.serve.router import DeadlineAwareRouter, start_router
from ray.experimental.serve.object_id import unwrap
from ray.experimental import named_actors
import ray

import time
from typing import List

_default_router = None
_init_called = False
_default_frontend_handle = None


def init_if_not_ready():
global _init_called, _default_router, _default_frontend_handle
if _init_called:
return
_init_called = True
_default_router = start_router(DeadlineAwareRouter, "DefaultRouter")
_default_frontend_handle = HTTPFrontendActor.remote()
_default_frontend_handle.start.remote()

named_actors.register_actor("DefaultHTTPFrontendActor", _default_frontend_handle)


def register_actor(
actor_name: str,
actor_class: ray.actor.ActorClass,
init_args: List = [],
init_kwargs: dict = {},
num_replicas: int = 1,
max_batch_size: int = -1, # Unbounded batch size
annotation: dict = {},
):
"""Register a new managed actor.
"""
assert isinstance(actor_class, ray.actor.ActorClass)
init_if_not_ready()

inner_cls = actor_class._modified_class
annotation.update(getattr(inner_cls, inner_cls.serve_method).__annotations__)
ray.get(
_default_router.register_actor.remote(
actor_name,
actor_class,
init_args,
init_kwargs,
num_replicas,
max_batch_size,
annotation,
)
)
return "http://0.0.0.0:8090/{}".format(actor_name)


def set_replica(actor_name, new_replica_count):
"""Scale a managed actor according to new_replica_count."""
init_if_not_ready()

ray.get(_default_router.set_replica.remote(actor_name, new_replica_count))


def call(actor_name, data, deadline_s=None):
"""Enqueue a request to one of the actor managed by this router.

Returns the result. If you want the ObjectID to be returned immediately,
use enqueue
"""
init_if_not_ready()

if deadline_s is None:
deadline_s = time.time() + 10
return ray.get(enqueue(actor_name, data, deadline_s))


def enqueue(actor_name, data, deadline_s=None):
"""Enqueue a request to one of the actor managed by this router.

Returns:
ray.ObjectID, the result object ID when the query is executed.
"""
init_if_not_ready()

if deadline_s is None:
deadline_s = time.time() + 10
return unwrap(_default_router.call.remote(actor_name, data, deadline_s))
114 changes: 114 additions & 0 deletions python/ray/experimental/serve/example_actors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
"""
This file contains example ray servable actors. These actors
are used for testing as well as demoing purpose.
"""

from __future__ import absolute_import, division, print_function

import time

import numpy as np

import ray
from ray.experimental.serve import RayServeMixin, batched_input, RayServable


@ray.remote
class TimesThree(RayServable):
def __call__(self, inp: int):
return inp * 3


@ray.remote
class VectorizedAdder(RayServeMixin):
"""Actor that adds scaler_increment to input batch.

result = np.array(input_batch) + scaler_increment
"""

def __init__(self, scaler_increment):
self.inc = scaler_increment

@batched_input
def __call__(self, input_batch):
arr = np.array(input_batch)
arr += self.inc
return arr.tolist()


@ray.remote
class ScalerAdder(RayServeMixin):
"""Actor that adds a scaler_increment to a single input."""

def __init__(self, scaler_increment):
self.inc = scaler_increment

def __call__(self, input_scaler):
return input_scaler + self.inc


@ray.remote
class VectorDouble(RayServeMixin):
"""Actor that doubles the batched input."""

@batched_input
def __call__(self, batched_vectors):
matrix = np.array(batched_vectors)
matrix *= 2
return [v.tolist() for v in matrix]


@ray.remote
class Counter(RayServeMixin):
"""Return the query id. Used for testing router."""

def __init__(self):
self.counter = 0

def __call__(self, batched_input):
self.counter += 1
return self.counter


@ray.remote
class CustomCounter(RayServeMixin):
"""Return the query id. Used for testing `serve_method` signature."""

serve_method = "count"

@batched_input
def count(self, input_batch):
return [1 for _ in range(len(input_batch))]


@ray.remote
class SleepOnFirst(RayServeMixin):
"""Sleep on the first request, return batch size.

Used for testing the DeadlineAwareRouter.
"""

def __init__(self, sleep_time):
self.nap_time = sleep_time

@batched_input
def __call__(self, input_batch):
time.sleep(self.nap_time)
return [len(input_batch) for _ in range(len(input_batch))]


@ray.remote
class SleepCounter(RayServeMixin):
"""Sleep on input argument seconds, return the query id.

Used to test the DeadlineAwareRouter.
"""

def __init__(self):
self.counter = 0

def __call__(self, inp):
time.sleep(inp)

self.counter += 1
return self.counter
Loading