Skip to content

Commit 59ba162

Browse files
authored
feat(MCP Server Trigger Node): Handle multiple tool calls in mcp server trigger (#15064)
1 parent b373871 commit 59ba162

File tree

4 files changed

+153
-63
lines changed

4 files changed

+153
-63
lines changed

packages/@n8n/nodes-langchain/nodes/mcp/McpTrigger/McpServer.ts

Lines changed: 91 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
11
import type { Tool } from '@langchain/core/tools';
22
import { Server } from '@modelcontextprotocol/sdk/server/index.js';
33
import type { RequestHandlerExtra } from '@modelcontextprotocol/sdk/shared/protocol.js';
4-
import type { JSONRPCMessage } from '@modelcontextprotocol/sdk/types.js';
4+
import type {
5+
JSONRPCMessage,
6+
ServerRequest,
7+
ServerNotification,
8+
} from '@modelcontextprotocol/sdk/types.js';
59
import {
610
JSONRPCMessageSchema,
711
ListToolsRequestSchema,
@@ -33,6 +37,20 @@ function wasToolCall(body: string) {
3337
}
3438
}
3539

40+
/**
41+
* Extracts the request ID from a JSONRPC message
42+
* Returns undefined if the message doesn't have an ID or can't be parsed
43+
*/
44+
function getRequestId(body: string): string | undefined {
45+
try {
46+
const message: unknown = JSON.parse(body);
47+
const parsedMessage: JSONRPCMessage = JSONRPCMessageSchema.parse(message);
48+
return 'id' in parsedMessage ? String(parsedMessage.id) : undefined;
49+
} catch {
50+
return undefined;
51+
}
52+
}
53+
3654
export class McpServer {
3755
servers: { [sessionId: string]: Server } = {};
3856

@@ -42,7 +60,7 @@ export class McpServer {
4260

4361
private tools: { [sessionId: string]: Tool[] } = {};
4462

45-
private resolveFunctions: { [sessionId: string]: CallableFunction } = {};
63+
private resolveFunctions: { [callId: string]: CallableFunction } = {};
4664

4765
constructor(logger: Logger) {
4866
this.logger = logger;
@@ -59,7 +77,6 @@ export class McpServer {
5977
resp.on('close', async () => {
6078
this.logger.debug(`Deleting transport for ${sessionId}`);
6179
delete this.tools[sessionId];
62-
delete this.resolveFunctions[sessionId];
6380
delete this.transports[sessionId];
6481
delete this.servers[sessionId];
6582
});
@@ -75,16 +92,25 @@ export class McpServer {
7592
async handlePostMessage(req: express.Request, resp: CompressionResponse, connectedTools: Tool[]) {
7693
const sessionId = req.query.sessionId as string;
7794
const transport = this.transports[sessionId];
78-
this.tools[sessionId] = connectedTools;
7995
if (transport) {
8096
// We need to add a promise here because the `handlePostMessage` will send something to the
8197
// MCP Server, that will run in a different context. This means that the return will happen
8298
// almost immediately, and will lead to marking the sub-node as "running" in the final execution
83-
await new Promise(async (resolve) => {
84-
this.resolveFunctions[sessionId] = resolve;
85-
await transport.handlePostMessage(req, resp, req.rawBody.toString());
86-
});
87-
delete this.resolveFunctions[sessionId];
99+
const bodyString = req.rawBody.toString();
100+
const messageId = getRequestId(bodyString);
101+
102+
// Use session & message ID if available, otherwise fall back to sessionId
103+
const callId = messageId ? `${sessionId}_${messageId}` : sessionId;
104+
this.tools[sessionId] = connectedTools;
105+
106+
try {
107+
await new Promise(async (resolve) => {
108+
this.resolveFunctions[callId] = resolve;
109+
await transport.handlePostMessage(req, resp, bodyString);
110+
});
111+
} finally {
112+
delete this.resolveFunctions[callId];
113+
}
88114
} else {
89115
this.logger.warn(`No transport found for session ${sessionId}`);
90116
resp.status(401).send('No transport found for sessionId');
@@ -94,8 +120,6 @@ export class McpServer {
94120
resp.flush();
95121
}
96122

97-
delete this.tools[sessionId]; // Clean up to avoid keeping all tools in memory
98-
99123
return wasToolCall(req.rawBody.toString());
100124
}
101125

@@ -110,57 +134,68 @@ export class McpServer {
110134
},
111135
);
112136

113-
server.setRequestHandler(ListToolsRequestSchema, async (_, extra: RequestHandlerExtra) => {
114-
if (!extra.sessionId) {
115-
throw new OperationalError('Require a sessionId for the listing of tools');
116-
}
117-
118-
return {
119-
tools: this.tools[extra.sessionId].map((tool) => {
120-
return {
121-
name: tool.name,
122-
description: tool.description,
123-
// Allow additional properties on tool call input
124-
inputSchema: zodToJsonSchema(tool.schema, { removeAdditionalStrategy: 'strict' }),
125-
};
126-
}),
127-
};
128-
});
129-
130-
server.setRequestHandler(CallToolRequestSchema, async (request, extra: RequestHandlerExtra) => {
131-
if (!request.params?.name || !request.params?.arguments) {
132-
throw new OperationalError('Require a name and arguments for the tool call');
133-
}
134-
if (!extra.sessionId) {
135-
throw new OperationalError('Require a sessionId for the tool call');
136-
}
137-
138-
const requestedTool: Tool | undefined = this.tools[extra.sessionId].find(
139-
(tool) => tool.name === request.params.name,
140-
);
141-
if (!requestedTool) {
142-
throw new OperationalError('Tool not found');
143-
}
137+
server.setRequestHandler(
138+
ListToolsRequestSchema,
139+
async (_, extra: RequestHandlerExtra<ServerRequest, ServerNotification>) => {
140+
if (!extra.sessionId) {
141+
throw new OperationalError('Require a sessionId for the listing of tools');
142+
}
144143

145-
try {
146-
const result = await requestedTool.invoke(request.params.arguments);
144+
return {
145+
tools: this.tools[extra.sessionId].map((tool) => {
146+
return {
147+
name: tool.name,
148+
description: tool.description,
149+
// Allow additional properties on tool call input
150+
inputSchema: zodToJsonSchema(tool.schema, { removeAdditionalStrategy: 'strict' }),
151+
};
152+
}),
153+
};
154+
},
155+
);
147156

148-
this.resolveFunctions[extra.sessionId]();
157+
server.setRequestHandler(
158+
CallToolRequestSchema,
159+
async (request, extra: RequestHandlerExtra<ServerRequest, ServerNotification>) => {
160+
if (!request.params?.name || !request.params?.arguments) {
161+
throw new OperationalError('Require a name and arguments for the tool call');
162+
}
163+
if (!extra.sessionId) {
164+
throw new OperationalError('Require a sessionId for the tool call');
165+
}
149166

150-
this.logger.debug(`Got request for ${requestedTool.name}, and executed it.`);
167+
const callId = extra.requestId ? `${extra.sessionId}_${extra.requestId}` : extra.sessionId;
151168

152-
if (typeof result === 'object') {
153-
return { content: [{ type: 'text', text: JSON.stringify(result) }] };
169+
const requestedTool: Tool | undefined = this.tools[extra.sessionId].find(
170+
(tool) => tool.name === request.params.name,
171+
);
172+
if (!requestedTool) {
173+
throw new OperationalError('Tool not found');
154174
}
155-
if (typeof result === 'string') {
156-
return { content: [{ type: 'text', text: result }] };
175+
176+
try {
177+
const result = await requestedTool.invoke(request.params.arguments);
178+
if (this.resolveFunctions[callId]) {
179+
this.resolveFunctions[callId]();
180+
} else {
181+
this.logger.warn(`No resolve function found for ${callId}`);
182+
}
183+
184+
this.logger.debug(`Got request for ${requestedTool.name}, and executed it.`);
185+
186+
if (typeof result === 'object') {
187+
return { content: [{ type: 'text', text: JSON.stringify(result) }] };
188+
}
189+
if (typeof result === 'string') {
190+
return { content: [{ type: 'text', text: result }] };
191+
}
192+
return { content: [{ type: 'text', text: String(result) }] };
193+
} catch (error) {
194+
this.logger.error(`Error while executing Tool ${requestedTool.name}: ${error}`);
195+
return { isError: true, content: [{ type: 'text', text: `Error: ${error.message}` }] };
157196
}
158-
return { content: [{ type: 'text', text: String(result) }] };
159-
} catch (error) {
160-
this.logger.error(`Error while executing Tool ${requestedTool.name}: ${error}`);
161-
return { isError: true, content: [{ type: 'text', text: `Error: ${error.message}` }] };
162-
}
163-
});
197+
},
198+
);
164199

165200
server.onclose = () => {
166201
this.logger.debug('Closing MCP Server');

packages/@n8n/nodes-langchain/nodes/mcp/McpTrigger/__test__/McpServer.test.ts

Lines changed: 56 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ describe('McpServer', () => {
7878
it('should call transport.handlePostMessage when transport exists', async () => {
7979
mockTransport.handlePostMessage.mockImplementation(async () => {
8080
// @ts-expect-error private property `resolveFunctions`
81-
mcpServer.resolveFunctions[sessionId]();
81+
mcpServer.resolveFunctions[`${sessionId}_123`]();
8282
});
8383

8484
// Add the transport directly
@@ -110,6 +110,61 @@ describe('McpServer', () => {
110110
expect(mockResponse.flush).toHaveBeenCalled();
111111
});
112112

113+
it('should handle multiple tool calls with different ids', async () => {
114+
const firstId = 123;
115+
const secondId = 456;
116+
117+
mockTransport.handlePostMessage.mockImplementation(async () => {
118+
const requestKey = mockRequest.rawBody?.toString().includes(`"id":${firstId}`)
119+
? `${sessionId}_${firstId}`
120+
: `${sessionId}_${secondId}`;
121+
// @ts-expect-error private property `resolveFunctions`
122+
mcpServer.resolveFunctions[requestKey]();
123+
});
124+
125+
// Add the transport directly
126+
mcpServer.transports[sessionId] = mockTransport;
127+
128+
// First tool call
129+
mockRequest.rawBody = Buffer.from(
130+
JSON.stringify({
131+
jsonrpc: '2.0',
132+
method: 'tools/call',
133+
id: firstId,
134+
params: { name: 'mockTool', arguments: { param: 'first call' } },
135+
}),
136+
);
137+
138+
// Handle first tool call
139+
const firstResult = await mcpServer.handlePostMessage(mockRequest, mockResponse, [mockTool]);
140+
expect(firstResult).toBe(true);
141+
expect(mockTransport.handlePostMessage).toHaveBeenCalledWith(
142+
mockRequest,
143+
mockResponse,
144+
expect.any(String),
145+
);
146+
147+
// Second tool call with different id
148+
mockRequest.rawBody = Buffer.from(
149+
JSON.stringify({
150+
jsonrpc: '2.0',
151+
method: 'tools/call',
152+
id: secondId,
153+
params: { name: 'mockTool', arguments: { param: 'second call' } },
154+
}),
155+
);
156+
157+
// Handle second tool call
158+
const secondResult = await mcpServer.handlePostMessage(mockRequest, mockResponse, [mockTool]);
159+
expect(secondResult).toBe(true);
160+
161+
// Verify transport's handlePostMessage was called twice
162+
expect(mockTransport.handlePostMessage).toHaveBeenCalledTimes(2);
163+
164+
// Verify flush was called for both requests
165+
expect(mockResponse.flush).toHaveBeenCalledTimes(2);
166+
});
167+
113168
it('should return 401 when transport does not exist', async () => {
114169
// Call without setting up transport
115170
await mcpServer.handlePostMessage(mockRequest, mockResponse, [mockTool]);

packages/@n8n/nodes-langchain/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,7 @@
173173
"@langchain/qdrant": "0.1.1",
174174
"@langchain/redis": "0.1.0",
175175
"@langchain/textsplitters": "0.1.0",
176-
"@modelcontextprotocol/sdk": "1.9.0",
176+
"@modelcontextprotocol/sdk": "1.11.0",
177177
"@mozilla/readability": "0.6.0",
178178
"@n8n/client-oauth2": "workspace:*",
179179
"@n8n/json-schema-to-zod": "workspace:*",

pnpm-lock.yaml

Lines changed: 5 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)