Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 10 additions & 9 deletions docs/mcp/client.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,19 @@ pip/uv-add "pydantic-ai-slim[mcp]"

PydanticAI comes with two ways to connect to MCP servers:

- [`MCPServerHTTP`][pydantic_ai.mcp.MCPServerHTTP] which connects to an MCP server using the [HTTP SSE](https://spec.modelcontextprotocol.io/specification/2024-11-05/basic/transports/#http-with-sse) transport
- [`MCPServerHTTP`][pydantic_ai.mcp.MCPServerHTTP] which connects to an MCP server using the [HTTP SSE](https://spec.modelcontextprotocol.io/specification/2024-11-05/basic/transports/#http-with-sse) or [Streamable HTTP](https://modelcontextprotocol.io/specification/2025-03-26/basic/transports#streamable-http) transport
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
- [`MCPServerHTTP`][pydantic_ai.mcp.MCPServerHTTP] which connects to an MCP server using the [HTTP SSE](https://spec.modelcontextprotocol.io/specification/2024-11-05/basic/transports/#http-with-sse) or [Streamable HTTP](https://modelcontextprotocol.io/specification/2025-03-26/basic/transports#streamable-http) transport
- [`MCPServerHTTP`][pydantic_ai.mcp.MCPServerHTTP] which connects to an MCP server using the [Streamable HTTP](https://modelcontextprotocol.io/specification/2025-03-26/basic/transports#streamable-http) or older [HTTP SSE](https://spec.modelcontextprotocol.io/specification/2024-11-05/basic/transports/#http-with-sse) transport

- [`MCPServerStdio`][pydantic_ai.mcp.MCPServerStdio] which runs the server as a subprocess and connects to it using the [stdio](https://spec.modelcontextprotocol.io/specification/2024-11-05/basic/transports/#stdio) transport

Examples of both are shown below; [mcp-run-python](run-python.md) is used as the MCP server in both examples.

### SSE Client
### SSE or Streamable HTTP Client
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
### SSE or Streamable HTTP Client
### HTTP Client


[`MCPServerHTTP`][pydantic_ai.mcp.MCPServerHTTP] connects over HTTP using the [HTTP + Server Sent Events transport](https://spec.modelcontextprotocol.io/specification/2024-11-05/basic/transports/#http-with-sse) to a server.
[`MCPServerHTTP`][pydantic_ai.mcp.MCPServerHTTP] connects over HTTP using the [HTTP + Server Sent Events transport](https://spec.modelcontextprotocol.io/specification/2024-11-05/basic/transports/#http-with-sse) or the [Streamable HTTP](https://modelcontextprotocol.io/specification/2025-03-26/basic/transports#streamable-http) to a server.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
[`MCPServerHTTP`][pydantic_ai.mcp.MCPServerHTTP] connects over HTTP using the [HTTP + Server Sent Events transport](https://spec.modelcontextprotocol.io/specification/2024-11-05/basic/transports/#http-with-sse) or the [Streamable HTTP](https://modelcontextprotocol.io/specification/2025-03-26/basic/transports#streamable-http) to a server.
[`MCPServerHTTP`][pydantic_ai.mcp.MCPServerHTTP] connects to a server over HTTP using [Streamable HTTP](https://modelcontextprotocol.io/specification/2025-03-26/basic/transports#streamable-http) or [HTTP + Server Sent Events](https://spec.modelcontextprotocol.io/specification/2024-11-05/basic/transports/#http-with-sse).


!!! note
[`MCPServerHTTP`][pydantic_ai.mcp.MCPServerHTTP] requires an MCP server to be running and accepting HTTP connections before calling [`agent.run_mcp_servers()`][pydantic_ai.Agent.run_mcp_servers]. Running the server is not managed by PydanticAI.

The name "HTTP" is used since this implemented will be adapted in future to use the new
[Streamable HTTP](https://github.com/modelcontextprotocol/specification/pull/206) currently in development.

Before creating the SSE client, we need to run the server (docs [here](run-python.md)):
Before creating the client, we need to run the server (docs [here](run-python.md)):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Before creating the client, we need to run the server (docs [here](run-python.md)):
For backwards compatibility, the default transport is SSE. If your server instead uses Streamable HTTP, you have to specify the `transport='streamable-http'` option when creating `MCPServerHTTP`.
Before creating the client, we need to run the server (docs [here](run-python.md)):


```bash {title="terminal (run sse server)"}
deno run \
Expand All @@ -45,7 +42,7 @@ deno run \
from pydantic_ai import Agent
from pydantic_ai.mcp import MCPServerHTTP

server = MCPServerHTTP(url='http://localhost:3001/sse') # (1)!
server = MCPServerHTTP(url='http://localhost:3001/sse', transport='sse') # (1)!
agent = Agent('openai:gpt-4o', mcp_servers=[server]) # (2)!


Expand All @@ -56,7 +53,7 @@ async def main():
#> There are 9,208 days between January 1, 2000, and March 18, 2025.
```

1. Define the MCP server with the URL used to connect.
1. Define the MCP server with the URL and transport used to connect. The url will typically end in `/mcp` for HTTP servers and `/sse` for SSE. Transport can be either sse or streamable-http.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
1. Define the MCP server with the URL and transport used to connect. The url will typically end in `/mcp` for HTTP servers and `/sse` for SSE. Transport can be either sse or streamable-http.
1. Define the MCP server with the URL and transport used to connect. The URL will typically end in `/mcp` for Streamable HTTP servers and `/sse` for SSE. Transport can be either `'sse'` or `'streamable-http'`.

2. Create an agent with the MCP server attached.
3. Create a client session to connect to the server.

Expand Down Expand Up @@ -84,6 +81,10 @@ Will display as follows:

![Logfire run python code](../img/logfire-run-python-code.png)

#### Transport

`MCPServerHTTP` supports both SSE (`sse`) and Streamable HTTP (`streamable-http`) servers which can be specified via the `transport` parameter.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can drop this section as I suggested mentioning the transport arg above.

It'd be nice if we could also have a Streamable HTTP example!


### MCP "stdio" Server

The other transport offered by MCP is the [stdio transport](https://spec.modelcontextprotocol.io/specification/2024-11-05/basic/transports/#stdio) where the server is run as a subprocess and communicates with the client over `stdin` and `stdout`. In this case, you'd use the [`MCPServerStdio`][pydantic_ai.mcp.MCPServerStdio] class.
Expand Down
61 changes: 38 additions & 23 deletions pydantic_ai_slim/pydantic_ai/mcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,17 @@
from collections.abc import AsyncIterator, Sequence
from contextlib import AsyncExitStack, asynccontextmanager
from dataclasses import dataclass
from datetime import timedelta
from pathlib import Path
from types import TracebackType
from typing import Any
from typing import Any, Literal

import anyio
from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream
from mcp.types import (
BlobResourceContents,
EmbeddedResource,
ImageContent,
JSONRPCMessage,
LoggingLevel,
TextContent,
TextResourceContents,
Expand All @@ -31,6 +31,8 @@
from mcp.client.session import ClientSession
from mcp.client.sse import sse_client
from mcp.client.stdio import StdioServerParameters, stdio_client
from mcp.client.streamable_http import streamablehttp_client
from mcp.shared.message import SessionMessage
except ImportError as _import_error:
raise ImportError(
'Please install the `mcp` package to use the MCP server, '
Expand All @@ -56,8 +58,8 @@ class MCPServer(ABC):
"""

_client: ClientSession
_read_stream: MemoryObjectReceiveStream[JSONRPCMessage | Exception]
_write_stream: MemoryObjectSendStream[JSONRPCMessage]
_read_stream: MemoryObjectReceiveStream[SessionMessage | Exception]
_write_stream: MemoryObjectSendStream[SessionMessage]
_exit_stack: AsyncExitStack

@abstractmethod
Expand All @@ -66,8 +68,8 @@ async def client_streams(
self,
) -> AsyncIterator[
tuple[
MemoryObjectReceiveStream[JSONRPCMessage | Exception],
MemoryObjectSendStream[JSONRPCMessage],
MemoryObjectReceiveStream[SessionMessage | Exception],
MemoryObjectSendStream[SessionMessage],
]
]:
"""Create the streams for the MCP server."""
Expand Down Expand Up @@ -266,8 +268,8 @@ async def client_streams(
self,
) -> AsyncIterator[
tuple[
MemoryObjectReceiveStream[JSONRPCMessage | Exception],
MemoryObjectSendStream[JSONRPCMessage],
MemoryObjectReceiveStream[SessionMessage | Exception],
MemoryObjectSendStream[SessionMessage],
]
]:
server = StdioServerParameters(command=self.command, args=list(self.args), env=self.env, cwd=self.cwd)
Expand All @@ -288,11 +290,8 @@ def _get_client_initialize_timeout(self) -> float:
class MCPServerHTTP(MCPServer):
"""An MCP server that connects over streamable HTTP connections.

This class implements the SSE transport from the MCP specification.
See <https://spec.modelcontextprotocol.io/specification/2024-11-05/basic/transports/#http-with-sse> for more information.

The name "HTTP" is used since this implemented will be adapted in future to use the new
[Streamable HTTP](https://github.com/modelcontextprotocol/specification/pull/206) currently in development.
This class implements both the SSE and Streamable HTTP transports from the MCP specification.
See <https://modelcontextprotocol.io/specification/2025-03-26/basic/transports#streamable-http> for more information.

!!! note
Using this class as an async context manager will create a new pool of HTTP connections to connect
Expand All @@ -303,7 +302,7 @@ class MCPServerHTTP(MCPServer):
from pydantic_ai import Agent
from pydantic_ai.mcp import MCPServerHTTP

server = MCPServerHTTP('http://localhost:3001/sse') # (1)!
server = MCPServerHTTP('http://localhost:3001/sse', transport='sse') # (1)!
agent = Agent('openai:gpt-4o', mcp_servers=[server])

async def main():
Expand Down Expand Up @@ -358,22 +357,38 @@ async def main():
For example, if `tool_prefix='foo'`, then a tool named `bar` will be registered as `foo_bar`
"""

transport: Literal['sse', 'streamable-http'] = 'sse'
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These arguments match the underlying terms that FastMCP uses

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you have a chance, could you see if it would be easy to implement an 'auto' option that still defaults to SSE for backward compatibility, but switches to streamable-http when the path is /mcp?

"""The transport the MCP server is using.

If empty, sse will be assumed for backwards compatibality reasons.
"""

@asynccontextmanager
async def client_streams(
self,
) -> AsyncIterator[
tuple[
MemoryObjectReceiveStream[JSONRPCMessage | Exception],
MemoryObjectSendStream[JSONRPCMessage],
MemoryObjectReceiveStream[SessionMessage | Exception],
MemoryObjectSendStream[SessionMessage],
]
]: # pragma: no cover
async with sse_client(
url=self.url,
headers=self.headers,
timeout=self.timeout,
sse_read_timeout=self.sse_read_timeout,
) as (read_stream, write_stream):
yield read_stream, write_stream
if self.transport == 'sse':
async with sse_client(
url=self.url,
headers=self.headers,
timeout=self.timeout,
sse_read_timeout=self.sse_read_timeout,
) as (read_stream, write_stream):
yield read_stream, write_stream
elif self.transport == 'streamable-http':
async with streamablehttp_client(
url=self.url,
headers=self.headers,
timeout=self.timeout if isinstance(self.timeout, timedelta) else timedelta(seconds=self.timeout),
) as (read_stream, write_stream, _):
yield read_stream, write_stream
else:
raise ValueError(f'Unsupported transport type: {self.transport}')

def _get_log_level(self) -> LoggingLevel | None:
return self.log_level
Expand Down
2 changes: 1 addition & 1 deletion pydantic_ai_slim/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ tavily = ["tavily-python>=0.5.0"]
# CLI
cli = ["rich>=13", "prompt-toolkit>=3", "argcomplete>=3.5.0"]
# MCP
mcp = ["mcp>=1.9.0; python_version >= '3.10'"]
mcp = ["mcp>=1.9.1; python_version >= '3.10'"]
# Evals
evals = ["pydantic-evals=={{ version }}"]
# A2A
Expand Down
71 changes: 70 additions & 1 deletion tests/test_mcp.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
"""Tests for the MCP (Model Context Protocol) server implementation."""

import multiprocessing
import re
import socket
import time
from pathlib import Path
from typing import Literal

import pytest
from inline_snapshot import snapshot
Expand All @@ -21,6 +25,7 @@
from pydantic_ai.usage import Usage

from .conftest import IsDatetime, try_import
from .mcp_server import mcp as test_mcp_server

with try_import() as imports_successful:
from pydantic_ai.mcp import MCPServerHTTP, MCPServerStdio
Expand All @@ -31,7 +36,6 @@
pytestmark = [
pytest.mark.skipif(not imports_successful(), reason='mcp and openai not installed'),
pytest.mark.anyio,
pytest.mark.vcr,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

calling this out, I think all VCR tests are marked individually, but something about this blocked not the initial connection but the subsequent calls within the server.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, this is correct. You need to use --record-mode=rewrite when running the first time for the first test.

]


Expand All @@ -42,6 +46,71 @@ def agent(openai_api_key: str):
return Agent(model, mcp_servers=[server])


def run_sse_server() -> None:
"""Run the Streamable HTTP MCP server."""
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"""Run the Streamable HTTP MCP server."""
"""Run the SSE MCP server."""

test_mcp_server.run(transport='sse')


def run_streamable_http_server() -> None:
"""Run the SSE MCP Server"""
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"""Run the SSE MCP Server"""
"""Run the Streamable HTTP MCP Server"""

test_mcp_server.run(transport='streamable-http')


@pytest.fixture
def mcp_server(request: pytest.FixtureRequest):
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this feels complex, but I couldn't figure out a simpler way.

proc = multiprocessing.Process(target=request.param, daemon=True)
print('Staring streamable http server process on port port')
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
print('Staring streamable http server process on port port')

proc.start()

max_attempts = 20
attempt = 0

while attempt < max_attempts:
try:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.connect(('localhost', 8000))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should use a randomly assigned port here, we can't be sure 8000 will be available

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like FastMCP has its own host and port settings, and the run method runs the entire server. Do we need to run it in a separate process at all? Could we use a thread? It looks like FastMCP.run calls anyio.run(self.run_sse_async). We could likely do something ourselves to run the run_sse_async async method in a thread.

print('MCP server started.')
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
print('MCP server started.')

break
except ConnectionRefusedError:
time.sleep(0.1)
attempt += 1
else:
proc.kill() # Ensure process is killed if it fails to start
proc.join(timeout=1)
raise RuntimeError(f'StreamableHTTP server failed to start on port after {max_attempts} attempts')

yield
proc.kill()


@pytest.mark.parametrize(
'mcp_server, path, transport',
[
(
run_sse_server,
'sse',
'sse',
),
(
run_streamable_http_server,
'mcp',
'streamable-http',
),
],
indirect=['mcp_server'],
)
async def test_http_servers(mcp_server: None, path: str, transport: Literal['sse', 'streamable-http']):
server = MCPServerHTTP(url=f'http://localhost:8000/{path}', transport=transport)
async with server:
tools = await server.list_tools()
assert len(tools) == 10
assert tools[0].name == 'celsius_to_fahrenheit'
assert tools[0].description.startswith('Convert Celsius to Fahrenheit.')

result = await server.call_tool('celsius_to_fahrenheit', {'celsius': 0})
assert result == snapshot('32.0')


async def test_stdio_server():
server = MCPServerStdio('python', ['-m', 'tests.mcp_server'])
async with server:
Expand Down
11 changes: 6 additions & 5 deletions uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading