Skip to content

Commit 1e5f760

Browse files
committed
fix: read epoxy config from local replica id
1 parent 1116470 commit 1e5f760

File tree

14 files changed

+55
-58
lines changed

14 files changed

+55
-58
lines changed

packages/common/metrics/src/providers.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,8 @@
11
// Based off of https://github.com/tokio-rs/tracing-opentelemetry/blob/v0.1.x/examples/opentelemetry-otlp.rs
22
// Based off of https://github.com/tokio-rs/tracing-opentelemetry/blob/v0.1.x/examples/opentelemetry-otlp.rs
33

4-
use std::sync::{Arc, RwLock, OnceLock};
5-
use opentelemetry::{KeyValue, global};
64
use opentelemetry::trace::{SamplingResult, SpanKind};
5+
use opentelemetry::{KeyValue, global};
76
use opentelemetry_otlp::WithExportConfig;
87
use opentelemetry_sdk::{
98
Resource,
@@ -12,6 +11,7 @@ use opentelemetry_sdk::{
1211
trace::{RandomIdGenerator, Sampler, SdkTracerProvider},
1312
};
1413
use opentelemetry_semantic_conventions::{SCHEMA_URL, attribute::SERVICE_VERSION};
14+
use std::sync::{Arc, OnceLock, RwLock};
1515

1616
/// Dynamic sampler that can be updated at runtime
1717
#[derive(Clone, Debug)]

packages/common/runtime/src/traces.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@ pub fn init_tracing_subscriber(otel_providers: &Option<OtelProviderGuard>) {
2626
Some(providers) => {
2727
let tracer = providers.tracer_provider.tracer("tracing-otel-subscriber");
2828

29-
let otel_trace_layer =
30-
OpenTelemetryLayer::new(tracer).with_filter(build_filter_from_env_var("RUST_TRACE"));
29+
let otel_trace_layer = OpenTelemetryLayer::new(tracer)
30+
.with_filter(build_filter_from_env_var("RUST_TRACE"));
3131

3232
let otel_metric_layer = MetricsLayer::new(providers.meter_provider.clone())
3333
.with_filter(build_filter_from_env_var("RUST_TRACE"));

packages/core/api-peer/src/internal.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ pub async fn bump_serverless_autoscaler(
4747
}
4848

4949
#[derive(Serialize, Deserialize)]
50+
#[serde(deny_unknown_fields)]
5051
pub struct SetTracingConfigRequest {
5152
#[serde(default, skip_serializing_if = "Option::is_none")]
5253
pub filter: Option<Option<String>>,
@@ -55,6 +56,7 @@ pub struct SetTracingConfigRequest {
5556
}
5657

5758
#[derive(Serialize)]
59+
#[serde(deny_unknown_fields)]
5860
pub struct SetTracingConfigResponse {}
5961

6062
#[tracing::instrument(skip_all)]

packages/core/api-public/src/actors/get_or_create.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@ use axum::{
44
response::{IntoResponse, Response},
55
};
66
use rivet_api_builder::{
7-
extract::{Extension, Json, Query},
87
ApiError,
8+
extract::{Extension, Json, Query},
99
};
1010
use rivet_types::actors::CrashPolicy;
1111
use rivet_util::Id;

packages/core/api-public/src/actors/utils.rs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,7 @@ use std::collections::HashMap;
1010
/// Helper function to fetch an actor by ID, automatically routing to the correct datacenter
1111
/// based on the actor ID's label.
1212
#[tracing::instrument(skip_all)]
13-
pub async fn fetch_actor_by_id(
14-
ctx: &ApiCtx,
15-
actor_id: Id,
16-
namespace: String,
17-
) -> Result<Actor> {
13+
pub async fn fetch_actor_by_id(ctx: &ApiCtx, actor_id: Id, namespace: String) -> Result<Actor> {
1814
let list_query = rivet_api_types::actors::list::ListQuery {
1915
namespace,
2016
actor_ids: Some(actor_id.to_string()),

packages/core/api-public/src/namespaces.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,7 @@ use crate::ctx::ApiCtx;
2121
security(("bearer_auth" = [])),
2222
)]
2323
#[tracing::instrument(skip_all)]
24-
pub async fn list(
25-
Extension(ctx): Extension<ApiCtx>,
26-
Query(query): Query<ListQuery>,
27-
) -> Response {
24+
pub async fn list(Extension(ctx): Extension<ApiCtx>, Query(query): Query<ListQuery>) -> Response {
2825
match list_inner(ctx, query).await {
2926
Ok(response) => Json(response).into_response(),
3027
Err(err) => ApiError::from(err).into_response(),

packages/core/api-public/src/runners.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,7 @@ use crate::ctx::ApiCtx;
2222
security(("bearer_auth" = [])),
2323
)]
2424
#[tracing::instrument(skip_all)]
25-
pub async fn list(
26-
Extension(ctx): Extension<ApiCtx>,
27-
Query(query): Query<ListQuery>,
28-
) -> Response {
25+
pub async fn list(Extension(ctx): Extension<ApiCtx>, Query(query): Query<ListQuery>) -> Response {
2926
match list_inner(ctx, query).await {
3027
Ok(response) => Json(response).into_response(),
3128
Err(err) => ApiError::from(err).into_response(),

packages/infra/engine/src/run_config.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,11 @@ pub fn config(_rivet_config: rivet_config::Config) -> Result<RunConfigData> {
2222
ServiceKind::Standalone,
2323
|config, pools| Box::pin(pegboard_serverless::start(config, pools)),
2424
),
25-
Service::new("tracing_reconfigure", ServiceKind::Standalone, |config, pools| {
26-
Box::pin(rivet_tracing_reconfigure::start(config, pools))
27-
}),
25+
Service::new(
26+
"tracing_reconfigure",
27+
ServiceKind::Standalone,
28+
|config, pools| Box::pin(rivet_tracing_reconfigure::start(config, pools)),
29+
),
2830
];
2931

3032
Ok(RunConfigData { services })

packages/services/epoxy/src/http_client.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use epoxy_protocol::{
55
versioned,
66
};
77
use futures_util::{StreamExt, stream::FuturesUnordered};
8+
use gas::prelude::*;
89
use rivet_api_builder::ApiCtx;
910
use std::future::Future;
1011
use vbare::OwnedVersionedData;
@@ -24,6 +25,7 @@ fn find_replica_address(
2425
.map(|r| r.api_peer_url.clone())
2526
}
2627

28+
#[tracing::instrument(skip_all, fields(%from_replica_id, ?replica_ids, ?quorum_type))]
2729
pub async fn fanout_to_replicas<F, Fut, T>(
2830
from_replica_id: ReplicaId,
2931
replica_ids: &[ReplicaId],
@@ -93,6 +95,7 @@ where
9395
Ok(successful_responses)
9496
}
9597

98+
#[tracing::instrument(skip_all)]
9699
pub async fn send_message(
97100
ctx: &ApiCtx,
98101
config: &protocol::ClusterConfig,
@@ -102,6 +105,7 @@ pub async fn send_message(
102105
send_message_to_address(ctx, replica_url, request).await
103106
}
104107

108+
#[tracing::instrument(skip_all, fields(%replica_url))]
105109
pub async fn send_message_to_address(
106110
ctx: &ApiCtx,
107111
replica_url: String,
@@ -116,8 +120,7 @@ pub async fn send_message_to_address(
116120
"sending message to replica directly"
117121
);
118122

119-
return crate::replica::message_request::message_request(&ctx, from_replica_id, request)
120-
.await;
123+
return crate::replica::message_request::message_request(&ctx, request).await;
121124
}
122125

123126
let mut replica_url = url::Url::parse(&replica_url)?;
@@ -139,6 +142,7 @@ pub async fn send_message_to_address(
139142
.post(replica_url.to_string())
140143
.body(request.serialize()?)
141144
.send()
145+
.custom_instrument(tracing::info_span!("http_request"))
142146
.await;
143147

144148
let response = match response_result {

packages/services/epoxy/src/http_routes.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,7 @@ pub async fn message(ctx: ApiCtx, path: VersionedPath, _query: (), body: Bytes)
2828
);
2929

3030
// Process message directly using ops
31-
let response =
32-
crate::replica::message_request::message_request(&ctx, current_replica_id, request).await?;
31+
let response = crate::replica::message_request::message_request(&ctx, request).await?;
3332

3433
versioned::Response::latest(response).serialize(path.version)
3534
}

0 commit comments

Comments
 (0)