-
Couldn't load subscription status.
- Fork 1.3k
Add Streamable HTTP Transport to MCPServerHTTP #1905
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||
|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -18,22 +18,19 @@ pip/uv-add "pydantic-ai-slim[mcp]" | |||||||||
|
|
||||||||||
| PydanticAI comes with two ways to connect to MCP servers: | ||||||||||
|
|
||||||||||
| - [`MCPServerHTTP`][pydantic_ai.mcp.MCPServerHTTP] which connects to an MCP server using the [HTTP SSE](https://spec.modelcontextprotocol.io/specification/2024-11-05/basic/transports/#http-with-sse) transport | ||||||||||
| - [`MCPServerHTTP`][pydantic_ai.mcp.MCPServerHTTP] which connects to an MCP server using the [HTTP SSE](https://spec.modelcontextprotocol.io/specification/2024-11-05/basic/transports/#http-with-sse) or [Streamable HTTP](https://modelcontextprotocol.io/specification/2025-03-26/basic/transports#streamable-http) transport | ||||||||||
| - [`MCPServerStdio`][pydantic_ai.mcp.MCPServerStdio] which runs the server as a subprocess and connects to it using the [stdio](https://spec.modelcontextprotocol.io/specification/2024-11-05/basic/transports/#stdio) transport | ||||||||||
|
|
||||||||||
| Examples of both are shown below; [mcp-run-python](run-python.md) is used as the MCP server in both examples. | ||||||||||
|
|
||||||||||
| ### SSE Client | ||||||||||
| ### SSE or Streamable HTTP Client | ||||||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||
|
|
||||||||||
| [`MCPServerHTTP`][pydantic_ai.mcp.MCPServerHTTP] connects over HTTP using the [HTTP + Server Sent Events transport](https://spec.modelcontextprotocol.io/specification/2024-11-05/basic/transports/#http-with-sse) to a server. | ||||||||||
| [`MCPServerHTTP`][pydantic_ai.mcp.MCPServerHTTP] connects over HTTP using the [HTTP + Server Sent Events transport](https://spec.modelcontextprotocol.io/specification/2024-11-05/basic/transports/#http-with-sse) or the [Streamable HTTP](https://modelcontextprotocol.io/specification/2025-03-26/basic/transports#streamable-http) to a server. | ||||||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||
|
|
||||||||||
| !!! note | ||||||||||
| [`MCPServerHTTP`][pydantic_ai.mcp.MCPServerHTTP] requires an MCP server to be running and accepting HTTP connections before calling [`agent.run_mcp_servers()`][pydantic_ai.Agent.run_mcp_servers]. Running the server is not managed by PydanticAI. | ||||||||||
|
|
||||||||||
| The name "HTTP" is used since this implemented will be adapted in future to use the new | ||||||||||
| [Streamable HTTP](https://github.com/modelcontextprotocol/specification/pull/206) currently in development. | ||||||||||
|
|
||||||||||
| Before creating the SSE client, we need to run the server (docs [here](run-python.md)): | ||||||||||
| Before creating the client, we need to run the server (docs [here](run-python.md)): | ||||||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||
|
|
||||||||||
| ```bash {title="terminal (run sse server)"} | ||||||||||
| deno run \ | ||||||||||
|
|
@@ -45,7 +42,7 @@ deno run \ | |||||||||
| from pydantic_ai import Agent | ||||||||||
| from pydantic_ai.mcp import MCPServerHTTP | ||||||||||
|
|
||||||||||
| server = MCPServerHTTP(url='http://localhost:3001/sse') # (1)! | ||||||||||
| server = MCPServerHTTP(url='http://localhost:3001/sse', transport='sse') # (1)! | ||||||||||
| agent = Agent('openai:gpt-4o', mcp_servers=[server]) # (2)! | ||||||||||
|
|
||||||||||
|
|
||||||||||
|
|
@@ -56,7 +53,7 @@ async def main(): | |||||||||
| #> There are 9,208 days between January 1, 2000, and March 18, 2025. | ||||||||||
| ``` | ||||||||||
|
|
||||||||||
| 1. Define the MCP server with the URL used to connect. | ||||||||||
| 1. Define the MCP server with the URL and transport used to connect. The url will typically end in `/mcp` for HTTP servers and `/sse` for SSE. Transport can be either sse or streamable-http. | ||||||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||
| 2. Create an agent with the MCP server attached. | ||||||||||
| 3. Create a client session to connect to the server. | ||||||||||
|
|
||||||||||
|
|
@@ -84,6 +81,10 @@ Will display as follows: | |||||||||
|
|
||||||||||
|  | ||||||||||
|
|
||||||||||
| #### Transport | ||||||||||
|
|
||||||||||
| `MCPServerHTTP` supports both SSE (`sse`) and Streamable HTTP (`streamable-http`) servers which can be specified via the `transport` parameter. | ||||||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we can drop this section as I suggested mentioning the It'd be nice if we could also have a Streamable HTTP example! |
||||||||||
|
|
||||||||||
| ### MCP "stdio" Server | ||||||||||
|
|
||||||||||
| The other transport offered by MCP is the [stdio transport](https://spec.modelcontextprotocol.io/specification/2024-11-05/basic/transports/#stdio) where the server is run as a subprocess and communicates with the client over `stdin` and `stdout`. In this case, you'd use the [`MCPServerStdio`][pydantic_ai.mcp.MCPServerStdio] class. | ||||||||||
|
|
||||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -6,17 +6,17 @@ | |
| from collections.abc import AsyncIterator, Sequence | ||
| from contextlib import AsyncExitStack, asynccontextmanager | ||
| from dataclasses import dataclass | ||
| from datetime import timedelta | ||
| from pathlib import Path | ||
| from types import TracebackType | ||
| from typing import Any | ||
| from typing import Any, Literal | ||
|
|
||
| import anyio | ||
| from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream | ||
| from mcp.types import ( | ||
| BlobResourceContents, | ||
| EmbeddedResource, | ||
| ImageContent, | ||
| JSONRPCMessage, | ||
| LoggingLevel, | ||
| TextContent, | ||
| TextResourceContents, | ||
|
|
@@ -31,6 +31,8 @@ | |
| from mcp.client.session import ClientSession | ||
| from mcp.client.sse import sse_client | ||
| from mcp.client.stdio import StdioServerParameters, stdio_client | ||
| from mcp.client.streamable_http import streamablehttp_client | ||
| from mcp.shared.message import SessionMessage | ||
| except ImportError as _import_error: | ||
| raise ImportError( | ||
| 'Please install the `mcp` package to use the MCP server, ' | ||
|
|
@@ -56,8 +58,8 @@ class MCPServer(ABC): | |
| """ | ||
|
|
||
| _client: ClientSession | ||
| _read_stream: MemoryObjectReceiveStream[JSONRPCMessage | Exception] | ||
| _write_stream: MemoryObjectSendStream[JSONRPCMessage] | ||
| _read_stream: MemoryObjectReceiveStream[SessionMessage | Exception] | ||
| _write_stream: MemoryObjectSendStream[SessionMessage] | ||
| _exit_stack: AsyncExitStack | ||
|
|
||
| @abstractmethod | ||
|
|
@@ -66,8 +68,8 @@ async def client_streams( | |
| self, | ||
| ) -> AsyncIterator[ | ||
| tuple[ | ||
| MemoryObjectReceiveStream[JSONRPCMessage | Exception], | ||
| MemoryObjectSendStream[JSONRPCMessage], | ||
| MemoryObjectReceiveStream[SessionMessage | Exception], | ||
| MemoryObjectSendStream[SessionMessage], | ||
| ] | ||
| ]: | ||
| """Create the streams for the MCP server.""" | ||
|
|
@@ -266,8 +268,8 @@ async def client_streams( | |
| self, | ||
| ) -> AsyncIterator[ | ||
| tuple[ | ||
| MemoryObjectReceiveStream[JSONRPCMessage | Exception], | ||
| MemoryObjectSendStream[JSONRPCMessage], | ||
| MemoryObjectReceiveStream[SessionMessage | Exception], | ||
| MemoryObjectSendStream[SessionMessage], | ||
| ] | ||
| ]: | ||
| server = StdioServerParameters(command=self.command, args=list(self.args), env=self.env, cwd=self.cwd) | ||
|
|
@@ -288,11 +290,8 @@ def _get_client_initialize_timeout(self) -> float: | |
| class MCPServerHTTP(MCPServer): | ||
| """An MCP server that connects over streamable HTTP connections. | ||
|
|
||
| This class implements the SSE transport from the MCP specification. | ||
| See <https://spec.modelcontextprotocol.io/specification/2024-11-05/basic/transports/#http-with-sse> for more information. | ||
|
|
||
| The name "HTTP" is used since this implemented will be adapted in future to use the new | ||
| [Streamable HTTP](https://github.com/modelcontextprotocol/specification/pull/206) currently in development. | ||
| This class implements both the SSE and Streamable HTTP transports from the MCP specification. | ||
| See <https://modelcontextprotocol.io/specification/2025-03-26/basic/transports#streamable-http> for more information. | ||
|
|
||
| !!! note | ||
| Using this class as an async context manager will create a new pool of HTTP connections to connect | ||
|
|
@@ -303,7 +302,7 @@ class MCPServerHTTP(MCPServer): | |
| from pydantic_ai import Agent | ||
| from pydantic_ai.mcp import MCPServerHTTP | ||
|
|
||
| server = MCPServerHTTP('http://localhost:3001/sse') # (1)! | ||
| server = MCPServerHTTP('http://localhost:3001/sse', transport='sse') # (1)! | ||
| agent = Agent('openai:gpt-4o', mcp_servers=[server]) | ||
|
|
||
| async def main(): | ||
|
|
@@ -358,22 +357,38 @@ async def main(): | |
| For example, if `tool_prefix='foo'`, then a tool named `bar` will be registered as `foo_bar` | ||
| """ | ||
|
|
||
| transport: Literal['sse', 'streamable-http'] = 'sse' | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. These arguments match the underlying terms that FastMCP uses There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If you have a chance, could you see if it would be easy to implement an |
||
| """The transport the MCP server is using. | ||
|
|
||
| If empty, sse will be assumed for backwards compatibality reasons. | ||
| """ | ||
|
|
||
| @asynccontextmanager | ||
| async def client_streams( | ||
| self, | ||
| ) -> AsyncIterator[ | ||
| tuple[ | ||
| MemoryObjectReceiveStream[JSONRPCMessage | Exception], | ||
| MemoryObjectSendStream[JSONRPCMessage], | ||
| MemoryObjectReceiveStream[SessionMessage | Exception], | ||
| MemoryObjectSendStream[SessionMessage], | ||
| ] | ||
| ]: # pragma: no cover | ||
| async with sse_client( | ||
| url=self.url, | ||
| headers=self.headers, | ||
| timeout=self.timeout, | ||
| sse_read_timeout=self.sse_read_timeout, | ||
| ) as (read_stream, write_stream): | ||
| yield read_stream, write_stream | ||
| if self.transport == 'sse': | ||
| async with sse_client( | ||
| url=self.url, | ||
| headers=self.headers, | ||
| timeout=self.timeout, | ||
| sse_read_timeout=self.sse_read_timeout, | ||
| ) as (read_stream, write_stream): | ||
| yield read_stream, write_stream | ||
| elif self.transport == 'streamable-http': | ||
| async with streamablehttp_client( | ||
| url=self.url, | ||
| headers=self.headers, | ||
| timeout=self.timeout if isinstance(self.timeout, timedelta) else timedelta(seconds=self.timeout), | ||
| ) as (read_stream, write_stream, _): | ||
| yield read_stream, write_stream | ||
| else: | ||
| raise ValueError(f'Unsupported transport type: {self.transport}') | ||
|
|
||
| def _get_log_level(self) -> LoggingLevel | None: | ||
| return self.log_level | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
| @@ -1,7 +1,11 @@ | ||||||
| """Tests for the MCP (Model Context Protocol) server implementation.""" | ||||||
|
|
||||||
| import multiprocessing | ||||||
| import re | ||||||
| import socket | ||||||
| import time | ||||||
| from pathlib import Path | ||||||
| from typing import Literal | ||||||
|
|
||||||
| import pytest | ||||||
| from inline_snapshot import snapshot | ||||||
|
|
@@ -21,6 +25,7 @@ | |||||
| from pydantic_ai.usage import Usage | ||||||
|
|
||||||
| from .conftest import IsDatetime, try_import | ||||||
| from .mcp_server import mcp as test_mcp_server | ||||||
|
|
||||||
| with try_import() as imports_successful: | ||||||
| from pydantic_ai.mcp import MCPServerHTTP, MCPServerStdio | ||||||
|
|
@@ -31,7 +36,6 @@ | |||||
| pytestmark = [ | ||||||
| pytest.mark.skipif(not imports_successful(), reason='mcp and openai not installed'), | ||||||
| pytest.mark.anyio, | ||||||
| pytest.mark.vcr, | ||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. calling this out, I think all VCR tests are marked individually, but something about this blocked not the initial connection but the subsequent calls within the server. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No, this is correct. You need to use |
||||||
| ] | ||||||
|
|
||||||
|
|
||||||
|
|
@@ -42,6 +46,71 @@ def agent(openai_api_key: str): | |||||
| return Agent(model, mcp_servers=[server]) | ||||||
|
|
||||||
|
|
||||||
| def run_sse_server() -> None: | ||||||
| """Run the Streamable HTTP MCP server.""" | ||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
| test_mcp_server.run(transport='sse') | ||||||
|
|
||||||
|
|
||||||
| def run_streamable_http_server() -> None: | ||||||
| """Run the SSE MCP Server""" | ||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
| test_mcp_server.run(transport='streamable-http') | ||||||
|
|
||||||
|
|
||||||
| @pytest.fixture | ||||||
| def mcp_server(request: pytest.FixtureRequest): | ||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this feels complex, but I couldn't figure out a simpler way. |
||||||
| proc = multiprocessing.Process(target=request.param, daemon=True) | ||||||
| print('Staring streamable http server process on port port') | ||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
| proc.start() | ||||||
|
|
||||||
| max_attempts = 20 | ||||||
| attempt = 0 | ||||||
|
|
||||||
| while attempt < max_attempts: | ||||||
| try: | ||||||
| with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: | ||||||
| s.connect(('localhost', 8000)) | ||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should use a randomly assigned port here, we can't be sure 8000 will be available There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It looks like FastMCP has its own host and port settings, and the |
||||||
| print('MCP server started.') | ||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
| break | ||||||
| except ConnectionRefusedError: | ||||||
| time.sleep(0.1) | ||||||
| attempt += 1 | ||||||
| else: | ||||||
| proc.kill() # Ensure process is killed if it fails to start | ||||||
| proc.join(timeout=1) | ||||||
| raise RuntimeError(f'StreamableHTTP server failed to start on port after {max_attempts} attempts') | ||||||
|
|
||||||
| yield | ||||||
| proc.kill() | ||||||
|
|
||||||
|
|
||||||
| @pytest.mark.parametrize( | ||||||
| 'mcp_server, path, transport', | ||||||
| [ | ||||||
| ( | ||||||
| run_sse_server, | ||||||
| 'sse', | ||||||
| 'sse', | ||||||
| ), | ||||||
| ( | ||||||
| run_streamable_http_server, | ||||||
| 'mcp', | ||||||
| 'streamable-http', | ||||||
| ), | ||||||
| ], | ||||||
| indirect=['mcp_server'], | ||||||
| ) | ||||||
| async def test_http_servers(mcp_server: None, path: str, transport: Literal['sse', 'streamable-http']): | ||||||
| server = MCPServerHTTP(url=f'http://localhost:8000/{path}', transport=transport) | ||||||
| async with server: | ||||||
| tools = await server.list_tools() | ||||||
| assert len(tools) == 10 | ||||||
| assert tools[0].name == 'celsius_to_fahrenheit' | ||||||
| assert tools[0].description.startswith('Convert Celsius to Fahrenheit.') | ||||||
|
|
||||||
| result = await server.call_tool('celsius_to_fahrenheit', {'celsius': 0}) | ||||||
| assert result == snapshot('32.0') | ||||||
|
|
||||||
|
|
||||||
| async def test_stdio_server(): | ||||||
| server = MCPServerStdio('python', ['-m', 'tests.mcp_server']) | ||||||
| async with server: | ||||||
|
|
||||||
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.