@ryanolson ryanolson commented Sep 16, 2025

Summary

Implements LocalClient for direct, in-process engine access without network overhead. Enables type-safe engine invocation for testing and local development scenarios where network latency is not desired.

Key Technical Changes

1. StreamData Trait for Non-Sync Response Types

  • Problem: ManyOut<T> streams don't satisfy Sync requirement for local registry storage
  • Solution: New StreamData trait requiring only Send + 'static (not Sync)
  • Implementation: StreamAnyEngineWrapper provides type erasure for heterogeneous storage
  • Benefit: Enables streaming responses in local client scenarios
// New trait for stream-compatible responses
pub trait StreamData: Send + 'static {}
impl<T: Send + 'static> StreamData for T {}

// Type-erased wrapper for storage
pub struct StreamAnyEngineWrapper<Req, Resp, E> {
    engine: Arc<dyn AsyncEngine<Req, Resp, E>>,
}
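
To make the relaxed bound concrete, here is a minimal std-only sketch. The `StreamData` trait and blanket impl mirror the snippet above; `BoxedStream`, `require_stream_data`, and `make_stream` are hypothetical names, and a boxed iterator stands in for `ManyOut<T>` purely for illustration.

```rust
use std::cell::Cell;

// Mirrors the trait from the PR description: Send + 'static, no Sync.
pub trait StreamData: Send + 'static {}
impl<T: Send + 'static> StreamData for T {}

// Hypothetical stand-in for a stream response: Send, but not Sync.
pub type BoxedStream = Box<dyn Iterator<Item = i32> + Send>;

// Compiles only for types that satisfy the StreamData bound.
pub fn require_stream_data<T: StreamData>(value: T) -> T {
    value
}

pub fn make_stream() -> BoxedStream {
    // Cell<i32> is Send but not Sync, so the resulting iterator is not Sync either.
    let counter = Cell::new(0);
    Box::new(std::iter::from_fn(move || {
        let n = counter.get();
        counter.set(n + 1);
        if n < 3 { Some(n) } else { None }
    }))
}

fn main() {
    // Accepted because only Send + 'static is required.
    let stream = require_stream_data(make_stream());
    let items: Vec<i32> = stream.collect();
    println!("{items:?}");
}
```

A bound of `Send + Sync + 'static` would reject `BoxedStream` here, which is the conflict the new trait is described as avoiding.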

2. LocalClient Implementation

  • Type-safe downcasting: Generic client with proper error handling for mismatched types
  • Direct invocation: Bypasses the network stack (etcd/NATS), so calls incur no network round-trip latency
  • Automatic registration: Engines registered during endpoint lifecycle management
  • Registry lookup: Efficient HashMap-based engine resolution using entity descriptors
// Usage example
let local_client: LocalClient<SingleIn<i32>, ManyOut<Annotated<i32>>, anyhow::Error> = 
    endpoint.local_client().await?;
let response = local_client.generate(request).await?;
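
The registry lookup and type-checked downcast described above can be sketched with the standard library alone. This is an illustration under stated assumptions, not the PR's implementation: `Engine`, `TypedEngine`, `Registry`, `Doubler`, and the string key are hypothetical stand-ins, and the sketch is synchronous where the real engines are async.

```rust
use std::any::Any;
use std::collections::HashMap;
use std::sync::Arc;

// Hypothetical stand-in for an engine: maps a request to a response.
pub trait Engine<Req, Resp>: Send + Sync {
    fn generate(&self, request: Req) -> Resp;
}

// Gives each (Req, Resp) pairing a distinct concrete type to downcast to.
pub struct TypedEngine<Req, Resp>(pub Arc<dyn Engine<Req, Resp>>);

// Hypothetical registry keyed by an endpoint path string.
#[derive(Default)]
pub struct Registry {
    engines: HashMap<String, Arc<dyn Any + Send + Sync>>,
}

impl Registry {
    pub fn register<Req: 'static, Resp: 'static>(
        &mut self,
        key: &str,
        engine: Arc<dyn Engine<Req, Resp>>,
    ) {
        // The typed holder is erased to `dyn Any` for heterogeneous storage.
        self.engines.insert(key.to_string(), Arc::new(TypedEngine(engine)));
    }

    pub fn client<Req: 'static, Resp: 'static>(
        &self,
        key: &str,
    ) -> Result<Arc<TypedEngine<Req, Resp>>, String> {
        let erased = self
            .engines
            .get(key)
            .ok_or_else(|| format!("no local engine for {key}"))?;
        // The downcast succeeds only if the requested types match the stored ones.
        Arc::clone(erased)
            .downcast::<TypedEngine<Req, Resp>>()
            .map_err(|_| format!("type mismatch for {key}"))
    }
}

pub struct Doubler;
impl Engine<i32, Vec<i32>> for Doubler {
    fn generate(&self, request: i32) -> Vec<i32> {
        vec![request, request * 2]
    }
}

fn main() {
    let mut registry = Registry::default();
    registry.register::<i32, Vec<i32>>("ns/component/endpoint", Arc::new(Doubler));
    let client = registry.client::<i32, Vec<i32>>("ns/component/endpoint").unwrap();
    println!("{:?}", client.0.generate(21));
    // Requesting different types yields an error instead of a panic.
    println!("{}", registry.client::<String, String>("ns/component/endpoint").is_err());
}
```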

3. Endpoint Lifecycle Management

  • Resource leak fix: Moved local engine registration from create() to start() to prevent leaks when EndpointInstance is dropped without starting
  • Background execution: Added start_background() methods using CriticalTaskExecutionHandle for proper cleanup and failure propagation
  • Graceful shutdown: Background endpoints register with graceful shutdown tracker and unregister on completion
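
The leak fix above comes down to tying registration to the started state. A minimal sketch of that idea follows, assuming a hypothetical `PendingEndpoint`/`RunningEndpoint` pair and a set of keys standing in for the local engine registry; none of these names are from the PR.

```rust
use std::collections::HashSet;
use std::sync::{Arc, Mutex};

// Hypothetical registry: only the set of registered keys is tracked here.
pub type Registry = Arc<Mutex<HashSet<String>>>;

// Created but not yet started: holds no registration.
pub struct PendingEndpoint {
    key: String,
    registry: Registry,
}

// Started: owns the registration and removes it when dropped.
pub struct RunningEndpoint {
    key: String,
    registry: Registry,
}

impl PendingEndpoint {
    pub fn create(key: &str, registry: Registry) -> Self {
        // No registration here, so dropping a pending endpoint leaks nothing.
        PendingEndpoint { key: key.to_string(), registry }
    }

    pub fn start(self) -> RunningEndpoint {
        self.registry.lock().unwrap().insert(self.key.clone());
        RunningEndpoint { key: self.key, registry: self.registry }
    }
}

impl Drop for RunningEndpoint {
    fn drop(&mut self) {
        // Unregister on completion; skip cleanup if the lock is poisoned.
        if let Ok(mut keys) = self.registry.lock() {
            keys.remove(&self.key);
        }
    }
}

pub fn is_registered(registry: &Registry, key: &str) -> bool {
    registry.lock().unwrap().contains(key)
}

fn main() {
    let registry: Registry = Arc::default();
    let pending = PendingEndpoint::create("ns/component/endpoint", registry.clone());
    println!("after create: {}", is_registered(&registry, "ns/component/endpoint"));
    let running = pending.start();
    println!("after start: {}", is_registered(&registry, "ns/component/endpoint"));
    drop(running);
    println!("after drop: {}", is_registered(&registry, "ns/component/endpoint"));
}
```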

4. Internal Namespace Handling

  • Validation fix: Added conditional logic to use NamespaceDescriptor::new_internal() for namespaces starting with "_"
  • Entity descriptors: Runtime v2 descriptors provide fluent builders with proper validation for namespaces, components, and endpoints
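
The conditional described above might look roughly like the following. This is a sketch only: `Namespace`, `NameError`, and `for_name` are hypothetical, the character rules are taken from the validation messages quoted later in the review, and the rule that `new_internal` requires the "_" prefix is an assumption.

```rust
#[derive(Debug, PartialEq)]
pub enum NameError {
    Empty,
    InvalidChars,
    ReservedInternalPrefix,
    MissingInternalPrefix, // assumed rule, not stated in the PR
}

#[derive(Debug, PartialEq)]
pub struct Namespace {
    pub name: String,
    pub internal: bool,
}

// Lowercase letters, digits, hyphens, and underscores only.
fn check_chars(name: &str) -> Result<(), NameError> {
    if name.is_empty() {
        return Err(NameError::Empty);
    }
    let ok = name
        .chars()
        .all(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || c == '-' || c == '_');
    if ok { Ok(()) } else { Err(NameError::InvalidChars) }
}

impl Namespace {
    // Public constructor: rejects the reserved "_" prefix.
    pub fn new(name: &str) -> Result<Self, NameError> {
        check_chars(name)?;
        if name.starts_with('_') {
            return Err(NameError::ReservedInternalPrefix);
        }
        Ok(Namespace { name: name.to_string(), internal: false })
    }

    // Internal constructor: accepts only "_"-prefixed names.
    pub fn new_internal(name: &str) -> Result<Self, NameError> {
        check_chars(name)?;
        if !name.starts_with('_') {
            return Err(NameError::MissingInternalPrefix);
        }
        Ok(Namespace { name: name.to_string(), internal: true })
    }
}

// The conditional: choose the constructor based on the prefix.
pub fn for_name(name: &str) -> Result<Namespace, NameError> {
    if name.starts_with('_') {
        Namespace::new_internal(name)
    } else {
        Namespace::new(name)
    }
}

fn main() {
    println!("{:?}", for_name("_system"));
    println!("{:?}", for_name("prod"));
    println!("{:?}", Namespace::new("_system"));
}
```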

5. Health Check Integration

  • Endpoint-specific notifications: Added health check timer reset notifications when streams complete
  • SystemHealth integration: Proper registration and cleanup of health check targets
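
As a rough picture of a completion notification, the sketch below wraps a stream and reports the endpoint name once the stream is exhausted. Everything here is an assumption for illustration: `NotifyOnComplete` and `wrap` are hypothetical, a synchronous iterator stands in for the async response stream, and a std channel stands in for whatever mechanism resets the health check timer.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

// Hypothetical wrapper: forwards items and, once the inner stream is exhausted,
// sends the endpoint name so a health checker could reset that endpoint's timer.
pub struct NotifyOnComplete<I> {
    inner: I,
    endpoint: String,
    notifier: Option<Sender<String>>,
}

impl<I: Iterator> Iterator for NotifyOnComplete<I> {
    type Item = I::Item;

    fn next(&mut self) -> Option<Self::Item> {
        let item = self.inner.next();
        if item.is_none() {
            // take() ensures the notification is sent at most once.
            if let Some(tx) = self.notifier.take() {
                let _ = tx.send(self.endpoint.clone());
            }
        }
        item
    }
}

pub fn wrap<I: Iterator>(inner: I, endpoint: &str) -> (NotifyOnComplete<I>, Receiver<String>) {
    let (tx, rx) = channel();
    let stream = NotifyOnComplete {
        inner,
        endpoint: endpoint.to_string(),
        notifier: Some(tx),
    };
    (stream, rx)
}

fn main() {
    let (stream, rx) = wrap(vec![1, 2, 3].into_iter(), "generate");
    let items: Vec<i32> = stream.collect();
    println!("{items:?} completed on {:?}", rx.try_recv());
}
```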

Architecture

┌─────────────────┐    ┌──────────────────┐    ┌─────────────────┐
│   LocalClient   │────│  Local Registry  │────│  Actual Engine  │
│  (Type-safe)    │    │   (HashMap)      │    │  (In-process)   │
└─────────────────┘    └──────────────────┘    └─────────────────┘
         │                        │                       │
         │                        │                       │
         ▼                        ▼                       ▼
   Direct method call      Fast lookup           Zero network overhead

Files Changed

Core Implementation

  • lib/runtime/src/engine.rs - StreamData trait and StreamAnyEngineWrapper
  • lib/runtime/src/component/local_client.rs - LocalClient with type-safe downcasting
  • lib/runtime/src/component/endpoint.rs - Lifecycle management and resource leak fixes
  • lib/runtime/src/distributed.rs - Local registry and background endpoint tracking

Integration Points

  • lib/runtime/src/pipeline/network/ingress/push_handler.rs - Engine registration and health check notifications
  • lib/runtime/src/pipeline/network.rs - PushWorkHandler trait extensions
  • lib/runtime/src/v2/entity.rs - Runtime v2 entity descriptors with validation

Examples and Tests

  • examples/local_client_demo.rs - Comprehensive demo with proper Annotated struct usage
  • lib/runtime/src/component/local_client/tests.rs - Type-safety and error handling tests

Testing Strategy

Automated Tests

  • Type safety: Verified LocalClient rejects mismatched engine types
  • Stream handling: Tested ManyOut<Annotated<T>> with proper field usage (data, id, event, comment)
  • Error propagation: Validated proper error handling throughout the stack
  • Registry integration: Confirmed engines are properly registered and accessible

Manual Verification

  • Clippy compliance: All warnings resolved (unused variables, complex types)
  • Resource cleanup: Verified no leaks when EndpointInstance is dropped
  • Background execution: Confirmed proper cleanup with CriticalTaskExecutionHandle

Migration Notes

For Existing Code

  • Existing endpoint usage remains unchanged
  • LocalClient is opt-in via endpoint.local_client().await?
  • Per-service toggle available to disable local registry if needed
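
The opt-out toggle can be pictured as a builder flag that defaults to enabled. The method names `with_local_registry` and `without_local_registry` and the default of `true` come from the commit notes and review summary further down; the `ServiceBuilder` and `ServiceConfig` shapes below are hypothetical and only illustrate the pattern.

```rust
// Hypothetical result of building a service.
pub struct ServiceConfig {
    pub name: String,
    pub enable_local_registry: bool,
}

// Hypothetical builder; the real builder has more fields.
pub struct ServiceBuilder {
    name: String,
    enable_local_registry: bool,
}

impl ServiceBuilder {
    pub fn new(name: &str) -> Self {
        // Local registry is enabled unless explicitly disabled.
        ServiceBuilder { name: name.to_string(), enable_local_registry: true }
    }

    pub fn with_local_registry(mut self) -> Self {
        self.enable_local_registry = true;
        self
    }

    pub fn without_local_registry(mut self) -> Self {
        self.enable_local_registry = false;
        self
    }

    pub fn build(self) -> ServiceConfig {
        ServiceConfig {
            name: self.name,
            enable_local_registry: self.enable_local_registry,
        }
    }
}

fn main() {
    let default_service = ServiceBuilder::new("echo").build();
    let remote_only = ServiceBuilder::new("echo").without_local_registry().build();
    println!("{}: {}", default_service.name, default_service.enable_local_registry);
    println!("{}: {}", remote_only.name, remote_only.enable_local_registry);
}
```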

Performance Considerations

  • Zero network latency: Direct in-process calls
  • Minimal overhead: HashMap lookup + type casting
  • Memory efficient: Shared Arc references, no data copying
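
The "no data copying" point rests on how `Arc` behaves: cloning an `Arc` copies a pointer and bumps a reference count, leaving the underlying value shared. A small sketch, with a `Vec<u8>` standing in for an engine and `share` as a hypothetical helper:

```rust
use std::sync::Arc;

// Hypothetical helper: hands out another handle to the same allocation.
pub fn share(engine: &Arc<Vec<u8>>) -> Arc<Vec<u8>> {
    Arc::clone(engine)
}

fn main() {
    let engine = Arc::new(vec![0u8; 1024]);
    let handle = share(&engine);
    // Both handles point at the same allocation; only the count changed.
    println!("same allocation: {}", Arc::ptr_eq(&engine, &handle));
    println!("strong count: {}", Arc::strong_count(&engine));
}
```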

Addresses Review Feedback

  1. Resource leaks: Fixed by moving registration from create() to start()
  2. Internal namespace handling: Added proper validation for "_" prefixed namespaces
  3. Documentation coverage: Comprehensive docs added (>80% coverage target)
  4. Background execution: Implemented using CriticalTaskExecutionHandle pattern
  5. Health check integration: Proper endpoint-specific timer reset notifications

Risk Assessment

  • Low risk: Additive feature, doesn't modify existing network paths
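  • Type safety: Request and response types are fixed by LocalClient's generic parameters; a mismatch with the registered engine is caught by the downcast and returned as an error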
  • Graceful degradation: Falls back to network calls if local engine unavailable
  • Resource management: RAII patterns ensure proper cleanup

- Remove unnecessary clippy allow annotation
- Use is_some_and which is available in Rust 1.70+ (we're on 1.89)
- Clean up code to follow clippy recommendations
- Add v2::entity module for type-safe entity descriptors
- Implement LocalClient for bypassing network overhead
- Add local engine registry to DistributedRuntime
- Auto-register engines during endpoint creation
- Add comprehensive tests and examples
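
For readers unfamiliar with the `is_some_and` cleanup mentioned in the notes above, this is the general shape of the change. The function names are hypothetical; the two forms are equivalent.

```rust
// Older spelling that clippy tends to flag.
pub fn has_positive_old(value: Option<i32>) -> bool {
    value.map_or(false, |v| v > 0)
}

// Equivalent form using Option::is_some_and (stable since Rust 1.70).
pub fn has_positive(value: Option<i32>) -> bool {
    value.is_some_and(|v| v > 0)
}

fn main() {
    for candidate in [Some(3), Some(-1), None] {
        println!("{candidate:?}: {}", has_positive(candidate));
    }
}
```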

This implementation allows direct local engine invocation without the
overhead of etcd watching, instance discovery, or network layers. Engines
are automatically registered when endpoints are created and can be
accessed via LocalClient using the v2::entity descriptor system for
consistent addressing.
- Fix namespace hierarchy extraction to walk parent chain properly
- Make registry methods internal (pub(crate)) for better encapsulation
- Add ServiceBuilder support with with_local_registry/without_local_registry
- Check enable_local_registry flag during endpoint creation (enabled by default)
- Add endpoint.local_client() convenience method for consistency
- Update tests to use new API without direct registry access
- Fix duplicate namespace hierarchy helper functions

This improves the LocalClient API to be more consistent with existing
patterns while properly handling hierarchical namespaces and providing
better encapsulation of internal implementation details.
…lient encapsulation

- Split EndpointConfigBuilder into create() and start() phases
- create() handles setup including local engine registration
- start() handles network serving
- Maintain backwards compatibility with existing start() -> create().start()
- Remove public register_local_engine function, now internal
- Fix namespace hierarchy extraction to properly walk parent chains
- Update tests to use new create/start pattern (tests need further API alignment)

The core implementation compiles successfully, though tests need additional work
to align with current pipeline API requirements.

Signed-off-by: Ryan Olson <[email protected]>
This change addresses trait bound conflicts in the local client system by introducing
a new StreamData trait that only requires Send (not Sync) for response types.

Key changes:
- Add StreamData trait for response types that don't need Sync
- Create StreamAnyEngineWrapper for type erasure with StreamData responses
- Update LocalClient to use StreamData constraint for response types
- Add downcast_stream() method for StreamData-compatible downcasting
- Fix clippy warnings about complex types and unused variables
- Rewrite local_client_demo.rs to use new endpoint builder pattern

This allows ManyOut<T> types (Pin<Box<dyn AsyncEngineStream<T>>>) to work
properly with the local engine registry, since streams are consumed
sequentially and don't need to be shared between threads.

Signed-off-by: Ryan Olson <[email protected]>
@ryanolson
Contributor Author

/ok to test f3d24a9

@ryanolson
Contributor Author

@coderabbitai full review


coderabbitai bot commented Sep 18, 2025

✅ Actions performed

Full review triggered.

@ryanolson ryanolson marked this pull request as ready for review September 18, 2025 07:46
@ryanolson ryanolson requested a review from a team as a code owner September 18, 2025 07:46

coderabbitai bot commented Sep 18, 2025

Walkthrough

Adds a per-endpoint LocalClient for in-process engine invocation, a local engine registry in DistributedRuntime, and endpoint lifecycle refactors to register/unregister local engines. Extends engine type-erasure to support streaming responses. Introduces a v2 descriptor system (namespace/component/endpoint/instance). Adds service-level toggles, example, and tests.

Changes

| Cohort / File(s) | Summary |
| --- | --- |
| Local client and registration flow: lib/runtime/src/component.rs, lib/runtime/src/component/local_client.rs, lib/runtime/src/component/endpoint.rs, lib/runtime/src/distributed.rs, lib/runtime/src/pipeline/network.rs, lib/runtime/src/pipeline/network/ingress/push_handler.rs | Adds LocalClient type and Endpoint::local_client(); registers engines in a new in-process registry; refactors endpoint startup via EndpointConfig/EndpointInstance with local-engine register/unregister; Ingress exposes typed and type-erased engines; PushWorkHandler can surface AnyAsyncEngine. |
| Engine type-erasure (stream support): lib/runtime/src/engine.rs, lib/runtime/src/traits.rs | Adds StreamData and AsStreamAnyAsyncEngine; extends AnyAsyncEngine downcast with downcast_stream; enables storing/retrieving engines with streaming responses; minor import additions. |
| Runtime scaffolding and v2 exposure: lib/runtime/src/lib.rs, lib/runtime/src/v2/mod.rs, lib/runtime/src/v2/entity/mod.rs, lib/runtime/src/v2/entity/descriptor.rs, lib/runtime/src/v2/entity/validation.rs | Exposes v2 module; implements descriptor system (Namespace/Component/Endpoint/Instance/Path) with builders, validation, and conversions; adds local_engines field to DistributedRuntime. |
| Service-level configuration: lib/runtime/src/component/service.rs | Introduces per-service enable_local_registry flag (default true) with builder methods; wires through service creation and registry maps. |
| Examples and tests: lib/runtime/examples/local_client_demo.rs, lib/runtime/tests/local_client.rs, lib/runtime/tests/local_client_simple.rs | Adds example demonstrating LocalClient with SimpleEchoEngine; adds tests covering registration, retrieval, invocation, and type-mismatch behavior. |

Sequence Diagram(s)

sequenceDiagram
  autonumber
  participant Dev as Developer
  participant RT as Runtime/DRT
  participant EP as EndpointConfig/Instance
  participant REG as Local Engine Registry
  participant ING as Ingress (PushWorkHandler)

  Note over Dev,EP: Endpoint creation with local engine
  Dev->>EP: build EndpointConfig::from_endpoint(...)
  Dev->>ING: Ingress::for_engine(engine.clone())
  EP->>REG: register_local_engine(key, engine as AnyAsyncEngine)
  EP->>RT: create service endpoint (lease, etcd registration)
  Note over EP,REG: On shutdown: unregister_local_engine(key)
sequenceDiagram
  autonumber
  participant Client as LocalClient<Req,Resp,E>
  participant EP as Endpoint
  participant REG as Local Engine Registry
  participant ENG as AsyncEngine<Req,Resp,E>

  Note over Client,EP: Local client acquisition and call
  Client->>EP: Endpoint::local_client()
  EP->>REG: get_local_engine(key)
  REG-->>EP: Arc<dyn AnyAsyncEngine>
  EP-->>Client: LocalClient (downcast_stream to concrete types)

  Client->>ENG: generate(request)
  ENG-->>Client: Resp (e.g., ResponseStream)

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~75 minutes

Possibly related PRs

Poem

A rabbit taps the local line, so near, so fleet, so fine.
No network winds to slow the hop—just keys that softly pop.
Descriptors map the meadow’s trails,
Streams flow by with tidy tales.
Echoes bounce—thump-thump!—in place;
Ship it fast, with whiskered grace. 🐇✨

Pre-merge checks

❌ Failed checks (2 warnings)
| Check name | Status | Explanation | Resolution |
| --- | --- | --- | --- |
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 75.16% which is insufficient. The required threshold is 80.00%. | You can run @coderabbitai generate docstrings to improve docstring coverage. |
| Description Check | ⚠️ Warning | The pull request description provides an extensive technical overview but does not adhere to the repository’s required template, as it uses custom headings like “## Summary” and “## Key Technical Changes” and omits mandatory sections such as “#### Overview”, “#### Where should the reviewer start?”, and “#### Related Issues:”. | Please restructure the PR description to follow the template exactly by adding the “#### Overview” section with a concise summary, the “#### Details” section listing specific changes, a “#### Where should the reviewer start?” section highlighting key files, and a “#### Related Issues:” section referencing relevant issue numbers. |
✅ Passed checks (1 passed)
| Check name | Status | Explanation |
| --- | --- | --- |
| Title Check | ✅ Passed | The title "feat: local client" succinctly describes the primary addition of the LocalClient feature and follows a clear, conventional style to highlight the main change in the pull request. |



@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 3

🧹 Nitpick comments (19)
lib/runtime/src/traits.rs (2)

19-21: Remove unused imports (will warn/fail under deny(unused_imports)).

AnyAsyncEngine and Arc aren’t referenced in this file.

Apply:

-use crate::engine::AnyAsyncEngine;
-use std::sync::Arc;

22-22: Fix doc typo.

“proivde” → “provide”.

-/// A trait for objects that proivde access to the [Runtime]
+/// A trait for objects that provide access to the [Runtime]
lib/runtime/src/lib.rs (1)

55-55: Consider feature‑gating v2 while it stabilizes.

If this surface is still in flux, hide behind #[cfg(feature = "v2")] to control exposure.

- pub mod v2;
+ #[cfg(feature = "v2")]
+ pub mod v2;
lib/runtime/src/pipeline/network/ingress/push_handler.rs (1)

141-159: LGTM, with a small caching improvement.

After successful conversion, cache the type‑erased engine to avoid repeated as_stream_any() work.

-        if let Some(service_engine) = Ingress::engine(self) {
+        if let Some(service_engine) = Ingress::engine(self) {
             // Try to convert using StreamData-compatible wrapper
-            if let Some(stream_engine) = service_engine.as_stream_any() {
-                return Some(stream_engine);
+            if let Some(stream_engine) = service_engine.as_stream_any() {
+                // Cache for future calls if API available
+                if let Some(setter) = Some(()) { /* placeholder for presence check */ }
+                {
+                    // If Ingress exposes set_any_engine, use it:
+                    // Ingress::set_any_engine(self, stream_engine.clone());
+                }
+                return Some(stream_engine);
             }
         }
lib/runtime/src/distributed.rs (2)

324-333: Guard against duplicate registrations or return the previous entry.

Today this silently overwrites. Either reject duplicates or return the replaced engine for callers to dispose.

-    pub(crate) async fn register_local_engine(
-        &self,
-        key: String,
-        engine: Arc<dyn crate::engine::AnyAsyncEngine>,
-    ) -> Result<()> {
-        let mut engines = self.local_engines.lock().await;
-        engines.insert(key, engine);
-        Ok(())
-    }
+    pub(crate) async fn register_local_engine(
+        &self,
+        key: String,
+        engine: Arc<dyn crate::engine::AnyAsyncEngine>,
+    ) -> Result<Option<Arc<dyn crate::engine::AnyAsyncEngine>>> {
+        let mut engines = self.local_engines.lock().await;
+        Ok(engines.insert(key, engine))
+    }

If you prefer rejection:

-        let mut engines = self.local_engines.lock().await;
-        engines.insert(key, engine);
-        Ok(())
+        let mut engines = self.local_engines.lock().await;
+        if engines.contains_key(&key) {
+            anyhow::bail!("local engine already registered for key: {key}");
+        }
+        engines.insert(key, engine);
+        Ok(())

344-351: Consider lightweight logging for observability.

Add trace logs on register/get/unregister with key to aid debugging.

-        let engines = self.local_engines.lock().await;
+        let engines = self.local_engines.lock().await;
+        tracing::trace!("get_local_engine: key={}", key);

(similar for register/unregister)

lib/runtime/src/v2/mod.rs (1)

18-21: Re‑export InstanceType and ToPath for ergonomics.

They’re exposed under entity::, but commonly needed alongside the other descriptors.

 pub use entity::{
-    ComponentDescriptor, DescriptorBuilder, DescriptorError, EndpointDescriptor,
-    InstanceDescriptor, NamespaceDescriptor, PathDescriptor,
+    ComponentDescriptor, DescriptorBuilder, DescriptorError, EndpointDescriptor,
+    InstanceDescriptor, InstanceType, NamespaceDescriptor, PathDescriptor, ToPath,
 };
lib/runtime/examples/local_client_demo.rs (2)

6-13: Drop unused import.

AsyncEngineContextProvider isn’t used.

-use dynamo_runtime::engine::{AsyncEngine, AsyncEngineContextProvider, async_trait};
+use dynamo_runtime::engine::{AsyncEngine, async_trait};

71-76: Minor: simplify type annotation.

Let type inference handle the trait object to reduce verbosity.

-    let engine: Arc<dyn AsyncEngine<SingleIn<String>, ManyOut<Annotated<String>>, anyhow::Error>> =
-        Arc::new(SimpleEchoEngine);
+    let engine = Arc::new(SimpleEchoEngine)
+        as Arc<dyn AsyncEngine<SingleIn<String>, ManyOut<Annotated<String>>, anyhow::Error>>;
lib/runtime/tests/local_client_simple.rs (1)

6-13: Drop unused import.

AsyncEngineContextProvider isn’t used.

-use dynamo_runtime::engine::{AsyncEngine, AsyncEngineContextProvider, async_trait};
+use dynamo_runtime::engine::{AsyncEngine, async_trait};
lib/runtime/src/pipeline/network.rs (1)

287-290: Consider documenting the purpose of these fields.

While the inline comment mentions "local registry", it would be helpful to add doc comments explaining when and how these fields are populated and used.

-    // Store the original engine for local registry
-    engine: OnceLock<ServiceEngine<Req, Resp>>,
-    // Store type-erased engine for local registry
-    any_engine: OnceLock<Arc<dyn crate::engine::AnyAsyncEngine>>,
+    /// The original typed engine, stored for local registry access.
+    /// Set when the Ingress is created via `for_engine`.
+    engine: OnceLock<ServiceEngine<Req, Resp>>,
+    /// Type-erased version of the engine for dynamic dispatch.
+    /// Can be set via `set_any_engine` when proper trait bounds are available.
+    any_engine: OnceLock<Arc<dyn crate::engine::AnyAsyncEngine>>,
lib/runtime/src/component/local_client.rs (1)

81-95: Remove duplicate get_namespace_hierarchy in local_client.rs

local_client.rs contains two identical implementations — an impl method at lines 82–95 (used via Self::get_namespace_hierarchy) and an unused module-level copy at lines 114–127; remove the module-level copy (or consolidate with the helper in lib/runtime/src/component/endpoint.rs:23).

lib/runtime/src/engine.rs (4)

210-224: Docs out of sync with trait bounds (Resp no longer requires Data).

The comment still says “Resp: Data + AsyncEngineContextProvider,” but the bound is only AsyncEngineContextProvider. Also a small grammar nit in “is does not need.”

Apply:

-/// - `Resp`: The response type that implements both `Data` and `AsyncEngineContextProvider`
+/// - `Resp`: The response type that implements `AsyncEngineContextProvider`.
+///   Use `AsAnyAsyncEngine` for `Resp: Data` and `AsStreamAnyAsyncEngine` for non-`Sync` responses.
-/// The synchronous Engine version is does not need to be awaited.
+/// The synchronous Engine version does not need to be awaited.

425-456: as_stream_any need not return Option.

It always returns Some(...). Consider simplifying the API to return Arc<dyn AnyAsyncEngine>.

-pub trait AsStreamAnyAsyncEngine {
+pub trait AsStreamAnyAsyncEngine {
     /// Converts a typed `AsyncEngine` with StreamData response into a type-erased `AnyAsyncEngine`.
     fn into_stream_any_engine(self) -> Arc<dyn AnyAsyncEngine>;

     /// Converts a typed `AsyncEngine` with StreamData response into a type-erased `AnyAsyncEngine`.
     /// This is the non-consuming version of into_stream_any_engine.
-    fn as_stream_any(&self) -> Option<Arc<dyn AnyAsyncEngine>>;
+    fn as_stream_any(&self) -> Arc<dyn AnyAsyncEngine>;
 }
@@
-    fn as_stream_any(&self) -> Option<Arc<dyn AnyAsyncEngine>> {
-        Some(Arc::new(StreamAnyEngineWrapper {
+    fn as_stream_any(&self) -> Arc<dyn AnyAsyncEngine> {
+        Arc::new(StreamAnyEngineWrapper {
             engine: self.clone(),
             _phantom: PhantomData,
-        }))
+        })
     }

487-496: Add tests for the new downcast_stream path.

Great addition. Please add success/failure tests analogous to the existing downcast test.

Minimal sketch:

#[tokio::test]
async fn test_stream_downcast() {
    // Define a Resp that is Send + 'static but not Sync (simulate), implement AsyncEngineContextProvider
    // Implement a mock AsyncEngine<Req, Resp, E>, erase with into_stream_any_engine(), then assert:
    // - any.downcast_stream::<Req, Resp, E>().is_some()
    // - any.downcast::<Req, Resp, E>().is_none()
}

Also applies to: 517-534


498-516: Deduplicate identical downcast bodies.

Both impls share identical TypeId checks and cloning. Consider factoring into a small internal helper to reduce duplication.

Also applies to: 517-534

lib/runtime/src/component/endpoint.rs (1)

88-95: Avoid panicking on poisoned stats handler mutex.

lock().unwrap() can panic. Consider handling the error or using a non-poisoning mutex (e.g., parking_lot).

-            handler_map
-                .lock()
-                .unwrap()
-                .insert(self.endpoint.subject_to(self.lease_id), stats_handler);
+            if let Ok(mut map) = handler_map.lock() {
+                map.insert(self.endpoint.subject_to(self.lease_id), stats_handler);
+            } else {
+                tracing::warn!("Stats handler registry mutex poisoned; skipping insert");
+            }
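
For context on what poisoning means, the std-only sketch below poisons a mutex by panicking while the guard is held, then shows the skip-on-poison handling from the suggestion above next to the alternative of recovering the inner data. The map type and the helper names `poison` and `insert_tolerant` are hypothetical; the panic message printed by the helper thread is expected.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread;

pub type HandlerMap = Mutex<HashMap<String, u32>>;

// Poisons the mutex: the helper thread panics while holding the guard.
pub fn poison(map: &Arc<HandlerMap>) {
    let cloned = Arc::clone(map);
    let _ = thread::spawn(move || {
        let _guard = cloned.lock().unwrap();
        panic!("simulated handler failure");
    })
    .join();
}

// Mirrors the suggestion: skip the insert instead of panicking on poison.
pub fn insert_tolerant(map: &HandlerMap, key: &str, value: u32) -> bool {
    match map.lock() {
        Ok(mut guard) => {
            guard.insert(key.to_string(), value);
            true
        }
        Err(_) => false,
    }
}

fn main() {
    let map: Arc<HandlerMap> = Arc::default();
    println!("inserted before poison: {}", insert_tolerant(&map, "a", 1));
    poison(&map);
    println!("inserted after poison: {}", insert_tolerant(&map, "b", 2));
    // Alternative: recover the guard from the poison error and keep going.
    let guard = map.lock().unwrap_or_else(|e| e.into_inner());
    println!("entries still readable: {}", guard.len());
}
```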
lib/runtime/src/v2/entity/validation.rs (1)

24-34: Validation errors lack offending value context.

Attaching the invalid input to ValidationError params improves downstream error mapping.

-    if !IDENTIFIER_REGEX.is_match(name) {
-        return Err(ValidationError::new("invalid_identifier_chars"));
+    if !IDENTIFIER_REGEX.is_match(name) {
+        let mut err = ValidationError::new("invalid_identifier_chars");
+        err.add_param(std::borrow::Cow::Borrowed("value"), &name.to_string());
+        return Err(err);
     }

Apply similarly for other validators so higher layers can create precise errors.

lib/runtime/src/v2/entity/descriptor.rs (1)

73-88: Map validator codes to typed DescriptorError variants.

You already define rich variants (InvalidComponentName, etc.), but the From<ValidationError> currently flattens to a generic ValidationError message. Prefer emitting the typed variants to enable precise matching upstream.

 impl From<validator::ValidationError> for DescriptorError {
     fn from(err: validator::ValidationError) -> Self {
-        let message = match err.code.as_ref() {
-            "empty_identifier" => "Identifier cannot be empty".to_string(),
-            "invalid_identifier_chars" => "Identifier contains invalid characters. Only lowercase letters, numbers, hyphens, and underscores are allowed".to_string(),
-            "empty_path_segment" => "Path segment cannot be empty".to_string(),
-            "invalid_path_segment_chars" => "Path segment contains invalid characters. Only lowercase letters, numbers, hyphens, underscores, and dots are allowed".to_string(),
-            "reserved_internal_prefix" => "Names starting with '_' are reserved for internal use".to_string(),
-            "empty_collection" => "Collection cannot be empty".to_string(),
-            "component_name_too_long" => "Component name too long (max 63 characters)".to_string(),
-            "endpoint_name_too_long" => "Endpoint name too long (max 63 characters)".to_string(),
-            _ => format!("Validation error: {}", err.code),
-        };
-        DescriptorError::ValidationError { message }
+        match err.code.as_ref() {
+            "invalid_namespace_segment" | "reserved_internal_prefix" | "empty_identifier" | "invalid_identifier_chars" => {
+                DescriptorError::InvalidNamespaceSegment { segment: "<unknown>".into() }
+            }
+            "invalid_component_name" | "component_name_too_long" => {
+                DescriptorError::InvalidComponentName { name: "<unknown>".into() }
+            }
+            "invalid_endpoint_name" | "endpoint_name_too_long" => {
+                DescriptorError::InvalidEndpointName { name: "<unknown>".into() }
+            }
+            "invalid_path_segment" | "invalid_path_segment_chars" | "empty_path_segment" => {
+                DescriptorError::InvalidPathSegment { segment: "<unknown>".into() }
+            }
+            _ => DescriptorError::ValidationError { message: err.code.to_string() },
+        }
     }
 }

Note: If you adopt the suggestion in validation.rs to attach the offending value in err.params["value"], you can populate the real strings here instead of "<unknown>".

📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between aa80ac4 and f3d24a9.

📒 Files selected for processing (17)
  • lib/runtime/examples/local_client_demo.rs (1 hunks)
  • lib/runtime/src/component.rs (4 hunks)
  • lib/runtime/src/component/endpoint.rs (6 hunks)
  • lib/runtime/src/component/local_client.rs (1 hunks)
  • lib/runtime/src/component/service.rs (2 hunks)
  • lib/runtime/src/distributed.rs (2 hunks)
  • lib/runtime/src/engine.rs (6 hunks)
  • lib/runtime/src/lib.rs (3 hunks)
  • lib/runtime/src/pipeline/network.rs (4 hunks)
  • lib/runtime/src/pipeline/network/ingress/push_handler.rs (1 hunks)
  • lib/runtime/src/traits.rs (1 hunks)
  • lib/runtime/src/v2/entity/descriptor.rs (1 hunks)
  • lib/runtime/src/v2/entity/mod.rs (1 hunks)
  • lib/runtime/src/v2/entity/validation.rs (1 hunks)
  • lib/runtime/src/v2/mod.rs (1 hunks)
  • lib/runtime/tests/local_client.rs (1 hunks)
  • lib/runtime/tests/local_client_simple.rs (1 hunks)
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2025-07-14T21:25:56.930Z
Learnt from: ryanolson
PR: ai-dynamo/dynamo#1919
File: lib/runtime/src/engine.rs:168-168
Timestamp: 2025-07-14T21:25:56.930Z
Learning: The AsyncEngineContextProvider trait in lib/runtime/src/engine.rs was intentionally changed from `Send + Sync + Debug` to `Send + Debug` because the Sync bound was overly constraining. The trait should only require Send + Debug as designed.

Applied to files:

  • lib/runtime/src/traits.rs
  • lib/runtime/src/lib.rs
  • lib/runtime/src/engine.rs
📚 Learning: 2025-09-11T03:24:47.820Z
Learnt from: kthui
PR: ai-dynamo/dynamo#3004
File: lib/runtime/src/pipeline/network/ingress/push_handler.rs:271-277
Timestamp: 2025-09-11T03:24:47.820Z
Learning: In lib/runtime/src/pipeline/network/ingress/push_handler.rs, the maintainer prefers to keep the existing error comparison logic using format!("{:?}", err) == STREAM_ERR_MSG unchanged until proper error types are implemented, even though it has technical debt. Avoid suggesting changes to working legacy code that will be refactored later.

Applied to files:

  • lib/runtime/src/pipeline/network/ingress/push_handler.rs
🔇 Additional comments (33)
lib/runtime/src/v2/entity/mod.rs (1)

1-16: Solid public re‑exports.

Surface looks coherent and self‑contained. No issues.

lib/runtime/tests/local_client_simple.rs (1)

121-129: Nice negative test.

Good assertion for type mismatch behavior via runtime downcast.

lib/runtime/src/distributed.rs (1)

98-99: Approve — local engine registry verified; unregister called on endpoint teardown.
Arc<Mutex<...>> ownership and pub(crate) visibility are appropriate. register_local_engine / get_local_engine / unregister_local_engine are defined in lib/runtime/src/distributed.rs (lines 325, 336, 345). Endpoint registers at lib/runtime/src/component/endpoint.rs:340 and unregisters on teardown at lib/runtime/src/component/endpoint.rs:194; local_client retrieves via get_local_engine at lib/runtime/src/component/local_client.rs:45.

lib/runtime/src/lib.rs (1)

280-283: No change required — AnyAsyncEngine is explicitly Send + Sync.
AnyAsyncEngine is declared pub trait AnyAsyncEngine: Send + Sync (lib/runtime/src/engine.rs:303); usage with Arc<dyn AnyAsyncEngine> in lib/runtime/src/lib.rs:282 and the pipeline accessors (lib/runtime/src/pipeline/network.rs:360–366) is consistent — no refactor needed.

lib/runtime/src/component.rs (5)

67-67: LGTM!

The addition of the local_client module follows the existing module organization pattern.


73-73: LGTM!

The public re-export of LocalClient provides appropriate API access for users.


92-94: LGTM!

The service-level registry control map service_enable_local_registry follows the existing pattern for service configuration tracking.


440-450: LGTM!

The local_client method provides a clean API for creating per-endpoint local clients with appropriate trait bounds for request/response types and error handling.


148-150: Incorrect file reference — validate or document enable_local_registry precedence

enable_local_registry is defined in lib/runtime/src/component/service.rs (builder default = true; with_local_registry/without_local_registry) and read per-service in lib/runtime/src/component/endpoint.rs (reads registry.service_enable_local_registry and unwrap_or(true)). The original comment referenced lib/runtime/src/component.rs — update that reference and either add validation at service creation to detect/resolve conflicting settings or explicitly document that the service-level setting controls endpoint behavior. Files: lib/runtime/src/component/service.rs (builder/create) and lib/runtime/src/component/endpoint.rs (per-service read).

Likely an incorrect or invalid review comment.

lib/runtime/src/component/service.rs (4)

41-44: LGTM!

The service-level enable_local_registry flag with a sensible default of true provides fine-grained control over local engine registry behavior.


47-57: LGTM!

The builder methods with_local_registry and without_local_registry provide a clear and intuitive API for controlling the local registry behavior.


61-61: LGTM!

Properly extracting enable_local_registry from the dissolved configuration.


113-118: LGTM!

The service_name.clone() usage correctly avoids move issues, and the registry insertion properly stores the flag for later reference.
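
The flow these service.rs comments describe (a builder flag defaulting to true, stored per service name, and read back with a fallback of true) might look roughly like this. It is a sketch under stated assumptions: `ServiceBuilder`, `create`, and `local_registry_enabled` are hypothetical names, and only the `with_local_registry` / `without_local_registry` method names and the `unwrap_or(true)` fallback come from the review text.

```rust
use std::collections::HashMap;

/// Hypothetical builder; only the local-registry flag is modeled here.
#[derive(Debug, Clone)]
pub struct ServiceBuilder {
    pub name: String,
    pub enable_local_registry: bool,
}

impl ServiceBuilder {
    pub fn new(name: &str) -> Self {
        // Default of `true`, as described in the comments above.
        Self { name: name.to_string(), enable_local_registry: true }
    }

    pub fn with_local_registry(mut self) -> Self {
        self.enable_local_registry = true;
        self
    }

    pub fn without_local_registry(mut self) -> Self {
        self.enable_local_registry = false;
        self
    }

    /// Record the flag under the service name. The name is cloned so the
    /// builder's own copy stays usable afterwards.
    pub fn create(&self, flags: &mut HashMap<String, bool>) {
        flags.insert(self.name.clone(), self.enable_local_registry);
    }
}

/// Endpoint-side lookup: a missing entry falls back to enabled.
pub fn local_registry_enabled(flags: &HashMap<String, bool>, service: &str) -> bool {
    flags.get(service).copied().unwrap_or(true)
}
```

With this shape the service-level setting is the single source of truth, and an endpoint whose service never recorded a flag behaves as if the registry were enabled.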

lib/runtime/src/pipeline/network.rs (4)

298-299: LGTM!

Properly initializing the new OnceLock fields in the constructor.


336-344: LGTM!

Correctly cloning the engine before moving it into the backend, then storing the original for local registry access.


354-367: LGTM!

The accessor methods for the engine fields are well-structured and follow Rust conventions for optional values.


381-384: LGTM!

The trait method with a sensible default implementation allows optional exposure of type-erased engines without breaking existing implementations.
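
To illustrate why a default method keeps existing implementors compiling, here is a small sketch. The trait and struct names (`WorkHandler`, `LegacyHandler`, `LocalAwareHandler`) are hypothetical; only the method name `as_any_engine` and the idea of an optional type-erased engine are taken from this PR.

```rust
use std::any::Any;
use std::sync::Arc;

/// Hypothetical handler trait with an opt-in accessor.
pub trait WorkHandler {
    fn name(&self) -> &str;

    /// Default: no engine is exposed, so older implementors need no changes.
    fn as_any_engine(&self) -> Option<Arc<dyn Any + Send + Sync>> {
        None
    }
}

/// An implementor written before the accessor existed.
pub struct LegacyHandler;

impl WorkHandler for LegacyHandler {
    fn name(&self) -> &str {
        "legacy"
    }
}

/// An implementor that opts in by overriding the default.
pub struct LocalAwareHandler {
    pub engine: Arc<dyn Any + Send + Sync>,
}

impl WorkHandler for LocalAwareHandler {
    fn name(&self) -> &str {
        "local-aware"
    }

    fn as_any_engine(&self) -> Option<Arc<dyn Any + Send + Sync>> {
        Some(self.engine.clone())
    }
}
```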

lib/runtime/tests/local_client.rs (4)

16-36: LGTM!

The EchoEngine implementation is clean and correctly implements the AsyncEngine trait for testing purposes.


38-87: Test properly validates LocalClient registration and retrieval.

The test comprehensively validates:

  1. Engine registration through the Ingress wrapper
  2. LocalClient creation via the endpoint's convenience method
  3. Correct data flow through the local client

Good coverage of the happy path.


89-138: LGTM!

This test validates the LocalClient functionality when the engine is wrapped in an Ingress, ensuring compatibility with the pipeline architecture.


140-180: Excellent type-safety validation.

This test properly validates that LocalClient enforces type safety by:

  1. Registering an engine with one type signature
  2. Attempting to create a client with a mismatched type signature
  3. Correctly expecting and handling the type mismatch error

This is crucial for preventing runtime type errors in production.
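
The mismatch behavior this test relies on can be sketched with `Arc::downcast` from the standard library. The `Engine` struct and `typed_client` function below are hypothetical stand-ins, not the LocalClient API; the point is only that asking for a different type signature yields an error instead of a wrongly typed handle.

```rust
use std::any::Any;
use std::sync::Arc;

/// Hypothetical typed engine: a plain function from request to response.
pub struct Engine<Req, Resp> {
    pub handler: fn(Req) -> Resp,
}

/// Recover a typed engine from a type-erased handle, or report a mismatch.
pub fn typed_client<Req: 'static, Resp: 'static>(
    erased: Arc<dyn Any + Send + Sync>,
) -> Result<Arc<Engine<Req, Resp>>, String> {
    erased.downcast::<Engine<Req, Resp>>().map_err(|_| {
        format!(
            "type mismatch: expected {}",
            std::any::type_name::<Engine<Req, Resp>>()
        )
    })
}
```

A caller that registered `Engine<i32, i32>` and asks for `Engine<String, i32>` gets the `Err` branch, which is the case the negative test asserts on.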

lib/runtime/src/component/local_client.rs (3)

35-55: LGTM!

The from_descriptor method properly handles engine retrieval and type downcasting with clear error messages.


57-79: LGTM!

The from_endpoint method correctly builds the descriptor hierarchy and delegates to from_descriptor.


97-110: LGTM!

The generate method and accessor methods provide a clean interface for direct engine invocation.

lib/runtime/src/engine.rs (3)

180-182: AsyncEngineContextProvider correctly omits Sync (per prior decision).

Matches the "Send + Debug only" requirement from your previous change. No action needed.


359-393: Stream erasure wrapper looks sound.

Type IDs and Any exposure mirror the non‑stream path; bounds correctly use StreamData + AsyncEngineContextProvider. LGTM.


585-587: Test update to call the extension trait is correct.

Using AsAnyAsyncEngine::into_any_engine(typed_engine) aligns with the new ergonomic path.

lib/runtime/src/component/endpoint.rs (1)

22-36: Namespace traversal helper looks good.

Reverse order to root‑to‑leaf is correct, no needless allocations. LGTM.
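
A rough picture of such a traversal, assuming each namespace holds an optional parent: walk leaf to root, then reverse once. The `Namespace` struct and `namespace_hierarchy` function are hypothetical and do not reproduce the helper in endpoint.rs.

```rust
/// Hypothetical namespace node with an optional parent.
pub struct Namespace {
    pub name: String,
    pub parent: Option<Box<Namespace>>,
}

/// Collect segment names from the given leaf up to the root, then
/// reverse so the result reads root-to-leaf.
pub fn namespace_hierarchy(leaf: &Namespace) -> Vec<String> {
    let mut segments = Vec::new();
    let mut current = Some(leaf);
    while let Some(ns) = current {
        segments.push(ns.name.clone());
        current = ns.parent.as_deref();
    }
    segments.reverse();
    segments
}
```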

lib/runtime/src/v2/entity/validation.rs (2)

14-21: Regexes and allowed charsets look appropriate.

Anchors and char classes are correct for the documented constraints.


106-186: Tests cover main cases; length limits checked.

Good breadth across validators and reserved-prefix behavior.

lib/runtime/src/v2/entity/descriptor.rs (3)

212-239: DescriptorSpec schema validation reads well.

Namespace handling with internal toggle is clear, and per-field validation is delegated correctly.


416-448: Path formatting includes instance-{:x}; double-check consumers.

Hex encoding is fine; just ensure watchers/parsers expect the instance- prefix and hex format.
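
What a consumer would have to do to round-trip that segment is sketched below. The `u64` identifier type and both function names are assumptions for illustration; the PR only establishes the `instance-` prefix and the lowercase hex format.

```rust
/// Format an instance id as a path segment (assumes a u64 id).
pub fn instance_segment(instance_id: u64) -> String {
    format!("instance-{:x}", instance_id)
}

/// Parse the segment back; returns None if the prefix is missing or
/// the remainder is not valid hexadecimal.
pub fn parse_instance_segment(segment: &str) -> Option<u64> {
    let hex = segment.strip_prefix("instance-")?;
    u64::from_str_radix(hex, 16).ok()
}
```

A parser that expects a decimal id or no prefix would silently fail on these keys, which is the risk the comment above flags.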


943-969: Wrapper transition tests are solid.

They exercise ToPath and etcd key generation paths. Good coverage.

Comment on lines 213 to 223
         // Register in etcd (only if not static)
         if let Some(etcd_client) = &self.etcd_client
             && let Err(e) = etcd_client
-                .kv_create(&etcd_path, info, Some(lease_id))
+                .kv_create(&etcd_path, info, Some(self.lease_id))
                 .await
         {
             tracing::error!("Failed to register discoverable service: {:?}", e);
             cancel_token.cancel();
             return Err(error!("Failed to register discoverable service"));
         }
         task.await??;

⚠️ Potential issue

Bug: passing lease_id=0 to etcd when no lease.

If self.lease is None, we still pass Some(self.lease_id) (which is 0). That likely produces an invalid etcd lease association and can fail registration.

Apply:

-        if let Some(etcd_client) = &self.etcd_client
-            && let Err(e) = etcd_client
-                .kv_create(&etcd_path, info, Some(self.lease_id))
-                .await
-        {
+        let lease_opt = if self.lease.is_some() { Some(self.lease_id) } else { None };
+        if let Some(etcd_client) = &self.etcd_client {
+            if let Err(e) = etcd_client.kv_create(&etcd_path, info, lease_opt).await {
                 tracing::error!("Failed to register discoverable service: {:?}", e);
                 cancel_token.cancel();
                 return Err(error!("Failed to register discoverable service"));
-        }
+            }
+        }
📝 Committable suggestion

Suggested change
-        // Register in etcd (only if not static)
-        if let Some(etcd_client) = &self.etcd_client
-            && let Err(e) = etcd_client
-                .kv_create(&etcd_path, info, Some(self.lease_id))
-                .await
-        {
-            tracing::error!("Failed to register discoverable service: {:?}", e);
-            cancel_token.cancel();
-            return Err(error!("Failed to register discoverable service"));
-        }
-        task.await??;
+        // Register in etcd (only if not static)
+        let lease_opt = if self.lease.is_some() { Some(self.lease_id) } else { None };
+        if let Some(etcd_client) = &self.etcd_client {
+            if let Err(e) = etcd_client
+                .kv_create(&etcd_path, info, lease_opt)
+                .await
+            {
+                tracing::error!("Failed to register discoverable service: {:?}", e);
+                cancel_token.cancel();
+                return Err(error!("Failed to register discoverable service"));
+            }
+        }
🤖 Prompt for AI Agents
In lib/runtime/src/component/endpoint.rs around lines 213 to 223, the code
always passes Some(self.lease_id) (which is 0 when no lease) to etcd and can
create an invalid lease association; change the call to only pass a
Some(lease_id) when a real lease exists (e.g., derive the lease argument from
self.lease or self.lease_id wrapped in an Option) and pass None when there is no
lease so etcd receives no lease parameter; keep the error handling (logging,
cancel_token.cancel(), return Err) unchanged.
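
The difference between the two behaviors can be shown in isolation. This is a sketch only: `Lease` and `EndpointState` are hypothetical types, and it assumes, as the comment above does, that `lease_id` is 0 when no lease is held.

```rust
/// Hypothetical lease handle.
pub struct Lease {
    pub id: i64,
}

/// Hypothetical endpoint state: `lease_id` is 0 when `lease` is None.
pub struct EndpointState {
    pub lease: Option<Lease>,
    pub lease_id: i64,
}

impl EndpointState {
    /// Flagged behavior: always yields Some, even without a lease.
    pub fn lease_arg_unconditional(&self) -> Option<i64> {
        Some(self.lease_id)
    }

    /// Suggested behavior: derive the argument from the lease itself,
    /// so no lease means no lease parameter.
    pub fn lease_arg(&self) -> Option<i64> {
        self.lease.as_ref().map(|lease| lease.id)
    }
}
```

Deriving the value from `lease` directly also removes the need to keep `lease_id` and `lease` in sync.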

Comment on lines 312 to 346
        let local_engine_key = if enable_local_registry
            && let Some(any_engine) = handler.as_any_engine()
        {
            use crate::v2::entity::{ComponentDescriptor, EndpointDescriptor, NamespaceDescriptor};

            // Extract the full namespace hierarchy
            let namespace_segments = get_namespace_hierarchy(&endpoint.component.namespace);

            // Create the descriptor for this endpoint
            let namespace_desc = NamespaceDescriptor::new(
                &namespace_segments
                    .iter()
                    .map(|s| s.as_str())
                    .collect::<Vec<_>>(),
            )
            .map_err(|e| anyhow::anyhow!("Invalid namespace: {}", e))?;
            let component_desc = namespace_desc
                .component(&endpoint.component.name)
                .map_err(|e| anyhow::anyhow!("Invalid component: {}", e))?;
            let endpoint_desc = component_desc
                .endpoint(&endpoint.name)
                .map_err(|e| anyhow::anyhow!("Invalid endpoint: {}", e))?;

            // Register using the path string as key
            let key = endpoint_desc.to_string();
            tracing::debug!("Registering local engine for endpoint: {}", key);
            endpoint
                .drt()
                .register_local_engine(key.clone(), any_engine)
                .await?;
            Some(key)
        } else {
            None
        };

⚠️ Potential issue

Local-engine registration runs in create(); leaks if start() is never called.

If users call create() but drop the EndpointInstance without start(), the engine stays registered. Either move registration into start() or add a Drop hook to unregister best‑effort when not started.

Option A (preferred): move the registration block into start() right before spawning the endpoint and set local_engine_key there.

Option B: add a Drop guard for fallback cleanup:

impl Drop for EndpointInstance {
    fn drop(&mut self) {
        if let Some(key) = self.local_engine_key.take() {
            let drt = self.endpoint.drt().clone();
            // Best-effort async cleanup; ignore result.
            drt.runtime().secondary().spawn(async move {
                drt.unregister_local_engine(&key).await;
            });
        }
    }
}
🤖 Prompt for AI Agents
lib/runtime/src/component/endpoint.rs around lines 312-346: the local engine is
being registered in create(), which leaks if EndpointInstance is dropped before
start(); remove the registration from create() and instead perform it in start()
immediately before spawning the endpoint and set local_engine_key there (ensure
any errors are propagated the same way), or alternatively implement a Drop for
EndpointInstance that takes local_engine_key if Some, clones endpoint.drt(), and
best-effort spawns an async task on drt.runtime().secondary() to call
unregister_local_engine(&key).await so the registration is cleaned up when the
instance is dropped without start().
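
The cleanup-on-drop idea behind Option B can be demonstrated without an async runtime. The sketch below is synchronous and uses only the standard library; `Registry` and `RegistrationGuard` are hypothetical names, and unlike the suggestion above it removes the entry inline rather than spawning a task.

```rust
use std::collections::HashSet;
use std::sync::{Arc, Mutex};

/// Hypothetical shared registry of endpoint keys.
pub type Registry = Arc<Mutex<HashSet<String>>>;

/// Holds a registration and removes it when dropped.
pub struct RegistrationGuard {
    registry: Registry,
    key: Option<String>,
}

impl RegistrationGuard {
    pub fn register(registry: Registry, key: &str) -> Self {
        registry.lock().unwrap().insert(key.to_string());
        Self { registry, key: Some(key.to_string()) }
    }
}

impl Drop for RegistrationGuard {
    fn drop(&mut self) {
        // Best-effort cleanup: skip silently if the lock is poisoned.
        if let Some(key) = self.key.take() {
            if let Ok(mut keys) = self.registry.lock() {
                keys.remove(&key);
            }
        }
    }
}
```

Option A avoids the need for a guard altogether, since nothing is registered until `start()` runs.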

Comment on lines 321 to 337
            let namespace_desc = NamespaceDescriptor::new(
                &namespace_segments
                    .iter()
                    .map(|s| s.as_str())
                    .collect::<Vec<_>>(),
            )
            .map_err(|e| anyhow::anyhow!("Invalid namespace: {}", e))?;
            let component_desc = namespace_desc
                .component(&endpoint.component.name)
                .map_err(|e| anyhow::anyhow!("Invalid component: {}", e))?;
            let endpoint_desc = component_desc
                .endpoint(&endpoint.name)
                .map_err(|e| anyhow::anyhow!("Invalid endpoint: {}", e))?;

            // Register using the path string as key
            let key = endpoint_desc.to_string();
            tracing::debug!("Registering local engine for endpoint: {}", key);

⚠️ Potential issue

Internal namespaces not handled (descriptor creation will fail for “_internal”).

NamespaceDescriptor::new(..) rejects segments starting with “_”. Use new_internal(..) when the first segment is internal.

Apply:

-            let namespace_desc = NamespaceDescriptor::new(
-                &namespace_segments
-                    .iter()
-                    .map(|s| s.as_str())
-                    .collect::<Vec<_>>(),
-            )
+            let ns_slice: Vec<&str> = namespace_segments.iter().map(|s| s.as_str()).collect();
+            let namespace_desc = if namespace_segments.first().is_some_and(|s| s.starts_with('_')) {
+                NamespaceDescriptor::new_internal(&ns_slice)
+            } else {
+                NamespaceDescriptor::new(&ns_slice)
+            }
             .map_err(|e| anyhow::anyhow!("Invalid namespace: {}", e))?;
📝 Committable suggestion

Suggested change
-            let namespace_desc = NamespaceDescriptor::new(
-                &namespace_segments
-                    .iter()
-                    .map(|s| s.as_str())
-                    .collect::<Vec<_>>(),
-            )
-            .map_err(|e| anyhow::anyhow!("Invalid namespace: {}", e))?;
-            let component_desc = namespace_desc
-                .component(&endpoint.component.name)
-                .map_err(|e| anyhow::anyhow!("Invalid component: {}", e))?;
-            let endpoint_desc = component_desc
-                .endpoint(&endpoint.name)
-                .map_err(|e| anyhow::anyhow!("Invalid endpoint: {}", e))?;
-            // Register using the path string as key
-            let key = endpoint_desc.to_string();
-            tracing::debug!("Registering local engine for endpoint: {}", key);
+            let ns_slice: Vec<&str> = namespace_segments.iter().map(|s| s.as_str()).collect();
+            let namespace_desc = if namespace_segments.first().is_some_and(|s| s.starts_with('_')) {
+                NamespaceDescriptor::new_internal(&ns_slice)
+            } else {
+                NamespaceDescriptor::new(&ns_slice)
+            }
+            .map_err(|e| anyhow::anyhow!("Invalid namespace: {}", e))?;
+            let component_desc = namespace_desc
+                .component(&endpoint.component.name)
+                .map_err(|e| anyhow::anyhow!("Invalid component: {}", e))?;
+            let endpoint_desc = component_desc
+                .endpoint(&endpoint.name)
+                .map_err(|e| anyhow::anyhow!("Invalid endpoint: {}", e))?;
+            // Register using the path string as key
+            let key = endpoint_desc.to_string();
+            tracing::debug!("Registering local engine for endpoint: {}", key);
🤖 Prompt for AI Agents
In lib/runtime/src/component/endpoint.rs around lines 321 to 337,
NamespaceDescriptor::new(...) will fail for internal namespaces whose first
segment starts with '_' — detect whether namespace_segments.first() starts with
'_' and call NamespaceDescriptor::new_internal(...) in that case, otherwise call
NamespaceDescriptor::new(...); pass the same Vec<&str> constructed from
namespace_segments, preserve the existing map_err(...) wrappers for both
branches, and continue to derive component_desc and endpoint_desc as before.
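
The constructor selection described above can be modeled in isolation as follows. Everything here is a hypothetical stand-in: `NamespacePath`, `new_public`, `new_internal`, and `for_segments` are illustrative and do not reproduce the validation rules of `NamespaceDescriptor`; the sketch only assumes that the public constructor rejects segments starting with `_`.

```rust
/// Hypothetical validated namespace path.
#[derive(Debug, PartialEq)]
pub struct NamespacePath {
    pub segments: Vec<String>,
    pub internal: bool,
}

/// Public constructor: rejects empty paths and reserved `_` prefixes.
pub fn new_public(segments: &[&str]) -> Result<NamespacePath, String> {
    if segments.is_empty() {
        return Err("namespace must have at least one segment".to_string());
    }
    if let Some(bad) = segments.iter().find(|s| s.starts_with('_')) {
        return Err(format!("segment '{bad}' uses the reserved '_' prefix"));
    }
    Ok(NamespacePath {
        segments: segments.iter().map(|s| s.to_string()).collect(),
        internal: false,
    })
}

/// Internal constructor: allows the reserved prefix.
pub fn new_internal(segments: &[&str]) -> Result<NamespacePath, String> {
    if segments.is_empty() {
        return Err("namespace must have at least one segment".to_string());
    }
    Ok(NamespacePath {
        segments: segments.iter().map(|s| s.to_string()).collect(),
        internal: true,
    })
}

/// Pick the constructor based on the first segment, as suggested above.
pub fn for_segments(segments: &[&str]) -> Result<NamespacePath, String> {
    if segments.first().is_some_and(|s| s.starts_with('_')) {
        new_internal(segments)
    } else {
        new_public(segments)
    }
}
```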

Merge main branch to bring in health check functionality while
preserving local client registry features.

Key changes resolved:
- Combined local registry fields (engine, any_engine) with health check
  field (endpoint_health_check_notifier) in Ingress struct
- Merged PushWorkHandler trait methods: both as_any_engine() for local
  registry and set_endpoint_health_check_notifier() for health checks
- Integrated health check registration logic into endpoint create/start
  pattern while maintaining the refactored separation
- Added health_check_payload field to EndpointInstance and EndpointConfig

Both feature sets now work together:
- Local client can access engines directly without network overhead
- Health check system monitors endpoint availability and resets timers
- Endpoints support both local registry and health monitoring

All library unit tests pass. Integration tests require external services.

Signed-off-by: Ryan Olson <[email protected]>
@grahamking changed the title from "feat: local client (dyn-986)" to "feat: local client" on Sep 18, 2025
@grahamking

Is this what dynamo-run in=http out=mistralrs|llamacpp does, which it calls StaticFull and StaticCore? Or does this replace NATS in a distributed setup?

- Move local engine registration from create() to start() to prevent leaks
- Add conditional internal namespace handling for names starting with '_'
- Implement start_background() with CriticalTaskExecutionHandle
- Add background endpoint registry for lifecycle management
- Ensure proper cleanup when endpoints complete or fail

Key changes:
* EndpointInstance.local_engine_info now stores prepared registration data
* Registration happens in start() method, avoiding leaks if dropped
* Internal namespaces use NamespaceDescriptor::new_internal()
* Background endpoints tracked in DistributedRuntime registry
* CriticalTaskExecutionHandle manages endpoint lifecycle automatically

Signed-off-by: Ryan Olson <[email protected]>
@github-actions

This PR is stale because it has been open 30 days with no activity. Remove stale label or comment or this will be closed in 5 days.

@github-actions bot added the Stale label on Oct 26, 2025