Skip to content

Revert "Add message queue for SSE messages POST endpoint (#459)" #649

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 0 additions & 24 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -449,30 +449,6 @@ if __name__ == "__main__":

For more information on mounting applications in Starlette, see the [Starlette documentation](https://www.starlette.io/routing/#submounting-routes).

#### Message Dispatch Options

By default, the SSE server uses an in-memory message dispatch system for incoming POST messages. For production deployments or distributed scenarios, you can use Redis or implement your own message dispatch system that conforms to the `MessageDispatch` protocol:

```python
# Using the built-in Redis message dispatch
from mcp.server.fastmcp import FastMCP
from mcp.server.message_queue import RedisMessageDispatch

# Create a Redis message dispatch
redis_dispatch = RedisMessageDispatch(
redis_url="redis://localhost:6379/0", prefix="mcp:pubsub:"
)

# Pass the message dispatch instance to the server
mcp = FastMCP("My App", message_queue=redis_dispatch)
```

To use Redis, add the Redis dependency:

```bash
uv add "mcp[redis]"
```

## Examples

### Echo Server
Expand Down
5 changes: 1 addition & 4 deletions examples/servers/simple-prompt/mcp_simple_prompt/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,15 +88,12 @@ async def get_prompt(
)

if transport == "sse":
from mcp.server.message_queue.redis import RedisMessageDispatch
from mcp.server.sse import SseServerTransport
from starlette.applications import Starlette
from starlette.responses import Response
from starlette.routing import Mount, Route

message_dispatch = RedisMessageDispatch("redis://localhost:6379/0")

sse = SseServerTransport("/messages/", message_dispatch=message_dispatch)
sse = SseServerTransport("/messages/")

async def handle_sse(request):
async with sse.connect_sse(
Expand Down
2 changes: 0 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ dependencies = [
rich = ["rich>=13.9.4"]
cli = ["typer>=0.12.4", "python-dotenv>=1.0.0"]
ws = ["websockets>=15.0.1"]
redis = ["redis>=5.2.1", "types-redis>=4.6.0.20241004"]

[project.scripts]
mcp = "mcp.cli:app [cli]"
Expand All @@ -56,7 +55,6 @@ dev = [
"pytest-xdist>=3.6.1",
"pytest-examples>=0.0.14",
"pytest-pretty>=1.2.0",
"fakeredis==2.28.1",
]
docs = [
"mkdocs>=1.6.1",
Expand Down
6 changes: 1 addition & 5 deletions src/mcp/client/sse.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,7 @@ async def sse_reader(
await read_stream_writer.send(exc)
continue

session_message = SessionMessage(
message=message
)
session_message = SessionMessage(message)
await read_stream_writer.send(session_message)
case _:
logger.warning(
Expand Down Expand Up @@ -150,5 +148,3 @@ async def post_writer(endpoint_url: str):
finally:
await read_stream_writer.aclose()
await write_stream.aclose()
await read_stream.aclose()
await write_stream_reader.aclose()
2 changes: 1 addition & 1 deletion src/mcp/client/stdio/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ async def stdout_reader():
await read_stream_writer.send(exc)
continue

session_message = SessionMessage(message=message)
session_message = SessionMessage(message)
await read_stream_writer.send(session_message)
except anyio.ClosedResourceError:
await anyio.lowlevel.checkpoint()
Expand Down
6 changes: 3 additions & 3 deletions src/mcp/client/streamable_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ async def _handle_sse_event(
):
message.root.id = original_request_id

session_message = SessionMessage(message=message)
session_message = SessionMessage(message)
await read_stream_writer.send(session_message)

# Call resumption token callback if we have an ID
Expand Down Expand Up @@ -286,7 +286,7 @@ async def _handle_json_response(
try:
content = await response.aread()
message = JSONRPCMessage.model_validate_json(content)
session_message = SessionMessage(message=message)
session_message = SessionMessage(message)
await read_stream_writer.send(session_message)
except Exception as exc:
logger.error(f"Error parsing JSON response: {exc}")
Expand Down Expand Up @@ -333,7 +333,7 @@ async def _send_session_terminated_error(
id=request_id,
error=ErrorData(code=32600, message="Session terminated"),
)
session_message = SessionMessage(message=JSONRPCMessage(jsonrpc_error))
session_message = SessionMessage(JSONRPCMessage(jsonrpc_error))
await read_stream_writer.send(session_message)

async def post_writer(
Expand Down
2 changes: 1 addition & 1 deletion src/mcp/client/websocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ async def ws_reader():
async for raw_text in ws:
try:
message = types.JSONRPCMessage.model_validate_json(raw_text)
session_message = SessionMessage(message=message)
session_message = SessionMessage(message)
await read_stream_writer.send(session_message)
except ValidationError as exc:
# If JSON parse or model validation fails, send the exception
Expand Down
32 changes: 4 additions & 28 deletions src/mcp/server/fastmcp/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
from mcp.server.lowlevel.server import LifespanResultT
from mcp.server.lowlevel.server import Server as MCPServer
from mcp.server.lowlevel.server import lifespan as default_lifespan
from mcp.server.message_queue import MessageDispatch
from mcp.server.session import ServerSession, ServerSessionT
from mcp.server.sse import SseServerTransport
from mcp.server.stdio import stdio_server
Expand Down Expand Up @@ -92,11 +91,6 @@ class Settings(BaseSettings, Generic[LifespanResultT]):
sse_path: str = "/sse"
message_path: str = "/messages/"

# SSE message queue settings
message_dispatch: MessageDispatch | None = Field(
None, description="Custom message dispatch instance"
)

# resource settings
warn_on_duplicate_resources: bool = True

Expand Down Expand Up @@ -607,13 +601,6 @@ def _normalize_path(self, mount_path: str, endpoint: str) -> str:

def sse_app(self, mount_path: str | None = None) -> Starlette:
"""Return an instance of the SSE server app."""
message_dispatch = self.settings.message_dispatch
if message_dispatch is None:
from mcp.server.message_queue import InMemoryMessageDispatch

message_dispatch = InMemoryMessageDispatch()
logger.info("Using default in-memory message dispatch")

from starlette.middleware import Middleware
from starlette.routing import Mount, Route

Expand All @@ -625,12 +612,11 @@ def sse_app(self, mount_path: str | None = None) -> Starlette:
normalized_message_endpoint = self._normalize_path(
self.settings.mount_path, self.settings.message_path
)

# Set up auth context and dependencies

sse = SseServerTransport(
normalized_message_endpoint,
message_dispatch=message_dispatch
normalized_message_endpoint,
)

async def handle_sse(scope: Scope, receive: Receive, send: Send):
Expand All @@ -646,14 +632,7 @@ async def handle_sse(scope: Scope, receive: Receive, send: Send):
streams[1],
self._mcp_server.create_initialization_options(),
)
return Response()

@asynccontextmanager
async def lifespan(app: Starlette):
try:
yield
finally:
await message_dispatch.close()
return Response()

# Create routes
routes: list[Route | Mount] = []
Expand Down Expand Up @@ -730,10 +709,7 @@ async def sse_endpoint(request: Request) -> None:

# Create Starlette app with routes and middleware
return Starlette(
debug=self.settings.debug,
routes=routes,
middleware=middleware,
lifespan=lifespan,
debug=self.settings.debug, routes=routes, middleware=middleware
)

async def list_prompts(self) -> list[MCPPrompt]:
Expand Down
16 changes: 0 additions & 16 deletions src/mcp/server/message_queue/__init__.py

This file was deleted.

116 changes: 0 additions & 116 deletions src/mcp/server/message_queue/base.py

This file was deleted.

Loading
Loading