-
Notifications
You must be signed in to change notification settings - Fork 1.9k
Add message queue for SSE messages POST endpoint #459
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
praboud-ant
merged 53 commits into
modelcontextprotocol:main
from
akash329d:akash329d/add-message-queue-sse
May 7, 2025
Merged
Changes from 10 commits
Commits
Show all changes
53 commits
Select commit
Hold shift + click to select a range
10c5af8
initial
akash329d c3d5efc
readme update
akash329d b2fce7d
ruff
akash329d 7c82f36
fix typing issues
akash329d b92f22f
update lock
akash329d 5dbca6e
retrigger tests?
akash329d badc1e2
revert
akash329d 23665db
clean up test stuff
akash329d ccd5a13
lock pydantic version
akash329d fb44020
fix lock
akash329d efe6da9
wip
akash329d d625782
fixes
akash329d 78c6aef
Add optional redis dep
akash329d fad836c
changes
akash329d fd97501
format / lint
akash329d 4bce7d8
cleanup
akash329d d6075bb
update lock
akash329d 8ee3a7e
remove redundant comment
akash329d 7cabcea
add a checkpoint
akash329d 5111c92
naming changes
akash329d 09e0cab
logging improvements
akash329d 8d280d8
better channel validation
akash329d c2bb049
merge
akash329d 87e07b8
formatting and linting
akash329d b484284
fix naming in server.py
akash329d 0bfd800
Rework to fix POST blocking issue
akash329d 1e81f36
comments fix
akash329d 215cc42
wip
akash329d 8fce8e6
back to b48428486aa90f7529c36e5a78074ac2a2d813bc
akash329d b2893e6
push message handling onto corresponding SSE session task group
akash329d e5938d4
format
akash329d a151f1c
clean up comment and session state
akash329d d22f46b
shorten comment
akash329d 8d6a20d
remove extra change
akash329d bb24881
testing
akash329d 564561f
add a cancelscope on the finally
akash329d 9419ad0
Move to session heartbeat w/ TTL
akash329d 046ed94
add test for TTL
akash329d 70547c0
merge conflict
akash329d 5638653
merge fixes
akash329d 2437e46
fakeredis dev dep
akash329d 9664c8a
fmt
akash329d 30b475b
convert to Pydantic models
akash329d 0114189
fmt
akash329d 7081a40
more type fixes
akash329d 5ae3cc6
test cleanup
akash329d 46b78f2
rename to message dispatch
akash329d e21d514
make int tests better
akash329d ee9f4de
lint
akash329d 206a98a
tests hanging
akash329d bb59e5d
do cleanup after test
akash329d ca9a54a
fmt
akash329d 9832c34
clean up int test
akash329d File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
""" | ||
Message Queue Module for MCP Server | ||
|
||
This module implements queue interfaces for handling | ||
messages between clients and servers. | ||
""" | ||
|
||
from mcp.server.message_queue.base import InMemoryMessageQueue, MessageQueue | ||
|
||
# Try to import Redis implementation if available | ||
try: | ||
from mcp.server.message_queue.redis import RedisMessageQueue | ||
except ImportError: | ||
RedisMessageQueue = None | ||
|
||
__all__ = ["MessageQueue", "InMemoryMessageQueue", "RedisMessageQueue"] |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,131 @@ | ||
import logging | ||
from typing import Protocol, runtime_checkable | ||
from uuid import UUID | ||
|
||
import mcp.types as types | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
@runtime_checkable | ||
class MessageQueue(Protocol): | ||
akash329d marked this conversation as resolved.
Show resolved
Hide resolved
|
||
"""Abstract interface for an SSE message queue. | ||
|
||
This interface allows messages to be queued and processed by any SSE server instance | ||
enabling multiple servers to handle requests for the same session. | ||
""" | ||
|
||
async def add_message( | ||
self, session_id: UUID, message: types.JSONRPCMessage | Exception | ||
) -> bool: | ||
"""Add a message to the queue for the specified session. | ||
|
||
Args: | ||
session_id: The UUID of the session this message is for | ||
message: The message to queue | ||
|
||
Returns: | ||
bool: True if message was accepted, False if session not found | ||
""" | ||
... | ||
|
||
async def get_message( | ||
self, session_id: UUID, timeout: float = 0.1 | ||
) -> types.JSONRPCMessage | Exception | None: | ||
"""Get the next message for the specified session. | ||
|
||
Args: | ||
session_id: The UUID of the session to get messages for | ||
timeout: Maximum time to wait for a message, in seconds | ||
|
||
Returns: | ||
The next message or None if no message is available | ||
""" | ||
... | ||
|
||
async def register_session(self, session_id: UUID) -> None: | ||
"""Register a new session with the queue. | ||
|
||
Args: | ||
session_id: The UUID of the new session to register | ||
""" | ||
... | ||
|
||
async def unregister_session(self, session_id: UUID) -> None: | ||
"""Unregister a session when it's closed. | ||
|
||
Args: | ||
session_id: The UUID of the session to unregister | ||
""" | ||
... | ||
|
||
async def session_exists(self, session_id: UUID) -> bool: | ||
"""Check if a session exists. | ||
|
||
Args: | ||
session_id: The UUID of the session to check | ||
|
||
Returns: | ||
bool: True if the session is active, False otherwise | ||
""" | ||
... | ||
|
||
|
||
class InMemoryMessageQueue: | ||
"""Default in-memory implementation of the MessageQueue interface. | ||
|
||
This implementation keeps messages in memory for | ||
each session until they're retrieved. | ||
""" | ||
|
||
def __init__(self) -> None: | ||
self._message_queues: dict[UUID, list[types.JSONRPCMessage | Exception]] = {} | ||
self._active_sessions: set[UUID] = set() | ||
akash329d marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
async def add_message( | ||
self, session_id: UUID, message: types.JSONRPCMessage | Exception | ||
) -> bool: | ||
"""Add a message to the queue for the specified session.""" | ||
if session_id not in self._active_sessions: | ||
logger.warning(f"Message received for unknown session {session_id}") | ||
return False | ||
|
||
if session_id not in self._message_queues: | ||
self._message_queues[session_id] = [] | ||
|
||
self._message_queues[session_id].append(message) | ||
logger.debug(f"Added message to queue for session {session_id}") | ||
return True | ||
|
||
async def get_message( | ||
self, session_id: UUID, timeout: float = 0.1 | ||
) -> types.JSONRPCMessage | Exception | None: | ||
"""Get the next message for the specified session.""" | ||
if session_id not in self._active_sessions: | ||
return None | ||
|
||
queue = self._message_queues.get(session_id, []) | ||
if not queue: | ||
return None | ||
|
||
message = queue.pop(0) | ||
if not queue: # Clean up empty queue | ||
del self._message_queues[session_id] | ||
|
||
return message | ||
|
||
async def register_session(self, session_id: UUID) -> None: | ||
"""Register a new session with the queue.""" | ||
self._active_sessions.add(session_id) | ||
logger.debug(f"Registered session {session_id}") | ||
|
||
async def unregister_session(self, session_id: UUID) -> None: | ||
"""Unregister a session when it's closed.""" | ||
self._active_sessions.discard(session_id) | ||
if session_id in self._message_queues: | ||
del self._message_queues[session_id] | ||
logger.debug(f"Unregistered session {session_id}") | ||
|
||
async def session_exists(self, session_id: UUID) -> bool: | ||
"""Check if a session exists.""" | ||
return session_id in self._active_sessions |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,126 @@ | ||
import json | ||
import logging | ||
from uuid import UUID | ||
|
||
import mcp.types as types | ||
|
||
try: | ||
import redis.asyncio as redis # type: ignore[import] | ||
except ImportError: | ||
raise ImportError( | ||
"Redis support requires the 'redis' package. " | ||
"Install it with: 'uv add redis' or 'uv add \"mcp[redis]\"'" | ||
) | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
class RedisMessageQueue: | ||
"""Redis implementation of the MessageQueue interface. | ||
|
||
This implementation uses Redis lists to store messages for each session. | ||
Redis provides persistence and allows multiple servers to share the same queue. | ||
""" | ||
|
||
def __init__( | ||
self, redis_url: str = "redis://localhost:6379/0", prefix: str = "mcp:queue:" | ||
) -> None: | ||
"""Initialize Redis message queue. | ||
|
||
Args: | ||
redis_url: Redis connection string | ||
prefix: Key prefix for Redis keys to avoid collisions | ||
""" | ||
self._redis = redis.Redis.from_url(redis_url, decode_responses=True) # type: ignore[attr-defined] | ||
self._prefix = prefix | ||
self._active_sessions_key = f"{prefix}active_sessions" | ||
logger.debug(f"Initialized Redis message queue with URL: {redis_url}") | ||
|
||
def _session_queue_key(self, session_id: UUID) -> str: | ||
"""Get the Redis key for a session's message queue.""" | ||
return f"{self._prefix}session:{session_id.hex}" | ||
|
||
async def add_message( | ||
self, session_id: UUID, message: types.JSONRPCMessage | Exception | ||
) -> bool: | ||
"""Add a message to the queue for the specified session.""" | ||
# Check if session exists | ||
if not await self.session_exists(session_id): | ||
logger.warning(f"Message received for unknown session {session_id}") | ||
return False | ||
|
||
# Serialize the message | ||
if isinstance(message, Exception): | ||
# For exceptions, store them as special format | ||
data = json.dumps( | ||
{ | ||
"_exception": True, | ||
"type": type(message).__name__, | ||
"message": str(message), | ||
} | ||
) | ||
else: | ||
data = message.model_dump_json(by_alias=True, exclude_none=True) | ||
|
||
# Push to the right side of the list (queue) | ||
await self._redis.rpush(self._session_queue_key(session_id), data) # type: ignore[attr-defined] | ||
logger.debug(f"Added message to Redis queue for session {session_id}") | ||
return True | ||
|
||
async def get_message( | ||
self, session_id: UUID, timeout: float = 0.1 | ||
) -> types.JSONRPCMessage | Exception | None: | ||
"""Get the next message for the specified session.""" | ||
# Check if session exists | ||
if not await self.session_exists(session_id): | ||
return None | ||
|
||
# Pop from the left side of the list (queue) | ||
# Use BLPOP with timeout to avoid busy waiting | ||
result = await self._redis.blpop([self._session_queue_key(session_id)], timeout) # type: ignore[attr-defined] | ||
akash329d marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
if not result: | ||
return None | ||
|
||
# result is a tuple of (key, value) | ||
_, data = result # type: ignore[misc] | ||
|
||
# Deserialize the message | ||
json_data = json.loads(data) # type: ignore[arg-type] | ||
|
||
# Check if it's an exception | ||
if isinstance(json_data, dict): | ||
exception_dict: dict[str, object] = json_data | ||
if exception_dict.get("_exception", False): | ||
return Exception( | ||
f"{exception_dict['type']}: {exception_dict['message']}" | ||
) | ||
|
||
# Regular message | ||
try: | ||
return types.JSONRPCMessage.model_validate_json(data) # type: ignore[arg-type] | ||
except Exception as e: | ||
logger.error(f"Failed to deserialize message: {e}") | ||
return None | ||
|
||
async def register_session(self, session_id: UUID) -> None: | ||
"""Register a new session with the queue.""" | ||
# Add session ID to the set of active sessions | ||
await self._redis.sadd(self._active_sessions_key, session_id.hex) # type: ignore[attr-defined] | ||
logger.debug(f"Registered session {session_id} in Redis") | ||
|
||
async def unregister_session(self, session_id: UUID) -> None: | ||
"""Unregister a session when it's closed.""" | ||
# Remove session ID from active sessions | ||
await self._redis.srem(self._active_sessions_key, session_id.hex) # type: ignore[attr-defined] | ||
# Delete the session's message queue | ||
await self._redis.delete(self._session_queue_key(session_id)) # type: ignore[attr-defined] | ||
logger.debug(f"Unregistered session {session_id} from Redis") | ||
|
||
async def session_exists(self, session_id: UUID) -> bool: | ||
"""Check if a session exists.""" | ||
# Explicitly annotate the result as bool to help the type checker | ||
result = bool( | ||
await self._redis.sismember(self._active_sessions_key, session_id.hex) # type: ignore[attr-defined] | ||
) | ||
return result |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.