From d69d94e4ad60f3b7e5bfba18b2d3ea4b75496fdc Mon Sep 17 00:00:00 2001 From: Thomas Sheffler Date: Mon, 10 Feb 2025 15:37:39 -0800 Subject: [PATCH 1/4] Test and example MCP server to illustrate Issue #201. File mcp_stdio_client.py is adapted from the example in simple-chatbot. --- tests/example_mcp_server.py | 33 +++++++++ tests/mcp_stdio_client.py | 136 ++++++++++++++++++++++++++++++++++++ tests/test_mcp_tool.py | 83 ++++++++++++++++++++++ 3 files changed, 252 insertions(+) create mode 100644 tests/example_mcp_server.py create mode 100644 tests/mcp_stdio_client.py create mode 100644 tests/test_mcp_tool.py diff --git a/tests/example_mcp_server.py b/tests/example_mcp_server.py new file mode 100644 index 00000000..17353b9b --- /dev/null +++ b/tests/example_mcp_server.py @@ -0,0 +1,33 @@ +# +# Small Demo server using FastMCP and illustrating debugging and notification streams +# + +import logging +from mcp.server.fastmcp import FastMCP, Context +import time +import asyncio + +mcp = FastMCP("MCP EXAMPLE SERVER", debug=True, log_level="DEBUG") + +logger = logging.getLogger(__name__) + +logger.debug(f"MCP STARTING EXAMPLE SERVER") + +@mcp.resource("config://app") +def get_config() -> str: + """Static configuration data""" + return "Test Server 2024-02-25" + +@mcp.tool() +async def simple_tool(x:float, y:float, ctx:Context) -> str: + logger.debug("IN SIMPLE_TOOL") + await ctx.report_progress(1, 2) + return x*y + +@mcp.tool() +async def simple_tool_with_logging(x:float, y:float, ctx:Context) -> str: + await ctx.info(f"Processing Simple Tool") + logger.debug("IN SIMPLE_TOOL") + await ctx.report_progress(1, 2) + return x*y + diff --git a/tests/mcp_stdio_client.py b/tests/mcp_stdio_client.py new file mode 100644 index 00000000..190cb8fc --- /dev/null +++ b/tests/mcp_stdio_client.py @@ -0,0 +1,136 @@ +from mcp import ClientSession, ListToolsResult, StdioServerParameters +from mcp.client.stdio import stdio_client +from mcp.types import CallToolResult +from mcp import Tool as MCPTool + +from contextlib import AsyncExitStack +from typing import Any +import asyncio + + +import logging +logger = logging.getLogger(__name__) + + + +class NotificationLoggingClientSession(ClientSession): + + def __init__(self, read_stream, write_stream): + print(f"NOTIFICATION LOGGING CLIENT SESSION") + super().__init__(read_stream, write_stream) + + # override base session to log incoming notifications + async def _received_notification(self, notification): + print(f"NOTIFICATION:{notification}") + print(f"NOTIFICATION-END") + + async def send_progress_notification(self, progress_token, progress, total): + print(f"PROGRESS:{progress_token}") + print(f"PROGRESS-END") + + +# adapted from mcp-python-sdk/examples/clients/simple-chatbot/mcp_simple_chatbot/main.py +class MCPClient: + """Manages MCP server connections and tool execution.""" + + def __init__(self, name, server_params: StdioServerParameters, errlog=None): + self.name = name + self.server_params = server_params + self.errlog = errlog + self.stdio_context: Any | None = None + self.session: ClientSession | None = None + self._cleanup_lock: asyncio.Lock = asyncio.Lock() + self.exit_stack: AsyncExitStack = AsyncExitStack() + + async def initialize(self) -> None: + """Initialize the server connection.""" + + try: + stdio_transport = await self.exit_stack.enter_async_context( + stdio_client(self.server_params) + ) + read, write = stdio_transport + session = await self.exit_stack.enter_async_context( + # ClientSession(read, write) + NotificationLoggingClientSession(read, write) + ) + await session.initialize() + self.session = session + except Exception as e: + logging.error(f"Error initializing server: {e}") + await self.cleanup() + raise + + async def get_available_tools(self) -> list[MCPTool]: + """List available tools from the server. + + Returns: + A list of available tools. + + Raises: + RuntimeError: If the server is not initialized. + """ + if not self.session: + raise RuntimeError(f"Server {self.name} not initialized") + + tools_response = await self.session.list_tools() + + # Let's just ignore pagination for now + return tools_response.tools + + async def call_tool( + self, + tool_name: str, + arguments: dict[str, Any], + retries: int = 2, + delay: float = 1.0, + ) -> Any: + """Execute a tool with retry mechanism. + + Args: + tool_name: Name of the tool to execute. + arguments: Tool arguments. + retries: Number of retry attempts. + delay: Delay between retries in seconds. + + Returns: + Tool execution result. + + Raises: + RuntimeError: If server is not initialized. + Exception: If tool execution fails after all retries. + """ + if not self.session: + raise RuntimeError(f"Server {self.name} not initialized") + + attempt = 0 + while attempt < retries: + try: + logging.info(f"Executing {tool_name}...") + result = await self.session.call_tool(tool_name, arguments) + + return result + + except Exception as e: + attempt += 1 + logging.warning( + f"Error executing tool: {e}. Attempt {attempt} of {retries}." + ) + if attempt < retries: + logging.info(f"Retrying in {delay} seconds...") + await asyncio.sleep(delay) + else: + logging.error("Max retries reached. Failing.") + raise + + async def cleanup(self) -> None: + """Clean up server resources.""" + async with self._cleanup_lock: + try: + await self.exit_stack.aclose() + self.session = None + self.stdio_context = None + except Exception as e: + logging.error(f"Error during cleanup of server {self.name}: {e}") + + diff --git a/tests/test_mcp_tool.py b/tests/test_mcp_tool.py new file mode 100644 index 00000000..a8b4a0b2 --- /dev/null +++ b/tests/test_mcp_tool.py @@ -0,0 +1,83 @@ +import pytest + +import os +import sys +from .mcp_stdio_client import MCPClient + +from mcp import StdioServerParameters + +# locate the exmaple MCP server co-located in this directory + +mcp_server_dir = os.path.dirname(os.path.abspath(__file__)) +mcp_server_file = os.path.join(mcp_server_dir, "example_mcp_server.py") + +# mcpServers config in same syntax used by reference MCP + +servers_config = { + "mcpServers": { + + "testMcpServer": { + "command": "mcp", # be sure to . .venv/bin/activate so that mcp command is found + "args": [ + "run", + mcp_server_file + ] + } + + } +} + + +# @pytest.mark.asyncio +@pytest.mark.anyio +async def test_mcp(): + + servers = servers_config.get("mcpServers") + + server0 = "testMcpServer" + config0 = servers[server0] + + client = MCPClient( + server0, + StdioServerParameters.model_validate(config0) + ) + await client.initialize() + tools = await client.get_available_tools() + + print(f"TOOLS:{tools}") + mcp_tool = tools[0] + + res = await client.call_tool("simple_tool", {"x":5, "y":7}) + + print(f"RES:{res}") + + # clients must be destroyed in reverse order + await client.cleanup() + + +# @pytest.mark.asyncio +@pytest.mark.anyio +async def test_mcp_with_logging(): + + servers = servers_config.get("mcpServers") + + server0 = "testMcpServer" + config0 = servers[server0] + + client = MCPClient( + server0, + StdioServerParameters.model_validate(config0) + ) + await client.initialize() + tools = await client.get_available_tools() + + print(f"TOOLS:{tools}") + mcp_tool = tools[0] + + res = await client.call_tool("simple_tool_with_logging", {"x":5, "y":7}) + + print(f"RES:{res}") + + # clients must be destroyed in reverse order + await client.cleanup() + From d14a2d729ea154a12d727687be5642dd5f7b5fc6 Mon Sep 17 00:00:00 2001 From: Thomas Sheffler Date: Mon, 10 Feb 2025 15:39:41 -0800 Subject: [PATCH 2/4] proposed fix - remove lines that cause a notification to be sent BACK after receiving it. This seems to create deadlock. --- src/mcp/shared/session.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/mcp/shared/session.py b/src/mcp/shared/session.py index 3d3988ce..699fb9a1 100644 --- a/src/mcp/shared/session.py +++ b/src/mcp/shared/session.py @@ -324,9 +324,9 @@ async def _receive_loop(self) -> None: await self._in_flight[cancelled_id].cancel() else: await self._received_notification(notification) - await self._incoming_message_stream_writer.send( - notification - ) +# await self._incoming_message_stream_writer.send( +# notification +# ) except Exception as e: # For other validation errors, log and continue logging.warning( From 8247e1066cd05713c166943e3cf87df861bb3784 Mon Sep 17 00:00:00 2001 From: Thomas Sheffler Date: Tue, 11 Feb 2025 06:01:13 -0800 Subject: [PATCH 3/4] remove stray import --- tests/example_mcp_server.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/example_mcp_server.py b/tests/example_mcp_server.py index 17353b9b..a8713f10 100644 --- a/tests/example_mcp_server.py +++ b/tests/example_mcp_server.py @@ -5,7 +5,6 @@ import logging from mcp.server.fastmcp import FastMCP, Context import time -import asyncio mcp = FastMCP("MCP EXAMPLE SERVER", debug=True, log_level="DEBUG") From fd98dc1f68ea3efc87701bc9b3e307ad9b3d81f9 Mon Sep 17 00:00:00 2001 From: Thomas Sheffler Date: Tue, 11 Feb 2025 06:01:32 -0800 Subject: [PATCH 4/4] remove stray lines --- tests/test_mcp_tool.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/tests/test_mcp_tool.py b/tests/test_mcp_tool.py index a8b4a0b2..6153dcb6 100644 --- a/tests/test_mcp_tool.py +++ b/tests/test_mcp_tool.py @@ -28,7 +28,6 @@ } -# @pytest.mark.asyncio @pytest.mark.anyio async def test_mcp(): @@ -55,7 +54,6 @@ async def test_mcp(): await client.cleanup() -# @pytest.mark.asyncio @pytest.mark.anyio async def test_mcp_with_logging():