-
Notifications
You must be signed in to change notification settings - Fork 664
feat: local client #3052
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
feat: local client #3052
Conversation
- Remove unnecessary clippy allow annotation - Use is_some_and which is available in Rust 1.70+ (we're on 1.89) - Clean up code to follow clippy recommendations
- Add v2::entity module for type-safe entity descriptors - Implement LocalClient for bypassing network overhead - Add local engine registry to DistributedRuntime - Auto-register engines during endpoint creation - Add comprehensive tests and examples This implementation allows direct local engine invocation without the overhead of etcd watching, instance discovery, or network layers. Engines are automatically registered when endpoints are created and can be accessed via LocalClient using the v2::entity descriptor system for consistent addressing.
- Fix namespace hierarchy extraction to walk parent chain properly - Make registry methods internal (pub(crate)) for better encapsulation - Add ServiceBuilder support with with_local_registry/without_local_registry - Check enable_local_registry flag during endpoint creation (enabled by default) - Add endpoint.local_client() convenience method for consistency - Update tests to use new API without direct registry access - Fix duplicate namespace hierarchy helper functions This improves the LocalClient API to be more consistent with existing patterns while properly handling hierarchical namespaces and providing better encapsulation of internal implementation details.
…lient encapsulation - Split EndpointConfigBuilder into create() and start() phases - create() handles setup including local engine registration - start() handles network serving - Maintain backwards compatibility with existing start() -> create().start() - Remove public register_local_engine function, now internal - Fix namespace hierarchy extraction to properly walk parent chains - Update tests to use new create/start pattern (tests need further API alignment) The core implementation compiles successfully, though tests need additional work to align with current pipeline API requirements. Signed-off-by: Ryan Olson <[email protected]>
This change addresses trait bound conflicts in the local client system by introducing a new StreamData trait that only requires Send (not Sync) for response types. Key changes: - Add StreamData trait for response types that don't need Sync - Create StreamAnyEngineWrapper for type erasure with StreamData responses - Update LocalClient to use StreamData constraint for response types - Add downcast_stream() method for StreamData-compatible downcasting - Fix clippy warnings about complex types and unused variables - Rewrite local_client_demo.rs to use new endpoint builder pattern This allows ManyOut<T> types (Pin<Box<dyn AsyncEngineStream<T>>>) to work properly with the local engine registry, since streams are consumed sequentially and don't need to be shared between threads. Signed-off-by: Ryan Olson <[email protected]>
|
/ok to test f3d24a9 |
|
@coderabbitai full review |
✅ Actions performedFull review triggered. |
WalkthroughAdds a per-endpoint LocalClient for in-process engine invocation, a local engine registry in DistributedRuntime, and endpoint lifecycle refactors to register/unregister local engines. Extends engine type-erasure to support streaming responses. Introduces a v2 descriptor system (namespace/component/endpoint/instance). Adds service-level toggles, example, and tests. Changes
Sequence Diagram(s)sequenceDiagram
autonumber
participant Dev as Developer
participant RT as Runtime/DRT
participant EP as EndpointConfig/Instance
participant REG as Local Engine Registry
participant ING as Ingress (PushWorkHandler)
Note over Dev,EP: Endpoint creation with local engine
Dev->>EP: build EndpointConfig::from_endpoint(...)
Dev->>ING: Ingress::for_engine(engine.clone())
EP->>REG: register_local_engine(key, engine as AnyAsyncEngine)
EP->>RT: create service endpoint (lease, etcd registration)
Note over EP,REG: On shutdown: unregister_local_engine(key)
sequenceDiagram
autonumber
participant Client as LocalClient<Req,Resp,E>
participant EP as Endpoint
participant REG as Local Engine Registry
participant ENG as AsyncEngine<Req,Resp,E>
Note over Client,EP: Local client acquisition and call
Client->>EP: Endpoint::local_client()
EP->>REG: get_local_engine(key)
REG-->>EP: Arc<dyn AnyAsyncEngine>
EP-->>Client: LocalClient (downcast_stream to concrete types)
Client->>ENG: generate(request)
ENG-->>Client: Resp (e.g., ResponseStream)
Estimated code review effort🎯 4 (Complex) | ⏱️ ~75 minutes Possibly related PRs
Poem
Pre-merge checks❌ Failed checks (2 warnings)
✅ Passed checks (1 passed)
Tip 👮 Agentic pre-merge checks are now available in preview!Pro plan users can now enable pre-merge checks in their settings to enforce checklists before merging PRs.
Please see the documentation for more information. Example: reviews:
pre_merge_checks:
custom_checks:
- name: "Undocumented Breaking Changes"
mode: "warning"
instructions: |
Pass/fail criteria: All breaking changes to public APIs, CLI flags, environment variables, configuration keys, database schemas, or HTTP/GraphQL endpoints must be documented in the "Breaking Change" section of the PR description and in CHANGELOG.md. Exclude purely internal or private changes (e.g., code not exported from package entry points or explicitly marked as internal).Please share your feedback with us on this Discord post. Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 3
🧹 Nitpick comments (19)
lib/runtime/src/traits.rs (2)
19-21: Remove unused imports (will warn/fail underdeny(unused_imports)).
AnyAsyncEngineandArcaren’t referenced in this file.Apply:
-use crate::engine::AnyAsyncEngine; -use std::sync::Arc;
22-22: Fix doc typo.“proivde” → “provide”.
-/// A trait for objects that proivde access to the [Runtime] +/// A trait for objects that provide access to the [Runtime]lib/runtime/src/lib.rs (1)
55-55: Consider feature‑gatingv2while it stabilizes.If this surface is still in flux, hide behind
#[cfg(feature = "v2")]to control exposure.- pub mod v2; + #[cfg(feature = "v2")] + pub mod v2;lib/runtime/src/pipeline/network/ingress/push_handler.rs (1)
141-159: LGTM, with a small caching improvement.After successful conversion, cache the type‑erased engine to avoid repeated
as_stream_any()work.- if let Some(service_engine) = Ingress::engine(self) { + if let Some(service_engine) = Ingress::engine(self) { // Try to convert using StreamData-compatible wrapper - if let Some(stream_engine) = service_engine.as_stream_any() { - return Some(stream_engine); + if let Some(stream_engine) = service_engine.as_stream_any() { + // Cache for future calls if API available + if let Some(setter) = Some(()) { /* placeholder for presence check */ } + { + // If Ingress exposes set_any_engine, use it: + // Ingress::set_any_engine(self, stream_engine.clone()); + } + return Some(stream_engine); } }lib/runtime/src/distributed.rs (2)
324-333: Guard against duplicate registrations or return the previous entry.Today this silently overwrites. Either reject duplicates or return the replaced engine for callers to dispose.
- pub(crate) async fn register_local_engine( - &self, - key: String, - engine: Arc<dyn crate::engine::AnyAsyncEngine>, - ) -> Result<()> { - let mut engines = self.local_engines.lock().await; - engines.insert(key, engine); - Ok(()) - } + pub(crate) async fn register_local_engine( + &self, + key: String, + engine: Arc<dyn crate::engine::AnyAsyncEngine>, + ) -> Result<Option<Arc<dyn crate::engine::AnyAsyncEngine>>> { + let mut engines = self.local_engines.lock().await; + Ok(engines.insert(key, engine)) + }If you prefer rejection:
- let mut engines = self.local_engines.lock().await; - engines.insert(key, engine); - Ok(()) + let mut engines = self.local_engines.lock().await; + if engines.contains_key(&key) { + anyhow::bail!("local engine already registered for key: {key}"); + } + engines.insert(key, engine); + Ok(())
344-351: Consider lightweight logging for observability.Add trace logs on register/get/unregister with key to aid debugging.
- let engines = self.local_engines.lock().await; + let engines = self.local_engines.lock().await; + tracing::trace!("get_local_engine: key={}", key);(similar for register/unregister)
lib/runtime/src/v2/mod.rs (1)
18-21: Re‑exportInstanceTypeandToPathfor ergonomics.They’re exposed under
entity::, but commonly needed alongside the other descriptors.pub use entity::{ - ComponentDescriptor, DescriptorBuilder, DescriptorError, EndpointDescriptor, - InstanceDescriptor, NamespaceDescriptor, PathDescriptor, + ComponentDescriptor, DescriptorBuilder, DescriptorError, EndpointDescriptor, + InstanceDescriptor, InstanceType, NamespaceDescriptor, PathDescriptor, ToPath, };lib/runtime/examples/local_client_demo.rs (2)
6-13: Drop unused import.
AsyncEngineContextProviderisn’t used.-use dynamo_runtime::engine::{AsyncEngine, AsyncEngineContextProvider, async_trait}; +use dynamo_runtime::engine::{AsyncEngine, async_trait};
71-76: Minor: simplify type annotation.Let type inference handle the trait object to reduce verbosity.
- let engine: Arc<dyn AsyncEngine<SingleIn<String>, ManyOut<Annotated<String>>, anyhow::Error>> = - Arc::new(SimpleEchoEngine); + let engine = Arc::new(SimpleEchoEngine) + as Arc<dyn AsyncEngine<SingleIn<String>, ManyOut<Annotated<String>>, anyhow::Error>>;lib/runtime/tests/local_client_simple.rs (1)
6-13: Drop unused import.
AsyncEngineContextProviderisn’t used.-use dynamo_runtime::engine::{AsyncEngine, AsyncEngineContextProvider, async_trait}; +use dynamo_runtime::engine::{AsyncEngine, async_trait};lib/runtime/src/pipeline/network.rs (1)
287-290: Consider documenting the purpose of these fields.While the inline comment mentions "local registry", it would be helpful to add doc comments explaining when and how these fields are populated and used.
- // Store the original engine for local registry - engine: OnceLock<ServiceEngine<Req, Resp>>, - // Store type-erased engine for local registry - any_engine: OnceLock<Arc<dyn crate::engine::AnyAsyncEngine>>, + /// The original typed engine, stored for local registry access. + /// Set when the Ingress is created via `for_engine`. + engine: OnceLock<ServiceEngine<Req, Resp>>, + /// Type-erased version of the engine for dynamic dispatch. + /// Can be set via `set_any_engine` when proper trait bounds are available. + any_engine: OnceLock<Arc<dyn crate::engine::AnyAsyncEngine>>,lib/runtime/src/component/local_client.rs (1)
81-95: Remove duplicateget_namespace_hierarchyin local_client.rslocal_client.rs contains two identical implementations — an impl method at lines 82–95 (used via Self::get_namespace_hierarchy) and an unused module-level copy at lines 114–127; remove the module-level copy (or consolidate with the helper in lib/runtime/src/component/endpoint.rs:23).
lib/runtime/src/engine.rs (4)
210-224: Docs out of sync with trait bounds (Resp no longer requires Data).The comment still says “Resp: Data + AsyncEngineContextProvider,” but the bound is only AsyncEngineContextProvider. Also a small grammar nit in “is does not need.”
Apply:
-/// - `Resp`: The response type that implements both `Data` and `AsyncEngineContextProvider` +/// - `Resp`: The response type that implements `AsyncEngineContextProvider`. +/// Use `AsAnyAsyncEngine` for `Resp: Data` and `AsStreamAnyAsyncEngine` for non-`Sync` responses. -/// The synchronous Engine version is does not need to be awaited. +/// The synchronous Engine version does not need to be awaited.
425-456:as_stream_anyneed not return Option.It always returns
Some(...). Consider simplifying the API to returnArc<dyn AnyAsyncEngine>.-pub trait AsStreamAnyAsyncEngine { +pub trait AsStreamAnyAsyncEngine { /// Converts a typed `AsyncEngine` with StreamData response into a type-erased `AnyAsyncEngine`. fn into_stream_any_engine(self) -> Arc<dyn AnyAsyncEngine>; /// Converts a typed `AsyncEngine` with StreamData response into a type-erased `AnyAsyncEngine`. /// This is the non-consuming version of into_stream_any_engine. - fn as_stream_any(&self) -> Option<Arc<dyn AnyAsyncEngine>>; + fn as_stream_any(&self) -> Arc<dyn AnyAsyncEngine>; } @@ - fn as_stream_any(&self) -> Option<Arc<dyn AnyAsyncEngine>> { - Some(Arc::new(StreamAnyEngineWrapper { + fn as_stream_any(&self) -> Arc<dyn AnyAsyncEngine> { + Arc::new(StreamAnyEngineWrapper { engine: self.clone(), _phantom: PhantomData, - })) + }) }
487-496: Add tests for the newdowncast_streampath.Great addition. Please add success/failure tests analogous to the existing
downcasttest.Minimal sketch:
#[tokio::test] async fn test_stream_downcast() { // Define a Resp that is Send + 'static but not Sync (simulate), implement AsyncEngineContextProvider // Implement a mock AsyncEngine<Req, Resp, E>, erase with into_stream_any_engine(), then assert: // - any.downcast_stream::<Req, Resp, E>().is_some() // - any.downcast::<Req, Resp, E>().is_none() }Also applies to: 517-534
498-516: Deduplicate identical downcast bodies.Both impls share identical TypeId checks and cloning. Consider factoring into a small internal helper to reduce duplication.
Also applies to: 517-534
lib/runtime/src/component/endpoint.rs (1)
88-95: Avoid panicking on poisoned stats handler mutex.
lock().unwrap()can panic. Consider handling the error or using a non-poisoning mutex (e.g., parking_lot).- handler_map - .lock() - .unwrap() - .insert(self.endpoint.subject_to(self.lease_id), stats_handler); + if let Ok(mut map) = handler_map.lock() { + map.insert(self.endpoint.subject_to(self.lease_id), stats_handler); + } else { + tracing::warn!("Stats handler registry mutex poisoned; skipping insert"); + }lib/runtime/src/v2/entity/validation.rs (1)
24-34: Validation errors lack offending value context.Attaching the invalid input to
ValidationErrorparams improves downstream error mapping.- if !IDENTIFIER_REGEX.is_match(name) { - return Err(ValidationError::new("invalid_identifier_chars")); + if !IDENTIFIER_REGEX.is_match(name) { + let mut err = ValidationError::new("invalid_identifier_chars"); + err.add_param(std::borrow::Cow::Borrowed("value"), &name.to_string()); + return Err(err); }Apply similarly for other validators so higher layers can create precise errors.
lib/runtime/src/v2/entity/descriptor.rs (1)
73-88: Map validator codes to typed DescriptorError variants.You already define rich variants (InvalidComponentName, etc.), but the
From<ValidationError>currently flattens to a genericValidationErrormessage. Prefer emitting the typed variants to enable precise matching upstream.impl From<validator::ValidationError> for DescriptorError { fn from(err: validator::ValidationError) -> Self { - let message = match err.code.as_ref() { - "empty_identifier" => "Identifier cannot be empty".to_string(), - "invalid_identifier_chars" => "Identifier contains invalid characters. Only lowercase letters, numbers, hyphens, and underscores are allowed".to_string(), - "empty_path_segment" => "Path segment cannot be empty".to_string(), - "invalid_path_segment_chars" => "Path segment contains invalid characters. Only lowercase letters, numbers, hyphens, underscores, and dots are allowed".to_string(), - "reserved_internal_prefix" => "Names starting with '_' are reserved for internal use".to_string(), - "empty_collection" => "Collection cannot be empty".to_string(), - "component_name_too_long" => "Component name too long (max 63 characters)".to_string(), - "endpoint_name_too_long" => "Endpoint name too long (max 63 characters)".to_string(), - _ => format!("Validation error: {}", err.code), - }; - DescriptorError::ValidationError { message } + match err.code.as_ref() { + "invalid_namespace_segment" | "reserved_internal_prefix" | "empty_identifier" | "invalid_identifier_chars" => { + DescriptorError::InvalidNamespaceSegment { segment: "<unknown>".into() } + } + "invalid_component_name" | "component_name_too_long" => { + DescriptorError::InvalidComponentName { name: "<unknown>".into() } + } + "invalid_endpoint_name" | "endpoint_name_too_long" => { + DescriptorError::InvalidEndpointName { name: "<unknown>".into() } + } + "invalid_path_segment" | "invalid_path_segment_chars" | "empty_path_segment" => { + DescriptorError::InvalidPathSegment { segment: "<unknown>".into() } + } + _ => DescriptorError::ValidationError { message: err.code.to_string() }, + } } }Note: If you adopt the suggestion in validation.rs to attach the offending value in
err.params["value"], you can populate the real strings here instead of"<unknown>".
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (17)
lib/runtime/examples/local_client_demo.rs(1 hunks)lib/runtime/src/component.rs(4 hunks)lib/runtime/src/component/endpoint.rs(6 hunks)lib/runtime/src/component/local_client.rs(1 hunks)lib/runtime/src/component/service.rs(2 hunks)lib/runtime/src/distributed.rs(2 hunks)lib/runtime/src/engine.rs(6 hunks)lib/runtime/src/lib.rs(3 hunks)lib/runtime/src/pipeline/network.rs(4 hunks)lib/runtime/src/pipeline/network/ingress/push_handler.rs(1 hunks)lib/runtime/src/traits.rs(1 hunks)lib/runtime/src/v2/entity/descriptor.rs(1 hunks)lib/runtime/src/v2/entity/mod.rs(1 hunks)lib/runtime/src/v2/entity/validation.rs(1 hunks)lib/runtime/src/v2/mod.rs(1 hunks)lib/runtime/tests/local_client.rs(1 hunks)lib/runtime/tests/local_client_simple.rs(1 hunks)
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2025-07-14T21:25:56.930Z
Learnt from: ryanolson
PR: ai-dynamo/dynamo#1919
File: lib/runtime/src/engine.rs:168-168
Timestamp: 2025-07-14T21:25:56.930Z
Learning: The AsyncEngineContextProvider trait in lib/runtime/src/engine.rs was intentionally changed from `Send + Sync + Debug` to `Send + Debug` because the Sync bound was overly constraining. The trait should only require Send + Debug as designed.
Applied to files:
lib/runtime/src/traits.rslib/runtime/src/lib.rslib/runtime/src/engine.rs
📚 Learning: 2025-09-11T03:24:47.820Z
Learnt from: kthui
PR: ai-dynamo/dynamo#3004
File: lib/runtime/src/pipeline/network/ingress/push_handler.rs:271-277
Timestamp: 2025-09-11T03:24:47.820Z
Learning: In lib/runtime/src/pipeline/network/ingress/push_handler.rs, the maintainer prefers to keep the existing error comparison logic using format!("{:?}", err) == STREAM_ERR_MSG unchanged until proper error types are implemented, even though it has technical debt. Avoid suggesting changes to working legacy code that will be refactored later.
Applied to files:
lib/runtime/src/pipeline/network/ingress/push_handler.rs
🔇 Additional comments (33)
lib/runtime/src/v2/entity/mod.rs (1)
1-16: Solid public re‑exports.Surface looks coherent and self‑contained. No issues.
lib/runtime/tests/local_client_simple.rs (1)
121-129: Nice negative test.Good assertion for type mismatch behavior via runtime downcast.
lib/runtime/src/distributed.rs (1)
98-99: Approve — local engine registry verified; unregister called on endpoint teardown.
Arc<Mutex<...>> ownership and pub(crate) visibility are appropriate. register_local_engine / get_local_engine / unregister_local_engine are defined in lib/runtime/src/distributed.rs (lines 325, 336, 345). Endpoint registers at lib/runtime/src/component/endpoint.rs:340 and unregisters on teardown at lib/runtime/src/component/endpoint.rs:194; local_client retrieves via get_local_engine at lib/runtime/src/component/local_client.rs:45.lib/runtime/src/lib.rs (1)
280-283: No change required — AnyAsyncEngine is explicitly Send + Sync.
AnyAsyncEngine is declaredpub trait AnyAsyncEngine: Send + Sync(lib/runtime/src/engine.rs:303); usage withArc<dyn AnyAsyncEngine>in lib/runtime/src/lib.rs:282 and the pipeline accessors (lib/runtime/src/pipeline/network.rs:360–366) is consistent — no refactor needed.lib/runtime/src/component.rs (5)
67-67: LGTM!The addition of the
local_clientmodule follows the existing module organization pattern.
73-73: LGTM!The public re-export of
LocalClientprovides appropriate API access for users.
92-94: LGTM!The service-level registry control map
service_enable_local_registryfollows the existing pattern for service configuration tracking.
440-450: LGTM!The
local_clientmethod provides a clean API for creating per-endpoint local clients with appropriate trait bounds for request/response types and error handling.
148-150: Incorrect file reference — validate or document enable_local_registry precedenceenable_local_registry is defined in lib/runtime/src/component/service.rs (builder default = true; with_local_registry/without_local_registry) and read per-service in lib/runtime/src/component/endpoint.rs (reads registry.service_enable_local_registry and unwrap_or(true)). The original comment referenced lib/runtime/src/component.rs — update that reference and either add validation at service creation to detect/resolve conflicting settings or explicitly document that the service-level setting controls endpoint behavior. Files: lib/runtime/src/component/service.rs (builder/create) and lib/runtime/src/component/endpoint.rs (per-service read).
Likely an incorrect or invalid review comment.
lib/runtime/src/component/service.rs (4)
41-44: LGTM!The service-level
enable_local_registryflag with a sensible default oftrueprovides fine-grained control over local engine registry behavior.
47-57: LGTM!The builder methods
with_local_registryandwithout_local_registryprovide a clear and intuitive API for controlling the local registry behavior.
61-61: LGTM!Properly extracting
enable_local_registryfrom the dissolved configuration.
113-118: LGTM!The
service_name.clone()usage correctly avoids move issues, and the registry insertion properly stores the flag for later reference.lib/runtime/src/pipeline/network.rs (4)
298-299: LGTM!Properly initializing the new
OnceLockfields in the constructor.
336-344: LGTM!Correctly cloning the engine before moving it into the backend, then storing the original for local registry access.
354-367: LGTM!The accessor methods for the engine fields are well-structured and follow Rust conventions for optional values.
381-384: LGTM!The trait method with a sensible default implementation allows optional exposure of type-erased engines without breaking existing implementations.
lib/runtime/tests/local_client.rs (4)
16-36: LGTM!The
EchoEngineimplementation is clean and correctly implements theAsyncEnginetrait for testing purposes.
38-87: Test properly validates LocalClient registration and retrieval.The test comprehensively validates:
- Engine registration through the Ingress wrapper
- LocalClient creation via the endpoint's convenience method
- Correct data flow through the local client
Good coverage of the happy path.
89-138: LGTM!This test validates the LocalClient functionality when the engine is wrapped in an Ingress, ensuring compatibility with the pipeline architecture.
140-180: Excellent type-safety validation.This test properly validates that LocalClient enforces type safety by:
- Registering an engine with one type signature
- Attempting to create a client with a mismatched type signature
- Correctly expecting and handling the type mismatch error
This is crucial for preventing runtime type errors in production.
lib/runtime/src/component/local_client.rs (3)
35-55: LGTM!The
from_descriptormethod properly handles engine retrieval and type downcasting with clear error messages.
57-79: LGTM!The
from_endpointmethod correctly builds the descriptor hierarchy and delegates tofrom_descriptor.
97-110: LGTM!The
generatemethod and accessor methods provide a clean interface for direct engine invocation.lib/runtime/src/engine.rs (3)
180-182: AsyncEngineContextProvider correctly omits Sync (per prior decision).Matches the "Send + Debug only" requirement from your previous change. No action needed.
359-393: Stream erasure wrapper looks sound.Type IDs and Any exposure mirror the non‑stream path; bounds correctly use
StreamData + AsyncEngineContextProvider. LGTM.
585-587: Test update to call the extension trait is correct.Using
AsAnyAsyncEngine::into_any_engine(typed_engine)aligns with the new ergonomic path.lib/runtime/src/component/endpoint.rs (1)
22-36: Namespace traversal helper looks good.Reverse order to root‑to‑leaf is correct, no needless allocations. LGTM.
lib/runtime/src/v2/entity/validation.rs (2)
14-21: Regexes and allowed charsets look appropriate.Anchors and char classes are correct for the documented constraints.
106-186: Tests cover main cases; length limits checked.Good breadth across validators and reserved-prefix behavior.
lib/runtime/src/v2/entity/descriptor.rs (3)
212-239: DescriptorSpec schema validation reads well.Namespace handling with
internaltoggle is clear, and per-field validation is delegated correctly.
416-448: Path formatting includesinstance-{:x}; double-check consumers.Hex encoding is fine; just ensure watchers/parsers expect the
instance-prefix and hex format.
943-969: Wrapper transition tests are solid.They exercise ToPath and etcd key generation paths. Good coverage.
| // Register in etcd (only if not static) | ||
| if let Some(etcd_client) = &self.etcd_client | ||
| && let Err(e) = etcd_client | ||
| .kv_create(&etcd_path, info, Some(lease_id)) | ||
| .kv_create(&etcd_path, info, Some(self.lease_id)) | ||
| .await | ||
| { | ||
| tracing::error!("Failed to register discoverable service: {:?}", e); | ||
| cancel_token.cancel(); | ||
| return Err(error!("Failed to register discoverable service")); | ||
| } | ||
| task.await??; | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Bug: passing lease_id=0 to etcd when no lease.
If self.lease is None, we still pass Some(self.lease_id) (which is 0). That likely produces an invalid etcd lease association and can fail registration.
Apply:
- if let Some(etcd_client) = &self.etcd_client
- && let Err(e) = etcd_client
- .kv_create(&etcd_path, info, Some(self.lease_id))
- .await
- {
+ let lease_opt = if self.lease.is_some() { Some(self.lease_id) } else { None };
+ if let Some(etcd_client) = &self.etcd_client {
+ if let Err(e) = etcd_client.kv_create(&etcd_path, info, lease_opt).await {
tracing::error!("Failed to register discoverable service: {:?}", e);
cancel_token.cancel();
return Err(error!("Failed to register discoverable service"));
- }
+ }
+ }📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| // Register in etcd (only if not static) | |
| if let Some(etcd_client) = &self.etcd_client | |
| && let Err(e) = etcd_client | |
| .kv_create(&etcd_path, info, Some(lease_id)) | |
| .kv_create(&etcd_path, info, Some(self.lease_id)) | |
| .await | |
| { | |
| tracing::error!("Failed to register discoverable service: {:?}", e); | |
| cancel_token.cancel(); | |
| return Err(error!("Failed to register discoverable service")); | |
| } | |
| task.await??; | |
| // Register in etcd (only if not static) | |
| let lease_opt = if self.lease.is_some() { Some(self.lease_id) } else { None }; | |
| if let Some(etcd_client) = &self.etcd_client { | |
| if let Err(e) = etcd_client | |
| .kv_create(&etcd_path, info, lease_opt) | |
| .await | |
| { | |
| tracing::error!("Failed to register discoverable service: {:?}", e); | |
| cancel_token.cancel(); | |
| return Err(error!("Failed to register discoverable service")); | |
| } | |
| } |
🤖 Prompt for AI Agents
In lib/runtime/src/component/endpoint.rs around lines 213 to 223, the code
always passes Some(self.lease_id) (which is 0 when no lease) to etcd and can
create an invalid lease association; change the call to only pass a
Some(lease_id) when a real lease exists (e.g., derive the lease argument from
self.lease or self.lease_id wrapped in an Option) and pass None when there is no
lease so etcd receives no lease parameter; keep the error handling (logging,
cancel_token.cancel(), return Err) unchanged.
| let local_engine_key = if enable_local_registry | ||
| && let Some(any_engine) = handler.as_any_engine() | ||
| { | ||
| use crate::v2::entity::{ComponentDescriptor, EndpointDescriptor, NamespaceDescriptor}; | ||
|
|
||
| // Extract the full namespace hierarchy | ||
| let namespace_segments = get_namespace_hierarchy(&endpoint.component.namespace); | ||
|
|
||
| // Create the descriptor for this endpoint | ||
| let namespace_desc = NamespaceDescriptor::new( | ||
| &namespace_segments | ||
| .iter() | ||
| .map(|s| s.as_str()) | ||
| .collect::<Vec<_>>(), | ||
| ) | ||
| .map_err(|e| anyhow::anyhow!("Invalid namespace: {}", e))?; | ||
| let component_desc = namespace_desc | ||
| .component(&endpoint.component.name) | ||
| .map_err(|e| anyhow::anyhow!("Invalid component: {}", e))?; | ||
| let endpoint_desc = component_desc | ||
| .endpoint(&endpoint.name) | ||
| .map_err(|e| anyhow::anyhow!("Invalid endpoint: {}", e))?; | ||
|
|
||
| // Register using the path string as key | ||
| let key = endpoint_desc.to_string(); | ||
| tracing::debug!("Registering local engine for endpoint: {}", key); | ||
| endpoint | ||
| .drt() | ||
| .register_local_engine(key.clone(), any_engine) | ||
| .await?; | ||
| Some(key) | ||
| } else { | ||
| None | ||
| }; | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Local-engine registration runs in create(); leaks if start() is never called.
If users call create() but drop the EndpointInstance without start(), the engine stays registered. Either move registration into start() or add a Drop hook to unregister best‑effort when not started.
Option A (preferred): move the registration block into start() right before spawning the endpoint and set local_engine_key there.
Option B: add a Drop guard for fallback cleanup:
impl Drop for EndpointInstance {
fn drop(&mut self) {
if let Some(key) = self.local_engine_key.take() {
let drt = self.endpoint.drt().clone();
// Best-effort async cleanup; ignore result.
drt.runtime().secondary().spawn(async move {
drt.unregister_local_engine(&key).await;
});
}
}
}🤖 Prompt for AI Agents
lib/runtime/src/component/endpoint.rs around lines 312-346: the local engine is
being registered in create(), which leaks if EndpointInstance is dropped before
start(); remove the registration from create() and instead perform it in start()
immediately before spawning the endpoint and set local_engine_key there (ensure
any errors are propagated the same way), or alternatively implement a Drop for
EndpointInstance that takes local_engine_key if Some, clones endpoint.drt(), and
best-effort spawns an async task on drt.runtime().secondary() to call
unregister_local_engine(&key).await so the registration is cleaned up when the
instance is dropped without start().
| let namespace_desc = NamespaceDescriptor::new( | ||
| &namespace_segments | ||
| .iter() | ||
| .map(|s| s.as_str()) | ||
| .collect::<Vec<_>>(), | ||
| ) | ||
| .map_err(|e| anyhow::anyhow!("Invalid namespace: {}", e))?; | ||
| let component_desc = namespace_desc | ||
| .component(&endpoint.component.name) | ||
| .map_err(|e| anyhow::anyhow!("Invalid component: {}", e))?; | ||
| let endpoint_desc = component_desc | ||
| .endpoint(&endpoint.name) | ||
| .map_err(|e| anyhow::anyhow!("Invalid endpoint: {}", e))?; | ||
|
|
||
| // Register using the path string as key | ||
| let key = endpoint_desc.to_string(); | ||
| tracing::debug!("Registering local engine for endpoint: {}", key); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Internal namespaces not handled (descriptor creation will fail for “_internal”).
NamespaceDescriptor::new(..) rejects segments starting with “_”. Use new_internal(..) when the first segment is internal.
Apply:
- let namespace_desc = NamespaceDescriptor::new(
- &namespace_segments
- .iter()
- .map(|s| s.as_str())
- .collect::<Vec<_>>(),
- )
+ let ns_slice: Vec<&str> = namespace_segments.iter().map(|s| s.as_str()).collect();
+ let namespace_desc = if namespace_segments.first().is_some_and(|s| s.starts_with('_')) {
+ NamespaceDescriptor::new_internal(&ns_slice)
+ } else {
+ NamespaceDescriptor::new(&ns_slice)
+ }
.map_err(|e| anyhow::anyhow!("Invalid namespace: {}", e))?;📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| let namespace_desc = NamespaceDescriptor::new( | |
| &namespace_segments | |
| .iter() | |
| .map(|s| s.as_str()) | |
| .collect::<Vec<_>>(), | |
| ) | |
| .map_err(|e| anyhow::anyhow!("Invalid namespace: {}", e))?; | |
| let component_desc = namespace_desc | |
| .component(&endpoint.component.name) | |
| .map_err(|e| anyhow::anyhow!("Invalid component: {}", e))?; | |
| let endpoint_desc = component_desc | |
| .endpoint(&endpoint.name) | |
| .map_err(|e| anyhow::anyhow!("Invalid endpoint: {}", e))?; | |
| // Register using the path string as key | |
| let key = endpoint_desc.to_string(); | |
| tracing::debug!("Registering local engine for endpoint: {}", key); | |
| let ns_slice: Vec<&str> = namespace_segments.iter().map(|s| s.as_str()).collect(); | |
| let namespace_desc = if namespace_segments.first().is_some_and(|s| s.starts_with('_')) { | |
| NamespaceDescriptor::new_internal(&ns_slice) | |
| } else { | |
| NamespaceDescriptor::new(&ns_slice) | |
| } | |
| .map_err(|e| anyhow::anyhow!("Invalid namespace: {}", e))?; | |
| let component_desc = namespace_desc | |
| .component(&endpoint.component.name) | |
| .map_err(|e| anyhow::anyhow!("Invalid component: {}", e))?; | |
| let endpoint_desc = component_desc | |
| .endpoint(&endpoint.name) | |
| .map_err(|e| anyhow::anyhow!("Invalid endpoint: {}", e))?; | |
| // Register using the path string as key | |
| let key = endpoint_desc.to_string(); | |
| tracing::debug!("Registering local engine for endpoint: {}", key); |
🤖 Prompt for AI Agents
In lib/runtime/src/component/endpoint.rs around lines 321 to 337,
NamespaceDescriptor::new(...) will fail for internal namespaces whose first
segment starts with '_' — detect whether namespace_segments.first() starts with
'_' and call NamespaceDescriptor::new_internal(...) in that case, otherwise call
NamespaceDescriptor::new(...); pass the same Vec<&str> constructed from
namespace_segments, preserve the existing map_err(...) wrappers for both
branches, and continue to derive component_desc and endpoint_desc as before.
Merge main branch to bring in health check functionality while preserving local client registry features. Key changes resolved: - Combined local registry fields (engine, any_engine) with health check field (endpoint_health_check_notifier) in Ingress struct - Merged PushWorkHandler trait methods: both as_any_engine() for local registry and set_endpoint_health_check_notifier() for health checks - Integrated health check registration logic into endpoint create/start pattern while maintaining the refactored separation - Added health_check_payload field to EndpointInstance and EndpointConfig Both feature sets now work together: - Local client can access engines directly without network overhead - Health check system monitors endpoint availability and resets timers - Endpoints support both local registry and health monitoring All library unit tests pass. Integration tests require external services. Signed-off-by: Ryan Olson <[email protected]>
|
Is this what |
- Move local engine registration from create() to start() to prevent leaks - Add conditional internal namespace handling for names starting with '_' - Implement start_background() with CriticalTaskExecutionHandle - Add background endpoint registry for lifecycle management - Ensure proper cleanup when endpoints complete or fail Key changes: * EndpointInstance.local_engine_info now stores prepared registration data * Registration happens in start() method, avoiding leaks if dropped * Internal namespaces use NamespaceDescriptor::new_internal() * Background endpoints tracked in DistributedRuntime registry * CriticalTaskExecutionHandle manages endpoint lifecycle automatically Signed-off-by: Ryan Olson <[email protected]>
|
This PR is stale because it has been open 30 days with no activity. Remove stale label or comment or this will be closed in 5 days. |
Summary
Implements LocalClient for direct, in-process engine access without network overhead. Enables type-safe engine invocation for testing and local development scenarios where network latency is not desired.
Key Technical Changes
1. StreamData Trait for Non-Sync Response Types
ManyOut<T>streams don't satisfySyncrequirement for local registry storageStreamDatatrait requiring onlySend + 'static(notSync)StreamAnyEngineWrapperprovides type erasure for heterogeneous storage2. LocalClient Implementation
3. Endpoint Lifecycle Management
create()tostart()to prevent leaks when EndpointInstance is dropped without startingstart_background()methods usingCriticalTaskExecutionHandlefor proper cleanup and failure propagation4. Internal Namespace Handling
NamespaceDescriptor::new_internal()for namespaces starting with "_"5. Health Check Integration
Architecture
Files Changed
Core Implementation
lib/runtime/src/engine.rs- StreamData trait and StreamAnyEngineWrapperlib/runtime/src/component/local_client.rs- LocalClient with type-safe downcastinglib/runtime/src/component/endpoint.rs- Lifecycle management and resource leak fixeslib/runtime/src/distributed.rs- Local registry and background endpoint trackingIntegration Points
lib/runtime/src/pipeline/network/ingress/push_handler.rs- Engine registration and health check notificationslib/runtime/src/pipeline/network.rs- PushWorkHandler trait extensionslib/runtime/src/v2/entity.rs- Runtime v2 entity descriptors with validationExamples and Tests
examples/local_client_demo.rs- Comprehensive demo with proper Annotated struct usagelib/runtime/src/component/local_client/tests.rs- Type-safety and error handling testsTesting Strategy
Automated Tests
ManyOut<Annotated<T>>with proper field usage (data, id, event, comment)Manual Verification
Migration Notes
For Existing Code
endpoint.local_client().await?Performance Considerations
Addresses Review Feedback
Risk Assessment