Practical Examples and Code Implementation

Practical Examples and Code Implementation

import { Server } from '@modelcontextprotocol/sdk/server/index.js';
import { StdioServerTransport } from '@modelcontextprotocol/sdk/server/stdio.js';
import {
  CallToolRequestSchema,
  ListToolsRequestSchema
} from '@modelcontextprotocol/sdk/types.js';

/**
 * MCP server exposing two filesystem tools, `read_file` and `write_file`,
 * over a stdio transport.
 *
 * NOTE(review): paths received from the client are passed to the filesystem
 * API unchanged; nothing here restricts them to a particular directory.
 */
class FileSystemMCPServer {
  constructor() {
    this.server = new Server(
      { name: 'filesystem-server', version: '1.0.0' },
      { capabilities: { tools: {} } }
    );
    this.setupTools();
  }

  /**
   * Registers the handlers for listing and calling tools.
   *
   * The SDK keys request handlers by request *schema* objects
   * (`ListToolsRequestSchema`, `CallToolRequestSchema`), not by method-name
   * strings such as 'tools/list'.
   */
  setupTools() {
    // Advertise the available tools together with their JSON input schemas
    this.server.setRequestHandler(ListToolsRequestSchema, async () => ({
      tools: [
        {
          name: 'read_file',
          description: 'Read contents of a file',
          inputSchema: {
            type: 'object',
            properties: {
              path: { type: 'string', description: 'File path to read' }
            },
            required: ['path']
          }
        },
        {
          name: 'write_file',
          description: 'Write content to a file',
          inputSchema: {
            type: 'object',
            properties: {
              path: { type: 'string', description: 'File path to write' },
              content: { type: 'string', description: 'Content to write' }
            },
            required: ['path', 'content']
          }
        }
      ]
    }));

    // Route tool executions through a plain method so the dispatch logic can
    // be exercised without a live transport
    this.server.setRequestHandler(CallToolRequestSchema, (request) =>
      this.handleToolCall(request)
    );
  }

  /**
   * Dispatches a `tools/call` request to the matching tool implementation.
   *
   * @param {{ params: { name: string, arguments?: object } }} request
   * @returns {Promise<object>} Tool result with a `content` array, plus
   *   `isError: true` when the tool itself failed.
   * @throws {Error} When `name` is not one of the registered tools.
   */
  async handleToolCall(request) {
    const { name, arguments: rawArgs } = request.params;
    // `arguments` may be omitted by the caller; fall back to an empty object
    // so a missing path becomes an `isError` result instead of a TypeError
    const args = rawArgs ?? {};
    switch (name) {
      case 'read_file':
        return this.readFile(args.path);
      case 'write_file':
        return this.writeFile(args.path, args.content);
      default:
        throw new Error(`Unknown tool: ${name}`);
    }
  }

  /**
   * Reads a file as UTF-8 text.
   *
   * @param {string} path - File to read.
   * @returns {Promise<object>} Text content result; on failure the error
   *   message is returned as content with `isError: true` (never throws).
   */
  async readFile(path) {
    try {
      const fs = await import('fs/promises');
      const content = await fs.readFile(path, 'utf-8');
      return {
        content: [{
          type: 'text',
          text: `File content from ${path}:\n${content}`
        }]
      };
    } catch (error) {
      return {
        content: [{
          type: 'text',
          text: `Error reading file: ${error.message}`
        }],
        isError: true
      };
    }
  }

  /**
   * Writes UTF-8 text to a file, replacing any existing content.
   *
   * @param {string} path - File to write.
   * @param {string} content - Text to store.
   * @returns {Promise<object>} Confirmation result; on failure the error
   *   message is returned as content with `isError: true` (never throws).
   */
  async writeFile(path, content) {
    try {
      const fs = await import('fs/promises');
      await fs.writeFile(path, content, 'utf-8');
      return {
        content: [{
          type: 'text',
          text: `Successfully wrote content to ${path}`
        }]
      };
    } catch (error) {
      return {
        content: [{
          type: 'text',
          text: `Error writing file: ${error.message}`
        }],
        isError: true
      };
    }
  }

  /** Connects the server to a stdio transport and starts serving requests. */
  async start() {
    const transport = new StdioServerTransport();
    await this.server.connect(transport);
  }
}

// Create the filesystem server and begin serving; a rejected start() is
// logged through console.error
const server = new FileSystemMCPServer();
server.start().catch((startupError) => console.error(startupError));
import { Client } from '@modelcontextprotocol/sdk/client/index.js';
import { StdioClientTransport } from '@modelcontextprotocol/sdk/client/stdio.js';
import { spawn } from 'child_process';

/**
 * Client wrapper around the MCP SDK `Client` for the filesystem server's
 * `read_file` and `write_file` tools.
 */
class MCPFileSystemClient {
  constructor() {
    this.client = new Client(
      { name: 'filesystem-client', version: '1.0.0' },
      { capabilities: {} }
    );
  }

  /**
   * Launches `node filesystem-server.js` and connects to it over stdio.
   *
   * `StdioClientTransport` spawns the server process itself from
   * `command`/`args`; it does not accept pre-spawned stdin/stdout streams.
   */
  async connect() {
    const transport = new StdioClientTransport({
      command: 'node',
      args: ['filesystem-server.js'],
      // Keep the server's stderr visible on this process's stderr
      stderr: 'inherit'
    });

    await this.client.connect(transport);
  }

  /**
   * Lists the tools advertised by the server.
   *
   * @returns {Promise<object[]>} The `tools` array of the list result.
   */
  async listAvailableTools() {
    const { tools } = await this.client.listTools();
    return tools;
  }

  /**
   * Calls the server's `read_file` tool.
   *
   * @param {string} path - File to read.
   * @returns {Promise<object>} The raw tool result.
   */
  async readFile(path) {
    return this.client.callTool({
      name: 'read_file',
      arguments: { path }
    });
  }

  /**
   * Calls the server's `write_file` tool.
   *
   * @param {string} path - File to write.
   * @param {string} content - Text to store.
   * @returns {Promise<object>} The raw tool result.
   */
  async writeFile(path, content) {
    return this.client.callTool({
      name: 'write_file',
      arguments: { path, content }
    });
  }

  /** Closes the connection (and with it the spawned server process). */
  async close() {
    await this.client.close();
  }
}

// Usage example
/**
 * Connects to the filesystem server, lists its tools, writes `./test.txt`
 * and reads it back, logging each result.
 *
 * The connection is closed in `finally`: otherwise the spawned server process
 * and its pipes stay open after the demo finishes or fails part-way.
 */
async function demonstrateFileSystemMCP() {
  const client = new MCPFileSystemClient();
  try {
    await client.connect();
    const tools = await client.listAvailableTools();
    console.log('Available tools:', tools);
    await client.writeFile('./test.txt', 'Hello from MCP!');
    const content = await client.readFile('./test.txt');
    console.log('File content:', content);
  } finally {
    // Close the underlying SDK client; report (rather than throw) a failed
    // close so it cannot mask an error raised inside the try block
    await client.client.close().catch((closeError) => {
      console.error('Failed to close MCP client:', closeError);
    });
  }
}

demonstrateFileSystemMCP().catch(console.error);
from langchain.tools import BaseTool
from langchain.agents import initialize_agent, AgentType
from langchain.llms import OpenAI
from typing import Optional, Type
from pydantic import BaseModel, Field
import asyncio

class MCPToolInput(BaseModel):
    """Input schema for MCP tool"""
    # Both fields are required: neither Field() call supplies a default.
    # Name of the MCP tool to call.
    tool_name: str = Field(description="Name of the MCP tool to call")
    # Arguments to pass to that tool, as a plain dict.
    arguments: dict = Field(description="Arguments to pass to the MCP tool")

class MCPTool(BaseTool):
    """LangChain tool that forwards a call to a tool on an MCP server.

    The caller supplies the MCP tool name and its arguments (see
    ``MCPToolInput``); the call is made through ``mcp_client.session``.
    """
    # Overridden BaseTool fields carry explicit annotations so they are valid
    # field overrides on the pydantic model.
    name: str = "mcp_tool"
    description: str = "Execute tools through Model Context Protocol servers"
    args_schema: Type[BaseModel] = MCPToolInput
    # BaseTool is a pydantic model: assigning an undeclared attribute in
    # __init__ raises, so the client is declared as a field. ``object`` is
    # used so that any client implementation is accepted.
    mcp_client: object = None

    def __init__(self, mcp_client):
        """Store the connected MCP client used to execute tool calls."""
        super().__init__(mcp_client=mcp_client)

    def _run(self, tool_name: str, arguments: dict) -> str:
        """Synchronous entry point; runs ``_arun`` to completion.

        Uses ``asyncio.run``, which starts a new event loop and therefore
        cannot be called from code already running inside an event loop;
        async callers should use ``_arun``.
        """
        return asyncio.run(self._arun(tool_name, arguments))

    async def _arun(self, tool_name: str, arguments: dict) -> str:
        """Call ``tool_name`` on the MCP session and return its text output.

        Returns the newline-joined ``text`` of every content item that has
        one, a fixed message when the result has no content, or an error
        string if the call raises (exceptions are not propagated).
        """
        try:
            result = await self.mcp_client.session.call_tool(
                name=tool_name,
                arguments=arguments
            )
            if not result.content:
                return "Tool executed successfully but returned no content"
            # Content items without a ``text`` attribute are skipped
            return "\n".join(
                item.text for item in result.content if hasattr(item, 'text')
            )
        except Exception as e:
            return f"Error executing MCP tool: {str(e)}"

class MCPLangChainIntegration:
    """Integration class for using MCP with LangChain.

    Wraps each tool reported by an MCP client in a LangChain tool and builds
    an agent over them.
    """
    def __init__(self, llm, mcp_client):
        """Keep the LLM and MCP client; tools are added by ``setup_tools``."""
        self.llm = llm
        self.mcp_client = mcp_client
        self.tools = []

    async def setup_tools(self):
        """Fetch the MCP client's tools and append a LangChain wrapper for each."""
        mcp_tools = await self.mcp_client.list_tools()
        for mcp_tool in mcp_tools:
            langchain_tool = self.create_langchain_tool(mcp_tool)
            self.tools.append(langchain_tool)

    def create_langchain_tool(self, mcp_tool):
        """Build a LangChain tool that calls ``mcp_tool`` via the MCP session.

        ``mcp_tool`` must expose ``name`` and ``description`` attributes.
        """
        class DynamicMCPTool(BaseTool):
            name: str = mcp_tool.name
            # An MCP tool may come without a description, while the LangChain
            # tool needs a string; fall back to a generic one.
            description: str = mcp_tool.description or f"MCP tool {mcp_tool.name}"
            # BaseTool is a pydantic model: undeclared attributes cannot be
            # assigned in __init__, so both values are declared as fields.
            mcp_client: object = None
            tool_name: str = ""

            def __init__(self, mcp_client, tool_name):
                super().__init__(mcp_client=mcp_client, tool_name=tool_name)

            def _run(self, **kwargs) -> str:
                # asyncio.run starts a new event loop, so this path is only
                # usable from synchronous code
                return asyncio.run(self._arun(**kwargs))

            async def _arun(self, **kwargs) -> str:
                try:
                    result = await self.mcp_client.session.call_tool(
                        name=self.tool_name,
                        arguments=kwargs
                    )
                    if not result.content:
                        return "Tool executed successfully"
                    # Content items without a ``text`` attribute are skipped
                    return "\n".join(
                        item.text for item in result.content if hasattr(item, 'text')
                    )
                except Exception as e:
                    return f"Error: {str(e)}"
        return DynamicMCPTool(self.mcp_client, mcp_tool.name)

    def create_agent(self):
        """Create a zero-shot ReAct agent over the tools collected so far."""
        return initialize_agent(
            tools=self.tools,
            llm=self.llm,
            agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
            verbose=True
        )

async def demonstrate_langchain_mcp_integration():
    """Connect an MCP client, expose its tools to a LangChain agent and run one query."""
    llm = OpenAI(temperature=0)
    mcp_client = MCPDatabaseClient()  # From previous example
    await mcp_client.connect()
    integration = MCPLangChainIntegration(llm, mcp_client)
    await integration.setup_tools()
    agent = integration.create_agent()
    # Use the agent's async entry point: this coroutine already runs inside an
    # event loop, and the synchronous path would reach the tools' _run(),
    # whose asyncio.run() raises RuntimeError when a loop is already running.
    result = await agent.arun("Get the database schema and then execute a query to find all active users")
    print("Agent result:", result)

if __name__ == "__main__":
    asyncio.run(demonstrate_langchain_mcp_integration())
/**
 * MCP client wrapper that retries failed tool calls with exponential backoff.
 */
class ResilientMCPClient {
  /**
   * @param {number} [maxRetries=3] - Total number of attempts per tool call.
   * @param {number} [retryDelay=1000] - Delay in ms before the first retry;
   *   doubled after each further failure within the same call.
   */
  constructor(maxRetries = 3, retryDelay = 1000) {
    this.maxRetries = maxRetries;
    this.retryDelay = retryDelay;
    this.client = new Client(
      { name: 'resilient-client', version: '1.0.0' },
      { capabilities: {} }
    );
  }

  /**
   * Calls a tool, retrying on failure.
   *
   * @param {string} toolName - Name of the tool to call.
   * @param {object} args - Arguments passed to the tool. (Named `args`
   *   because `arguments` is not a legal parameter name inside a class body,
   *   which is always strict-mode code.)
   * @returns {Promise<object>} The tool result of the first successful attempt.
   * @throws {Error} After `maxRetries` failed attempts; the last underlying
   *   error is attached as `cause`.
   */
  async callToolWithRetry(toolName, args) {
    let lastError;
    // Backoff state is local to this call so a failed call cannot leave a
    // doubled delay behind, and the configured base delay is never overwritten
    let delay = this.retryDelay;
    for (let attempt = 1; attempt <= this.maxRetries; attempt++) {
      try {
        return await this.client.callTool({ name: toolName, arguments: args });
      } catch (error) {
        lastError = error;
        if (attempt < this.maxRetries) {
          console.log(`Attempt ${attempt} failed, retrying in ${delay}ms...`);
          await this.sleep(delay);
          delay *= 2;
        }
      }
    }
    // lastError is undefined only when maxRetries < 1 and no attempt ran
    const reason = lastError?.message ?? 'no attempts were made';
    throw new Error(
      `Tool call failed after ${this.maxRetries} attempts: ${reason}`,
      { cause: lastError }
    );
  }

  /**
   * @param {number} ms - Milliseconds to wait.
   * @returns {Promise<void>} Resolves after the delay.
   */
  sleep(ms) {
    return new Promise(resolve => setTimeout(resolve, ms));
  }
}
from dataclasses import dataclass
from typing import Dict, List, Optional
import json
import os

@dataclass
class MCPServerConfig:
    """Configuration record for a single MCP server entry."""
    # Key under which the server is registered.
    name: str
    # Command as a list of strings, e.g. ["python", "filesystem_server.py"].
    command: List[str]
    # The remaining fields default to None, so they are annotated Optional.
    # Extra arguments, e.g. ["--root", "/allowed/path"].
    args: Optional[List[str]] = None
    # String-to-string mapping, e.g. {"LOG_LEVEL": "INFO"}.
    env: Optional[Dict[str, str]] = None
    # Directory path, e.g. "/app/servers".
    working_directory: Optional[str] = None

@dataclass
class MCPConfig:
    """Top-level MCP configuration: the known servers plus global settings."""
    # Server configurations keyed by server name.
    servers: Dict[str, MCPServerConfig]
    # Global timeout value (unit not established in this file) — TODO confirm.
    global_timeout: int = 30
    # Retry limit.
    max_retries: int = 3

    @classmethod
    def from_file(cls, config_path: str) -> 'MCPConfig':
        """Load a configuration from a JSON file.

        Expected layout: an optional ``servers`` object mapping server names
        to objects with a required ``command`` and optional ``args``, ``env``
        and ``working_directory``; optional top-level ``global_timeout`` and
        ``max_retries``.

        Raises:
            OSError: if the file cannot be opened.
            json.JSONDecodeError: if the file is not valid JSON.
            KeyError: if a server entry has no ``command``.
        """
        # Open as UTF-8 explicitly rather than relying on the platform's
        # default encoding
        with open(config_path, 'r', encoding='utf-8') as f:
            config_data = json.load(f)
        servers = {}
        for name, server_config in config_data.get('servers', {}).items():
            servers[name] = MCPServerConfig(
                name=name,
                command=server_config['command'],
                # `or` also covers an explicit JSON null, which .get() would
                # otherwise pass through as None
                args=server_config.get('args') or [],
                env=server_config.get('env') or {},
                working_directory=server_config.get('working_directory')
            )
        return cls(
            servers=servers,
            global_timeout=config_data.get('global_timeout', 30),
            max_retries=config_data.get('max_retries', 3)
        )

# Example configuration file (config.json), assembled one server at a time.
example_config = {"servers": {}, "global_timeout": 45, "max_retries": 5}

# Filesystem server entry
example_config["servers"]["filesystem"] = {
    "command": ["python", "filesystem_server.py"],
    "args": ["--root", "/allowed/path"],
    "env": {"LOG_LEVEL": "INFO"},
}

# Database server entry
example_config["servers"]["database"] = {
    "command": ["node", "database_server.js"],
    "args": ["--db", "production.db"],
    "working_directory": "/app/servers",
}
Security Best Practices
  1. Authentication and Authorization
    • Implement proper authentication mechanisms for MCP servers
    • Use token-based authentication for remote connections
    • Implement role-based access control (RBAC) for tool access
  2. Input Validation
    • Validate all inputs according to defined schemas
    • Sanitize file paths and database queries
    • Implement rate limiting to prevent abuse
  3. Error Handling
    • Never expose sensitive information in error messages
    • Log security events for monitoring
    • Implement circuit breakers for failing services
  4. Network Security
    • Use TLS for all remote connections
    • Implement proper firewall rules
    • Consider VPN or private network access for sensitive integrations
Performance and Operations Best Practices
  1. Connection Pooling
    • Reuse connections when possible
    • Implement connection timeouts
    • Monitor connection health
  2. Caching Strategies
    • Cache frequently accessed data
    • Implement cache invalidation policies
    • Use appropriate cache TTL values
  3. Monitoring and Observability
    • Implement comprehensive logging
    • Monitor performance metrics
    • Set up alerts for failures and performance degradation