Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions lib/runtime/src/distributed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -355,3 +355,20 @@ impl DistributedConfig {
config
}
}

#[cfg(test)]
pub mod test_helpers {
//! Common test helper functions for DistributedRuntime tests
// TODO: Use in-memory DistributedRuntime for tests instead of full runtime when available.

/// Helper function to create a DRT instance for tests
/// Uses from_current to leverage existing tokio runtime
/// Note: Settings are read from environment variables inside DistributedRuntime::from_settings_without_discovery
#[cfg(feature = "integration")]
pub async fn create_test_drt_async() -> crate::DistributedRuntime {
let rt = crate::Runtime::from_current().unwrap();
crate::DistributedRuntime::from_settings_without_discovery(rt)
.await
.unwrap()
}
}
23 changes: 8 additions & 15 deletions lib/runtime/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -578,15 +578,6 @@ mod test_helpers {
use super::prometheus_names::{nats_client, nats_service};
use super::*;

/// Helper function to create a DRT instance for testing in async contexts
#[cfg(feature = "integration")]
pub async fn create_test_drt_async() -> crate::DistributedRuntime {
let rt = crate::Runtime::from_current().unwrap();
crate::DistributedRuntime::from_settings_without_discovery(rt)
.await
.unwrap()
}

/// Base function to filter Prometheus output lines based on a predicate.
/// Returns lines that match the predicate, converted to String.
fn filter_prometheus_lines<F>(input: &str, mut predicate: F) -> Vec<String>
Expand Down Expand Up @@ -822,7 +813,6 @@ mod test_metricsregistry_units {
println!("✓ Prometheus metric parsing works correctly!");
}

#[cfg(feature = "integration")]
#[test]
fn test_metrics_registry_entry_callbacks() {
use crate::MetricsRegistryEntry;
Expand Down Expand Up @@ -923,11 +913,12 @@ mod test_metricsregistry_units {
#[cfg(test)]
mod test_metricsregistry_prefixes {
use super::*;
use crate::distributed::test_helpers::create_test_drt_async;
use prometheus::core::Collector;

#[tokio::test]
async fn test_hierarchical_prefixes_and_parent_hierarchies() {
let drt = super::test_helpers::create_test_drt_async().await;
let drt = create_test_drt_async().await;

const DRT_NAME: &str = "";
const NAMESPACE_NAME: &str = "ns901";
Expand Down Expand Up @@ -1002,7 +993,7 @@ mod test_metricsregistry_prefixes {
#[tokio::test]
async fn test_recursive_namespace() {
// Create a distributed runtime for testing
let drt = super::test_helpers::create_test_drt_async().await;
let drt = create_test_drt_async().await;

// Create a deeply chained namespace: ns1.ns2.ns3
let ns1 = drt.namespace("ns1").unwrap();
Expand Down Expand Up @@ -1054,13 +1045,14 @@ mod test_metricsregistry_prometheus_fmt_outputs {
use super::prometheus_names::{nats_client, nats_service};
use super::prometheus_names::{COMPONENT_NATS_METRICS, DRT_NATS_METRICS};
use super::*;
use crate::distributed::test_helpers::create_test_drt_async;
use prometheus::Counter;
use std::sync::Arc;

#[tokio::test]
async fn test_prometheusfactory_using_metrics_registry_trait() {
// Setup real DRT and registry using the test-friendly constructor
let drt = super::test_helpers::create_test_drt_async().await;
let drt = create_test_drt_async().await;

// Use a simple constant namespace name
let namespace_name = "ns345";
Expand Down Expand Up @@ -1312,13 +1304,14 @@ mod test_metricsregistry_nats {
use super::prometheus_names::{nats_client, nats_service};
use super::prometheus_names::{COMPONENT_NATS_METRICS, DRT_NATS_METRICS};
use super::*;
use crate::distributed::test_helpers::create_test_drt_async;
use crate::pipeline::PushRouter;
use crate::{DistributedRuntime, Runtime};
use tokio::time::{sleep, Duration};
#[tokio::test]
async fn test_drt_nats_metrics() {
// Setup real DRT and registry using the test-friendly constructor
let drt = super::test_helpers::create_test_drt_async().await;
let drt = create_test_drt_async().await;

// Get DRT output which should include NATS client metrics
let drt_output = drt.prometheus_metrics_fmt().unwrap();
Expand Down Expand Up @@ -1382,7 +1375,7 @@ mod test_metricsregistry_nats {
// the values of the metrics.

// Setup real DRT and registry using the test-friendly constructor
let drt = super::test_helpers::create_test_drt_async().await;
let drt = create_test_drt_async().await;

// Create a namespace and components from the DRT
let namespace = drt.namespace("ns789").unwrap();
Expand Down
87 changes: 19 additions & 68 deletions lib/runtime/src/system_status_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,14 @@

use crate::config::HealthStatus;
use crate::logging::make_request_span;
use crate::logging::TraceParent;
use crate::metrics::MetricsRegistry;
use crate::traits::DistributedRuntimeProvider;
use axum::{body, http::StatusCode, response::IntoResponse, routing::get, Router};
use axum::{http::StatusCode, response::IntoResponse, routing::get, Router};
use serde_json::json;
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::OnceLock;
use std::sync::{Arc, OnceLock};
use std::time::Instant;
use tokio::{net::TcpListener, task::JoinHandle};
use tokio_util::sync::CancellationToken;
use tower_http::trace::DefaultMakeSpan;
use tower_http::trace::TraceLayer;

/// System status server information containing socket address and handle
Expand Down Expand Up @@ -296,61 +292,10 @@ async fn metrics_handler(state: Arc<SystemStatusState>) -> impl IntoResponse {
}

// Regular tests: cargo test system_status_server --lib
// Integration tests: cargo test system_status_server --lib --features integration

#[cfg(test)]
/// Helper function to create a DRT instance for basic unit tests
/// Uses from_current to leverage existing tokio runtime without environment configuration
async fn create_test_drt_async() -> crate::DistributedRuntime {
let rt = crate::Runtime::from_current().unwrap();
crate::DistributedRuntime::from_settings_without_discovery(rt)
.await
.unwrap()
}

#[cfg(test)]
/// Helper function to create a DRT instance for integration tests
/// Uses spawn_blocking to create runtime safely without ownership issues
/// Enables system status server for integration testing
/// Note: This function uses environment variables to configure and create the DistributedRuntime.
async fn create_test_drt_with_settings_async() -> crate::DistributedRuntime {
// Create runtime in blocking context where it can be safely dropped
let handle = tokio::task::spawn_blocking(|| {
// Load configuration from environment/settings
let config = crate::config::RuntimeConfig::from_settings().unwrap();

// Create runtime with the configuration and extract handle
let runtime = config.create_runtime().unwrap();
let handle = runtime.handle().clone();

// Runtime will be automatically dropped when it goes out of scope
handle
})
.await
.unwrap();

// Create Runtime using external handle (no ownership)
let rt = crate::Runtime::from_handle(handle).unwrap();
crate::DistributedRuntime::from_settings_without_discovery(rt)
.await
.unwrap()
}

#[cfg(test)]
mod tests {
use super::*;
use crate::logging::tests::load_log;
use crate::metrics::MetricsRegistry;
use anyhow::{anyhow, Result};
use chrono::{DateTime, Utc};
use jsonschema::{Draft, JSONSchema};
use rstest::rstest;
use serde_json::Value;
use std::fs::File;
use std::io::{BufRead, BufReader};
use std::sync::Arc;
use stdio_override::*;
use tokio::time::{sleep, Duration};
use tokio::time::Duration;

// This is a basic test to verify the HTTP server is working before testing other more complicated tests
#[tokio::test]
Expand Down Expand Up @@ -381,8 +326,19 @@ mod tests {
"HTTP server should shut down when cancel token is cancelled"
);
}
}

// Integration tests: cargo test system_status_server --lib --features integration
#[cfg(all(test, feature = "integration"))]
mod integration_tests {
use super::*;
use crate::distributed::test_helpers::create_test_drt_async;
use crate::metrics::MetricsRegistry;
use anyhow::Result;
use rstest::rstest;
use std::sync::Arc;
use tokio::time::Duration;

#[cfg(feature = "integration")]
#[tokio::test]
async fn test_uptime_without_initialization() {
// Test that uptime returns an error if start time is not initialized
Expand All @@ -398,7 +354,6 @@ mod tests {
.await;
}

#[cfg(feature = "integration")]
#[tokio::test]
async fn test_runtime_metrics_initialization_and_namespace() {
// Test that metrics have correct namespace
Expand Down Expand Up @@ -439,7 +394,6 @@ mod tests {
.await;
}

#[cfg(feature = "integration")]
#[tokio::test]
async fn test_start_time_initialization() {
// Test that start time can only be initialized once
Expand Down Expand Up @@ -516,7 +470,7 @@ mod tests {
("DYN_SYSTEM_LIVE_PATH", custom_live_path),
],
(async || {
let drt = Arc::new(create_test_drt_with_settings_async().await);
let drt = Arc::new(create_test_drt_async().await);

// Get system status server info from DRT (instead of manually spawning)
let system_info = drt
Expand Down Expand Up @@ -575,7 +529,6 @@ mod tests {
}

#[tokio::test]
#[cfg(feature = "integration")]
async fn test_health_endpoint_tracing() -> Result<()> {
use std::sync::Arc;

Expand All @@ -596,7 +549,7 @@ mod tests {

crate::logging::init();

let drt = Arc::new(create_test_drt_with_settings_async().await);
let drt = Arc::new(create_test_drt_async().await);

// Get system status server info from DRT (instead of manually spawning)
let system_info = drt
Expand Down Expand Up @@ -631,7 +584,6 @@ mod tests {
Ok(())
}

#[cfg(feature = "integration")]
#[tokio::test]
async fn test_health_endpoint_with_changing_health_status() {
// Test health endpoint starts in not ready status, then becomes ready
Expand All @@ -646,7 +598,7 @@ mod tests {
("DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS", Some(ENDPOINT_HEALTH_CONFIG)),
],
async {
let drt = Arc::new(create_test_drt_with_settings_async().await);
let drt = Arc::new(create_test_drt_async().await);

// Check if system status server was started
let system_info_opt = drt.system_status_server_info();
Expand Down Expand Up @@ -745,7 +697,6 @@ mod tests {
.await;
}

#[cfg(feature = "integration")]
#[tokio::test]
async fn test_spawn_system_status_server_endpoints() {
// use reqwest for HTTP requests
Expand All @@ -756,7 +707,7 @@ mod tests {
("DYN_SYSTEM_STARTING_HEALTH_STATUS", Some("ready")),
],
async {
let drt = Arc::new(create_test_drt_with_settings_async().await);
let drt = Arc::new(create_test_drt_async().await);

// Get system status server info from DRT (instead of manually spawning)
let system_info = drt
Expand Down
Loading