MCP入门Demo

文章目录[x]
  1. 1:简介
  2. 2:创建Agent
  3. 3:创建一个MCP Server
  4. 4:创建一个MCP Server管理类
  5. 5:启动MCP Client

简介

该项目实现了一个入门的MCP Client和MCP Server交互的功能。 MCP Client中使用了React Agent模式来实现工具的调用。 MCP Server实现了两种启动方式即(stdio和sse),该例子中使用了sse的方式。

创建Agent

 

使用ReAct模式创建一个Agent,也可以使用langchain等框架实现,我这里没有使用AI框架。 在项目目录下创建‘react_agent.py’文件

import asyncio
import json
import os
import datetime
from openai import AsyncOpenAI
from mcp_client import MCPClient
from pprint import pprint


class ReActAgent:
    def __init__(self, mcp_client: MCPClient):
        # 从环境变量获取API密钥,如果不存在则使用默认值
        # 修改base_url来使用不同的大模型供应商
        # React模式一次使用一个工具
        api_key = os.environ.get("API_KEY", "your llm api key")
        self.llm = AsyncOpenAI(
            api_key=api_key,
            base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
        )
        self.mcp_client = mcp_client
        self.messages = []
        self.system_prompt = """
            你是一个专业会议室预定AI助手,必须严格遵循以下规则运行:

            【核心机制】
            采用Thought→Action→Observation循环工作流:
            1. 解析用户需求时,必须明确识别以下要素:
            - 精确到分钟的时间段(自动补全日期格式)
            - 预定目标(查询/预定)

            2. 工具调用规范:
            只能使用下列工具且每次只能调用1个:
            - list_idle_meeting_rooms(必填参数:start_time, end_time)
            - book_meeting_room(必填参数:room_id, start_time, end_time)
            
            参数要求:
            • 时间参数必须转为"YYYY-MM-DD HH:mm:ss"格式
            • room_id必须从list_idle_meeting_rooms返回结果中选取

            3. 执行流程强制要求:
            (1) 预定操作前必须调用list_idle_meeting_rooms验证时段可用性

            【输出规则】
            1. 未完成时输出json格式:
            {
            "thought": "推理过程(必须包含时间转换逻辑)",
            "action": {"name": "工具名", "args": {"参数1":"值1"}},
            "observation": "工具返回原始结果"
            }
            
            2. 未完成标准:
            - 未调用工具
            - 未返回最终答案
            - 明确提示预定失败
            
            3. 完成标准:当且仅当满足以下条件时输出最终答案:
            - book_meeting_room返回预定成功提示
            - 包含有效的会议室ID

            4. 最终答案json格式:
            {
            "final_answer": "预定成功:{room_id} ({start_time}至{end_time})"
            }

            【校验机制】
            1. 时间参数三重验证:
            (1) 格式正则校验:^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}$
            (2) 时间逻辑:start_time < end_time
            (3) 时段冲突检测(通过list_idle_meeting_rooms实现)

            2. 错误处理:
            - 参数缺失时精确指出缺失项,示例:
                {"action":{"name":"request_params","args":{"missing_params":["start_time"]}}}
            - 工具调用失败时返回原始错误信息

            【终止条件】
            1. 成功终止:输出含预定成功信息的final_answer
            2. 失败终止:连续3次参数请求未获有效输入时返回:
            {"final_answer": "ERR_MAX_RETRY: 超过最大重试次数"}

            【错误处理与结果验证】
            1. 必须严格检查工具返回结果中是否包含"失败"、"不存在"、"错误"等关键词
            2. 当observation中包含任何失败信息时:
               - 禁止输出final_answer
               - 必须重新进行工具调用解决问题
            3. 验证规则:
               - 必须逐字分析observation返回内容
               - 当且仅当observation明确包含"{room_id}预定成功"字样时才可输出最终答案
               - 任何失败情况必须重试或告知用户失败原因

            【列表解析规则】
            1. list_idle_meeting_rooms返回结果必须按原格式提取,禁止自行编造会议室ID
            2. 必须严格匹配返回的会议室ID格式,不得修改或推测
            3. 如返回格式为["会议室1", "会议室2", "会议室3"],则room_id必须是完全一致的"会议室1"、"会议室2"或"会议室3"

            【强制验证机制】
            1. 每次工具调用后必须执行:
               if "失败" in observation or "不存在" in observation or "错误" in observation:
                   # 必须处理错误,不允许输出final_answer
                   # 必须尝试替代方案或告知用户

            2. 禁止性规则:
               - 严禁在任何失败场景下输出含"预定成功"的消息
               - 严禁忽略工具返回的任何错误或警告信息

            【强制纠错与重试机制】
            1. 当工具返回错误时,必须执行以下步骤:
               - 分析错误原因(例如:"会议室不存在")
               - 重新查询可用会议室列表
               - 从返回的实际可用会议室中选择
               - 使用正确ID重试预定
               
            2. 只有在确认预定成功后才能生成final_answer

            【严格的上下文连贯性要求】
            1. 每一步动作必须直接基于上一步observation结果,禁止忽略或歪曲
            2. 必须逐字分析observation内容,并根据实际返回结果(而非臆想)执行下一步
            3. 会议室ID必须严格从list_idle_meeting_rooms的结果中原样提取,禁止臆造或修改格式
            4. 当observation包含"失败"、"不存在"等关键词时,下一步必须是纠正行动,而非宣告成功

            【强制确认机制】
            1. 在每次工具调用后,必须首先重复observation结果以证明理解
            2. 思考过程必须包含对observation的明确分析,例如:"根据观察到的'预定失败,R101不存在',我需要重新选择正确的会议室ID"
            3. 禁止在最终答案中使用与observation矛盾的信息

            【结果验证关键规则】
            1. 预定成功的唯一有效判断标准:book_meeting_room返回必须包含明确的"预定成功"字样
            2. 会议室ID的有效来源:必须直接使用list_idle_meeting_rooms返回的会议室ID列表中的选项,不得添加任何前缀或后缀
            3. 严禁从失败的预定结果中提取会议室ID作为最终答案
            4. 当工具返回包含"失败"时,你必须尝试使用不同的会议室ID重新预定

            """

    async def run(self, query):
        try:
            self.messages.append({"role": "system", "content": self.system_prompt})
            self.messages.append({"role": "system", "content": f"当前时间:{datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")}"})
            self.messages.append({"role": "user", "content": query})
            await self.mcp_client.connect_to_sse_server("http://127.0.0.1:8001/sse")
            

            observation_history = []

            for i in range(10):  # 最大循环次数限制
                print(f"第{i+1}次循环")
                try:
                    response = await self.llm.chat.completions.create(
                        model="qwen-plus",
                        messages=self.messages,
                        response_format={"type": "json_object"},
                    )
                    message = response.choices[0].message

                    # 检查是否是最终答案
                    if message.content and "final_answer" in message.content:
                        try:
                            final_answer = json.loads(message.content)["final_answer"]
                            # 检查上一步observation是否包含失败信息,处理模型判断失误的情况
                            last_observation = observation_history[-1]["result"]
                            if (
                                "预定失败" in last_observation
                                or "不存在" in last_observation
                                or "错误" in last_observation
                            ):
                                # 发现错误 - 强制继续对话
                                error_msg = {
                                    "role": "user",
                                    "content": f"警告!你错误的宣告了成功,工具返回的信息为:「{last_observation}」。请正确处理这个错误并重试,不要声称成功。",
                                }
                                self.messages.append(error_msg)
                                continue
                            return final_answer
                        except (json.JSONDecodeError, KeyError) as e:
                            print(f"解析final_answer时出错: {e}")
                            # 继续执行,尝试下一轮对话
                    # 解析并执行工具调用
                    # print("message.content", message.content)
                    action = json.loads(message.content)["action"]
                    tool_name = action["name"]
                    tool_args = action["args"]
                    # 调用工具
                    observation = await self.mcp_client.session.call_tool(
                        tool_name, tool_args
                    )
                    observation_text = str(observation.content[0].text)
                    print(
                        f"\n调用工具{tool_name},参数{tool_args},返回结果{observation_text}"
                    )

                    thought = json.loads(message.content)["thought"]

                    action["observation"] = observation_text
                    print(f"\n当前步骤{i+1}的reAct详情如下:")
                    print(f"thought: {thought}")
                    print(f"action: {tool_name},参数{tool_args}")
                    print(f"observation: {observation_text}")

                    # 将思考步骤添加到对话上下文
                    self.messages.append(
                        {
                            "role": "assistant",
                            "content": json.dumps(
                                {
                                    "thought": thought,
                                    "action": action,
                                    "observation": observation_text,
                                }
                            ),
                        }
                    )

                    # 在工具调用后
                    observation_history.append(
                        {
                            "tool": tool_name,
                            "args": tool_args,
                            "result": observation_text,
                        }
                    )

                except Exception as e:
                    print(f"API调用异常: {str(e)}")
                    continue

            return "超过最大尝试次数,任务终止"

        except Exception as e:
            print(f"执行过程中发生异常: {str(e)}")
            return f"执行失败: {str(e)}"


if __name__ == "__main__":
    meetting_booking_mcp_client = MCPClient()
    agent = ReActAgent(meetting_booking_mcp_client)
    loop = asyncio.get_event_loop()
    result = loop.run_until_complete(
        agent.run(
            "我想预定一个会议室,时间是明天上午9点到10点半,如果有多个空闲会议室,请随便选一个"
        )
    )
    print(f"最终结果: {result}")

 

创建一个MCP Server

 

在项目目录下创建一个新的文件‘meeting_room.py’,粘贴下面的内容。 MCP Server使用sse方式的情况,需要在MCP Client启动前启动,所以可以使用‘python meeting_room.py sse 8001’启动服务。

import random
from mcp.server.sse import SseServerTransport
from mcp import types
from mcp.server.lowlevel import Server
import logging
from starlette.applications import Starlette
from starlette.routing import Mount, Route
import uvicorn

app = Server("meeting_room")

async def list_idle_meeting_rooms(start_time: str, end_time: str) -> list[types.TextContent]:
    return ["会议室1", "会议室2", "会议室3"]

async def book_meeting_room(room_id: str, start_time: str, end_time: str) -> list[types.TextContent]:
    # 随机返回预定成功或失败
    print(f"预定会议室{room_id},开始时间{start_time},结束时间{end_time}")
    if room_id in ["会议室1", "会议室2", "会议室3"]:
        return f"{room_id}预定成功" if random.random() < 0.5 else f"{room_id}预定失败,请重新选择会议室"
    else:
        return f"预定失败,{room_id}不存在,请重新选择会议室"

@app.list_tools()
async def list_tools() -> list[types.Tool]:
    return [
        types.Tool(
            name="list_idle_meeting_rooms",
            description="list idle meeting rooms",
            inputSchema={
                "type": "object",
                "properties": {
                    "start_time": {
                        "type": "string",
                        "description": "开始时间"
                    },
                    "end_time": {
                        "type": "string",
                        "description": "结束时间"
                    }
                },
                "required": ["start_time", "end_time"]
            }
            ),
        types.Tool(
            name="book_meeting_room",
            description="book meeting room",
            inputSchema={
                "type": "object",
                "properties": {
                    "room_id": {
                        "type": "string",
                        "description": "会议室ID"
                    },
                    "start_time": {
                        "type": "string",
                        "description": "开始时间"
                    },
                    "end_time": {
                        "type": "string",
                        "description": "结束时间"
                    }
                },
                "required": ["room_id", "start_time", "end_time"]
            }
        )
    ]

@app.call_tool()
async def call_tool(
    name: str,
    arguments: dict
) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]:
    if name == "list_idle_meeting_rooms":
        start_time = arguments["start_time"]
        end_time = arguments["end_time"]
        result = await list_idle_meeting_rooms(start_time, end_time)
        return [types.TextContent(type="text", text=str(result))]
    elif name == "book_meeting_room":
        print(arguments)
        room_id = arguments["room_id"]
        start_time = arguments["start_time"]
        end_time = arguments["end_time"]
        result = await book_meeting_room(room_id, start_time, end_time)
        return [types.TextContent(type="text", text=str(result))]
    raise ValueError(f"Tool not found: {name}")

if __name__ == "__main__":
    # Run the FastAPI app with uvicorn
    import sys
    # 默认使用标准输入输出传输
    transport = "stdio"
    port = 8000
    # 检查命令行参数
    if len(sys.argv) > 1:
        if sys.argv[1] == "sse":
            transport = "sse"
        if len(sys.argv) > 2:
            try:
                port = int(sys.argv[2])
            except ValueError:
                pass
    if transport == "sse":
        # 配置日志
        logging.basicConfig(level=logging.INFO, 
                           format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        logger = logging.getLogger("weather-mcp")
        
        # 配置 SSE 传输,增加超时和重试参数
        sse = SseServerTransport(
            "/message/", 
        )
        async def handle_sse(request):
            try:
                logger.info(f"建立新的SSE连接: {request.client}")
                async with sse.connect_sse(request.scope, request.receive, request._send) as streams:
                    try:
                        await app.run(streams[0], streams[1], app.create_initialization_options())
                    except Exception as e:
                        logger.error(f"处理SSE连接时出错: {str(e)}")
                        import traceback
                        logger.error(traceback.format_exc())
            except Exception as e:
                logger.error(f"建立SSE连接时出错: {str(e)}")
                import traceback
                logger.error(traceback.format_exc())
                return
        
        starlette_app = Starlette(
            debug=True,
            routes=[
                Route("/sse", endpoint=handle_sse),
                Mount("/message/", app=sse.handle_post_message),
            ]
        )
        print(f"在端口{port}上启动MCP服务器,使用SSE传输")
        print(f"连接URL: http://127.0.0.1:{port}/sse")
        uvicorn.run(starlette_app, host="0.0.0.0", port=port, log_level="info")
    else:
        from mcp.server.stdio import stdio_server
        import anyio
        async def run_stdio():
            async with stdio_server() as streams:
                await app.run(streams[0], streams[1], app.create_initialization_options())
        print("使用标准输入输出传输启动MCP服务器")
        anyio.run(run_stdio)

 

创建一个MCP Server管理类

 

在项目目录创建‘mcp_client.py’文件

from typing import Optional
from contextlib import AsyncExitStack
from mcp.client.sse import sse_client
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client


class MCPClient:
    def __init__(self):
        self.session: Optional[ClientSession] = None
        self.exit_stack = AsyncExitStack()

    async def connect_to_server(
        self, server_script_path: str, server_interpreter_path: str | None
    ):
        """Connect to an MCP server

        Args:
            server_script_path: Path to the server script (.py or .js)
        """
        is_python = server_script_path.endswith(".py")
        is_js = server_script_path.endswith(".js")
        if not (is_python or is_js):
            raise ValueError("Server script must be a .py or .js file")

        command = "python" if is_python else "node"
        if server_interpreter_path:
            command = server_interpreter_path
        server_params = StdioServerParameters(
            command=command, args=[server_script_path], env=None
        )

        stdio_transport = await self.exit_stack.enter_async_context(
            stdio_client(server_params)
        )
        self.stdio, self.write = stdio_transport
        self.session = await self.exit_stack.enter_async_context(
            ClientSession(self.stdio, self.write)
        )

        await self.session.initialize()

        # List available tools
        response = await self.session.list_tools()
        tools = response.tools
        print("\nConnected to server with tools:", [tool.name for tool in tools])

    async def connect_to_sse_server(self, server_url: str):
        """Connect to an MCP server

        Args:
            server_script_path: Path to the server script (.py or .js)
        """
        self._streams_context = sse_client(url=server_url)

        streams = await self._streams_context.__aenter__()
        self._session_context = ClientSession(*streams)
        self.session = await self._session_context.__aenter__()

        await self.session.initialize()
        # List available tools
        response = await self.session.list_tools()
        tools = response.tools
        print("\nConnected to server with tools:", [tool.name for tool in tools])

 

启动MCP Client

 

使用命令启动agent,并从控制台观察输出。

python react_agent.py
控制台输出
第1次循环

调用工具list_idle_meeting_rooms,参数{'start_time': '2025-04-10 09:00:00', 'end_time': '2025-04-10 10:30:00'},返回结果['会议室1', '会议室2', '会议室3']

当前步骤1的reAct详情如下:
thought: 用户想预定明天上午9点到10点半的会议室。首先需要将时间转换为正确的格式:start_time='2025-04-10 09:00:00',end_time='2025-04-10 10:30:00'。然后调用list_idle_meeting_rooms 来查询这个时间段内空闲的会议室。
action: list_idle_meeting_rooms,参数{'start_time': '2025-04-10 09:00:00', 'end_time': '2025-04-10 10:30:00'}
observation: ['会议室1', '会议室2', '会议室3']
第2次循环

调用工具book_meeting_room,参数{'room_id': '会议室1', 'start_time': '2025-04-10 09:00:00', 'end_time': '2025-04-10 10:30:00'},返回结果会议室1预定成功

当前步骤2的reAct详情如下:
thought: 查询结果显示有三个会议室可用:['会议室1', '会议室2', '会议室3']。用户要求随意选择一个,我将选择'会议室1'进行预定。现在需要调用book_meeting_room工具来完成预定操作。
action: book_meeting_room,参数{'room_id': '会议室1', 'start_time': '2025-04-10 09:00:00', 'end_time': '2025-04-10 10:30:00'}
observation: 会议室1预定成功
第3次循环
最终结果: 预定成功:会议室1 (2025-04-10 09:00:00至2025-04-10 10:30:00)

重复启动几次react_agent.py可以发现当预定不成功的时候,agent会尝试换个房间进行预定。

点赞

发表回复

昵称和uid可以选填一个,填邮箱必填(留言回复后将会发邮件给你)
tips:输入uid可以快速获得你的昵称和头像(已失效)