/Practical Examples and Code Implementation

Practical Examples and Code Implementation

import { Server } from '@modelcontextprotocol/sdk/server/index.js';
import { StdioServerTransport } from '@modelcontextprotocol/sdk/server/stdio.js';

class FileSystemMCPServer {
  constructor() {
    this.server = new Server(
      { name: 'filesystem-server', version: '1.0.0' },
      { capabilities: { tools: {} } }
    );
    this.setupTools();
  }

  setupTools() {
    // Register file reading tool
    this.server.setRequestHandler('tools/list', async () => ({
      tools: [
        {
          name: 'read_file',
          description: 'Read contents of a file',
          inputSchema: {
            type: 'object',
            properties: {
              path: { type: 'string', description: 'File path to read' }
            },
            required: ['path']
          }
        },
        {
          name: 'write_file',
          description: 'Write content to a file',
          inputSchema: {
            type: 'object',
            properties: {
              path: { type: 'string', description: 'File path to write' },
              content: { type: 'string', description: 'Content to write' }
            },
            required: ['path', 'content']
          }
        }
      ]
    }));

    // Handle tool execution
    this.server.setRequestHandler('tools/call', async (request) => {
      const { name, arguments: args } = request.params;
      switch (name) {
        case 'read_file':
          return await this.readFile(args.path);
        case 'write_file':
          return await this.writeFile(args.path, args.content);
        default:
          throw new Error(`Unknown tool: ${name}`);
      }
    });
  }
  async readFile(path) {
    try {
      const fs = await import('fs/promises');
      const content = await fs.readFile(path, 'utf-8');
      return {
        content: [{
          type: 'text',
          text: `File content from ${path}:\n${content}`
        }]
      };
    } catch (error) {
      return {
        content: [{
          type: 'text',
          text: `Error reading file: ${error.message}`
        }],
        isError: true
      };
    }
  }
  async writeFile(path, content) {
    try {
      const fs = await import('fs/promises');
      await fs.writeFile(path, content, 'utf-8');
      return {
        content: [{
          type: 'text',
          text: `Successfully wrote content to ${path}`
        }]
      };
    } catch (error) {
      return {
        content: [{
          type: 'text',
          text: `Error writing file: ${error.message}`
        }],
        isError: true
      };
    }
  }
  async start() {
    const transport = new StdioServerTransport();
    await this.server.connect(transport);
  }
}

// Start the server
const server = new FileSystemMCPServer();
server.start().catch(console.error);
import { Client } from '@modelcontextprotocol/sdk/client/index.js';
import { StdioClientTransport } from '@modelcontextprotocol/sdk/client/stdio.js';
import { spawn } from 'child_process';

class MCPFileSystemClient {
  constructor() {
    this.client = new Client(
      { name: 'filesystem-client', version: '1.0.0' },
      { capabilities: {} }
    );
  }

  async connect() {
    // Start the MCP server process
    const serverProcess = spawn('node', ['filesystem-server.js'], {
      stdio: ['pipe', 'pipe', 'inherit']
    });

    const transport = new StdioClientTransport({
      stdin: serverProcess.stdin,
      stdout: serverProcess.stdout
    });

    await this.client.connect(transport);
  }

  async listAvailableTools() {
    const response = await this.client.request(
      { method: 'tools/list' },
      {}
    );
    return response.tools;
  }

  async readFile(path) {
    const response = await this.client.request(
      { method: 'tools/call' },
      {
        name: 'read_file',
        arguments: { path }
      }
    );
    return response;
  }

  async writeFile(path, content) {
    const response = await this.client.request(
      { method: 'tools/call' },
      {
        name: 'write_file',
        arguments: { path, content }
      }
    );
    return response;
  }
}

// Usage example
async function demonstrateFileSystemMCP() {
  const client = new MCPFileSystemClient();
  await client.connect();
  const tools = await client.listAvailableTools();
  console.log('Available tools:', tools);
  await client.writeFile('./test.txt', 'Hello from MCP!');
  const content = await client.readFile('./test.txt');
  console.log('File content:', content);
}

demonstrateFileSystemMCP().catch(console.error);
from langchain.tools import BaseTool
from langchain.agents import initialize_agent, AgentType
from langchain.llms import OpenAI
from typing import Optional, Type
from pydantic import BaseModel, Field
import asyncio

class MCPToolInput(BaseModel):
    """Input schema for MCP tool"""
    tool_name: str = Field(description="Name of the MCP tool to call")
    arguments: dict = Field(description="Arguments to pass to the MCP tool")

class MCPTool(BaseTool):
    """LangChain tool that interfaces with MCP servers"""
    name = "mcp_tool"
    description = "Execute tools through Model Context Protocol servers"
    args_schema: Type[BaseModel] = MCPToolInput

    def __init__(self, mcp_client):
        super().__init__()
        self.mcp_client = mcp_client

    def _run(self, tool_name: str, arguments: dict) -> str:
        return asyncio.run(self._arun(tool_name, arguments))

    async def _arun(self, tool_name: str, arguments: dict) -> str:
        try:
            result = await self.mcp_client.session.call_tool(
                name=tool_name,
                arguments=arguments
            )
            if result.content:
                return "\n".join([content.text for content in result.content if hasattr(content, 'text')])
            else:
                return "Tool executed successfully but returned no content"
        except Exception as e:
            return f"Error executing MCP tool: {str(e)}"

class MCPLangChainIntegration:
    """Integration class for using MCP with LangChain"""
    def __init__(self, llm, mcp_client):
        self.llm = llm
        self.mcp_client = mcp_client
        self.tools = []

    async def setup_tools(self):
        mcp_tools = await self.mcp_client.list_tools()
        for mcp_tool in mcp_tools:
            langchain_tool = self.create_langchain_tool(mcp_tool)
            self.tools.append(langchain_tool)

    def create_langchain_tool(self, mcp_tool):
        class DynamicMCPTool(BaseTool):
            name = mcp_tool.name
            description = mcp_tool.description
            def __init__(self, mcp_client, tool_name):
                super().__init__()
                self.mcp_client = mcp_client
                self.tool_name = tool_name
            def _run(self, **kwargs) -> str:
                return asyncio.run(self._arun(**kwargs))
            async def _arun(self, **kwargs) -> str:
                try:
                    result = await self.mcp_client.session.call_tool(
                        name=self.tool_name,
                        arguments=kwargs
                    )
                    if result.content:
                        return "\n".join([content.text for content in result.content if hasattr(content, 'text')])
                    else:
                        return "Tool executed successfully"
                except Exception as e:
                    return f"Error: {str(e)}"
        return DynamicMCPTool(self.mcp_client, mcp_tool.name)

    def create_agent(self):
        return initialize_agent(
            tools=self.tools,
            llm=self.llm,
            agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
            verbose=True
        )

async def demonstrate_langchain_mcp_integration():
    llm = OpenAI(temperature=0)
    mcp_client = MCPDatabaseClient()  # From previous example
    await mcp_client.connect()
    integration = MCPLangChainIntegration(llm, mcp_client)
    await integration.setup_tools()
    agent = integration.create_agent()
    result = agent.run("Get the database schema and then execute a query to find all active users")
    print("Agent result:", result)

if __name__ == "__main__":
    asyncio.run(demonstrate_langchain_mcp_integration())
class ResilientMCPClient {
  constructor(maxRetries = 3, retryDelay = 1000) {
    this.maxRetries = maxRetries;
    this.retryDelay = retryDelay;
    this.client = new Client(
      { name: 'resilient-client', version: '1.0.0' },
      { capabilities: {} }
    );
  }

  async callToolWithRetry(toolName, arguments) {
    let lastError;
    for (let attempt = 1; attempt <= this.maxRetries; attempt++) {
      try {
        const response = await this.client.request(
          { method: 'tools/call' },
          { name: toolName, arguments }
        );
        this.retryDelay = 1000;
        return response;
      } catch (error) {
        lastError = error;
        if (attempt < this.maxRetries) {
          console.log(`Attempt ${attempt} failed, retrying in ${this.retryDelay}ms...`);
          await this.sleep(this.retryDelay);
          this.retryDelay *= 2;
        }
      }
    }
    throw new Error(`Tool call failed after ${this.maxRetries} attempts: ${lastError.message}`);
  }

  sleep(ms) {
    return new Promise(resolve => setTimeout(resolve, ms));
  }
}