Update metrics_utils for future global metrics aggregation in controller. #55568
Conversation
Code Review
This pull request refactors the metrics aggregation logic for autoscaling by introducing new generic aggregation methods in InMemoryMetricsStore and a consolidate_metrics_stores function. The changes are a good step towards improving the separation of concerns. However, I've identified a few critical issues where None return values from aggregation functions are not handled, which will lead to TypeError exceptions. Additionally, there's a bug in consolidate_metrics_stores that could cause data loss for a specific metric, and a minor API design concern. Please review the detailed comments for suggestions on how to address these points.
Force-pushed from 1424436 to 0988170
```python
    return self._aggregate_reduce(keys, statistics.mean)

# ...

def consolidate_metrics_stores(*stores: InMemoryMetricsStore) -> InMemoryMetricsStore:
```
can you explain which serve component would call this function?
@abrarsheikh This would be called in the 'autoscaler_state_manager' to consolidate metrics from multiple handles. AFAIK there is no guarantee that replica-handle metrics are one-to-one (based on current logic), so I wanted to handle the case of the same replica reporting through different handles with a different cadence.
can we add a docstring explaining inputs and return value? Since there are 2 axes, handles and replicas, it would be helpful if the docstring could explain with that context in mind.
Added here, as well as to the aggregate functions, since the tuple return is new.
You mentioned in the other PR to use a single aggregate function which takes an enum. Let me know if you want that here. I'd need to import the enum into test_metrics_utils, replica.py, and router.py, which I'm not sure is preferable.
```python
if key == QUEUED_REQUESTS_KEY:
    # Sum queued requests across handle metrics.
    merged.data[QUEUED_REQUESTS_KEY][-1].value += timeseries[-1].value
elif key not in merged.data or (
    timeseries
    and (
        not merged.data[key]
        or timeseries[-1].timestamp > merged.data[key][-1].timestamp
    )
):
    # Replace if not present or if newer datapoints are available
    merged.data[key] = timeseries.copy()
```
finding it hard to follow this logic. can you deconstruct the expression for better readability?
each timeseries here is the number of requests (over e.g. the past 30 seconds) that handle X sends to replica Y, right? are we choosing only the data from one out of all handles here, instead of aggregating across handles?
We're going to have a lot of handle_metrics or replica_metrics ending up in the controller. We will store all of these as raw InMemoryMetricsStore objects. Each key stores one timeseries. Assuming that:
- Different metrics_stores might contain the same key.
- All keys map to one unique replica.

This function allows us to feed all the metrics_stores in, and get only the latest metrics for each key. So if a key is found, we overwrite its data if the new timeseries is newer. So this doesn't aggregate, it just consolidates the data and returns a single object representing the global state of all metrics_stores.
I'll break out that large conditional for readability, and add more documentation to make this clear!
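As a sketch of what that deconstruction might look like — a hypothetical helper, written to be equivalent to the inlined conditional in the snippet above, not the actual refactor:

```python
from collections import namedtuple

TimeStampedValue = namedtuple("TimeStampedValue", ["timestamp", "value"])

def should_replace(data: dict, key: str, incoming: list) -> bool:
    """Mirror of the inlined conditional: replace when the key is absent,
    or when the incoming timeseries is non-empty and either the existing
    series is empty or the incoming one ends with a newer datapoint."""
    if key not in data:
        return True
    if not incoming:
        return False
    existing = data[key]
    if not existing:
        return True
    return incoming[-1].timestamp > existing[-1].timestamp
```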
@Stack-Attack I'm not convinced that's what we want. If we have 2 handles, A and B, sending requests to a single replica X, then handle A can have time series (3, 4, 5), and handle B can have time series (7, 8, 9). This means that handle A most recently recorded 5 requests that were sent to replica X, and handle B recorded 9 requests that were sent to replica X. This means that 14 requests in total are executing on replica X.
However this code seems to only be taking either 5 or 9, instead of 14.
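The concern can be shown numerically with the toy numbers from this comment (`latest_running` is a hypothetical helper, not project code):

```python
# Latest running-request counts each handle reported for replica X
# (toy numbers from the comment above, not project data structures).
handle_a = [3, 4, 5]  # handle A's recent reports
handle_b = [7, 8, 9]  # handle B's recent reports

def latest_running(*series):
    # Aggregate across handles: sum each handle's most recent value.
    return sum(s[-1] for s in series)

total = latest_running(handle_a, handle_b)  # 5 + 9 = 14
# Consolidation that keeps only the newest series would report 5 or 9
# instead, undercounting replica X's load.
```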
Alternatively, we could use the replica-level metric collection for running requests, and only collect the queued requests from handles? If custom metrics will be recorded at the replica anyway, this might be better? @abrarsheikh
If I recall correctly, one reason metrics collection was moved to handles was to strictly guarantee that replicas do not exceed their max_running limit. Handles currently report the number of requests they send to each replica, and this is the metric source for running_requests.
To support custom metrics in the future, however, we are going to need to send metrics from the replica-level right? So it might make sense to unify the metrics collection back at the replica, and send only queue length from the handles.
I think this is the only major blocker. Since either solution above has pros and cons, I think I should leave it to maintainers to decide on the design.
I think @Stack-Attack, your proposal to send queued requests from the handle and the rest of the metrics from replicas makes sense to me.
> we'll have to apply a windowed sum across all replica metrics?

that is correct
Changed the consolidate function to a merge function using a time window. In practice this window will be metrics_interval_s, which will be sent from the controller.
Error in rebasing on master. Fixing now.
Signed-off-by: Kyle Robinson <[email protected]>
zcin left a comment
CI is failing because the example you added in the docstring for _aggregate_reduce is tested and failing: https://buildkite.com/ray-project/microcheck/builds/23995#0198e5ec-e7b9-49a2-b299-2226bf66710e/189-869
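For context on why a docstring example can fail CI: examples in docstrings are executed by the doctest machinery, and the expected output must match exactly. A generic illustration (the function `mean_of` is invented here, not the actual `_aggregate_reduce`):

```python
import doctest

def mean_of(values):
    """Return the arithmetic mean of values.

    >>> mean_of([1, 2, 3])
    2.0
    """
    return sum(values) / len(values)

# doctest executes the example above and compares the printed result
# against the expected output literally; writing `2` instead of `2.0`
# would be enough to fail the check.
results = doctest.testmod()
```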
```python
else:
    merged.data[key] = timeseries.copy()

return merged
```
OK, let's table this for when you raise the next PR!
```python
assert merged["m1"] == [
    TimeStampedValue(0, 101),
    TimeStampedValue(2, 1),
    TimeStampedValue(4, 2),
]
```
why is there data at timestamp=4? I don't see data added for m1 at timestamp=4
The windows can shift based on ordering. The only way to avoid this would be to compute the global start_timestamp before any merges.

```
merge(s1, s2, window=2):
  t=1 => window [0, 2) => sum(1) => TimeStampedValue(1, 1)
  t=3 => window [2, 4) => sum(2) => TimeStampedValue(3, 2)
  result = (1, 1), (3, 2)

merge(result, s4, window=2):
  t=0 => window [-1, 1) => sum(100) => TimeStampedValue(0, 100)
  t=2 => window [1, 3)  => sum(1)   => TimeStampedValue(2, 1)
  t=4 => window [3, 5)  => sum(2)   => TimeStampedValue(4, 2)
  result = (0, 100), (2, 1), (4, 2)
```
Note, this was still passing incorrectly due to:
```python
@dataclass(order=True)
class TimeStampedValue:
    timestamp: float
    value: float = field(compare=False)
```
I added a comparison to the tests to force check the value as well. I think we should remove the compare=False, but it's not required.
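The effect described (assertions passing despite wrong values) can be reproduced directly; with `compare=False`, the generated `__eq__` ignores `value`:

```python
from dataclasses import dataclass, field

@dataclass(order=True)
class TimeStampedValue:
    timestamp: float
    value: float = field(compare=False)

# Equality (and ordering) only consider `timestamp`, so two points with
# the same timestamp but different values still compare equal -- which
# is why list-equality asserts in tests can pass incorrectly.
a = TimeStampedValue(0.0, 100.0)
b = TimeStampedValue(0.0, 101.0)
same = (a == b)  # True despite the differing values
```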
@Stack-Attack Hmm it seems pretty strange that the windows can shift. can this be fixed if we instead key windows by the start of the window instead of using `ts_center = start + w * window_s + (window_s / 2.0)`? aka directly use `start + w * window_s`?
@Stack-Attack actually I'm not sure if that's the exact issue. We just need the windows to be consistent - aka if on the previous merge the windows were (0,2), (2,4) etc, it shouldn't shift to (1,3),(3,5) etc at the next merge. How about we make sure that the windows are always multiples of window_s, aka it's always (0,window_s), (window_s,2*window_s)?
@zcin Good call! Modified to snap start to window_s grid. I maintained the window_s / 2.0 as it should still be helpful when window_s matches the polling rate of the metrics.
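A minimal sketch of the snap-to-grid bucketing agreed on here. Assumptions: the `window_s` semantics and the center-of-window timestamp convention follow this thread, and `merge_timeseries` is a hypothetical stand-in for the real merge function, not the PR's implementation:

```python
import math
from dataclasses import dataclass, field

@dataclass(order=True)
class TimeStampedValue:
    timestamp: float
    value: float = field(compare=False)

def merge_timeseries(series_list, window_s):
    """Sum datapoints that fall into the same fixed window. Window
    boundaries are snapped to multiples of window_s, so repeated merges
    bucket into the same windows regardless of input order."""
    if window_s <= 0:
        raise ValueError(f"window_s must be positive, got {window_s}")
    buckets = {}
    for series in series_list:
        for point in series:
            # Snap to the window_s grid: index of the window containing ts.
            w = math.floor(point.timestamp / window_s)
            buckets[w] = buckets.get(w, 0.0) + point.value
    # Report each bucket at its window center, per the convention above.
    return [
        TimeStampedValue(w * window_s + window_s / 2.0, v)
        for w, v in sorted(buckets.items())
    ]
```

Because windows are anchored at multiples of `window_s`, merging an already-merged result with a later series lands the same timestamps in the same buckets, avoiding the shifting-window artifact shown earlier in the thread.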
Signed-off-by: Kyle Robinson <[email protected]>
@Stack-Attack could you please address @zcin's comment? I think we are very close to closing this.
Signed-off-by: Kyle Robinson <[email protected]>
Force-pushed from 3963cdb to 881edc2
Signed-off-by: Kyle Robinson <[email protected]>
Signed-off-by: abrar <[email protected]>
abrarsheikh left a comment
I made 3 changes:
- added validation for window_s
- applied the bucketing logic even when a single timeseries is passed
- added more unit tests
zcin left a comment
LGTM with one more comment! I think we're good to merge after this.
Signed-off-by: abrar <[email protected]>
Why are these changes needed?
These changes modify the autoscaler metrics collection and aggregation functions in preparation for global aggregation in the controller.
Related issue number
Partial for #46497
Required for #41135 #51905
Checks
- [x] I've signed off every commit (by using the -s flag, i.e., `git commit -s`) in this PR.
- [x] I've run `scripts/format.sh` to lint the changes in this PR.
- [x] I've included any doc changes needed for https://docs.ray.io/en/master/.
- [ ] I've added any new APIs to the API Reference. For example, if I added a method in Tune, I've added it in `doc/source/tune/api/` under the corresponding `.rst` file.
- [x] I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/

Testing Strategy
- [x] Unit tests
- [ ] Release tests
- [ ] This PR is not tested :(