28 changes: 28 additions & 0 deletions doc/source/serve/advanced-guides/advanced-autoscaling.md
@@ -126,6 +126,34 @@ initializes slowly, you can increase `downscale_delay_s` to make the downscaling
happen more infrequently and avoid reinitialization when the application needs
to upscale again in the future.

:::{note}
Upscaling and downscaling calculations are made periodically by the Serve
Controller, which by default runs every `0.1` seconds. You can configure this
interval with the `RAY_SERVE_CONTROL_LOOP_INTERVAL_S` environment variable.
`upscale_delay_s` and `downscale_delay_s` prevent the autoscaler from adding
or removing replicas unless *all* calculations over the delay period
unanimously agree to upscale or downscale; any single inconsistent calculation
resets the decision timer. In general, `RAY_SERVE_CONTROL_LOOP_INTERVAL_S`
should not be changed to tune autoscaling behavior.
:::
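
To make this concrete, here is a rough sketch of how the delay settings
translate into a number of consecutive, consistent scaling calculations,
assuming the default control loop interval and the default delay values:

```python
# Rough illustration only; not part of Serve's implementation.
CONTROL_LOOP_INTERVAL_S = 0.1  # override with RAY_SERVE_CONTROL_LOOP_INTERVAL_S
upscale_delay_s = 30.0         # default upscale delay
downscale_delay_s = 600.0      # default downscale delay

# Number of consecutive calculations that must agree before scaling happens.
upscale_periods = round(upscale_delay_s / CONTROL_LOOP_INTERVAL_S)      # ~300
downscale_periods = round(downscale_delay_s / CONTROL_LOOP_INTERVAL_S)  # ~6000
print(upscale_periods, downscale_periods)
```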

* **scaling_function [default_value="last"]**: One of `"last"`, `"mean"`,
`"min"`, or `"max"`. This parameter determines how delayed scaling decisions
are made. Scaling calculations are cached over the `upscale_delay_s` and
`downscale_delay_s` periods, and once they are *consistent*, this parameter
determines the final number of replicas (see the configuration example after
the note below).
  * `"last"`: The most recent scaling calculation is used.
  * `"mean"`: The rounded mean of all cached calculations is used.
  * `"min"`: The minimum of all cached calculations is used.
  * `"max"`: The maximum of all cached calculations is used.

:::{note}
Previously, the autoscaler only used the most recent (`"last"`) scaling
calculation. This remains the default behavior for backwards compatibility;
however, `"mean"` or `"max"` is recommended to better accommodate traffic
patterns.
:::
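
For illustration, the following sketch shows how a deployment could opt into a
different scaling function. The deployment class and the other configuration
values here are placeholders, not part of this change:

```python
from ray import serve


@serve.deployment(
    autoscaling_config={
        "min_replicas": 1,
        "max_replicas": 10,
        "target_ongoing_requests": 2,
        "upscale_delay_s": 30.0,
        "downscale_delay_s": 600.0,
        # Aggregate the cached scaling calculations with the rounded mean
        # instead of only using the most recent one.
        "scaling_function": "mean",
    },
)
class Model:
    async def __call__(self, request) -> str:
        return "ok"


app = Model.bind()
```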

* **upscale_smoothing_factor [default_value=1.0] (DEPRECATED)**: This parameter
is renamed to `upscaling_factor`. `upscale_smoothing_factor` will be removed in
a future release.
3 changes: 3 additions & 0 deletions python/ray/serve/_private/constants.py
@@ -38,6 +38,9 @@
"RAY_SERVE_CONTROL_LOOP_INTERVAL_S cannot be negative."
)

# Max length of the scaling decision history, to prevent unbounded memory growth.
MAX_SCALING_HISTORY_LENGTH = int(1e6)

#: Max time to wait for HTTP proxy in `serve.start()`.
HTTP_PROXY_TIMEOUT = 60

30 changes: 26 additions & 4 deletions python/ray/serve/autoscaling_policy.py
@@ -2,7 +2,11 @@
import math
from typing import Any, Dict, Optional

from ray.serve._private.constants import CONTROL_LOOP_INTERVAL_S, SERVE_LOGGER_NAME
from ray.serve._private.constants import (
CONTROL_LOOP_INTERVAL_S,
MAX_SCALING_HISTORY_LENGTH,
SERVE_LOGGER_NAME,
)
from ray.serve.config import AutoscalingConfig
from ray.util.annotations import PublicAPI

@@ -57,7 +61,9 @@ def _calculate_desired_num_replicas(

# Multiply the distance to 1 by the smoothing ("gain") factor (default=1).
smoothed_error_ratio = 1 + ((error_ratio - 1) * scaling_factor)
desired_num_replicas = math.ceil(num_running_replicas * smoothed_error_ratio)
desired_num_replicas = math.ceil(
round(num_running_replicas * smoothed_error_ratio, 6)
)

# If desired num replicas is "stuck" because of the smoothing factor
# (meaning the traffic is low enough for the replicas to downscale
@@ -101,6 +107,8 @@ def replica_queue_length_autoscaling_policy(
seconds.
"""
decision_counter = policy_state.get("decision_counter", 0)
decision_history = policy_state.get("decision_history", [])

if num_running_replicas == 0:
# When 0 replicas and queries are queued, scale up the replicas
if total_num_requests > 0:
@@ -126,33 +134,47 @@
# Otherwise, just increment.
if decision_counter < 0:
decision_counter = 0
decision_history = []
decision_counter += 1
decision_history.append(desired_num_replicas)

# Only actually scale the replicas if we've made this decision for
# 'scale_up_consecutive_periods' in a row.
if decision_counter > int(config.upscale_delay_s / CONTROL_LOOP_INTERVAL_S):
decision_num_replicas = config.decide_num_replicas(decision_history)
decision_counter = 0
decision_num_replicas = desired_num_replicas
decision_history = []

# Scale down.
elif desired_num_replicas < curr_target_num_replicas:
# If the previous decision was to scale up (the counter was
# positive), reset it to zero before decrementing.
if decision_counter > 0:
decision_counter = 0
decision_history = []

decision_counter -= 1
decision_history.append(desired_num_replicas)

# Only actually scale the replicas if we've made this decision for
# 'scale_down_consecutive_periods' in a row.
if decision_counter < -int(config.downscale_delay_s / CONTROL_LOOP_INTERVAL_S):
decision_num_replicas = config.decide_num_replicas(decision_history)
decision_counter = 0
decision_num_replicas = desired_num_replicas
decision_history = []

# Do nothing.
else:
decision_counter = 0
decision_history = []

policy_state["decision_counter"] = decision_counter

# Limit the length of the decision history to avoid unbounded memory usage
if len(decision_history) > MAX_SCALING_HISTORY_LENGTH:
decision_history = decision_history[-(MAX_SCALING_HISTORY_LENGTH - 10) :]
policy_state["decision_history"] = decision_history

return decision_num_replicas


23 changes: 23 additions & 0 deletions python/ray/serve/config.py
@@ -71,6 +71,17 @@ class AutoscalingConfig(BaseModel):
# How long to wait before scaling up replicas
upscale_delay_s: NonNegativeFloat = 30.0

# Determines how scaling decisions are made based on calculated replica counts
# Replica counts are calculated many times over the upscale/downscale delay period
scaling_function: str = "last"
_scaling_functions = {
"last": staticmethod(lambda history: history[-1]),
"mean": staticmethod(lambda history: round(sum(history) / len(history))),
"max": staticmethod(max),
"min": staticmethod(min),
}
_scaling_function = _scaling_functions[scaling_function]

# Cloudpickled policy definition.
_serialized_policy_def: bytes = PrivateAttr(default=b"")

@@ -101,6 +112,15 @@ def replicas_settings_valid(cls, max_replicas, values):

return max_replicas

@validator("scaling_function", always=True)
def scaling_function_valid(cls, scaling_function: str):
if scaling_function not in cls._scaling_functions:
raise ValueError(
f"scaling_function must be one of {list(cls._scaling_functions.keys())}"
)
cls._scaling_function = cls._scaling_functions[scaling_function]
return scaling_function

def __init__(self, **kwargs):
super().__init__(**kwargs)
self.serialize_policy()
@@ -153,6 +173,9 @@ def get_downscaling_factor(self) -> PositiveFloat:
def get_target_ongoing_requests(self) -> PositiveFloat:
return self.target_ongoing_requests

def decide_num_replicas(self, history: List[int]) -> PositiveInt:
return round(self._scaling_function(history))


# Keep in sync with ServeDeploymentMode in dashboard/client/src/type/serve.ts
@Deprecated
75 changes: 75 additions & 0 deletions python/ray/serve/tests/test_autoscaling_policy.py
@@ -23,6 +23,7 @@
ReplicaState,
)
from ray.serve._private.constants import (
CONTROL_LOOP_INTERVAL_S,
RAY_SERVE_COLLECT_AUTOSCALING_METRICS_ON_HANDLE,
SERVE_DEFAULT_APP_NAME,
SERVE_NAMESPACE,
@@ -1498,6 +1499,80 @@ def check_expected_statuses(
print("Statuses are as expected.")


@pytest.mark.parametrize("scaling_function", ["min", "max", "mean", "last"])
def test_autoscaling_decision_functions(serve_instance_with_signal, scaling_function):
client, signal = serve_instance_with_signal

# Compute over multiple control loop cycles to correct for misalignment
interval = CONTROL_LOOP_INTERVAL_S * 15
decision_history = [1, 9, 2]
scaling_delay = interval * len(decision_history)
app_config = {
"import_path": "ray.serve.tests.test_config_files.get_signal.app",
"deployments": [
{
"name": "A",
"autoscaling_config": {
"metrics_interval_s": 0.05,
"min_replicas": 0,
"max_replicas": 100,
"initial_replicas": 1,
"look_back_period_s": 0.2,
"downscale_delay_s": scaling_delay,
"upscale_delay_s": scaling_delay,
"target_ongoing_requests": 1.0,
"scaling_function": scaling_function,
},
"graceful_shutdown_timeout_s": 1,
"max_ongoing_requests": 1000,
}
],
}

# Wait for downscaling from 1 to 0 to sync with CONTROL_LOOP_INTERVAL_S
client.deploy_apps(ServeDeploySchema(**{"applications": [app_config]}))
wait_for_condition(
check_deployment_status,
name="A",
expected_status=DeploymentStatus.HEALTHY,
retry_interval_ms=5,
)
h = serve.get_app_handle(SERVE_DEFAULT_APP_NAME)
print("Waiting for downscaling signal to align with control loop.")
wait_for_condition(
check_deployment_status,
name="A",
expected_status=DeploymentStatus.DOWNSCALING,
retry_interval_ms=5,
timeout=scaling_delay + 1,
)
print("Downscaling detected.")

for replica_count in decision_history:
ray.get(signal.send.remote(clear=True))
[h.remote() for _ in range(replica_count)]
time.sleep(interval)

# Check that the scaling function is working as expected
if scaling_function == "min":
expected_replicas = min(decision_history)
elif scaling_function == "max":
expected_replicas = max(decision_history)
elif scaling_function == "mean":
expected_replicas = round(sum(decision_history) / len(decision_history))
elif scaling_function == "last":
expected_replicas = decision_history[-1]

print("Expected replicas: ", expected_replicas)
wait_for_condition(
check_num_replicas_eq, name="A", target=expected_replicas, timeout=scaling_delay
)

# Release signal so we don't get an ugly error message from the
# replica when the signal actor goes out of scope and gets killed
ray.get(signal.send.remote())


if __name__ == "__main__":
import sys

92 changes: 91 additions & 1 deletion python/ray/serve/tests/unit/test_autoscaling_policy.py
@@ -1,8 +1,12 @@
import itertools
import sys

import pytest

from ray.serve._private.constants import CONTROL_LOOP_INTERVAL_S
from ray.serve._private.constants import (
CONTROL_LOOP_INTERVAL_S,
MAX_SCALING_HISTORY_LENGTH,
)
from ray.serve.autoscaling_policy import (
_calculate_desired_num_replicas,
replica_queue_length_autoscaling_policy,
@@ -635,6 +639,92 @@ def test_single_replica_receives_all_requests(self, ongoing_requests):
)
assert new_num_replicas == ongoing_requests / target_requests

@pytest.mark.parametrize("scaling_function", ["min", "max", "mean", "last"])
@pytest.mark.parametrize("scaling_delay", [0, 1, 10])
def test_scaling_decision_functions(self, scaling_function, scaling_delay):
policy_state = {}
min_replicas, max_replicas = 1, 10000
config = AutoscalingConfig(
min_replicas=min_replicas,
max_replicas=max_replicas,
target_ongoing_requests=1,
upscale_delay_s=scaling_delay,
downscale_delay_s=scaling_delay,
scaling_function=scaling_function,
)

current_replicas = 1
request_history = []
# Scaling from 0-1 does not utilize policy_state, start at 2
for i in itertools.chain(range(2, max_replicas), range(max_replicas, 2, -1)):
request_history.append(i)
new_num_replicas = replica_queue_length_autoscaling_policy(
config=config,
total_num_requests=i,
num_running_replicas=current_replicas,
curr_target_num_replicas=current_replicas,
capacity_adjusted_min_replicas=min_replicas,
capacity_adjusted_max_replicas=max_replicas,
policy_state=policy_state,
)

# Verify scaling decisions
if current_replicas != new_num_replicas:
if scaling_function == "min":
assert new_num_replicas == min(request_history)
elif scaling_function == "max":
assert new_num_replicas == max(request_history)
elif scaling_function == "mean":
assert new_num_replicas == round(
sum(request_history) / len(request_history)
)
elif scaling_function == "last":
assert new_num_replicas == request_history[-1]
request_history = []
# Reset expected scaling decision on direction reversal
elif current_replicas == i:
request_history = []

current_replicas = new_num_replicas

# Verify scaling decisions align with expected policy_state
assert policy_state["decision_history"] == request_history
assert abs(policy_state["decision_counter"]) == len(
policy_state["decision_history"]
)

def test_decision_history_bounds(self):
min_replicas, max_replicas = 0, 2
iterations = 100
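# Make the delay thresholds unreachably large so that no scaling decision
# ever fires; the decision history then grows on every call and only the
# length cap keeps it bounded.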
config = AutoscalingConfig(
min_replicas=min_replicas,
max_replicas=max_replicas,
target_ongoing_requests=1,
upscale_delay_s=MAX_SCALING_HISTORY_LENGTH / CONTROL_LOOP_INTERVAL_S
+ iterations,
downscale_delay_s=MAX_SCALING_HISTORY_LENGTH / CONTROL_LOOP_INTERVAL_S
+ iterations,
)

policy_state = {
"decision_history": [2] * MAX_SCALING_HISTORY_LENGTH,
"decision_counter": MAX_SCALING_HISTORY_LENGTH,
}

for i in itertools.repeat(2, iterations):
_ = replica_queue_length_autoscaling_policy(
config=config,
total_num_requests=i,
num_running_replicas=1,
curr_target_num_replicas=1,
capacity_adjusted_min_replicas=min_replicas,
capacity_adjusted_max_replicas=max_replicas,
policy_state=policy_state,
)
# Decision history should be bounded, while decision counter increases
assert len(policy_state["decision_history"]) <= MAX_SCALING_HISTORY_LENGTH
assert policy_state["decision_counter"] > MAX_SCALING_HISTORY_LENGTH


if __name__ == "__main__":
sys.exit(pytest.main(["-v", "-s", __file__]))
3 changes: 3 additions & 0 deletions src/ray/protobuf/serve.proto
@@ -72,6 +72,9 @@ message AutoscalingConfig {

// The multiplicative "gain" factor to limit downscale.
optional double downscaling_factor = 15;

// How scaling decisions are made based on the calculated replica counts.
string scaling_function = 16;
}

//[Begin] LOGGING CONFIG