diff --git a/lib/runtime/examples/local_client_demo.rs b/lib/runtime/examples/local_client_demo.rs new file mode 100644 index 0000000000..4f6747a8ff --- /dev/null +++ b/lib/runtime/examples/local_client_demo.rs @@ -0,0 +1,118 @@ +// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +//! Example demonstrating LocalClient functionality + +use dynamo_runtime::component::LocalClient; +use dynamo_runtime::engine::{AsyncEngine, AsyncEngineContextProvider, async_trait}; +use dynamo_runtime::pipeline::network::Ingress; +use dynamo_runtime::pipeline::{Context, ManyOut, ResponseStream, SingleIn}; +use dynamo_runtime::protocols::annotated::Annotated; +use dynamo_runtime::{DistributedRuntime, Runtime, distributed::DistributedConfig}; +use futures::StreamExt; +use std::sync::Arc; + +/// Simple test engine that echoes strings +struct SimpleEchoEngine; + +#[async_trait] +impl AsyncEngine, ManyOut>, anyhow::Error> for SimpleEchoEngine { + async fn generate( + &self, + request: SingleIn, + ) -> Result>, anyhow::Error> { + println!("Engine received: {}", *request); + + let response = Annotated { + data: Some(format!("Echo: {}", *request)), + id: None, + event: None, + comment: None, + }; + + let context = request.context(); + + // Create a simple stream that yields the response once + let stream = futures::stream::once(async move { response }); + Ok(ResponseStream::new(Box::pin(stream), context)) + } +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + // Initialize tracing + tracing_subscriber::fmt::init(); + + println!("=== LocalClient Demo ===\n"); + + // Create runtime and DRT + println!("1. 
Creating runtime..."); + let runtime = Runtime::from_current()?; + + let config = DistributedConfig { + etcd_config: Default::default(), + nats_config: Default::default(), + is_static: true, + }; + + let drt = DistributedRuntime::new(runtime, config).await?; + println!(" ✓ Runtime created\n"); + + // Create namespace, component, and endpoint + println!("2. Creating endpoint structure..."); + let namespace = drt.namespace("demo")?; + let component = namespace.component("echo-service")?; + let service = component.service_builder().create().await?; + let endpoint = service.endpoint("echo"); + println!(" ✓ Created endpoint: demo/echo-service/echo\n"); + + // Create and register an engine + println!("3. Creating and registering engine..."); + let engine: Arc, ManyOut>, anyhow::Error>> = + Arc::new(SimpleEchoEngine); + + // Wrap the engine in an Ingress to make it a PushWorkHandler + let ingress = Ingress::for_engine(engine)?; + + // Create the endpoint instance with the ingress as handler (setup phase) + let _endpoint_instance = endpoint + .endpoint_builder() + .handler(ingress) + .create() + .await?; + println!(" ✓ Engine registered automatically during endpoint creation\n"); + + // Create a LocalClient using the endpoint's convenience method + println!("4. Creating LocalClient..."); + let local_client: LocalClient, ManyOut>, anyhow::Error> = + endpoint.local_client().await?; + println!(" ✓ LocalClient created successfully\n"); + + // Demonstrate local client usage + println!("5. 
Testing LocalClient invocation..."); + println!(" (This bypasses all network layers and invokes the engine directly)"); + + // Create a request with context + let request = Context::new("Hello from LocalClient!".to_string()); + + // Generate response using the local client + let mut response_stream = local_client.generate(request).await?; + let response = response_stream.next().await.expect("Expected response"); + + println!(" Request: Hello from LocalClient!"); + if let Some(data) = &response.data { + println!(" Response: {}", data); + } + println!(); + + // Show the benefits + println!("6. LocalClient Benefits:"); + println!(" ✓ No network overhead"); + println!(" ✓ No etcd watching required"); + println!(" ✓ No instance discovery needed"); + println!(" ✓ Direct in-process engine invocation"); + println!(" ✓ Perfect for testing and local development\n"); + + println!("Demo completed successfully!"); + Ok(()) +} diff --git a/lib/runtime/src/component.rs b/lib/runtime/src/component.rs index 1129313d01..9471fd33d3 100644 --- a/lib/runtime/src/component.rs +++ b/lib/runtime/src/component.rs @@ -64,11 +64,13 @@ mod client; #[allow(clippy::module_inception)] mod component; mod endpoint; +mod local_client; mod namespace; mod registry; pub mod service; pub use client::{Client, InstanceSource}; +pub use local_client::LocalClient; /// The root etcd path where each instance registers itself in etcd. /// An instance is namespace+component+endpoint+lease_id and must be unique. @@ -87,6 +89,8 @@ pub enum TransportType { pub struct RegistryInner { services: HashMap, stats_handlers: HashMap>>>, + /// Service-level flag to enable local registry for endpoints + pub(crate) service_enable_local_registry: HashMap, } #[derive(Clone)] @@ -140,6 +144,10 @@ pub struct Component { // A static component's endpoints cannot be discovered via etcd, they are // fixed at startup time. 
is_static: bool, + + /// Flag to enable local engine registry for endpoints + #[builder(default = "true")] + pub(crate) enable_local_registry: bool, } impl Hash for Component { @@ -429,6 +437,18 @@ impl Endpoint { &self.component } + /// Create a local client for this endpoint + pub async fn local_client( + &self, + ) -> Result> + where + Req: crate::engine::Data, + Resp: crate::engine::StreamData + crate::engine::AsyncEngineContextProvider, + E: crate::engine::Data, + { + local_client::LocalClient::from_endpoint(self).await + } + // todo(ryan): deprecate this as we move to Discovery traits and Component Identifiers pub fn path(&self) -> String { format!( diff --git a/lib/runtime/src/component/endpoint.rs b/lib/runtime/src/component/endpoint.rs index b79a4676cb..3c1b8f0752 100644 --- a/lib/runtime/src/component/endpoint.rs +++ b/lib/runtime/src/component/endpoint.rs @@ -5,154 +5,136 @@ use derive_getters::Dissolve; use tokio_util::sync::CancellationToken; use super::*; +use crate::transports::etcd; -pub use async_nats::service::endpoint::Stats as EndpointStats; +/// Extract the full namespace hierarchy from a Namespace +fn get_namespace_hierarchy(namespace: &super::Namespace) -> Vec { + let mut segments = Vec::new(); + let mut current: Option<&super::Namespace> = Some(namespace); -#[derive(Educe, Builder, Dissolve)] -#[educe(Debug)] -#[builder(pattern = "owned", build_fn(private, name = "build_internal"))] -pub struct EndpointConfig { - #[builder(private)] - endpoint: Endpoint, + // Walk up the parent chain to collect all namespace segments + while let Some(ns) = current { + segments.push(ns.name.clone()); + current = ns.parent.as_deref(); + } - // todo: move lease to component/service - /// Lease - #[educe(Debug(ignore))] - #[builder(default)] - lease: Option, + // Reverse to get root-to-leaf order + segments.reverse(); + segments +} - /// Endpoint handler - #[educe(Debug(ignore))] +pub use async_nats::service::endpoint::Stats as EndpointStats; + +/// An endpoint 
instance that has been created but not yet started +pub struct EndpointInstance { + // Core fields + endpoint: Endpoint, handler: Arc, - /// Stats handler - #[educe(Debug(ignore))] - #[builder(default, private)] - _stats_handler: Option, + // Optional fields (may be None in static mode) + lease: Option, + etcd_client: Option, - /// Additional labels for metrics - #[builder(default, setter(into))] + // Always available + stats_handler: Option, metrics_labels: Option>, - - /// Whether to wait for inflight requests to complete during shutdown - #[builder(default = "true")] graceful_shutdown: bool, + local_engine_info: Option<(String, Arc)>, // Prepared but not registered - /// Health check payload for this endpoint - /// This payload will be sent to the endpoint during health checks - /// to verify it's responding properly - #[educe(Debug(ignore))] - #[builder(default, setter(into, strip_option))] + // Pre-computed values for start() + lease_id: i64, + service_name: String, health_check_payload: Option, } -impl EndpointConfigBuilder { - pub(crate) fn from_endpoint(endpoint: Endpoint) -> Self { - Self::default().endpoint(endpoint) - } - - pub fn stats_handler(self, handler: F) -> Self - where - F: FnMut(EndpointStats) -> serde_json::Value + Send + Sync + 'static, - { - self._stats_handler(Some(Box::new(handler))) - } - - pub async fn start(self) -> Result<()> { - let ( - endpoint, - lease, - handler, - stats_handler, - metrics_labels, - graceful_shutdown, - health_check_payload, - ) = self.build_internal()?.dissolve(); - let lease = lease.or(endpoint.drt().primary_lease()); - let lease_id = lease.as_ref().map(|l| l.id()).unwrap_or(0); - +impl EndpointInstance { + /// Start the endpoint on the network + pub async fn start(mut self) -> Result<()> { tracing::debug!( "Starting endpoint: {}", - endpoint.etcd_path_with_lease_id(lease_id) + self.endpoint.etcd_path_with_lease_id(self.lease_id) ); - let service_name = endpoint.component.service_name(); - - // acquire the 
registry lock - let registry = endpoint.drt().component_registry.inner.lock().await; + // Register local engine if configured + let local_engine_key = if let Some((key, engine)) = self.local_engine_info.take() { + tracing::debug!("Registering local engine for endpoint: {}", key); + self.endpoint + .drt() + .register_local_engine(key.clone(), engine) + .await?; + Some(key) + } else { + None + }; - let metrics_labels: Option> = metrics_labels - .as_ref() - .map(|v| v.iter().map(|(k, v)| (k.as_str(), v.as_str())).collect()); - // Add metrics to the handler. The endpoint provides additional information to the handler. - handler.add_metrics(&endpoint, metrics_labels.as_deref())?; + // acquire the registry lock to get the service group + let registry = self.endpoint.drt().component_registry.inner.lock().await; // get the group let group = registry .services - .get(&service_name) - .map(|service| service.group(endpoint.component.service_name())) + .get(&self.service_name) + .map(|service| service.group(self.endpoint.component.service_name())) .ok_or(error!("Service not found"))?; // get the stats handler map let handler_map = registry .stats_handlers - .get(&service_name) + .get(&self.service_name) .cloned() .expect("no stats handler registry; this is unexpected"); drop(registry); // insert the stats handler - if let Some(stats_handler) = stats_handler { + if let Some(stats_handler) = self.stats_handler { handler_map .lock() .unwrap() - .insert(endpoint.subject_to(lease_id), stats_handler); + .insert(self.endpoint.subject_to(self.lease_id), stats_handler); } // creates an endpoint for the service let service_endpoint = group - .endpoint(&endpoint.name_with_id(lease_id)) + .endpoint(&self.endpoint.name_with_id(self.lease_id)) .await .map_err(|e| anyhow::anyhow!("Failed to start endpoint: {e}"))?; // Create a token that responds to both runtime shutdown and lease expiration - let runtime_shutdown_token = endpoint.drt().child_token(); + let runtime_shutdown_token = 
self.endpoint.drt().child_token(); // Extract all values needed from endpoint before any spawns - let namespace_name = endpoint.component.namespace.name.clone(); - let component_name = endpoint.component.name.clone(); - let endpoint_name = endpoint.name.clone(); - let system_health = endpoint.drt().system_health.clone(); - let subject = endpoint.subject_to(lease_id); - let etcd_path = endpoint.etcd_path_with_lease_id(lease_id); - let etcd_client = endpoint.component.drt.etcd_client.clone(); + let namespace_name = self.endpoint.component.namespace.name.clone(); + let component_name = self.endpoint.component.name.clone(); + let endpoint_name = self.endpoint.name.clone(); + let system_health = self.endpoint.drt().system_health.clone(); + let subject = self.endpoint.subject_to(self.lease_id); + let etcd_path = self.endpoint.etcd_path_with_lease_id(self.lease_id); // Register health check target in SystemHealth if provided - if let Some(health_check_payload) = &health_check_payload { + if let Some(health_check_payload) = &self.health_check_payload { let instance = Instance { component: component_name.clone(), endpoint: endpoint_name.clone(), namespace: namespace_name.clone(), - instance_id: lease_id, + instance_id: self.lease_id, transport: TransportType::NatsTcp(subject.clone()), }; tracing::debug!(subject = %subject, "Registering endpoint health check target"); let guard = system_health.lock().unwrap(); guard.register_health_check_target(&subject, instance, health_check_payload.clone()); if let Some(notifier) = guard.get_endpoint_health_check_notifier(&subject) { - handler.set_endpoint_health_check_notifier(notifier)?; + self.handler.set_endpoint_health_check_notifier(notifier)?; } } - let cancel_token = if let Some(lease) = lease.as_ref() { + let cancel_token = if let Some(lease) = self.lease.as_ref() { // Create a new token that will be cancelled when EITHER the lease expires OR runtime shutdown occurs let combined_token = CancellationToken::new(); let 
combined_for_select = combined_token.clone(); let lease_token = lease.child_token(); // Use secondary runtime for this lightweight monitoring task - endpoint.drt().runtime().secondary().spawn(async move { + self.endpoint.drt().runtime().secondary().spawn(async move { tokio::select! { _ = lease_token.cancelled() => { tracing::trace!("Lease cancelled, triggering endpoint shutdown"); @@ -170,27 +152,30 @@ impl EndpointConfigBuilder { }; // Register with graceful shutdown tracker if needed - if graceful_shutdown { + if self.graceful_shutdown { tracing::debug!( "Registering endpoint '{}' with graceful shutdown tracker", - endpoint.name + self.endpoint.name ); - let tracker = endpoint.drt().graceful_shutdown_tracker(); + let tracker = self.endpoint.drt().graceful_shutdown_tracker(); tracker.register_endpoint(); } else { - tracing::debug!("Endpoint '{}' has graceful_shutdown=false", endpoint.name); + tracing::debug!( + "Endpoint '{}' has graceful_shutdown=false", + self.endpoint.name + ); } let push_endpoint = PushEndpoint::builder() - .service_handler(handler) + .service_handler(self.handler) .cancellation_token(cancel_token.clone()) - .graceful_shutdown(graceful_shutdown) + .graceful_shutdown(self.graceful_shutdown) .build() .map_err(|e| anyhow::anyhow!("Failed to build push endpoint: {e}"))?; // launch in primary runtime - let tracker_clone = if graceful_shutdown { - Some(endpoint.drt().graceful_shutdown_tracker()) + let tracker_clone = if self.graceful_shutdown { + Some(self.endpoint.drt().graceful_shutdown_tracker()) } else { None }; @@ -199,6 +184,9 @@ impl EndpointConfigBuilder { let namespace_name_for_task = namespace_name.clone(); let component_name_for_task = component_name.clone(); let endpoint_name_for_task = endpoint_name.clone(); + let drt_for_cleanup = self.endpoint.drt().clone(); + let local_engine_key_for_task = local_engine_key.clone(); + let lease_id_for_task = self.lease_id; let task = tokio::spawn(async move { let result = push_endpoint @@ -207,7 
+195,7 @@ impl EndpointConfigBuilder { namespace_name_for_task, component_name_for_task, endpoint_name_for_task, - lease_id, + lease_id_for_task, system_health, ) .await; @@ -218,6 +206,12 @@ impl EndpointConfigBuilder { tracker.unregister_endpoint(); } + // Unregister from local engine registry if it was registered + if let Some(key) = local_engine_key_for_task { + tracing::debug!("Unregistering local engine for endpoint: {}", key); + drt_for_cleanup.unregister_local_engine(&key).await; + } + result }); @@ -228,23 +222,251 @@ impl EndpointConfigBuilder { component: component_name, endpoint: endpoint_name, namespace: namespace_name, - instance_id: lease_id, + instance_id: self.lease_id, transport: TransportType::NatsTcp(subject), }; let info = serde_json::to_vec_pretty(&info)?; - if let Some(etcd_client) = &etcd_client + // Register in etcd (only if not static) + if let Some(etcd_client) = &self.etcd_client && let Err(e) = etcd_client - .kv_create(&etcd_path, info, Some(lease_id)) + .kv_create(&etcd_path, info, Some(self.lease_id)) .await { tracing::error!("Failed to register discoverable service: {:?}", e); cancel_token.cancel(); return Err(error!("Failed to register discoverable service")); } + task.await??; + Ok(()) + } + + /// Start the endpoint in the background as a critical task + /// + /// Returns a handle that can be used to monitor or cancel the endpoint. + /// If the endpoint fails critically, it will trigger parent token cancellation + /// (usually leading to worker shutdown). 
+ pub fn start_background_handle(self) -> Result { + let endpoint_name = format!( + "{}/{}/{}:{}", + self.endpoint.component.namespace.name, + self.endpoint.component.name, + self.endpoint.name, + self.lease_id + ); + + let parent_token = self.endpoint.drt().primary_token(); + let endpoint_name_for_closure = endpoint_name.clone(); + + crate::utils::tasks::critical::CriticalTaskExecutionHandle::new( + |cancel_token| async move { + // Monitor cancellation while starting + tokio::select! { + result = self.start() => result, + _ = cancel_token.cancelled() => { + tracing::info!("Endpoint {} cancelled during startup", endpoint_name_for_closure); + Ok(()) + } + } + }, + parent_token, + &format!("endpoint-{}", endpoint_name), + ) + } + + /// Start the endpoint in the background and register it with the distributed runtime + /// + /// This method will: + /// 1. Create a critical task to run the endpoint + /// 2. Register the task handle for tracking and cleanup + /// 3. If the endpoint fails critically, it will trigger worker shutdown + pub async fn start_background(self) -> Result<()> { + // Generate key for tracking (same format as local engine registry) + let key = if let Some((ref engine_key, _)) = self.local_engine_info { + engine_key.clone() + } else { + // Generate key even if no local engine + format!( + "{}/{}/{}:{}", + self.endpoint.component.namespace.name, + self.endpoint.component.name, + self.endpoint.name, + self.lease_id + ) + }; + + let drt = self.endpoint.drt().clone(); + let handle = self.start_background_handle()?; + + // Register the handle for tracking + drt.register_background_endpoint(key, handle).await; Ok(()) } } + +#[derive(Educe, Builder, Dissolve)] +#[educe(Debug)] +#[builder(pattern = "owned", build_fn(private, name = "build_internal"))] +pub struct EndpointConfig { + #[builder(private)] + endpoint: Endpoint, + + // todo: move lease to component/service + /// Lease + #[educe(Debug(ignore))] + #[builder(default)] + lease: Option, + + /// 
Endpoint handler + #[educe(Debug(ignore))] + handler: Arc, + + /// Stats handler + #[educe(Debug(ignore))] + #[builder(default, private)] + _stats_handler: Option, + + /// Additional labels for metrics + #[builder(default, setter(into))] + metrics_labels: Option>, + + /// Whether to wait for inflight requests to complete during shutdown + #[builder(default = "true")] + graceful_shutdown: bool, + + /// Health check payload for this endpoint + /// This payload will be sent to the endpoint during health checks + /// to verify it's responding properly + #[educe(Debug(ignore))] + #[builder(default, setter(into, strip_option))] + health_check_payload: Option, +} + +impl EndpointConfigBuilder { + pub(crate) fn from_endpoint(endpoint: Endpoint) -> Self { + Self::default().endpoint(endpoint) + } + + pub fn stats_handler(self, handler: F) -> Self + where + F: FnMut(EndpointStats) -> serde_json::Value + Send + Sync + 'static, + { + self._stats_handler(Some(Box::new(handler))) + } + + /// Start the endpoint directly (backwards compatible) + /// This is equivalent to calling create().await?.start().await + pub async fn start(self) -> Result<()> { + let instance = self.create().await?; + instance.start().await + } + + /// Create the endpoint instance (setup phase) + /// This validates configuration and prepares the local engine registration; the engine itself is registered later, by `EndpointInstance::start` + pub async fn create(self) -> Result { + let ( + endpoint, + lease, + handler, + stats_handler, + metrics_labels, + graceful_shutdown, + health_check_payload, + ) = self.build_internal()?.dissolve(); + let lease = lease.or(endpoint.drt().primary_lease()); + let lease_id = lease.as_ref().map(|l| l.id()).unwrap_or(0); + + let service_name = endpoint.component.service_name(); + let etcd_client = endpoint.component.drt.etcd_client.clone(); + + tracing::debug!( + "Creating endpoint: {}", + endpoint.etcd_path_with_lease_id(lease_id) + ); + + // acquire the registry lock + let registry =
endpoint.drt().component_registry.inner.lock().await; + + let metrics_labels: Option> = metrics_labels + .as_ref() + .map(|v| v.iter().map(|(k, v)| (k.as_str(), v.as_str())).collect()); + + // Add metrics to the handler. The endpoint provides additional information to the handler. + handler.add_metrics(&endpoint, metrics_labels.as_deref())?; + + // Check if local registry is enabled (defaults to enabled when the service has no entry) and handler has a type-erased engine + let enable_local_registry = registry + .service_enable_local_registry + .get(&service_name) + .copied() + .unwrap_or(true); + + let local_engine_info = if enable_local_registry + && let Some(any_engine) = handler.as_any_engine() + { + use crate::v2::entity::{ComponentDescriptor, EndpointDescriptor, NamespaceDescriptor}; + + // Extract the full namespace hierarchy + let namespace_segments = get_namespace_hierarchy(&endpoint.component.namespace); + + // Create the descriptor for this endpoint, handling internal namespaces + let namespace_desc = if namespace_segments.first().is_some_and(|s| s.starts_with('_')) { + NamespaceDescriptor::new_internal( + &namespace_segments + .iter() + .map(|s| s.as_str()) + .collect::>(), + ) + } else { + NamespaceDescriptor::new( + &namespace_segments + .iter() + .map(|s| s.as_str()) + .collect::>(), + ) + } + .map_err(|e| anyhow::anyhow!("Invalid namespace: {}", e))?; + + let component_desc = namespace_desc + .component(&endpoint.component.name) + .map_err(|e| anyhow::anyhow!("Invalid component: {}", e))?; + let endpoint_desc = component_desc + .endpoint(&endpoint.name) + .map_err(|e| anyhow::anyhow!("Invalid endpoint: {}", e))?; + + // Prepare registration info but don't register yet + let key = endpoint_desc.to_string(); + tracing::debug!("Prepared local engine registration for endpoint: {}", key); + Some((key, any_engine)) + } else { + None + }; + + // Drop registry lock before returning + drop(registry); + + // Convert metrics_labels back to owned version for storage + let metrics_labels =
metrics_labels.map(|v| { + v.into_iter() + .map(|(k, v)| (k.to_string(), v.to_string())) + .collect() + }); + + Ok(EndpointInstance { + endpoint, + handler, + lease, + etcd_client, + stats_handler, + metrics_labels, + graceful_shutdown, + local_engine_info, + lease_id, + service_name, + health_check_payload, + }) + } +} diff --git a/lib/runtime/src/component/local_client.rs b/lib/runtime/src/component/local_client.rs new file mode 100644 index 0000000000..769170583c --- /dev/null +++ b/lib/runtime/src/component/local_client.rs @@ -0,0 +1,127 @@ +// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +//! Local client implementation for direct engine access without network overhead + +use crate::engine::{ + AnyAsyncEngine, AsyncEngine, AsyncEngineContextProvider, Data, DowncastAnyAsyncEngine, + StreamData, +}; +use crate::traits::DistributedRuntimeProvider; +use crate::v2::entity::{ComponentDescriptor, EndpointDescriptor, NamespaceDescriptor}; +use crate::{Result, error}; +use std::sync::Arc; + +use super::Endpoint; + +/// A client that directly invokes local engines without network overhead +#[derive(Clone)] +pub struct LocalClient +where + Req: Data, + Resp: StreamData + AsyncEngineContextProvider, + E: Data, +{ + engine: Arc>, + descriptor: EndpointDescriptor, +} + +impl LocalClient +where + Req: Data, + Resp: StreamData + AsyncEngineContextProvider, + E: Data, +{ + /// Create a LocalClient from an endpoint descriptor + pub async fn from_descriptor( + endpoint: &Endpoint, + descriptor: EndpointDescriptor, + ) -> Result { + let key = descriptor.to_string(); + + // Get the engine from the local registry + let any_engine = endpoint + .drt() + .get_local_engine(&key) + .await + .ok_or_else(|| error!("No local engine registered for endpoint: {}", key))?; + + // Downcast to the specific types + let engine = any_engine + .downcast_stream::() + .ok_or_else(|| error!("Type mismatch 
when downcasting local engine for: {}", key))?; + + Ok(Self { engine, descriptor }) + } + + /// Create a LocalClient from an endpoint + /// + /// The registry key is rebuilt from the endpoint's namespace hierarchy, + /// component name and endpoint name. A hierarchy whose root segment starts + /// with `_` goes through `NamespaceDescriptor::new_internal`, matching how + /// the key is prepared when the endpoint instance is created. + /// + /// Fails if a descriptor is invalid, no local engine is registered, or its types differ. + pub async fn from_endpoint(endpoint: &Endpoint) -> Result { + // Extract the full namespace hierarchy (root-to-leaf) + let namespace_segments = get_namespace_hierarchy(&endpoint.component.namespace); + + // Create the descriptor for this endpoint, handling internal namespaces + let namespace_desc = if namespace_segments.first().is_some_and(|s| s.starts_with('_')) { + NamespaceDescriptor::new_internal( + &namespace_segments + .iter() + .map(|s| s.as_str()) + .collect::>(), + ) + } else { + NamespaceDescriptor::new( + &namespace_segments + .iter() + .map(|s| s.as_str()) + .collect::>(), + ) + } + .map_err(|e| error!("Invalid namespace: {}", e))?; + + let component_desc = namespace_desc + .component(&endpoint.component.name) + .map_err(|e| error!("Invalid component: {}", e))?; + let endpoint_desc = component_desc + .endpoint(&endpoint.name) + .map_err(|e| error!("Invalid endpoint: {}", e))?; + + Self::from_descriptor(endpoint, endpoint_desc).await + } + + /// Generate a response using the local engine directly + pub async fn generate(&self, request: Req) -> Result { + self.engine.generate(request).await + } + + /// Get the descriptor for this client + pub fn descriptor(&self) -> &EndpointDescriptor { + &self.descriptor + } + + /// Get the underlying engine + pub fn engine(&self) -> &Arc> { + &self.engine + } +} + +/// Extract the full namespace hierarchy from a Namespace +fn get_namespace_hierarchy(namespace: &super::Namespace) -> Vec { + let mut segments = Vec::new(); + let mut current: Option<&super::Namespace> = Some(namespace); + + // Walk up the parent chain to collect all namespace segments +
while let Some(ns) = current { + segments.push(ns.name.clone()); + current = ns.parent.as_deref(); + } + + // Reverse to get root-to-leaf order + segments.reverse(); + segments +} diff --git a/lib/runtime/src/component/service.rs b/lib/runtime/src/component/service.rs index 2156339a33..a92a5decb8 100644 --- a/lib/runtime/src/component/service.rs +++ b/lib/runtime/src/component/service.rs @@ -25,12 +25,28 @@ pub struct ServiceConfig { /// Description #[builder(default)] description: Option, + + /// Whether to enable local engine registry for endpoints + #[builder(default = "true")] + pub(crate) enable_local_registry: bool, } impl ServiceConfigBuilder { + /// Enable local engine registry for endpoints (default) + pub fn with_local_registry(mut self) -> Self { + self.enable_local_registry = Some(true); + self + } + + /// Disable local engine registry for endpoints + pub fn without_local_registry(mut self) -> Self { + self.enable_local_registry = Some(false); + self + } + /// Create the [`Component`]'s service and store it in the registry. 
pub async fn create(self) -> Result { - let (component, description) = self.build_internal()?.dissolve(); + let (component, description, enable_local_registry) = self.build_internal()?.dissolve(); let version = "0.0.1".to_string(); @@ -82,7 +98,12 @@ impl ServiceConfigBuilder { // insert the stats handler into the registry guard .stats_handlers - .insert(service_name, stats_handler_registry_clone); + .insert(service_name.clone(), stats_handler_registry_clone); + + // Store the enable_local_registry flag for this service + guard + .service_enable_local_registry + .insert(service_name.clone(), enable_local_registry); // drop the guard to unlock the mutex drop(guard); diff --git a/lib/runtime/src/distributed.rs b/lib/runtime/src/distributed.rs index 00f109e7b4..0213d87698 100644 --- a/lib/runtime/src/distributed.rs +++ b/lib/runtime/src/distributed.rs @@ -83,6 +83,8 @@ impl DistributedRuntime { component_registry: component::Registry::new(), is_static, instance_sources: Arc::new(Mutex::new(HashMap::new())), + local_engines: Arc::new(Mutex::new(HashMap::new())), + background_endpoints: Arc::new(Mutex::new(HashMap::new())), hierarchy_to_metricsregistry: Arc::new(std::sync::RwLock::new(HashMap::< String, crate::MetricsRegistryEntry, @@ -345,6 +347,69 @@ impl DistributedRuntime { let registries = self.hierarchy_to_metricsregistry.read().unwrap(); registries.keys().cloned().collect() } + + /// Register a local engine for direct access without network overhead + pub(crate) async fn register_local_engine( + &self, + key: String, + engine: Arc, + ) -> Result<()> { + let mut engines = self.local_engines.lock().await; + engines.insert(key, engine); + Ok(()) + } + + /// Retrieve a local engine by key + pub(crate) async fn get_local_engine( + &self, + key: &str, + ) -> Option> { + let engines = self.local_engines.lock().await; + engines.get(key).cloned() + } + + /// Unregister a local engine + pub(crate) async fn unregister_local_engine( + &self, + key: &str, + ) -> Option> { + 
let mut engines = self.local_engines.lock().await; + engines.remove(key) + } + + /// Register a background endpoint handle + pub(crate) async fn register_background_endpoint( + &self, + key: String, + handle: crate::utils::tasks::critical::CriticalTaskExecutionHandle, + ) { + let mut endpoints = self.background_endpoints.lock().await; + + // Store the handle without detaching so we can cancel it later + endpoints.insert(key, handle); + } + + /// Cancel a specific background endpoint gracefully + pub async fn cancel_endpoint(&self, key: &str) -> bool { + let mut endpoints = self.background_endpoints.lock().await; + if let Some(handle) = endpoints.remove(key) { + handle.cancel(); + handle.detach(); // Detach after cancelling so it can be dropped + true + } else { + false + } + } + + /// Cancel all background endpoints (for shutdown) + pub async fn cancel_all_endpoints(&self) { + let mut endpoints = self.background_endpoints.lock().await; + for (key, handle) in endpoints.drain() { + tracing::debug!("Cancelling background endpoint: {}", key); + handle.cancel(); + handle.detach(); // Detach after cancelling so it can be dropped + } + } } #[derive(Dissolve)] diff --git a/lib/runtime/src/engine.rs b/lib/runtime/src/engine.rs index 0598dad563..e013069a04 100644 --- a/lib/runtime/src/engine.rs +++ b/lib/runtime/src/engine.rs @@ -78,6 +78,12 @@ use futures::stream::Stream; pub trait Data: Send + Sync + 'static {} impl Data for T {} +/// A trait for stream data that only requires Send, not Sync. +/// This is used for response types that are consumed sequentially, not shared between threads. +/// **Do not manually implement this trait** - the blanket implementation covers all valid types. +pub trait StreamData: Send + 'static {} +impl StreamData for T {} + /// [`DataStream`] is a type alias for a stream of [`Data`] items. This can be adapted to a [`ResponseStream`] /// by associating it with a [`AsyncEngineContext`]. 
pub type DataUnary = Pin + Send>>; @@ -338,6 +344,41 @@ where } } +/// A variant of AnyEngineWrapper that uses StreamData for the response type. +/// This allows non-Sync response types (like streams) to be stored in the registry. +struct StreamAnyEngineWrapper +where + Req: Data, + Resp: StreamData + AsyncEngineContextProvider, + E: Data, +{ + engine: Arc>, + _phantom: PhantomData, +} + +impl AnyAsyncEngine for StreamAnyEngineWrapper +where + Req: Data, + Resp: StreamData + AsyncEngineContextProvider, + E: Data, +{ + fn request_type_id(&self) -> TypeId { + TypeId::of::() + } + + fn response_type_id(&self) -> TypeId { + TypeId::of::() + } + + fn error_type_id(&self) -> TypeId { + TypeId::of::() + } + + fn as_any(&self) -> &dyn Any { + &self.engine + } +} + /// An extension trait that provides a convenient way to type-erase an `AsyncEngine`. /// /// This trait provides the `.into_any_engine()` method on any `Arc>`, @@ -369,6 +410,38 @@ where } } +/// An extension trait for converting AsyncEngine with StreamData response to AnyAsyncEngine. +/// This is used for engines whose response types don't need to be Sync. +pub trait AsStreamAnyAsyncEngine { + /// Converts a typed `AsyncEngine` with StreamData response into a type-erased `AnyAsyncEngine`. + fn into_stream_any_engine(self) -> Arc; + + /// Converts a typed `AsyncEngine` with StreamData response into a type-erased `AnyAsyncEngine`. + /// This is the non-consuming version of into_stream_any_engine. 
+ fn as_stream_any(&self) -> Option>; +} + +impl AsStreamAnyAsyncEngine for Arc> +where + Req: Data, + Resp: StreamData + AsyncEngineContextProvider, + E: Data, +{ + fn into_stream_any_engine(self) -> Arc { + Arc::new(StreamAnyEngineWrapper { + engine: self, + _phantom: PhantomData, + }) + } + + fn as_stream_any(&self) -> Option> { + Some(Arc::new(StreamAnyEngineWrapper { + engine: self.clone(), + _phantom: PhantomData, + })) + } +} + /// An extension trait that provides a convenient method to downcast an `AnyAsyncEngine`. /// /// This trait provides the `.downcast()` method on `Arc`, @@ -398,6 +471,16 @@ pub trait DowncastAnyAsyncEngine { Req: Data, Resp: Data + AsyncEngineContextProvider, E: Data; + + /// Attempts to downcast an `AnyAsyncEngine` to a specific `AsyncEngine` type with StreamData response. + /// + /// Returns `Some(engine)` if the type parameters match the original engine, + /// or `None` if the types don't match. + fn downcast_stream(&self) -> Option>> + where + Req: Data, + Resp: StreamData + AsyncEngineContextProvider, + E: Data; } impl DowncastAnyAsyncEngine for Arc { @@ -418,6 +501,24 @@ impl DowncastAnyAsyncEngine for Arc { None } } + + fn downcast_stream(&self) -> Option>> + where + Req: Data, + Resp: StreamData + AsyncEngineContextProvider, + E: Data, + { + if self.request_type_id() == TypeId::of::() + && self.response_type_id() == TypeId::of::() + && self.error_type_id() == TypeId::of::() + { + self.as_any() + .downcast_ref::>>() + .cloned() + } else { + None + } + } } #[cfg(test)] @@ -470,7 +571,7 @@ mod tests { let typed_engine: Arc> = Arc::new(MockEngine); // 4. 
Use the extension trait to erase the type - let any_engine = typed_engine.into_any_engine(); + let any_engine = AsAnyAsyncEngine::into_any_engine(typed_engine); // Check type IDs are preserved assert_eq!(any_engine.request_type_id(), TypeId::of::()); diff --git a/lib/runtime/src/lib.rs b/lib/runtime/src/lib.rs index acaef5aab3..7e2a879a70 100644 --- a/lib/runtime/src/lib.rs +++ b/lib/runtime/src/lib.rs @@ -41,6 +41,7 @@ pub mod system_health; pub mod traits; pub mod transports; pub mod utils; +pub mod v2; pub mod worker; pub mod distributed; @@ -53,6 +54,7 @@ pub use worker::Worker; use crate::metrics::prometheus_names::distributed_runtime; use component::{Endpoint, InstanceSource}; +use engine::AnyAsyncEngine; use utils::GracefulShutdownTracker; use config::HealthStatus; @@ -165,6 +167,14 @@ pub struct DistributedRuntime { instance_sources: Arc>>>, + // Local engine registry for direct access without network overhead + // Keys are endpoint paths from EndpointDescriptor::path_string() + local_engines: Arc>>>, + + // Registry of background endpoint handles + // Key: endpoint path (same as local_engines key) + background_endpoints: Arc>>, + // Health Status system_health: Arc>, diff --git a/lib/runtime/src/pipeline/network.rs b/lib/runtime/src/pipeline/network.rs index cfe88bbcef..f36c9f526d 100644 --- a/lib/runtime/src/pipeline/network.rs +++ b/lib/runtime/src/pipeline/network.rs @@ -276,6 +276,10 @@ struct RequestControlMessage { pub struct Ingress { segment: OnceLock>>, metrics: OnceLock>, + // Store the original engine for local registry + engine: OnceLock>, + // Store type-erased engine for local registry + any_engine: OnceLock>, /// Endpoint-specific notifier for health check timer resets endpoint_health_check_notifier: OnceLock>, } @@ -285,6 +289,8 @@ impl Ingress { Arc::new(Self { segment: OnceLock::new(), metrics: OnceLock::new(), + engine: OnceLock::new(), + any_engine: OnceLock::new(), endpoint_health_check_notifier: OnceLock::new(), }) } @@ -322,13 
+328,15 @@ impl Ingress { pub fn for_engine(engine: ServiceEngine) -> Result> { let frontend = SegmentSource::::new(); - let backend = ServiceBackend::from_engine(engine); + let backend = ServiceBackend::from_engine(engine.clone()); // create the pipeline let pipeline = frontend.link(backend)?.link(frontend)?; let ingress = Ingress::new(); ingress.attach(pipeline)?; + // Store the engine for local registry access + let _ = ingress.engine.set(engine); Ok(ingress) } @@ -337,6 +345,21 @@ impl Ingress { fn metrics(&self) -> Option<&Arc> { self.metrics.get() } + + /// Get the underlying engine if available (for local registry) + pub fn engine(&self) -> Option<&ServiceEngine> { + self.engine.get() + } + + /// Get the type-erased engine if available (for local registry) + pub fn any_engine(&self) -> Option> { + self.any_engine.get().cloned() + } + + /// Set the type-erased engine (called when we have proper trait bounds) + pub fn set_any_engine(&self, any_engine: Arc) { + let _ = self.any_engine.set(any_engine); + } } #[async_trait] @@ -350,6 +373,11 @@ pub trait PushWorkHandler: Send + Sync { metrics_labels: Option<&[(&str, &str)]>, ) -> Result<()>; + /// Get the underlying engine as AnyAsyncEngine for local registry (optional) + fn as_any_engine(&self) -> Option> { + None + } + /// Set the endpoint-specific notifier for health check timer resets fn set_endpoint_health_check_notifier( &self, diff --git a/lib/runtime/src/pipeline/network/ingress/push_handler.rs b/lib/runtime/src/pipeline/network/ingress/push_handler.rs index 65e0fe1f34..fe3621d1dd 100644 --- a/lib/runtime/src/pipeline/network/ingress/push_handler.rs +++ b/lib/runtime/src/pipeline/network/ingress/push_handler.rs @@ -126,14 +126,32 @@ where Ingress::add_metrics(self, endpoint, metrics_labels) } - fn set_endpoint_health_check_notifier(&self, notifier: Arc) -> Result<()> { + fn as_any_engine(&self) -> Option> { + use crate::engine::AsStreamAnyAsyncEngine; use crate::pipeline::network::Ingress; + + // First 
try to get the pre-stored type-erased engine + if let Some(engine) = Ingress::any_engine(self) { + return Some(engine); + } + + // If no pre-stored engine, try to create one from the underlying engine + if let Some(service_engine) = Ingress::engine(self) { + // Try to convert using StreamData-compatible wrapper + if let Some(stream_engine) = service_engine.as_stream_any() { + return Some(stream_engine); + } + } + + None + } + + fn set_endpoint_health_check_notifier(&self, notifier: Arc) -> Result<()> { self.endpoint_health_check_notifier .set(notifier) .map_err(|_| anyhow::anyhow!("Endpoint health check notifier already set"))?; Ok(()) } - async fn handle_payload(&self, payload: Bytes) -> Result<(), PipelineError> { let start_time = std::time::Instant::now(); diff --git a/lib/runtime/src/traits.rs b/lib/runtime/src/traits.rs index 09b3b1bb1a..ac32652262 100644 --- a/lib/runtime/src/traits.rs +++ b/lib/runtime/src/traits.rs @@ -4,6 +4,9 @@ pub mod events; use super::{DistributedRuntime, Runtime}; +use crate::engine::AnyAsyncEngine; +use std::sync::Arc; + /// A trait for objects that proivde access to the [Runtime] pub trait RuntimeProvider { fn rt(&self) -> &Runtime; diff --git a/lib/runtime/src/v2/entity/descriptor.rs b/lib/runtime/src/v2/entity/descriptor.rs new file mode 100644 index 0000000000..004d1724ed --- /dev/null +++ b/lib/runtime/src/v2/entity/descriptor.rs @@ -0,0 +1,1089 @@ +// SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +//! Composition-based entity descriptor system for distributed component management +//! +//! This module provides a composition-based descriptor system where all descriptor types +//! wrap a central EntityDescriptor struct. This design enables natural path extension, +//! hierarchical namespaces, and type-safe transitions between descriptor levels. +//! +//! Design principles: +//! - Single EntityDescriptor core with optional fields +//! 
- Hierarchical namespace segments: ["prod", "api", "v1"] +//! - Built-in path extension via path_segments +//! - Type-safe wrapper structs for each descriptor level +//! - Fluent API for descriptor transitions + +use derive_builder::Builder; +use serde::{Deserialize, Serialize}; +use std::fmt; +use thiserror::Error; +use validator::{Validate, ValidationError}; + +use super::validation; + +/// Errors that can occur during descriptor operations +#[derive(Error, Debug, Clone, PartialEq)] +pub enum DescriptorError { + #[error( + "Invalid namespace segment: '{segment}'. Must contain only lowercase letters, numbers, hyphens, and underscores" + )] + InvalidNamespaceSegment { segment: String }, + + #[error( + "Invalid component name: '{name}'. Must contain only lowercase letters, numbers, hyphens, and underscores" + )] + InvalidComponentName { name: String }, + + #[error( + "Invalid endpoint name: '{name}'. Must contain only lowercase letters, numbers, hyphens, and underscores" + )] + InvalidEndpointName { name: String }, + + #[error( + "Invalid path segment: '{segment}'. Must contain only lowercase letters, numbers, hyphens, underscores, and dots" + )] + InvalidPathSegment { segment: String }, + + #[error("Empty namespace segments not allowed")] + EmptyNamespace, + + #[error("Empty name not allowed")] + EmptyName, + + #[error("Empty path segment not allowed")] + EmptyPathSegment, + + #[error("Parse error: {message}")] + ParseError { message: String }, + + #[error("Validation error: {message}")] + ValidationError { message: String }, + + #[error("Reserved prefix: '{name}'. 
Names starting with '_' are reserved for internal use")] + ReservedPrefix { name: String }, + + #[error("Invalid transition: {message}")] + InvalidTransition { message: String }, + + #[error("Builder error: {message}")] + BuilderError { message: String }, +} + +impl From for DescriptorError { + fn from(err: validator::ValidationError) -> Self { + let message = match err.code.as_ref() { + "empty_identifier" => "Identifier cannot be empty".to_string(), + "invalid_identifier_chars" => "Identifier contains invalid characters. Only lowercase letters, numbers, hyphens, and underscores are allowed".to_string(), + "empty_path_segment" => "Path segment cannot be empty".to_string(), + "invalid_path_segment_chars" => "Path segment contains invalid characters. Only lowercase letters, numbers, hyphens, underscores, and dots are allowed".to_string(), + "reserved_internal_prefix" => "Names starting with '_' are reserved for internal use".to_string(), + "empty_collection" => "Collection cannot be empty".to_string(), + "component_name_too_long" => "Component name too long (max 63 characters)".to_string(), + "endpoint_name_too_long" => "Endpoint name too long (max 63 characters)".to_string(), + _ => format!("Validation error: {}", err.code), + }; + DescriptorError::ValidationError { message } + } +} + +/// Convert ValidationErrors (multiple errors) to DescriptorError +impl From for DescriptorError { + fn from(errors: validator::ValidationErrors) -> Self { + // Take the first error or create a generic message + if let Some((_field, field_errors)) = errors.field_errors().iter().next() + && let Some(error) = field_errors.first() + { + return Self::from((*error).clone()); + } + + DescriptorError::ValidationError { + message: "Multiple validation errors occurred".to_string(), + } + } +} + +/// Convert derive_builder errors to DescriptorError +impl From for DescriptorError { + fn from(err: derive_builder::UninitializedFieldError) -> Self { + DescriptorError::BuilderError { + message: 
format!("Required field not set: {}", err), + } + } +} + +/// Instance type for descriptors +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] +pub enum InstanceType { + /// Distributed instance that can be discovered via etcd + Distributed, + /// Local instance that is static and not discoverable + Local, +} + +impl fmt::Display for InstanceType { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + InstanceType::Distributed => write!(f, "distributed"), + InstanceType::Local => write!(f, "local"), + } + } +} + +/// Core entity descriptor using composition-based design +/// +/// All specialized descriptors wrap this single struct, providing: +/// - Hierarchical namespace segments +/// - Optional component, endpoint, and instance fields +/// - Natural path extension via path_segments +/// - Type-safe transitions between descriptor levels +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] +pub struct EntityDescriptor { + /// Hierarchical namespace segments like ["prod", "api", "v1"] + namespace_segments: Vec, + /// Optional component name + component: Option, + /// Optional endpoint name + endpoint: Option, + /// Optional instance ID for distributed instances + instance: Option, + /// Additional path segments for extension + path_segments: Vec, +} + +/// Builder for ergonomic descriptor construction +/// +/// This builder allows constructing complex descriptors with single-point error handling, +/// replacing the need for multiple `.unwrap()` calls in descriptor chains. 
+/// +/// # Example +/// ```rust +/// use dynamo_runtime::v2::DescriptorBuilder; +/// +/// // Instead of: +/// // let instance = NamespaceDescriptor::new(&["prod"]) +/// // .unwrap() +/// // .component("api") +/// // .unwrap() +/// // .endpoint("http") +/// // .unwrap() +/// // .instance(456); +/// +/// // You can now write: +/// let spec = DescriptorBuilder::default() +/// .namespace_segments(vec!["prod".to_string()]) +/// .component("api") +/// .endpoint("http") +/// .instance(456) +/// .build() +/// .unwrap(); +/// let instance = spec.build_instance().unwrap(); +/// ``` +#[derive(Builder, Validate)] +#[builder(derive(Debug))] +#[validate(schema(function = "validate_descriptor_spec"))] +pub struct DescriptorSpec { + /// Namespace segments (required) + #[builder(setter(into))] + #[validate(length(min = 1))] + pub namespace_segments: Vec, + + /// Optional component name + #[builder(default, setter(strip_option, into))] + pub component: Option, + + /// Optional endpoint name + #[builder(default, setter(strip_option, into))] + pub endpoint: Option, + + /// Optional instance ID + #[builder(default, setter(strip_option))] + pub instance: Option, + + /// Path extension segments + #[builder(default, setter(into))] + pub path_segments: Vec, + + /// Whether this is an internal namespace (allows underscore prefixes) + #[builder(default)] + pub internal: bool, +} + +/// Schema validation function for DescriptorSpec +fn validate_descriptor_spec(spec: &DescriptorSpec) -> Result<(), ValidationError> { + // Validate namespace segments with internal prefix handling + for segment in &spec.namespace_segments { + validation::validate_namespace_segment(segment, spec.internal) + .map_err(|_| ValidationError::new("invalid_namespace_segment"))?; + } + + // Validate component name if provided + if let Some(ref component) = spec.component { + validation::validate_component_name(component) + .map_err(|_| ValidationError::new("invalid_component_name"))?; + } + + // Validate endpoint name if 
provided + if let Some(ref endpoint) = spec.endpoint { + validation::validate_endpoint_name(endpoint) + .map_err(|_| ValidationError::new("invalid_endpoint_name"))?; + } + + // Validate path segments if provided + for segment in &spec.path_segments { + validation::validate_path_segment(segment) + .map_err(|_| ValidationError::new("invalid_path_segment"))?; + } + + Ok(()) +} + +/// Convenience type alias for the builder +pub type DescriptorBuilder = DescriptorSpecBuilder; + +impl DescriptorSpec { + /// Build the EntityDescriptor from the spec + pub fn build_entity(self) -> Result { + // Validate the spec before building + self.validate()?; + + Ok(EntityDescriptor { + namespace_segments: self.namespace_segments, + component: self.component, + endpoint: self.endpoint, + instance: self.instance, + path_segments: self.path_segments, + }) + } + + /// Build a NamespaceDescriptor + pub fn build_namespace(self) -> Result { + Ok(NamespaceDescriptor(self.build_entity()?)) + } + + /// Build a ComponentDescriptor (requires component to be set) + pub fn build_component(self) -> Result { + if self.component.is_none() { + return Err(DescriptorError::BuilderError { + message: "Component name is required for ComponentDescriptor".to_string(), + }); + } + Ok(ComponentDescriptor(self.build_entity()?)) + } + + /// Build an EndpointDescriptor (requires component and endpoint to be set) + pub fn build_endpoint(self) -> Result { + if self.component.is_none() { + return Err(DescriptorError::BuilderError { + message: "Component name is required for EndpointDescriptor".to_string(), + }); + } + if self.endpoint.is_none() { + return Err(DescriptorError::BuilderError { + message: "Endpoint name is required for EndpointDescriptor".to_string(), + }); + } + Ok(EndpointDescriptor(self.build_entity()?)) + } + + /// Build an InstanceDescriptor (requires component, endpoint, and instance to be set) + pub fn build_instance(self) -> Result { + if self.component.is_none() { + return 
Err(DescriptorError::BuilderError { + message: "Component name is required for InstanceDescriptor".to_string(), + }); + } + if self.endpoint.is_none() { + return Err(DescriptorError::BuilderError { + message: "Endpoint name is required for InstanceDescriptor".to_string(), + }); + } + if self.instance.is_none() { + return Err(DescriptorError::BuilderError { + message: "Instance ID is required for InstanceDescriptor".to_string(), + }); + } + Ok(InstanceDescriptor(self.build_entity()?)) + } +} + +impl EntityDescriptor { + /// Create a new entity descriptor with namespace segments + pub fn new_namespace(segments: &[&str]) -> Result { + if segments.is_empty() { + return Err(DescriptorError::EmptyNamespace); + } + + let validated_segments = segments + .iter() + .map(|s| { + Self::validate_namespace_segment(s)?; + Ok(s.to_string()) + }) + .collect::, DescriptorError>>()?; + + Ok(Self { + namespace_segments: validated_segments, + component: None, + endpoint: None, + instance: None, + path_segments: Vec::new(), + }) + } + + /// Create an internal namespace (allows underscore prefix) + pub fn new_internal_namespace(segments: &[&str]) -> Result { + if segments.is_empty() { + return Err(DescriptorError::EmptyNamespace); + } + + let validated_segments = segments + .iter() + .map(|s| { + Self::validate_internal_segment(s)?; + Ok(s.to_string()) + }) + .collect::, DescriptorError>>()?; + + Ok(Self { + namespace_segments: validated_segments, + component: None, + endpoint: None, + instance: None, + path_segments: Vec::new(), + }) + } + + /// Add a component to this descriptor + pub fn with_component(mut self, name: &str) -> Result { + Self::validate_component_name(name)?; + self.component = Some(name.to_string()); + Ok(self) + } + + /// Add an endpoint to this descriptor + pub fn with_endpoint(mut self, name: &str) -> Result { + Self::validate_endpoint_name(name)?; + self.endpoint = Some(name.to_string()); + Ok(self) + } + + /// Add an instance ID to this descriptor + pub fn 
with_instance(mut self, instance_id: i64) -> Self { + self.instance = Some(instance_id); + self + } + + /// Add path segments to this descriptor + pub fn with_path(mut self, segments: &[&str]) -> Result { + let validated_segments = segments + .iter() + .map(|s| { + Self::validate_path_segment(s)?; + Ok(s.to_string()) + }) + .collect::, DescriptorError>>()?; + + self.path_segments.extend(validated_segments); + Ok(self) + } + + /// Get namespace segments + pub fn namespace_segments(&self) -> &[String] { + &self.namespace_segments + } + + /// Get component name if present + pub fn component(&self) -> Option<&str> { + self.component.as_deref() + } + + /// Get endpoint name if present + pub fn endpoint(&self) -> Option<&str> { + self.endpoint.as_deref() + } + + /// Get instance ID if present + pub fn instance(&self) -> Option { + self.instance + } + + /// Get path segments + pub fn path_segments(&self) -> &[String] { + &self.path_segments + } + + /// Generate the full path string + pub fn path_string(&self) -> String { + let mut parts = Vec::new(); + + // Add namespace segments + parts.extend(self.namespace_segments.iter().cloned()); + + // Add component if present + if let Some(ref component) = self.component { + parts.push(component.clone()); + } + + // Add endpoint if present + if let Some(ref endpoint) = self.endpoint { + parts.push(endpoint.clone()); + } + + // Add instance if present + if let Some(instance_id) = self.instance { + parts.push(format!("instance-{:x}", instance_id)); + } + + // Add path segments + parts.extend(self.path_segments.iter().cloned()); + + parts.join(".") + } + + /// Generate etcd key with dynamo:// prefix + pub fn etcd_key(&self) -> String { + format!("dynamo://{}", self.path_string()) + } + + /// Check if this uses an internal namespace + pub fn is_internal(&self) -> bool { + self.namespace_segments + .first() + .is_some_and(|s| s.starts_with('_')) + } + + // Validation functions + /// Public validation method for namespace segments + pub 
fn validate_namespace_segment(segment: &str) -> Result<(), DescriptorError> { + validation::validate_namespace_segment(segment, false)?; + Ok(()) + } + + /// Public validation method for internal namespace segments + pub fn validate_internal_segment(segment: &str) -> Result<(), DescriptorError> { + validation::validate_namespace_segment(segment, true)?; + Ok(()) + } + + /// Public validation method for component names + pub fn validate_component_name(name: &str) -> Result<(), DescriptorError> { + validation::validate_component_name(name)?; + Ok(()) + } + + /// Public validation method for endpoint names + pub fn validate_endpoint_name(name: &str) -> Result<(), DescriptorError> { + validation::validate_endpoint_name(name)?; + Ok(()) + } + + /// Public validation method for path segments + pub fn validate_path_segment(segment: &str) -> Result<(), DescriptorError> { + validation::validate_path_segment(segment)?; + Ok(()) + } + + /// Public validation method for object names (Oscar compatibility) + pub fn validate_object_name(name: &str) -> Result<(), DescriptorError> { + validation::validate_object_name(name)?; + Ok(()) + } +} + +/// Namespace descriptor wrapper providing type-safe namespace operations +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] +pub struct NamespaceDescriptor(EntityDescriptor); + +impl NamespaceDescriptor { + /// Create a new namespace descriptor + pub fn new(segments: &[&str]) -> Result { + let entity = EntityDescriptor::new_namespace(segments)?; + Ok(NamespaceDescriptor(entity)) + } + + /// Create a new internal namespace descriptor (allows underscore prefix) + pub fn new_internal(segments: &[&str]) -> Result { + let entity = EntityDescriptor::new_internal_namespace(segments)?; + Ok(NamespaceDescriptor(entity)) + } + + /// Get namespace segments + pub fn segments(&self) -> &[String] { + self.0.namespace_segments() + } + + /// Get the namespace name (first segment for backwards compatibility) + pub fn name(&self) -> &str { 
+        &self.0.namespace_segments()[0]
+    }
+
+    /// Check if this is an internal namespace
+    pub fn is_internal(&self) -> bool {
+        self.0.is_internal()
+    }
+
+    /// Convert to component descriptor
+    ///
+    /// # Errors
+    /// Returns a [`DescriptorError`] when `name` fails component-name validation.
+    pub fn component(&self, name: &str) -> Result<ComponentDescriptor, DescriptorError> {
+        let entity = self.0.clone().with_component(name)?;
+        Ok(ComponentDescriptor(entity))
+    }
+
+    /// Get the inner EntityDescriptor
+    pub fn entity(&self) -> &EntityDescriptor {
+        &self.0
+    }
+}
+
+impl fmt::Display for NamespaceDescriptor {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "{}", self.0.namespace_segments().join("."))
+    }
+}
+
+/// Component descriptor wrapper providing type-safe component operations
+#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
+pub struct ComponentDescriptor(EntityDescriptor);
+
+impl ComponentDescriptor {
+    /// Create a new component descriptor from namespace
+    ///
+    /// # Errors
+    /// Returns a [`DescriptorError`] when `component_name` fails validation.
+    pub fn new(
+        namespace: NamespaceDescriptor,
+        component_name: &str,
+    ) -> Result<Self, DescriptorError> {
+        let entity = namespace.0.with_component(component_name)?;
+        Ok(ComponentDescriptor(entity))
+    }
+
+    /// Get component name
+    pub fn name(&self) -> Option<&str> {
+        self.0.component()
+    }
+
+    /// Get namespace descriptor
+    ///
+    /// The result carries only this descriptor's namespace segments; the
+    /// component and any path segments are dropped.
+    pub fn namespace(&self) -> NamespaceDescriptor {
+        // Build the parent directly from the stored segments rather than
+        // re-validating them behind `unwrap()`. `is_internal()` looks only at the
+        // first segment, so a namespace created through `new_internal` whose
+        // underscore-prefixed segment is not the first one would be re-checked
+        // with the public rules, and descriptors produced by the derived
+        // `Deserialize` are never validated at all; either case could panic here.
+        NamespaceDescriptor(EntityDescriptor {
+            namespace_segments: self.0.namespace_segments.clone(),
+            component: None,
+            endpoint: None,
+            instance: None,
+            path_segments: Vec::new(),
+        })
+    }
+
+    /// Get the full path (namespace.component)
+    pub fn path(&self) -> String {
+        self.0.path_string()
+    }
+
+    /// Convert to endpoint descriptor
+    ///
+    /// # Errors
+    /// Returns a [`DescriptorError`] when `name` fails endpoint-name validation.
+    pub fn endpoint(&self, name: &str) -> Result<EndpointDescriptor, DescriptorError> {
+        let entity = self.0.clone().with_endpoint(name)?;
+        Ok(EndpointDescriptor(entity))
+    }
+
+    /// Get the inner EntityDescriptor
+    pub fn entity(&self) -> &EntityDescriptor {
+        &self.0
+    }
+}
+
+impl fmt::Display for ComponentDescriptor
{
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "{}", self.0.path_string())
+    }
+}
+
+/// Endpoint descriptor wrapper providing type-safe endpoint operations
+#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
+pub struct EndpointDescriptor(EntityDescriptor);
+
+impl EndpointDescriptor {
+    /// Create a new endpoint descriptor from component
+    ///
+    /// # Errors
+    /// Returns a [`DescriptorError`] when `endpoint_name` fails validation.
+    pub fn new(
+        component: ComponentDescriptor,
+        endpoint_name: &str,
+    ) -> Result<Self, DescriptorError> {
+        let entity = component.0.with_endpoint(endpoint_name)?;
+        Ok(EndpointDescriptor(entity))
+    }
+
+    /// Get endpoint name
+    pub fn name(&self) -> Option<&str> {
+        self.0.endpoint()
+    }
+
+    /// Get component descriptor
+    ///
+    /// The result keeps the namespace segments and component name; the endpoint
+    /// and any path segments are dropped.
+    pub fn component(&self) -> ComponentDescriptor {
+        // Copy the stored fields instead of re-validating them behind `unwrap()`:
+        // `is_internal()` checks only the first namespace segment and the derived
+        // `Deserialize` performs no validation, so re-validation could panic on a
+        // descriptor that already exists.
+        ComponentDescriptor(EntityDescriptor {
+            namespace_segments: self.0.namespace_segments.clone(),
+            component: self.0.component.clone(),
+            endpoint: None,
+            instance: None,
+            path_segments: Vec::new(),
+        })
+    }
+
+    /// Get namespace descriptor
+    ///
+    /// The result carries only the namespace segments of this descriptor.
+    pub fn namespace(&self) -> NamespaceDescriptor {
+        // Same reasoning as `component()`: no re-validation, no `unwrap()`.
+        NamespaceDescriptor(EntityDescriptor {
+            namespace_segments: self.0.namespace_segments.clone(),
+            component: None,
+            endpoint: None,
+            instance: None,
+            path_segments: Vec::new(),
+        })
+    }
+
+    /// Get the full path (namespace.component.endpoint)
+    pub fn path(&self) -> String {
+        self.0.path_string()
+    }
+
+    /// Convert to instance descriptor
+    pub fn instance(&self, instance_id: i64) -> InstanceDescriptor {
+        let entity = self.0.clone().with_instance(instance_id);
+        InstanceDescriptor(entity)
+    }
+
+    /// Get the inner EntityDescriptor
+    pub fn entity(&self) -> &EntityDescriptor {
+        &self.0
+    }
+}
+
+impl
fmt::Display for EndpointDescriptor {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "{}", self.0.path_string())
+    }
+}
+
+/// Instance descriptor wrapper providing type-safe instance operations
+#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
+pub struct InstanceDescriptor(EntityDescriptor);
+
+impl InstanceDescriptor {
+    /// Create a new instance descriptor from endpoint
+    pub fn new(endpoint: EndpointDescriptor, instance_id: i64) -> Self {
+        let entity = endpoint.0.with_instance(instance_id);
+        InstanceDescriptor(entity)
+    }
+
+    /// Get the instance ID
+    pub fn instance_id(&self) -> Option<i64> {
+        self.0.instance()
+    }
+
+    /// Get endpoint descriptor
+    ///
+    /// The result keeps the namespace segments, component and endpoint; the
+    /// instance ID and any path segments are dropped.
+    pub fn endpoint(&self) -> EndpointDescriptor {
+        // Copy the stored fields instead of re-validating them behind `unwrap()`:
+        // `is_internal()` checks only the first namespace segment and the derived
+        // `Deserialize` performs no validation, so re-validation could panic on a
+        // descriptor that already exists.
+        EndpointDescriptor(EntityDescriptor {
+            namespace_segments: self.0.namespace_segments.clone(),
+            component: self.0.component.clone(),
+            endpoint: self.0.endpoint.clone(),
+            instance: None,
+            path_segments: Vec::new(),
+        })
+    }
+
+    /// Get component descriptor
+    ///
+    /// The result keeps the namespace segments and component name only.
+    pub fn component(&self) -> ComponentDescriptor {
+        // Same reasoning as `endpoint()`: no re-validation, no `unwrap()`.
+        ComponentDescriptor(EntityDescriptor {
+            namespace_segments: self.0.namespace_segments.clone(),
+            component: self.0.component.clone(),
+            endpoint: None,
+            instance: None,
+            path_segments: Vec::new(),
+        })
+    }
+
+    /// Get namespace descriptor
+    ///
+    /// The result carries only the namespace segments of this descriptor.
+    pub fn namespace(&self) -> NamespaceDescriptor {
+        // Same reasoning as `endpoint()`: no re-validation, no `unwrap()`.
+        NamespaceDescriptor(EntityDescriptor {
+            namespace_segments: self.0.namespace_segments.clone(),
+            component: None,
+            endpoint: None,
+            instance: None,
+            path_segments: Vec::new(),
+        })
+    }
+
+    /// Get the full path with instance ID
+    pub fn path(&self) -> String {
+        self.0.path_string()
+    }
+
+    /// Get the inner EntityDescriptor
+    pub fn entity(&self) -> &EntityDescriptor {
+        &self.0
+    }
+}
+
+impl fmt::Display for InstanceDescriptor {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "{}", self.0.path_string())
+    }
+}
+
+/// Path descriptor wrapper providing path extension capabilities
+#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
+pub struct PathDescriptor(EntityDescriptor);
+
+impl PathDescriptor {
+    /// Create a path descriptor from any other descriptor type
+    pub fn from_namespace(namespace: NamespaceDescriptor) -> Self {
+        PathDescriptor(namespace.0)
+    }
+
+    /// Create a path descriptor from a component descriptor
+    pub fn from_component(component: ComponentDescriptor) -> Self {
+        PathDescriptor(component.0)
+    }
+
+    /// Create a path descriptor from an endpoint descriptor
+    pub fn from_endpoint(endpoint: EndpointDescriptor) -> Self {
+        PathDescriptor(endpoint.0)
+    }
+
+    /// Create a path descriptor from an instance descriptor
+    pub fn from_instance(instance: InstanceDescriptor) -> Self {
+        PathDescriptor(instance.0)
+    }
+
+    /// Extend this path with additional segments
+    ///
+    /// # Errors
+    /// Returns a [`DescriptorError`] when any segment fails path-segment validation.
+    pub fn extend(&self, segments: &[&str]) -> Result<Self, DescriptorError> {
+        let entity = self.0.clone().with_path(segments)?;
+        Ok(PathDescriptor(entity))
+    }
+
+    /// Add a single segment to this path
+    ///
+    /// # Errors
+    /// Returns a [`DescriptorError`] when `segment` fails path-segment validation.
+    pub fn with_segment(&self, segment: &str) -> Result<Self, DescriptorError> {
+        self.extend(&[segment])
+    }
+
+    /// Get the full path string
+    pub fn path(&self) -> String {
+        self.0.path_string()
+    }
+
+    /// Generate etcd key with dynamo:// prefix
+    pub fn etcd_key(&self) -> String {
+        self.0.etcd_key()
+    }
+
+    /// Get path segments
+    pub fn segments(&self) -> &[String] {
+        self.0.path_segments()
+    }
+
+    /// Get the inner EntityDescriptor
+    pub fn entity(&self) -> &EntityDescriptor {
+        &self.0
+    }
+}
+
+impl fmt::Display for PathDescriptor {
+    fn
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.0.path_string()) + } +} + +/// Trait for converting descriptors to PathDescriptor for extension +pub trait ToPath { + fn to_path(self) -> PathDescriptor; +} + +impl ToPath for NamespaceDescriptor { + fn to_path(self) -> PathDescriptor { + PathDescriptor::from_namespace(self) + } +} + +impl ToPath for ComponentDescriptor { + fn to_path(self) -> PathDescriptor { + PathDescriptor::from_component(self) + } +} + +impl ToPath for EndpointDescriptor { + fn to_path(self) -> PathDescriptor { + PathDescriptor::from_endpoint(self) + } +} + +impl ToPath for InstanceDescriptor { + fn to_path(self) -> PathDescriptor { + PathDescriptor::from_instance(self) + } +} + +/// Conversion utilities between descriptor types and strings +impl From<&NamespaceDescriptor> for String { + fn from(desc: &NamespaceDescriptor) -> Self { + desc.to_string() + } +} + +impl From<&ComponentDescriptor> for String { + fn from(desc: &ComponentDescriptor) -> Self { + desc.path() + } +} + +impl From<&EndpointDescriptor> for String { + fn from(desc: &EndpointDescriptor) -> Self { + desc.path() + } +} + +impl From<&InstanceDescriptor> for String { + fn from(desc: &InstanceDescriptor) -> Self { + desc.path() + } +} + +impl From<&PathDescriptor> for String { + fn from(desc: &PathDescriptor) -> Self { + desc.path() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_entity_descriptor_creation() { + let entity = EntityDescriptor::new_namespace(&["prod", "api"]).unwrap(); + assert_eq!(entity.namespace_segments(), &["prod", "api"]); + assert!(entity.component().is_none()); + assert!(entity.endpoint().is_none()); + assert!(entity.instance().is_none()); + assert!(entity.path_segments().is_empty()); + } + + #[test] + fn test_internal_namespace_creation() { + let entity = EntityDescriptor::new_internal_namespace(&["_internal", "oscar"]).unwrap(); + assert_eq!(entity.namespace_segments(), &["_internal", "oscar"]); + 
assert!(entity.is_internal()); + } + + #[test] + fn test_fluent_descriptor_building() { + let entity = EntityDescriptor::new_namespace(&["prod", "api"]) + .unwrap() + .with_component("gateway") + .unwrap() + .with_endpoint("http") + .unwrap() + .with_instance(12345) + .with_path(&["health", "check"]) + .unwrap(); + + assert_eq!(entity.namespace_segments(), &["prod", "api"]); + assert_eq!(entity.component(), Some("gateway")); + assert_eq!(entity.endpoint(), Some("http")); + assert_eq!(entity.instance(), Some(12345)); + assert_eq!(entity.path_segments(), &["health", "check"]); + } + + #[test] + fn test_path_generation() { + let entity = EntityDescriptor::new_internal_namespace(&["_internal", "oscar"]) + .unwrap() + .with_component("objects") + .unwrap() + .with_path(&["tokenizer.json-a1b2c3d4", "metadata"]) + .unwrap(); + + let expected_path = "_internal.oscar.objects.tokenizer.json-a1b2c3d4.metadata"; + assert_eq!(entity.path_string(), expected_path); + assert_eq!(entity.etcd_key(), format!("dynamo://{}", expected_path)); + } + + #[test] + fn test_wrapper_descriptor_creation() { + let ns = NamespaceDescriptor::new_internal(&["_internal", "oscar"]).unwrap(); + let comp = ns.component("objects").unwrap(); + let endpoint = comp.endpoint("registry").unwrap(); + let instance = endpoint.instance(123); + + assert_eq!(ns.segments(), &["_internal", "oscar"]); + assert_eq!(comp.name(), Some("objects")); + assert_eq!(endpoint.name(), Some("registry")); + assert_eq!(instance.instance_id(), Some(123)); + } + + #[test] + fn test_path_extension() { + let ns = NamespaceDescriptor::new_internal(&["_internal", "oscar"]).unwrap(); + let path = ns + .to_path() + .extend(&["objects", "tokenizer.json-a1b2c3d4"]) + .unwrap() + .with_segment("metadata") + .unwrap(); + + let expected = "_internal.oscar.objects.tokenizer.json-a1b2c3d4.metadata"; + assert_eq!(path.path(), expected); + assert_eq!(path.etcd_key(), format!("dynamo://{}", expected)); + } + + #[test] + fn 
test_descriptor_transitions() { + let instance = NamespaceDescriptor::new(&["prod"]) + .unwrap() + .component("api") + .unwrap() + .endpoint("http") + .unwrap() + .instance(456); + + // Test navigating back up the hierarchy (instance -> endpoint -> component -> namespace) + let endpoint = instance.endpoint(); + let component = instance.component(); + let namespace = instance.namespace(); + + assert_eq!(namespace.name(), "prod"); + assert_eq!(component.name(), Some("api")); + assert_eq!(endpoint.name(), Some("http")); + } + + #[test] + fn test_validation() { + // Valid cases + assert!(EntityDescriptor::new_namespace(&["valid-name"]).is_ok()); + assert!(EntityDescriptor::new_internal_namespace(&["_internal"]).is_ok()); + + // Invalid cases + assert!(EntityDescriptor::new_namespace(&[]).is_err()); // Empty + assert!(EntityDescriptor::new_namespace(&["Invalid-Name"]).is_err()); // Uppercase + assert!(EntityDescriptor::new_namespace(&["_internal"]).is_err()); // Reserved prefix + assert!(EntityDescriptor::new_namespace(&["name.with.dots"]).is_err()); // Dots + } + + #[test] + fn test_descriptor_builder_ergonomics() { + // Before: Multiple .unwrap() calls with error handling at each step + // let instance = NamespaceDescriptor::new(&["prod"]) + // .unwrap() + // .component("api") + // .unwrap() + // .endpoint("http") + // .unwrap() + // .instance(456); + + // After: chained setters need no unwrap; errors surface only at build() and build_instance() + let instance = DescriptorBuilder::default() + .namespace_segments(vec!["prod".to_string()]) + .component("api") + .endpoint("http") + .instance(456) + .build() + .unwrap() + .build_instance() + .unwrap(); + + assert_eq!(instance.instance_id(), Some(456)); + assert_eq!(instance.namespace().segments(), &["prod"]); + assert_eq!(instance.component().name(), Some("api")); + assert_eq!(instance.endpoint().name(), Some("http")); + } + + #[test] + fn test_descriptor_builder_component_only() { + let component = DescriptorBuilder::default() + .namespace_segments(vec!["test".to_string()]) + .component("service") + 
.build() + .unwrap() + .build_component() + .unwrap(); + + assert_eq!(component.namespace().segments(), &["test"]); + assert_eq!(component.name(), Some("service")); + } + + #[test] + fn test_descriptor_builder_validation() { + // Test empty namespace segments (validation happens in build_namespace) + let spec = DescriptorBuilder::default() + .namespace_segments(Vec::::new()) + .build() + .unwrap(); + let result = spec.build_namespace(); + assert!(result.is_err()); + + // Test invalid component name (validation happens in build_component) + let spec = DescriptorBuilder::default() + .namespace_segments(vec!["test".to_string()]) + .component("Invalid-Component!") + .build() + .unwrap(); + let result = spec.build_component(); + assert!(result.is_err()); + + // Test endpoint without component + let spec = DescriptorBuilder::default() + .namespace_segments(vec!["test".to_string()]) + .endpoint("http") + .build() + .unwrap(); + + let endpoint_result = spec.build_endpoint(); + assert!(endpoint_result.is_err()); + } + + #[test] + fn test_descriptor_builder_internal_namespace() { + let namespace = DescriptorBuilder::default() + .namespace_segments(vec!["_internal".to_string(), "oscar".to_string()]) + .internal(true) + .build() + .unwrap() + .build_namespace() + .unwrap(); + + assert!(namespace.is_internal()); + assert_eq!(namespace.segments(), &["_internal", "oscar"]); + } +} diff --git a/lib/runtime/src/v2/entity/mod.rs b/lib/runtime/src/v2/entity/mod.rs new file mode 100644 index 0000000000..763d7fcd5d --- /dev/null +++ b/lib/runtime/src/v2/entity/mod.rs @@ -0,0 +1,16 @@ +// SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +//! Entity descriptor system for type-safe component identification +//! +//! This module provides descriptors that encode entity relationships in the type +//! system and validate naming conventions when a descriptor is built, replacing +//! string-based identities with structured descriptor types. 
+ +pub mod descriptor; +pub mod validation; + +pub use descriptor::{ + ComponentDescriptor, DescriptorBuilder, DescriptorError, EndpointDescriptor, EntityDescriptor, + InstanceDescriptor, InstanceType, NamespaceDescriptor, PathDescriptor, ToPath, +}; diff --git a/lib/runtime/src/v2/entity/validation.rs b/lib/runtime/src/v2/entity/validation.rs new file mode 100644 index 0000000000..f232a4deb7 --- /dev/null +++ b/lib/runtime/src/v2/entity/validation.rs @@ -0,0 +1,186 @@ +// SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +//! Centralized validation functions for entity descriptors +//! +//! This module provides the core validation logic used across the entity descriptor +//! system. These validators are private to the entity module and accessed through +//! the public EntityDescriptor validation methods. + +use once_cell::sync::Lazy; +use regex::Regex; +use validator::{Validate, ValidationError}; + +/// Regex for identifier validation (components, endpoints, namespaces) +/// Allows: lowercase letters, digits, hyphens, underscores +static IDENTIFIER_REGEX: Lazy = Lazy::new(|| Regex::new(r"^[a-z0-9_-]+$").unwrap()); + +/// Regex for path segment validation (includes dots for object names) +/// Allows: lowercase letters, digits, hyphens, underscores, dots +static PATH_SEGMENT_REGEX: Lazy = Lazy::new(|| Regex::new(r"^[a-z0-9_.-]+$").unwrap()); + +/// Validate identifier names (components, endpoints, namespace segments) +/// Does not allow dots - stricter than path segments +pub(super) fn validate_identifier(name: &str) -> Result<(), ValidationError> { + if name.is_empty() { + return Err(ValidationError::new("empty_identifier")); + } + + if !IDENTIFIER_REGEX.is_match(name) { + return Err(ValidationError::new("invalid_identifier_chars")); + } + + Ok(()) +} + +/// Validate path segment names (object names, path extensions) +/// Allows dots for file-like names (e.g., "tokenizer.json", 
"model.bin") +pub(super) fn validate_path_segment(segment: &str) -> Result<(), ValidationError> { + if segment.is_empty() { + return Err(ValidationError::new("empty_path_segment")); + } + + if !PATH_SEGMENT_REGEX.is_match(segment) { + return Err(ValidationError::new("invalid_path_segment_chars")); + } + + Ok(()) +} + +/// Validate namespace segment with reserved prefix checking +pub(super) fn validate_namespace_segment( + segment: &str, + allow_internal: bool, +) -> Result<(), ValidationError> { + validate_identifier(segment)?; + + // Any leading underscore is reserved for internal namespaces (e.g. _internal, _system) + if segment.starts_with('_') && !allow_internal { + return Err(ValidationError::new("reserved_internal_prefix")); + } + + Ok(()) +} + +/// Validate that a collection is non-empty +pub(super) fn validate_non_empty_collection(collection: &[T]) -> Result<(), ValidationError> { + if collection.is_empty() { + return Err(ValidationError::new("empty_collection")); + } + Ok(()) +} + +/// Validate component name with length restrictions +pub(super) fn validate_component_name(name: &str) -> Result<(), ValidationError> { + validate_identifier(name)?; + + if name.len() > 63 { + return Err(ValidationError::new("component_name_too_long")); + } + + Ok(()) +} + +/// Validate endpoint name with length restrictions +pub(super) fn validate_endpoint_name(name: &str) -> Result<(), ValidationError> { + validate_identifier(name)?; + + if name.len() > 63 { + return Err(ValidationError::new("endpoint_name_too_long")); + } + + Ok(()) +} + +/// Validate object name for Oscar (allows dots, longer length limit) +pub(super) fn validate_object_name(name: &str) -> Result<(), ValidationError> { + validate_path_segment(name)?; + + if name.len() > 255 { + return Err(ValidationError::new("object_name_too_long")); + } + + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_validate_identifier() { + // Valid identifiers + assert!(validate_identifier("valid").is_ok()); + 
assert!(validate_identifier("with-hyphens").is_ok()); + assert!(validate_identifier("with_underscores").is_ok()); + assert!(validate_identifier("with123numbers").is_ok()); + + // Invalid identifiers + assert!(validate_identifier("").is_err()); + assert!(validate_identifier("With-Capitals").is_err()); + assert!(validate_identifier("with.dots").is_err()); + assert!(validate_identifier("with spaces").is_err()); + assert!(validate_identifier("with/slash").is_err()); + } + + #[test] + fn test_validate_path_segment() { + // Valid path segments (includes dots) + assert!(validate_path_segment("valid").is_ok()); + assert!(validate_path_segment("with-hyphens").is_ok()); + assert!(validate_path_segment("with_underscores").is_ok()); + assert!(validate_path_segment("with.dots").is_ok()); + assert!(validate_path_segment("tokenizer.json").is_ok()); + assert!(validate_path_segment("model-v1.bin").is_ok()); + + // Invalid path segments + assert!(validate_path_segment("").is_err()); + assert!(validate_path_segment("With-Capitals").is_err()); + assert!(validate_path_segment("with spaces").is_err()); + assert!(validate_path_segment("with/slash").is_err()); + } + + #[test] + fn test_validate_namespace_segment() { + // Valid namespace segments (no internal prefix) + assert!(validate_namespace_segment("valid", false).is_ok()); + assert!(validate_namespace_segment("prod", false).is_ok()); + + // Invalid - internal prefix not allowed + assert!(validate_namespace_segment("_internal", false).is_err()); + assert!(validate_namespace_segment("_system", false).is_err()); + + // Valid - internal prefix allowed + assert!(validate_namespace_segment("_internal", true).is_ok()); + assert!(validate_namespace_segment("_system", true).is_ok()); + } + + #[test] + fn test_validate_non_empty_collection() { + assert!(validate_non_empty_collection(&["item"]).is_ok()); + assert!(validate_non_empty_collection(&Vec::::new()).is_err()); + } + + #[test] + fn test_validate_component_name() { + 
assert!(validate_component_name("api").is_ok()); + assert!(validate_component_name("a".repeat(63).as_str()).is_ok()); + assert!(validate_component_name(&"a".repeat(64)).is_err()); // Too long + } + + #[test] + fn test_validate_endpoint_name() { + assert!(validate_endpoint_name("http").is_ok()); + assert!(validate_endpoint_name(&"a".repeat(63)).is_ok()); + assert!(validate_endpoint_name(&"a".repeat(64)).is_err()); // Too long + } + + #[test] + fn test_validate_object_name() { + assert!(validate_object_name("tokenizer.json").is_ok()); + assert!(validate_object_name(&"a".repeat(255)).is_ok()); + assert!(validate_object_name(&"a".repeat(256)).is_err()); // Too long + assert!(validate_object_name("model.bin").is_ok()); + assert!(validate_object_name("config-v1_final.yaml").is_ok()); + } +} diff --git a/lib/runtime/src/v2/mod.rs b/lib/runtime/src/v2/mod.rs new file mode 100644 index 0000000000..1174e41469 --- /dev/null +++ b/lib/runtime/src/v2/mod.rs @@ -0,0 +1,21 @@ +// SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +//! Dynamo Runtime v2 API +//! +//! This module provides the next generation of Dynamo runtime APIs, built around +//! entity descriptors for improved type safety, validation, and developer experience. +//! +//! The v2 API introduces: +//! - Strong compile-time guarantees about entity relationships +//! - Comprehensive validation with detailed error messages +//! - Type-safe descriptor system for components, endpoints, and instances +//! - Improved ergonomics through fluent builder patterns +//! 
- Forward compatibility for descriptor-based systems + +pub mod entity; + +pub use entity::{ + ComponentDescriptor, DescriptorBuilder, DescriptorError, EndpointDescriptor, + InstanceDescriptor, NamespaceDescriptor, PathDescriptor, +}; diff --git a/lib/runtime/tests/local_client.rs b/lib/runtime/tests/local_client.rs new file mode 100644 index 0000000000..0a4c40f536 --- /dev/null +++ b/lib/runtime/tests/local_client.rs @@ -0,0 +1,180 @@ +// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +//! Tests for LocalClient functionality + +use dynamo_runtime::component::LocalClient; +use dynamo_runtime::engine::{AsyncEngine, AsyncEngineContextProvider, async_trait}; +use dynamo_runtime::pipeline::network::Ingress; +use dynamo_runtime::pipeline::{Context, ManyOut, ResponseStream, SingleIn}; +use dynamo_runtime::protocols::annotated::Annotated; +use dynamo_runtime::{DistributedRuntime, Runtime, distributed::DistributedConfig}; +use futures::StreamExt; +use std::sync::Arc; + +/// Simple test engine that echoes the input +struct EchoEngine; + +#[async_trait] +impl AsyncEngine, ManyOut>, anyhow::Error> for EchoEngine { + async fn generate( + &self, + request: SingleIn, + ) -> Result>, anyhow::Error> { + let response = Annotated { + data: Some((*request).clone()), + id: None, + event: None, + comment: None, + }; + let ctx = request.context(); + + // Create a simple stream that yields the response once + let stream = futures::stream::once(async move { response }); + Ok(ResponseStream::new(Box::pin(stream), ctx)) + } +} + +#[tokio::test] +async fn test_local_client_registration_and_retrieval() -> Result<(), Box> { + // Create runtime and DRT + let runtime = Runtime::from_current()?; + let config = DistributedConfig { + etcd_config: Default::default(), + nats_config: Default::default(), + is_static: true, + }; + let drt = DistributedRuntime::new(runtime, config).await?; + + // Create 
namespace, component, and endpoint + let namespace = drt.namespace("test-ns")?; + let component = namespace.component("test-component")?; + let service = component.service_builder().create().await?; + let endpoint = service.endpoint("test-endpoint"); + + // Create an engine and configure the endpoint with it + let engine: Arc, ManyOut>, anyhow::Error>> = + Arc::new(EchoEngine); + + // Wrap the engine in an Ingress to make it a PushWorkHandler + let ingress = Ingress::for_engine(engine)?; + + // Create the endpoint instance with the ingress as handler (setup phase) + let _endpoint_instance = endpoint + .endpoint_builder() + .handler(ingress) + .create() + .await?; + println!("Created endpoint instance with local engine registered"); + + // Create a LocalClient using the endpoint's convenience method + let local_client: LocalClient, ManyOut>, anyhow::Error> = + endpoint.local_client().await?; + + // Test the local client + let request = Context::new("Hello, LocalClient!".to_string()); + + let mut response_stream = local_client.generate(request).await?; + let response = response_stream.next().await.expect("Expected response"); + + assert_eq!(response.data, Some("Hello, LocalClient!".to_string())); + println!("LocalClient test passed: received '{:?}'", response); + + // Note: We don't need to start the endpoint for local client testing + // The engine is registered during create() and available for local access + + Ok(()) +} + +#[tokio::test] +async fn test_local_client_with_ingress() -> Result<(), Box> { + // Create runtime and DRT + let runtime = Runtime::from_current()?; + let config = DistributedConfig { + etcd_config: Default::default(), + nats_config: Default::default(), + is_static: true, + }; + let drt = DistributedRuntime::new(runtime, config).await?; + + // Create namespace, component, and endpoint + let namespace = drt.namespace("test-ns2")?; + let component = namespace.component("test-component2")?; + let service = 
component.service_builder().create().await?; + let endpoint = service.endpoint("test-endpoint2"); + + // Create an Ingress with an engine + let engine: Arc, ManyOut>, anyhow::Error>> = + Arc::new(EchoEngine); + let ingress = Ingress::for_engine(engine.clone())?; + + // Create the endpoint instance with the ingress as handler + let _endpoint_instance = endpoint + .endpoint_builder() + .handler(ingress) + .create() + .await?; + println!("Created endpoint instance with ingress and local engine registered"); + + // Now we can create a LocalClient using the convenience method + let local_client: LocalClient, ManyOut>, anyhow::Error> = + endpoint.local_client().await?; + + // Test the local client + let request = Context::new("Test with Ingress".to_string()); + + let mut response_stream = local_client.generate(request).await?; + let response = response_stream.next().await.expect("Expected response"); + + assert_eq!(response.data, Some("Test with Ingress".to_string())); + println!( + "LocalClient with Ingress test passed: received '{:?}'", + response + ); + + // Note: The engine is automatically registered during endpoint creation + + Ok(()) +} + +#[tokio::test] +async fn test_local_client_type_mismatch() -> Result<(), Box> { + // Create runtime and DRT + let runtime = Runtime::from_current()?; + let config = DistributedConfig { + etcd_config: Default::default(), + nats_config: Default::default(), + is_static: true, + }; + let drt = DistributedRuntime::new(runtime, config).await?; + + // Create namespace, component, and endpoint + let namespace = drt.namespace("test-ns3")?; + let component = namespace.component("test-component3")?; + let service = component.service_builder().create().await?; + let endpoint = service.endpoint("test-endpoint3"); + + // Create an endpoint instance with a String engine + let engine: Arc, ManyOut>, anyhow::Error>> = + Arc::new(EchoEngine); + let ingress = Ingress::for_engine(engine)?; + let _endpoint_instance = endpoint + .endpoint_builder() + 
.handler(ingress) + .create() + .await?; + println!("Created endpoint with String engine"); + + // Try to create a LocalClient with different types (this should fail) + type TestLocalClient = LocalClient, ManyOut>, anyhow::Error>; + let result: Result = endpoint.local_client().await; + + assert!(result.is_err(), "Expected type mismatch error"); + if let Err(e) = result { + println!("Got expected error for type mismatch: {}", e); + } + + // Note: The engine is automatically registered during endpoint creation + + Ok(()) +} diff --git a/lib/runtime/tests/local_client_simple.rs b/lib/runtime/tests/local_client_simple.rs new file mode 100644 index 0000000000..4032dddb0a --- /dev/null +++ b/lib/runtime/tests/local_client_simple.rs @@ -0,0 +1,131 @@ +// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +//! Simple test to demonstrate LocalClient functionality + +use dynamo_runtime::component::LocalClient; +use dynamo_runtime::engine::{AsyncEngine, AsyncEngineContextProvider, async_trait}; +use dynamo_runtime::pipeline::network::Ingress; +use dynamo_runtime::pipeline::{Context, ManyOut, ResponseStream, SingleIn}; +use dynamo_runtime::protocols::annotated::Annotated; +use dynamo_runtime::{DistributedRuntime, Runtime, distributed::DistributedConfig}; +use futures::StreamExt; +use std::sync::Arc; + +/// Simple test engine that echoes strings +struct SimpleEchoEngine; + +#[async_trait] +impl AsyncEngine, ManyOut>, anyhow::Error> for SimpleEchoEngine { + async fn generate( + &self, + request: SingleIn, + ) -> Result>, anyhow::Error> { + let response = Annotated { + data: Some(format!("Echo: {}", *request)), + id: None, + event: None, + comment: None, + }; + let context = request.context(); + + // Create a simple stream that yields the response once + let stream = futures::stream::once(async move { response }); + Ok(ResponseStream::new(Box::pin(stream), context)) + } +} + 
+#[tokio::test] +async fn test_local_client_basic() -> Result<(), Box> { + // Create runtime and DRT + let runtime = Runtime::from_current()?; + + let config = DistributedConfig { + etcd_config: Default::default(), + nats_config: Default::default(), + is_static: true, + }; + + let drt = DistributedRuntime::new(runtime, config).await?; + + // Create namespace, component, and endpoint + let namespace = drt.namespace("test-ns")?; + let component = namespace.component("test-component")?; + let service = component.service_builder().create().await?; + let endpoint = service.endpoint("test-endpoint"); + + // Create an engine and configure the endpoint with it + let engine: Arc, ManyOut>, anyhow::Error>> = + Arc::new(SimpleEchoEngine); + + // Wrap the engine in an Ingress to make it a PushWorkHandler + let ingress = Ingress::for_engine(engine)?; + + // Create the endpoint instance with the ingress as handler (setup phase) + let _endpoint_instance = endpoint + .endpoint_builder() + .handler(ingress) + .create() + .await?; + println!("✓ Created endpoint instance with local engine registered"); + + // Create a LocalClient using the endpoint's convenience method + let local_client: LocalClient, ManyOut>, anyhow::Error> = + endpoint.local_client().await?; + println!("✓ Created LocalClient successfully"); + + // Test the local client with context + let request = Context::new("Hello, LocalClient!".to_string()); + + let mut response_stream = local_client.generate(request).await?; + let response = response_stream.next().await.expect("Expected response"); + assert_eq!(response.data, Some("Echo: Hello, LocalClient!".to_string())); + println!("✓ LocalClient test passed: received '{:?}'", response.data); + + // Note: We don't need to start the endpoint for local client testing + // The engine is registered during create() and available for local access + + Ok(()) +} + +#[tokio::test] +async fn test_local_client_type_safety() -> Result<(), Box> { + // Create runtime and DRT + let 
runtime = Runtime::from_current()?; + + let config = DistributedConfig { + etcd_config: Default::default(), + nats_config: Default::default(), + is_static: true, + }; + + let drt = DistributedRuntime::new(runtime, config).await?; + + // Create namespace, component, and endpoint + let namespace = drt.namespace("test-ns2")?; + let component = namespace.component("test-component2")?; + let service = component.service_builder().create().await?; + let endpoint = service.endpoint("test-endpoint2"); + + // Create an endpoint instance with a String engine + let engine: Arc, ManyOut>, anyhow::Error>> = + Arc::new(SimpleEchoEngine); + let ingress = Ingress::for_engine(engine)?; + let _endpoint_instance = endpoint + .endpoint_builder() + .handler(ingress) + .create() + .await?; + println!("✓ Created endpoint with String engine"); + + // Try to create a LocalClient with different types (this should fail) + type TestLocalClient = LocalClient, ManyOut>, anyhow::Error>; + let result: Result = endpoint.local_client().await; + assert!(result.is_err(), "Expected type mismatch error"); + + if let Err(e) = result { + println!("✓ Got expected error for type mismatch: {}", e); + } + + Ok(()) +}