Practical Examples and Code Implementation
import { Server } from '@modelcontextprotocol/sdk/server/index.js';
import { StdioServerTransport } from '@modelcontextprotocol/sdk/server/stdio.js';
class FileSystemMCPServer {
  /**
   * MCP server exposing two filesystem tools (read_file / write_file)
   * over a stdio transport.
   */
  constructor() {
    this.server = new Server(
      { name: 'filesystem-server', version: '1.0.0' },
      { capabilities: { tools: {} } }
    );
    this.setupTools();
  }

  /** Registers the tool catalog and the tool-call dispatch handler. */
  setupTools() {
    // Catalog advertised verbatim in response to 'tools/list'.
    const toolCatalog = [
      {
        name: 'read_file',
        description: 'Read contents of a file',
        inputSchema: {
          type: 'object',
          properties: {
            path: { type: 'string', description: 'File path to read' },
          },
          required: ['path'],
        },
      },
      {
        name: 'write_file',
        description: 'Write content to a file',
        inputSchema: {
          type: 'object',
          properties: {
            path: { type: 'string', description: 'File path to write' },
            content: { type: 'string', description: 'Content to write' },
          },
          required: ['path', 'content'],
        },
      },
    ];

    this.server.setRequestHandler('tools/list', async () => ({
      tools: toolCatalog,
    }));

    // Dispatch 'tools/call' by tool name via a lookup table; unknown
    // names are rejected with an error.
    this.server.setRequestHandler('tools/call', async (request) => {
      const { name, arguments: args } = request.params;
      const dispatch = {
        read_file: () => this.readFile(args.path),
        write_file: () => this.writeFile(args.path, args.content),
      };
      const run = dispatch[name];
      if (run === undefined) {
        throw new Error(`Unknown tool: ${name}`);
      }
      return run();
    });
  }

  /**
   * Reads a file as UTF-8 text and wraps it in an MCP text result.
   * Failures are reported as a tool result with `isError` set rather
   * than thrown.
   * NOTE(review): the path is model-supplied and unrestricted — consider
   * confining access to an allow-listed root directory.
   */
  async readFile(path) {
    try {
      const fs = await import('fs/promises');
      const data = await fs.readFile(path, 'utf-8');
      return {
        content: [
          { type: 'text', text: `File content from ${path}:\n${data}` },
        ],
      };
    } catch (error) {
      return {
        content: [
          { type: 'text', text: `Error reading file: ${error.message}` },
        ],
        isError: true,
      };
    }
  }

  /**
   * Writes UTF-8 text to a file; errors are reported as an `isError`
   * tool result, mirroring readFile.
   */
  async writeFile(path, content) {
    try {
      const fs = await import('fs/promises');
      await fs.writeFile(path, content, 'utf-8');
      return {
        content: [
          { type: 'text', text: `Successfully wrote content to ${path}` },
        ],
      };
    } catch (error) {
      return {
        content: [
          { type: 'text', text: `Error writing file: ${error.message}` },
        ],
        isError: true,
      };
    }
  }

  /** Connects the server to stdio transport and begins serving. */
  async start() {
    const transport = new StdioServerTransport();
    await this.server.connect(transport);
  }
}
// Start the server: construct it and begin serving over stdio.
// Startup failures are surfaced on stderr rather than left as an
// unhandled promise rejection.
const server = new FileSystemMCPServer();
server.start().catch(console.error);
import { Client } from '@modelcontextprotocol/sdk/client/index.js';
import { StdioClientTransport } from '@modelcontextprotocol/sdk/client/stdio.js';
import { spawn } from 'child_process';
class MCPFileSystemClient {
  /** Client that talks to the filesystem MCP server over stdio. */
  constructor() {
    this.client = new Client(
      { name: 'filesystem-client', version: '1.0.0' },
      { capabilities: {} }
    );
  }

  /**
   * Connects to the filesystem server.
   *
   * StdioClientTransport spawns and owns the server process itself
   * (given a command and args); the original hand-rolled `spawn` passed
   * raw stdin/stdout streams, which is not the options shape the SDK
   * transport accepts, and left an unmanaged child process behind.
   */
  async connect() {
    const transport = new StdioClientTransport({
      command: 'node',
      args: ['filesystem-server.js'],
    });
    await this.client.connect(transport);
  }

  /**
   * Lists the server's tools using the SDK's typed helper.
   *
   * The original called `client.request({ method }, {})`, but the second
   * argument of `request` is the result-schema slot, not request params.
   * @returns {Array} the server's advertised tool descriptors.
   */
  async listAvailableTools() {
    const response = await this.client.listTools();
    return response.tools;
  }

  /**
   * Invokes the read_file tool.
   * @param {string} path - file path to read on the server side.
   * @returns the raw tool result (content blocks, optional isError).
   */
  async readFile(path) {
    return this.client.callTool({
      name: 'read_file',
      arguments: { path },
    });
  }

  /**
   * Invokes the write_file tool.
   * @param {string} path - destination path on the server side.
   * @param {string} content - text to write.
   * @returns the raw tool result (content blocks, optional isError).
   */
  async writeFile(path, content) {
    return this.client.callTool({
      name: 'write_file',
      arguments: { path, content },
    });
  }
}
// Usage example: connect, enumerate tools, then round-trip a text file.
async function demonstrateFileSystemMCP() {
  const fsClient = new MCPFileSystemClient();
  await fsClient.connect();
  console.log('Available tools:', await fsClient.listAvailableTools());
  await fsClient.writeFile('./test.txt', 'Hello from MCP!');
  console.log('File content:', await fsClient.readFile('./test.txt'));
}

demonstrateFileSystemMCP().catch(console.error);
import asyncio
from typing import Any, Optional, Type

from langchain.agents import AgentType, initialize_agent
from langchain.llms import OpenAI
from langchain.tools import BaseTool
from pydantic import BaseModel, Field
class MCPToolInput(BaseModel):
    """Input schema for the generic MCP passthrough tool."""
    # Name of the MCP tool to invoke on the connected server.
    tool_name: str = Field(description="Name of the MCP tool to call")
    # Keyword arguments forwarded verbatim to that MCP tool.
    arguments: dict = Field(description="Arguments to pass to the MCP tool")
class MCPTool(BaseTool):
    """LangChain tool that proxies calls to an MCP server.

    ``BaseTool`` is a pydantic model, so the MCP client must be declared
    as a field: the original's plain ``self.mcp_client = ...`` assignment
    after ``super().__init__()`` is rejected by pydantic as an undeclared
    attribute.
    """
    name: str = "mcp_tool"
    description: str = "Execute tools through Model Context Protocol servers"
    args_schema: Type[BaseModel] = MCPToolInput
    # Client exposing an async `session.call_tool(...)` API.
    mcp_client: Any = None

    def __init__(self, mcp_client, **kwargs):
        # Route the client through pydantic's constructor so field
        # validation accepts it.
        super().__init__(mcp_client=mcp_client, **kwargs)

    def _run(self, tool_name: str, arguments: dict) -> str:
        """Synchronous entry point; bridges to the async implementation.

        NOTE: ``asyncio.run`` raises if an event loop is already running
        (e.g. in Jupyter); async callers should use ``_arun`` directly.
        """
        return asyncio.run(self._arun(tool_name, arguments))

    async def _arun(self, tool_name: str, arguments: dict) -> str:
        """Invoke the named MCP tool and join its text content blocks.

        Errors are returned as a string (never raised) so the agent can
        observe and react to failures.
        """
        try:
            result = await self.mcp_client.session.call_tool(
                name=tool_name,
                arguments=arguments
            )
            if result.content:
                return "\n".join([content.text for content in result.content if hasattr(content, 'text')])
            else:
                return "Tool executed successfully but returned no content"
        except Exception as e:
            return f"Error executing MCP tool: {str(e)}"
class MCPLangChainIntegration:
    """Bridges MCP server tools into LangChain agent tools."""

    def __init__(self, llm, mcp_client):
        self.llm = llm
        self.mcp_client = mcp_client
        self.tools = []  # populated by setup_tools()

    async def setup_tools(self):
        """Discover the server's MCP tools and wrap each as a LangChain tool."""
        mcp_tools = await self.mcp_client.list_tools()
        for mcp_tool in mcp_tools:
            self.tools.append(self.create_langchain_tool(mcp_tool))

    def create_langchain_tool(self, mcp_tool):
        """Build a ``BaseTool`` subclass bound to one MCP tool.

        Fields are declared with annotations because ``BaseTool`` is a
        pydantic model: the original's unannotated class attributes do not
        register as fields, and its plain ``self.mcp_client = ...``
        assignments in ``__init__`` are rejected by pydantic.
        """
        class DynamicMCPTool(BaseTool):
            name: str = mcp_tool.name
            description: str = mcp_tool.description
            mcp_client: Any = None
            tool_name: str = mcp_tool.name

            def __init__(self, mcp_client, tool_name, **kwargs):
                super().__init__(mcp_client=mcp_client, tool_name=tool_name, **kwargs)

            def _run(self, **kwargs) -> str:
                # Synchronous bridge; asyncio.run raises if a loop is
                # already running — async callers should use _arun.
                return asyncio.run(self._arun(**kwargs))

            async def _arun(self, **kwargs) -> str:
                # Errors become return strings so the agent can observe them.
                try:
                    result = await self.mcp_client.session.call_tool(
                        name=self.tool_name,
                        arguments=kwargs
                    )
                    if result.content:
                        return "\n".join([content.text for content in result.content if hasattr(content, 'text')])
                    else:
                        return "Tool executed successfully"
                except Exception as e:
                    return f"Error: {str(e)}"

        return DynamicMCPTool(self.mcp_client, mcp_tool.name)

    def create_agent(self):
        """Create a zero-shot ReAct agent over the discovered tools."""
        return initialize_agent(
            tools=self.tools,
            llm=self.llm,
            agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
            verbose=True
        )
async def demonstrate_langchain_mcp_integration():
    """End-to-end demo: expose MCP tools to a LangChain agent and run a query."""
    llm = OpenAI(temperature=0)
    # NOTE(review): MCPDatabaseClient is not defined in this file — it is
    # referenced from a previous example; confirm it is in scope before running.
    mcp_client = MCPDatabaseClient()  # From previous example
    await mcp_client.connect()
    integration = MCPLangChainIntegration(llm, mcp_client)
    await integration.setup_tools()
    agent = integration.create_agent()
    # agent.run is synchronous; the wrapped MCP tools bridge back to async
    # internally via asyncio.run.
    result = agent.run("Get the database schema and then execute a query to find all active users")
    print("Agent result:", result)

if __name__ == "__main__":
    asyncio.run(demonstrate_langchain_mcp_integration())
class ResilientMCPClient {
  /**
   * MCP client wrapper that retries failed tool calls with exponential
   * backoff.
   * @param {number} maxRetries - maximum attempts per call (default 3).
   * @param {number} retryDelay - initial backoff in ms (default 1000).
   */
  constructor(maxRetries = 3, retryDelay = 1000) {
    this.maxRetries = maxRetries;
    this.retryDelay = retryDelay;
    this.client = new Client(
      { name: 'resilient-client', version: '1.0.0' },
      { capabilities: {} }
    );
  }

  /**
   * Calls a tool, retrying up to maxRetries times with doubling delay.
   *
   * Fixes vs. the original: `arguments` is not a legal parameter name in
   * strict-mode (class) code — it was a SyntaxError; and the backoff now
   * uses a local delay instead of mutating this.retryDelay, which left
   * the delay inflated after exhausted retries and reset it to a
   * hard-coded 1000 (ignoring the constructor argument) on success.
   *
   * @param {string} toolName - name of the MCP tool to invoke.
   * @param {object} args - arguments forwarded to the tool.
   * @returns the successful response.
   * @throws {Error} after maxRetries consecutive failures (last cause in message).
   */
  async callToolWithRetry(toolName, args) {
    let lastError;
    let delay = this.retryDelay;
    for (let attempt = 1; attempt <= this.maxRetries; attempt++) {
      try {
        return await this.client.request(
          { method: 'tools/call' },
          { name: toolName, arguments: args }
        );
      } catch (error) {
        lastError = error;
        if (attempt < this.maxRetries) {
          console.log(`Attempt ${attempt} failed, retrying in ${delay}ms...`);
          await this.sleep(delay);
          delay *= 2; // exponential backoff, scoped to this call
        }
      }
    }
    throw new Error(`Tool call failed after ${this.maxRetries} attempts: ${lastError.message}`);
  }

  /** Promise-based delay helper. */
  sleep(ms) {
    return new Promise(resolve => setTimeout(resolve, ms));
  }
}
from dataclasses import dataclass
from typing import Dict, List, Optional
import json
import os
@dataclass
class MCPServerConfig:
    """Launch specification for a single MCP server process."""
    name: str                                # logical server name (key in MCPConfig.servers)
    command: List[str]                       # executable plus fixed argv prefix
    # Annotations corrected to Optional: the declared defaults are None,
    # not lists/dicts (from_file always supplies [] / {} in practice).
    args: Optional[List[str]] = None         # extra CLI arguments
    env: Optional[Dict[str, str]] = None     # environment variable overrides
    working_directory: Optional[str] = None  # cwd for the spawned process
@dataclass
class MCPConfig:
servers: Dict[str, MCPServerConfig]
global_timeout: int = 30
max_retries: int = 3
@classmethod
def from_file(cls, config_path: str) -> 'MCPConfig':
with open(config_path, 'r') as f:
config_data = json.load(f)
servers = {}
for name, server_config in config_data.get('servers', {}).items():
servers[name] = MCPServerConfig(
name=name,
command=server_config['command'],
args=server_config.get('args', []),
env=server_config.get('env', {}),
working_directory=server_config.get('working_directory')
)
return cls(
servers=servers,
global_timeout=config_data.get('global_timeout', 30),
max_retries=config_data.get('max_retries', 3)
)
# Example configuration file (config.json)
# Two servers — a Python filesystem server restricted to /allowed/path and a
# Node database server run from /app/servers — with global limits that
# override the MCPConfig defaults (30s timeout, 3 retries).
example_config = {
    "servers": {
        "filesystem": {
            "command": ["python", "filesystem_server.py"],
            "args": ["--root", "/allowed/path"],
            "env": {"LOG_LEVEL": "INFO"}
        },
        "database": {
            "command": ["node", "database_server.js"],
            "args": ["--db", "production.db"],
            "working_directory": "/app/servers"
        }
    },
    "global_timeout": 45,
    "max_retries": 5
}
- Authentication and Authorization
  - Implement proper authentication mechanisms for MCP servers
  - Use token-based authentication for remote connections
  - Implement role-based access control (RBAC) for tool access
- Input Validation
  - Validate all inputs according to defined schemas
  - Sanitize file paths and database queries
  - Implement rate limiting to prevent abuse
- Error Handling
  - Never expose sensitive information in error messages
  - Log security events for monitoring
  - Implement circuit breakers for failing services
- Network Security
  - Use TLS for all remote connections
  - Implement proper firewall rules
  - Consider VPN or private network access for sensitive integrations
- Connection Pooling
  - Reuse connections when possible
  - Implement connection timeouts
  - Monitor connection health
- Caching Strategies
  - Cache frequently accessed data
  - Implement cache invalidation policies
  - Use appropriate cache TTL values
- Monitoring and Observability
  - Implement comprehensive logging
  - Monitor performance metrics
  - Set up alerts for failures and performance degradation