-
Notifications
You must be signed in to change notification settings - Fork 296
RPC transport #565
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
RPC transport #565
Conversation
|
commit: |
e7a2ae8 to
5273dcd
Compare
6ecc7db to
2fe8497
Compare
2fe8497 to
6e404aa
Compare
973aed9 to
da55f6b
Compare
72d1f7d to
f7da517
Compare
|
How far is this from being merged? |
not far, it will go though another round of reviews today |
| /** | ||
| * Validates a JSON-RPC 2.0 batch request | ||
| * @see JSON-RPC 2.0 spec section 6 | ||
| */ | ||
| function validateJSONRPCBatch(batch: unknown): batch is JSONRPCMessage[] { | ||
| if (!Array.isArray(batch)) { | ||
| throw new Error("Invalid JSON-RPC batch: must be an array"); | ||
| } | ||
|
|
||
| // Spec: "an Array with at least one value" | ||
| if (batch.length === 0) { | ||
| throw new Error("Invalid JSON-RPC batch: array must not be empty"); | ||
| } | ||
|
|
||
| // Validate each message in the batch | ||
| for (let i = 0; i < batch.length; i++) { | ||
| try { | ||
| validateJSONRPCMessage(batch[i]); | ||
| } catch (error) { | ||
| throw new Error( | ||
| `Invalid JSON-RPC batch: message at index ${i} is invalid: ${error instanceof Error ? error.message : String(error)}` | ||
| ); | ||
| } | ||
| } | ||
|
|
||
| return true; | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this should probs be zod schema as MCP sdk does.
| // Set the name if the stub is a DO | ||
| if (this._doName && this._stub.setName) { | ||
| await this._stub.setName(this._doName); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I dont want to do this in the transport since it could be a worker to worker binding...
| if (this._props && !this._propsInitialized && this._stub.updateProps) { | ||
| await this._stub.updateProps(this._props); | ||
| this._propsInitialized = true; | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same here. we need a better/first class way to pass props through the transport.
| this._connectToMcpServerInternal({ | ||
| type: "http", | ||
| serverName: server.name, | ||
| url: server.server_url, | ||
| callbackUrl: server.callback_url, | ||
| options: server.server_options | ||
| ? JSON.parse(server.server_options) | ||
| : undefined, | ||
| { | ||
| reconnect: { | ||
| id: server.id, | ||
| oauthClientId: server.client_id ?? undefined | ||
| } | ||
| ) | ||
| }) | ||
| .then(() => { | ||
| // Broadcast updated MCP servers state after each server connects | ||
| this.broadcastMcpServers(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is the biggest change here. I'm not sure I like it.
Why is the main agent class caring about this. it should all be added to the client manager.
| * Validates and resolves a Durable Object binding from env | ||
| * @returns The namespace and binding name for storage | ||
| */ | ||
| private _resolveRpcBinding<T extends McpAgent>( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
also grim to have this at the top level agent class. this is an implementation detail of the rpc transport. And actually a bad one at that.
| * await agent.addMcpServer("my-server", "https://example.com/mcp", "https://my-app.com"); | ||
| * ``` | ||
| */ | ||
| async addMcpServer< |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the lifecycle of calling this needs rethinking. I initially thought ahh we add this to onStart and then all is good. But that means you have v little control on when the mcp server is connected and when the auth happens etc. Also there is almost no error handling.
Need to think about this a bit more but maybe actually we deprecate this addMcpServer and make the lower level McpClientManager easier to use. If we do that then this change is actually not necessary as the example can directly use the client manager.
Adds custom RPC transport for MCP.
Core Implementation (packages/agents/src/mcp/rpc.ts) implements RPCClientTransport and RPCServerTransport with full JSON-RPC 2.0 validation, batch request support, and session management.
Agent Integration (packages/agents/src/index.ts) addRpcMcpServer() method to Agent class for connecting to MCP servers via RPC bindings.
McpAgent Support (packages/agents/src/mcp/index.ts) implements handleMcpMessage() with automatic transport recreation after hibernation.
Example (examples/mcp-rpc-transport/) showing Agent calling McpAgent via RPC with counter tool.
Documentation (docs/mcp-transports.md) guide covering all three transports (Streamable HTTP, SSE, RPC) with step-by-step setup.
Tests (packages/agents/src/tests/mcp/transports/rpc.test.ts)
Closes #548