文章目录[x]
- 1:简介
- 2:创建Agent
- 3:创建一个MCP Server
- 4:创建一个MCP Server管理类
- 5:启动MCP Client
简介
该项目实现了一个入门的MCP Client和MCP Server交互的功能。 MCP Client中使用了React Agent模式来实现工具的调用。 MCP Server实现了两种启动方式即(stdio和sse),该例子中使用了sse的方式。
创建Agent
使用ReAct模式创建一个Agent,也可以使用langchain等框架实现,我这里没有使用AI框架。 在项目目录下创建‘react_agent.py’文件
import asyncio
import json
import os
import datetime
from openai import AsyncOpenAI
from mcp_client import MCPClient
from pprint import pprint
class ReActAgent:
def __init__(self, mcp_client: MCPClient):
# 从环境变量获取API密钥,如果不存在则使用默认值
# 修改base_url来使用不同的大模型供应商
# React模式一次使用一个工具
api_key = os.environ.get("API_KEY", "your llm api key")
self.llm = AsyncOpenAI(
api_key=api_key,
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
)
self.mcp_client = mcp_client
self.messages = []
self.system_prompt = """
你是一个专业会议室预定AI助手,必须严格遵循以下规则运行:
【核心机制】
采用Thought→Action→Observation循环工作流:
1. 解析用户需求时,必须明确识别以下要素:
- 精确到分钟的时间段(自动补全日期格式)
- 预定目标(查询/预定)
2. 工具调用规范:
只能使用下列工具且每次只能调用1个:
- list_idle_meeting_rooms(必填参数:start_time, end_time)
- book_meeting_room(必填参数:room_id, start_time, end_time)
参数要求:
• 时间参数必须转为"YYYY-MM-DD HH:mm:ss"格式
• room_id必须从list_idle_meeting_rooms返回结果中选取
3. 执行流程强制要求:
(1) 预定操作前必须调用list_idle_meeting_rooms验证时段可用性
【输出规则】
1. 未完成时输出json格式:
{
"thought": "推理过程(必须包含时间转换逻辑)",
"action": {"name": "工具名", "args": {"参数1":"值1"}},
"observation": "工具返回原始结果"
}
2. 未完成标准:
- 未调用工具
- 未返回最终答案
- 明确提示预定失败
3. 完成标准:当且仅当满足以下条件时输出最终答案:
- book_meeting_room返回预定成功提示
- 包含有效的会议室ID
4. 最终答案json格式:
{
"final_answer": "预定成功:{room_id} ({start_time}至{end_time})"
}
【校验机制】
1. 时间参数三重验证:
(1) 格式正则校验:^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}$
(2) 时间逻辑:start_time < end_time
(3) 时段冲突检测(通过list_idle_meeting_rooms实现)
2. 错误处理:
- 参数缺失时精确指出缺失项,示例:
{"action":{"name":"request_params","args":{"missing_params":["start_time"]}}}
- 工具调用失败时返回原始错误信息
【终止条件】
1. 成功终止:输出含预定成功信息的final_answer
2. 失败终止:连续3次参数请求未获有效输入时返回:
{"final_answer": "ERR_MAX_RETRY: 超过最大重试次数"}
【错误处理与结果验证】
1. 必须严格检查工具返回结果中是否包含"失败"、"不存在"、"错误"等关键词
2. 当observation中包含任何失败信息时:
- 禁止输出final_answer
- 必须重新进行工具调用解决问题
3. 验证规则:
- 必须逐字分析observation返回内容
- 当且仅当observation明确包含"{room_id}预定成功"字样时才可输出最终答案
- 任何失败情况必须重试或告知用户失败原因
【列表解析规则】
1. list_idle_meeting_rooms返回结果必须按原格式提取,禁止自行编造会议室ID
2. 必须严格匹配返回的会议室ID格式,不得修改或推测
3. 如返回格式为["会议室1", "会议室2", "会议室3"],则room_id必须是完全一致的"会议室1"、"会议室2"或"会议室3"
【强制验证机制】
1. 每次工具调用后必须执行:
if "失败" in observation or "不存在" in observation or "错误" in observation:
# 必须处理错误,不允许输出final_answer
# 必须尝试替代方案或告知用户
2. 禁止性规则:
- 严禁在任何失败场景下输出含"预定成功"的消息
- 严禁忽略工具返回的任何错误或警告信息
【强制纠错与重试机制】
1. 当工具返回错误时,必须执行以下步骤:
- 分析错误原因(例如:"会议室不存在")
- 重新查询可用会议室列表
- 从返回的实际可用会议室中选择
- 使用正确ID重试预定
2. 只有在确认预定成功后才能生成final_answer
【严格的上下文连贯性要求】
1. 每一步动作必须直接基于上一步observation结果,禁止忽略或歪曲
2. 必须逐字分析observation内容,并根据实际返回结果(而非臆想)执行下一步
3. 会议室ID必须严格从list_idle_meeting_rooms的结果中原样提取,禁止臆造或修改格式
4. 当observation包含"失败"、"不存在"等关键词时,下一步必须是纠正行动,而非宣告成功
【强制确认机制】
1. 在每次工具调用后,必须首先重复observation结果以证明理解
2. 思考过程必须包含对observation的明确分析,例如:"根据观察到的'预定失败,R101不存在',我需要重新选择正确的会议室ID"
3. 禁止在最终答案中使用与observation矛盾的信息
【结果验证关键规则】
1. 预定成功的唯一有效判断标准:book_meeting_room返回必须包含明确的"预定成功"字样
2. 会议室ID的有效来源:必须直接使用list_idle_meeting_rooms返回的会议室ID列表中的选项,不得添加任何前缀或后缀
3. 严禁从失败的预定结果中提取会议室ID作为最终答案
4. 当工具返回包含"失败"时,你必须尝试使用不同的会议室ID重新预定
"""
async def run(self, query):
try:
self.messages.append({"role": "system", "content": self.system_prompt})
self.messages.append({"role": "system", "content": f"当前时间:{datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")}"})
self.messages.append({"role": "user", "content": query})
await self.mcp_client.connect_to_sse_server("http://127.0.0.1:8001/sse")
observation_history = []
for i in range(10): # 最大循环次数限制
print(f"第{i+1}次循环")
try:
response = await self.llm.chat.completions.create(
model="qwen-plus",
messages=self.messages,
response_format={"type": "json_object"},
)
message = response.choices[0].message
# 检查是否是最终答案
if message.content and "final_answer" in message.content:
try:
final_answer = json.loads(message.content)["final_answer"]
# 检查上一步observation是否包含失败信息,处理模型判断失误的情况
last_observation = observation_history[-1]["result"]
if (
"预定失败" in last_observation
or "不存在" in last_observation
or "错误" in last_observation
):
# 发现错误 - 强制继续对话
error_msg = {
"role": "user",
"content": f"警告!你错误的宣告了成功,工具返回的信息为:「{last_observation}」。请正确处理这个错误并重试,不要声称成功。",
}
self.messages.append(error_msg)
continue
return final_answer
except (json.JSONDecodeError, KeyError) as e:
print(f"解析final_answer时出错: {e}")
# 继续执行,尝试下一轮对话
# 解析并执行工具调用
# print("message.content", message.content)
action = json.loads(message.content)["action"]
tool_name = action["name"]
tool_args = action["args"]
# 调用工具
observation = await self.mcp_client.session.call_tool(
tool_name, tool_args
)
observation_text = str(observation.content[0].text)
print(
f"\n调用工具{tool_name},参数{tool_args},返回结果{observation_text}"
)
thought = json.loads(message.content)["thought"]
action["observation"] = observation_text
print(f"\n当前步骤{i+1}的reAct详情如下:")
print(f"thought: {thought}")
print(f"action: {tool_name},参数{tool_args}")
print(f"observation: {observation_text}")
# 将思考步骤添加到对话上下文
self.messages.append(
{
"role": "assistant",
"content": json.dumps(
{
"thought": thought,
"action": action,
"observation": observation_text,
}
),
}
)
# 在工具调用后
observation_history.append(
{
"tool": tool_name,
"args": tool_args,
"result": observation_text,
}
)
except Exception as e:
print(f"API调用异常: {str(e)}")
continue
return "超过最大尝试次数,任务终止"
except Exception as e:
print(f"执行过程中发生异常: {str(e)}")
return f"执行失败: {str(e)}"
if __name__ == "__main__":
meetting_booking_mcp_client = MCPClient()
agent = ReActAgent(meetting_booking_mcp_client)
loop = asyncio.get_event_loop()
result = loop.run_until_complete(
agent.run(
"我想预定一个会议室,时间是明天上午9点到10点半,如果有多个空闲会议室,请随便选一个"
)
)
print(f"最终结果: {result}")
创建一个MCP Server
在项目目录下创建一个新的文件‘meeting_room.py’,粘贴下面的内容。 MCP Server使用sse方式的情况,需要在MCP Client启动前启动,所以可以使用‘python meeting_room.py sse 8001’启动服务。
import random
from mcp.server.sse import SseServerTransport
from mcp import types
from mcp.server.lowlevel import Server
import logging
from starlette.applications import Starlette
from starlette.routing import Mount, Route
import uvicorn
app = Server("meeting_room")
async def list_idle_meeting_rooms(start_time: str, end_time: str) -> list[types.TextContent]:
return ["会议室1", "会议室2", "会议室3"]
async def book_meeting_room(room_id: str, start_time: str, end_time: str) -> list[types.TextContent]:
# 随机返回预定成功或失败
print(f"预定会议室{room_id},开始时间{start_time},结束时间{end_time}")
if room_id in ["会议室1", "会议室2", "会议室3"]:
return f"{room_id}预定成功" if random.random() < 0.5 else f"{room_id}预定失败,请重新选择会议室"
else:
return f"预定失败,{room_id}不存在,请重新选择会议室"
@app.list_tools()
async def list_tools() -> list[types.Tool]:
return [
types.Tool(
name="list_idle_meeting_rooms",
description="list idle meeting rooms",
inputSchema={
"type": "object",
"properties": {
"start_time": {
"type": "string",
"description": "开始时间"
},
"end_time": {
"type": "string",
"description": "结束时间"
}
},
"required": ["start_time", "end_time"]
}
),
types.Tool(
name="book_meeting_room",
description="book meeting room",
inputSchema={
"type": "object",
"properties": {
"room_id": {
"type": "string",
"description": "会议室ID"
},
"start_time": {
"type": "string",
"description": "开始时间"
},
"end_time": {
"type": "string",
"description": "结束时间"
}
},
"required": ["room_id", "start_time", "end_time"]
}
)
]
@app.call_tool()
async def call_tool(
name: str,
arguments: dict
) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]:
if name == "list_idle_meeting_rooms":
start_time = arguments["start_time"]
end_time = arguments["end_time"]
result = await list_idle_meeting_rooms(start_time, end_time)
return [types.TextContent(type="text", text=str(result))]
elif name == "book_meeting_room":
print(arguments)
room_id = arguments["room_id"]
start_time = arguments["start_time"]
end_time = arguments["end_time"]
result = await book_meeting_room(room_id, start_time, end_time)
return [types.TextContent(type="text", text=str(result))]
raise ValueError(f"Tool not found: {name}")
if __name__ == "__main__":
# Run the FastAPI app with uvicorn
import sys
# 默认使用标准输入输出传输
transport = "stdio"
port = 8000
# 检查命令行参数
if len(sys.argv) > 1:
if sys.argv[1] == "sse":
transport = "sse"
if len(sys.argv) > 2:
try:
port = int(sys.argv[2])
except ValueError:
pass
if transport == "sse":
# 配置日志
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger("weather-mcp")
# 配置 SSE 传输,增加超时和重试参数
sse = SseServerTransport(
"/message/",
)
async def handle_sse(request):
try:
logger.info(f"建立新的SSE连接: {request.client}")
async with sse.connect_sse(request.scope, request.receive, request._send) as streams:
try:
await app.run(streams[0], streams[1], app.create_initialization_options())
except Exception as e:
logger.error(f"处理SSE连接时出错: {str(e)}")
import traceback
logger.error(traceback.format_exc())
except Exception as e:
logger.error(f"建立SSE连接时出错: {str(e)}")
import traceback
logger.error(traceback.format_exc())
return
starlette_app = Starlette(
debug=True,
routes=[
Route("/sse", endpoint=handle_sse),
Mount("/message/", app=sse.handle_post_message),
]
)
print(f"在端口{port}上启动MCP服务器,使用SSE传输")
print(f"连接URL: http://127.0.0.1:{port}/sse")
uvicorn.run(starlette_app, host="0.0.0.0", port=port, log_level="info")
else:
from mcp.server.stdio import stdio_server
import anyio
async def run_stdio():
async with stdio_server() as streams:
await app.run(streams[0], streams[1], app.create_initialization_options())
print("使用标准输入输出传输启动MCP服务器")
anyio.run(run_stdio)
创建一个MCP Server管理类
在项目目录创建‘mcp_client.py’文件
from typing import Optional from contextlib import AsyncExitStack from mcp.client.sse import sse_client from mcp import ClientSession, StdioServerParameters from mcp.client.stdio import stdio_client class MCPClient: def __init__(self): self.session: Optional[ClientSession] = None self.exit_stack = AsyncExitStack() async def connect_to_server( self, server_script_path: str, server_interpreter_path: str | None ): """Connect to an MCP server Args: server_script_path: Path to the server script (.py or .js) """ is_python = server_script_path.endswith(".py") is_js = server_script_path.endswith(".js") if not (is_python or is_js): raise ValueError("Server script must be a .py or .js file") command = "python" if is_python else "node" if server_interpreter_path: command = server_interpreter_path server_params = StdioServerParameters( command=command, args=[server_script_path], env=None ) stdio_transport = await self.exit_stack.enter_async_context( stdio_client(server_params) ) self.stdio, self.write = stdio_transport self.session = await self.exit_stack.enter_async_context( ClientSession(self.stdio, self.write) ) await self.session.initialize() # List available tools response = await self.session.list_tools() tools = response.tools print("\nConnected to server with tools:", [tool.name for tool in tools]) async def connect_to_sse_server(self, server_url: str): """Connect to an MCP server Args: server_script_path: Path to the server script (.py or .js) """ self._streams_context = sse_client(url=server_url) streams = await self._streams_context.__aenter__() self._session_context = ClientSession(*streams) self.session = await self._session_context.__aenter__() await self.session.initialize() # List available tools response = await self.session.list_tools() tools = response.tools print("\nConnected to server with tools:", [tool.name for tool in tools])
启动MCP Client
使用命令启动agent,并从控制台观察输出。
python react_agent.py
控制台输出
第1次循环 调用工具list_idle_meeting_rooms,参数{'start_time': '2025-04-10 09:00:00', 'end_time': '2025-04-10 10:30:00'},返回结果['会议室1', '会议室2', '会议室3'] 当前步骤1的reAct详情如下: thought: 用户想预定明天上午9点到10点半的会议室。首先需要将时间转换为正确的格式:start_time='2025-04-10 09:00:00',end_time='2025-04-10 10:30:00'。然后调用list_idle_meeting_rooms 来查询这个时间段内空闲的会议室。 action: list_idle_meeting_rooms,参数{'start_time': '2025-04-10 09:00:00', 'end_time': '2025-04-10 10:30:00'} observation: ['会议室1', '会议室2', '会议室3'] 第2次循环 调用工具book_meeting_room,参数{'room_id': '会议室1', 'start_time': '2025-04-10 09:00:00', 'end_time': '2025-04-10 10:30:00'},返回结果会议室1预定成功 当前步骤2的reAct详情如下: thought: 查询结果显示有三个会议室可用:['会议室1', '会议室2', '会议室3']。用户要求随意选择一个,我将选择'会议室1'进行预定。现在需要调用book_meeting_room工具来完成预定操作。 action: book_meeting_room,参数{'room_id': '会议室1', 'start_time': '2025-04-10 09:00:00', 'end_time': '2025-04-10 10:30:00'} observation: 会议室1预定成功 第3次循环 最终结果: 预定成功:会议室1 (2025-04-10 09:00:00至2025-04-10 10:30:00)
重复启动几次react_agent.py可以发现当预定不成功的时候,agent会尝试换个房间进行预定。