Commit df5471b

feat(taskworker): Zstd compress process profile task (#95545)
The taskworker platform now supports ZSTD compression. This PR enables it for `sentry.profiles.task.process_profile`.
Parent: b38eb8f
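For readers unfamiliar with the taskworker compression option, the round trip is conceptually simple: the serialized task parameters are zstd-compressed before the activation is published and decompressed on the worker before the task runs. The sketch below only illustrates that round trip with the `zstandard` package; `compress_task_body` and `decompress_task_body` are invented names for this example, not the taskworker platform's API.

```python
# Minimal sketch of the zstd round trip around a serialized task body.
# The helper names are invented for illustration; this is not the
# taskworker platform's actual API.
import json

import zstandard


def compress_task_body(parameters: dict) -> bytes:
    """Producer side: serialize the parameters, then zstd-compress them."""
    raw = json.dumps(parameters).encode("utf-8")
    return zstandard.ZstdCompressor().compress(raw)


def decompress_task_body(body: bytes) -> dict:
    """Worker side: zstd-decompress, then deserialize the parameters."""
    raw = zstandard.ZstdDecompressor().decompress(body)
    return json.loads(raw.decode("utf-8"))


if __name__ == "__main__":
    params = {"project_id": 1, "profile_id": "abc123"}
    assert decompress_task_body(compress_task_body(params)) == params
```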

2 files changed: +104, −1 lines


src/sentry/profiles/task.py (2 additions, 0 deletions)

@@ -48,6 +48,7 @@
 from sentry.silo.base import SiloMode
 from sentry.tasks.base import instrumented_task
 from sentry.taskworker.config import TaskworkerConfig
+from sentry.taskworker.constants import CompressionType
 from sentry.taskworker.namespaces import ingest_profiling_tasks
 from sentry.taskworker.retry import Retry
 from sentry.utils import json, metrics
@@ -137,6 +138,7 @@ def encode_payload(message: dict[str, Any]) -> str:
             times=2,
             delay=5,
         ),
+        compression_type=CompressionType.ZSTD,
     ),
 )
 def process_profile_task(
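The second hunk header also references `encode_payload(message: dict[str, Any]) -> str`, whose implementation is not part of this diff. To make the zlib leg described in the test below concrete, here is a guess at the general msgpack + zlib + base64 shape such a dict-to-string encoder/decoder pair could take; the `_sketch` helpers are hypothetical and are not Sentry's actual code.

```python
# Hypothetical sketch of a zlib payload encoder/decoder pair; not Sentry's
# actual encode_payload implementation, just the msgpack + zlib + base64
# shape that turns a dict into a transport-safe str and back.
import base64
import zlib
from typing import Any

import msgpack


def encode_payload_sketch(message: dict[str, Any]) -> str:
    """Pack the dict with msgpack, zlib-compress it, and base64-encode to str."""
    return base64.b64encode(zlib.compress(msgpack.packb(message))).decode("utf-8")


def decode_payload_sketch(encoded: str) -> dict[str, Any]:
    """Reverse the steps: base64-decode, zlib-decompress, msgpack-unpack."""
    return msgpack.unpackb(zlib.decompress(base64.b64decode(encoded)))


if __name__ == "__main__":
    original = {"project_id": 1, "payload": "{}"}
    assert decode_payload_sketch(encode_payload_sketch(original)) == original
```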

tests/sentry/profiles/test_task.py (102 additions, 1 deletion)

@@ -7,6 +7,7 @@
 from unittest import mock
 from unittest.mock import patch
 
+import msgpack
 import pytest
 from django.core.files.uploadedfile import SimpleUploadedFile
 from django.urls import reverse
@@ -32,7 +33,7 @@
 )
 from sentry.profiles.utils import Profile
 from sentry.signals import first_profile_received
-from sentry.testutils.cases import TransactionTestCase
+from sentry.testutils.cases import TestCase, TransactionTestCase
 from sentry.testutils.factories import Factories, get_fixture_path
 from sentry.testutils.helpers import Feature, override_options
 from sentry.testutils.pytest.fixtures import django_db_all
@@ -1171,3 +1172,103 @@ def test_process_profile_task_should_flip_project_flag(
         )
         project.refresh_from_db()
         assert project.flags.has_profiles
+
+
+class TestProcessProfileTaskDoubleCompression(TestCase):
+    """
+    TODO(taskworker): Remove this test once we have deleted zlib compression.
+    Test class for validating the double compression flow:
+    1. Consumer does zlib compression and calls process_profile_task.delay()
+    2. Taskworker does zstd compression on the task parameters
+    3. Task worker decompresses zstd and task decompresses zlib
+    """
+
+    @patch("sentry.profiles.task._track_outcome")
+    @patch("sentry.profiles.task._track_duration_outcome")
+    @patch("sentry.profiles.task._symbolicate_profile")
+    @patch("sentry.profiles.task._deobfuscate_profile")
+    @patch("sentry.profiles.task._push_profile_to_vroom")
+    def test_consumer_to_task_double_compression_flow(
+        self,
+        _push_profile_to_vroom,
+        _deobfuscate_profile,
+        _symbolicate_profile,
+        _track_duration_outcome,
+        _track_outcome,
+    ):
+        """
+        Test that the full consumer -> task flow works with double compression.
+
+        This test validates:
+        1. process_message in factory.py does zlib compression
+        2. taskworker layer does zstd compression
+        3. Both decompressions work correctly in the task execution
+        """
+        from datetime import datetime
+
+        from arroyo.backends.kafka import KafkaPayload
+        from arroyo.types import BrokerValue, Message, Partition, Topic
+        from django.utils import timezone
+
+        from sentry.profiles.consumers.process.factory import ProcessProfileStrategyFactory
+
+        # Mock the task functions
+        _push_profile_to_vroom.return_value = True
+        _deobfuscate_profile.return_value = True
+        _symbolicate_profile.return_value = True
+
+        # Get the profile fixture data
+        profile = generate_sample_v2_profile()
+
+        # Create a message dict like the consumer would receive from Kafka
+        message_dict = {
+            "organization_id": self.organization.id,
+            "project_id": self.project.id,
+            "key_id": 1,
+            "received": int(timezone.now().timestamp()),
+            "payload": json.dumps(profile),
+        }
+
+        # Pack the message with msgpack (like the consumer receives from Kafka)
+        payload = msgpack.packb(message_dict)
+
+        # Create the processing strategy (this will call process_message)
+        processing_strategy = ProcessProfileStrategyFactory().create_with_partitions(
+            commit=mock.Mock(), partitions={}
+        )
+
+        # Use self.tasks() to run the actual task with both compression layers
+        with self.tasks():
+            # Submit the message to the processing strategy
+            # This calls process_message which does:
+            # 1. zlib compression of the msgpack data
+            # 2. process_profile_task.delay() which adds zstd compression
+            processing_strategy.submit(
+                Message(
+                    BrokerValue(
+                        KafkaPayload(
+                            b"key",
+                            payload,
+                            [],
+                        ),
+                        Partition(Topic("profiles"), 1),
+                        1,
+                        datetime.now(),
+                    )
+                )
+            )
+            processing_strategy.poll()
+            processing_strategy.join(1)
+            processing_strategy.terminate()
+
+        # Verify the task was executed successfully
+        assert _push_profile_to_vroom.call_count == 1
+        assert _deobfuscate_profile.call_count == 1
+        assert _symbolicate_profile.call_count == 1
+        assert _track_duration_outcome.call_count == 1
+
+        # Verify the profile was processed with correct data
+        processed_profile = _push_profile_to_vroom.call_args[0][0]
+        assert processed_profile["organization_id"] == self.organization.id
+        assert processed_profile["project_id"] == self.project.id
+        assert processed_profile["platform"] == profile["platform"]
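Putting the two legs together, the ordering the test exercises is: zlib first (applied by the consumer's process_message), zstd second (applied by the taskworker layer), with decompression happening in the reverse order. The snippet below is a self-contained illustration of that ordering only; the function names are invented and the serialization details are simplified relative to the real pipeline.

```python
# Illustration of the double compression ordering exercised by the test above:
# zlib first (consumer side), zstd second (taskworker side), then decompress
# in reverse order. Function names are invented for this example.
import zlib

import msgpack
import zstandard


def consumer_then_taskworker(message: dict) -> bytes:
    zlib_compressed = zlib.compress(msgpack.packb(message))  # consumer leg
    return zstandard.ZstdCompressor().compress(zlib_compressed)  # taskworker leg


def taskworker_then_task(wire_bytes: bytes) -> dict:
    zlib_compressed = zstandard.ZstdDecompressor().decompress(wire_bytes)  # worker undoes zstd
    return msgpack.unpackb(zlib.decompress(zlib_compressed))  # task undoes zlib


if __name__ == "__main__":
    message = {"organization_id": 1, "project_id": 2, "payload": "{}"}
    assert taskworker_then_task(consumer_then_taskworker(message)) == message
```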
