Claude MCP 协议深度实战:打造能操作数据库 / 文件 / API 的智能体
从零搭建三个可直接用于生产的 MCP Server,让 Claude 真正拥有「手」, 自主完成数据查询、文件读写、第三方接口调用。
什么是 MCP,为什么它改变了 Agent 开发
MCP(Model Context Protocol) 是 Anthropic 推出的开放协议, 定义了 AI 模型与外部工具 / 数据源之间的标准通信格式。
传统做法:你需要手写 function calling schema,自己解析参数,自己处理错误。 MCP 做法:启动一个 MCP Server,Claude 自动发现它暴露的工具并调用。
┌─────────────────────────────────────────────────────┐
│ 你的 Claude Agent │
│ │
│ 用户指令 → Claude 推理 → 选择工具 → 解析结果 → 回复 │
└────────────────┬──────────────────────────────────────┘
│ MCP 协议(stdio / SSE)
┌───────────┴──────────────┐
│ │
┌────▼────┐ ┌──────────┐ ┌───▼────┐
│ 文件系统 │ │ SQLite DB │ │外部 API │
│ Server │ │ Server │ │ Server │
└─────────┘ └──────────┘ └────────┘
┌─────────────────────────────────────────────────────┐
│ 你的 Claude Agent │
│ │
│ 用户指令 → Claude 推理 → 选择工具 → 解析结果 → 回复 │
└────────────────┬──────────────────────────────────────┘
│ MCP 协议(stdio / SSE)
┌───────────┴──────────────┐
│ │
┌────▼────┐ ┌──────────┐ ┌───▼────┐
│ 文件系统 │ │ SQLite DB │ │外部 API │
│ Server │ │ Server │ │ Server │
└─────────┘ └──────────┘ └────────┘
本文你将构建:
- MCP Server 1:文件系统(读 / 写 / 列目录)
- MCP Server 2:SQLite 数据库(查询 / 插入 / 更新)
- MCP Server 3:HTTP API 网关(调用任意 REST 接口)
- 一个统一的 Claude Agent,同时连接三个 Server
环境准备
pip install anthropic mcp httpx
pip install anthropic mcp httpx
ClaudeAPI 密钥配置:
export ANTHROPIC_API_KEY="your-claudeapi-token"
export ANTHROPIC_BASE_URL="https://gw.claudeapi.com"
export ANTHROPIC_API_KEY="your-claudeapi-token"
export ANTHROPIC_BASE_URL="https://gw.claudeapi.com"
通过 ClaudeAPI 调用,国内直连,官方 8 折定价, 支持支付宝 / 微信充值。注册地址 →
第一部分:文件系统 MCP Server
代码实现
# mcp_file_server.py
import os
import json
from pathlib import Path
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent
app = Server("file-system")
WORKSPACE = Path(os.environ.get("MCP_WORKSPACE", "./workspace"))
WORKSPACE.mkdir(exist_ok=True)
@app.list_tools()
async def list_tools() -> list[Tool]:
return [
Tool(
name="read_file",
description="读取工作区内指定文件的内容",
inputSchema={
"type": "object",
"properties": {
"path": {"type": "string", "description": "相对于工作区的文件路径"}
},
"required": ["path"],
},
),
Tool(
name="write_file",
description="将内容写入工作区内的文件,文件不存在则创建",
inputSchema={
"type": "object",
"properties": {
"path": {"type": "string", "description": "相对于工作区的文件路径"},
"content": {"type": "string", "description": "写入内容"},
},
"required": ["path", "content"],
},
),
Tool(
name="list_directory",
description="列出工作区目录下的文件和子目录",
inputSchema={
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "相对路径,默认为根目录",
"default": ".",
}
},
},
),
]
@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
if name == "read_file":
target = WORKSPACE / arguments["path"]
if not target.exists():
return [TextContent(type="text", text=f"错误:文件 {arguments['path']} 不存在")]
return [TextContent(type="text", text=target.read_text(encoding="utf-8"))]
elif name == "write_file":
target = WORKSPACE / arguments["path"]
target.parent.mkdir(parents=True, exist_ok=True)
target.write_text(arguments["content"], encoding="utf-8")
return [TextContent(type="text", text=f"✅ 已写入:{arguments['path']}")]
elif name == "list_directory":
target = WORKSPACE / arguments.get("path", ".")
if not target.is_dir():
return [TextContent(type="text", text="错误:不是有效目录")]
entries = [
f"{'📁' if p.is_dir() else '📄'} {p.name}"
for p in sorted(target.iterdir())
]
return [TextContent(type="text", text="\n".join(entries) or "(空目录)")]
return [TextContent(type="text", text=f"未知工具:{name}")]
if __name__ == "__main__":
import asyncio
asyncio.run(stdio_server(app))
# mcp_file_server.py
import os
import json
from pathlib import Path
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent
app = Server("file-system")
WORKSPACE = Path(os.environ.get("MCP_WORKSPACE", "./workspace"))
WORKSPACE.mkdir(exist_ok=True)
@app.list_tools()
async def list_tools() -> list[Tool]:
return [
Tool(
name="read_file",
description="读取工作区内指定文件的内容",
inputSchema={
"type": "object",
"properties": {
"path": {"type": "string", "description": "相对于工作区的文件路径"}
},
"required": ["path"],
},
),
Tool(
name="write_file",
description="将内容写入工作区内的文件,文件不存在则创建",
inputSchema={
"type": "object",
"properties": {
"path": {"type": "string", "description": "相对于工作区的文件路径"},
"content": {"type": "string", "description": "写入内容"},
},
"required": ["path", "content"],
},
),
Tool(
name="list_directory",
description="列出工作区目录下的文件和子目录",
inputSchema={
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "相对路径,默认为根目录",
"default": ".",
}
},
},
),
]
@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
if name == "read_file":
target = WORKSPACE / arguments["path"]
if not target.exists():
return [TextContent(type="text", text=f"错误:文件 {arguments['path']} 不存在")]
return [TextContent(type="text", text=target.read_text(encoding="utf-8"))]
elif name == "write_file":
target = WORKSPACE / arguments["path"]
target.parent.mkdir(parents=True, exist_ok=True)
target.write_text(arguments["content"], encoding="utf-8")
return [TextContent(type="text", text=f"✅ 已写入:{arguments['path']}")]
elif name == "list_directory":
target = WORKSPACE / arguments.get("path", ".")
if not target.is_dir():
return [TextContent(type="text", text="错误:不是有效目录")]
entries = [
f"{'📁' if p.is_dir() else '📄'} {p.name}"
for p in sorted(target.iterdir())
]
return [TextContent(type="text", text="\n".join(entries) or "(空目录)")]
return [TextContent(type="text", text=f"未知工具:{name}")]
if __name__ == "__main__":
import asyncio
asyncio.run(stdio_server(app))
第二部分:SQLite 数据库 MCP Server
# mcp_db_server.py
import sqlite3
import json
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent
app = Server("sqlite-db")
DB_PATH = "data.db"
def get_conn():
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
return conn
@app.list_tools()
async def list_tools() -> list[Tool]:
return [
Tool(
name="execute_query",
description="执行 SELECT 查询,返回结果集(只读)",
inputSchema={
"type": "object",
"properties": {
"sql": {"type": "string", "description": "SELECT 语句"},
"params": {
"type": "array",
"items": {},
"description": "参数列表(防 SQL 注入)",
"default": [],
},
},
"required": ["sql"],
},
),
Tool(
name="execute_write",
description="执行 INSERT / UPDATE / DELETE,返回影响行数",
inputSchema={
"type": "object",
"properties": {
"sql": {"type": "string", "description": "写操作 SQL 语句"},
"params": {"type": "array", "items": {}, "default": []},
},
"required": ["sql"],
},
),
Tool(
name="list_tables",
description="列出数据库中所有表及其结构",
inputSchema={"type": "object", "properties": {}},
),
]
@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
params = arguments.get("params", [])
if name == "execute_query":
with get_conn() as conn:
try:
rows = conn.execute(arguments["sql"], params).fetchall()
if not rows:
return [TextContent(type="text", text="查询结果为空")]
result = [dict(row) for row in rows]
return [TextContent(type="text", text=json.dumps(result, ensure_ascii=False, indent=2))]
except Exception as e:
return [TextContent(type="text", text=f"查询错误:{e}")]
elif name == "execute_write":
with get_conn() as conn:
try:
cursor = conn.execute(arguments["sql"], params)
conn.commit()
return [TextContent(type="text", text=f"✅ 执行成功,影响 {cursor.rowcount} 行")]
except Exception as e:
return [TextContent(type="text", text=f"写入错误:{e}")]
elif name == "list_tables":
with get_conn() as conn:
tables = conn.execute(
"SELECT name FROM sqlite_master WHERE type='table'"
).fetchall()
result = []
for t in tables:
cols = conn.execute(f"PRAGMA table_info({t['name']})").fetchall()
col_desc = ", ".join(
f"{c['name']} {c['type']}" for c in cols
)
result.append(f"📊 {t['name']}({col_desc})")
return [TextContent(type="text", text="\n".join(result) or "无表")]
return [TextContent(type="text", text=f"未知工具:{name}")]
if __name__ == "__main__":
import asyncio
asyncio.run(stdio_server(app))
# mcp_db_server.py
import sqlite3
import json
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent
app = Server("sqlite-db")
DB_PATH = "data.db"
def get_conn():
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
return conn
@app.list_tools()
async def list_tools() -> list[Tool]:
return [
Tool(
name="execute_query",
description="执行 SELECT 查询,返回结果集(只读)",
inputSchema={
"type": "object",
"properties": {
"sql": {"type": "string", "description": "SELECT 语句"},
"params": {
"type": "array",
"items": {},
"description": "参数列表(防 SQL 注入)",
"default": [],
},
},
"required": ["sql"],
},
),
Tool(
name="execute_write",
description="执行 INSERT / UPDATE / DELETE,返回影响行数",
inputSchema={
"type": "object",
"properties": {
"sql": {"type": "string", "description": "写操作 SQL 语句"},
"params": {"type": "array", "items": {}, "default": []},
},
"required": ["sql"],
},
),
Tool(
name="list_tables",
description="列出数据库中所有表及其结构",
inputSchema={"type": "object", "properties": {}},
),
]
@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
params = arguments.get("params", [])
if name == "execute_query":
with get_conn() as conn:
try:
rows = conn.execute(arguments["sql"], params).fetchall()
if not rows:
return [TextContent(type="text", text="查询结果为空")]
result = [dict(row) for row in rows]
return [TextContent(type="text", text=json.dumps(result, ensure_ascii=False, indent=2))]
except Exception as e:
return [TextContent(type="text", text=f"查询错误:{e}")]
elif name == "execute_write":
with get_conn() as conn:
try:
cursor = conn.execute(arguments["sql"], params)
conn.commit()
return [TextContent(type="text", text=f"✅ 执行成功,影响 {cursor.rowcount} 行")]
except Exception as e:
return [TextContent(type="text", text=f"写入错误:{e}")]
elif name == "list_tables":
with get_conn() as conn:
tables = conn.execute(
"SELECT name FROM sqlite_master WHERE type='table'"
).fetchall()
result = []
for t in tables:
cols = conn.execute(f"PRAGMA table_info({t['name']})").fetchall()
col_desc = ", ".join(
f"{c['name']} {c['type']}" for c in cols
)
result.append(f"📊 {t['name']}({col_desc})")
return [TextContent(type="text", text="\n".join(result) or "无表")]
return [TextContent(type="text", text=f"未知工具:{name}")]
if __name__ == "__main__":
import asyncio
asyncio.run(stdio_server(app))
第三部分:HTTP API MCP Server
# mcp_api_server.py
import json
import httpx
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent
app = Server("http-api")
@app.list_tools()
async def list_tools() -> list[Tool]:
return [
Tool(
name="http_get",
description="发起 GET 请求并返回响应内容",
inputSchema={
"type": "object",
"properties": {
"url": {"type": "string", "description": "完整的请求 URL"},
"headers": {
"type": "object",
"description": "请求头键值对",
"default": {},
},
"params": {
"type": "object",
"description": "Query 参数键值对",
"default": {},
},
},
"required": ["url"],
},
),
Tool(
name="http_post",
description="发起 POST 请求(JSON body)并返回响应",
inputSchema={
"type": "object",
"properties": {
"url": {"type": "string", "description": "完整的请求 URL"},
"body": {"type": "object", "description": "请求体(JSON 格式)"},
"headers": {"type": "object", "default": {}},
},
"required": ["url", "body"],
},
),
]
@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
async with httpx.AsyncClient(timeout=30) as client:
try:
if name == "http_get":
resp = await client.get(
arguments["url"],
headers=arguments.get("headers", {}),
params=arguments.get("params", {}),
)
elif name == "http_post":
resp = await client.post(
arguments["url"],
headers={"Content-Type": "application/json", **arguments.get("headers", {})},
json=arguments["body"],
)
else:
return [TextContent(type="text", text=f"未知工具:{name}")]
try:
result = json.dumps(resp.json(), ensure_ascii=False, indent=2)
except Exception:
result = resp.text
return [TextContent(
type="text",
text=f"状态码:{resp.status_code}\n\n{result}"
)]
except httpx.TimeoutException:
return [TextContent(type="text", text="错误:请求超时")]
except Exception as e:
return [TextContent(type="text", text=f"请求错误:{e}")]
if __name__ == "__main__":
import asyncio
asyncio.run(stdio_server(app))
# mcp_api_server.py
import json
import httpx
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent
app = Server("http-api")
@app.list_tools()
async def list_tools() -> list[Tool]:
return [
Tool(
name="http_get",
description="发起 GET 请求并返回响应内容",
inputSchema={
"type": "object",
"properties": {
"url": {"type": "string", "description": "完整的请求 URL"},
"headers": {
"type": "object",
"description": "请求头键值对",
"default": {},
},
"params": {
"type": "object",
"description": "Query 参数键值对",
"default": {},
},
},
"required": ["url"],
},
),
Tool(
name="http_post",
description="发起 POST 请求(JSON body)并返回响应",
inputSchema={
"type": "object",
"properties": {
"url": {"type": "string", "description": "完整的请求 URL"},
"body": {"type": "object", "description": "请求体(JSON 格式)"},
"headers": {"type": "object", "default": {}},
},
"required": ["url", "body"],
},
),
]
@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
async with httpx.AsyncClient(timeout=30) as client:
try:
if name == "http_get":
resp = await client.get(
arguments["url"],
headers=arguments.get("headers", {}),
params=arguments.get("params", {}),
)
elif name == "http_post":
resp = await client.post(
arguments["url"],
headers={"Content-Type": "application/json", **arguments.get("headers", {})},
json=arguments["body"],
)
else:
return [TextContent(type="text", text=f"未知工具:{name}")]
try:
result = json.dumps(resp.json(), ensure_ascii=False, indent=2)
except Exception:
result = resp.text
return [TextContent(
type="text",
text=f"状态码:{resp.status_code}\n\n{result}"
)]
except httpx.TimeoutException:
return [TextContent(type="text", text="错误:请求超时")]
except Exception as e:
return [TextContent(type="text", text=f"请求错误:{e}")]
if __name__ == "__main__":
import asyncio
asyncio.run(stdio_server(app))
第四部分:Claude Agent 统一接入三个 Server
# agent.py
import asyncio
import json
import os
from contextlib import AsyncExitStack
import anthropic
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
client = anthropic.Anthropic(
api_key=os.environ["ANTHROPIC_API_KEY"],
base_url=os.environ.get("ANTHROPIC_BASE_URL", "https://gw.claudeapi.com"),
)
# 定义三个 MCP Server 启动参数
SERVERS = {
"file": StdioServerParameters(
command="python", args=["mcp_file_server.py"]
),
"db": StdioServerParameters(
command="python", args=["mcp_db_server.py"]
),
"api": StdioServerParameters(
command="python", args=["mcp_api_server.py"]
),
}
async def run_agent(user_message: str):
async with AsyncExitStack() as stack:
sessions: dict[str, ClientSession] = {}
all_tools = []
# 并行连接所有 MCP Server,收集工具列表
for server_name, params in SERVERS.items():
read, write = await stack.enter_async_context(stdio_client(params))
session = await stack.enter_async_context(ClientSession(read, write))
await session.initialize()
sessions[server_name] = session
tools_resp = await session.list_tools()
for tool in tools_resp.tools:
all_tools.append({
"name": f"{server_name}__{tool.name}",
"description": f"[{server_name}] {tool.description}",
"input_schema": tool.inputSchema,
})
print(f"\n✅ 已加载 {len(all_tools)} 个工具:{[t['name'] for t in all_tools]}\n")
# 开始 Agent 循环
messages = [{"role": "user", "content": user_message}]
system = (
"你是一个拥有文件系统、数据库、HTTP API 操作能力的智能体。"
"工具名称格式为 server__tool_name,server 前缀表示所属系统。"
"请根据用户需求自主选择并组合使用工具,任务完成后给出清晰总结。"
)
while True:
response = client.messages.create(
model="claude-sonnet-4-6",
max_tokens=4096,
system=system,
tools=all_tools,
messages=messages,
)
print(f"[Claude] stop_reason: {response.stop_reason}")
# 解析响应,处理文本和工具调用
tool_uses = []
for block in response.content:
if block.type == "text":
print(f"\n💬 Claude:{block.text}")
elif block.type == "tool_use":
tool_uses.append(block)
if response.stop_reason == "end_turn" or not tool_uses:
break
# 执行所有工具调用
messages.append({"role": "assistant", "content": response.content})
tool_results = []
for tool_use in tool_uses:
server_name, tool_name = tool_use.name.split("__", 1)
print(f"\n🔧 调用工具:{tool_use.name}")
print(f" 参数:{json.dumps(tool_use.input, ensure_ascii=False)}")
session = sessions[server_name]
result = await session.call_tool(tool_name, tool_use.input)
result_text = result.content[0].text if result.content else "(无返回)"
print(f" 结果:{result_text[:200]}{'...' if len(result_text) > 200 else ''}")
tool_results.append({
"type": "tool_result",
"tool_use_id": tool_use.id,
"content": result_text,
})
messages.append({"role": "user", "content": tool_results})
# 演示任务
DEMO_TASKS = [
# 文件 + 数据库组合任务
"""
请完成以下任务:
1. 在数据库中创建 users 表(id INTEGER PRIMARY KEY, name TEXT, email TEXT, created_at TEXT)
2. 插入三条测试数据
3. 查询所有用户并将结果保存到工作区的 users_report.txt 文件
4. 读取并确认文件内容正确
""",
# HTTP API + 文件任务
"""
调用 https://api.github.com/repos/anthropics/anthropic-sdk-python/releases/latest
获取最新版本信息,提取版本号和发布时间,
写入工作区的 sdk_version.md 文件(Markdown 格式)。
""",
]
if __name__ == "__main__":
for i, task in enumerate(DEMO_TASKS, 1):
print(f"\n{'='*60}")
print(f"任务 {i}:{task.strip()[:50]}...")
print("=" * 60)
asyncio.run(run_agent(task))
# agent.py
import asyncio
import json
import os
from contextlib import AsyncExitStack
import anthropic
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
client = anthropic.Anthropic(
api_key=os.environ["ANTHROPIC_API_KEY"],
base_url=os.environ.get("ANTHROPIC_BASE_URL", "https://gw.claudeapi.com"),
)
# 定义三个 MCP Server 启动参数
SERVERS = {
"file": StdioServerParameters(
command="python", args=["mcp_file_server.py"]
),
"db": StdioServerParameters(
command="python", args=["mcp_db_server.py"]
),
"api": StdioServerParameters(
command="python", args=["mcp_api_server.py"]
),
}
async def run_agent(user_message: str):
async with AsyncExitStack() as stack:
sessions: dict[str, ClientSession] = {}
all_tools = []
# 并行连接所有 MCP Server,收集工具列表
for server_name, params in SERVERS.items():
read, write = await stack.enter_async_context(stdio_client(params))
session = await stack.enter_async_context(ClientSession(read, write))
await session.initialize()
sessions[server_name] = session
tools_resp = await session.list_tools()
for tool in tools_resp.tools:
all_tools.append({
"name": f"{server_name}__{tool.name}",
"description": f"[{server_name}] {tool.description}",
"input_schema": tool.inputSchema,
})
print(f"\n✅ 已加载 {len(all_tools)} 个工具:{[t['name'] for t in all_tools]}\n")
# 开始 Agent 循环
messages = [{"role": "user", "content": user_message}]
system = (
"你是一个拥有文件系统、数据库、HTTP API 操作能力的智能体。"
"工具名称格式为 server__tool_name,server 前缀表示所属系统。"
"请根据用户需求自主选择并组合使用工具,任务完成后给出清晰总结。"
)
while True:
response = client.messages.create(
model="claude-sonnet-4-6",
max_tokens=4096,
system=system,
tools=all_tools,
messages=messages,
)
print(f"[Claude] stop_reason: {response.stop_reason}")
# 解析响应,处理文本和工具调用
tool_uses = []
for block in response.content:
if block.type == "text":
print(f"\n💬 Claude:{block.text}")
elif block.type == "tool_use":
tool_uses.append(block)
if response.stop_reason == "end_turn" or not tool_uses:
break
# 执行所有工具调用
messages.append({"role": "assistant", "content": response.content})
tool_results = []
for tool_use in tool_uses:
server_name, tool_name = tool_use.name.split("__", 1)
print(f"\n🔧 调用工具:{tool_use.name}")
print(f" 参数:{json.dumps(tool_use.input, ensure_ascii=False)}")
session = sessions[server_name]
result = await session.call_tool(tool_name, tool_use.input)
result_text = result.content[0].text if result.content else "(无返回)"
print(f" 结果:{result_text[:200]}{'...' if len(result_text) > 200 else ''}")
tool_results.append({
"type": "tool_result",
"tool_use_id": tool_use.id,
"content": result_text,
})
messages.append({"role": "user", "content": tool_results})
# 演示任务
DEMO_TASKS = [
# 文件 + 数据库组合任务
"""
请完成以下任务:
1. 在数据库中创建 users 表(id INTEGER PRIMARY KEY, name TEXT, email TEXT, created_at TEXT)
2. 插入三条测试数据
3. 查询所有用户并将结果保存到工作区的 users_report.txt 文件
4. 读取并确认文件内容正确
""",
# HTTP API + 文件任务
"""
调用 https://api.github.com/repos/anthropics/anthropic-sdk-python/releases/latest
获取最新版本信息,提取版本号和发布时间,
写入工作区的 sdk_version.md 文件(Markdown 格式)。
""",
]
if __name__ == "__main__":
for i, task in enumerate(DEMO_TASKS, 1):
print(f"\n{'='*60}")
print(f"任务 {i}:{task.strip()[:50]}...")
print("=" * 60)
asyncio.run(run_agent(task))
运行效果示例
✅ 已加载 7 个工具:[
'file__read_file', 'file__write_file', 'file__list_directory',
'db__execute_query', 'db__execute_write', 'db__list_tables',
'api__http_get', 'api__http_post'
]
[Claude] stop_reason: tool_use
🔧 调用工具:db__execute_write
参数:{"sql": "CREATE TABLE IF NOT EXISTS users ..."}
结果:✅ 执行成功,影响 0 行
🔧 调用工具:db__execute_write
参数:{"sql": "INSERT INTO users VALUES ...", "params": [...]}
结果:✅ 执行成功,影响 3 行
🔧 调用工具:db__execute_query
参数:{"sql": "SELECT * FROM users"}
结果:[{"id": 1, "name": "张三", ...}, ...]
🔧 调用工具:file__write_file
参数:{"path": "users_report.txt", "content": "..."}
结果:✅ 已写入:users_report.txt
💬 Claude:已完成所有任务。数据库中成功创建 users 表并插入 3 条记录,
查询结果已保存至 workspace/users_report.txt,文件内容经读取确认无误。
✅ 已加载 7 个工具:[
'file__read_file', 'file__write_file', 'file__list_directory',
'db__execute_query', 'db__execute_write', 'db__list_tables',
'api__http_get', 'api__http_post'
]
[Claude] stop_reason: tool_use
🔧 调用工具:db__execute_write
参数:{"sql": "CREATE TABLE IF NOT EXISTS users ..."}
结果:✅ 执行成功,影响 0 行
🔧 调用工具:db__execute_write
参数:{"sql": "INSERT INTO users VALUES ...", "params": [...]}
结果:✅ 执行成功,影响 3 行
🔧 调用工具:db__execute_query
参数:{"sql": "SELECT * FROM users"}
结果:[{"id": 1, "name": "张三", ...}, ...]
🔧 调用工具:file__write_file
参数:{"path": "users_report.txt", "content": "..."}
结果:✅ 已写入:users_report.txt
💬 Claude:已完成所有任务。数据库中成功创建 users 表并插入 3 条记录,
查询结果已保存至 workspace/users_report.txt,文件内容经读取确认无误。
生产环境最佳实践
1. 工具权限分层
# 在 Server 中加入权限控制
READONLY_TOOLS = {"read_file", "list_directory", "execute_query"}
WRITE_TOOLS = {"write_file", "execute_write"}
# 生产环境可按需暴露不同工具集
# 在 Server 中加入权限控制
READONLY_TOOLS = {"read_file", "list_directory", "execute_query"}
WRITE_TOOLS = {"write_file", "execute_write"}
# 生产环境可按需暴露不同工具集
2. 使用 xhigh 努力级别处理复杂任务
response = client.messages.create(
model="claude-opus-4-7", # 复杂多步任务用 Opus 4.7
max_tokens=8192,
thinking={"type": "enabled", "budget_tokens": 4000},
...
)
response = client.messages.create(
model="claude-opus-4-7", # 复杂多步任务用 Opus 4.7
max_tokens=8192,
thinking={"type": "enabled", "budget_tokens": 4000},
...
)
3. 错误重试机制
import tenacity
@tenacity.retry(
wait=tenacity.wait_exponential(min=1, max=10),
stop=tenacity.stop_after_attempt(3),
)
async def call_tool_with_retry(session, tool_name, inputs):
return await session.call_tool(tool_name, inputs)
import tenacity
@tenacity.retry(
wait=tenacity.wait_exponential(min=1, max=10),
stop=tenacity.stop_after_attempt(3),
)
async def call_tool_with_retry(session, tool_name, inputs):
return await session.call_tool(tool_name, inputs)
4. MCP Server 注册表(适合多人协作)
# servers_registry.py
REGISTRY = {
"file": {"command": "python", "args": ["mcp_file_server.py"]},
"db": {"command": "python", "args": ["mcp_db_server.py"]},
"api": {"command": "python", "args": ["mcp_api_server.py"]},
"postgres": {"command": "npx", "args": ["@modelcontextprotocol/server-postgres"]},
"github": {"command": "npx", "args": ["@modelcontextprotocol/server-github"]},
}
# servers_registry.py
REGISTRY = {
"file": {"command": "python", "args": ["mcp_file_server.py"]},
"db": {"command": "python", "args": ["mcp_db_server.py"]},
"api": {"command": "python", "args": ["mcp_api_server.py"]},
"postgres": {"command": "npx", "args": ["@modelcontextprotocol/server-postgres"]},
"github": {"command": "npx", "args": ["@modelcontextprotocol/server-github"]},
}
常见问题
Q:MCP 和普通 function calling 有什么区别?
function calling 需要你在每次请求时手动传入工具定义,MCP 将工具定义维护在独立 Server 中,Claude 自动发现并调用,更适合工具数量多、复用场景多的 Agent。
Q:MCP Server 崩溃了怎么办?
在 Agent 循环外包一层重连逻辑,或使用 supervisor 进程管理(如 pm2、supervisord)保持 Server 常驻。
Q:如何限制 Claude 只能访问特定目录 / 表?
在 MCP Server 内做路径白名单或表名过滤,Claude 感知不到底层限制,只会使用 Server 暴露的能力范围。
总结
| 组件 | 作用 |
|---|---|
mcp_file_server.py |
文件读写 / 目录浏览 |
mcp_db_server.py |
SQLite 查询 / 写入 |
mcp_api_server.py |
任意 HTTP REST 调用 |
agent.py |
统一接入,Claude 自主规划工具调用链 |
MCP 让 Agent 从「聊天机器人」变成「能干活的工程师」。 结合 ClaudeAPI 的稳定接入和 Opus 4.7 的自主任务能力, 你可以将这套架构直接用于生产级的自动化工作流。
开始构建
支持支付宝 / 微信充值,官方 8 折,国内直连。



