From f0fbfc7fcbc1489177f5c81cad7f49ba88b8244e Mon Sep 17 00:00:00 2001 From: qikaigao <33371207+qikaigao@users.noreply.github.com> Date: Mon, 14 Apr 2025 00:46:13 -0700 Subject: [PATCH 1/9] StreamableHTTPServerTransport should only check init status when there is an sessionId When using stateless mode, the backend will create a new transport for each request. For new transport, the initialized filed is always false. So we should only check the initialized filed if there is session existing --- src/server/streamableHttp.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/server/streamableHttp.ts b/src/server/streamableHttp.ts index ec8d2aa7..fc13e762 100644 --- a/src/server/streamableHttp.ts +++ b/src/server/streamableHttp.ts @@ -371,7 +371,7 @@ export class StreamableHTTPServerTransport implements Transport { * Returns true if the session is valid, false otherwise */ private validateSession(req: IncomingMessage, res: ServerResponse): boolean { - if (!this._initialized) { + if (this.sessionId && !this._initialized) { // If the server has not been initialized yet, reject all requests res.writeHead(400).end(JSON.stringify({ jsonrpc: "2.0", From 0058c30032991cbc8d9e26ac7430d5a9dc442015 Mon Sep 17 00:00:00 2001 From: qikaigao <33371207+qikaigao@users.noreply.github.com> Date: Thu, 17 Apr 2025 11:08:34 -0700 Subject: [PATCH 2/9] Update src/server/streamableHttp.ts Co-authored-by: Cliff Hall --- src/server/streamableHttp.ts | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/server/streamableHttp.ts b/src/server/streamableHttp.ts index 78f3dcc5..a388b4ee 100644 --- a/src/server/streamableHttp.ts +++ b/src/server/streamableHttp.ts @@ -463,7 +463,12 @@ export class StreamableHTTPServerTransport implements Transport { * Returns true if the session is valid, false otherwise */ private validateSession(req: IncomingMessage, res: ServerResponse): boolean { - if (this.sessionId && !this._initialized) { + if (this.sessionId === undefined) { + // If the session ID is not set, the session management is disabled + // and we don't need to validate the session ID + return true; + } + if (!this._initialized) { // If the server has not been initialized yet, reject all requests res.writeHead(400).end(JSON.stringify({ jsonrpc: "2.0", From 1df348710b78746126391c02ff27d93ceb24d688 Mon Sep 17 00:00:00 2001 From: qikaigao <33371207+qikaigao@users.noreply.github.com> Date: Thu, 17 Apr 2025 15:38:12 -0700 Subject: [PATCH 3/9] Update streamableHttp.ts --- src/server/streamableHttp.ts | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/src/server/streamableHttp.ts b/src/server/streamableHttp.ts index a388b4ee..dfaeed7b 100644 --- a/src/server/streamableHttp.ts +++ b/src/server/streamableHttp.ts @@ -480,11 +480,7 @@ export class StreamableHTTPServerTransport implements Transport { })); return false; } - if (this.sessionId === undefined) { - // If the session ID is not set, the session management is disabled - // and we don't need to validate the session ID - return true; - } + const sessionId = req.headers["mcp-session-id"]; if (!sessionId) { From 70608b58762fc92c5b615f15685f702eaeefd536 Mon Sep 17 00:00:00 2001 From: ihrpr Date: Mon, 21 Apr 2025 11:54:10 +0100 Subject: [PATCH 4/9] Add more test clients --- .../client/multipleClientsParallel.ts | 159 +++++++++++ .../client/parallelToolCallsClient.ts | 197 ++++++++++++++ .../streamableHttpBatchMessages.test.ts | 252 ++++++++++++++++++ 3 files changed, 608 insertions(+) create mode 100644 src/examples/client/multipleClientsParallel.ts create mode 100644 src/examples/client/parallelToolCallsClient.ts create mode 100644 src/integration-tests/streamableHttpBatchMessages.test.ts diff --git a/src/examples/client/multipleClientsParallel.ts b/src/examples/client/multipleClientsParallel.ts new file mode 100644 index 00000000..90b9d4ea --- /dev/null +++ b/src/examples/client/multipleClientsParallel.ts @@ -0,0 +1,159 @@ +import { Client } from '../../client/index.js'; +import { StreamableHTTPClientTransport } from '../../client/streamableHttp.js'; +import { + CallToolRequest, + CallToolResultSchema, + LoggingMessageNotificationSchema, +} from '../../types.js'; + +/** + * Multiple Clients MCP Example + * + * This client demonstrates how to: + * 1. Create multiple MCP clients in parallel + * 2. Each client calls a single tool + * 3. Track notifications from each client independently + */ + +// Command line args processing +const args = process.argv.slice(2); +const serverUrl = args[0] || 'http://localhost:3000/mcp'; + +interface ClientConfig { + id: string; + name: string; + toolName: string; + toolArguments: Record; +} + +async function createAndRunClient(config: ClientConfig): Promise<{ id: string; result: any }> { + console.log(`[${config.id}] Creating client: ${config.name}`); + + const client = new Client({ + name: config.name, + version: '1.0.0' + }); + + const transport = new StreamableHTTPClientTransport(new URL(serverUrl)); + + // Set up client-specific error handler + client.onerror = (error) => { + console.error(`[${config.id}] Client error:`, error); + }; + + // Set up client-specific notification handler + client.setNotificationHandler(LoggingMessageNotificationSchema, (notification) => { + console.log(`[${config.id}] Notification: ${notification.params.data}`); + }); + + try { + // Connect to the server + await client.connect(transport); + console.log(`[${config.id}] Connected to MCP server`); + + // Call the specified tool + console.log(`[${config.id}] Calling tool: ${config.toolName}`); + const toolRequest: CallToolRequest = { + method: 'tools/call', + params: { + name: config.toolName, + arguments: { + ...config.toolArguments, + // Add client ID to arguments for identification in notifications + caller: config.id + } + } + }; + + const result = await client.request(toolRequest, CallToolResultSchema); + console.log(`[${config.id}] Tool call completed`); + + // Keep the connection open for a bit to receive notifications + await new Promise(resolve => setTimeout(resolve, 5000)); + + // Disconnect + await transport.close(); + console.log(`[${config.id}] Disconnected from MCP server`); + + return { id: config.id, result }; + } catch (error) { + console.error(`[${config.id}] Error:`, error); + throw error; + } +} + +async function main(): Promise { + console.log('MCP Multiple Clients Example'); + console.log('============================'); + console.log(`Server URL: ${serverUrl}`); + console.log(''); + + try { + // Define client configurations + const clientConfigs: ClientConfig[] = [ + { + id: 'client1', + name: 'basic-client-1', + toolName: 'start-notification-stream', + toolArguments: { + interval: 3, // 1 second between notifications + count: 5 // Send 5 notifications + } + }, + { + id: 'client2', + name: 'basic-client-2', + toolName: 'start-notification-stream', + toolArguments: { + interval: 2, // 2 seconds between notifications + count: 3 // Send 3 notifications + } + }, + { + id: 'client3', + name: 'basic-client-3', + toolName: 'start-notification-stream', + toolArguments: { + interval: 1, // 0.5 second between notifications + count: 8 // Send 8 notifications + } + } + ]; + + // Start all clients in parallel + console.log(`Starting ${clientConfigs.length} clients in parallel...`); + console.log(''); + + const clientPromises = clientConfigs.map(config => createAndRunClient(config)); + const results = await Promise.all(clientPromises); + + // Display results from all clients + console.log('\n=== Final Results ==='); + results.forEach(({ id, result }) => { + console.log(`\n[${id}] Tool result:`); + if (Array.isArray(result.content)) { + result.content.forEach((item: { type: string; text?: string }) => { + if (item.type === 'text' && item.text) { + console.log(` ${item.text}`); + } else { + console.log(` ${item.type} content:`, item); + } + }); + } else { + console.log(` Unexpected result format:`, result); + } + }); + + console.log('\n=== All clients completed successfully ==='); + + } catch (error) { + console.error('Error running multiple clients:', error); + process.exit(1); + } +} + +// Start the example +main().catch((error: unknown) => { + console.error('Error running MCP multiple clients example:', error); + process.exit(1); +}); \ No newline at end of file diff --git a/src/examples/client/parallelToolCallsClient.ts b/src/examples/client/parallelToolCallsClient.ts new file mode 100644 index 00000000..8a53ade9 --- /dev/null +++ b/src/examples/client/parallelToolCallsClient.ts @@ -0,0 +1,197 @@ +import { Client } from '../../client/index.js'; +import { StreamableHTTPClientTransport } from '../../client/streamableHttp.js'; +import { + ListToolsRequest, + ListToolsResultSchema, + CallToolRequest, + CallToolResultSchema, + LoggingMessageNotificationSchema, +} from '../../types.js'; + +/** + * Parallel Tool Calls MCP Client + * + * This client demonstrates how to: + * 1. Start multiple tool calls in parallel + * 2. Track notifications from each tool call using a caller parameter + */ + +// Command line args processing +const args = process.argv.slice(2); +const serverUrl = args[0] || 'http://localhost:3000/mcp'; + +async function main(): Promise { + console.log('MCP Parallel Tool Calls Client'); + console.log('=============================='); + console.log(`Connecting to server at: ${serverUrl}`); + + let client: Client; + let transport: StreamableHTTPClientTransport; + + try { + // Create client with streamable HTTP transport + client = new Client({ + name: 'parallel-tool-calls-client', + version: '1.0.0' + }); + + client.onerror = (error) => { + console.error('Client error:', error); + }; + + // Connect to the server + transport = new StreamableHTTPClientTransport(new URL(serverUrl)); + await client.connect(transport); + console.log('Successfully connected to MCP server'); + + // Set up notification handler with caller identification + client.setNotificationHandler(LoggingMessageNotificationSchema, (notification) => { + console.log(`Notification: ${notification.params.data}`); + }); + + console.log("List tools") + const toolsRequest = await listTools(client); + console.log("Tools: ", toolsRequest) + + + // 2. Start multiple notification tools in parallel + console.log('\n=== Starting Multiple Notification Streams in Parallel ==='); + const toolResults = await startParallelNotificationTools(client); + + // Log the results from each tool call + for (const [caller, result] of Object.entries(toolResults)) { + console.log(`\n=== Tool result for ${caller} ===`); + result.content.forEach((item: { type: string; text: any; }) => { + if (item.type === 'text') { + console.log(` ${item.text}`); + } else { + console.log(` ${item.type} content:`, item); + } + }); + } + + // 3. Wait for all notifications (10 seconds) + console.log('\n=== Waiting for all notifications ==='); + await new Promise(resolve => setTimeout(resolve, 10000)); + + // 4. Disconnect + console.log('\n=== Disconnecting ==='); + await transport.close(); + console.log('Disconnected from MCP server'); + + } catch (error) { + console.error('Error running client:', error); + process.exit(1); + } +} + +/** + * List available tools on the server + */ +async function listTools(client: Client): Promise { + try { + const toolsRequest: ListToolsRequest = { + method: 'tools/list', + params: {} + }; + const toolsResult = await client.request(toolsRequest, ListToolsResultSchema); + + console.log('Available tools:'); + if (toolsResult.tools.length === 0) { + console.log(' No tools available'); + } else { + for (const tool of toolsResult.tools) { + console.log(` - ${tool.name}: ${tool.description}`); + } + } + } catch (error) { + console.log(`Tools not supported by this server: ${error}`); + } +} + +/** + * Start multiple notification tools in parallel with different configurations + * Each tool call includes a caller parameter to identify its notifications + */ +async function startParallelNotificationTools(client: Client): Promise> { + try { + // Define multiple tool calls with different configurations + const toolCalls = [ + { + caller: 'fast-notifier', + request: { + method: 'tools/call', + params: { + name: 'start-notification-stream', + arguments: { + interval: 2, // 0.5 second between notifications + count: 10, // Send 10 notifications + caller: 'fast-notifier' // Identify this tool call + } + } + } + }, + { + caller: 'slow-notifier', + request: { + method: 'tools/call', + params: { + name: 'start-notification-stream', + arguments: { + interval: 5, // 2 seconds between notifications + count: 5, // Send 5 notifications + caller: 'slow-notifier' // Identify this tool call + } + } + } + }, + { + caller: 'burst-notifier', + request: { + method: 'tools/call', + params: { + name: 'start-notification-stream', + arguments: { + interval: 1, // 0.1 second between notifications + count: 3, // Send just 3 notifications + caller: 'burst-notifier' // Identify this tool call + } + } + } + } + ]; + + console.log(`Starting ${toolCalls.length} notification tools in parallel...`); + + // Start all tool calls in parallel + const toolPromises = toolCalls.map(({ caller, request }) => { + console.log(`Starting tool call for ${caller}...`); + return client.request(request, CallToolResultSchema) + .then(result => ({ caller, result })) + .catch(error => { + console.error(`Error in tool call for ${caller}:`, error); + throw error; + }); + }); + + // Wait for all tool calls to complete + const results = await Promise.all(toolPromises); + + // Organize results by caller + const resultsByTool: Record = {}; + results.forEach(({ caller, result }) => { + resultsByTool[caller] = result; + }); + + return resultsByTool; + } catch (error) { + console.error(`Error starting parallel notification tools:`, error); + throw error; + } +} + +// Start the client +main().catch((error: unknown) => { + console.error('Error running MCP client:', error); + process.exit(1); +}); \ No newline at end of file diff --git a/src/integration-tests/streamableHttpBatchMessages.test.ts b/src/integration-tests/streamableHttpBatchMessages.test.ts new file mode 100644 index 00000000..dc79e3de --- /dev/null +++ b/src/integration-tests/streamableHttpBatchMessages.test.ts @@ -0,0 +1,252 @@ +/** + * Integration tests for batch messaging in StreamableHttp transport + * + * This test suite focuses specifically on the requirement from the spec: + * "The body of the POST request MUST be one of the following: + * - A single JSON-RPC request, notification, or response + * - An array batching one or more requests and/or notifications + * - An array batching one or more responses" + */ + +import { createServer, type Server } from "node:http"; +import { AddressInfo } from "node:net"; +import { randomUUID } from "node:crypto"; +import { StreamableHTTPServerTransport } from "../server/streamableHttp.js"; +import { McpServer } from "../server/mcp.js"; +import { Client } from "../client/index.js"; +import { CallToolResult, CallToolResultSchema, JSONRPCMessage } from "../types.js"; +import { z } from "zod"; +import { StreamableHTTPClientTransport } from '../client/streamableHttp.js'; +describe("StreamableHttp Batch Messaging - SSE", () => { + let server: Server; + let client: Client; + + // Just a basic server and client for testing + beforeEach(async () => { + // Create MCP server with test tools + const mcpServer = new McpServer( + { name: "batch-test-server", version: "1.0.0" }, + { capabilities: { logging: {} } } + ); + + // Simple greeting tool + mcpServer.tool( + "greet", + "A simple greeting tool", + { name: z.string().describe("Name to greet") }, + async ({ name }): Promise => { + return { content: [{ type: "text", text: `Hello, ${name}!` }] }; + } + ); + + // Server transport with session management + const serverTransport = new StreamableHTTPServerTransport({ + sessionIdGenerator: () => randomUUID(), + }); + + // Connect transport to server + await mcpServer.connect(serverTransport); + + // Create HTTP server + server = createServer(async (req, res) => { + await serverTransport.handleRequest(req, res); + }); + + // Start server on random port + const baseUrl = await new Promise((resolve) => { + server.listen(0, '127.0.0.1', () => { + const addr = server.address() as AddressInfo; + resolve(new URL(`http://127.0.0.1:${addr.port}`)); + }); + }); + + // Create client that connects to the server + const transport = new StreamableHTTPClientTransport(baseUrl); + client = new Client({ name: "test-client", version: "1.0.0" }); + await client.connect(transport); + + }); + + afterEach(async () => { + await client.close(); + await new Promise((resolve) => server.close(() => resolve())); + }); + + /** + * 1. Test sending a single JSON-RPC request + */ + it("handles a single request", async () => { + const result = await client.callTool({ + name: "greet", + arguments: { + name: "user" + } + }); + + expect(result).toHaveProperty("content"); + expect(Array.isArray(result.content)).toBe(true); + expect(result.content).toHaveLength(1); + + // Type assertion to handle the unknown type + const content = result.content as Array<{ type: string, text: string }>; + expect(content[0]).toHaveProperty("type", "text"); + expect(content[0]).toHaveProperty("text", "Hello, user!"); + + }); + + /** + * 1. Test sending a single JSON-RPC request + */ + it("handles a single request with request", async () => { + const result = await client.request( + { + method: "tools/call", + params: { + name: "greet", + arguments: { + name: "user" + } + }, + }, + CallToolResultSchema, + ); + + + expect(result).toHaveProperty("content"); + expect(Array.isArray(result.content)).toBe(true); + expect(result.content).toHaveLength(1); + + // Type assertion to handle the unknown type + const content = result.content as Array<{ type: string, text: string }>; + expect(content[0]).toHaveProperty("type", "text"); + expect(content[0]).toHaveProperty("text", "Hello, user!"); + + }); + + // it("handles an array batching one or more requests and/or notifications", async () => { + // const result = await client.request({ + + // }); + + // }); + + +}); + +describe("StreamableHttp Batch Messaging - JSON response", () => { + let server: Server; + let client: Client; + + // Just a basic server and client for testing + beforeEach(async () => { + // Create MCP server with test tools + const mcpServer = new McpServer( + { name: "batch-test-server", version: "1.0.0" }, + { capabilities: { logging: {} } } + ); + + // Simple greeting tool + mcpServer.tool( + "greet", + "A simple greeting tool", + { name: z.string().describe("Name to greet") }, + async ({ name }): Promise => { + return { content: [{ type: "text", text: `Hello, ${name}!` }] }; + } + ); + + // Server transport with session management + const serverTransport = new StreamableHTTPServerTransport({ + sessionIdGenerator: () => randomUUID(), + enableJsonResponse: true, + + }); + + // Connect transport to server + await mcpServer.connect(serverTransport); + + // Create HTTP server + server = createServer(async (req, res) => { + await serverTransport.handleRequest(req, res); + }); + + // Start server on random port + const baseUrl = await new Promise((resolve) => { + server.listen(0, '127.0.0.1', () => { + const addr = server.address() as AddressInfo; + resolve(new URL(`http://127.0.0.1:${addr.port}`)); + }); + }); + + // Create client that connects to the server + const transport = new StreamableHTTPClientTransport(baseUrl); + client = new Client({ name: "test-client", version: "1.0.0" }); + await client.connect(transport); + + }); + + afterEach(async () => { + await client.close(); + await new Promise((resolve) => server.close(() => resolve())); + }); + + /** + * 1. Test sending a single JSON-RPC request + */ + it("handles a single request", async () => { + const result = await client.callTool({ + name: "greet", + arguments: { + name: "user" + } + }); + + expect(result).toHaveProperty("content"); + expect(Array.isArray(result.content)).toBe(true); + expect(result.content).toHaveLength(1); + + // Type assertion to handle the unknown type + const content = result.content as Array<{ type: string, text: string }>; + expect(content[0]).toHaveProperty("type", "text"); + expect(content[0]).toHaveProperty("text", "Hello, user!"); + + }); + + /** + * 1. Test sending a single JSON-RPC request + */ + it("handles a single request with request", async () => { + const result = await client.request( + { + method: "tools/call", + params: { + name: "greet", + arguments: { + name: "user" + } + }, + }, + CallToolResultSchema, + ); + + + expect(result).toHaveProperty("content"); + expect(Array.isArray(result.content)).toBe(true); + expect(result.content).toHaveLength(1); + + // Type assertion to handle the unknown type + const content = result.content as Array<{ type: string, text: string }>; + expect(content[0]).toHaveProperty("type", "text"); + expect(content[0]).toHaveProperty("text", "Hello, user!"); + + }); + + // it("handles an array batching one or more requests and/or notifications", async () => { + // const result = await client.request({ + + // }); + + // }); + + +}); \ No newline at end of file From 39f7fdf050b34a04ca06c740f38b9448941f0691 Mon Sep 17 00:00:00 2001 From: ihrpr Date: Mon, 21 Apr 2025 14:41:12 +0100 Subject: [PATCH 5/9] fix stateless example --- .../server/simpleStatelessStreamableHttp.ts | 205 +++++++++--------- 1 file changed, 99 insertions(+), 106 deletions(-) diff --git a/src/examples/server/simpleStatelessStreamableHttp.ts b/src/examples/server/simpleStatelessStreamableHttp.ts index f1f37510..1fbdbddc 100644 --- a/src/examples/server/simpleStatelessStreamableHttp.ts +++ b/src/examples/server/simpleStatelessStreamableHttp.ts @@ -4,108 +4,114 @@ import { StreamableHTTPServerTransport } from '../../server/streamableHttp.js'; import { z } from 'zod'; import { CallToolResult, GetPromptResult, ReadResourceResult } from '../../types.js'; -// Create an MCP server with implementation details -const server = new McpServer({ - name: 'stateless-streamable-http-server', - version: '1.0.0', -}, { capabilities: { logging: {} } }); - -// Register a simple prompt -server.prompt( - 'greeting-template', - 'A simple greeting prompt template', - { - name: z.string().describe('Name to include in greeting'), - }, - async ({ name }): Promise => { - return { - messages: [ - { - role: 'user', - content: { - type: 'text', - text: `Please greet ${name} in a friendly manner.`, +const getServer = () => { + // Create an MCP server with implementation details + const server = new McpServer({ + name: 'stateless-streamable-http-server', + version: '1.0.0', + }, { capabilities: { logging: {} } }); + + // Register a simple prompt + server.prompt( + 'greeting-template', + 'A simple greeting prompt template', + { + name: z.string().describe('Name to include in greeting'), + }, + async ({ name }): Promise => { + return { + messages: [ + { + role: 'user', + content: { + type: 'text', + text: `Please greet ${name} in a friendly manner.`, + }, }, - }, - ], - }; - } -); - -// Register a tool specifically for testing resumability -server.tool( - 'start-notification-stream', - 'Starts sending periodic notifications for testing resumability', - { - interval: z.number().describe('Interval in milliseconds between notifications').default(100), - count: z.number().describe('Number of notifications to send (0 for 100)').default(10), - }, - async ({ interval, count }, { sendNotification }): Promise => { - const sleep = (ms: number) => new Promise(resolve => setTimeout(resolve, ms)); - let counter = 0; - - while (count === 0 || counter < count) { - counter++; - try { - await sendNotification({ - method: "notifications/message", - params: { - level: "info", - data: `Periodic notification #${counter} at ${new Date().toISOString()}` - } - }); - } - catch (error) { - console.error("Error sending notification:", error); - } - // Wait for the specified interval - await sleep(interval); + ], + }; } + ); + + // Register a tool specifically for testing resumability + server.tool( + 'start-notification-stream', + 'Starts sending periodic notifications for testing resumability', + { + interval: z.number().describe('Interval in milliseconds between notifications').default(100), + count: z.number().describe('Number of notifications to send (0 for 100)').default(10), + }, + async ({ interval, count }, { sendNotification }): Promise => { + const sleep = (ms: number) => new Promise(resolve => setTimeout(resolve, ms)); + let counter = 0; - return { - content: [ - { - type: 'text', - text: `Started sending periodic notifications every ${interval}ms`, + while (count === 0 || counter < count) { + counter++; + try { + await sendNotification({ + method: "notifications/message", + params: { + level: "info", + data: `Periodic notification #${counter} at ${new Date().toISOString()}` + } + }); } - ], - }; - } -); - -// Create a simple resource at a fixed URI -server.resource( - 'greeting-resource', - 'https://example.com/greetings/default', - { mimeType: 'text/plain' }, - async (): Promise => { - return { - contents: [ - { - uri: 'https://example.com/greetings/default', - text: 'Hello, world!', - }, - ], - }; - } -); + catch (error) { + console.error("Error sending notification:", error); + } + // Wait for the specified interval + await sleep(interval); + } + + return { + content: [ + { + type: 'text', + text: `Started sending periodic notifications every ${interval}ms`, + } + ], + }; + } + ); + + // Create a simple resource at a fixed URI + server.resource( + 'greeting-resource', + 'https://example.com/greetings/default', + { mimeType: 'text/plain' }, + async (): Promise => { + return { + contents: [ + { + uri: 'https://example.com/greetings/default', + text: 'Hello, world!', + }, + ], + }; + } + ); + return server; +} const app = express(); app.use(express.json()); -const transport: StreamableHTTPServerTransport = new StreamableHTTPServerTransport({ - sessionIdGenerator: undefined, -}); -// Setup routes for the server -const setupServer = async () => { - await server.connect(transport); -}; + app.post('/mcp', async (req: Request, res: Response) => { - console.log('Received MCP request:', req.body); + const server = getServer(); try { - await transport.handleRequest(req, res, req.body); + const transport: StreamableHTTPServerTransport = new StreamableHTTPServerTransport({ + sessionIdGenerator: undefined, + }); + await server.connect(transport); + await transport.handleRequest(req, res, req.body); + res.on('close', () => { + console.log('Request closed'); + transport.close(); + server.close(); + }); } catch (error) { console.error('Error handling MCP request:', error); if (!res.headersSent) { @@ -145,28 +151,15 @@ app.delete('/mcp', async (req: Request, res: Response) => { })); }); + // Start the server const PORT = 3000; -setupServer().then(() => { - app.listen(PORT, () => { - console.log(`MCP Streamable HTTP Server listening on port ${PORT}`); - }); -}).catch(error => { - console.error('Failed to set up the server:', error); - process.exit(1); +app.listen(PORT, () => { + console.log(`MCP Stateless Streamable HTTP Server listening on port ${PORT}`); }); // Handle server shutdown process.on('SIGINT', async () => { console.log('Shutting down server...'); - try { - console.log(`Closing transport`); - await transport.close(); - } catch (error) { - console.error(`Error closing transport:`, error); - } - - await server.close(); - console.log('Server shutdown complete'); process.exit(0); }); \ No newline at end of file From 37fc7d2ca814bfc04ca89244978cec26b5f3f79c Mon Sep 17 00:00:00 2001 From: ihrpr Date: Mon, 21 Apr 2025 14:56:12 +0100 Subject: [PATCH 6/9] fix check to skip session validation for stateless mode --- src/server/streamableHttp.test.ts | 2 +- src/server/streamableHttp.ts | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/server/streamableHttp.test.ts b/src/server/streamableHttp.test.ts index a0f2e0bb..7f9e85d8 100644 --- a/src/server/streamableHttp.test.ts +++ b/src/server/streamableHttp.test.ts @@ -288,7 +288,7 @@ describe("StreamableHTTPServerTransport", () => { }); }); - it("should reject requests without a valid session ID", async () => { + it.only("should reject requests without a valid session ID", async () => { const response = await sendPostRequest(baseUrl, TEST_MESSAGES.toolsList); expect(response.status).toBe(400); diff --git a/src/server/streamableHttp.ts b/src/server/streamableHttp.ts index b17ef043..ed83076a 100644 --- a/src/server/streamableHttp.ts +++ b/src/server/streamableHttp.ts @@ -463,8 +463,8 @@ export class StreamableHTTPServerTransport implements Transport { * Returns true if the session is valid, false otherwise */ private validateSession(req: IncomingMessage, res: ServerResponse): boolean { - if (this.sessionId === undefined) { - // If the session ID is not set, the session management is disabled + if (this.sessionIdGenerator === undefined) { + // If the sessionIdGenerator ID is not set, the session management is disabled // and we don't need to validate the session ID return true; } From eeda5d630a57d45b04c5522cb733c00e9f2a6552 Mon Sep 17 00:00:00 2001 From: ihrpr Date: Mon, 21 Apr 2025 15:13:25 +0100 Subject: [PATCH 7/9] readme update --- README.md | 32 +- .../server/jsonResponseStreamableHttp.ts | 138 ++++---- src/examples/server/simpleSseServer.ts | 107 +++--- .../server/simpleStatelessStreamableHttp.ts | 3 - src/examples/server/simpleStreamableHttp.ts | 311 +++++++++--------- .../sseAndStreamableHttpCompatibleServer.ts | 91 ++--- 6 files changed, 344 insertions(+), 338 deletions(-) diff --git a/README.md b/README.md index 200cfab6..605cc725 100644 --- a/README.md +++ b/README.md @@ -312,19 +312,19 @@ For simpler use cases where session management isn't needed: const app = express(); app.use(express.json()); -const transport: StreamableHTTPServerTransport = new StreamableHTTPServerTransport({ - sessionIdGenerator: undefined, // set to undefined for stateless servers -}); - -// Setup routes for the server -const setupServer = async () => { - await server.connect(transport); -}; - app.post('/mcp', async (req: Request, res: Response) => { - console.log('Received MCP request:', req.body); try { - await transport.handleRequest(req, res, req.body); + const server = getServer(); + const transport: StreamableHTTPServerTransport = new StreamableHTTPServerTransport({ + sessionIdGenerator: undefined, + }); + await server.connect(transport); + await transport.handleRequest(req, res, req.body); + res.on('close', () => { + console.log('Request closed'); + transport.close(); + server.close(); + }); } catch (error) { console.error('Error handling MCP request:', error); if (!res.headersSent) { @@ -364,15 +364,11 @@ app.delete('/mcp', async (req: Request, res: Response) => { })); }); + // Start the server const PORT = 3000; -setupServer().then(() => { - app.listen(PORT, () => { - console.log(`MCP Streamable HTTP Server listening on port ${PORT}`); - }); -}).catch(error => { - console.error('Failed to set up the server:', error); - process.exit(1); +app.listen(PORT, () => { + console.log(`MCP Stateless Streamable HTTP Server listening on port ${PORT}`); }); ``` diff --git a/src/examples/server/jsonResponseStreamableHttp.ts b/src/examples/server/jsonResponseStreamableHttp.ts index bcd6a960..02d8c2de 100644 --- a/src/examples/server/jsonResponseStreamableHttp.ts +++ b/src/examples/server/jsonResponseStreamableHttp.ts @@ -5,74 +5,78 @@ import { StreamableHTTPServerTransport } from '../../server/streamableHttp.js'; import { z } from 'zod'; import { CallToolResult, isInitializeRequest } from '../../types.js'; + // Create an MCP server with implementation details -const server = new McpServer({ - name: 'json-response-streamable-http-server', - version: '1.0.0', -}, { - capabilities: { - logging: {}, - } -}); +const getServer = () => { + const server = new McpServer({ + name: 'json-response-streamable-http-server', + version: '1.0.0', + }, { + capabilities: { + logging: {}, + } + }); + + // Register a simple tool that returns a greeting + server.tool( + 'greet', + 'A simple greeting tool', + { + name: z.string().describe('Name to greet'), + }, + async ({ name }): Promise => { + return { + content: [ + { + type: 'text', + text: `Hello, ${name}!`, + }, + ], + }; + } + ); + + // Register a tool that sends multiple greetings with notifications + server.tool( + 'multi-greet', + 'A tool that sends different greetings with delays between them', + { + name: z.string().describe('Name to greet'), + }, + async ({ name }, { sendNotification }): Promise => { + const sleep = (ms: number) => new Promise(resolve => setTimeout(resolve, ms)); + + await sendNotification({ + method: "notifications/message", + params: { level: "debug", data: `Starting multi-greet for ${name}` } + }); -// Register a simple tool that returns a greeting -server.tool( - 'greet', - 'A simple greeting tool', - { - name: z.string().describe('Name to greet'), - }, - async ({ name }): Promise => { - return { - content: [ - { - type: 'text', - text: `Hello, ${name}!`, - }, - ], - }; - } -); - -// Register a tool that sends multiple greetings with notifications -server.tool( - 'multi-greet', - 'A tool that sends different greetings with delays between them', - { - name: z.string().describe('Name to greet'), - }, - async ({ name }, { sendNotification }): Promise => { - const sleep = (ms: number) => new Promise(resolve => setTimeout(resolve, ms)); - - await sendNotification({ - method: "notifications/message", - params: { level: "debug", data: `Starting multi-greet for ${name}` } - }); - - await sleep(1000); // Wait 1 second before first greeting - - await sendNotification({ - method: "notifications/message", - params: { level: "info", data: `Sending first greeting to ${name}` } - }); - - await sleep(1000); // Wait another second before second greeting - - await sendNotification({ - method: "notifications/message", - params: { level: "info", data: `Sending second greeting to ${name}` } - }); - - return { - content: [ - { - type: 'text', - text: `Good morning, ${name}!`, - } - ], - }; - } -); + await sleep(1000); // Wait 1 second before first greeting + + await sendNotification({ + method: "notifications/message", + params: { level: "info", data: `Sending first greeting to ${name}` } + }); + + await sleep(1000); // Wait another second before second greeting + + await sendNotification({ + method: "notifications/message", + params: { level: "info", data: `Sending second greeting to ${name}` } + }); + + return { + content: [ + { + type: 'text', + text: `Good morning, ${name}!`, + } + ], + }; + } + ); + return server; +} const app = express(); app.use(express.json()); @@ -104,6 +108,7 @@ app.post('/mcp', async (req: Request, res: Response) => { }); // Connect the transport to the MCP server BEFORE handling the request + const server = getServer(); await server.connect(transport); await transport.handleRequest(req, res, req.body); return; // Already handled @@ -153,6 +158,5 @@ app.listen(PORT, () => { // Handle server shutdown process.on('SIGINT', async () => { console.log('Shutting down server...'); - await server.close(); process.exit(0); }); \ No newline at end of file diff --git a/src/examples/server/simpleSseServer.ts b/src/examples/server/simpleSseServer.ts index 74cdcaac..cae3be30 100644 --- a/src/examples/server/simpleSseServer.ts +++ b/src/examples/server/simpleSseServer.ts @@ -15,60 +15,63 @@ import { CallToolResult } from '../../types.js'; */ // Create an MCP server instance -const server = new McpServer({ - name: 'simple-sse-server', - version: '1.0.0', -}, { capabilities: { logging: {} } }); - -server.tool( - 'start-notification-stream', - 'Starts sending periodic notifications', - { - interval: z.number().describe('Interval in milliseconds between notifications').default(1000), - count: z.number().describe('Number of notifications to send').default(10), - }, - async ({ interval, count }, { sendNotification }): Promise => { - const sleep = (ms: number) => new Promise(resolve => setTimeout(resolve, ms)); - let counter = 0; - - // Send the initial notification - await sendNotification({ - method: "notifications/message", - params: { - level: "info", - data: `Starting notification stream with ${count} messages every ${interval}ms` +const getServer = () => { + const server = new McpServer({ + name: 'simple-sse-server', + version: '1.0.0', + }, { capabilities: { logging: {} } }); + + server.tool( + 'start-notification-stream', + 'Starts sending periodic notifications', + { + interval: z.number().describe('Interval in milliseconds between notifications').default(1000), + count: z.number().describe('Number of notifications to send').default(10), + }, + async ({ interval, count }, { sendNotification }): Promise => { + const sleep = (ms: number) => new Promise(resolve => setTimeout(resolve, ms)); + let counter = 0; + + // Send the initial notification + await sendNotification({ + method: "notifications/message", + params: { + level: "info", + data: `Starting notification stream with ${count} messages every ${interval}ms` + } + }); + + // Send periodic notifications + while (counter < count) { + counter++; + await sleep(interval); + + try { + await sendNotification({ + method: "notifications/message", + params: { + level: "info", + data: `Notification #${counter} at ${new Date().toISOString()}` + } + }); + } + catch (error) { + console.error("Error sending notification:", error); + } } - }); - - // Send periodic notifications - while (counter < count) { - counter++; - await sleep(interval); - - try { - await sendNotification({ - method: "notifications/message", - params: { - level: "info", - data: `Notification #${counter} at ${new Date().toISOString()}` + + return { + content: [ + { + type: 'text', + text: `Completed sending ${count} notifications every ${interval}ms`, } - }); - } - catch (error) { - console.error("Error sending notification:", error); - } + ], + }; } - - return { - content: [ - { - type: 'text', - text: `Completed sending ${count} notifications every ${interval}ms`, - } - ], - }; - } -); + ); + return server; +}; const app = express(); app.use(express.json()); @@ -96,6 +99,7 @@ app.get('/mcp', async (req: Request, res: Response) => { }; // Connect the transport to the MCP server + const server = getServer(); await server.connect(transport); // Start the SSE transport to begin streaming @@ -163,7 +167,6 @@ process.on('SIGINT', async () => { console.error(`Error closing transport for session ${sessionId}:`, error); } } - await server.close(); console.log('Server shutdown complete'); process.exit(0); }); \ No newline at end of file diff --git a/src/examples/server/simpleStatelessStreamableHttp.ts b/src/examples/server/simpleStatelessStreamableHttp.ts index 1fbdbddc..6fb2ae83 100644 --- a/src/examples/server/simpleStatelessStreamableHttp.ts +++ b/src/examples/server/simpleStatelessStreamableHttp.ts @@ -96,9 +96,6 @@ const getServer = () => { const app = express(); app.use(express.json()); - - - app.post('/mcp', async (req: Request, res: Response) => { const server = getServer(); try { diff --git a/src/examples/server/simpleStreamableHttp.ts b/src/examples/server/simpleStreamableHttp.ts index 2b25f4b2..d94908bd 100644 --- a/src/examples/server/simpleStreamableHttp.ts +++ b/src/examples/server/simpleStreamableHttp.ts @@ -7,164 +7,167 @@ import { CallToolResult, GetPromptResult, isInitializeRequest, ReadResourceResul import { InMemoryEventStore } from '../shared/inMemoryEventStore.js'; // Create an MCP server with implementation details -const server = new McpServer({ - name: 'simple-streamable-http-server', - version: '1.0.0', -}, { capabilities: { logging: {} } }); - -// Log the capability invocation details -server.onCapabilityChange((event) => { - switch (event.action) { - case 'invoked': - console.log(`${event.capabilityType} invocation ${event.invocationIndex}: '${event.capabilityName}' started`); - break; - case 'completed': - console.log(`${event.capabilityType} invocation ${event.invocationIndex}: '${event.capabilityName}' completed in ${event.durationMs}ms`); - break; - case 'error': - console.log(`${event.capabilityType} invocation ${event.invocationIndex}: '${event.capabilityName}' failed in ${event.durationMs}ms: ${event.error}`); - break; - } -}); - -// Register a simple tool that returns a greeting -server.tool( - 'greet', - 'A simple greeting tool', - { - name: z.string().describe('Name to greet'), - }, - async ({ name }): Promise => { - return { - content: [ - { - type: 'text', - text: `Hello, ${name}!`, - }, - ], - }; - } -); - -// Register a tool that sends multiple greetings with notifications -server.tool( - 'multi-greet', - 'A tool that sends different greetings with delays between them', - { - name: z.string().describe('Name to greet'), - }, - async ({ name }, { sendNotification }): Promise => { - const sleep = (ms: number) => new Promise(resolve => setTimeout(resolve, ms)); - - await sendNotification({ - method: "notifications/message", - params: { level: "debug", data: `Starting multi-greet for ${name}` } - }); - - await sleep(1000); // Wait 1 second before first greeting - - await sendNotification({ - method: "notifications/message", - params: { level: "info", data: `Sending first greeting to ${name}` } - }); - - await sleep(1000); // Wait another second before second greeting - - await sendNotification({ - method: "notifications/message", - params: { level: "info", data: `Sending second greeting to ${name}` } - }); - - return { - content: [ - { - type: 'text', - text: `Good morning, ${name}!`, - } - ], - }; - } -); - -// Register a simple prompt -server.prompt( - 'greeting-template', - 'A simple greeting prompt template', - { - name: z.string().describe('Name to include in greeting'), - }, - async ({ name }): Promise => { - return { - messages: [ - { - role: 'user', - content: { +const getServer = () => { + const server = new McpServer({ + name: 'simple-streamable-http-server', + version: '1.0.0', + }, { capabilities: { logging: {} } }); + + // Log the capability invocation details + server.onCapabilityChange((event) => { + switch (event.action) { + case 'invoked': + console.log(`${event.capabilityType} invocation ${event.invocationIndex}: '${event.capabilityName}' started`); + break; + case 'completed': + console.log(`${event.capabilityType} invocation ${event.invocationIndex}: '${event.capabilityName}' completed in ${event.durationMs}ms`); + break; + case 'error': + console.log(`${event.capabilityType} invocation ${event.invocationIndex}: '${event.capabilityName}' failed in ${event.durationMs}ms: ${event.error}`); + break; + } + }); + + // Register a simple tool that returns a greeting + server.tool( + 'greet', + 'A simple greeting tool', + { + name: z.string().describe('Name to greet'), + }, + async ({ name }): Promise => { + return { + content: [ + { type: 'text', - text: `Please greet ${name} in a friendly manner.`, + text: `Hello, ${name}!`, }, - }, - ], - }; - } -); - -// Register a tool specifically for testing resumability -server.tool( - 'start-notification-stream', - 'Starts sending periodic notifications for testing resumability', - { - interval: z.number().describe('Interval in milliseconds between notifications').default(100), - count: z.number().describe('Number of notifications to send (0 for 100)').default(50), - }, - async ({ interval, count }, { sendNotification }): Promise => { - const sleep = (ms: number) => new Promise(resolve => setTimeout(resolve, ms)); - let counter = 0; - - while (count === 0 || counter < count) { - counter++; - try { - await sendNotification({ - method: "notifications/message", - params: { - level: "info", - data: `Periodic notification #${counter} at ${new Date().toISOString()}` - } - }); - } - catch (error) { - console.error("Error sending notification:", error); - } - // Wait for the specified interval - await sleep(interval); + ], + }; } + ); + + // Register a tool that sends multiple greetings with notifications + server.tool( + 'multi-greet', + 'A tool that sends different greetings with delays between them', + { + name: z.string().describe('Name to greet'), + }, + async ({ name }, { sendNotification }): Promise => { + const sleep = (ms: number) => new Promise(resolve => setTimeout(resolve, ms)); + + await sendNotification({ + method: "notifications/message", + params: { level: "debug", data: `Starting multi-greet for ${name}` } + }); + + await sleep(1000); // Wait 1 second before first greeting + + await sendNotification({ + method: "notifications/message", + params: { level: "info", data: `Sending first greeting to ${name}` } + }); + + await sleep(1000); // Wait another second before second greeting + + await sendNotification({ + method: "notifications/message", + params: { level: "info", data: `Sending second greeting to ${name}` } + }); - return { - content: [ - { - type: 'text', - text: `Started sending periodic notifications every ${interval}ms`, + return { + content: [ + { + type: 'text', + text: `Good morning, ${name}!`, + } + ], + }; + } + ); + + // Register a simple prompt + server.prompt( + 'greeting-template', + 'A simple greeting prompt template', + { + name: z.string().describe('Name to include in greeting'), + }, + async ({ name }): Promise => { + return { + messages: [ + { + role: 'user', + content: { + type: 'text', + text: `Please greet ${name} in a friendly manner.`, + }, + }, + ], + }; + } + ); + + // Register a tool specifically for testing resumability + server.tool( + 'start-notification-stream', + 'Starts sending periodic notifications for testing resumability', + { + interval: z.number().describe('Interval in milliseconds between notifications').default(100), + count: z.number().describe('Number of notifications to send (0 for 100)').default(50), + }, + async ({ interval, count }, { sendNotification }): Promise => { + const sleep = (ms: number) => new Promise(resolve => setTimeout(resolve, ms)); + let counter = 0; + + while (count === 0 || counter < count) { + counter++; + try { + await sendNotification({ + method: "notifications/message", + params: { + level: "info", + data: `Periodic notification #${counter} at ${new Date().toISOString()}` + } + }); } - ], - }; - } -); - -// Create a simple resource at a fixed URI -server.resource( - 'greeting-resource', - 'https://example.com/greetings/default', - { mimeType: 'text/plain' }, - async (): Promise => { - return { - contents: [ - { - uri: 'https://example.com/greetings/default', - text: 'Hello, world!', - }, - ], - }; - } -); + catch (error) { + console.error("Error sending notification:", error); + } + // Wait for the specified interval + await sleep(interval); + } + + return { + content: [ + { + type: 'text', + text: `Started sending periodic notifications every ${interval}ms`, + } + ], + }; + } + ); + + // Create a simple resource at a fixed URI + server.resource( + 'greeting-resource', + 'https://example.com/greetings/default', + { mimeType: 'text/plain' }, + async (): Promise => { + return { + contents: [ + { + uri: 'https://example.com/greetings/default', + text: 'Hello, world!', + }, + ], + }; + } + ); + return server; +}; const app = express(); app.use(express.json()); @@ -207,6 +210,7 @@ app.post('/mcp', async (req: Request, res: Response) => { // Connect the transport to the MCP server BEFORE handling the request // so responses can flow back through the same transport + const server = getServer(); await server.connect(transport); await transport.handleRequest(req, res, req.body); @@ -303,7 +307,6 @@ process.on('SIGINT', async () => { console.error(`Error closing transport for session ${sessionId}:`, error); } } - await server.close(); console.log('Server shutdown complete'); process.exit(0); }); diff --git a/src/examples/server/sseAndStreamableHttpCompatibleServer.ts b/src/examples/server/sseAndStreamableHttpCompatibleServer.ts index d644dd07..7a0ab04e 100644 --- a/src/examples/server/sseAndStreamableHttpCompatibleServer.ts +++ b/src/examples/server/sseAndStreamableHttpCompatibleServer.ts @@ -18,52 +18,54 @@ import { InMemoryEventStore } from '../shared/inMemoryEventStore.js'; * - /messages: The deprecated POST endpoint for older clients (POST to send messages) */ +const getServer = () => { + const server = new McpServer({ + name: 'backwards-compatible-server', + version: '1.0.0', + }, { capabilities: { logging: {} } }); + + // Register a simple tool that sends notifications over time + server.tool( + 'start-notification-stream', + 'Starts sending periodic notifications for testing resumability', + { + interval: z.number().describe('Interval in milliseconds between notifications').default(100), + count: z.number().describe('Number of notifications to send (0 for 100)').default(50), + }, + async ({ interval, count }, { sendNotification }): Promise => { + const sleep = (ms: number) => new Promise(resolve => setTimeout(resolve, ms)); + let counter = 0; + + while (count === 0 || counter < count) { + counter++; + try { + await sendNotification({ + method: "notifications/message", + params: { + level: "info", + data: `Periodic notification #${counter} at ${new Date().toISOString()}` + } + }); + } + catch (error) { + console.error("Error sending notification:", error); + } + // Wait for the specified interval + await sleep(interval); + } -const server = new McpServer({ - name: 'backwards-compatible-server', - version: '1.0.0', -}, { capabilities: { logging: {} } }); - -// Register a simple tool that sends notifications over time -server.tool( - 'start-notification-stream', - 'Starts sending periodic notifications for testing resumability', - { - interval: z.number().describe('Interval in milliseconds between notifications').default(100), - count: z.number().describe('Number of notifications to send (0 for 100)').default(50), - }, - async ({ interval, count }, { sendNotification }): Promise => { - const sleep = (ms: number) => new Promise(resolve => setTimeout(resolve, ms)); - let counter = 0; - - while (count === 0 || counter < count) { - counter++; - try { - await sendNotification({ - method: "notifications/message", - params: { - level: "info", - data: `Periodic notification #${counter} at ${new Date().toISOString()}` + return { + content: [ + { + type: 'text', + text: `Started sending periodic notifications every ${interval}ms`, } - }); - } - catch (error) { - console.error("Error sending notification:", error); - } - // Wait for the specified interval - await sleep(interval); + ], + }; } - - return { - content: [ - { - type: 'text', - text: `Started sending periodic notifications every ${interval}ms`, - } - ], - }; - } -); + ); + return server; +}; // Create Express application const app = express(); @@ -125,6 +127,7 @@ app.all('/mcp', async (req: Request, res: Response) => { }; // Connect the transport to the MCP server + const server = getServer(); await server.connect(transport); } else { // Invalid request - no session ID or not initialization request @@ -167,6 +170,7 @@ app.get('/sse', async (req: Request, res: Response) => { res.on("close", () => { delete transports[transport.sessionId]; }); + const server = getServer(); await server.connect(transport); }); @@ -237,7 +241,6 @@ process.on('SIGINT', async () => { console.error(`Error closing transport for session ${sessionId}:`, error); } } - await server.close(); console.log('Server shutdown complete'); process.exit(0); }); \ No newline at end of file From 6050aa9f0537a965c19d36c443f5ab6a1fceab0c Mon Sep 17 00:00:00 2001 From: ihrpr Date: Mon, 21 Apr 2025 15:31:38 +0100 Subject: [PATCH 8/9] lint --- .../client/multipleClientsParallel.ts | 9 +- .../client/parallelToolCallsClient.ts | 14 +- .../streamableHttpBatchMessages.test.ts | 252 ------------------ src/server/streamableHttp.test.ts | 2 +- 4 files changed, 13 insertions(+), 264 deletions(-) delete mode 100644 src/integration-tests/streamableHttpBatchMessages.test.ts diff --git a/src/examples/client/multipleClientsParallel.ts b/src/examples/client/multipleClientsParallel.ts index 90b9d4ea..cc01fc06 100644 --- a/src/examples/client/multipleClientsParallel.ts +++ b/src/examples/client/multipleClientsParallel.ts @@ -4,6 +4,7 @@ import { CallToolRequest, CallToolResultSchema, LoggingMessageNotificationSchema, + CallToolResult, } from '../../types.js'; /** @@ -23,12 +24,12 @@ interface ClientConfig { id: string; name: string; toolName: string; - toolArguments: Record; + toolArguments: Record; } -async function createAndRunClient(config: ClientConfig): Promise<{ id: string; result: any }> { +async function createAndRunClient(config: ClientConfig): Promise<{ id: string; result: CallToolResult }> { console.log(`[${config.id}] Creating client: ${config.name}`); - + const client = new Client({ name: config.name, version: '1.0.0' @@ -101,7 +102,7 @@ async function main(): Promise { } }, { - id: 'client2', + id: 'client2', name: 'basic-client-2', toolName: 'start-notification-stream', toolArguments: { diff --git a/src/examples/client/parallelToolCallsClient.ts b/src/examples/client/parallelToolCallsClient.ts index 8a53ade9..3783992d 100644 --- a/src/examples/client/parallelToolCallsClient.ts +++ b/src/examples/client/parallelToolCallsClient.ts @@ -3,9 +3,9 @@ import { StreamableHTTPClientTransport } from '../../client/streamableHttp.js'; import { ListToolsRequest, ListToolsResultSchema, - CallToolRequest, CallToolResultSchema, LoggingMessageNotificationSchema, + CallToolResult, } from '../../types.js'; /** @@ -57,11 +57,11 @@ async function main(): Promise { // 2. Start multiple notification tools in parallel console.log('\n=== Starting Multiple Notification Streams in Parallel ==='); const toolResults = await startParallelNotificationTools(client); - + // Log the results from each tool call for (const [caller, result] of Object.entries(toolResults)) { console.log(`\n=== Tool result for ${caller} ===`); - result.content.forEach((item: { type: string; text: any; }) => { + result.content.forEach((item: { type: string; text?: string; }) => { if (item.type === 'text') { console.log(` ${item.text}`); } else { @@ -113,7 +113,7 @@ async function listTools(client: Client): Promise { * Start multiple notification tools in parallel with different configurations * Each tool call includes a caller parameter to identify its notifications */ -async function startParallelNotificationTools(client: Client): Promise> { +async function startParallelNotificationTools(client: Client): Promise> { try { // Define multiple tool calls with different configurations const toolCalls = [ @@ -162,7 +162,7 @@ async function startParallelNotificationTools(client: Client): Promise { console.log(`Starting tool call for ${caller}...`); @@ -176,9 +176,9 @@ async function startParallelNotificationTools(client: Client): Promise = {}; + const resultsByTool: Record = {}; results.forEach(({ caller, result }) => { resultsByTool[caller] = result; }); diff --git a/src/integration-tests/streamableHttpBatchMessages.test.ts b/src/integration-tests/streamableHttpBatchMessages.test.ts deleted file mode 100644 index dc79e3de..00000000 --- a/src/integration-tests/streamableHttpBatchMessages.test.ts +++ /dev/null @@ -1,252 +0,0 @@ -/** - * Integration tests for batch messaging in StreamableHttp transport - * - * This test suite focuses specifically on the requirement from the spec: - * "The body of the POST request MUST be one of the following: - * - A single JSON-RPC request, notification, or response - * - An array batching one or more requests and/or notifications - * - An array batching one or more responses" - */ - -import { createServer, type Server } from "node:http"; -import { AddressInfo } from "node:net"; -import { randomUUID } from "node:crypto"; -import { StreamableHTTPServerTransport } from "../server/streamableHttp.js"; -import { McpServer } from "../server/mcp.js"; -import { Client } from "../client/index.js"; -import { CallToolResult, CallToolResultSchema, JSONRPCMessage } from "../types.js"; -import { z } from "zod"; -import { StreamableHTTPClientTransport } from '../client/streamableHttp.js'; -describe("StreamableHttp Batch Messaging - SSE", () => { - let server: Server; - let client: Client; - - // Just a basic server and client for testing - beforeEach(async () => { - // Create MCP server with test tools - const mcpServer = new McpServer( - { name: "batch-test-server", version: "1.0.0" }, - { capabilities: { logging: {} } } - ); - - // Simple greeting tool - mcpServer.tool( - "greet", - "A simple greeting tool", - { name: z.string().describe("Name to greet") }, - async ({ name }): Promise => { - return { content: [{ type: "text", text: `Hello, ${name}!` }] }; - } - ); - - // Server transport with session management - const serverTransport = new StreamableHTTPServerTransport({ - sessionIdGenerator: () => randomUUID(), - }); - - // Connect transport to server - await mcpServer.connect(serverTransport); - - // Create HTTP server - server = createServer(async (req, res) => { - await serverTransport.handleRequest(req, res); - }); - - // Start server on random port - const baseUrl = await new Promise((resolve) => { - server.listen(0, '127.0.0.1', () => { - const addr = server.address() as AddressInfo; - resolve(new URL(`http://127.0.0.1:${addr.port}`)); - }); - }); - - // Create client that connects to the server - const transport = new StreamableHTTPClientTransport(baseUrl); - client = new Client({ name: "test-client", version: "1.0.0" }); - await client.connect(transport); - - }); - - afterEach(async () => { - await client.close(); - await new Promise((resolve) => server.close(() => resolve())); - }); - - /** - * 1. Test sending a single JSON-RPC request - */ - it("handles a single request", async () => { - const result = await client.callTool({ - name: "greet", - arguments: { - name: "user" - } - }); - - expect(result).toHaveProperty("content"); - expect(Array.isArray(result.content)).toBe(true); - expect(result.content).toHaveLength(1); - - // Type assertion to handle the unknown type - const content = result.content as Array<{ type: string, text: string }>; - expect(content[0]).toHaveProperty("type", "text"); - expect(content[0]).toHaveProperty("text", "Hello, user!"); - - }); - - /** - * 1. Test sending a single JSON-RPC request - */ - it("handles a single request with request", async () => { - const result = await client.request( - { - method: "tools/call", - params: { - name: "greet", - arguments: { - name: "user" - } - }, - }, - CallToolResultSchema, - ); - - - expect(result).toHaveProperty("content"); - expect(Array.isArray(result.content)).toBe(true); - expect(result.content).toHaveLength(1); - - // Type assertion to handle the unknown type - const content = result.content as Array<{ type: string, text: string }>; - expect(content[0]).toHaveProperty("type", "text"); - expect(content[0]).toHaveProperty("text", "Hello, user!"); - - }); - - // it("handles an array batching one or more requests and/or notifications", async () => { - // const result = await client.request({ - - // }); - - // }); - - -}); - -describe("StreamableHttp Batch Messaging - JSON response", () => { - let server: Server; - let client: Client; - - // Just a basic server and client for testing - beforeEach(async () => { - // Create MCP server with test tools - const mcpServer = new McpServer( - { name: "batch-test-server", version: "1.0.0" }, - { capabilities: { logging: {} } } - ); - - // Simple greeting tool - mcpServer.tool( - "greet", - "A simple greeting tool", - { name: z.string().describe("Name to greet") }, - async ({ name }): Promise => { - return { content: [{ type: "text", text: `Hello, ${name}!` }] }; - } - ); - - // Server transport with session management - const serverTransport = new StreamableHTTPServerTransport({ - sessionIdGenerator: () => randomUUID(), - enableJsonResponse: true, - - }); - - // Connect transport to server - await mcpServer.connect(serverTransport); - - // Create HTTP server - server = createServer(async (req, res) => { - await serverTransport.handleRequest(req, res); - }); - - // Start server on random port - const baseUrl = await new Promise((resolve) => { - server.listen(0, '127.0.0.1', () => { - const addr = server.address() as AddressInfo; - resolve(new URL(`http://127.0.0.1:${addr.port}`)); - }); - }); - - // Create client that connects to the server - const transport = new StreamableHTTPClientTransport(baseUrl); - client = new Client({ name: "test-client", version: "1.0.0" }); - await client.connect(transport); - - }); - - afterEach(async () => { - await client.close(); - await new Promise((resolve) => server.close(() => resolve())); - }); - - /** - * 1. Test sending a single JSON-RPC request - */ - it("handles a single request", async () => { - const result = await client.callTool({ - name: "greet", - arguments: { - name: "user" - } - }); - - expect(result).toHaveProperty("content"); - expect(Array.isArray(result.content)).toBe(true); - expect(result.content).toHaveLength(1); - - // Type assertion to handle the unknown type - const content = result.content as Array<{ type: string, text: string }>; - expect(content[0]).toHaveProperty("type", "text"); - expect(content[0]).toHaveProperty("text", "Hello, user!"); - - }); - - /** - * 1. Test sending a single JSON-RPC request - */ - it("handles a single request with request", async () => { - const result = await client.request( - { - method: "tools/call", - params: { - name: "greet", - arguments: { - name: "user" - } - }, - }, - CallToolResultSchema, - ); - - - expect(result).toHaveProperty("content"); - expect(Array.isArray(result.content)).toBe(true); - expect(result.content).toHaveLength(1); - - // Type assertion to handle the unknown type - const content = result.content as Array<{ type: string, text: string }>; - expect(content[0]).toHaveProperty("type", "text"); - expect(content[0]).toHaveProperty("text", "Hello, user!"); - - }); - - // it("handles an array batching one or more requests and/or notifications", async () => { - // const result = await client.request({ - - // }); - - // }); - - -}); \ No newline at end of file diff --git a/src/server/streamableHttp.test.ts b/src/server/streamableHttp.test.ts index 7f9e85d8..a0f2e0bb 100644 --- a/src/server/streamableHttp.test.ts +++ b/src/server/streamableHttp.test.ts @@ -288,7 +288,7 @@ describe("StreamableHTTPServerTransport", () => { }); }); - it.only("should reject requests without a valid session ID", async () => { + it("should reject requests without a valid session ID", async () => { const response = await sendPostRequest(baseUrl, TEST_MESSAGES.toolsList); expect(response.status).toBe(400); From ded0ce3a8d63be72fde26837725455a6cc49bc7a Mon Sep 17 00:00:00 2001 From: ihrpr Date: Tue, 22 Apr 2025 10:45:17 +0100 Subject: [PATCH 9/9] improve readme --- README.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/README.md b/README.md index 49ca11d2..e5a283d9 100644 --- a/README.md +++ b/README.md @@ -311,6 +311,10 @@ const app = express(); app.use(express.json()); app.post('/mcp', async (req: Request, res: Response) => { + // In stateless mode, create a new instance of transport and server for each request + // to ensure complete isolation. A single instance would cause request ID collisions + // when multiple clients connect concurrently. + try { const server = getServer(); const transport: StreamableHTTPServerTransport = new StreamableHTTPServerTransport({