ref(span-buffer): Add backpressure #91707
Merged
Commits (12, all by untitaker):

- 7fa72a7 wip
- 557e310 wip
- 4848c20 Merge remote-tracking branch 'origin/master' into ref/span-buffer-bac…
- 8992892 fix testcase and bug in pipeline usage
- 83c1b0e restart process up to ten times
- 8fc6188 wip
- 553d939 rewrite backpressure again
- 64454fc debug ci
- d87e92d more debugging
- 617b11f Merge branch 'master' into ref/span-buffer-backpressure
- 1f53480 address review feedback
- 70f9053 reference tag
```diff
@@ -1,19 +1,25 @@
 import logging
 import multiprocessing
 import threading
 import time
 from collections.abc import Callable

 import rapidjson
 import sentry_sdk
 from arroyo import Topic as ArroyoTopic
 from arroyo.backends.kafka import KafkaPayload, KafkaProducer, build_kafka_configuration
-from arroyo.processing.strategies.abstract import ProcessingStrategy
+from arroyo.processing.strategies.abstract import MessageRejected, ProcessingStrategy
 from arroyo.types import FilteredPayload, Message

 from sentry.conf.types.kafka_definition import Topic
 from sentry.spans.buffer import SpansBuffer
+from sentry.utils import metrics
 from sentry.utils.kafka_config import get_kafka_producer_cluster_options, get_topic_definition

+MAX_PROCESS_RESTARTS = 10
+
 logger = logging.getLogger(__name__)


 class SpanFlusher(ProcessingStrategy[FilteredPayload | int]):
     """
@@ -33,15 +39,19 @@ def __init__(
         self,
         buffer: SpansBuffer,
         max_flush_segments: int,
+        max_memory_percentage: float,
         produce_to_pipe: Callable[[KafkaPayload], None] | None,
         next_step: ProcessingStrategy[FilteredPayload | int],
     ):
         self.buffer = buffer
         self.max_flush_segments = max_flush_segments
+        self.max_memory_percentage = max_memory_percentage
         self.next_step = next_step

         self.stopped = multiprocessing.Value("i", 0)
+        self.redis_was_full = False
         self.current_drift = multiprocessing.Value("i", 0)
+        self.should_backpressure = multiprocessing.Value("i", 0)

         from sentry.utils.arroyo import _get_arroyo_subprocess_initializer
@@ -59,24 +69,30 @@ def __init__(
                 initializer,
                 self.stopped,
                 self.current_drift,
+                self.should_backpressure,
                 self.buffer,
                 self.max_flush_segments,
                 produce_to_pipe,
             ),
             daemon=True,
         )

+        self.process_restarts = 0
+
         self.process.start()

     @staticmethod
     def main(
         initializer: Callable | None,
         stopped,
         current_drift,
+        should_backpressure,
         buffer: SpansBuffer,
         max_flush_segments: int,
         produce_to_pipe: Callable[[KafkaPayload], None] | None,
     ) -> None:
         sentry_sdk.set_tag("sentry_spans_buffer_component", "flusher")

         try:
             if initializer:
                 initializer()
@@ -102,6 +118,10 @@ def produce(payload: KafkaPayload) -> None:
                 now = int(time.time()) + current_drift.value
                 flushed_segments = buffer.flush_segments(max_segments=max_flush_segments, now=now)

+                should_backpressure.value = len(flushed_segments) >= max_flush_segments * len(
+                    buffer.assigned_shards
+                )
+
                 if not flushed_segments:
                     time.sleep(1)
                     continue
@@ -139,8 +159,61 @@ def poll(self) -> None:
         self.next_step.poll()

     def submit(self, message: Message[FilteredPayload | int]) -> None:
```

> **Member:** Please add a short docstring that shows the two conditions we return with backpressure. Let's also document in which of the two cases the flusher itself will stop and why.

```diff
+        # Note that submit is not actually a hot path. Its message payloads
+        # are mapped from *batches* of spans, and there are a handful of spans
+        # per second at most. If anything, self.poll() might even be called
+        # more often than submit().
+        if not self.process.is_alive():
+            metrics.incr("sentry.spans.buffer.flusher_dead")
+            if self.process_restarts < MAX_PROCESS_RESTARTS:
+                self.process.start()
+                self.process_restarts += 1
+            else:
+                raise RuntimeError(
+                    "flusher process has crashed.\n\nSearch for sentry_spans_buffer_component:flusher in Sentry to get the original error."
+                )
+
+        self.buffer.record_stored_segments()
+
+        # We pause insertion into Redis if the flusher is not making progress
+        # fast enough. We could backlog into Redis, but we assume, despite best
+        # efforts, it is still always going to be less durable than Kafka.
+        # Minimizing our Redis memory usage also makes COGS easier to reason
+        # about.
+        #
+        # should_backpressure is true if there are many segments to flush, but
+        # the flusher can't get all of them out.
+        if self.should_backpressure.value:
+            metrics.incr("sentry.spans.buffer.flusher.backpressure")
+            raise MessageRejected()
+
+        # We set the drift. The backpressure based on Redis memory comes after.
+        # If Redis is full for a long time, the drift will grow into a large
+        # negative value, effectively pausing flushing as well.
         if isinstance(message.payload, int):
-            self.current_drift.value = message.payload - int(time.time())
+            self.current_drift.value = drift = message.payload - int(time.time())
+            metrics.timing("sentry.spans.buffer.flusher.drift", drift)

+        # We also pause insertion into Redis if Redis is too full. In this case
+        # we cannot allow the flusher to progress either, as it would write
+        # partial/fragmented segments to the buffered-segments topic. We have to
+        # wait until the situation is improved manually.
+        if self.max_memory_percentage < 1.0:
+            memory_infos = list(self.buffer.get_memory_info())
+            used = sum(x.used for x in memory_infos)
+            available = sum(x.available for x in memory_infos)
+            if available > 0 and used / available > self.max_memory_percentage:
+                if not self.redis_was_full:
+                    logger.fatal("Pausing consumer due to Redis being full")
+                metrics.incr("sentry.spans.buffer.flusher.hard_backpressure")
+                self.redis_was_full = True
+                # Pause consumer if Redis memory is full. Because the drift is
+                # set before we emit backpressure, the flusher effectively
+                # stops as well. Alternatively we may simply crash the consumer,
+                # but this would also trigger a lot of rebalancing.
+                raise MessageRejected()
+
+        self.redis_was_full = False
         self.next_step.submit(message)

     def terminate(self) -> None:
```
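Taken together, the diff implements two kinds of backpressure: a soft kind, signalled by the flusher when it cannot drain segments fast enough, and a hard kind, triggered when Redis memory crosses a threshold. The sketch below is a minimal, self-contained model of that logic — a stub `MessageRejected` stands in for arroyo's exception, and all class names, parameters, and numbers are hypothetical, not the actual `SpanFlusher` implementation:

```python
class MessageRejected(Exception):
    """Stub for arroyo's MessageRejected; tells the consumer to retry later."""


def flusher_should_backpressure(
    flushed_segments: int, max_flush_segments: int, num_shards: int
) -> bool:
    # Mirrors the flusher-side condition: soft backpressure is signalled
    # when a full batch was flushed from every assigned shard, i.e. the
    # flusher may be falling behind.
    return flushed_segments >= max_flush_segments * num_shards


class FlusherModel:
    """Hypothetical model of the two backpressure conditions in submit()."""

    def __init__(self, max_memory_percentage: float = 0.8) -> None:
        self.max_memory_percentage = max_memory_percentage
        self.should_backpressure = False  # written by the flusher process
        self.redis_was_full = False

    def submit(self, used_bytes: int, available_bytes: int) -> str:
        # Soft backpressure: the flusher is not draining fast enough, so
        # stop inserting into Redis while the flusher keeps running.
        if self.should_backpressure:
            raise MessageRejected

        # Hard backpressure: Redis itself is too full, so stop inserting.
        # In the real code the drift is set before this point, so the
        # flusher effectively pauses as well.
        if available_bytes > 0 and used_bytes / available_bytes > self.max_memory_percentage:
            self.redis_was_full = True
            raise MessageRejected

        self.redis_was_full = False
        return "forwarded"
```

One design point worth noting: rejecting the message (rather than crashing the consumer) keeps the Kafka offsets unadvanced without triggering a rebalance, which is why both conditions surface as `MessageRejected`.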
**Reviewer:** Please consider measuring this in bytes rather than set cardinality, as spans can be very different from each other.
One way to do this is to keep a sharded counter of the size of what you write: increment it when you add a batch of spans, and decrement it when you flush. Then you can rely on the amount of data in Redis rather than the number of spans.
Or, better, measure the amount of free memory in Redis, the way backpressure does. Then you cannot go wrong with the signal used to trigger backpressure.
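The sharded-counter idea can be sketched as follows. This is a hypothetical illustration, not code from the PR: a plain dict stands in for Redis, where each shard's counter would be a key updated with `INCRBY`/`DECRBY`:

```python
class ShardedByteCounter:
    """Tracks the byte size of buffered spans across hypothetical shards."""

    def __init__(self, num_shards: int = 4) -> None:
        self.num_shards = num_shards
        # One counter per shard; in Redis these would be separate keys.
        self.counters = {shard: 0 for shard in range(num_shards)}

    def _shard(self, key: bytes) -> int:
        # Stable-enough shard assignment for the sketch.
        return hash(key) % self.num_shards

    def add_spans(self, key: bytes, payloads: list[bytes]) -> None:
        # On write: increment the owning shard by the batch's byte size.
        self.counters[self._shard(key)] += sum(len(p) for p in payloads)

    def flush(self, key: bytes, payloads: list[bytes]) -> None:
        # On flush: decrement by the size of what was flushed.
        self.counters[self._shard(key)] -= sum(len(p) for p in payloads)

    def total_bytes(self) -> int:
        # The backpressure signal would compare this against a byte budget.
        return sum(self.counters.values())
```

As the author's reply below notes, the PR ultimately went with the second suggestion (measuring Redis memory via `get_memory_info`) rather than a counter like this.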
**Author:** Done. I also had a conversation with Jan and had to split this up into two kinds of backpressure; see the updated PR description.