-
Notifications
You must be signed in to change notification settings - Fork 0
feat: configure MCP servers in settings page #3
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,11 +2,18 @@ | |
|
||
import { generateText, Message } from 'ai'; | ||
import { cookies } from 'next/headers'; | ||
import { revalidatePath } from 'next/cache'; | ||
import { auth } from '@/app/(auth)/auth'; | ||
|
||
import { | ||
deleteMessagesByChatIdAfterTimestamp, | ||
getMessageById, | ||
updateChatVisiblityById, | ||
addMcpServer, | ||
getMcpServersByUserId, | ||
getMcpServerByIdAndUserId, | ||
updateMcpServerStatus, | ||
deleteMcpServer, | ||
} from '@/lib/db/queries'; | ||
import { VisibilityType } from '@/components/visibility-selector'; | ||
import { myProvider } from '@/lib/ai/providers'; | ||
|
@@ -52,3 +59,113 @@ export async function updateChatVisibility({ | |
}) { | ||
await updateChatVisiblityById({ chatId, visibility }); | ||
} | ||
|
||
// --- MCP Server Actions --- | ||
|
||
export async function fetchMcpServers() { | ||
const session = await auth(); | ||
if (!session?.user?.id) { | ||
throw new Error('Unauthorized: User not logged in.'); | ||
} | ||
try { | ||
const servers = await getMcpServersByUserId({ userId: session.user.id }); | ||
return servers; | ||
} catch (error) { | ||
console.error('Error fetching MCP servers:', error); | ||
throw new Error('Failed to fetch MCP servers.'); | ||
} | ||
} | ||
|
||
export async function addMcpServerAction({ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Error Handling Inconsistency: Some functions throw errors directly while others return error objects with a |
||
name, | ||
config, | ||
}: { | ||
name: string; | ||
config: Record<string, any>; | ||
}) { | ||
const session = await auth(); | ||
if (!session?.user?.id) { | ||
throw new Error('Unauthorized: User not logged in.'); | ||
} | ||
|
||
if (!name || !config) { | ||
throw new Error('Missing required fields: name and config.'); | ||
} | ||
|
||
try { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Error Handling: The |
||
const newServer = await addMcpServer({ | ||
userId: session.user.id, | ||
name, | ||
config, | ||
}); | ||
revalidatePath('/settings'); // Revalidate the settings page | ||
return { success: true, server: newServer }; | ||
} catch (error) { | ||
console.error('Error adding MCP server:', error); | ||
return { success: false, error: 'Failed to add MCP server.' }; | ||
} | ||
} | ||
|
||
export async function toggleMcpServerAction({ | ||
id, | ||
isEnabled, | ||
}: { | ||
id: string; | ||
isEnabled: boolean; | ||
}) { | ||
const session = await auth(); | ||
if (!session?.user?.id) { | ||
throw new Error('Unauthorized: User not logged in.'); | ||
} | ||
|
||
try { | ||
// Verify ownership before updating | ||
const existingServer = await getMcpServerByIdAndUserId({ | ||
id, | ||
userId: session.user.id, | ||
}); | ||
if (!existingServer) { | ||
throw new Error('Unauthorized: Server not found or not owned by user.'); | ||
} | ||
|
||
const updatedServer = await updateMcpServerStatus({ id, isEnabled }); | ||
revalidatePath('/settings'); // Revalidate the settings page | ||
return { success: true, server: updatedServer }; | ||
} catch (error) { | ||
console.error('Error toggling MCP server status:', error); | ||
// Distinguish between auth errors and DB errors if needed | ||
const errorMessage = error instanceof Error && error.message.startsWith('Unauthorized') | ||
? error.message | ||
: 'Failed to update MCP server status.'; | ||
return { success: false, error: errorMessage }; | ||
|
||
} | ||
} | ||
|
||
export async function deleteMcpServerAction({ id }: { id: string }) { | ||
const session = await auth(); | ||
if (!session?.user?.id) { | ||
throw new Error('Unauthorized: User not logged in.'); | ||
} | ||
|
||
try { | ||
// Verify ownership before deleting | ||
const existingServer = await getMcpServerByIdAndUserId({ | ||
id, | ||
userId: session.user.id, | ||
}); | ||
if (!existingServer) { | ||
throw new Error('Unauthorized: Server not found or not owned by user.'); | ||
} | ||
|
||
await deleteMcpServer({ id }); | ||
revalidatePath('/settings'); // Revalidate the settings page | ||
return { success: true }; | ||
} catch (error) { | ||
console.error('Error deleting MCP server:', error); | ||
const errorMessage = error instanceof Error && error.message.startsWith('Unauthorized') | ||
? error.message | ||
: 'Failed to delete MCP server.'; | ||
return { success: false, error: errorMessage }; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,14 +4,17 @@ import { | |
createDataStreamResponse, | ||
smoothStream, | ||
streamText, | ||
experimental_createMCPClient, | ||
} from 'ai'; | ||
import { Experimental_StdioMCPTransport } from 'ai/mcp-stdio'; | ||
import { auth } from '@/app/(auth)/auth'; | ||
import { systemPrompt } from '@/lib/ai/prompts'; | ||
import { | ||
deleteChatById, | ||
getChatById, | ||
saveChat, | ||
saveMessages, | ||
getEnabledMcpServersByUserId, | ||
} from '@/lib/db/queries'; | ||
import { | ||
generateUUID, | ||
|
@@ -29,6 +32,8 @@ import { myProvider } from '@/lib/ai/providers'; | |
export const maxDuration = 60; | ||
|
||
export async function POST(request: Request) { | ||
let mcpClientsToClose: Awaited<ReturnType<typeof experimental_createMCPClient>>[] = []; | ||
|
||
try { | ||
const { | ||
id, | ||
|
@@ -45,6 +50,7 @@ export async function POST(request: Request) { | |
if (!session || !session.user || !session.user.id) { | ||
return new Response('Unauthorized', { status: 401 }); | ||
} | ||
const userId = session.user.id; | ||
|
||
const userMessage = getMostRecentUserMessage(messages); | ||
|
||
|
@@ -59,9 +65,9 @@ export async function POST(request: Request) { | |
message: userMessage, | ||
}); | ||
|
||
await saveChat({ id, userId: session.user.id, title }); | ||
await saveChat({ id, userId: userId, title }); | ||
} else { | ||
if (chat.userId !== session.user.id) { | ||
if (chat.userId !== userId) { | ||
return new Response('Unauthorized', { status: 401 }); | ||
} | ||
} | ||
|
@@ -80,87 +86,146 @@ export async function POST(request: Request) { | |
}); | ||
|
||
return createDataStreamResponse({ | ||
execute: (dataStream) => { | ||
const result = streamText({ | ||
model: myProvider.languageModel(selectedChatModel), | ||
system: systemPrompt({ selectedChatModel }), | ||
messages, | ||
maxSteps: 5, | ||
experimental_activeTools: | ||
selectedChatModel === 'chat-model-reasoning' | ||
? [] | ||
: [ | ||
'getWeather', | ||
'createDocument', | ||
'updateDocument', | ||
'requestSuggestions', | ||
], | ||
experimental_transform: smoothStream({ chunking: 'word' }), | ||
experimental_generateMessageId: generateUUID, | ||
tools: { | ||
execute: async (dataStream) => { | ||
try { | ||
const staticTools = { | ||
getWeather, | ||
createDocument: createDocument({ session, dataStream }), | ||
updateDocument: updateDocument({ session, dataStream }), | ||
requestSuggestions: requestSuggestions({ | ||
session, | ||
dataStream, | ||
}), | ||
}, | ||
onFinish: async ({ response }) => { | ||
if (session.user?.id) { | ||
}; | ||
let combinedTools: Record<string, any> = { ...staticTools }; | ||
|
||
try { | ||
const enabledServers = await getEnabledMcpServersByUserId({ userId }); | ||
|
||
for (const server of enabledServers) { | ||
try { | ||
const assistantId = getTrailingMessageId({ | ||
messages: response.messages.filter( | ||
(message) => message.role === 'assistant', | ||
), | ||
}); | ||
|
||
if (!assistantId) { | ||
throw new Error('No assistant message found!'); | ||
let transport; | ||
const config = server.config as any; | ||
|
||
if (config.transportType === 'sse') { | ||
transport = { | ||
type: 'sse' as const, | ||
url: config.url, | ||
}; | ||
} else if (config.transportType === 'stdio') { | ||
if (isProductionEnvironment) { | ||
console.warn(`SECURITY WARNING: Initializing MCP client with stdio transport in production for server: ${server.name} (ID: ${server.id})`); | ||
} | ||
transport = new Experimental_StdioMCPTransport({ | ||
command: config.command, | ||
args: config.args || [], | ||
}); | ||
} else { | ||
console.warn(`Unsupported MCP transport type '${config.transportType}' for server ${server.name}`); | ||
continue; | ||
} | ||
|
||
const [, assistantMessage] = appendResponseMessages({ | ||
messages: [userMessage], | ||
responseMessages: response.messages, | ||
}); | ||
|
||
await saveMessages({ | ||
messages: [ | ||
{ | ||
id: assistantId, | ||
chatId: id, | ||
role: assistantMessage.role, | ||
parts: assistantMessage.parts, | ||
attachments: | ||
assistantMessage.experimental_attachments ?? [], | ||
createdAt: new Date(), | ||
}, | ||
], | ||
}); | ||
} catch (_) { | ||
console.error('Failed to save chat'); | ||
const mcpClient = await experimental_createMCPClient({ transport }); | ||
mcpClientsToClose.push(mcpClient); | ||
|
||
const mcpTools = await mcpClient.tools(); | ||
combinedTools = { ...combinedTools, ...mcpTools }; | ||
console.log(`Loaded ${Object.keys(mcpTools).length} tools from MCP server: ${server.name}`); | ||
|
||
} catch (mcpError) { | ||
console.error(`Failed to initialize or get tools from MCP server ${server.name} (ID: ${server.id}):`, mcpError); | ||
} | ||
} | ||
}, | ||
experimental_telemetry: { | ||
isEnabled: isProductionEnvironment, | ||
functionId: 'stream-text', | ||
}, | ||
}); | ||
|
||
result.consumeStream(); | ||
|
||
result.mergeIntoDataStream(dataStream, { | ||
sendReasoning: true, | ||
}); | ||
} catch (dbError) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Error Handling: The |
||
console.error('Failed to fetch enabled MCP servers:', dbError); | ||
} | ||
|
||
const activeToolsList = selectedChatModel === 'chat-model-reasoning' | ||
? [] | ||
: Object.keys(combinedTools); | ||
|
||
const result = streamText({ | ||
model: myProvider.languageModel(selectedChatModel), | ||
system: systemPrompt({ selectedChatModel }), | ||
messages, | ||
maxSteps: 5, | ||
tools: combinedTools, | ||
experimental_activeTools: activeToolsList, | ||
experimental_transform: smoothStream({ chunking: 'word' }), | ||
experimental_generateMessageId: generateUUID, | ||
onFinish: async ({ response }) => { | ||
if (session.user?.id) { | ||
try { | ||
const assistantId = getTrailingMessageId({ | ||
messages: response.messages.filter( | ||
(message) => message.role === 'assistant', | ||
), | ||
}); | ||
|
||
if (!assistantId) { | ||
throw new Error('No assistant message found!'); | ||
} | ||
|
||
const [, assistantMessage] = appendResponseMessages({ | ||
messages: [userMessage], | ||
responseMessages: response.messages, | ||
}); | ||
|
||
await saveMessages({ | ||
messages: [ | ||
{ | ||
id: assistantId, | ||
chatId: id, | ||
role: assistantMessage.role, | ||
parts: assistantMessage.parts, | ||
attachments: | ||
assistantMessage.experimental_attachments ?? [], | ||
createdAt: new Date(), | ||
}, | ||
], | ||
}); | ||
} catch (_) { | ||
console.error('Failed to save chat messages after stream completion'); | ||
} | ||
} | ||
console.log(`Closing ${mcpClientsToClose.length} MCP clients in onFinish...`); | ||
for (const client of mcpClientsToClose) { | ||
try { | ||
await client.close(); | ||
} catch (closeError: unknown) { | ||
console.error('Error closing MCP client in onFinish:', closeError); | ||
} | ||
} | ||
mcpClientsToClose = []; | ||
}, | ||
experimental_telemetry: { | ||
isEnabled: isProductionEnvironment, | ||
functionId: 'stream-text', | ||
}, | ||
}); | ||
|
||
result.consumeStream(); | ||
result.mergeIntoDataStream(dataStream, { sendReasoning: true }); | ||
|
||
} catch(streamError) { | ||
console.error('Error during streamText execution or MCP setup:', streamError); | ||
throw streamError; | ||
} finally { | ||
console.log('Stream execute try/catch finished.'); | ||
} | ||
}, | ||
onError: () => { | ||
return 'Oops, an error occured!'; | ||
onError: (error) => { | ||
console.error('Data stream error:', error); | ||
return 'Oops, an error occured!'; | ||
}, | ||
}); | ||
} catch (error) { | ||
console.error('Error in POST /api/chat route (initial setup):', error); | ||
for (const client of mcpClientsToClose) { | ||
client.close().catch((closeError: unknown) => console.error('Error closing MCP client during outer catch:', closeError)); | ||
} | ||
return new Response('An error occurred while processing your request!', { | ||
status: 404, | ||
status: 500, | ||
}); | ||
} | ||
} | ||
|
@@ -190,6 +255,7 @@ export async function DELETE(request: Request) { | |
|
||
return new Response('Chat deleted', { status: 200 }); | ||
} catch (error) { | ||
console.error('Error deleting chat:', error); | ||
return new Response('An error occurred while processing your request!', { | ||
status: 500, | ||
}); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code Duplication: There's repeated error handling logic for unauthorized users across multiple functions. Consider extracting this into a reusable middleware or helper function to reduce duplication.