Skip to content

Commit f10e44c

Browse files
authored
fix: Integration tests fixes (#2161)
Co-authored-by: Keiven Chang <[email protected]>
1 parent 7e3b3fa commit f10e44c

File tree

5 files changed

+109
-67
lines changed

5 files changed

+109
-67
lines changed

lib/runtime/src/component/component.rs

Lines changed: 7 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -86,27 +86,17 @@ mod tests {
8686
// todo - make a distributed runtime fixture
8787
// todo - two options - fully mocked or integration test
8888
#[tokio::test]
89-
async fn test_publish() {
89+
async fn test_publish_and_subscribe() {
9090
let rt = Runtime::from_current().unwrap();
9191
let dtr = DistributedRuntime::from_settings(rt.clone()).await.unwrap();
92-
let ns = dtr.namespace("test".to_string()).unwrap();
93-
let cp = ns.component("component".to_string()).unwrap();
94-
cp.publish("test", &"test".to_string()).await.unwrap();
95-
rt.shutdown();
96-
}
97-
98-
#[tokio::test]
99-
async fn test_subscribe() {
100-
let rt = Runtime::from_current().unwrap();
101-
let dtr = DistributedRuntime::from_settings(rt.clone()).await.unwrap();
102-
let ns = dtr.namespace("test".to_string()).unwrap();
103-
let cp = ns.component("component".to_string()).unwrap();
92+
let ns = dtr.namespace("test_component".to_string()).unwrap();
93+
let cp = ns.component("test_component".to_string()).unwrap();
10494

105-
// Create a subscriber
106-
let mut subscriber = ns.subscribe("test").await.unwrap();
95+
// Create a subscriber on the component
96+
let mut subscriber = cp.subscribe("test_event").await.unwrap();
10797

108-
// Publish a message
109-
cp.publish("test", &"test_message".to_string())
98+
// Publish a message from the component
99+
cp.publish("test_event", &"test_message".to_string())
110100
.await
111101
.unwrap();
112102

lib/runtime/src/component/namespace.rs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -99,22 +99,24 @@ mod tests {
9999
async fn test_publish() {
100100
let rt = Runtime::from_current().unwrap();
101101
let dtr = DistributedRuntime::from_settings(rt.clone()).await.unwrap();
102-
let ns = dtr.namespace("test".to_string()).unwrap();
103-
ns.publish("test", &"test".to_string()).await.unwrap();
102+
let ns = dtr.namespace("test_namespace_publish".to_string()).unwrap();
103+
ns.publish("test_event", &"test".to_string()).await.unwrap();
104104
rt.shutdown();
105105
}
106106

107107
#[tokio::test]
108108
async fn test_subscribe() {
109109
let rt = Runtime::from_current().unwrap();
110110
let dtr = DistributedRuntime::from_settings(rt.clone()).await.unwrap();
111-
let ns = dtr.namespace("test".to_string()).unwrap();
111+
let ns = dtr
112+
.namespace("test_namespace_subscribe".to_string())
113+
.unwrap();
112114

113115
// Create a subscriber
114-
let mut subscriber = ns.subscribe("test").await.unwrap();
116+
let mut subscriber = ns.subscribe("test_event").await.unwrap();
115117

116118
// Publish a message
117-
ns.publish("test", &"test_message".to_string())
119+
ns.publish("test_event", &"test_message".to_string())
118120
.await
119121
.unwrap();
120122

lib/runtime/src/http_server.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ impl crate::traits::DistributedRuntimeProvider for HttpMetricsRegistry {
7777

7878
impl MetricsRegistry for HttpMetricsRegistry {
7979
fn basename(&self) -> String {
80-
"http_server".to_string()
80+
"dynamo".to_string()
8181
}
8282

8383
fn parent_hierarchy(&self) -> Vec<String> {
@@ -100,7 +100,7 @@ impl HttpServerState {
100100
// Note: This metric is created at the DRT level (no namespace), so we manually add "dynamo_" prefix
101101
// to maintain consistency with the project's metric naming convention
102102
let uptime_gauge = http_metrics_registry.as_ref().create_gauge(
103-
"dynamo_uptime_seconds",
103+
"system_uptime_seconds",
104104
"Total uptime of the DistributedRuntime in seconds",
105105
&[],
106106
)?;
@@ -368,9 +368,9 @@ mod tests {
368368
println!("Full metrics response:\n{}", response);
369369

370370
let expected = "\
371-
# HELP dynamo_uptime_seconds Total uptime of the DistributedRuntime in seconds
372-
# TYPE dynamo_uptime_seconds gauge
373-
dynamo_uptime_seconds{namespace=\"http_server\"} 42
371+
# HELP dynamo_system_uptime_seconds Total uptime of the DistributedRuntime in seconds
372+
# TYPE dynamo_system_uptime_seconds gauge
373+
dynamo_system_uptime_seconds{namespace=\"dynamo\"} 42
374374
";
375375
assert_eq!(response, expected);
376376
}

lib/runtime/src/metrics.rs

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -797,7 +797,7 @@ mod test_prefixes {
797797
println!("\n=== Testing Invalid Namespace Behavior ===");
798798

799799
// Create a namespace with invalid name (contains hyphen)
800-
let invalid_namespace = drt.namespace("test-namespace").unwrap();
800+
let invalid_namespace = drt.namespace("@@123").unwrap();
801801

802802
// Debug: Let's see what the hierarchy looks like
803803
println!(
@@ -810,15 +810,15 @@ mod test_prefixes {
810810
);
811811
println!("Invalid namespace prefix: '{}'", invalid_namespace.prefix());
812812

813-
// Try to create a metric - this should fail because the namespace name will be used in the metric name
813+
// Try to create a metric - this should fail because "@@123" gets stripped to "" which is invalid
814814
let result = invalid_namespace.create_counter("test_counter", "A test counter", &[]);
815-
println!("Result with invalid namespace 'test-namespace':");
815+
println!("Result with invalid namespace '@@123':");
816816
println!("{:?}", result);
817817

818-
// The result should be an error from Prometheus
818+
// The result should be an error because empty metric names are invalid
819819
assert!(
820820
result.is_err(),
821-
"Creating metric with invalid namespace should fail"
821+
"Creating metric with namespace '@@123' should fail because it gets stripped to empty string"
822822
);
823823

824824
// For comparison, show a valid namespace works
@@ -926,15 +926,15 @@ testnamespace_testgauge{{component="testcomponent",namespace="testnamespace"}} 5
926926
println!("{}", namespace_output);
927927

928928
let expected_namespace_output = format!(
929-
r#"# HELP testintcounter A test int counter
930-
# TYPE testintcounter counter
931-
testintcounter{{namespace="testnamespace"}} 12345
932-
# HELP testnamespace_testcounter A test counter
929+
r#"# HELP testnamespace_testcounter A test counter
933930
# TYPE testnamespace_testcounter counter
934931
testnamespace_testcounter{{component="testcomponent",endpoint="testendpoint",namespace="testnamespace"}} 123.456789
935932
# HELP testnamespace_testgauge A test gauge
936933
# TYPE testnamespace_testgauge gauge
937934
testnamespace_testgauge{{component="testcomponent",namespace="testnamespace"}} 50000
935+
# HELP testnamespace_testintcounter A test int counter
936+
# TYPE testnamespace_testintcounter counter
937+
testnamespace_testintcounter{{namespace="testnamespace"}} 12345
938938
"#
939939
);
940940

@@ -1015,9 +1015,6 @@ testhistogram_bucket{{le="10"}} 3
10151015
testhistogram_bucket{{le="+Inf"}} 3
10161016
testhistogram_sum 7.5
10171017
testhistogram_count 3
1018-
# HELP testintcounter A test int counter
1019-
# TYPE testintcounter counter
1020-
testintcounter{{namespace="testnamespace"}} 12345
10211018
# HELP testintgauge A test int gauge
10221019
# TYPE testintgauge gauge
10231020
testintgauge 42
@@ -1031,6 +1028,9 @@ testnamespace_testcounter{{component="testcomponent",endpoint="testendpoint",nam
10311028
# HELP testnamespace_testgauge A test gauge
10321029
# TYPE testnamespace_testgauge gauge
10331030
testnamespace_testgauge{{component="testcomponent",namespace="testnamespace"}} 50000
1031+
# HELP testnamespace_testintcounter A test int counter
1032+
# TYPE testnamespace_testintcounter counter
1033+
testnamespace_testintcounter{{namespace="testnamespace"}} 12345
10341034
"#
10351035
);
10361036

lib/runtime/tests/soak.rs

Lines changed: 78 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,17 @@
1313
// See the License for the specific language governing permissions and
1414
// limitations under the License.
1515

16+
// cargo test --test soak integration::main --features integration
17+
//!
18+
//! It will send a batch of requests to the runtime and measure the throughput.
19+
//!
20+
//! It will also measure the latency of the requests.
21+
//!
22+
//! A reasonable soak test configuration to start off is 1 minute duration with 10000 batch load:
23+
//! export DYN_QUEUED_UP_PROCESSING=true
24+
//! export DYN_SOAK_BATCH_LOAD=10000
25+
//! export DYN_SOAK_RUN_DURATION=60s
26+
//! cargo test --test soak integration::main --features integration -- --nocapture
1627
#[cfg(feature = "integration")]
1728
mod integration {
1829

@@ -22,13 +33,17 @@ mod integration {
2233
logging,
2334
pipeline::{
2435
async_trait, network::Ingress, AsyncEngine, AsyncEngineContextProvider, Error, ManyOut,
25-
ResponseStream, SingleIn,
36+
PushRouter, ResponseStream, SingleIn,
2637
},
2738
protocols::annotated::Annotated,
28-
DistributedRuntime, ErrorContext, Result, Runtime, Worker,
39+
stream, DistributedRuntime, ErrorContext, Result, Runtime, Worker,
2940
};
3041
use futures::StreamExt;
31-
use std::{sync::Arc, time::Duration};
42+
use std::{
43+
sync::atomic::{AtomicU64, Ordering},
44+
sync::Arc,
45+
time::Duration,
46+
};
3247
use tokio::time::Instant;
3348

3449
#[test]
@@ -45,16 +60,29 @@ mod integration {
4560

4661
client.await??;
4762
distributed.shutdown();
48-
server.await??;
63+
let handler = server.await??;
64+
65+
// Print final backend counter value
66+
let final_count = handler.backend_counter.load(Ordering::Relaxed);
67+
println!(
68+
"Final RequestHandler backend_counter: {} requests processed",
69+
final_count
70+
);
4971

5072
Ok(())
5173
}
5274

53-
struct RequestHandler {}
75+
struct RequestHandler {
76+
backend_counter: AtomicU64,
77+
queued_up_processing: bool,
78+
}
5479

5580
impl RequestHandler {
56-
fn new() -> Arc<Self> {
57-
Arc::new(Self {})
81+
fn new(queued_up_processing: bool) -> Arc<Self> {
82+
Arc::new(Self {
83+
backend_counter: AtomicU64::new(0),
84+
queued_up_processing,
85+
})
5886
}
5987
}
6088

@@ -63,25 +91,40 @@ mod integration {
6391
async fn generate(&self, input: SingleIn<String>) -> Result<ManyOut<Annotated<String>>> {
6492
let (data, ctx) = input.into_parts();
6593

94+
// Increment backend counter
95+
self.backend_counter.fetch_add(1, Ordering::Relaxed);
96+
6697
let chars = data
6798
.chars()
6899
.map(|c| Annotated::from_data(c.to_string()))
69100
.collect::<Vec<_>>();
70101

71-
let stream = async_stream::stream! {
72-
for c in chars {
73-
yield c;
74-
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
75-
}
76-
};
77-
78-
Ok(ResponseStream::new(Box::pin(stream), ctx.context()))
102+
if self.queued_up_processing {
103+
// queued up processing - delayed response to saturate the queue
104+
let async_stream = async_stream::stream! {
105+
for c in chars {
106+
yield c;
107+
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
108+
}
109+
};
110+
Ok(ResponseStream::new(Box::pin(async_stream), ctx.context()))
111+
} else {
112+
// normal processing - immediate response
113+
let iter_stream = stream::iter(chars);
114+
Ok(ResponseStream::new(Box::pin(iter_stream), ctx.context()))
115+
}
79116
}
80117
}
81118

82-
async fn backend(runtime: DistributedRuntime) -> Result<()> {
119+
async fn backend(runtime: DistributedRuntime) -> Result<Arc<RequestHandler>> {
120+
// get the queued up processing setting from env (not delayed)
121+
let queued_up_processing =
122+
std::env::var("DYN_QUEUED_UP_PROCESSING").unwrap_or("false".to_string());
123+
let queued_up_processing: bool = queued_up_processing.parse().unwrap_or(false);
124+
83125
// attach an ingress to an engine
84-
let ingress = Ingress::for_engine(RequestHandler::new())?;
126+
let handler = RequestHandler::new(queued_up_processing);
127+
let ingress = Ingress::for_engine(handler.clone())?;
85128

86129
// // make the ingress discoverable via a component service
87130
// // we must first create a service, then we can attach one more more endpoints
@@ -95,39 +138,44 @@ mod integration {
95138
.endpoint_builder()
96139
.handler(ingress)
97140
.start()
98-
.await
141+
.await?;
142+
143+
Ok(handler)
99144
}
100145

101146
async fn client(runtime: DistributedRuntime) -> Result<()> {
102147
// get the run duration from env
103-
let run_duration = std::env::var("DYN_SOAK_RUN_DURATION").unwrap_or("1m".to_string());
148+
let run_duration = std::env::var("DYN_SOAK_RUN_DURATION").unwrap_or("3s".to_string());
104149
let run_duration =
105-
humantime::parse_duration(&run_duration).unwrap_or(Duration::from_secs(60));
150+
humantime::parse_duration(&run_duration).unwrap_or(Duration::from_secs(3));
106151

107-
let batch_load = std::env::var("DYN_SOAK_BATCH_LOAD").unwrap_or("10000".to_string());
108-
let batch_load: usize = batch_load.parse().unwrap_or(10000);
152+
let batch_load = std::env::var("DYN_SOAK_BATCH_LOAD").unwrap_or("100".to_string());
153+
let batch_load: usize = batch_load.parse().unwrap_or(100);
109154

110155
let client = runtime
111156
.namespace(DEFAULT_NAMESPACE)?
112157
.component("backend")?
113158
.endpoint("generate")
114-
.client::<String, Annotated<String>>()
159+
.client()
115160
.await?;
116161

117162
client.wait_for_instances().await?;
118-
let client = Arc::new(client);
163+
let router =
164+
PushRouter::<String, Annotated<String>>::from_client(client, Default::default())
165+
.await?;
166+
let router = Arc::new(router);
119167

120168
let start = Instant::now();
121169
let mut count = 0;
122170

123171
loop {
124172
let mut tasks = Vec::new();
125173
for _ in 0..batch_load {
126-
let client = client.clone();
174+
let router = router.clone();
127175
tasks.push(tokio::spawn(async move {
128176
let mut stream = tokio::time::timeout(
129-
Duration::from_secs(30),
130-
client.random("hello world".to_string().into()),
177+
Duration::from_secs(5),
178+
router.random("hello world".to_string().into()),
131179
)
132180
.await
133181
.context("request timed out")??;
@@ -147,7 +195,9 @@ mod integration {
147195

148196
let elapsed = start.elapsed();
149197
count += batch_load;
150-
println!("elapsed: {:?}; count: {}", elapsed, count);
198+
if count % 1000 == 0 {
199+
println!("elapsed: {:?}; count: {}", elapsed, count);
200+
}
151201

152202
if elapsed > run_duration {
153203
println!("done");

0 commit comments

Comments
 (0)