diff --git a/src/sentry/consumers/__init__.py b/src/sentry/consumers/__init__.py index 4ab89167ccb7ce..62a69ec2843a86 100644 --- a/src/sentry/consumers/__init__.py +++ b/src/sentry/consumers/__init__.py @@ -436,6 +436,11 @@ def ingest_transactions_options() -> list[click.Option]: is_flag=True, default=False, ), + click.Option( + ["--max-memory-percentage", "max_memory_percentage"], + default=1.0, + help="Maximum memory usage of the Redis cluster in % (0.0-1.0) before the consumer backpressures.", + ), *multiprocessing_options(default_max_batch_size=100), ], }, diff --git a/src/sentry/processing/backpressure/memory.py b/src/sentry/processing/backpressure/memory.py index 8b42ca9e1e83e3..cdba89a00beb53 100644 --- a/src/sentry/processing/backpressure/memory.py +++ b/src/sentry/processing/backpressure/memory.py @@ -4,6 +4,7 @@ import rb import requests +from redis import StrictRedis from rediscluster import RedisCluster @@ -47,7 +48,7 @@ def query_rabbitmq_memory_usage(host: str) -> ServiceMemory: # Based on configuration, this could be: # - a `rediscluster` Cluster (actually `RetryingRedisCluster`) # - a `rb.Cluster` (client side routing cluster client) -Cluster = Union[RedisCluster, rb.Cluster] +Cluster = Union[RedisCluster, rb.Cluster, StrictRedis] def get_memory_usage(node_id: str, info: Mapping[str, Any]) -> ServiceMemory: @@ -68,12 +69,14 @@ def get_host_port_info(node_id: str, cluster: Cluster) -> NodeInfo: # RedisCluster node mapping node = cluster.connection_pool.nodes.nodes.get(node_id) return NodeInfo(node["host"], node["port"]) - else: + elif isinstance(cluster, rb.Cluster): # rb.Cluster node mapping node = cluster.hosts[node_id] return NodeInfo(node.host, node.port) except Exception: - return NodeInfo(None, None) + pass + + return NodeInfo(None, None) def iter_cluster_memory_usage(cluster: Cluster) -> Generator[ServiceMemory]: @@ -83,6 +86,8 @@ def iter_cluster_memory_usage(cluster: Cluster) -> Generator[ServiceMemory]: if isinstance(cluster, RedisCluster): # `RedisCluster` returns these as a dictionary, with the node-id as key cluster_info = cluster.info() + elif isinstance(cluster, StrictRedis): + cluster_info = {"main": cluster.info()} else: # rb.Cluster returns a promise with a dictionary with a _local_ node-id as key with cluster.all() as client: diff --git a/src/sentry/spans/buffer.py b/src/sentry/spans/buffer.py index f289eb58632f70..b60ccea8455ffb 100644 --- a/src/sentry/spans/buffer.py +++ b/src/sentry/spans/buffer.py @@ -64,7 +64,7 @@ from __future__ import annotations import itertools -from collections.abc import MutableMapping, Sequence +from collections.abc import Generator, MutableMapping, Sequence from typing import Any, NamedTuple import rapidjson @@ -72,6 +72,7 @@ from django.utils.functional import cached_property from sentry_redis_tools.clients import RedisCluster, StrictRedis +from sentry.processing.backpressure.memory import ServiceMemory, iter_cluster_memory_usage from sentry.utils import metrics, redis # SegmentKey is an internal identifier used by the redis buffer that is also @@ -306,6 +307,27 @@ def _group_by_parent(self, spans: Sequence[Span]) -> dict[tuple[str, str], list[ return trees + def record_stored_segments(self): + with metrics.timer("spans.buffer.get_stored_segments"): + with self.client.pipeline(transaction=False) as p: + for shard in self.assigned_shards: + key = self._get_queue_key(shard) + p.zcard(key) + + result = p.execute() + + assert len(result) == len(self.assigned_shards) + + for shard_i, queue_size in zip(self.assigned_shards, 
result): + metrics.timing( + "spans.buffer.flush_segments.queue_size", + queue_size, + tags={"shard_i": shard_i}, + ) + + def get_memory_info(self) -> Generator[ServiceMemory]: + return iter_cluster_memory_usage(self.client) + def flush_segments(self, now: int, max_segments: int = 0) -> dict[SegmentKey, FlushedSegment]: cutoff = now @@ -318,13 +340,11 @@ def flush_segments(self, now: int, max_segments: int = 0) -> dict[SegmentKey, Fl p.zrangebyscore( key, 0, cutoff, start=0 if max_segments else None, num=max_segments or None ) - p.zcard(key) queue_keys.append(key) - result = iter(p.execute()) + result = p.execute() segment_keys: list[tuple[QueueKey, SegmentKey]] = [] - queue_sizes = [] with metrics.timer("spans.buffer.flush_segments.load_segment_data"): with self.client.pipeline(transaction=False) as p: @@ -335,18 +355,8 @@ def flush_segments(self, now: int, max_segments: int = 0) -> dict[SegmentKey, Fl segment_keys.append((queue_key, segment_key)) p.smembers(segment_key) - # ZCARD output - queue_sizes.append(next(result)) - segments = p.execute() - for shard_i, queue_size in zip(self.assigned_shards, queue_sizes): - metrics.timing( - "spans.buffer.flush_segments.queue_size", - queue_size, - tags={"shard_i": shard_i}, - ) - return_segments = {} num_has_root_spans = 0 diff --git a/src/sentry/spans/consumers/process/factory.py b/src/sentry/spans/consumers/process/factory.py index 8eeb86c05f681e..a31a019fa5a42f 100644 --- a/src/sentry/spans/consumers/process/factory.py +++ b/src/sentry/spans/consumers/process/factory.py @@ -37,6 +37,7 @@ def __init__( input_block_size: int | None, output_block_size: int | None, produce_to_pipe: Callable[[KafkaPayload], None] | None = None, + max_memory_percentage: float = 1.0, ): super().__init__() @@ -44,6 +45,7 @@ def __init__( self.max_batch_size = max_batch_size self.max_batch_time = max_batch_time self.max_flush_segments = max_flush_segments + self.max_memory_percentage = max_memory_percentage self.input_block_size = input_block_size self.output_block_size = output_block_size self.num_processes = num_processes @@ -66,8 +68,9 @@ def create_with_partitions( flusher = self._flusher = SpanFlusher( buffer, - self.max_flush_segments, - self.produce_to_pipe, + max_flush_segments=self.max_flush_segments, + max_memory_percentage=self.max_memory_percentage, + produce_to_pipe=self.produce_to_pipe, next_step=committer, ) @@ -93,19 +96,19 @@ def create_with_partitions( next_step=run_task, ) - # We use the produce timestamp to drive the clock for flushing, so that - # consumer backlogs do not cause segments to be flushed prematurely. - # The received timestamp in the span is too old for this purpose if - # Relay starts buffering, and we don't want that effect to propagate - # into this system. - def add_produce_timestamp_cb(message: Message[KafkaPayload]) -> tuple[int, KafkaPayload]: + def prepare_message(message: Message[KafkaPayload]) -> tuple[int, KafkaPayload]: + # We use the produce timestamp to drive the clock for flushing, so that + # consumer backlogs do not cause segments to be flushed prematurely. + # The received timestamp in the span is too old for this purpose if + # Relay starts buffering, and we don't want that effect to propagate + # into this system. 
return ( int(message.timestamp.timestamp() if message.timestamp else time.time()), message.payload, ) add_timestamp = RunTask( - function=add_produce_timestamp_cb, + function=prepare_message, next_step=batch, ) @@ -133,7 +136,7 @@ def process_batch( parent_span_id=val.get("parent_span_id"), project_id=val["project_id"], payload=payload.value, - is_segment_span=(val.get("parent_span_id") is None or val.get("is_remote")), + is_segment_span=bool(val.get("parent_span_id") is None or val.get("is_remote")), ) spans.append(span) diff --git a/src/sentry/spans/consumers/process/flusher.py b/src/sentry/spans/consumers/process/flusher.py index 7b800f3d287218..a6ebfdc3d66c5b 100644 --- a/src/sentry/spans/consumers/process/flusher.py +++ b/src/sentry/spans/consumers/process/flusher.py @@ -1,12 +1,14 @@ +import logging import multiprocessing import threading import time from collections.abc import Callable import rapidjson +import sentry_sdk from arroyo import Topic as ArroyoTopic from arroyo.backends.kafka import KafkaPayload, KafkaProducer, build_kafka_configuration -from arroyo.processing.strategies.abstract import ProcessingStrategy +from arroyo.processing.strategies.abstract import MessageRejected, ProcessingStrategy from arroyo.types import FilteredPayload, Message from sentry.conf.types.kafka_definition import Topic @@ -14,6 +16,10 @@ from sentry.utils import metrics from sentry.utils.kafka_config import get_kafka_producer_cluster_options, get_topic_definition +MAX_PROCESS_RESTARTS = 10 + +logger = logging.getLogger(__name__) + class SpanFlusher(ProcessingStrategy[FilteredPayload | int]): """ @@ -33,15 +39,19 @@ def __init__( self, buffer: SpansBuffer, max_flush_segments: int, + max_memory_percentage: float, produce_to_pipe: Callable[[KafkaPayload], None] | None, next_step: ProcessingStrategy[FilteredPayload | int], ): self.buffer = buffer self.max_flush_segments = max_flush_segments + self.max_memory_percentage = max_memory_percentage self.next_step = next_step self.stopped = multiprocessing.Value("i", 0) + self.redis_was_full = False self.current_drift = multiprocessing.Value("i", 0) + self.should_backpressure = multiprocessing.Value("i", 0) from sentry.utils.arroyo import _get_arroyo_subprocess_initializer @@ -59,6 +69,7 @@ def __init__( initializer, self.stopped, self.current_drift, + self.should_backpressure, self.buffer, self.max_flush_segments, produce_to_pipe, @@ -66,6 +77,8 @@ def __init__( daemon=True, ) + self.process_restarts = 0 + self.process.start() @staticmethod @@ -73,10 +86,13 @@ def main( initializer: Callable | None, stopped, current_drift, + should_backpressure, buffer: SpansBuffer, max_flush_segments: int, produce_to_pipe: Callable[[KafkaPayload], None] | None, ) -> None: + sentry_sdk.set_tag("sentry_spans_buffer_component", "flusher") + try: if initializer: initializer() @@ -102,6 +118,10 @@ def produce(payload: KafkaPayload) -> None: now = int(time.time()) + current_drift.value flushed_segments = buffer.flush_segments(max_segments=max_flush_segments, now=now) + should_backpressure.value = len(flushed_segments) >= max_flush_segments * len( + buffer.assigned_shards + ) + if not flushed_segments: time.sleep(1) continue @@ -139,8 +159,61 @@ def poll(self) -> None: self.next_step.poll() def submit(self, message: Message[FilteredPayload | int]) -> None: + # Note that submit is not actually a hot path. Their message payloads + # are mapped from *batches* of spans, and there are a handful of spans + # per second at most. 
If anything, self.poll() might even be called + # more often than submit() + if not self.process.is_alive(): + metrics.incr("sentry.spans.buffer.flusher_dead") + if self.process_restarts < MAX_PROCESS_RESTARTS: + self.process.start() + self.process_restarts += 1 + else: + raise RuntimeError( + "flusher process has crashed.\n\nSearch for sentry_spans_buffer_component:flusher in Sentry to get the original error." + ) + + self.buffer.record_stored_segments() + + # We pause insertion into Redis if the flusher is not making progress + # fast enough. We could backlog into Redis, but we assume, despite best + # efforts, it is still always going to be less durable than Kafka. + # Minimizing our Redis memory usage also makes COGS easier to reason + # about. + # + # should_backpressure is true if there are many segments to flush, but + # the flusher can't get all of them out. + if self.should_backpressure.value: + metrics.incr("sentry.spans.buffer.flusher.backpressure") + raise MessageRejected() + + # We set the drift. The backpressure based on redis memory comes after. + # If Redis is full for a long time, the drift will grow into a large + # negative value, effectively pausing flushing as well. if isinstance(message.payload, int): - self.current_drift.value = message.payload - int(time.time()) + self.current_drift.value = drift = message.payload - int(time.time()) + metrics.timing("sentry.spans.buffer.flusher.drift", drift) + + # We also pause insertion into Redis if Redis is too full. In this case + # we cannot allow the flusher to progress either, as it would write + # partial/fragmented segments to buffered-segments topic. We have to + # wait until the situation is improved manually. + if self.max_memory_percentage < 1.0: + memory_infos = list(self.buffer.get_memory_info()) + used = sum(x.used for x in memory_infos) + available = sum(x.available for x in memory_infos) + if available > 0 and used / available > self.max_memory_percentage: + if not self.redis_was_full: + logger.fatal("Pausing consumer due to Redis being full") + metrics.incr("sentry.spans.buffer.flusher.hard_backpressure") + self.redis_was_full = True + # Pause consumer if Redis memory is full. Because the drift is + # set before we emit backpressure, the flusher effectively + # stops as well. Alternatively we may simply crash the consumer + # but this would also trigger a lot of rebalancing. 
+ raise MessageRejected() + + self.redis_was_full = False self.next_step.submit(message) def terminate(self) -> None: diff --git a/tests/sentry/spans/consumers/process/test_consumer.py b/tests/sentry/spans/consumers/process/test_consumer.py index 07d2a613898a84..fbf8fdc606371e 100644 --- a/tests/sentry/spans/consumers/process/test_consumer.py +++ b/tests/sentry/spans/consumers/process/test_consumer.py @@ -1,4 +1,3 @@ -import threading from datetime import datetime import rapidjson @@ -8,16 +7,7 @@ from sentry.spans.consumers.process.factory import ProcessSpansStrategyFactory -class FakeProcess(threading.Thread): - """ - Pretend this is multiprocessing.Process - """ - - def terminate(self): - pass - - -def test_basic(monkeypatch, request): +def test_basic(monkeypatch): # Flush very aggressively to make test pass instantly monkeypatch.setattr("time.sleep", lambda _: None) @@ -61,11 +51,6 @@ def add_commit(offsets, force=False): ) ) - @request.addfinalizer - def _(): - step.join() - fac.shutdown() - step.poll() fac._flusher.current_drift.value = 9000 # "advance" our "clock" diff --git a/tests/sentry/spans/consumers/process/test_flusher.py b/tests/sentry/spans/consumers/process/test_flusher.py new file mode 100644 index 00000000000000..4654865f8284fd --- /dev/null +++ b/tests/sentry/spans/consumers/process/test_flusher.py @@ -0,0 +1,83 @@ +import time +from time import sleep + +import rapidjson +from arroyo.processing.strategies.noop import Noop + +from sentry.spans.buffer import Span, SpansBuffer +from sentry.spans.consumers.process.flusher import SpanFlusher + + +def _payload(span_id: bytes) -> bytes: + return rapidjson.dumps({"span_id": span_id}).encode("ascii") + + +def test_backpressure(monkeypatch): + # Flush very aggressively to make join() faster + monkeypatch.setattr("time.sleep", lambda _: None) + + buffer = SpansBuffer(assigned_shards=list(range(1))) + + messages = [] + + def append(msg): + messages.append(msg) + sleep(1.0) + + flusher = SpanFlusher( + buffer, + max_flush_segments=1, + max_memory_percentage=1.0, + produce_to_pipe=append, + next_step=Noop(), + ) + + now = time.time() + + for i in range(200): + trace_id = f"{i:0>32x}" + + spans = [ + Span( + payload=_payload(b"a" * 16), + trace_id=trace_id, + span_id="a" * 16, + parent_span_id="b" * 16, + project_id=1, + ), + Span( + payload=_payload(b"d" * 16), + trace_id=trace_id, + span_id="d" * 16, + parent_span_id="b" * 16, + project_id=1, + ), + Span( + payload=_payload(b"c" * 16), + trace_id=trace_id, + span_id="c" * 16, + parent_span_id="b" * 16, + project_id=1, + ), + Span( + payload=_payload(b"b" * 16), + trace_id=trace_id, + span_id="b" * 16, + parent_span_id=None, + is_segment_span=True, + project_id=1, + ), + ] + + buffer.process_spans(spans, now=int(now)) + + # Advance drift to trigger idle timeout of all segments. The flusher should + # have way too much to do due to `max_flush_segments=1` and enter + # backpressure state. + + flusher.current_drift.value = 20000 + sleep(0.1) + + assert messages + + assert flusher.should_backpressure.value diff --git a/tests/sentry/spans/test_buffer.py b/tests/sentry/spans/test_buffer.py index e7cf39043f2f84..a45347d1b13dc7 100644 --- a/tests/sentry/spans/test_buffer.py +++ b/tests/sentry/spans/test_buffer.py @@ -172,6 +172,8 @@ def test_basic(buffer: SpansBuffer, spans): buffer.done_flush_segments(rv) assert buffer.flush_segments(now=30) == {} + assert list(buffer.get_memory_info()) + assert_clean(buffer.client)
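
For readers tracing the new control flow: the sketch below condenses the two conditions under which `SpanFlusher.submit()` raises `MessageRejected`, as a standalone approximation. The helper name, `flusher_is_behind`, and the `MemoryInfo` dataclass are illustrative stand-ins (not names from the diff) for `should_backpressure.value`, `max_memory_percentage`, and the `ServiceMemory` values returned by `SpansBuffer.get_memory_info()`.

# Hedged sketch of the backpressure decision added to SpanFlusher.submit().
# `flusher_is_behind` stands in for should_backpressure.value (set by the
# flusher process when it drained a full max_flush_segments batch for every
# shard); MemoryInfo stands in for ServiceMemory. Names are illustrative only.
from dataclasses import dataclass


@dataclass
class MemoryInfo:
    used: int       # bytes currently used by one Redis node
    available: int  # memory budget of that node


def should_reject(
    flusher_is_behind: bool,
    memory_infos: list[MemoryInfo],
    max_memory_percentage: float,
) -> bool:
    """True if the consumer should raise MessageRejected and pause intake."""
    # 1) Soft backpressure: the flusher cannot keep up with the backlog.
    if flusher_is_behind:
        return True

    # 2) Hard backpressure: Redis is above the configured memory watermark.
    if max_memory_percentage < 1.0:
        used = sum(m.used for m in memory_infos)
        available = sum(m.available for m in memory_infos)
        if available > 0 and used / available > max_memory_percentage:
            return True

    return False


# Example: Redis at 95% with a 0.9 watermark -> pause; with the default 1.0
# watermark the memory check is skipped entirely.
assert should_reject(False, [MemoryInfo(95, 100)], max_memory_percentage=0.9)
assert not should_reject(False, [MemoryInfo(95, 100)], max_memory_percentage=1.0)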
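
A second sketch, covering the memory.py change: a plain StrictRedis client is treated as a one-node "cluster" by wrapping its INFO output under a synthetic "main" node id, so the existing aggregation path applies unchanged. The INFO field names used below (used_memory, maxmemory, total_system_memory) are standard Redis keys; how the diff's get_memory_usage() actually derives ServiceMemory is not shown in this patch, so treat the ratio calculation as an assumption.

# Minimal sketch, assuming standard Redis INFO fields; not the diff's own
# get_memory_usage() implementation.
from __future__ import annotations

from typing import Any, Mapping

from redis import StrictRedis
from rediscluster import RedisCluster


def node_infos(client: StrictRedis | RedisCluster) -> Mapping[str, Mapping[str, Any]]:
    if isinstance(client, RedisCluster):
        # RedisCluster.info() already returns {node_id: info_dict}.
        return client.info()
    # A single StrictRedis node: wrap it under a synthetic node id.
    return {"main": client.info()}


def memory_ratio(info: Mapping[str, Any]) -> float:
    used = info.get("used_memory", 0)
    # Fall back to total system memory when no maxmemory limit is configured.
    available = info.get("maxmemory", 0) or info.get("total_system_memory", 0)
    return used / available if available else 0.0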