Add MultiprocessingConcurrencyLimiter to gateway #399
Merged

Commits (8, all by squeakymouse):

- 9654430 Add MultiprocessingConcurrencyLimiter to gateway
- 0d89d40 increase values
- 264d576 remove liveness probe
- 9b520d0 concurrency 700, add comments
- 997da37 500?
- fb78098 Merge branch 'main' into katiewu/concurrency-limiting-gateway
- 81785d8 omit healthcheck from concurrency limiter
- f3d3660 address comments
model-engine/model_engine_server/common/concurrency_limiter.py (36 additions, 0 deletions)
Review comment on `from multiprocessing import BoundedSemaphore`: I'm wondering if we should be using an … This SO comment seems to suggest that we should be using the corresponding semaphore.

```python
from multiprocessing import BoundedSemaphore
from multiprocessing.synchronize import BoundedSemaphore as BoundedSemaphoreType
from typing import Optional

from fastapi import HTTPException
from model_engine_server.core.loggers import logger_name, make_logger

logger = make_logger(logger_name())


class MultiprocessingConcurrencyLimiter:
    def __init__(self, concurrency: Optional[int], fail_on_concurrency_limit: bool):
        self.concurrency = concurrency
        if concurrency is not None:
            if concurrency < 1:
                raise ValueError("Concurrency should be at least 1")
            self.semaphore: Optional[BoundedSemaphoreType] = BoundedSemaphore(value=concurrency)
            self.blocking = (
                not fail_on_concurrency_limit
            )  # we want to block if we want to queue up requests
        else:
            self.semaphore = None
            self.blocking = False  # Unused

    def __enter__(self):
        logger.debug("Entering concurrency limiter semaphore")
        if self.semaphore and not self.semaphore.acquire(block=self.blocking):
            logger.warning(f"Too many requests (max {self.concurrency}), returning 429")
            raise HTTPException(status_code=429, detail="Too many requests")
            # Just raises an HTTPException.
            # __exit__ should not run; otherwise the release() doesn't have an acquire()

    def __exit__(self, type, value, traceback):
        logger.debug("Exiting concurrency limiter semaphore")
        if self.semaphore:
            self.semaphore.release()
```
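As a rough illustration of the limiter's fail-fast behavior, here is a self-contained sketch of the same pattern; `RuntimeError` stands in for FastAPI's `HTTPException(429)` so the example runs without a web framework, and the class name is a stand-in, not the PR's actual class:

```python
from multiprocessing import BoundedSemaphore
from typing import Optional


class ConcurrencyLimiterSketch:
    """Simplified stand-in for the PR's MultiprocessingConcurrencyLimiter."""

    def __init__(self, concurrency: Optional[int], fail_on_concurrency_limit: bool):
        self.concurrency = concurrency
        if concurrency is not None:
            if concurrency < 1:
                raise ValueError("Concurrency should be at least 1")
            self.semaphore = BoundedSemaphore(value=concurrency)
            # Block (queue up requests) unless we want to fail fast.
            self.blocking = not fail_on_concurrency_limit
        else:
            self.semaphore = None
            self.blocking = False

    def __enter__(self):
        if self.semaphore and not self.semaphore.acquire(block=self.blocking):
            # The gateway would raise HTTPException(status_code=429) here.
            raise RuntimeError("Too many requests")
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        if self.semaphore:
            self.semaphore.release()


limiter = ConcurrencyLimiterSketch(concurrency=1, fail_on_concurrency_limit=True)
with limiter:
    # The single slot is held; a nested entry must fail fast.
    try:
        with limiter:
            raised = False
    except RuntimeError:
        raised = True
print(raised)  # True: the nested acquire hit the limit
```

Note that because `__enter__` raises before the `with` body runs, `__exit__` is never invoked for the failed entry, so the bounded semaphore never sees a `release()` without a matching `acquire()`.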
What do people think about trying this out with just a specific route at first? Looking at the breakdown in the past week, `get_/v1/async-tasks/_task_id` is the most common route by far. I'm not sure if it makes sense to do a global limit, as we know some routes take more time than others.
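If per-route limiting were tried, one minimal sketch could map route templates to dedicated semaphores, with unlisted routes passing through unlimited. The route template, the concurrency value, and the simplified non-blocking limiter below are all hypothetical, not the PR's implementation:

```python
from contextlib import nullcontext
from multiprocessing import BoundedSemaphore


class RouteLimiter:
    """Minimal fail-fast limiter for one route (sketch only)."""

    def __init__(self, concurrency: int):
        self.semaphore = BoundedSemaphore(value=concurrency)

    def __enter__(self):
        if not self.semaphore.acquire(block=False):
            raise RuntimeError("Too many requests")  # gateway: HTTP 429
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.semaphore.release()


# Hypothetical: only the hottest route gets a limit.
ROUTE_LIMITERS = {
    "/v1/async-tasks/{task_id}": RouteLimiter(concurrency=2),
}


def limiter_for(route: str):
    # Unlisted routes get a no-op context manager.
    return ROUTE_LIMITERS.get(route, nullcontext())


with limiter_for("/v1/async-tasks/{task_id}"):
    pass  # limited route: holds one of 2 slots inside the block
with limiter_for("/v1/model-endpoints"):
    pass  # unlimited route: no semaphore involved
```

This keeps slow routes from starving fast ones, at the cost of tuning a budget per route instead of one global number.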
https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-startup-probes/#define-a-liveness-http-request

@squeakymouse From the docs, it looks like if our readiness probe route returns a `429`, it would cause the pod to be marked as `unready`, and should result in 503s from istio again. It is a little odd, since our experimentation doesn't seem to show that...
Hmm, I think it shows up as the `context deadline exceeded (Client.Timeout exceeded while awaiting headers)` errors? 🤔 Does this mean I should try to exclude the healthcheck route from the concurrency limiting?
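The "omit healthcheck from concurrency limiter" commit suggests exactly that exclusion. A minimal sketch of the idea, assuming hypothetical probe path names (the gateway's actual route names may differ) and a generic limiter object:

```python
from contextlib import nullcontext

# Hypothetical kubelet probe paths; the real gateway routes may differ.
HEALTHCHECK_PATHS = {"/healthcheck", "/healthz", "/readyz"}


def request_limiter(path: str, limiter):
    """Return the shared limiter for normal traffic, but a no-op context
    manager for probe paths, so a saturated gateway never answers its
    readiness probe with a 429 and gets marked unready."""
    if path in HEALTHCHECK_PATHS:
        return nullcontext()
    return limiter


shared = object()  # stands in for the shared concurrency limiter
print(request_limiter("/healthz", shared) is shared)    # False: probe bypasses
print(request_limiter("/v1/tasks", shared) is shared)   # True: normal traffic limited
```

Dispatching on the path before entering the limiter keeps the probe handler cheap even when every worker slot is occupied by real requests.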