Conversation

@t-ob
Contributor

@t-ob t-ob commented May 30, 2025

Overview:

Adds an implementation of embedding request handling to the Rust lib.

Details:

  • Attempted to stay consistent with the existing chat_completions/completions logic
  • That said, embeddings have no autoregressive component to them, so maybe this is slightly overengineered
    • The only aggregation over a stream that occurs is essentially joining N annotated responses into a single large one
    • Given the relatively simpler implementation, there didn't seem to be a need to implement a DeltaGenerator like there is for chat/completions (in their respective delta.rs files)
    • Please lmk if I've missed the mark there!
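For intuition, the aggregation amounts to the following. This is a minimal Python sketch (not the actual Rust DeltaAggregator; field names are assumed from the OpenAI embeddings response shape) of folding N streamed responses into one large one:

```python
# Sketch (assumed OpenAI-style field names) of what the aggregator does:
# fold N streamed embedding responses into a single response, concatenating
# the per-item embeddings and summing token usage.

def aggregate(responses):
    final = {
        "object": "list",
        "data": [],
        "usage": {"prompt_tokens": 0, "total_tokens": 0},
    }
    for resp in responses:
        final["data"].extend(resp["data"])
        final["usage"]["prompt_tokens"] += resp["usage"]["prompt_tokens"]
        final["usage"]["total_tokens"] += resp["usage"]["total_tokens"]
    # Re-index so the merged list is consistent regardless of arrival order.
    for idx, item in enumerate(final["data"]):
        item["index"] = idx
    return final
```

Nothing here is incremental (each streamed item is already a complete embedding), which is why no DeltaGenerator seemed necessary.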

Outside of this PR, I've also had to make the following changes to enable this:

diff --git a/launch/dynamo-run/src/input/http.rs b/launch/dynamo-run/src/input/http.rs
index c9ba1f7..b634255 100644
--- a/launch/dynamo-run/src/input/http.rs
+++ b/launch/dynamo-run/src/input/http.rs
@@ -32,6 +32,7 @@ pub async fn run(
         .port(flags.http_port)
         .enable_chat_endpoints(true)
         .enable_cmpl_endpoints(true)
+        .enable_embeddings_endpoints(true)
         .with_request_template(template)
         .build()?;
     match engine_config {
diff --git a/lib/llm/src/http/service/service_v2.rs b/lib/llm/src/http/service/service_v2.rs
index 44a5169..e381fd0 100644
--- a/lib/llm/src/http/service/service_v2.rs
+++ b/lib/llm/src/http/service/service_v2.rs
@@ -75,7 +75,7 @@ pub struct HttpServiceConfig {
     #[builder(default = "true")]
     enable_cmpl_endpoints: bool,
 
-    #[builder(default = "false")]
+    #[builder(default = "true")]
     enable_embeddings_endpoints: bool,
 
     #[builder(default = "None")]

That said, it seems to me that a good default is to keep embeddings endpoints disabled unless specifically configured. In my POC engine, I've had to hardcode these to true, but maybe the right move (at least for dynamo-run) is to add a flag to toggle them.

Where should the reviewer start?

lib/llm/src/http/service/openai.rs

Related Issues: (use one of the action keywords Closes / Fixes / Resolves / Relates to)

  • closes GitHub issue: #xxx

Summary by CodeRabbit

  • New Features

    • Enabled embeddings API endpoints by default in the HTTP service.
    • Implemented support for embedding generation requests via the API.
    • Added streaming aggregation for embedding responses to provide consolidated results.
    • Introduced a handler for embedding requests in the Python subprocess integration.
  • Bug Fixes

    • Corrected a minor typo in documentation comments for embedding requests.
  • Chores

    • Updated default configuration to include embeddings endpoints without manual enablement.

@copy-pr-bot

copy-pr-bot bot commented May 30, 2025

This pull request requires additional validation before any workflows can run on NVIDIA's runners.


@github-actions

👋 Hi t-ob! Thank you for contributing to ai-dynamo/dynamo.

Just a reminder: the NVIDIA Test GitHub Validation CI runs an essential subset of the testing framework to quickly catch errors. Your PR reviewers may elect to test the changes comprehensively before approving them.

🚀

@github-actions github-actions bot added the feat and external-contribution (Pull request is from an external contributor) labels on May 30, 2025
@coderabbitai
Contributor

coderabbitai bot commented May 30, 2025

Walkthrough

Embeddings endpoint support was implemented across the codebase. The HTTP service now enables embeddings endpoints by default, and the handler for embedding requests is fully implemented. A new aggregator merges streamed embedding responses. Python subprocess logic was updated to route embedding requests to a dedicated handler, and relevant public APIs were exposed.

Changes

File(s) and change summary:

  • lib/llm/src/discovery/model_manager.rs — Made get_embeddings_engine method public, removing #[allow(dead_code)].
  • lib/llm/src/http/service/openai.rs — Fully implemented the embeddings async handler, added necessary imports.
  • lib/llm/src/protocols/openai/embeddings.rs — Added public re-export of DeltaAggregator; fixed documentation typo.
  • lib/llm/src/protocols/openai/embeddings/aggregator.rs — New file: implements DeltaAggregator for merging streamed embedding responses, with tests and public APIs.
  • launch/dynamo-run/src/input/http.rs — Enabled embeddings endpoints in the HTTP service builder.
  • launch/dynamo-run/src/subprocess/sglang_inc.py — Added EmbeddingRequestHandler, updated registration and endpoint serving for embeddings.
  • lib/llm/src/http/service/service_v2.rs — Changed default for enable_embeddings_endpoints in HttpServiceConfig to true.

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant HTTP_Service
    participant ModelManager
    participant EmbeddingsEngine
    participant Aggregator

    Client->>HTTP_Service: POST /embeddings (with model name)
    HTTP_Service->>ModelManager: get_embeddings_engine(model)
    ModelManager-->>HTTP_Service: EmbeddingsEngine or error
    HTTP_Service->>EmbeddingsEngine: generate(request)
    EmbeddingsEngine-->>HTTP_Service: Stream of embedding responses
    HTTP_Service->>Aggregator: Fold stream into final response
    Aggregator-->>HTTP_Service: Aggregated embedding response
    HTTP_Service-->>Client: JSON response

Poem

In code’s deep warren, endpoints grow,
Embeddings now stream, merge, and flow.
Aggregators gather, handlers reply,
With every request, embeddings fly!
The rabbit hops—delighted, proud—
“Try the new endpoint!” it squeaks aloud.
🐇✨



@t-ob t-ob force-pushed the tobrien/embeddings-implementation branch from a8ac751 to 4f657c3 on May 30, 2025 at 16:41
Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🧹 Nitpick comments (2)
lib/llm/src/protocols/openai/embeddings/aggregator.rs (2)

25-27: Over-constrained DataStream alias may reject perfectly valid streams

Requiring Sync in the dynamic trait object makes the alias unusable for many Tokio/Futures streams (e.g. mpsc::Receiver, tokio_stream::wrappers::ReceiverStream) that are Send but not Sync.
Because the stream is always pinned to a single task in apply, Sync is not needed.

-type DataStream<T> = Pin<Box<dyn Stream<Item = T> + Send + Sync>>;
+type DataStream<T> = Pin<Box<dyn Stream<Item = T> + Send>>;

Removing Sync increases compatibility without sacrificing safety.


67-76: Minor readability tweak: use match on delta directly

Calling delta.ok() hides the fact that Annotated is result-like.
Matching on the original value makes control-flow clearer and avoids an extra allocation:

match delta {
    Annotated::Data(delta) => { /* happy path */ }
    Annotated::Error(error) => {
        aggregator.error = Some(error);
        return aggregator;
    }
}
📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between a8ac751 and 4f657c3.

📒 Files selected for processing (4)
  • lib/llm/src/discovery/model_manager.rs (1 hunks)
  • lib/llm/src/http/service/openai.rs (2 hunks)
  • lib/llm/src/protocols/openai/embeddings.rs (2 hunks)
  • lib/llm/src/protocols/openai/embeddings/aggregator.rs (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
  • lib/llm/src/protocols/openai/embeddings.rs
  • lib/llm/src/discovery/model_manager.rs
  • lib/llm/src/http/service/openai.rs
⏰ Context from checks skipped due to timeout of 90000ms (4)
  • GitHub Check: pre-merge-rust (lib/bindings/python)
  • GitHub Check: pre-merge-rust (lib/runtime/examples)
  • GitHub Check: pre-merge-rust (.)
  • GitHub Check: Build and Test - vllm
🔇 Additional comments (1)
lib/llm/src/protocols/openai/embeddings/aggregator.rs (1)

90-93: Potential u32 overflow when aggregating token usage

prompt_tokens and total_tokens are summed without checking overflow.
Although unlikely, extremely large batch jobs (or malicious input) could wrap these counters.

Consider switching the fields to u64 (matching OpenAI’s spec) or using saturating_add:

-existing.inner.usage.prompt_tokens += response.inner.usage.prompt_tokens;
-existing.inner.usage.total_tokens += response.inner.usage.total_tokens;
+existing.inner.usage.prompt_tokens =
+    existing.inner.usage.prompt_tokens.saturating_add(response.inner.usage.prompt_tokens);
+existing.inner.usage.total_tokens =
+    existing.inner.usage.total_tokens.saturating_add(response.inner.usage.total_tokens);

@t-ob
Contributor Author

t-ob commented Jun 2, 2025

@grahamking I've added a draft(-ish) commit 7f2839b which enables embeddings via sglang for this PR. It probably needs some slight reworking, as I had to comment out a piece of tokenizer config handling to get things to work, but the code as-is should run with:

DYN_LOG=debug dynamo run in=http out=sglang --model-path=Alibaba-NLP/gte-Qwen2-1.5B-instruct --extra-engine-args ./extra_args.json 

extra_args content:

{"is_embedding": true, "skip_tokenizer_init": false}

I've never used sglang before, so happy to be pointed at a better way of doing things, but wanted to make sure this code could be made to work with an existing engine.

One caveat: I followed the instructions at https://github.com/ai-dynamo/dynamo/blob/main/docs/guides/dynamo_run.md#sglang but found that I had to run the following to get sglang and flashinfer to work properly:

uv pip install torch==2.6.0 "sglang[all]>=0.4.6.post5" --find-links https://flashinfer.ai/whl/cu124/torch2.6/flashinfer-python/
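Once the server is up, it can be smoke-tested with an OpenAI-style embeddings request. A minimal sketch (the /v1/embeddings path and localhost:8080 base URL are assumptions based on the usual OpenAI-compatible defaults, not something this PR confirms):

```python
import json
import urllib.request

# Hypothetical request body; the model name matches the --model-path above.
payload = {
    "model": "Alibaba-NLP/gte-Qwen2-1.5B-instruct",
    "input": ["What is the capital of France?"],
}

def build_request(base_url="http://localhost:8080"):
    """Build (but do not send) a POST against the assumed embeddings route."""
    return urllib.request.Request(
        f"{base_url}/v1/embeddings",
        data=json.dumps(payload).encode(),
        headers={"Content-Type": "application/json"},
        method="POST",
    )

req = build_request()
```

Sending it (e.g. with `urllib.request.urlopen(req)`) should return a JSON body with a `data` list of embeddings and a `usage` block, per the OpenAI embeddings shape.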

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 4

🧹 Nitpick comments (2)
launch/dynamo-run/src/subprocess/sglang_inc.py (2)

129-131: Commented-out page_size could cause performance regressions

Removing arg_map["page_size"] means KV cache page size falls back to the engine default, which might increase fragmentation / latency. If the original intent was only to make it optional, add a conditional rather than fully disabling:

if config.kv_block_size:
    arg_map["page_size"] = config.kv_block_size

Currently these lines are just commented out, silently changing behaviour.


164-175: Dynamic handler selection looks good – minor DRY suggestion

Logic for choosing RequestHandler vs EmbeddingRequestHandler is duplicated in both model_type and the serve_endpoint call. A tiny refactor can avoid divergence:

is_embedding = engine_args.is_embedding
model_type = ModelType.Embedding if is_embedding else ModelType.Backend
await register_llm(model_type, endpoint, config.model_path, config.model_name)

handler_cls = EmbeddingRequestHandler if is_embedding else RequestHandler
await endpoint.serve_endpoint(handler_cls(engine_client).generate)

No functional change, but improves readability.

📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 4f657c3 and 7f2839b.

📒 Files selected for processing (4)
  • launch/dynamo-run/src/input/http.rs (1 hunks)
  • launch/dynamo-run/src/subprocess/sglang_inc.py (3 hunks)
  • lib/llm/src/http/service/service_v2.rs (1 hunks)
  • lib/llm/src/preprocessor/prompt/template/formatters.rs (2 hunks)
✅ Files skipped from review due to trivial changes (1)
  • launch/dynamo-run/src/input/http.rs
🧰 Additional context used
🪛 Pylint (3.3.7)
launch/dynamo-run/src/subprocess/sglang_inc.py

[refactor] 80-80: Too few public methods (1/2)

(R0903)

⏰ Context from checks skipped due to timeout of 90000ms (4)
  • GitHub Check: pre-merge-rust (lib/bindings/python)
  • GitHub Check: pre-merge-rust (.)
  • GitHub Check: pre-merge-rust (lib/runtime/examples)
  • GitHub Check: Build and Test - vllm
🔇 Additional comments (1)
lib/llm/src/http/service/service_v2.rs (1)

165-167: Router branch now unconditional given the default above

Because the default is now true, the if config.enable_embeddings_endpoints { … } branch will run in all deployments. Once the default is reverted (see previous comment) this branch remains correct. No change required here beyond the config flag fix.

@t-ob t-ob force-pushed the tobrien/embeddings-implementation branch from 7f2839b to cc5a709 on June 3, 2025 at 10:22
@t-ob t-ob force-pushed the tobrien/embeddings-implementation branch from cc5a709 to 85cc17f on June 3, 2025 at 10:28
Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

🧹 Nitpick comments (1)
launch/dynamo-run/src/subprocess/sglang_inc.py (1)

89-114: Consider adding error handling for the async_encode call.

The embedding generation logic is well-structured and correctly handles the non-streaming nature of async_encode. However, the implementation could benefit from error handling around the engine call.

Consider wrapping the async_encode call in a try-except block:

 async def generate(self, request):
-    gen = await self.engine_client.async_encode(prompt=request["input"])
+    try:
+        gen = await self.engine_client.async_encode(prompt=request["input"])
+    except Exception as e:
+        logging.error(f"Failed to generate embeddings: {e}")
+        raise

Additionally, you might want to validate the request format:

 async def generate(self, request):
+    if "input" not in request:
+        raise ValueError("Missing 'input' field in embedding request")
     gen = await self.engine_client.async_encode(prompt=request["input"])
📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between cc5a709 and 85cc17f.

📒 Files selected for processing (3)
  • launch/dynamo-run/src/input/http.rs (1 hunks)
  • launch/dynamo-run/src/subprocess/sglang_inc.py (2 hunks)
  • lib/llm/src/http/service/service_v2.rs (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • launch/dynamo-run/src/input/http.rs
  • lib/llm/src/http/service/service_v2.rs
🧰 Additional context used
🧠 Learnings (1)
launch/dynamo-run/src/subprocess/sglang_inc.py (1)
Learnt from: t-ob
PR: ai-dynamo/dynamo#1290
File: launch/dynamo-run/src/subprocess/sglang_inc.py:80-110
Timestamp: 2025-06-03T10:17:51.672Z
Learning: The sglang `async_encode` method does not support streaming options, so collecting all embeddings before yielding is the correct approach for embedding requests.
🧬 Code Graph Analysis (1)
launch/dynamo-run/src/subprocess/sglang_inc.py (2)
lib/bindings/python/examples/hello_world/server_sglang.py (2)
  • RequestHandler (46-76)
  • generate (54-76)
lib/bindings/python/rust/lib.rs (4)
  • generate (612-623)
  • register_llm (98-137)
  • endpoint (418-424)
  • serve_endpoint (438-453)
🪛 Pylint (3.3.7)
launch/dynamo-run/src/subprocess/sglang_inc.py

[refactor] 80-80: Too few public methods (1/2)

(R0903)

⏰ Context from checks skipped due to timeout of 90000ms (4)
  • GitHub Check: pre-merge-rust (lib/runtime/examples)
  • GitHub Check: pre-merge-rust (.)
  • GitHub Check: pre-merge-rust (lib/bindings/python)
  • GitHub Check: Build and Test - vllm
🔇 Additional comments (3)
launch/dynamo-run/src/subprocess/sglang_inc.py (3)

80-88: LGTM! Good inheritance pattern and proper model name handling.

The EmbeddingRequestHandler properly inherits from RequestHandler and correctly accepts the model name in the constructor, addressing the previous placeholder "TODO" concern from past reviews.

🧰 Tools
🪛 Pylint (3.3.7)

[refactor] 80-80: Too few public methods (1/2)

(R0903)


168-170: Clean conditional logic for model type selection.

The model type selection correctly uses ModelType.Embedding when the embedding flag is set, maintaining compatibility with the existing backend registration.


175-181: Well-implemented conditional handler selection.

The endpoint serving logic properly routes to the appropriate handler based on the embedding configuration. The fallback from config.model_name to config.model_path for the model name is sensible.

@t-ob t-ob force-pushed the tobrien/embeddings-implementation branch from 85cc17f to 7ee7db0 on June 3, 2025 at 10:44
@t-ob t-ob force-pushed the tobrien/embeddings-implementation branch from 7ee7db0 to d3b8652 on June 4, 2025 at 14:28
Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

🧹 Nitpick comments (1)
launch/dynamo-run/src/subprocess/sglang_inc.py (1)

80-114: Add error handling for embedding generation.

The implementation looks correct and follows OpenAI API format properly. However, consider adding error handling for potential failures in async_encode or malformed response structures.

 async def generate(self, request):
-    gen = await self.engine_client.async_encode(prompt=request["input"])
-    tokens = 0
-    embeddings = []
-    for idx, res in enumerate(gen):
-        embeddings.append(
-            {
-                "index": idx,
-                "object": "embedding",
-                "embedding": res["embedding"],
-            }
-        )
-        tokens += res["meta_info"]["prompt_tokens"]
+    try:
+        gen = await self.engine_client.async_encode(prompt=request["input"])
+        tokens = 0
+        embeddings = []
+        for idx, res in enumerate(gen):
+            embeddings.append(
+                {
+                    "index": idx,
+                    "object": "embedding",
+                    "embedding": res["embedding"],
+                }
+            )
+            tokens += res.get("meta_info", {}).get("prompt_tokens", 0)
+    except Exception as e:
+        logging.error(f"Error generating embeddings: {e}")
+        raise
🧰 Tools
🪛 Pylint (3.3.7)

[refactor] 80-80: Too few public methods (1/2)

(R0903)

📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 7ee7db0 and d3b8652.

📒 Files selected for processing (3)
  • launch/dynamo-run/src/input/http.rs (1 hunks)
  • launch/dynamo-run/src/subprocess/sglang_inc.py (2 hunks)
  • lib/llm/src/http/service/service_v2.rs (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • lib/llm/src/http/service/service_v2.rs
  • launch/dynamo-run/src/input/http.rs
🧰 Additional context used
🧠 Learnings (1)
launch/dynamo-run/src/subprocess/sglang_inc.py (1)
Learnt from: t-ob
PR: ai-dynamo/dynamo#1290
File: launch/dynamo-run/src/subprocess/sglang_inc.py:80-110
Timestamp: 2025-06-03T10:17:51.672Z
Learning: The sglang `async_encode` method does not support streaming options, so collecting all embeddings before yielding is the correct approach for embedding requests.
🪛 Pylint (3.3.7)
launch/dynamo-run/src/subprocess/sglang_inc.py

[refactor] 80-80: Too few public methods (1/2)

(R0903)

⏰ Context from checks skipped due to timeout of 90000ms (4)
  • GitHub Check: pre-merge-rust (lib/bindings/python)
  • GitHub Check: pre-merge-rust (lib/runtime/examples)
  • GitHub Check: pre-merge-rust (.)
  • GitHub Check: Build and Test - vllm
🔇 Additional comments (2)
launch/dynamo-run/src/subprocess/sglang_inc.py (2)

168-170: LGTM: Clean model type selection logic.

The conditional logic correctly selects the appropriate model type based on the embedding flag.


175-181: LGTM: Proper conditional endpoint serving.

The logic correctly routes embedding requests to the specialized handler while maintaining the existing behavior for regular requests. The model name fallback logic is appropriate.

@t-ob t-ob merged commit e83009a into ai-dynamo:main on June 4, 2025
10 checks passed
@t-ob t-ob deleted the tobrien/embeddings-implementation branch on June 4, 2025 at 15:23
@GuanLuo
Contributor

GuanLuo commented Jun 4, 2025

Closing #1313?

@grahamking
Contributor

Closing #1313?

I don't understand. Embeddings will benefit from "HTTP server should vary endpoints by available models", but this PR doesn't implement that.

@GuanLuo
Contributor

GuanLuo commented Jun 5, 2025

@grahamking I guess I didn't understand the ask of #1313 clearly. Is the intention to not expose the endpoint at all if no model is registered to it? And is it going to be toggled on/off dynamically?


Labels

external-contribution (Pull request is from an external contributor), feat, size/L
