diff --git a/.travis.yml b/.travis.yml index 7402e6fb6e78..283f56c8672c 100644 --- a/.travis.yml +++ b/.travis.yml @@ -135,7 +135,7 @@ matrix: install: - eval `python $TRAVIS_BUILD_DIR/ci/travis/determine_tests_to_run.py` - - if [ $RAY_CI_TUNE_AFFECTED != "1" ] && [ $RAY_CI_RLLIB_AFFECTED != "1" ] && [ $RAY_CI_PYTHON_AFFECTED != "1" ]; then exit; fi + - if [ $RAY_CI_TUNE_AFFECTED != "1" ] && [ $RAY_CI_RLLIB_AFFECTED != "1" ] && [ $RAY_CI_SERVE_AFFECTED != "1" ] && [ $RAY_CI_PYTHON_AFFECTED != "1" ]; then exit; fi - ./ci/suppress_output ./ci/travis/install-bazel.sh - ./ci/suppress_output ./ci/travis/install-dependencies.sh @@ -173,6 +173,11 @@ script: - if [ $RAY_CI_RLLIB_AFFECTED == "1" ]; then ./ci/suppress_output python python/ray/rllib/tests/test_optimizers.py; fi - if [ $RAY_CI_RLLIB_AFFECTED == "1" ]; then ./ci/suppress_output python python/ray/rllib/tests/test_evaluators.py; fi + # ray serve tests + - if [ $RAY_CI_SERVE_AFFECTED == "1" ] && [ $RAY_CI_PY3 == "1" ]; then ./ci/suppress_output python python/ray/experimental/serve/tests/test_actors.py; fi + - if [ $RAY_CI_SERVE_AFFECTED == "1" ] && [ $RAY_CI_PY3 == "1" ]; then ./ci/suppress_output python python/ray/experimental/serve/tests/test_deadline_router.py; fi + - if [ $RAY_CI_SERVE_AFFECTED == "1" ] && [ $RAY_CI_PY3 == "1" ]; then ./ci/suppress_output python python/ray/experimental/serve/tests/test_default_app.py; fi + # ray tests # Python3.5+ only. Otherwise we will get `SyntaxError` regardless of how we set the tester. 
- if [ $RAY_CI_PYTHON_AFFECTED == "1" ]; then python -c 'import sys;exit(sys.version_info>=(3,5))' || python -m pytest -v --durations=5 --timeout=300 python/ray/experimental/test/async_test.py; fi diff --git a/ci/travis/determine_tests_to_run.py b/ci/travis/determine_tests_to_run.py index 7518a0148d72..721447fe6e3d 100644 --- a/ci/travis/determine_tests_to_run.py +++ b/ci/travis/determine_tests_to_run.py @@ -5,6 +5,8 @@ import os import subprocess +import sys +from pprint import pformat def list_changed_files(commit_range): @@ -26,24 +28,37 @@ def list_changed_files(commit_range): return [s.strip() for s in out.decode().splitlines() if s is not None] +def print_and_log(s): + print(s) + sys.stderr.write(s) + sys.stderr.write("\n") + + if __name__ == "__main__": RAY_CI_TUNE_AFFECTED = 0 RAY_CI_RLLIB_AFFECTED = 0 + RAY_CI_SERVE_AFFECTED = 0 RAY_CI_JAVA_AFFECTED = 0 RAY_CI_PYTHON_AFFECTED = 0 RAY_CI_LINUX_WHEELS_AFFECTED = 0 RAY_CI_MACOS_WHEELS_AFFECTED = 0 + RAY_CI_PY3 = 1 if sys.version_info >= (3, 5) else 0 + if os.environ["TRAVIS_EVENT_TYPE"] == "pull_request": files = list_changed_files(os.environ["TRAVIS_COMMIT_RANGE"].replace( "...", "..")) skip_prefix_list = [ - "doc/", "examples/", "dev/", "docker/", "kubernetes/", "site/" + "doc/", "examples/", "dev/", "docker/", "kubernetes/", "site/", + "ci/", ".travis.yml" ] + sys.stderr.write("Files Changed\n") + sys.stderr.write(pformat(files)) + for changed_file in files: if changed_file.startswith("python/ray/tune/"): RAY_CI_TUNE_AFFECTED = 1 @@ -54,6 +69,10 @@ def list_changed_files(commit_range): RAY_CI_RLLIB_AFFECTED = 1 RAY_CI_LINUX_WHEELS_AFFECTED = 1 RAY_CI_MACOS_WHEELS_AFFECTED = 1 + elif changed_file.startswith("python/ray/experimental/serve"): + RAY_CI_SERVE_AFFECTED = 1 + RAY_CI_LINUX_WHEELS_AFFECTED = 1 + RAY_CI_MACOS_WHEELS_AFFECTED = 1 elif changed_file.startswith("python/"): RAY_CI_TUNE_AFFECTED = 1 RAY_CI_RLLIB_AFFECTED = 1 @@ -89,11 +108,18 @@ def list_changed_files(commit_range): 
RAY_CI_LINUX_WHEELS_AFFECTED = 1 RAY_CI_MACOS_WHEELS_AFFECTED = 1 - print("export RAY_CI_TUNE_AFFECTED={}".format(RAY_CI_TUNE_AFFECTED)) - print("export RAY_CI_RLLIB_AFFECTED={}".format(RAY_CI_RLLIB_AFFECTED)) - print("export RAY_CI_JAVA_AFFECTED={}".format(RAY_CI_JAVA_AFFECTED)) - print("export RAY_CI_PYTHON_AFFECTED={}".format(RAY_CI_PYTHON_AFFECTED)) - print("export RAY_CI_LINUX_WHEELS_AFFECTED={}" - .format(RAY_CI_LINUX_WHEELS_AFFECTED)) - print("export RAY_CI_MACOS_WHEELS_AFFECTED={}" - .format(RAY_CI_MACOS_WHEELS_AFFECTED)) + print_and_log("export RAY_CI_PY3={}".format(RAY_CI_PY3)) + print_and_log( + "export RAY_CI_TUNE_AFFECTED={}".format(RAY_CI_TUNE_AFFECTED)) + print_and_log( + "export RAY_CI_RLLIB_AFFECTED={}".format(RAY_CI_RLLIB_AFFECTED)) + print_and_log( + "export RAY_CI_SERVE_AFFECTED={}".format(RAY_CI_SERVE_AFFECTED)) + print_and_log( + "export RAY_CI_JAVA_AFFECTED={}".format(RAY_CI_JAVA_AFFECTED)) + print_and_log( + "export RAY_CI_PYTHON_AFFECTED={}".format(RAY_CI_PYTHON_AFFECTED)) + print_and_log("export RAY_CI_LINUX_WHEELS_AFFECTED={}".format( + RAY_CI_LINUX_WHEELS_AFFECTED)) + print_and_log("export RAY_CI_MACOS_WHEELS_AFFECTED={}".format( + RAY_CI_MACOS_WHEELS_AFFECTED)) diff --git a/ci/travis/install-dependencies.sh b/ci/travis/install-dependencies.sh index 20ff2e64d210..8b014ea9f635 100755 --- a/ci/travis/install-dependencies.sh +++ b/ci/travis/install-dependencies.sh @@ -34,7 +34,8 @@ elif [[ "$PYTHON" == "3.5" ]] && [[ "$platform" == "linux" ]]; then bash miniconda.sh -b -p $HOME/miniconda export PATH="$HOME/miniconda/bin:$PATH" pip install -q scipy tensorflow cython==0.29.0 gym opencv-python-headless pyyaml pandas==0.24.2 requests \ - feather-format lxml openpyxl xlrd py-spy setproctitle pytest-timeout flaky networkx tabulate psutil aiohttp + feather-format lxml openpyxl xlrd py-spy setproctitle pytest-timeout flaky networkx tabulate psutil aiohttp \ + flask elif [[ "$PYTHON" == "2.7" ]] && [[ "$platform" == "macosx" ]]; then # Install 
miniconda. wget https://repo.continuum.io/miniconda/Miniconda2-4.5.4-MacOSX-x86_64.sh -O miniconda.sh -nv @@ -48,7 +49,8 @@ elif [[ "$PYTHON" == "3.5" ]] && [[ "$platform" == "macosx" ]]; then bash miniconda.sh -b -p $HOME/miniconda export PATH="$HOME/miniconda/bin:$PATH" pip install -q cython==0.29.0 tensorflow gym opencv-python-headless pyyaml pandas==0.24.2 requests \ - feather-format lxml openpyxl xlrd py-spy setproctitle pytest-timeout flaky networkx tabulate psutil aiohttp + feather-format lxml openpyxl xlrd py-spy setproctitle pytest-timeout flaky networkx tabulate psutil aiohttp \ + flask elif [[ "$LINT" == "1" ]]; then sudo apt-get update sudo apt-get install -y build-essential curl unzip diff --git a/python/ray/experimental/serve/README.rst b/python/ray/experimental/serve/README.rst index b59672c46912..4857b4309d24 100644 --- a/python/ray/experimental/serve/README.rst +++ b/python/ray/experimental/serve/README.rst @@ -4,18 +4,6 @@ Ray Serve Module ``ray.experimental.serve`` is a module for publishing your actors to interact with outside world. 
-Use Case --------- - -Serve machine learning model -~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - -Scalable anayltics query -~~~~~~~~~~~~~~~~~~~~~~~~ - -Composible pipelines -~~~~~~~~~~~~~~~~~~~~ - Architecture ------------ diff --git a/python/ray/experimental/serve/__init__.py b/python/ray/experimental/serve/__init__.py index 15757a9d5f54..7084838dbeee 100644 --- a/python/ray/experimental/serve/__init__.py +++ b/python/ray/experimental/serve/__init__.py @@ -10,14 +10,20 @@ import sys -assert sys.version_info >= (3, ), ( - "ray.experimental.serve is a python3 only library") +assert sys.version_info >= (3, 5), "ray.experimental.serve is a python3 only library" -from ray.experimental.serve.router import (DeadlineAwareRouter, - SingleQuery) # noqa: E402 +from ray.experimental.serve.router import ( + DeadlineAwareRouter, + SingleQuery, + start_router, +) # noqa: E402 from ray.experimental.serve.frontend import HTTPFrontendActor # noqa: E402 -from ray.experimental.serve.mixin import (RayServeMixin, - batched_input) # noqa: E402 +from ray.experimental.serve.mixin import ( + RayServeMixin, + RayServable, + batched_input, +) # noqa: E402 +from ray.experimental.serve.defaults import enqueue, call, register_actor, set_replica __all__ = [ "DeadlineAwareRouter", @@ -25,4 +31,8 @@ "HTTPFrontendActor", "RayServeMixin", "batched_input", + "enqueue", + "call", + "register_actor", + "set_replica", ] diff --git a/python/ray/experimental/serve/defaults.py b/python/ray/experimental/serve/defaults.py new file mode 100644 index 000000000000..6f94a334d889 --- /dev/null +++ b/python/ray/experimental/serve/defaults.py @@ -0,0 +1,87 @@ +from ray.experimental.serve.frontend import HTTPFrontendActor +from ray.experimental.serve.router import DeadlineAwareRouter, start_router +from ray.experimental.serve.object_id import unwrap +from ray.experimental import named_actors +import ray + +import time +from typing import List + +_default_router = None +_init_called = False +_default_frontend_handle = None + + 
+def init_if_not_ready(): + global _init_called, _default_router, _default_frontend_handle + if _init_called: + return + _init_called = True + _default_router = start_router(DeadlineAwareRouter, "DefaultRouter") + _default_frontend_handle = HTTPFrontendActor.remote() + _default_frontend_handle.start.remote() + + named_actors.register_actor("DefaultHTTPFrontendActor", _default_frontend_handle) + + +def register_actor( + actor_name: str, + actor_class: ray.actor.ActorClass, + init_args: List = [], + init_kwargs: dict = {}, + num_replicas: int = 1, + max_batch_size: int = -1, # Unbounded batch size + annotation: dict = {}, +): + """Register a new managed actor. + """ + assert isinstance(actor_class, ray.actor.ActorClass) + init_if_not_ready() + + inner_cls = actor_class._modified_class + annotation.update(getattr(inner_cls, inner_cls.serve_method).__annotations__) + ray.get( + _default_router.register_actor.remote( + actor_name, + actor_class, + init_args, + init_kwargs, + num_replicas, + max_batch_size, + annotation, + ) + ) + return "http://0.0.0.0:8090/{}".format(actor_name) + + +def set_replica(actor_name, new_replica_count): + """Scale a managed actor according to new_replica_count.""" + init_if_not_ready() + + ray.get(_default_router.set_replica.remote(actor_name, new_replica_count)) + + +def call(actor_name, data, deadline_s=None): + """Enqueue a request to one of the actor managed by this router. + + Returns the result. If you want the ObjectID to be returned immediately, + use enqueue + """ + init_if_not_ready() + + if deadline_s is None: + deadline_s = time.time() + 10 + return ray.get(enqueue(actor_name, data, deadline_s)) + + +def enqueue(actor_name, data, deadline_s=None): + """Enqueue a request to one of the actor managed by this router. + + Returns: + ray.ObjectID, the result object ID when the query is executed. 
+ """ + init_if_not_ready() + + if deadline_s is None: + deadline_s = time.time() + 10 + return unwrap(_default_router.call.remote(actor_name, data, deadline_s)) diff --git a/python/ray/experimental/serve/example_actors.py b/python/ray/experimental/serve/example_actors.py new file mode 100644 index 000000000000..102a7a63d06b --- /dev/null +++ b/python/ray/experimental/serve/example_actors.py @@ -0,0 +1,114 @@ +""" +This file contains example ray servable actors. These actors +are used for testing as well as demoing purpose. +""" + +from __future__ import absolute_import, division, print_function + +import time + +import numpy as np + +import ray +from ray.experimental.serve import RayServeMixin, batched_input, RayServable + + +@ray.remote +class TimesThree(RayServable): + def __call__(self, inp: int): + return inp * 3 + + +@ray.remote +class VectorizedAdder(RayServeMixin): + """Actor that adds scaler_increment to input batch. + + result = np.array(input_batch) + scaler_increment + """ + + def __init__(self, scaler_increment): + self.inc = scaler_increment + + @batched_input + def __call__(self, input_batch): + arr = np.array(input_batch) + arr += self.inc + return arr.tolist() + + +@ray.remote +class ScalerAdder(RayServeMixin): + """Actor that adds a scaler_increment to a single input.""" + + def __init__(self, scaler_increment): + self.inc = scaler_increment + + def __call__(self, input_scaler): + return input_scaler + self.inc + + +@ray.remote +class VectorDouble(RayServeMixin): + """Actor that doubles the batched input.""" + + @batched_input + def __call__(self, batched_vectors): + matrix = np.array(batched_vectors) + matrix *= 2 + return [v.tolist() for v in matrix] + + +@ray.remote +class Counter(RayServeMixin): + """Return the query id. 
Used for testing router.""" + + def __init__(self): + self.counter = 0 + + def __call__(self, batched_input): + self.counter += 1 + return self.counter + + +@ray.remote +class CustomCounter(RayServeMixin): + """Return the query id. Used for testing `serve_method` signature.""" + + serve_method = "count" + + @batched_input + def count(self, input_batch): + return [1 for _ in range(len(input_batch))] + + +@ray.remote +class SleepOnFirst(RayServeMixin): + """Sleep on the first request, return batch size. + + Used for testing the DeadlineAwareRouter. + """ + + def __init__(self, sleep_time): + self.nap_time = sleep_time + + @batched_input + def __call__(self, input_batch): + time.sleep(self.nap_time) + return [len(input_batch) for _ in range(len(input_batch))] + + +@ray.remote +class SleepCounter(RayServeMixin): + """Sleep on input argument seconds, return the query id. + + Used to test the DeadlineAwareRouter. + """ + + def __init__(self): + self.counter = 0 + + def __call__(self, inp): + time.sleep(inp) + + self.counter += 1 + return self.counter diff --git a/python/ray/experimental/serve/examples/adder.py b/python/ray/experimental/serve/examples/adder.py deleted file mode 100644 index 862e61c7150b..000000000000 --- a/python/ray/experimental/serve/examples/adder.py +++ /dev/null @@ -1,47 +0,0 @@ -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function - -import numpy as np - -import ray -from ray.experimental.serve import RayServeMixin, batched_input - - -@ray.remote -class VectorizedAdder(RayServeMixin): - """Actor that adds scaler_increment to input batch. 
- - result = np.array(input_batch) + scaler_increment - """ - - def __init__(self, scaler_increment): - self.inc = scaler_increment - - @batched_input - def __call__(self, input_batch): - arr = np.array(input_batch) - arr += self.inc - return arr.tolist() - - -@ray.remote -class ScalerAdder(RayServeMixin): - """Actor that adds a scaler_increment to a single input.""" - - def __init__(self, scaler_increment): - self.inc = scaler_increment - - def __call__(self, input_scaler): - return input_scaler + self.inc - - -@ray.remote -class VectorDouble(RayServeMixin): - """Actor that doubles the batched input.""" - - @batched_input - def __call__(self, batched_vectors): - matrix = np.array(batched_vectors) - matrix *= 2 - return [v.tolist() for v in matrix] diff --git a/python/ray/experimental/serve/examples/counter.py b/python/ray/experimental/serve/examples/counter.py deleted file mode 100644 index 369d53fb5a8e..000000000000 --- a/python/ray/experimental/serve/examples/counter.py +++ /dev/null @@ -1,29 +0,0 @@ -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function - -import ray -from ray.experimental.serve import RayServeMixin, batched_input - - -@ray.remote -class Counter(RayServeMixin): - """Return the query id. Used for testing router.""" - - def __init__(self): - self.counter = 0 - - def __call__(self, batched_input): - self.counter += 1 - return self.counter - - -@ray.remote -class CustomCounter(RayServeMixin): - """Return the query id. 
Used for testing `serve_method` signature.""" - - serve_method = "count" - - @batched_input - def count(self, input_batch): - return [1 for _ in range(len(input_batch))] diff --git a/python/ray/experimental/serve/examples/halt.py b/python/ray/experimental/serve/examples/halt.py deleted file mode 100644 index eceb94d8653e..000000000000 --- a/python/ray/experimental/serve/examples/halt.py +++ /dev/null @@ -1,41 +0,0 @@ -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function - -import time - -import ray -from ray.experimental.serve import RayServeMixin, batched_input - - -@ray.remote -class SleepOnFirst(RayServeMixin): - """Sleep on the first request, return batch size. - - Used for testing the DeadlineAwareRouter. - """ - - def __init__(self, sleep_time): - self.nap_time = sleep_time - - @batched_input - def __call__(self, input_batch): - time.sleep(self.nap_time) - return [len(input_batch) for _ in range(len(input_batch))] - - -@ray.remote -class SleepCounter(RayServeMixin): - """Sleep on input argument seconds, return the query id. - - Used to test the DeadlineAwareRouter. 
- """ - - def __init__(self): - self.counter = 0 - - def __call__(self, inp): - time.sleep(inp) - - self.counter += 1 - return self.counter diff --git a/python/ray/experimental/serve/frontend/__init__.py b/python/ray/experimental/serve/frontend/__init__.py index b1cb44636bde..4d3dc5408f82 100644 --- a/python/ray/experimental/serve/frontend/__init__.py +++ b/python/ray/experimental/serve/frontend/__init__.py @@ -2,6 +2,7 @@ from __future__ import division from __future__ import print_function +from ray.experimental.serve.frontend import util from ray.experimental.serve.frontend.http_frontend import HTTPFrontendActor -__all__ = ["HTTPFrontendActor"] +__all__ = ["HTTPFrontendActor", "util"] diff --git a/python/ray/experimental/serve/frontend/http_frontend.py b/python/ray/experimental/serve/frontend/http_frontend.py index 66973caca838..3bee624aa569 100644 --- a/python/ray/experimental/serve/frontend/http_frontend.py +++ b/python/ray/experimental/serve/frontend/http_frontend.py @@ -4,19 +4,18 @@ import time -import uvicorn -from starlette.applications import Starlette -from starlette.responses import JSONResponse +from aiohttp import web +from jsonschema import validate, ValidationError import ray - - -def unwrap(future): - """Unwrap the result from ray.experimental.server router. - Router returns a list of object ids when you call them. 
- """ - - return ray.get(future)[0] +from ray.experimental.serve.frontend.util import ( + async_get, + async_unwrap, + annotation_to_json_schema, + get_base_schema, +) +from ray.experimental import async_api +import asyncio @ray.remote @@ -39,34 +38,104 @@ class HTTPFrontendActor: } """ - def __init__(self, ip="0.0.0.0", port=8080, router="DefaultRouter"): + def __init__(self, ip="0.0.0.0", port=8090, router="DefaultRouter"): self.ip = ip self.port = port self.router = ray.experimental.named_actors.get_actor(router) + async_api.init() + def start(self): - default_app = Starlette() + from prometheus_client import Histogram, Counter, Gauge, start_http_server + + app = web.Application() + routes = web.RouteTableDef() + buckets = (.005, .01, .025, .05, .075, .1, .12, 0.15, 0.18, 0.19, .20, + .21, .22, .23, .23, .24, .25, .26, .27, .3, .4, .5, .75, + 1.0, 2.5, 5.0, 7.5, 10.0, 15.0, 20.0, 25.0, 50.0, 100.0, + 150.0, 200.0, float("inf")) + metric_hist = Histogram('response_latency_seconds', + 'Response latency (seconds)', + buckets=buckets) + metric_guage = Gauge('in_progress_requests', 'Requests in progress') + metric_counter_rcv = Counter('request_recv', 'Total requests received') + metric_counter_done = Counter('request_done', + 'Total requests finished') + + @routes.get("/") + async def list_actors(request: web.Request): + all_actors = await async_get(self.router.get_actors.remote()) + return web.json_response({"actors": all_actors}) + + @routes.get("/{actor_name}") + async def get_schema(request: web.Request): + model_name = request.match_info["actor_name"] + model_annotation = await async_get( + self.router.get_annotation.remote(model_name)) + schema = get_base_schema() + schema["properties"]["input"][ + "properties"] = annotation_to_json_schema(model_annotation) + return web.json_response(schema) + + @routes.post("/{actor_name}") + async def call_actor(request: web.Request): + metric_counter_rcv.inc() + + model_name = request.match_info["actor_name"] + + # Get the 
annotation + try: + model_annotation = await async_get( + self.router.get_annotation.remote(model_name)) + except ray.worker.RayTaskError as e: + return web.Response(text=str(e), status=400) - @default_app.route("/{actor}", methods=["GET", "POST"]) - async def dispatch_remote_function(request): data = await request.json() - actor_name = request.path_params["actor"] + # Validate Schema + schema = get_base_schema() + schema["properties"]["input"][ + "properties"] = annotation_to_json_schema(model_annotation) + + try: + validate(data, schema) + except ValidationError as e: + return web.Response(text=e.__unicode__(), status=400) + + # Prepare input + inp = data.pop("input") slo_seconds = data.pop("slo_ms") / 1000 deadline = time.perf_counter() + slo_seconds + with metric_hist.time(): + with metric_guage.track_inprogress(): + result_future = await async_unwrap( + self.router.call.remote(model_name, inp, deadline)) + result = await async_get(result_future) + + metric_counter_done.inc() + + if isinstance(result, ray.worker.RayTaskError): + return web.json_response({ + "success": False, + "actor": model_name, + "error": str(result) + }) + return web.json_response({ + "success": True, + "actor": model_name, + "result": result + }) - inp = data.pop("input") + app.add_routes(routes) + start_http_server(8000) - result_future = unwrap( - self.router.call.remote(actor_name, inp, deadline)) + web.run_app(app, host=self.ip, port=self.port) - # TODO(simon): change to asyncio ray.get - result = ray.get(result_future) + runner = web.AppRunner(app) + loop = asyncio.get_event_loop() + loop.run_until_complete(runner.setup()) - return JSONResponse({ - "success": True, - "actor": actor_name, - "result": result - }) + site = web.TCPSite(runner, self.ip, self.port, backlog=5000) - uvicorn.run(default_app, host=self.ip, port=self.port) + loop.run_until_complete(site.start()) + loop.run_until_complete(runner.cleanup()) diff --git a/python/ray/experimental/serve/frontend/util.py 
b/python/ray/experimental/serve/frontend/util.py new file mode 100644 index 000000000000..21d966924d07 --- /dev/null +++ b/python/ray/experimental/serve/frontend/util.py @@ -0,0 +1,36 @@ +from ray.experimental import async_api +import ray + + +async def async_get(future: ray.ObjectID): + result = await async_api.as_future(future) + return result + + +async def async_unwrap(future: ray.ObjectID): + """Unwrap the result from ray.experimental.server router. + Router returns a list of object ids when you call them. + """ + result = await async_get(future) + return result[0] + + +def get_base_schema(): + return { + "type": "object", + "properties": { + "slo_ms": { + "type": "number", + "description": "Service Level Objective (SLO) for this query, in ms", + }, + "input": { + "type": "object", + "description": "The input to the model. Should be JSON object with key as keyword argument.", + }, + }, + } + + +def annotation_to_json_schema(a): + mapping = {int: "integer", float: "number", str: "string", bool: "boolean"} + return {k: {"type": mapping[v]} for k, v in a.items()} diff --git a/python/ray/experimental/serve/mixin.py b/python/ray/experimental/serve/mixin.py index 858572634a04..c57d3263ac53 100644 --- a/python/ray/experimental/serve/mixin.py +++ b/python/ray/experimental/serve/mixin.py @@ -1,3 +1,9 @@ +""" +Mixins are classes that are designed to be included in other class. +This module tries to provide a RayServeMixin class that makes +an actor servable by ray.serve +""" + from __future__ import absolute_import from __future__ import division from __future__ import print_function @@ -25,7 +31,10 @@ def _execute_and_seal_error(method, arg, method_name): resultOID and retried by user. 
""" try: - return method(arg) + if isinstance(arg, dict): + return method(**arg) + else: + return method(arg) except Exception: return ray.worker.RayTaskError(method_name, traceback.format_exc()) @@ -57,7 +66,8 @@ def _dispatch(self, input_batch: List[SingleQuery]): ray.worker.global_worker.put_object(inp.result_object_id, res) else: for inp in input_batch: - result = _execute_and_seal_error(method, inp.data, - self.serve_method) - ray.worker.global_worker.put_object(inp.result_object_id, - result) + result = _execute_and_seal_error(method, inp.data, self.serve_method) + ray.worker.global_worker.put_object(inp.result_object_id, result) + + +RayServable = RayServeMixin diff --git a/python/ray/experimental/serve/object_id.py b/python/ray/experimental/serve/object_id.py index cdde52532a58..433044641e6c 100644 --- a/python/ray/experimental/serve/object_id.py +++ b/python/ray/experimental/serve/object_id.py @@ -15,7 +15,8 @@ def unwrap(future): def get_new_oid(): worker = ray.worker.global_worker - oid = ray._raylet.compute_put_id(worker.current_task_id, - worker.task_context.put_index) + oid = ray._raylet.compute_put_id( + worker.current_task_id, worker.task_context.put_index + ) worker.task_context.put_index += 1 return oid diff --git a/python/ray/experimental/serve/router/__init__.py b/python/ray/experimental/serve/router/__init__.py index dae5fcb7ce01..55e42e2af80a 100644 --- a/python/ray/experimental/serve/router/__init__.py +++ b/python/ray/experimental/serve/router/__init__.py @@ -2,8 +2,7 @@ from __future__ import division from __future__ import print_function -from ray.experimental.serve.router.routers import (DeadlineAwareRouter, - SingleQuery) +from ray.experimental.serve.router.routers import DeadlineAwareRouter, SingleQuery import ray diff --git a/python/ray/experimental/serve/router/routers.py b/python/ray/experimental/serve/router/routers.py index b11849039ce9..1760a6e0aa38 100644 --- a/python/ray/experimental/serve/router/routers.py +++ 
b/python/ray/experimental/serve/router/routers.py @@ -11,9 +11,12 @@ from ray.experimental.serve.utils.priority_queue import PriorityQueue ACTOR_NOT_REGISTERED_MSG: Callable = ( - lambda name: ("Actor {} is not registered with this router. Please use " - "'router.register_actor.remote(...)' " - "to register it.").format(name)) + lambda name: ( + "Actor {} is not registered with this router. Please use " + "'router.register_actor.remote(...)' " + "to register it." + ).format(name) +) # Use @total_ordering so we can sort SingleQuery @@ -27,8 +30,7 @@ class SingleQuery: deadline: The deadline in seconds. """ - def __init__(self, data, result_object_id: ray.ObjectID, - deadline_s: float): + def __init__(self, data, result_object_id: ray.ObjectID, deadline_s: float): self.data = data self.result_object_id = result_object_id self.deadline = deadline_s @@ -50,11 +52,9 @@ class DeadlineAwareRouter: def __init__(self, router_name): # Runtime Data - self.query_queues: Dict[str, PriorityQueue] = defaultdict( - PriorityQueue) + self.query_queues: Dict[str, PriorityQueue] = defaultdict(PriorityQueue) self.running_queries: Dict[ray.ObjectID, ray.actor.ActorHandle] = {} - self.actor_handles: Dict[str, List[ray.actor.ActorHandle]] = ( - defaultdict(list)) + self.actor_handles: Dict[str, List[ray.actor.ActorHandle]] = (defaultdict(list)) # Actor Metadata self.managed_actors: Dict[str, ray.actor.ActorClass] = {} @@ -64,6 +64,8 @@ def __init__(self, router_name): # Router Metadata self.name = router_name + self.annotation_cache: Dict[str, dict] = {} + def start(self): """Kick off the router loop""" @@ -73,13 +75,14 @@ def start(self): ray.experimental.get_actor(self.name).loop.remote() def register_actor( - self, - actor_name: str, - actor_class: ray.actor.ActorClass, - init_args: List = [], - init_kwargs: dict = {}, - num_replicas: int = 1, - max_batch_size: int = -1, # Unbounded batch size + self, + actor_name: str, + actor_class: ray.actor.ActorClass, + init_args: List = [], + 
init_kwargs: dict = {}, + num_replicas: int = 1, + max_batch_size: int = -1, # Unbounded batch size + annotation: dict = {}, ): """Register a new managed actor. """ @@ -88,12 +91,21 @@ def register_actor( self.max_batch_size[actor_name] = max_batch_size ray.experimental.get_actor(self.name).set_replica.remote( - actor_name, num_replicas) + actor_name, num_replicas + ) + + self.annotation_cache[actor_name] = annotation + + def get_actors(self): + return list(self.managed_actors.keys()) + + def get_annotation(self, actor_name): + assert actor_name in self.managed_actors, ACTOR_NOT_REGISTERED_MSG(actor_name) + return self.annotation_cache[actor_name] def set_replica(self, actor_name, new_replica_count): """Scale a managed actor according to new_replica_count.""" - assert actor_name in self.managed_actors, ( - ACTOR_NOT_REGISTERED_MSG(actor_name)) + assert actor_name in self.managed_actors, ACTOR_NOT_REGISTERED_MSG(actor_name) current_replicas = len(self.actor_handles[actor_name]) @@ -103,7 +115,8 @@ def set_replica(self, actor_name, new_replica_count): args = self.actor_init_arguments[actor_name][0] kwargs = self.actor_init_arguments[actor_name][1] new_actor_handle = self.managed_actors[actor_name].remote( - *args, **kwargs) + *args, **kwargs + ) self.actor_handles[actor_name].append(new_actor_handle) # Decrease the number of replicas @@ -120,8 +133,7 @@ def call(self, actor_name, data, deadline_s): List[ray.ObjectID] with length 1, the object ID wrapped inside is the result object ID when the query is executed. 
""" - assert actor_name in self.managed_actors, ( - ACTOR_NOT_REGISTERED_MSG(actor_name)) + assert actor_name in self.managed_actors, ACTOR_NOT_REGISTERED_MSG(actor_name) result_object_id = get_new_oid() @@ -131,7 +143,8 @@ def call(self, actor_name, data, deadline_s): data_object_id = ray.worker.global_worker._current_task.arguments()[1] self.query_queues[actor_name].push( - SingleQuery(data_object_id, result_object_id, deadline_s)) + SingleQuery(data_object_id, result_object_id, deadline_s) + ) return [result_object_id] @@ -153,8 +166,7 @@ def loop(self): for ready_oid in ready_oids: self.running_queries.pop(ready_oid) - busy_actors: Set[ray.actor.ActorHandle] = set( - self.running_queries.values()) + busy_actors: Set[ray.actor.ActorHandle] = set(self.running_queries.values()) # 2. Iterate over free actors and request queues, dispatch requests # batch to free actors. @@ -179,8 +191,7 @@ def loop(self): def _get_next_batch(self, actor_name: str) -> List[SingleQuery]: """Get next batch of request for the actor whose name is provided.""" - assert actor_name in self.query_queues, ( - ACTOR_NOT_REGISTERED_MSG(actor_name)) + assert actor_name in self.query_queues, ACTOR_NOT_REGISTERED_MSG(actor_name) inputs = [] batch_size = self.max_batch_size[actor_name] @@ -199,8 +210,9 @@ def _get_next_batch(self, actor_name: str) -> List[SingleQuery]: return inputs - def _mark_running(self, batch_oid: ray.ObjectID, - actor_handle: ray.actor.ActorHandle): + def _mark_running( + self, batch_oid: ray.ObjectID, actor_handle: ray.actor.ActorHandle + ): """Mark actor_handle as running identified by batch_oid. 
This means that if batch_oid is fullfilled, then actor_handle must be diff --git a/python/ray/experimental/serve/tests/test_actors.py b/python/ray/experimental/serve/tests/test_actors.py index 3b2748b73bf3..8fdeae96f955 100644 --- a/python/ray/experimental/serve/tests/test_actors.py +++ b/python/ray/experimental/serve/tests/test_actors.py @@ -7,8 +7,12 @@ import ray from ray.experimental.serve import SingleQuery -from ray.experimental.serve.examples.adder import ScalerAdder, VectorizedAdder -from ray.experimental.serve.examples.counter import Counter, CustomCounter +from ray.experimental.serve.example_actors import ( + ScalerAdder, + VectorizedAdder, + Counter, + CustomCounter, +) from ray.experimental.serve.object_id import get_new_oid INCREMENT = 3 @@ -28,8 +32,7 @@ def generated_inputs(): input_arr = np.arange(10) for i in input_arr: oid = get_new_oid() - inputs.append( - SingleQuery(data=i, result_object_id=oid, deadline_s=deadline)) + inputs.append(SingleQuery(data=i, result_object_id=oid, deadline_s=deadline)) return inputs diff --git a/python/ray/experimental/serve/tests/test_deadline_router.py b/python/ray/experimental/serve/tests/test_deadline_router.py index d1d4d6769794..2ddd292af713 100644 --- a/python/ray/experimental/serve/tests/test_deadline_router.py +++ b/python/ray/experimental/serve/tests/test_deadline_router.py @@ -8,8 +8,12 @@ import pytest import ray -from ray.experimental.serve.examples.adder import ScalerAdder, VectorizedAdder -from ray.experimental.serve.examples.halt import SleepCounter, SleepOnFirst +from ray.experimental.serve.example_actors import ( + ScalerAdder, + VectorizedAdder, + SleepCounter, + SleepOnFirst, +) from ray.experimental.serve.object_id import unwrap from ray.experimental.serve.router import DeadlineAwareRouter, start_router @@ -29,14 +33,15 @@ def router(): handle = start_router(DeadlineAwareRouter, "DefaultRouter") handle.register_actor.remote( - "VAdder", VectorizedAdder, - init_kwargs={"scaler_increment": 1}) # 
init args + "VAdder", VectorizedAdder, init_kwargs={"scaler_increment": 1} + ) # init args handle.register_actor.remote( - "SAdder", ScalerAdder, init_kwargs={"scaler_increment": 2}) + "SAdder", ScalerAdder, init_kwargs={"scaler_increment": 2} + ) handle.register_actor.remote( - "SleepFirst", SleepOnFirst, init_kwargs={"sleep_time": 1}) - handle.register_actor.remote( - "SleepCounter", SleepCounter, max_batch_size=1) + "SleepFirst", SleepOnFirst, init_kwargs={"sleep_time": 1} + ) + handle.register_actor.remote("SleepCounter", SleepCounter, max_batch_size=1) yield handle @@ -69,9 +74,7 @@ def test_scaler_adder(router: DeadlineAwareRouter, now: float): def test_batching_ability(router: DeadlineAwareRouter, now: float): first = unwrap(router.call.remote("SleepFirst", 1, now + 1)) - rest = [ - unwrap(router.call.remote("SleepFirst", 1, now + 1)) for _ in range(10) - ] + rest = [unwrap(router.call.remote("SleepFirst", 1, now + 1)) for _ in range(10)] assert ray.get(first) == 1 assert np.alltrue(np.array(ray.get(rest)) == 10) diff --git a/python/ray/experimental/serve/tests/test_default_app.py b/python/ray/experimental/serve/tests/test_default_app.py index 5eb758c5f7ed..9cde848266e5 100644 --- a/python/ray/experimental/serve/tests/test_default_app.py +++ b/python/ray/experimental/serve/tests/test_default_app.py @@ -9,7 +9,7 @@ import ray from ray.experimental.serve import DeadlineAwareRouter -from ray.experimental.serve.examples.adder import VectorizedAdder +from ray.experimental.serve.example_actors import TimesThree from ray.experimental.serve.frontend import HTTPFrontendActor from ray.experimental.serve.router import start_router @@ -17,7 +17,7 @@ NUMBER_OF_TRIES = 5 -@pytest.fixture +@pytest.fixture(scope="module") def get_router(): # We need this many workers so resource are not oversubscribed ray.init(num_cpus=4) @@ -29,18 +29,23 @@ def get_router(): def test_http_basic(get_router): router = get_router a = HTTPFrontendActor.remote(router=ROUTER_NAME) - 
a.start.remote() + frontend_crashed = a.start.remote() - router.register_actor.remote( - "VAdder", VectorizedAdder, init_kwargs={"scaler_increment": 1}) + router.register_actor.remote("TimesThree", TimesThree) for _ in range(NUMBER_OF_TRIES): try: - url = "http://0.0.0.0:8080/VAdder" - payload = {"input": 10, "slo_ms": 1000} + url = "http://0.0.0.0:8090/TimesThree" + payload = {"input": {"inp": 3}, "slo_ms": 1000} resp = requests.request("POST", url, json=payload) except Exception: # it is possible that the actor is not yet instantiated time.sleep(1) - assert resp.json() == {"success": True, "actor": "VAdder", "result": 11} + print(resp.text) + assert resp.json() == {"success": True, "actor": "TimesThree", "result": 9} + + url = "http://0.0.0.0:8090/" + resp = requests.request("POST", url) + print(resp.text) + assert resp.json() == {"actors": ["TimesThree"]} diff --git a/python/ray/experimental/serve/tests/test_defaults.py b/python/ray/experimental/serve/tests/test_defaults.py new file mode 100644 index 000000000000..2fca2f248225 --- /dev/null +++ b/python/ray/experimental/serve/tests/test_defaults.py @@ -0,0 +1,27 @@ +import time +import pytest +import ray +import requests + + +def test_simple(): + ray.init(num_cpus=4) + import ray.experimental.serve as srv + from ray.experimental.serve.example_actors import TimesThree + + actor_name = "Times3" + assert srv.register_actor( + actor_name, TimesThree + ) == "http://0.0.0.0:8090/{}".format(actor_name) + + srv.set_replica(actor_name, 2) + + assert srv.call(actor_name, 3) == 9 + assert ray.get(srv.enqueue(actor_name, 4, time.time() + 10)) == 12 + + url = "http://0.0.0.0:8090/Times3" + payload = {"input": {"inp": 5}, "slo_ms": 1000} + resp = requests.request("POST", url, json=payload) + assert resp.json() == {"success": True, "actor": "Times3", "result": 15} + + ray.shutdown() diff --git a/python/ray/experimental/serve/utils/__init__.py b/python/ray/experimental/serve/utils/__init__.py new file mode 100644 index 
000000000000..e69de29bb2d1 diff --git a/python/ray/experimental/serve/utils/priority_queue.py b/python/ray/experimental/serve/utils/priority_queue.py index 05b7045b43a5..479b917e8ead 100644 --- a/python/ray/experimental/serve/utils/priority_queue.py +++ b/python/ray/experimental/serve/utils/priority_queue.py @@ -5,7 +5,23 @@ import heapq -class PriorityQueue: +class BaseQueue: + """Abstract base class for queue abstraction""" + + def push(self, item): + raise NotImplementedError() + + def pop(self): + raise NotImplementedError() + + def try_pop(self): + raise NotImplementedError() + + def __len__(self): + raise NotImplementedError() + + +class PriorityQueue(BaseQueue): """A min-heap class wrapping heapq module.""" def __init__(self): @@ -25,3 +41,25 @@ def try_pop(self): def __len__(self): return len(self.q) + + +class FIFOQueue(BaseQueue): + """A first-in, first-out queue backed by a plain list.""" + + def __init__(self): + self.q = [] + + def push(self, item): + self.q.append(item) + + def pop(self): + return self.q.pop(0) + + def try_pop(self): + if len(self.q) == 0: + return None + else: + return self.pop() + + def __len__(self): + return len(self.q)