feat: add implementation for embeddings #1290
Conversation
Walkthrough

Embeddings endpoint support was implemented across the codebase. The HTTP service now enables embeddings endpoints by default, and the handler for embedding requests is fully implemented. A new aggregator merges streamed embedding responses. Python subprocess logic was updated to route embedding requests to a dedicated handler, and relevant public APIs were exposed.

Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Client
    participant HTTP_Service
    participant ModelManager
    participant EmbeddingsEngine
    participant Aggregator
    Client->>HTTP_Service: POST /embeddings (with model name)
    HTTP_Service->>ModelManager: get_embeddings_engine(model)
    ModelManager-->>HTTP_Service: EmbeddingsEngine or error
    HTTP_Service->>EmbeddingsEngine: generate(request)
    EmbeddingsEngine-->>HTTP_Service: Stream of embedding responses
    HTTP_Service->>Aggregator: Fold stream into final response
    Aggregator-->>HTTP_Service: Aggregated embedding response
    HTTP_Service-->>Client: JSON response
```
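The fold step in the diagram can be sketched as a small async aggregation loop. This is a minimal illustration with made-up chunk and response shapes, not the crate's actual types: each streamed chunk is assumed to carry a batch of embeddings plus token usage.

```python
import asyncio

# Hypothetical stream of embedding response chunks, mirroring the
# "Stream of embedding responses" step in the diagram above.
async def embedding_stream():
    yield {"data": [{"index": 0, "embedding": [0.1, 0.2]}],
           "usage": {"prompt_tokens": 3, "total_tokens": 3}}
    yield {"data": [{"index": 1, "embedding": [0.3, 0.4]}],
           "usage": {"prompt_tokens": 2, "total_tokens": 2}}

async def fold(stream):
    # Fold the stream into one aggregated response: concatenate the
    # embedding entries and sum the usage counters.
    final = {"object": "list", "data": [],
             "usage": {"prompt_tokens": 0, "total_tokens": 0}}
    async for chunk in stream:
        final["data"].extend(chunk["data"])
        final["usage"]["prompt_tokens"] += chunk["usage"]["prompt_tokens"]
        final["usage"]["total_tokens"] += chunk["usage"]["total_tokens"]
    return final

result = asyncio.run(fold(embedding_stream()))
print(len(result["data"]), result["usage"]["total_tokens"])  # 2 5
```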
Force-pushed a8ac751 to 4f657c3
Actionable comments posted: 1
🧹 Nitpick comments (2)
lib/llm/src/protocols/openai/embeddings/aggregator.rs (2)
25-27: Over-constrained `DataStream` alias may reject perfectly valid streams

Requiring `Sync` in the dynamic trait object makes the alias unusable for many Tokio/Futures streams (e.g. `mpsc::Receiver`, `tokio_stream::wrappers::ReceiverStream`) that are `Send` but not `Sync`. Because the stream is always pinned to a single task in `apply`, `Sync` is not needed.

```diff
-type DataStream<T> = Pin<Box<dyn Stream<Item = T> + Send + Sync>>;
+type DataStream<T> = Pin<Box<dyn Stream<Item = T> + Send>>;
```

Removing `Sync` increases compatibility without sacrificing safety.
67-76: Minor readability tweak: use `match` on `delta` directly

Calling `delta.ok()` hides the fact that `Annotated` is result-like. Matching on the original value makes control-flow clearer and avoids an extra allocation:

```rust
match delta {
    Annotated::Data(delta) => { /* happy path */ }
    Annotated::Error(error) => {
        aggregator.error = Some(error);
        return aggregator;
    }
}
```
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
- lib/llm/src/discovery/model_manager.rs (1 hunks)
- lib/llm/src/http/service/openai.rs (2 hunks)
- lib/llm/src/protocols/openai/embeddings.rs (2 hunks)
- lib/llm/src/protocols/openai/embeddings/aggregator.rs (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
- lib/llm/src/protocols/openai/embeddings.rs
- lib/llm/src/discovery/model_manager.rs
- lib/llm/src/http/service/openai.rs
⏰ Context from checks skipped due to timeout of 90000ms (4)
- GitHub Check: pre-merge-rust (lib/bindings/python)
- GitHub Check: pre-merge-rust (lib/runtime/examples)
- GitHub Check: pre-merge-rust (.)
- GitHub Check: Build and Test - vllm
🔇 Additional comments (1)
lib/llm/src/protocols/openai/embeddings/aggregator.rs (1)
90-93: Potential `u32` overflow when aggregating token usage

`prompt_tokens` and `total_tokens` are summed without checking overflow. Although unlikely, extremely large batch jobs (or malicious input) could wrap these counters. Consider switching the fields to `u64` (matching OpenAI's spec) or using `saturating_add`:

```diff
-existing.inner.usage.prompt_tokens += response.inner.usage.prompt_tokens;
-existing.inner.usage.total_tokens += response.inner.usage.total_tokens;
+existing.inner.usage.prompt_tokens =
+    existing.inner.usage.prompt_tokens.saturating_add(response.inner.usage.prompt_tokens);
+existing.inner.usage.total_tokens =
+    existing.inner.usage.total_tokens.saturating_add(response.inner.usage.total_tokens);
```
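The wrap-vs-saturate distinction the diff is guarding against can be illustrated in Python with hypothetical helpers that emulate 32-bit arithmetic (Python ints are unbounded, so the masking and clamping are explicit):

```python
U32_MAX = 2**32 - 1

def wrapping_add_u32(a: int, b: int) -> int:
    # Emulates overflowing u32 addition: the result wraps around modulo 2^32.
    return (a + b) & U32_MAX

def saturating_add_u32(a: int, b: int) -> int:
    # Emulates Rust's u32::saturating_add: overflow clamps to u32::MAX.
    return min(a + b, U32_MAX)

print(wrapping_add_u32(U32_MAX, 5))    # 4 (wrapped to a tiny count)
print(saturating_add_u32(U32_MAX, 5))  # 4294967295 (clamped)
```

With wrapping arithmetic a huge aggregated token count silently becomes a tiny one; saturation at least keeps the counter at its ceiling.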
@grahamking I've added a draft(-ish) commit 7f2839b which enables embeddings via

```shell
DYN_LOG=debug dynamo run in=http out=sglang --model-path=Alibaba-NLP/gte-Qwen2-1.5B-instruct --extra-engine-args ./extra_args.json
```

with `extra_args.json` content:

```json
{"is_embedding": true, "skip_tokenizer_init": false}
```

I've never used sglang before, so happy to be pointed at a better way of doing things, but wanted to make sure this code could be made to work with an existing engine. One caveat: I followed the instructions at https://github.com/ai-dynamo/dynamo/blob/main/docs/guides/dynamo_run.md#sglang but found that I had to run the following to get sglang and flashinfer to work properly:

```shell
uv pip install torch==2.6.0 "sglang[all]>=0.4.6.post5" --find-links https://flashinfer.ai/whl/cu124/torch2.6/flashinfer-python/
```
Actionable comments posted: 4
🧹 Nitpick comments (2)
launch/dynamo-run/src/subprocess/sglang_inc.py (2)
129-131: Commented-out `page_size` could cause performance regressions

Removing `arg_map["page_size"]` means KV cache page size falls back to the engine default, which might increase fragmentation / latency. If the original intent was only to make it optional, add a conditional rather than fully disabling:

```python
if config.kv_block_size:
    arg_map["page_size"] = config.kv_block_size
```

Currently these lines are just commented out, silently changing behaviour.
164-175: Dynamic handler selection looks good – minor DRY suggestion

Logic for choosing `RequestHandler` vs `EmbeddingRequestHandler` is duplicated in both `model_type` and the `serve_endpoint` call. A tiny refactor can avoid divergence:

```python
is_embedding = engine_args.is_embedding
model_type = ModelType.Embedding if is_embedding else ModelType.Backend
await register_llm(model_type, endpoint, config.model_path, config.model_name)

handler_cls = EmbeddingRequestHandler if is_embedding else RequestHandler
await endpoint.serve_endpoint(handler_cls(engine_client).generate)
```

No functional change, but improves readability.
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
- launch/dynamo-run/src/input/http.rs (1 hunks)
- launch/dynamo-run/src/subprocess/sglang_inc.py (3 hunks)
- lib/llm/src/http/service/service_v2.rs (1 hunks)
- lib/llm/src/preprocessor/prompt/template/formatters.rs (2 hunks)
✅ Files skipped from review due to trivial changes (1)
- launch/dynamo-run/src/input/http.rs
🧰 Additional context used
🪛 Pylint (3.3.7)
launch/dynamo-run/src/subprocess/sglang_inc.py
[refactor] 80-80: Too few public methods (1/2)
(R0903)
⏰ Context from checks skipped due to timeout of 90000ms (4)
- GitHub Check: pre-merge-rust (lib/bindings/python)
- GitHub Check: pre-merge-rust (.)
- GitHub Check: pre-merge-rust (lib/runtime/examples)
- GitHub Check: Build and Test - vllm
🔇 Additional comments (1)
lib/llm/src/http/service/service_v2.rs (1)
165-167: Router branch now unconditional given the default above

Because the default is now `true`, the `if config.enable_embeddings_endpoints { … }` branch will run in all deployments. Once the default is reverted (see previous comment) this branch remains correct. No change required here beyond the config flag fix.
Force-pushed 7f2839b to cc5a709

Force-pushed cc5a709 to 85cc17f
Actionable comments posted: 0
🧹 Nitpick comments (1)
launch/dynamo-run/src/subprocess/sglang_inc.py (1)
89-114: Consider adding error handling for the `async_encode` call.

The embedding generation logic is well-structured and correctly handles the non-streaming nature of `async_encode`. However, the implementation could benefit from error handling around the engine call. Consider wrapping the `async_encode` call in a try-except block:

```diff
 async def generate(self, request):
-    gen = await self.engine_client.async_encode(prompt=request["input"])
+    try:
+        gen = await self.engine_client.async_encode(prompt=request["input"])
+    except Exception as e:
+        logging.error(f"Failed to generate embeddings: {e}")
+        raise
```

Additionally, you might want to validate the request format:

```diff
 async def generate(self, request):
+    if "input" not in request:
+        raise ValueError("Missing 'input' field in embedding request")
     gen = await self.engine_client.async_encode(prompt=request["input"])
```
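Putting the two suggestions together, a self-contained sketch of the handler might look like the following. The `FakeEngineClient` is a hypothetical stand-in for the sglang engine client (only the shape of `async_encode`'s result is assumed), and this is an illustration of the review's suggestion, not the PR's actual code:

```python
import asyncio
import logging

class FakeEngineClient:
    # Hypothetical engine client: async_encode returns one result per input,
    # each carrying an embedding vector and token accounting in meta_info.
    async def async_encode(self, prompt):
        inputs = prompt if isinstance(prompt, list) else [prompt]
        return [
            {"embedding": [0.0, 0.1],
             "meta_info": {"prompt_tokens": len(text.split())}}
            for text in inputs
        ]

class EmbeddingRequestHandler:
    def __init__(self, engine_client, model: str = "example-model"):
        self.engine_client = engine_client
        self.model = model

    async def generate(self, request):
        # Validate the request before touching the engine.
        if "input" not in request:
            raise ValueError("Missing 'input' field in embedding request")
        try:
            gen = await self.engine_client.async_encode(prompt=request["input"])
        except Exception:
            logging.exception("Failed to generate embeddings")
            raise
        tokens = 0
        embeddings = []
        for idx, res in enumerate(gen):
            embeddings.append(
                {"index": idx, "object": "embedding", "embedding": res["embedding"]}
            )
            # .get(...) guards against malformed meta_info, per the review note.
            tokens += res.get("meta_info", {}).get("prompt_tokens", 0)
        # async_encode is non-streaming, so a single aggregate response is yielded.
        yield {
            "object": "list",
            "model": self.model,
            "data": embeddings,
            "usage": {"prompt_tokens": tokens, "total_tokens": tokens},
        }

async def main():
    handler = EmbeddingRequestHandler(FakeEngineClient())
    async for response in handler.generate({"input": ["hello world", "hi"]}):
        print(len(response["data"]), response["usage"]["prompt_tokens"])  # 2 3

asyncio.run(main())
```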
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
- launch/dynamo-run/src/input/http.rs (1 hunks)
- launch/dynamo-run/src/subprocess/sglang_inc.py (2 hunks)
- lib/llm/src/http/service/service_v2.rs (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
- launch/dynamo-run/src/input/http.rs
- lib/llm/src/http/service/service_v2.rs
🧰 Additional context used
🧠 Learnings (1)
launch/dynamo-run/src/subprocess/sglang_inc.py (1)
Learnt from: t-ob
PR: ai-dynamo/dynamo#1290
File: launch/dynamo-run/src/subprocess/sglang_inc.py:80-110
Timestamp: 2025-06-03T10:17:51.672Z
Learning: The sglang `async_encode` method does not support streaming options, so collecting all embeddings before yielding is the correct approach for embedding requests.
🧬 Code Graph Analysis (1)
launch/dynamo-run/src/subprocess/sglang_inc.py (2)
lib/bindings/python/examples/hello_world/server_sglang.py (2)
- RequestHandler (46-76)
- generate (54-76)

lib/bindings/python/rust/lib.rs (4)
- generate (612-623)
- register_llm (98-137)
- endpoint (418-424)
- serve_endpoint (438-453)
🪛 Pylint (3.3.7)
launch/dynamo-run/src/subprocess/sglang_inc.py
[refactor] 80-80: Too few public methods (1/2)
(R0903)
⏰ Context from checks skipped due to timeout of 90000ms (4)
- GitHub Check: pre-merge-rust (lib/runtime/examples)
- GitHub Check: pre-merge-rust (.)
- GitHub Check: pre-merge-rust (lib/bindings/python)
- GitHub Check: Build and Test - vllm
🔇 Additional comments (3)
launch/dynamo-run/src/subprocess/sglang_inc.py (3)
80-88: LGTM! Good inheritance pattern and proper model name handling.

The `EmbeddingRequestHandler` properly inherits from `RequestHandler` and correctly accepts the model name in the constructor, addressing the previous placeholder "TODO" concern from past reviews.

🧰 Tools
🪛 Pylint (3.3.7)
[refactor] 80-80: Too few public methods (1/2)
(R0903)
168-170: Clean conditional logic for model type selection.

The model type selection correctly uses `ModelType.Embedding` when the embedding flag is set, maintaining compatibility with the existing backend registration.
175-181: Well-implemented conditional handler selection.

The endpoint serving logic properly routes to the appropriate handler based on the embedding configuration. The fallback from `config.model_name` to `config.model_path` for the model name is sensible.
Force-pushed 85cc17f to 7ee7db0

Force-pushed 7ee7db0 to d3b8652
Actionable comments posted: 0
🧹 Nitpick comments (1)
launch/dynamo-run/src/subprocess/sglang_inc.py (1)
80-114: Add error handling for embedding generation.

The implementation looks correct and follows OpenAI API format properly. However, consider adding error handling for potential failures in `async_encode` or malformed response structures.

```diff
 async def generate(self, request):
-    gen = await self.engine_client.async_encode(prompt=request["input"])
-    tokens = 0
-    embeddings = []
-    for idx, res in enumerate(gen):
-        embeddings.append(
-            {
-                "index": idx,
-                "object": "embedding",
-                "embedding": res["embedding"],
-            }
-        )
-        tokens += res["meta_info"]["prompt_tokens"]
+    try:
+        gen = await self.engine_client.async_encode(prompt=request["input"])
+        tokens = 0
+        embeddings = []
+        for idx, res in enumerate(gen):
+            embeddings.append(
+                {
+                    "index": idx,
+                    "object": "embedding",
+                    "embedding": res["embedding"],
+                }
+            )
+            tokens += res.get("meta_info", {}).get("prompt_tokens", 0)
+    except Exception as e:
+        logging.error(f"Error generating embeddings: {e}")
+        raise
```

🧰 Tools
🪛 Pylint (3.3.7)
[refactor] 80-80: Too few public methods (1/2)
(R0903)
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
- launch/dynamo-run/src/input/http.rs (1 hunks)
- launch/dynamo-run/src/subprocess/sglang_inc.py (2 hunks)
- lib/llm/src/http/service/service_v2.rs (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
- lib/llm/src/http/service/service_v2.rs
- launch/dynamo-run/src/input/http.rs
🧰 Additional context used
🧠 Learnings (1)
launch/dynamo-run/src/subprocess/sglang_inc.py (1)
Learnt from: t-ob
PR: ai-dynamo/dynamo#1290
File: launch/dynamo-run/src/subprocess/sglang_inc.py:80-110
Timestamp: 2025-06-03T10:17:51.672Z
Learning: The sglang `async_encode` method does not support streaming options, so collecting all embeddings before yielding is the correct approach for embedding requests.
🪛 Pylint (3.3.7)
launch/dynamo-run/src/subprocess/sglang_inc.py
[refactor] 80-80: Too few public methods (1/2)
(R0903)
⏰ Context from checks skipped due to timeout of 90000ms (4)
- GitHub Check: pre-merge-rust (lib/bindings/python)
- GitHub Check: pre-merge-rust (lib/runtime/examples)
- GitHub Check: pre-merge-rust (.)
- GitHub Check: Build and Test - vllm
🔇 Additional comments (2)
launch/dynamo-run/src/subprocess/sglang_inc.py (2)
168-170: LGTM: Clean model type selection logic.

The conditional logic correctly selects the appropriate model type based on the embedding flag.
175-181: LGTM: Proper conditional endpoint serving.

The logic correctly routes embedding requests to the specialized handler while maintaining the existing behavior for regular requests. The model name fallback logic is appropriate.
Closing #1313?
I don't understand. Embeddings will benefit from "HTTP server should vary endpoints by available models", but this PR doesn't implement that.
@grahamking I guess I wasn't understanding the ask of #1313 clearly. Is the intention to not even expose the endpoint if there is no model registered to it? And is it going to be toggled on/off dynamically?
Overview:
Adds implementation of handling embedding requests in rust lib.
Details:
- `DeltaGenerator` like there is for chat/completions (in their respective `delta.rs` files)

Outside of this PR, I've also had to make the following changes to enable this:
But it does seem to me that a good default is to keep embeddings endpoints disabled, unless specifically configured. In my POC engine, I've had to hardcode these as true, but maybe the thing to do is (at least for `dynamo-run`) add a flag to toggle these.

Where should the reviewer start?

- lib/llm/src/http/service/openai.rs

Related Issues: (use one of the action keywords Closes / Fixes / Resolves / Relates to)
Summary by CodeRabbit
New Features
Bug Fixes
Chores