Skip to content

Add message queue for SSE messages POST endpoint #459

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
53 commits
Select commit Hold shift + click to select a range
10c5af8
initial
akash329d Apr 8, 2025
c3d5efc
readme update
akash329d Apr 8, 2025
b2fce7d
ruff
akash329d Apr 8, 2025
7c82f36
fix typing issues
akash329d Apr 9, 2025
b92f22f
update lock
akash329d Apr 9, 2025
5dbca6e
retrigger tests?
akash329d Apr 9, 2025
badc1e2
revert
akash329d Apr 9, 2025
23665db
clean up test stuff
akash329d Apr 9, 2025
ccd5a13
lock pydantic version
akash329d Apr 9, 2025
fb44020
fix lock
akash329d Apr 9, 2025
efe6da9
wip
akash329d Apr 14, 2025
d625782
fixes
akash329d Apr 14, 2025
78c6aef
Add optional redis dep
akash329d Apr 14, 2025
fad836c
changes
akash329d Apr 14, 2025
fd97501
format / lint
akash329d Apr 14, 2025
4bce7d8
cleanup
akash329d Apr 14, 2025
d6075bb
update lock
akash329d Apr 14, 2025
8ee3a7e
remove redundant comment
akash329d Apr 14, 2025
7cabcea
add a checkpoint
akash329d Apr 14, 2025
5111c92
naming changes
akash329d Apr 15, 2025
09e0cab
logging improvements
akash329d Apr 15, 2025
8d280d8
better channel validation
akash329d Apr 15, 2025
c2bb049
merge
akash329d Apr 15, 2025
87e07b8
formatting and linting
akash329d Apr 15, 2025
b484284
fix naming in server.py
akash329d Apr 15, 2025
0bfd800
Rework to fix POST blocking issue
akash329d Apr 21, 2025
1e81f36
comments fix
akash329d Apr 21, 2025
215cc42
wip
akash329d Apr 22, 2025
8fce8e6
back to b48428486aa90f7529c36e5a78074ac2a2d813bc
akash329d Apr 22, 2025
b2893e6
push message handling onto corresponding SSE session task group
akash329d Apr 22, 2025
e5938d4
format
akash329d Apr 22, 2025
a151f1c
clean up comment and session state
akash329d Apr 22, 2025
d22f46b
shorten comment
akash329d Apr 22, 2025
8d6a20d
remove extra change
akash329d Apr 23, 2025
bb24881
testing
akash329d Apr 24, 2025
564561f
add a cancelscope on the finally
akash329d May 1, 2025
9419ad0
Move to session heartbeat w/ TTL
akash329d May 1, 2025
046ed94
add test for TTL
akash329d May 1, 2025
70547c0
merge conflict
akash329d May 5, 2025
5638653
merge fixes
akash329d May 5, 2025
2437e46
fakeredis dev dep
akash329d May 5, 2025
9664c8a
fmt
akash329d May 5, 2025
30b475b
convert to Pydantic models
akash329d May 5, 2025
0114189
fmt
akash329d May 5, 2025
7081a40
more type fixes
akash329d May 5, 2025
5ae3cc6
test cleanup
akash329d May 5, 2025
46b78f2
rename to message dispatch
akash329d May 5, 2025
e21d514
make int tests better
akash329d May 6, 2025
ee9f4de
lint
akash329d May 6, 2025
206a98a
tests hanging
akash329d May 6, 2025
bb59e5d
do cleanup after test
akash329d May 6, 2025
ca9a54a
fmt
akash329d May 6, 2025
9832c34
clean up int test
akash329d May 6, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,13 @@ If you haven't created a uv-managed project yet, create one:
```bash
uv add "mcp[cli]"
```

For optional features, you can add extras:

```bash
# For Redis support in message queue
uv add "mcp[redis]"
```

Alternatively, for projects using pip for dependencies:
```bash
Expand Down Expand Up @@ -385,6 +392,29 @@ app.router.routes.append(Host('mcp.acme.corp', app=mcp.sse_app()))

For more information on mounting applications in Starlette, see the [Starlette documentation](https://www.starlette.io/routing/#submounting-routes).

#### Message Queue Options

By default, the SSE server uses an in-memory message queue for incoming POST messages. For production deployments or distributed scenarios, you can use Redis:

```python
from mcp.server.fastmcp import FastMCP

mcp = FastMCP(
"My App",
settings={
"message_queue": "redis",
"redis_url": "redis://localhost:6379/0",
"redis_prefix": "mcp:queue:",
},
)
```

To use Redis, add the Redis dependency:

```bash
uv add "mcp[redis]"
```

## Examples

### Echo Server
Expand Down
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ dependencies = [
"anyio>=4.5",
"httpx>=0.27",
"httpx-sse>=0.4",
"pydantic>=2.7.2,<3.0.0",
"pydantic>=2.7.2,<=2.10.1",
"starlette>=0.27",
"sse-starlette>=1.6.1",
"pydantic-settings>=2.5.2",
Expand All @@ -36,6 +36,7 @@ dependencies = [
rich = ["rich>=13.9.4"]
cli = ["typer>=0.12.4", "python-dotenv>=1.0.0"]
ws = ["websockets>=15.0.1"]
redis = ["redis>=4.5.0"]

[project.scripts]
mcp = "mcp.cli:app [cli]"
Expand Down
29 changes: 28 additions & 1 deletion src/mcp/server/fastmcp/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,11 @@ class Settings(BaseSettings, Generic[LifespanResultT]):
sse_path: str = "/sse"
message_path: str = "/messages/"

# SSE message queue settings
message_queue: Literal["memory", "redis"] = "memory"
redis_url: str = "redis://localhost:6379/0"
redis_prefix: str = "mcp:queue:"

# resource settings
warn_on_duplicate_resources: bool = True

Expand Down Expand Up @@ -479,7 +484,29 @@ async def run_sse_async(self) -> None:

def sse_app(self) -> Starlette:
"""Return an instance of the SSE server app."""
sse = SseServerTransport(self.settings.message_path)
message_queue = None
if self.settings.message_queue == "redis":
try:
from mcp.server.message_queue import RedisMessageQueue

message_queue = RedisMessageQueue(
redis_url=self.settings.redis_url, prefix=self.settings.redis_prefix
)
logger.info(f"Using Redis message queue at {self.settings.redis_url}")
except ImportError:
logger.error(
"Redis message queue requested but 'redis' package not installed. "
)
raise
else:
from mcp.server.message_queue import InMemoryMessageQueue

message_queue = InMemoryMessageQueue()
logger.info("Using in-memory message queue")

sse = SseServerTransport(
self.settings.message_path, message_queue=message_queue
)

async def handle_sse(request: Request) -> None:
async with sse.connect_sse(
Expand Down
16 changes: 16 additions & 0 deletions src/mcp/server/message_queue/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
"""
Message Queue Module for MCP Server

This module implements queue interfaces for handling
messages between clients and servers.
"""

from mcp.server.message_queue.base import InMemoryMessageQueue, MessageQueue

# Try to import Redis implementation if available
try:
from mcp.server.message_queue.redis import RedisMessageQueue
except ImportError:
RedisMessageQueue = None

__all__ = ["MessageQueue", "InMemoryMessageQueue", "RedisMessageQueue"]
131 changes: 131 additions & 0 deletions src/mcp/server/message_queue/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
import logging
from typing import Protocol, runtime_checkable
from uuid import UUID

import mcp.types as types

logger = logging.getLogger(__name__)


@runtime_checkable
class MessageQueue(Protocol):
"""Abstract interface for an SSE message queue.

This interface allows messages to be queued and processed by any SSE server instance
enabling multiple servers to handle requests for the same session.
"""

async def add_message(
self, session_id: UUID, message: types.JSONRPCMessage | Exception
) -> bool:
"""Add a message to the queue for the specified session.

Args:
session_id: The UUID of the session this message is for
message: The message to queue

Returns:
bool: True if message was accepted, False if session not found
"""
...

async def get_message(
self, session_id: UUID, timeout: float = 0.1
) -> types.JSONRPCMessage | Exception | None:
"""Get the next message for the specified session.

Args:
session_id: The UUID of the session to get messages for
timeout: Maximum time to wait for a message, in seconds

Returns:
The next message or None if no message is available
"""
...

async def register_session(self, session_id: UUID) -> None:
"""Register a new session with the queue.

Args:
session_id: The UUID of the new session to register
"""
...

async def unregister_session(self, session_id: UUID) -> None:
"""Unregister a session when it's closed.

Args:
session_id: The UUID of the session to unregister
"""
...

async def session_exists(self, session_id: UUID) -> bool:
"""Check if a session exists.

Args:
session_id: The UUID of the session to check

Returns:
bool: True if the session is active, False otherwise
"""
...


class InMemoryMessageQueue:
"""Default in-memory implementation of the MessageQueue interface.

This implementation keeps messages in memory for
each session until they're retrieved.
"""

def __init__(self) -> None:
self._message_queues: dict[UUID, list[types.JSONRPCMessage | Exception]] = {}
self._active_sessions: set[UUID] = set()

async def add_message(
self, session_id: UUID, message: types.JSONRPCMessage | Exception
) -> bool:
"""Add a message to the queue for the specified session."""
if session_id not in self._active_sessions:
logger.warning(f"Message received for unknown session {session_id}")
return False

if session_id not in self._message_queues:
self._message_queues[session_id] = []

self._message_queues[session_id].append(message)
logger.debug(f"Added message to queue for session {session_id}")
return True

async def get_message(
self, session_id: UUID, timeout: float = 0.1
) -> types.JSONRPCMessage | Exception | None:
"""Get the next message for the specified session."""
if session_id not in self._active_sessions:
return None

queue = self._message_queues.get(session_id, [])
if not queue:
return None

message = queue.pop(0)
if not queue: # Clean up empty queue
del self._message_queues[session_id]

return message

async def register_session(self, session_id: UUID) -> None:
"""Register a new session with the queue."""
self._active_sessions.add(session_id)
logger.debug(f"Registered session {session_id}")

async def unregister_session(self, session_id: UUID) -> None:
"""Unregister a session when it's closed."""
self._active_sessions.discard(session_id)
if session_id in self._message_queues:
del self._message_queues[session_id]
logger.debug(f"Unregistered session {session_id}")

async def session_exists(self, session_id: UUID) -> bool:
"""Check if a session exists."""
return session_id in self._active_sessions
126 changes: 126 additions & 0 deletions src/mcp/server/message_queue/redis.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
import json
import logging
from uuid import UUID

import mcp.types as types

try:
import redis.asyncio as redis # type: ignore[import]
except ImportError:
raise ImportError(
"Redis support requires the 'redis' package. "
"Install it with: 'uv add redis' or 'uv add \"mcp[redis]\"'"
)

logger = logging.getLogger(__name__)


class RedisMessageQueue:
"""Redis implementation of the MessageQueue interface.

This implementation uses Redis lists to store messages for each session.
Redis provides persistence and allows multiple servers to share the same queue.
"""

def __init__(
self, redis_url: str = "redis://localhost:6379/0", prefix: str = "mcp:queue:"
) -> None:
"""Initialize Redis message queue.

Args:
redis_url: Redis connection string
prefix: Key prefix for Redis keys to avoid collisions
"""
self._redis = redis.Redis.from_url(redis_url, decode_responses=True) # type: ignore[attr-defined]
self._prefix = prefix
self._active_sessions_key = f"{prefix}active_sessions"
logger.debug(f"Initialized Redis message queue with URL: {redis_url}")

def _session_queue_key(self, session_id: UUID) -> str:
"""Get the Redis key for a session's message queue."""
return f"{self._prefix}session:{session_id.hex}"

async def add_message(
self, session_id: UUID, message: types.JSONRPCMessage | Exception
) -> bool:
"""Add a message to the queue for the specified session."""
# Check if session exists
if not await self.session_exists(session_id):
logger.warning(f"Message received for unknown session {session_id}")
return False

# Serialize the message
if isinstance(message, Exception):
# For exceptions, store them as special format
data = json.dumps(
{
"_exception": True,
"type": type(message).__name__,
"message": str(message),
}
)
else:
data = message.model_dump_json(by_alias=True, exclude_none=True)

# Push to the right side of the list (queue)
await self._redis.rpush(self._session_queue_key(session_id), data) # type: ignore[attr-defined]
logger.debug(f"Added message to Redis queue for session {session_id}")
return True

async def get_message(
self, session_id: UUID, timeout: float = 0.1
) -> types.JSONRPCMessage | Exception | None:
"""Get the next message for the specified session."""
# Check if session exists
if not await self.session_exists(session_id):
return None

# Pop from the left side of the list (queue)
# Use BLPOP with timeout to avoid busy waiting
result = await self._redis.blpop([self._session_queue_key(session_id)], timeout) # type: ignore[attr-defined]

if not result:
return None

# result is a tuple of (key, value)
_, data = result # type: ignore[misc]

# Deserialize the message
json_data = json.loads(data) # type: ignore[arg-type]

# Check if it's an exception
if isinstance(json_data, dict):
exception_dict: dict[str, object] = json_data
if exception_dict.get("_exception", False):
return Exception(
f"{exception_dict['type']}: {exception_dict['message']}"
)

# Regular message
try:
return types.JSONRPCMessage.model_validate_json(data) # type: ignore[arg-type]
except Exception as e:
logger.error(f"Failed to deserialize message: {e}")
return None

async def register_session(self, session_id: UUID) -> None:
"""Register a new session with the queue."""
# Add session ID to the set of active sessions
await self._redis.sadd(self._active_sessions_key, session_id.hex) # type: ignore[attr-defined]
logger.debug(f"Registered session {session_id} in Redis")

async def unregister_session(self, session_id: UUID) -> None:
"""Unregister a session when it's closed."""
# Remove session ID from active sessions
await self._redis.srem(self._active_sessions_key, session_id.hex) # type: ignore[attr-defined]
# Delete the session's message queue
await self._redis.delete(self._session_queue_key(session_id)) # type: ignore[attr-defined]
logger.debug(f"Unregistered session {session_id} from Redis")

async def session_exists(self, session_id: UUID) -> bool:
"""Check if a session exists."""
# Explicitly annotate the result as bool to help the type checker
result = bool(
await self._redis.sismember(self._active_sessions_key, session_id.hex) # type: ignore[attr-defined]
)
return result
Loading
Loading