Conversation

@Stack-Attack
Contributor

@Stack-Attack Stack-Attack commented Aug 13, 2025

Why are these changes needed?

These changes modify the autoscaler metrics collection and aggregation functions in preparation for global aggregation in the controller.

Related issue number

Partial for #46497

Required for #41135 #51905

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

@Stack-Attack Stack-Attack requested a review from a team as a code owner August 13, 2025 07:42
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Code Review

This pull request refactors the metrics aggregation logic for autoscaling by introducing new generic aggregation methods in InMemoryMetricsStore and a consolidate_metrics_stores function. The changes are a good step towards improving the separation of concerns. However, I've identified a few critical issues where None return values from aggregation functions are not handled, which will lead to TypeError exceptions. Additionally, there's a bug in consolidate_metrics_stores that could cause data loss for a specific metric, and a minor API design concern. Please review the detailed comments for suggestions on how to address these points.

@Stack-Attack Stack-Attack changed the title from "Update metrics_utils for global metrics aggregation in controller." to "Update metrics_utils for future global metrics aggregation in controller." Aug 13, 2025
@Stack-Attack
Contributor Author

@zcin Partial PR for #51905 is here.

@ray-gardener ray-gardener bot added the community-contribution, core, and observability labels Aug 13, 2025
return self._aggregate_reduce(keys, statistics.mean)


def consolidate_metrics_stores(*stores: InMemoryMetricsStore) -> InMemoryMetricsStore:
Contributor

Can you explain which Serve component would call this function?

Contributor Author

@abrarsheikh This would be called in the `autoscaler_state_manager` to consolidate metrics from multiple handles. AFAIK there is no guarantee that replica-handle metrics are one-to-one (based on the current logic), so I wanted to handle the case of the same replica reporting through different handles at a different cadence.

Contributor

Can we add a docstring explaining the inputs and return value? Since there are two axes, handles and replicas, it would be helpful if the docstring explained it with that context in mind.

Contributor Author

Added here, as well as to the aggregate functions, since the tuple return is new.

You mentioned in the other PR to use a single aggregate function which takes an enum. Let me know if you want that here. I'd need to import the enum in test_metrics_utils, replica.py, and router.py, which I'm not sure is preferable.

Comment on lines 267 to 322
if key == QUEUED_REQUESTS_KEY:
    # Sum queued requests across handle metrics.
    merged.data[QUEUED_REQUESTS_KEY][-1].value += timeseries[-1].value
elif key not in merged.data or (
    timeseries
    and (
        not merged.data[key]
        or timeseries[-1].timestamp > merged.data[key][-1].timestamp
    )
):
    # Replace if not present or if newer datapoints are available
    merged.data[key] = timeseries.copy()
Contributor

Finding it hard to follow this logic. Can you deconstruct the expression for better readability?

Contributor

Each timeseries here is the number of requests (over e.g. the past 30 seconds) that handle X sends to replica Y, right? Are we choosing only the data from one out of all handles here, instead of aggregating across handles?

Contributor Author

We're going to have a lot of handle_metrics or replica_metrics ending up in the controller. We will store all of these as raw InMemoryMetricsStore objects, where each key stores one timeseries. Assuming that:

  1. Different metrics stores might contain the same key.
  2. All keys map to one unique replica.

This function allows us to feed in all the metrics stores and get only the latest metrics for each key. If a key is already present, we overwrite its data when the new timeseries is newer. So this doesn't aggregate; it just consolidates the data and returns a single object representing the global state of all the metrics stores.

Contributor Author

I'll break out that large conditional for readability, and add more documentation to make this clear!
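
For reference, a minimal sketch of how that conditional might read once deconstructed, assuming the same merged store, QUEUED_REQUESTS_KEY, and latest-wins-per-key semantics as the hunk above (the helper name _merge_key is illustrative, not the actual code):

```python
def _merge_key(merged, key, timeseries):
    """Merge one key's timeseries into the consolidated store."""
    if key == QUEUED_REQUESTS_KEY:
        # Queued requests are summed across handle metrics.
        merged.data[QUEUED_REQUESTS_KEY][-1].value += timeseries[-1].value
        return

    existing = merged.data.get(key)
    is_new_key = existing is None
    has_newer_data = bool(timeseries) and (
        not existing or timeseries[-1].timestamp > existing[-1].timestamp
    )
    if is_new_key or has_newer_data:
        # Replace if the key is absent or the incoming datapoints are newer.
        merged.data[key] = timeseries.copy()
```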

Contributor

@Stack-Attack I'm not convinced that's what we want. If we have 2 handles, A and B, sending requests to a single replica X, then handle A can have time series (3, 4, 5), and handle B can have time series (7, 8, 9). This means that handle A most recently recorded 5 requests that were sent to replica X, and handle B recorded 9 requests that were sent to replica X. This means that 14 requests in total are executing on replica X.

However this code seems to only be taking either 5 or 9, instead of 14.

Contributor Author

Alternatively, we could use the replica-level metric collection for running requests, and only collect the queued requests from handles? If custom metrics will be recorded at the replica anyway, this might be better? @abrarsheikh

Contributor Author

If I recall correctly, one reason metrics collection was moved to handles was to strictly guarantee that replicas do not exceed their max_running limit. Handles currently report the number of requests they send to each replica, and this is the metric source for running_requests.

To support custom metrics in the future, however, we are going to need to send metrics from the replica level, right? So it might make sense to unify the metrics collection back at the replica, and send only the queue length from the handles.

I think this is the only major blocker. Since either solution above has pros and cons, I think I should leave it to the maintainers to decide on the design.

Contributor

@Stack-Attack, I think your proposal to send queued requests from the handle and the rest of the metrics from the replicas makes sense to me.

Contributor

> we'll have to apply a windowed sum across all replica metrics?

That is correct.

Contributor Author

Changed the consolidate function to a merge function using a time window. In practice this window will be `metrics_interval_s`, which will be sent from the controller.
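
For context, a rough sketch of what such a windowed merge could look like. The names merge_metrics_stores and _windowed_sum and the plain (timestamp, value) tuples are illustrative, not the actual metrics_utils API:

```python
from collections import defaultdict
from typing import Dict, List, Tuple

Timeseries = List[Tuple[float, float]]  # (timestamp, value) pairs


def _windowed_sum(points: Timeseries, window_s: float) -> Timeseries:
    """Sum values that fall into the same fixed-width time window."""
    if not points:
        return []
    # NOTE: anchoring the windows at the data's earliest timestamp is what
    # lets the windows shift between merges; see the discussion below.
    start = min(t for t, _ in points)
    buckets: Dict[int, float] = defaultdict(float)
    for timestamp, value in points:
        buckets[int((timestamp - start) // window_s)] += value
    # Report each window by its center timestamp.
    return [
        (start + w * window_s + window_s / 2.0, total)
        for w, total in sorted(buckets.items())
    ]


def merge_metrics_stores(
    *stores: Dict[str, Timeseries], window_s: float
) -> Dict[str, Timeseries]:
    """Combine per-handle stores into one store with a windowed sum per key."""
    combined: Dict[str, Timeseries] = defaultdict(list)
    for store in stores:
        for key, timeseries in store.items():
            combined[key].extend(timeseries)
    return {key: _windowed_sum(points, window_s) for key, points in combined.items()}
```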

@Stack-Attack
Contributor Author

Error in rebasing on master. Fixing now.

@Stack-Attack Stack-Attack requested a review from zcin August 26, 2025 10:22
@abrarsheikh abrarsheikh added the go label Aug 26, 2025
Contributor

@zcin zcin left a comment

CI is failing because the example you added in the docstring for `_aggregate_reduce` is tested and failing: https://buildkite.com/ray-project/microcheck/builds/23995#0198e5ec-e7b9-49a2-b299-2226bf66710e/189-869

else:
    merged.data[key] = timeseries.copy()

return merged
Contributor

OK, let's table this for when you raise the next PR!

assert merged["m1"] == [
    TimeStampedValue(0, 101),
    TimeStampedValue(2, 1),
    TimeStampedValue(4, 2),
Contributor

Why is there data at timestamp=4? I don't see data added for m1 at timestamp=4.

Contributor Author

The windows can shift based on ordering. The only way to avoid this would be to compute the global start_timestamp before any merges.

merge(s1,s2, window=2):
  t=1 => window==[0, 2) => sum(1) => TimeStampedValue(1,1)
  t=3 => window==[2,4) => sum(2) => TimeStampedValue(3,2)
  result = (1,1),(3,2)
  
merge(result, s4, window=2):
  t=0 => window==[-1,1) => sum(100) => TimeStampedValue(0,100)
  t=2 => window==[1,3) => sum(1) => TimeStampedValue(2,1)
  t=4 => window==[3,5) => sum(2) => TimeStampedValue(4,2)
  result = (0,100),(2,1),(4,2)

Note, this was still passing incorrectly due to:

@dataclass(order=True)
class TimeStampedValue:
    timestamp: float
    value: float = field(compare=False)

I added a comparison to the tests to explicitly check the value as well. I think we should remove the `compare=False`, but it's not required.
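
To illustrate: with `compare=False` on value, equality only considers the timestamp, so a value regression would not fail a plain list-equality assertion:

```python
a = TimeStampedValue(timestamp=0, value=100)
b = TimeStampedValue(timestamp=0, value=2)

# Equal because `value` is excluded from comparison; only `timestamp` counts.
assert a == b
```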

Contributor

@Stack-Attack Hmm, it seems pretty strange that the windows can shift. Can this be fixed if we instead key windows by the start of the window, instead of using `ts_center = start + w * window_s + (window_s / 2.0)`? I.e., directly use `start + w * window_s`?

Contributor

@Stack-Attack Actually, I'm not sure that's the exact issue. We just need the windows to be consistent: if on the previous merge the windows were (0,2), (2,4), etc., they shouldn't shift to (1,3), (3,5), etc. on the next merge. How about we make sure that the windows are always multiples of `window_s`, i.e., always (0, window_s), (window_s, 2*window_s)?

Contributor Author

@zcin Good call! Modified to snap the window start to the `window_s` grid. I kept the `window_s / 2.0` center offset, as it should still be helpful when `window_s` matches the polling rate of the metrics.
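
Roughly, the change is to snap the window anchor to a multiple of `window_s` instead of the earliest datapoint (illustrative names, not the exact implementation):

```python
import math


def window_center(timestamp: float, window_s: float) -> float:
    """Map a datapoint to a grid-aligned window and return that window's center."""
    start = math.floor(timestamp / window_s) * window_s  # snap to the window_s grid
    return start + window_s / 2.0


# With window_s=2, the buckets are always [0,2), [2,4), ... regardless of which
# store is merged first, so repeated merges can no longer shift them.
assert window_center(0.5, 2.0) == window_center(1.9, 2.0) == 1.0
assert window_center(2.0, 2.0) == 3.0
```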

@abrarsheikh
Contributor

@Stack-Attack Could you please address @zcin's comment? I think we are very close to closing this.

Signed-off-by: abrar <[email protected]>
Contributor

@abrarsheikh abrarsheikh left a comment

I made 3 changes:

  1. Added validation for window_s.
  2. Ensured the bucketing logic is applied even when only a single timeseries is passed (see the sketch below).
  3. Added more unit tests.
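
A sketch of what changes 1 and 2 amount to (a standalone, illustrative function, not the exact code):

```python
def merge_series(*series, window_s: float):
    """Windowed sum that validates window_s and buckets even a single input series."""
    # Change 1: fail fast on an invalid window size.
    if window_s <= 0:
        raise ValueError(f"window_s must be positive, got {window_s!r}")

    # Change 2: the bucketing runs unconditionally, so a single series is
    # still snapped onto the same grid-aligned windows as a multi-series merge.
    buckets = {}
    for points in series:
        for timestamp, value in points:
            start = (timestamp // window_s) * window_s
            buckets[start] = buckets.get(start, 0.0) + value
    return [(s + window_s / 2.0, total) for s, total in sorted(buckets.items())]


# Even a single series gets re-bucketed: two points in [0, 2) collapse to one window.
assert merge_series([(0.5, 1.0), (1.9, 2.0)], window_s=2.0) == [(1.0, 3.0)]
```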

Contributor

@zcin zcin left a comment

LGTM with one more comment! I think we're good to merge after this.

@zcin zcin merged commit 2cbf1e2 into ray-project:master Sep 4, 2025
5 checks passed
sampan-s-nayak pushed a commit to sampan-s-nayak/ray that referenced this pull request Sep 8, 2025
jugalshah291 pushed a commit to jugalshah291/ray_fork that referenced this pull request Sep 11, 2025
wyhong3103 pushed a commit to wyhong3103/ray that referenced this pull request Sep 12, 2025
dstrodtman pushed a commit that referenced this pull request Oct 6, 2025
landscapepainter pushed a commit to landscapepainter/ray that referenced this pull request Nov 17, 2025

Labels

  • community-contribution: Contributed by the community
  • core: Issues that should be addressed in Ray Core
  • go: add ONLY when ready to merge, run all tests
  • observability: Issues related to the Ray Dashboard, Logging, Metrics, Tracing, and/or Profiling
