Add MultiprocessingConcurrencyLimiter to gateway #399
Conversation
def __enter__(self):
    logger.debug("Entering concurrency limiter semaphore")
    if self.semaphore and not self.semaphore.acquire(block=self.blocking):
        logger.warning("Too many requests, returning 429")
nit: can we log the # of requests instead of this log? i.e. # of requests that exceed the concurrency limit
Any requests over the concurrency limit will immediately be returned as 429, so we'd always be logging whatever the value of MAX_CONCURRENCY is (which still seems good to log, though).
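For reference, a minimal sketch of what the limiter with that logging could look like. The `fail_on_concurrency_limit` parameter and `TooManyRequestsException` are assumed names for illustration, not necessarily the PR's exact implementation:

```python
import logging
from multiprocessing import BoundedSemaphore
from typing import Optional

logger = logging.getLogger(__name__)

class TooManyRequestsException(Exception):
    """Hypothetical exception; assumed to be translated into a 429 by the middleware."""

class MultiprocessingConcurrencyLimiter:
    """Bounds the number of in-flight requests handled by this worker process."""

    def __init__(self, concurrency: Optional[int], fail_on_concurrency_limit: bool):
        # concurrency=None disables limiting entirely.
        self.semaphore = BoundedSemaphore(value=concurrency) if concurrency else None
        # blocking=False makes acquire() return immediately so we can 429 instead of queueing.
        self.blocking = not fail_on_concurrency_limit
        self.concurrency = concurrency

    def __enter__(self):
        logger.debug("Entering concurrency limiter semaphore")
        if self.semaphore and not self.semaphore.acquire(block=self.blocking):
            # Every rejected request is by definition over the configured limit,
            # so logging the limit itself is the useful number here.
            logger.warning(f"Too many requests (limit={self.concurrency}), returning 429")
            raise TooManyRequestsException()

    def __exit__(self, exc_type, exc_val, exc_tb):
        if self.semaphore:
            self.semaphore.release()
```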
Code seems good, but I would like more complete load test results (e.g. some stripped-down version of the load test docs phil and I have written), since the change we're making is quite dependent on stuff outside of this code, and there are nontrivial interactions between this code, whatever istio setup we have, where the requests are coming from, etc.
E.g. one reason you might be seeing different behavior between just `up` and a test deployment is that there may be request queues hidden somewhere in the test deployment path (there's a bunch of other stuff between your devbox and the gateway pods, like some istio layer for sure and the istio-proxy pods), as opposed to when testing locally.
@squeakymouse is this rate-limiting per-user or system-wide? Something we could consider working towards in the future is per-user rate limits - I could foresee this being awkward if someone sends their first request ever at a time when system load is high and gets back a 429. The best case might be instituting both per-user rate limits and system rate limits, so that no one user can monopolize all our throughput at any point in time. What do you think? Also, I could be totally missing a line in the code and per-user rate limits may have already been added.
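Purely as an illustration (this ends up out of scope below), a sketch of what combining a per-user sliding-window limit with a system-wide cap might look like; every name and number here is hypothetical:

```python
import time
from collections import defaultdict
from threading import BoundedSemaphore, Lock

# Illustrative values only; none of these names come from the PR.
SYSTEM_CONCURRENCY = 100          # system-wide cap on in-flight requests
PER_USER_LIMIT = 10               # max requests per user per window
PER_USER_WINDOW_SECONDS = 1.0

system_semaphore = BoundedSemaphore(SYSTEM_CONCURRENCY)
user_request_times: dict[str, list[float]] = defaultdict(list)
user_lock = Lock()

def admit_request(user_id: str) -> bool:
    """Return True if the request may proceed; False means respond with a 429."""
    now = time.monotonic()
    with user_lock:
        recent = [t for t in user_request_times[user_id] if now - t < PER_USER_WINDOW_SECONDS]
        if len(recent) >= PER_USER_LIMIT:
            return False          # this user is over their own budget
        recent.append(now)
        user_request_times[user_id] = recent
    # Non-blocking acquire: reject rather than queue when the whole system is saturated.
    return system_semaphore.acquire(blocking=False)

def finish_request(admitted: bool) -> None:
    """Release the system-wide slot for a request that was admitted."""
    if admitted:
        system_semaphore.release()
```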
try:
    LoggerTagManager.set(LoggerTagKey.REQUEST_ID, str(uuid.uuid4()))
    return await call_next(request)
with concurrency_limiter:
What do people think about trying this out with just a specific route at first? Looking at the breakdown in the past week, get_/v1/async-tasks/_task_id is the most common route by far.
I'm not sure it makes sense to do a global limit, as we know some routes take more time than others.
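If we went the per-route direction, one way to do it could be a FastAPI dependency on just that route instead of global middleware. A sketch under assumed names and an illustrative limit:

```python
from multiprocessing import BoundedSemaphore
from fastapi import Depends, FastAPI, HTTPException

app = FastAPI()

# Hypothetical per-route limit; the number is illustrative, not from the PR.
ASYNC_TASK_CONCURRENCY = 200
async_task_semaphore = BoundedSemaphore(ASYNC_TASK_CONCURRENCY)

def limit_async_task_requests():
    # Non-blocking acquire: reject immediately instead of queueing the request.
    if not async_task_semaphore.acquire(block=False):
        raise HTTPException(status_code=429, detail="Too many requests")
    try:
        yield
    finally:
        async_task_semaphore.release()

@app.get("/v1/async-tasks/{task_id}", dependencies=[Depends(limit_async_task_requests)])
async def get_async_task(task_id: str):
    # Handler body omitted; only the dependency wiring matters for this sketch.
    return {"task_id": task_id}
```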
Any code greater than or equal to 200 and less than 400 indicates success. Any other code indicates failure.
@squeakymouse From the docs, it looks like if our readiness probe route returns a 429, it would cause the pod to be marked as unready and should result in 503s from istio again. It is a little odd since our experimentation doesn't seem to show that...
Hmm, I think it shows up as the context deadline exceeded (Client.Timeout exceeded while awaiting headers) errors? 🤔
Does this mean I should try to exclude the healthcheck route from the concurrency limiting?
@@ -0,0 +1,35 @@
from multiprocessing import BoundedSemaphore
I'm wondering if we should be using an async semaphore rather than a multiprocessing one, since FastAPI is using async.
This SO comment seems to suggest that we should be using the corresponding semaphore:
use the correct type of semaphore for the form of concurrency being used
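For comparison, a sketch of the asyncio-flavored alternative being discussed; the names and the limit value are illustrative, and an asyncio.Semaphore would only bound concurrency within a single event loop (i.e. per worker process):

```python
import asyncio
from contextlib import asynccontextmanager

class TooManyRequestsError(Exception):
    """Raised when the limit is hit; assumed to be mapped to a 429 elsewhere."""

MAX_CONCURRENCY = 100  # illustrative value, not from the PR

semaphore = asyncio.Semaphore(MAX_CONCURRENCY)

@asynccontextmanager
async def async_concurrency_limiter():
    # Fail fast instead of awaiting a slot, mirroring the non-blocking acquire above.
    if semaphore.locked():
        raise TooManyRequestsError()
    async with semaphore:
        yield
```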
Curious if there was consideration of 3P libraries, like slowapi, where rate limiting can be extended to Redis, time-bounded, per-user strategies, easy customization of limits per route, etc.
Edit: resolved in person. Per-user rate limits and slowapi are out of scope for now.
Thanks!
get_or_create_aioredis_pool()

# these routes should match those exempt from the concurrency limiter in the middleware
To better link the routes between here and the middleware with code, we could define a shared list variable. Something like this could work:

health_routes = ["/healthcheck", "/healthz", "/readyz"]

def healthcheck() -> Response:
    """Returns 200 if the app is healthy."""
    return Response(status_code=200)

for endpoint in health_routes:
    app.get(endpoint)(healthcheck)

The code here can then refer to the health_routes variable.
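A sketch of how the middleware side could then consume that same shared list (with a stand-in for the real limiter, since it isn't shown in this hunk):

```python
import contextlib
from fastapi import FastAPI, Response
from starlette.requests import Request

app = FastAPI()

# Shared list, as suggested above; both route registration and the middleware use it.
health_routes = ["/healthcheck", "/healthz", "/readyz"]

def healthcheck() -> Response:
    """Returns 200 if the app is healthy."""
    return Response(status_code=200)

for endpoint in health_routes:
    app.get(endpoint)(healthcheck)

# Stand-in for the real MultiprocessingConcurrencyLimiter from this PR.
concurrency_limiter = contextlib.nullcontext()

@app.middleware("http")
async def limit_concurrency(request: Request, call_next):
    # Health probes bypass the limiter so an overloaded pod is not marked unready.
    if request.url.path in health_routes:
        return await call_next(request)
    with concurrency_limiter:
        return await call_next(request)
```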
try:
    LoggerTagManager.set(LoggerTagKey.REQUEST_ID, str(uuid.uuid4()))
    return await call_next(request)
if request.url.path in ["/healthcheck", "/healthz", "/readyz"]:
nit: would just add a comment here that we intentionally exclude health check routes from the concurrency limiter.
Pull Request Summary
- Add a concurrency limiter so the gateway returns 429s instead of 503s when overloaded
- Remove the Kubernetes liveness probe to prevent pods from restarting when the healthcheck fails under high load (the purpose of the liveness probe is to restart in case of deadlocks, which doesn't seem relevant for us)
- Increase Uvicorn worker concurrency now that we have rate limiting (the latter should take precedence so that we don't return 503s)
Test Plan and Usage Guide
Load tested with https://github.com/rakyll/hey on a test deployment and saw 429s instead of 503s at concurrency levels up to 5000.