Skip to content

Commit 46523af

Browse files
authored
StreamableHttp - GET request standalone SSE (#561)
1 parent 72b66a5 commit 46523af

File tree

3 files changed

+186
-17
lines changed

3 files changed

+186
-17
lines changed

examples/servers/simple-streamablehttp/mcp_simple_streamablehttp/server.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
MCP_SESSION_ID_HEADER,
1212
StreamableHTTPServerTransport,
1313
)
14+
from pydantic import AnyUrl
1415
from starlette.applications import Starlette
1516
from starlette.requests import Request
1617
from starlette.responses import Response
@@ -92,6 +93,9 @@ async def call_tool(
9293
if i < count - 1: # Don't wait after the last notification
9394
await anyio.sleep(interval)
9495

96+
# This will send a resource notificaiton though standalone SSE
97+
# established by GET request
98+
await ctx.session.send_resource_updated(uri=AnyUrl("http:///test_resource"))
9599
return [
96100
types.TextContent(
97101
type="text",

src/mcp/server/streamableHttp.py

Lines changed: 93 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,9 @@
5050
CONTENT_TYPE_JSON = "application/json"
5151
CONTENT_TYPE_SSE = "text/event-stream"
5252

53+
# Special key for the standalone GET stream
54+
GET_STREAM_KEY = "_GET_stream"
55+
5356
# Session ID validation pattern (visible ASCII characters ranging from 0x21 to 0x7E)
5457
# Pattern ensures entire string contains only valid characters by using ^ and $ anchors
5558
SESSION_ID_PATTERN = re.compile(r"^[\x21-\x7E]+$")
@@ -443,10 +446,19 @@ async def sse_writer():
443446
return
444447

445448
async def _handle_get_request(self, request: Request, send: Send) -> None:
446-
"""Handle GET requests for SSE stream establishment."""
447-
# Validate session ID if server has one
448-
if not await self._validate_session(request, send):
449-
return
449+
"""
450+
Handle GET request to establish SSE.
451+
452+
This allows the server to communicate to the client without the client
453+
first sending data via HTTP POST. The server can send JSON-RPC requests
454+
and notifications on this stream.
455+
"""
456+
writer = self._read_stream_writer
457+
if writer is None:
458+
raise ValueError(
459+
"No read stream writer available. Ensure connect() is called first."
460+
)
461+
450462
# Validate Accept header - must include text/event-stream
451463
_, has_sse = self._check_accept_headers(request)
452464

@@ -458,13 +470,80 @@ async def _handle_get_request(self, request: Request, send: Send) -> None:
458470
await response(request.scope, request.receive, send)
459471
return
460472

461-
# TODO: Implement SSE stream for GET requests
462-
# For now, return 405 Method Not Allowed
463-
response = self._create_error_response(
464-
"SSE stream from GET request not implemented yet",
465-
HTTPStatus.METHOD_NOT_ALLOWED,
473+
if not await self._validate_session(request, send):
474+
return
475+
476+
headers = {
477+
"Cache-Control": "no-cache, no-transform",
478+
"Connection": "keep-alive",
479+
"Content-Type": CONTENT_TYPE_SSE,
480+
}
481+
482+
if self.mcp_session_id:
483+
headers[MCP_SESSION_ID_HEADER] = self.mcp_session_id
484+
485+
# Check if we already have an active GET stream
486+
if GET_STREAM_KEY in self._request_streams:
487+
response = self._create_error_response(
488+
"Conflict: Only one SSE stream is allowed per session",
489+
HTTPStatus.CONFLICT,
490+
)
491+
await response(request.scope, request.receive, send)
492+
return
493+
494+
# Create SSE stream
495+
sse_stream_writer, sse_stream_reader = anyio.create_memory_object_stream[
496+
dict[str, Any]
497+
](0)
498+
499+
async def standalone_sse_writer():
500+
try:
501+
# Create a standalone message stream for server-initiated messages
502+
standalone_stream_writer, standalone_stream_reader = (
503+
anyio.create_memory_object_stream[JSONRPCMessage](0)
504+
)
505+
506+
# Register this stream using the special key
507+
self._request_streams[GET_STREAM_KEY] = standalone_stream_writer
508+
509+
async with sse_stream_writer, standalone_stream_reader:
510+
# Process messages from the standalone stream
511+
async for received_message in standalone_stream_reader:
512+
# For the standalone stream, we handle:
513+
# - JSONRPCNotification (server sends notifications to client)
514+
# - JSONRPCRequest (server sends requests to client)
515+
# We should NOT receive JSONRPCResponse
516+
517+
# Send the message via SSE
518+
event_data = {
519+
"event": "message",
520+
"data": received_message.model_dump_json(
521+
by_alias=True, exclude_none=True
522+
),
523+
}
524+
525+
await sse_stream_writer.send(event_data)
526+
except Exception as e:
527+
logger.exception(f"Error in standalone SSE writer: {e}")
528+
finally:
529+
logger.debug("Closing standalone SSE writer")
530+
# Remove the stream from request_streams
531+
self._request_streams.pop(GET_STREAM_KEY, None)
532+
533+
# Create and start EventSourceResponse
534+
response = EventSourceResponse(
535+
content=sse_stream_reader,
536+
data_sender_callable=standalone_sse_writer,
537+
headers=headers,
466538
)
467-
await response(request.scope, request.receive, send)
539+
540+
try:
541+
# This will send headers immediately and establish the SSE connection
542+
await response(request.scope, request.receive, send)
543+
except Exception as e:
544+
logger.exception(f"Error in standalone SSE response: {e}")
545+
# Clean up the request stream
546+
self._request_streams.pop(GET_STREAM_KEY, None)
468547

469548
async def _handle_delete_request(self, request: Request, send: Send) -> None:
470549
"""Handle DELETE requests for explicit session termination."""
@@ -611,21 +690,18 @@ async def message_router():
611690
else:
612691
target_request_id = str(message.root.id)
613692

614-
# Send to the specific request stream if available
615-
if (
616-
target_request_id
617-
and target_request_id in self._request_streams
618-
):
693+
request_stream_id = target_request_id or GET_STREAM_KEY
694+
if request_stream_id in self._request_streams:
619695
try:
620-
await self._request_streams[target_request_id].send(
696+
await self._request_streams[request_stream_id].send(
621697
message
622698
)
623699
except (
624700
anyio.BrokenResourceError,
625701
anyio.ClosedResourceError,
626702
):
627703
# Stream might be closed, remove from registry
628-
self._request_streams.pop(target_request_id, None)
704+
self._request_streams.pop(request_stream_id, None)
629705
except Exception as e:
630706
logger.exception(f"Error in message router: {e}")
631707

tests/server/test_streamableHttp.py

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -541,3 +541,92 @@ def test_json_response(json_response_server, json_server_url):
541541
)
542542
assert response.status_code == 200
543543
assert response.headers.get("Content-Type") == "application/json"
544+
545+
546+
def test_get_sse_stream(basic_server, basic_server_url):
547+
"""Test establishing an SSE stream via GET request."""
548+
# First, we need to initialize a session
549+
mcp_url = f"{basic_server_url}/mcp"
550+
init_response = requests.post(
551+
mcp_url,
552+
headers={
553+
"Accept": "application/json, text/event-stream",
554+
"Content-Type": "application/json",
555+
},
556+
json=INIT_REQUEST,
557+
)
558+
assert init_response.status_code == 200
559+
560+
# Get the session ID
561+
session_id = init_response.headers.get(MCP_SESSION_ID_HEADER)
562+
assert session_id is not None
563+
564+
# Now attempt to establish an SSE stream via GET
565+
get_response = requests.get(
566+
mcp_url,
567+
headers={
568+
"Accept": "text/event-stream",
569+
MCP_SESSION_ID_HEADER: session_id,
570+
},
571+
stream=True,
572+
)
573+
574+
# Verify we got a successful response with the right content type
575+
assert get_response.status_code == 200
576+
assert get_response.headers.get("Content-Type") == "text/event-stream"
577+
578+
# Test that a second GET request gets rejected (only one stream allowed)
579+
second_get = requests.get(
580+
mcp_url,
581+
headers={
582+
"Accept": "text/event-stream",
583+
MCP_SESSION_ID_HEADER: session_id,
584+
},
585+
stream=True,
586+
)
587+
588+
# Should get CONFLICT (409) since there's already a stream
589+
# Note: This might fail if the first stream fully closed before this runs,
590+
# but generally it should work in the test environment where it runs quickly
591+
assert second_get.status_code == 409
592+
593+
594+
def test_get_validation(basic_server, basic_server_url):
595+
"""Test validation for GET requests."""
596+
# First, we need to initialize a session
597+
mcp_url = f"{basic_server_url}/mcp"
598+
init_response = requests.post(
599+
mcp_url,
600+
headers={
601+
"Accept": "application/json, text/event-stream",
602+
"Content-Type": "application/json",
603+
},
604+
json=INIT_REQUEST,
605+
)
606+
assert init_response.status_code == 200
607+
608+
# Get the session ID
609+
session_id = init_response.headers.get(MCP_SESSION_ID_HEADER)
610+
assert session_id is not None
611+
612+
# Test without Accept header
613+
response = requests.get(
614+
mcp_url,
615+
headers={
616+
MCP_SESSION_ID_HEADER: session_id,
617+
},
618+
stream=True,
619+
)
620+
assert response.status_code == 406
621+
assert "Not Acceptable" in response.text
622+
623+
# Test with wrong Accept header
624+
response = requests.get(
625+
mcp_url,
626+
headers={
627+
"Accept": "application/json",
628+
MCP_SESSION_ID_HEADER: session_id,
629+
},
630+
)
631+
assert response.status_code == 406
632+
assert "Not Acceptable" in response.text

0 commit comments

Comments
 (0)