Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions packages/server/src/server/mcp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -213,8 +213,9 @@ export class McpServer {
await this.validateToolOutput(tool, result, request.params.name);
return result;
} catch (error) {
if (error instanceof ProtocolError && error.code === ProtocolErrorCode.UrlElicitationRequired) {
throw error; // Return the error to the caller without wrapping in CallToolResult
if (error instanceof ProtocolError) {
// Protocol errors should be returned as JSON-RPC errors, not wrapped in CallToolResult
throw error;
}
return this.createToolError(error instanceof Error ? error.message : String(error));
}
Expand Down
69 changes: 65 additions & 4 deletions packages/server/src/server/stdio.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import { process } from '@modelcontextprotocol/server/_shims';
export class StdioServerTransport implements Transport {
private _readBuffer: ReadBuffer = new ReadBuffer();
private _started = false;
private _closed = false;

constructor(
private _stdin: Readable = process.stdin,
Expand All @@ -37,6 +38,17 @@ export class StdioServerTransport implements Transport {
_onerror = (error: Error) => {
this.onerror?.(error);
};
_onstdouterror = (error: Error) => {
// Handle stdout broken pipe when client disconnects.
if ((error as NodeJS.ErrnoException).code === 'EPIPE') {
this.close().catch(() => {
// Ignore errors during close
});
return;
}

this.onerror?.(error);
};

/**
* Starts listening for messages on `stdin`.
Expand All @@ -51,6 +63,7 @@ export class StdioServerTransport implements Transport {
this._started = true;
this._stdin.on('data', this._ondata);
this._stdin.on('error', this._onerror);
this._stdout.on('error', this._onstdouterror);
}

private processReadBuffer() {
Expand All @@ -69,9 +82,15 @@ export class StdioServerTransport implements Transport {
}

async close(): Promise<void> {
if (this._closed) {
return;
}
this._closed = true;

// Remove our event listeners first
this._stdin.off('data', this._ondata);
this._stdin.off('error', this._onerror);
this._stdout.off('error', this._onstdouterror);

// Check if we were the only data listener
const remainingDataListeners = this._stdin.listenerCount('data');
Expand All @@ -87,12 +106,54 @@ export class StdioServerTransport implements Transport {
}

send(message: JSONRPCMessage): Promise<void> {
return new Promise(resolve => {
return new Promise((resolve, reject) => {
const json = serializeMessage(message);
if (this._stdout.write(json)) {
let settled = false;

const cleanup = () => {
this._stdout.off('error', onError);
this._stdout.off('drain', onDrain);
};

const onDrain = () => {
if (settled) {
return;
}
settled = true;
cleanup();
resolve();
} else {
this._stdout.once('drain', resolve);
};

const onError = (error: Error) => {
if (settled) {
return;
}
settled = true;
cleanup();

if ((error as NodeJS.ErrnoException).code === 'EPIPE') {
this.close().catch(() => {
// Ignore errors during close
});
resolve();
return;
}

reject(error);
};

this._stdout.once('error', onError);

try {
if (this._stdout.write(json)) {
settled = true;
cleanup();
resolve();
} else {
this._stdout.once('drain', onDrain);
}
} catch (error) {
onError(error as Error);
}
});
}
Expand Down
51 changes: 51 additions & 0 deletions packages/server/test/server/stdio.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,3 +102,54 @@ test('should read multiple messages', async () => {
await finished;
expect(readMessages).toEqual(messages);
});

test('should handle EPIPE from stdout error event and close gracefully', async () => {
const epipeOutput = new Writable({
write(_chunk, _encoding, callback) {
callback();
}
});

const server = new StdioServerTransport(input, epipeOutput);

let didClose = false;
server.onclose = () => {
didClose = true;
};

await server.start();

epipeOutput.emit('error', Object.assign(new Error('broken pipe'), { code: 'EPIPE' }));

await new Promise(resolve => setImmediate(resolve));

expect(didClose).toBeTruthy();
});

test('should resolve send and close on EPIPE write failure', async () => {
const epipeWriteOutput = new Writable({
write(_chunk, _encoding, callback) {
callback(Object.assign(new Error('broken pipe'), { code: 'EPIPE' }));
}
});

const server = new StdioServerTransport(input, epipeWriteOutput);

let didClose = false;
server.onclose = () => {
didClose = true;
};

await server.start();

await expect(
server.send({
jsonrpc: '2.0',
method: 'notifications/initialized'
})
).resolves.toBeUndefined();

await new Promise(resolve => setImmediate(resolve));

expect(didClose).toBeTruthy();
});
28 changes: 10 additions & 18 deletions test/integration/test/server/mcp.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1837,25 +1837,17 @@ describe('Zod v4', () => {

await Promise.all([client.connect(clientTransport), mcpServer.server.connect(serverTransport)]);

const result = await client.request(
{
method: 'tools/call',
params: {
name: 'nonexistent-tool'
}
},
CallToolResultSchema
);

expect(result.isError).toBe(true);
expect(result.content).toEqual(
expect.arrayContaining([
await expect(
client.request(
{
type: 'text',
text: expect.stringContaining('Tool nonexistent-tool not found')
}
])
);
method: 'tools/call',
params: {
name: 'nonexistent-tool'
}
},
CallToolResultSchema
)
).rejects.toThrow(/Tool nonexistent-tool not found/);
});

/***
Expand Down
Loading