Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
6ce4b13
Rough outline
tobias-wilfert Sep 19, 2025
f08dd9c
Copy logic for `handle_submit_client_reports`.
tobias-wilfert Sep 19, 2025
adae25d
Adapt `handle_submit_client_reports` logic
tobias-wilfert Sep 19, 2025
cdfd669
Copy logic for `handle_flush_buckets`.
tobias-wilfert Sep 19, 2025
6551766
tmp save
tobias-wilfert Sep 24, 2025
4800bb0
Remove code that will not be needed in proxy mode
tobias-wilfert Sep 24, 2025
c293dc0
Update the tests that are now already breaking
tobias-wilfert Sep 24, 2025
2d019d4
Try getting the basics working first before addressing everything.
tobias-wilfert Sep 25, 2025
3684d37
Remove outdated comments, add new ones and clean up code.
tobias-wilfert Oct 14, 2025
4f8939d
More clean up
tobias-wilfert Oct 14, 2025
130f83d
Rudimentary tests
tobias-wilfert Oct 14, 2025
6b83213
remove comment
tobias-wilfert Oct 14, 2025
5175ddd
Update ratelimiting test to also check proxy mode
tobias-wilfert Oct 14, 2025
9b660e5
Add initial version of one large test.
tobias-wilfert Oct 14, 2025
ac14520
Test the ratelimiting in a proxy test rather than changing the store …
tobias-wilfert Oct 15, 2025
b0f5ed9
Don't start the aggregator and cogs in proxy mode.
tobias-wilfert Oct 15, 2025
213e7ea
Improve test by using random data and adding the expect outcome as pa…
tobias-wilfert Oct 16, 2025
4001c74
Make the edge-cases more explicit
tobias-wilfert Oct 16, 2025
26174bb
Remove pool from proxy mode processor
tobias-wilfert Oct 17, 2025
0f6646b
Avoid monkey patching in test, asset outcomes against sorted list + m…
tobias-wilfert Oct 17, 2025
ebcbd4c
Move proxy_processor into own file
tobias-wilfert Oct 17, 2025
7b517d6
Update comment + readd sleep to make CI happy
tobias-wilfert Oct 17, 2025
44b9e0d
Merge branch 'master' into tobias-wilfert/feat/simplify-proxy-mode
tobias-wilfert Oct 17, 2025
8eae7e0
Add changelog entry
tobias-wilfert Oct 17, 2025
6af517c
Remove outdated item types
tobias-wilfert Oct 17, 2025
dca6f1f
Merge branch 'master' into tobias-wilfert/feat/simplify-proxy-mode
tobias-wilfert Oct 17, 2025
b278533
update changelog entry
tobias-wilfert Oct 17, 2025
0c02913
Merge branch 'master' into tobias-wilfert/feat/simplify-proxy-mode
tobias-wilfert Oct 17, 2025
c315533
Remove redundant Inner struct
tobias-wilfert Oct 21, 2025
3dc2099
remove default impl for `ProxyAddrs`
tobias-wilfert Oct 21, 2025
3943930
update comment
tobias-wilfert Oct 21, 2025
c54e9e8
fixes
tobias-wilfert Oct 21, 2025
c30d12e
add comments
tobias-wilfert Oct 21, 2025
ddc9186
don't split envelope but rather send as is
tobias-wilfert Oct 21, 2025
a91c189
Merge branch 'tobias-wilfert/feat/simplify-proxy-mode' of github.com:…
tobias-wilfert Oct 21, 2025
f0edd19
remove now unused import
tobias-wilfert Oct 21, 2025
59cb398
Merge branch 'master' into tobias-wilfert/feat/simplify-proxy-mode
tobias-wilfert Oct 21, 2025
13c6112
make autoscaling service optional as a whole rather than just the ser…
tobias-wilfert Oct 21, 2025
99f4959
Merge branch 'tobias-wilfert/feat/simplify-proxy-mode' of github.com:…
tobias-wilfert Oct 21, 2025
e395328
Changing autoscaling endpoint response
tobias-wilfert Oct 21, 2025
26f646c
Merge branch 'master' into tobias-wilfert/feat/simplify-proxy-mode
tobias-wilfert Oct 22, 2025
fa8f354
Merge branch 'master' into tobias-wilfert/feat/simplify-proxy-mode
tobias-wilfert Oct 22, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## Unreleased

**Breaking Changes**:

- Simplify proxy mode to forward without processing. ([#5165](https://github.com/getsentry/relay/pull/5165))

**Bug Fixes**:

- Make referer optional in Vercel Log Drain Transform. ([#5273](https://github.com/getsentry/relay/pull/5273))
Expand Down
13 changes: 8 additions & 5 deletions relay-server/src/endpoints/autoscaling.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,14 @@ use std::fmt::Write;

/// Returns internal metrics data for relay.
pub async fn handle(state: ServiceState) -> (StatusCode, String) {
let data = match state
.autoscaling()
.send(AutoscalingMessageKind::Check)
.await
{
let Some(autoscaling) = state.autoscaling() else {
return (
StatusCode::NOT_FOUND,
"Autoscaling metrics not enabled".to_owned(),
);
};

let data = match autoscaling.send(AutoscalingMessageKind::Check).await {
Ok(data) => data,
Err(_) => {
return (
Expand Down
126 changes: 78 additions & 48 deletions relay-server/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use crate::services::processor::{
};
use crate::services::projects::cache::{ProjectCacheHandle, ProjectCacheService};
use crate::services::projects::source::ProjectSource;
use crate::services::proxy_processor::{ProxyAddrs, ProxyProcessorService};
use crate::services::relays::{RelayCache, RelayCacheService};
use crate::services::stats::RelayStats;
#[cfg(feature = "processing")]
Expand Down Expand Up @@ -72,7 +73,7 @@ pub struct Registry {
pub upstream_relay: Addr<UpstreamRelay>,
pub envelope_buffer: PartitionedEnvelopeBuffer,
pub project_cache_handle: ProjectCacheHandle,
pub autoscaling: Addr<AutoscalingMetrics>,
pub autoscaling: Option<Addr<AutoscalingMetrics>>,
}

/// Constructs a Tokio [`relay_system::Runtime`] configured for running [services](relay_system::Service).
Expand Down Expand Up @@ -181,7 +182,11 @@ impl ServiceState {

// Create an address for the `EnvelopeProcessor`, which can be injected into the
// other services.
let (processor, processor_rx) = channel(EnvelopeProcessorService::name());
let (processor, processor_rx) = match config.relay_mode() {
relay_config::RelayMode::Proxy => channel(ProxyProcessorService::name()),
relay_config::RelayMode::Managed => channel(EnvelopeProcessorService::name()),
};

let outcome_producer = services.start(OutcomeProducerService::create(
config.clone(),
upstream_relay.clone(),
Expand Down Expand Up @@ -209,16 +214,6 @@ impl ServiceState {
let project_cache_handle =
ProjectCacheService::new(Arc::clone(&config), project_source).start_in(services);

let aggregator = RouterService::new(
handle.clone(),
config.default_aggregator_config().clone(),
config.secondary_aggregator_configs().clone(),
Some(processor.clone().recipient()),
project_cache_handle.clone(),
);
let aggregator_handle = aggregator.handle();
let aggregator = services.start(aggregator);

let metric_outcomes = MetricOutcomes::new(outcome_aggregator.clone());

#[cfg(feature = "processing")]
Expand All @@ -238,38 +233,11 @@ impl ServiceState {
})
.transpose()?;

let cogs = CogsService::new(&config);
let cogs = Cogs::new(CogsServiceRecorder::new(&config, services.start(cogs)));

#[cfg(feature = "processing")]
let global_rate_limits = redis_clients
.as_ref()
.map(|p| services.start(GlobalRateLimitsService::new(p.quotas.clone())));

let processor_pool = create_processor_pool(&config)?;
services.start_with(
EnvelopeProcessorService::new(
processor_pool.clone(),
config.clone(),
global_config_handle,
project_cache_handle.clone(),
cogs,
#[cfg(feature = "processing")]
redis_clients.clone(),
processor::Addrs {
outcome_aggregator: outcome_aggregator.clone(),
upstream_relay: upstream_relay.clone(),
#[cfg(feature = "processing")]
store_forwarder: store.clone(),
aggregator: aggregator.clone(),
#[cfg(feature = "processing")]
global_rate_limits,
},
metric_outcomes.clone(),
),
processor_rx,
);

let envelope_buffer = PartitionedEnvelopeBuffer::create(
config.spool_partitions(),
config.clone(),
Expand All @@ -281,6 +249,75 @@ impl ServiceState {
services,
);

let (processor_pool, aggregator_handle, autoscaling) = match config.relay_mode() {
relay_config::RelayMode::Proxy => {
services.start_with(
ProxyProcessorService::new(
config.clone(),
project_cache_handle.clone(),
ProxyAddrs {
outcome_aggregator: outcome_aggregator.clone(),
upstream_relay: upstream_relay.clone(),
},
),
processor_rx,
);
(None, None, None)
}
relay_config::RelayMode::Managed => {
let processor_pool = create_processor_pool(&config)?;

let aggregator = RouterService::new(
handle.clone(),
config.default_aggregator_config().clone(),
config.secondary_aggregator_configs().clone(),
Some(processor.clone().recipient()),
project_cache_handle.clone(),
);
let aggregator_handle = aggregator.handle();
let aggregator = services.start(aggregator);

let cogs = CogsService::new(&config);
let cogs = Cogs::new(CogsServiceRecorder::new(&config, services.start(cogs)));

services.start_with(
EnvelopeProcessorService::new(
processor_pool.clone(),
config.clone(),
global_config_handle,
project_cache_handle.clone(),
cogs,
#[cfg(feature = "processing")]
redis_clients.clone(),
processor::Addrs {
outcome_aggregator: outcome_aggregator.clone(),
upstream_relay: upstream_relay.clone(),
#[cfg(feature = "processing")]
store_forwarder: store.clone(),
aggregator: aggregator.clone(),
#[cfg(feature = "processing")]
global_rate_limits,
},
metric_outcomes.clone(),
),
processor_rx,
);

let autoscaling = services.start(AutoscalingMetricService::new(
memory_stat.clone(),
envelope_buffer.clone(),
handle.clone(),
processor_pool.clone(),
));

(
Some(processor_pool),
Some(aggregator_handle),
Some(autoscaling),
)
}
};

let health_check = services.start(HealthCheckService::new(
config.clone(),
MemoryChecker::new(memory_stat.clone(), config.clone()),
Expand All @@ -289,13 +326,6 @@ impl ServiceState {
envelope_buffer.clone(),
));

let autoscaling = services.start(AutoscalingMetricService::new(
memory_stat.clone(),
envelope_buffer.clone(),
handle.clone(),
processor_pool.clone(),
));

services.start(RelayStats::new(
config.clone(),
handle.clone(),
Expand Down Expand Up @@ -348,8 +378,8 @@ impl ServiceState {
&self.inner.memory_checker
}

pub fn autoscaling(&self) -> &Addr<AutoscalingMetrics> {
&self.inner.registry.autoscaling
pub fn autoscaling(&self) -> Option<&Addr<AutoscalingMetrics>> {
self.inner.registry.autoscaling.as_ref()
}

/// Returns the V2 envelope buffer, if present.
Expand Down
9 changes: 6 additions & 3 deletions relay-server/src/services/health_check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ impl StatusUpdate {
pub struct HealthCheckService {
config: Arc<Config>,
memory_checker: MemoryChecker,
aggregator: RouterHandle,
aggregator: Option<RouterHandle>,
upstream_relay: Addr<UpstreamRelay>,
envelope_buffer: PartitionedEnvelopeBuffer,
}
Expand All @@ -94,7 +94,7 @@ impl HealthCheckService {
pub fn new(
config: Arc<Config>,
memory_checker: MemoryChecker,
aggregator: RouterHandle,
aggregator: Option<RouterHandle>,
upstream_relay: Addr<UpstreamRelay>,
envelope_buffer: PartitionedEnvelopeBuffer,
) -> Self {
Expand Down Expand Up @@ -145,7 +145,10 @@ impl HealthCheckService {
}

async fn aggregator_probe(&self) -> Status {
Status::from(self.aggregator.can_accept_metrics())
self.aggregator
.as_ref()
.map(|agg| Status::from(agg.can_accept_metrics()))
.unwrap_or(Status::Healthy)
}

async fn spool_health_probe(&self) -> Status {
Expand Down
1 change: 1 addition & 0 deletions relay-server/src/services/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ pub mod outcome;
pub mod outcome_aggregator;
pub mod processor;
pub mod projects;
pub mod proxy_processor;
pub mod relays;
pub mod server;
pub mod stats;
Expand Down
10 changes: 5 additions & 5 deletions relay-server/src/services/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3358,7 +3358,7 @@ impl RateLimiter {
}
}

fn encode_payload(body: &Bytes, http_encoding: HttpEncoding) -> Result<Bytes, std::io::Error> {
pub fn encode_payload(body: &Bytes, http_encoding: HttpEncoding) -> Result<Bytes, std::io::Error> {
let envelope_body: Vec<u8> = match http_encoding {
HttpEncoding::Identity => return Ok(body.clone()),
HttpEncoding::Deflate => {
Expand Down Expand Up @@ -3392,10 +3392,10 @@ fn encode_payload(body: &Bytes, http_encoding: HttpEncoding) -> Result<Bytes, st
/// An upstream request that submits an envelope via HTTP.
#[derive(Debug)]
pub struct SendEnvelope {
envelope: TypedEnvelope<Processed>,
body: Bytes,
http_encoding: HttpEncoding,
project_cache: ProjectCacheHandle,
pub envelope: TypedEnvelope<Processed>,
pub body: Bytes,
pub http_encoding: HttpEncoding,
pub project_cache: ProjectCacheHandle,
}
Comment on lines 3393 to 3399
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the reader, we discussed what to do here and decided to just make it public for this PR, we can clean up visibility and types in a follow up

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tried to move the function (encode_payload) and struct (SendEnvelope) around and not super happy with any of the locations. What we could do though is have a public constructor for the struct rather than have the individual fields public, I think that might be nicer 🤔


impl UpstreamRequest for SendEnvelope {
Expand Down
Loading
Loading