MCP07-logging和progress等功能说明

B站影视 内地电影 2025-09-22 12:03 1

摘要:截至目前(2025年9月19日),除了基础的 Prompt、Resource 和 Tool 概念,FastMCP 还提供了以下功能:Sampling、Elicitation、Roots、Logging、Progress、Proxy、Middleware、Com

截至目前(2025年9月19日),除了基础的 Prompt、Resource 和 Tool 概念,FastMCP 还提供了以下功能:Sampling、Elicitation、Roots、Logging、Progress、Proxy、Middleware、Composition 和 Authentication 等功能

Sampling,采样,在server端调用client的llm,实现功能解耦Elicition,征询,实现人工介入roots,Client告知Server可访问的资源Logging,将Server日志发送给ClientProgress,Server端将进度发送给ClientProxy,代理其它MCP ServerMiddleware,拦截MCP通信中的请求和响应Composition,Server端将多个servers组合成一个Server对外提供Authentication,Client和Server之间安全认证 #技术分享

其中 Sampling 和 Elicitation 在我的实际开发中用到的比较多,所以我在前面章节中单独拎出来介绍了。FastMCP 官方文档也说了 Authentication 还在迅速迭代中,虽然已经有了相关文档,但本文暂时就不涉及了,等这个功能稳定了再具体细说。剩下的功能会在本文中一次性全部介绍完,篇幅较长,可以根据章节名跳转到自己需要关注的内容。本文大部分参考自官方文档

Roots 是客户端向服务器告知其可访问资源的一种机制。服务器可利用此信息调整行为或提供更相关的响应。

from fastmcp import Clientclient = Client( "my_mcp_server.py", roots=["/path/to/root1", "/path/to/root2"] )from fastmcp import Clientfrom fastmcp.client.roots import RequestContextasync def roots_callback(context: RequestContext) -> list[str]: print(f"Server requested roots (Request ID: {context.request_id})") return ["/path/to/root1", "/path/to/root2"]client = Client( "my_mcp_server.py", roots=roots_callback )

Logging,从服务器向 MCP 客户端发送消息。FastMCP 提供了一个 logger( fastmcp.utilities.logging.get_logger ),也可以用 python 标准库的 logging 。

服务器日志功能允许 MCP 工具向客户端发送调试(debug)、信息(info)、警告(warning)和错误(error)级别的消息。这有助于用户了解函数执行过程,在开发和运行阶段辅助调试。一般用于以下场景:

调试 :发送详细的执行信息,帮助诊断问题进度可见性 :让用户了解工具当前正在执行的操作错误报告 :向客户端传达问题及其上下文审计追踪 :为合规或分析目的生成工具执行记录

与标准 Python 日志不同,MCP 服务器 Logging 会直接将消息发送至客户端,使其在客户端界面或日志中可见。

在任意 tool 函数中使用 Context 提供的日志方法:

from fastmcp import FastMCP, Contextmcp = FastMCP("custom")@mcp.tool async def analyze_data(data: list[float], ctx: Context) -> dict: """通过全面日志记录分析数值数据。""" await ctx.debug("开始分析数值数据") await ctx.info(f"正在分析 {len(data)} 个数据点") try: if not data: await ctx.warning("提供了空数据列表") return {"error": "空数据列表"} result = sum(data) / len(data) await ctx.info(f"分析完成,平均值为:{result}") return {"average": result, "count": len(data)} except Exception as e: await ctx.error(f"分析失败:{str(e)}") raiseif __name__ == "__main__": mcp.run(transport="stdio", show_banner=False)

所有日志方法( debug 、info 、warning 、error 、log )现在均支持 extra 参数,该参数接受一个字典,用于传递任意结构化数据。这使得客户端可接收结构化日志,便于创建丰富且可查询的日志记录。

@mcp.toolasync def process_transaction(transaction_id: str, amount: float, ctx: Context): await ctx.info( f"正在处理交易 {transaction_id}", extra={ "transaction_id": transaction_id, "amount": amount, "currency": "USD" } )import asynciofrom pathlib import Pathfrom fastmcp.client import Client, StdioTransportfrom fastmcp.client.logging import LogMessageimport loggingimport syslogger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) logger.addHandler(logging.StreamHandler(sys.stdout))LOGGING_LEVEL_MAP = logging.getLevelNamesMappingclass MCPClient: def __init__(self): self.mcp_client = Client( StdioTransport( command = str(Path(__file__).parent.parent / ".venv" / "bin" / "python"), args = ["demo09-server.py"], cwd = str(Path(__file__).parent) ), log_handler=self.logging_handler, )async def logging_handler(self, message: LogMessage): """ Handles incoming logs from the MCP server and forwards them to the standard Python logging system. """ msg = message.data.get('msg') extra = message.data.get('extra')level = LOGGING_LEVEL_MAP.get(message.level.upper, logging.INFO) logger.log(level, msg, extra=extra)async def generate(self): async with self.mcp_client: await self.mcp_client.pingrst = await self.mcp_client.call_tool("analyze_data", arguments={"data": [1.0, 2.0, 3.0, 4.0, 5.0]}) print(rst)async def main: client = MCPClient await client.generateif __name__ == "__main__": asyncio.run(main)

client 运行输出

开始分析数值数据正在分析 5 个数据点分析完成,平均值为:3.0CallToolResult(content=[TextContent(type='text', text='{"average":3.0,"count":5}', annotations=None, meta=None)], structured_content={'average': 3.0, 'count': 5}, data={'average': 3.0, 'count': 5}, is_error=False)

Progress 功能允许 MCP tool 向 Client 通知长时间运行操作的当前进度。这使得 Client 能够显示进度指示器,从而在执行耗时任务时提供更佳的用户体验。Progress 在以下方面具有重要价值:

用户体验 :让用户了解长时间运行操作的当前状态进度指示器 :使客户端能够显示进度条或百分比防止超时 :表明操作正在持续进行中,避免被误判为无响应调试用途 :追踪执行进度,便于性能分析from fastmcp import FastMCP, Contextimport asynciomcp = FastMCP("custom")@mcp.tool async def process_items(items: list[str], ctx: Context) -> dict: """处理项目列表,并发送进度更新。""" total = len(items) results = for i, item in enumerate(items): await ctx.report_progress(progress=i, total=total) await asyncio.sleep(0.1) results.append(item.upper) await ctx.report_progress(progress=total, total=total) return {"processed": len(results), "results": results}if __name__ == "__main__": mcp.run(transport="stdio", show_banner=False)import asynciofrom pathlib import Pathfrom fastmcp.client import Client, StdioTransportclass MCPClient: def __init__(self): self.mcp_client = Client( StdioTransport( command = str(Path(__file__).parent.parent / ".venv" / "bin" / "python"), args = ["demo09-server.py"], cwd = str(Path(__file__).parent) ), progress_handler=self.progress_handler, )async def progress_handler(self, progress: float, total: float | None, message: str | None) -> None: if total is not None: percentage = (progress / total) * 100 print(f"Progress: {percentage:.1f}% - {message or ''}") else: print(f"Progress: {progress} - {message or ''}")async def generate(self): async with self.mcp_client: await self.mcp_client.pingrst = await self.mcp_client.call_tool("process_items", arguments={"items": ["item1", "item2", "item3", "item4", "item5", "item6", "item7", "item8", "item9", "item10", "item11", "item12", "item13", "item14", "item15"]}) print(rst)async def main: client = MCPClient await client.generateif __name__ == "__main__": asyncio.run(main)

client 运行输出

Progress: 0.0% -Progress: 13.3% -Progress: 26.7% -Progress: 40.0% -Progress: 53.3% -Progress: 66.7% -Progress: 80.0% -Progress: 93.3% -CallToolResult(content=[TextContent(type='text', text='{"processed":15,"results":["ITEM1","ITEM2","ITEM3","ITEM4","ITEM5","ITEM6","ITEM7","ITEM8","ITEM9","ITEM10","ITEM11","ITEM12","ITEM13","ITEM14","ITEM15"]}', annotations=None, meta=None)], structured_content={'processed': 15, 'results': ['ITEM1', 'ITEM2', 'ITEM3', 'ITEM4', 'ITEM5', 'ITEM6', 'ITEM7', 'ITEM8', 'ITEM9', 'ITEM10', 'ITEM11', 'ITEM12', 'ITEM13', 'ITEM14', 'ITEM15']}, data={'processed': 15, 'results': ['ITEM1', 'ITEM2', 'ITEM3', 'ITEM4', 'ITEM5', 'ITEM6', 'ITEM7', 'ITEM8', 'ITEM9', 'ITEM10', 'ITEM11', 'ITEM12', 'ITEM13', 'ITEM14', 'ITEM15']}, is_error=False)

FastMCP 的 Proxy 允许一个 FastMCP 服务器实例作为前端,代理另一个 MCP 服务器(该服务器可能是远程的、运行在不同传输协议上的,甚至是另一个 FastMCP 实例)。此功能通过 FastMCP.as_proxy 类方法实现。作为代理服务器,它本身不直接实现工具或资源。当它接收到请求(如 tools/call 或 resources/read )时,会将该请求转发至一个_后端_ MCP 服务器,接收其响应,再将响应原样返回给原始客户端。

sequenceDiagram participant ClientApp as 您的客户端(如 Claude Desktop) participant FastMCPProxy as FastMCP 代理服务器 participant BackendServer as 后端 MCP 服务器(如远程 SSE)ClientApp->>FastMCPProxy: MCP 请求(如 stdio) Note over FastMCPProxy, BackendServer: 代理转发请求 FastMCPProxy->>BackendServer: MCP 请求(如 sse) BackendServer-->>FastMCPProxy: MCP 响应(如 sse) Note over ClientApp, FastMCPProxy: 代理转发响应 FastMCPProxy-->>ClientApp: MCP 响应(如 stdio)会话隔离 :每个请求拥有独立隔离的会话,确保并发操作安全传输协议桥接 :通过一种传输协议暴露运行在另一种传输协议上的服务器高级 MCP 功能支持 :自动转发采样(sampling)、引导(elicitation)、日志和进度报告安全性 :作为后端服务器的受控网关简化架构 :即使后端位置或传输协议变更,前端仍保持单一接入点

使用代理服务器时,特别是连接到基于 HTTP 的后端服务器时,需注意延迟可能显著增加。例如, list_tools 操作可能耗时数百毫秒,而本地工具仅需 1–2 毫秒。挂载代理服务器时,此延迟会影响父服务器的所有操作,而不仅仅是与被代理工具的交互。

如果您的使用场景对低延迟有严格要求,建议使用 import_server 方法在启动时复制工具,而非在运行时进行代理。

推荐使用 ProxyClient 创建代理,它提供完整的 MCP 功能支持,并自动实现会话隔离:

from fastmcp import FastMCPfrom fastmcp.server.proxy import ProxyClientproxy = FastMCP.as_proxy( ProxyClient("backend_server.py"), name="MyProxy" )if __name__ == "__main__": proxy.run

此单一设置即可提供:

安全的并发请求处理自动转发高级 MCP 功能(采样、引导等)会话隔离,防止上下文混淆与所有 MCP 客户端完全兼容

ProxyClient 会自动在后端服务器与连接到代理的客户端之间转发高级 MCP 协议功能,确保完整的 MCP 兼容性。支持的功能:

Roots :将文件系统根目录访问请求转发给客户端Sampling :将后端发起的 LLM 补全请求转发给客户端Elicitation :将用户输入请求转发给客户端Logging :将后端日志消息转发至客户端Progress :在长时间操作中转发进度通知

也可以自定义功能支持,比如设置为 None 来选择性禁用转发

backend = ProxyClient( "backend_server.py", sampling_handler=None, log_handler=None)

你可以直接从符合 MCPConfig 模式的配置字典创建代理。这对于快速设置指向远程服务器的代理非常有用,无需手动配置每个连接细节。

from fastmcp import FastMCPconfig = { "mcpServers": { "default": { "url": "https://example.com/mcp ", "transport": "http" } } }proxy = FastMCP.as_proxy(config, name="Config-Based Proxy")if __name__ == "__main__": proxy.run多服务器的设置

你可以通过在配置中指定多个条目来创建指向多个服务器的代理。系统会自动以配置名称作为前缀挂载它们:

config = { "mcpServers": { "weather": { "url": "https://weather-api.example.com/mcp ", "transport": "http" }, "calendar": { "url": "https://calendar-api.example.com/mcp ", "transport": "http" } }}composite_proxy = FastMCP.as_proxy(config, name="Composite Proxy")显式会话管理

在内部,FastMCP.as_proxy 使用 FastMCPProxy 类。您通常无需直接与此类交互,但在高级场景下它可供使用。FastMCPProxy 要求显式会话管理——不会执行任何自动检测。您必须选择您的会话策略:

shared_client = ProxyClient("backend_server.py")def shared_session_factory: return shared_clientproxy = FastMCPProxy(client_factory=shared_session_factory)def fresh_session_factory: return ProxyClient("backend_server.py")proxy = FastMCPProxy(client_factory=fresh_session_factory)

如需自动选择会话策略,请使用便捷方法 FastMCP.as_proxy 。

def custom_client_factory: client = ProxyClient("backend_server.py") return clientproxy = FastMCPProxy(client_factory=custom_client_factory)

MCP 中间件允许您在请求和响应流经服务器时对其进行拦截和修改。可以将其视为一条管道,每个中间件均可检查当前操作、进行修改,然后将控制权传递给链中的下一个中间件。与传统的 Web 中间件不同,MCP 中间件专为 Model Context Protocol 设计,为各类 MCP 操作(如工具调用、资源读取和提示请求)提供专用钩子。

MCP 中间件是一个全新概念,未来版本中可能发生破坏性变更。

MCP 中间件的常见应用场景包括:

身份验证与授权 :在执行操作前验证客户端权限日志与监控 :追踪使用模式与性能指标速率限制 :按客户端或操作类型控制请求频率请求/响应转换 :在数据到达工具前或离开后对其进行修改缓存 :存储频繁请求的数据以提升性能错误处理 :为服务器提供一致的错误响应

FastMCP 中间件基于管道模型运行。当请求进入时,它会按添加到服务器的顺序依次流经各个中间件。每个中间件均可:

检查传入的请求及其上下文在传递给下一个中间件或处理器前修改请求通过调用 call_next 执行链中的下一个中间件/处理器在返回前检查并修改响应处理执行过程中发生的错误

关键在于,中间件形成一条链,每个环节决定是继续处理还是完全终止链的执行。

如果你熟悉 ASGI 中间件,FastMCP 中间件的基本结构会感觉似曾相识。其核心是一个可调用类,接收一个包含当前 JSON-RPC 消息信息的上下文对象,以及一个用于继续中间件链的处理器函数。

重要的是要理解,MCP 基于 JSON-RPC 规范 运行。虽然 FastMCP 以熟悉的方式呈现请求和响应,但其本质是 JSON-RPC 消息,而非 Web 应用中常见的 HTTP 请求/响应对。FastMCP 中间件适用于所有 传输类型 ,包括本地 stdio 传输和 HTTP 传输,但并非所有中间件实现都兼容所有传输类型(例如,检查 HTTP 头部的中间件无法在 stdio 传输中工作)。

实现中间件最基础的方式是重写 Middleware 基类的 call 方法:

from fastmcp.server.middleware import Middleware, MiddlewareContextclass RawMiddleware(Middleware): async def __call__(self, context: MiddlewareContext, call_next): print(f"原始中间件正在处理:{context.method}") result = await call_next(context) print(f"原始中间件处理完成:{context.method}") return result中间件钩子

为便于用户针对特定类型的消息,FastMCP 中间件提供了一系列专用钩子。您可以重写特定的钩子方法(而非实现原始的 __call__ 方法),这些方法仅在特定类型的操作时被调用,从而允许您精确地定位中间件逻辑所需的粒度。

FastMCP 提供多个按不同粒度调用的钩子。理解此层级结构对有效设计中间件至关重要。

当请求进入时,同一请求可能触发多个钩子调用 ,执行顺序由泛化到具体:

on_message - 为所有 MCP 消息(请求和通知)调用on_request 或 on_notification - 根据消息类型调用操作特定钩子 - 为特定 MCP 操作调用,如 on_call_tool

例如,当客户端调用工具时,您的中间件将收到 多次钩子调用

on_message 和 on_request 用于任何初始工具发现操作(如 list_tools)on_message (因为它是任何 MCP 消息)用于工具调用本身on_request (因为工具调用期望响应)用于工具调用本身on_call_tool (因为它是具体的工具执行)用于工具调用本身

请注意,MCP SDK 可能会执行额外操作(如为缓存目的列出工具),这将触发超出直接工具执行范围的额外中间件调用。

此层级结构允许您以适当的粒度定位中间件逻辑。对广泛关注点(如日志)使用 on_message ,对身份验证使用 on_request ,对工具特定逻辑(如性能监控)使用 on_call_tool 。

可用钩子

理解如何在中间件中访问组件信息(工具、资源、提示)对构建强大的中间件功能至关重要。访问模式在列出操作与执行操作之间存在显著差异。

FastMCP 中间件以不同方式处理两种类型的操作:

列出操作 ( on_list_tools , on_list_resources , on_list_prompts 等):

中间件接收 FastMCP 组件对象 ,包含完整元数据这些对象包含 FastMCP 特有属性(如 tags ),可直接从组件访问结果在转换为 MCP 格式前包含完整组件信息标签包含在返回给 MCP 客户端的组件 meta 字段中

执行操作 ( on_call_tool , on_read_resource , on_get_prompt ):

中间件在 组件执行前 运行中间件结果为执行结果,或组件未找到时的错误组件元数据在钩子参数中不可直接访问

如果需要在执行操作期间检查组件属性(如标签),请使用通过上下文获取的 FastMCP 服务器实例:

from fastmcp.server.middleware import Middleware, MiddlewareContextfrom fastmcp.exceptions import ToolErrorclass TagBasedMiddleware(Middleware): async def on_call_tool(self, context: MiddlewareContext, call_next): if context.fastmcp_context: try: tool = await context.fastmcp_context.fastmcp.get_tool(context.message.name) if "private" in tool.tags: raise ToolError("访问被拒绝:私有工具") if not tool.enabled: raise ToolError("工具当前已禁用") except Exception: pass return await call_next(context)

相同模式适用于资源和提示:

from fastmcp.server.middleware import Middleware, MiddlewareContextfrom fastmcp.exceptions import ResourceError, PromptErrorclass ComponentAccessMiddleware(Middleware): async def on_read_resource(self, context: MiddlewareContext, call_next): if context.fastmcp_context: try: resource = await context.fastmcp_context.fastmcp.get_resource(context.message.uri) if "restricted" in resource.tags: raise ResourceError("访问被拒绝:受限资源") except Exception: pass return await call_next(context) async def on_get_prompt(self, context: MiddlewareContext, call_next): if context.fastmcp_context: try: prompt = await context.fastmcp_context.fastmcp.get_prompt(context.message.name) if not prompt.enabled: raise PromptError("提示当前已禁用") except Exception: pass return await call_next(context)处理列出结果

对于列出操作,中间件 call_next 函数在组件转换为 MCP 格式前返回 FastMCP 组件列表。您可以过滤或修改此列表并将其返回给客户端。例如:

from fastmcp.server.middleware import Middleware, MiddlewareContextclass ListingFilterMiddleware(Middleware): async def on_list_tools(self, context: MiddlewareContext, call_next): result = await call_next(context) filtered_tools = [ tool for tool in result if "private" not in tool.tags ] return filtered_tools

此过滤在组件转换为 MCP 格式并返回给客户端前进行。标签在过滤期间可访问,并包含在最终列出响应的组件 meta 字段中。

在列出操作中过滤组件时,请确保也在相应的执行钩子( on_call_tool 、 on_read_resource 、 on_get_prompt )中阻止已过滤组件的执行,以保持一致性。

工具调用拒绝

您可以通过在中间件中抛出 ToolError 来拒绝访问特定工具。这是阻止工具执行的正确方式,因为它与 FastMCP 错误处理系统正确集成

from fastmcp.server.middleware import Middleware, MiddlewareContextfrom fastmcp.exceptions import ToolErrorclass AuthMiddleware(Middleware): async def on_call_tool(self, context: MiddlewareContext, call_next): tool_name = context.message.name if tool_name.lower in ["delete", "admin_config"]: raise ToolError("访问被拒绝:工具需要管理员权限") return await call_next(context)

拒绝工具调用时,务必抛出 ToolError ,而非返回 ToolResult 对象或其他值。 ToolError 确保错误通过中间件链正确传播,并转换为正确的 MCP 错误响应格式。

工具调用修改

对于工具调用等执行操作,您可以在执行前修改参数,或在执行后转换结果:

from fastmcp.server.middleware import Middleware, MiddlewareContextclass ToolCallMiddleware(Middleware): async def on_call_tool(self, context: MiddlewareContext, call_next): if context.message.name == "calculate": if context.message.arguments.get("value", 0)

对于更复杂的工具重写场景,请考虑使用 工具转换 模式,它为创建修改后的工具变体提供了更结构化的方法。

钩子剖析

每个中间件钩子遵循相同的模式。让我们通过 on_message 钩子来理解其结构:

async def on_message(self, context: MiddlewareContext, call_next): print(f"正在处理 {context.method}") result = await call_next(context) print(f"已完成 {context.method}") return result

每个钩子接收两个参数:

context: MiddlewareContext - 包含当前请求信息:context.method - MCP 方法名称(如 "tools/call")context.source - 请求来源("client" 或 "server")context.type - 消息类型("request" 或 "notification")context.message - MCP 消息数据context.timestamp - 请求接收时间context.fastmcp_context - FastMCP Context 对象(如可用)call_next - 用于继续中间件链的函数。除非您希望完全停止处理,否则 必须 调用此函数。

开发者对请求流拥有完全控制权:

继续处理 :调用 await call_next(context) 以继续修改请求 :在调用 call_next 前更改上下文修改响应 :在调用 call_next 后更改结果停止链 :不调用 call_next (极少需要)处理错误 :在 try/catch 块中包装 call_next

除了修改请求和响应,您还可以存储状态数据,供工具(可选)稍后访问。为此,请使用 FastMCP Context 适当调用 set_state 或 get_state 。

FastMCP 中间件通过继承 Middleware 基类并重写所需钩子来实现。

from fastmcp import FastMCPfrom fastmcp.server.middleware import Middleware, MiddlewareContextclass LoggingMiddleware(Middleware): """记录所有 MCP 操作的中间件。""" async def on_message(self, context: MiddlewareContext, call_next): """为所有 MCP 消息调用。""" print(f"正在处理来自 {context.source} 的 {context.method}") result = await call_next(context) print(f"{context.method} 处理完成") return resultmcp = FastMCP("MyServer") mcp.add_middleware(LoggingMiddleware)

中间件按添加到服务器的顺序执行。最先添加的中间件在进入时最先运行,在退出时最后运行:

mcp = FastMCP("MyServer")mcp.add_middleware(AuthenticationMiddleware("secret-token")) mcp.add_middleware(PerformanceMiddleware) mcp.add_middleware(LoggingMiddleware)

这将创建以下执行流:

AuthenticationMiddleware(预处理)PerformanceMiddleware(预处理)LoggingMiddleware(预处理)实际工具/资源处理器LoggingMiddleware(后处理)PerformanceMiddleware(后处理)AuthenticationMiddleware(后处理)

当使用 服务器组合(下面提的 Composition) (如 mount 或 import_server )时,中间件行为遵循以下规则:

父服务器中间件 为所有请求运行,包括路由到挂载服务器的请求挂载服务器中间件 仅为由该特定服务器处理的请求运行中间件顺序 在每个服务器内保持不变parent = FastMCP("Parent")parent.add_middleware(AuthenticationMiddleware("token"))child = FastMCP("Child") child.add_middleware(LoggingMiddleware)@child.tool def child_tool -> str: return "from child"parent.mount(child, prefix="child")

当客户端调用 "child_tool" 时,请求将首先流经父服务器的身份验证中间件,然后路由到子服务器,在子服务器中再经过其日志中间件。

内置中间件

FastMCP 包含多个中间件实现,展示了最佳实践并提供立即可用的功能。让我们通过构建简化版本来探索每种类型的工作原理,然后了解如何使用完整实现。

性能监控对于理解服务器行为和识别瓶颈至关重要。FastMCP 在 fastmcp.server.middleware.timing 中包含计时中间件。

以下是其工作方式的示例:

import timefrom fastmcp.server.middleware import Middleware, MiddlewareContextclass SimpleTimingMiddleware(Middleware): async def on_request(self, context: MiddlewareContext, call_next): start_time = time.perf_counter try: result = await call_next(context) duration_ms = (time.perf_counter - start_time) * 1000 print(f"请求 {context.method} 在 {duration_ms:.2f}ms 内完成") return result except Exception as e: duration_ms = (time.perf_counter - start_time) * 1000 print(f"请求 {context.method} 在 {duration_ms:.2f}ms 后失败:{e}") raise

要使用具有正确日志和配置的完整版本:

from fastmcp.server.middleware.timing import ( TimingMiddleware, DetailedTimingMiddleware)mcp.add_middleware(TimingMiddleware)mcp.add_middleware(DetailedTimingMiddleware)

内置版本包括自定义日志支持、正确格式化,且 DetailedTimingMiddleware 提供 on_call_tool 和 on_read_resource 等操作特定钩子,以实现精细计时。

请求和响应日志记录对于调试、监控和理解 MCP 服务器中的使用模式至关重要。FastMCP 在 fastmcp.server.middleware.logging 中提供全面的日志中间件。

以下是其工作方式的示例:

from fastmcp.server.middleware import Middleware, MiddlewareContextclass SimpleLoggingMiddleware(Middleware): async def on_message(self, context: MiddlewareContext, call_next): print(f"正在处理来自 {context.source} 的 {context.method}") try: result = await call_next(context) print(f"{context.method} 处理完成") return result except Exception as e: print(f"{context.method} 失败:{e}") raise

要使用具有高级功能的完整版本:

from fastmcp.server.middleware.logging import ( LoggingMiddleware, StructuredLoggingMiddleware)mcp.add_middleware(LoggingMiddleware( include_payloads=True, max_payload_length=1000 ))mcp.add_middleware(StructuredLoggingMiddleware(include_payloads=True))

内置版本包括负载日志、结构化 JSON 输出、自定义日志支持、负载大小限制以及用于精细控制的操作特定钩子。

速率限制中间件

速率限制对于保护服务器免受滥用、确保公平资源使用以及在负载下保持性能至关重要。FastMCP 在 fastmcp.server.middleware.rate_limiting 中包含复杂的速率限制中间件。

以下是其工作方式的示例:

import timefrom collections import defaultdictfrom fastmcp.server.middleware import Middleware, MiddlewareContextfrom mcp import McpErrorfrom mcp.types import ErrorDataclass SimpleRateLimitMiddleware(Middleware): def __init__(self, requests_per_minute: int = 60): self.requests_per_minute = requests_per_minute self.client_requests = defaultdict(list) async def on_request(self, context: MiddlewareContext, call_next): current_time = time.time client_id = "default" cutoff_time = current_time - 60 self.client_requests[client_id] = [ req_time for req_time in self.client_requests[client_id] if req_time > cutoff_time ] if len(self.client_requests[client_id]) >= self.requests_per_minute: raise McpError(ErrorData(code=-32000, message="超出速率限制")) self.client_requests[client_id].append(current_time) return await call_next(context)

要使用具有高级算法的完整版本:

from fastmcp.server.middleware.rate_limiting import ( RateLimitingMiddleware, SlidingWindowRateLimitingMiddleware)mcp.add_middleware(RateLimitingMiddleware( max_requests_per_second=10.0, burst_capacity=20 ))mcp.add_middleware(SlidingWindowRateLimitingMiddleware( max_requests=100, window_minutes=1 ))

内置版本包括令牌桶算法、按客户端识别、全局速率限制以及具有可配置客户端识别功能的异步安全实现。

错误处理中间件

一致的错误处理和恢复对于健壮的 MCP 服务器至关重要。FastMCP 在 fastmcp.server.middleware.error_handling 中提供全面的错误处理中间件。

以下是其工作方式的示例:

import loggingfrom fastmcp.server.middleware import Middleware, MiddlewareContextclass SimpleErrorHandlingMiddleware(Middleware): def __init__(self): self.logger = logging.getLogger("errors") self.error_counts = {} async def on_message(self, context: MiddlewareContext, call_next): try: return await call_next(context) except Exception as error: error_key = f"{type(error).__name__}:{context.method}" self.error_counts[error_key] = self.error_counts.get(error_key, 0) + 1 self.logger.error(f"{context.method} 中发生错误:{type(error).__name__}: {error}") raise

要使用具有高级功能的完整版本:

from fastmcp.server.middleware.error_handling import ( ErrorHandlingMiddleware, RetryMiddleware)mcp.add_middleware(ErrorHandlingMiddleware( include_traceback=True, transform_errors=True, error_callback=my_error_callback ))mcp.add_middleware(RetryMiddleware( max_retries=3, retry_exceptions=(ConnectionError, TimeoutError) ))

内置版本包括错误转换、自定义回调、可配置的重试逻辑以及正确的 MCP 错误格式化。

from fastmcp import FastMCPfrom fastmcp.server.middleware.timing import TimingMiddlewarefrom fastmcp.server.middleware.logging import LoggingMiddlewarefrom fastmcp.server.middleware.rate_limiting import RateLimitingMiddlewarefrom fastmcp.server.middleware.error_handling import ErrorHandlingMiddlewaremcp = FastMCP("Production Server")mcp.add_middleware(ErrorHandlingMiddleware) mcp.add_middleware(RateLimitingMiddleware(max_requests_per_second=50)) mcp.add_middleware(TimingMiddleware) mcp.add_middleware(LoggingMiddleware)@mcp.tool def my_tool(data: str) -> str: return f"已处理:{data}"

随着 MCP 应用规模扩大,你可能希望将工具、资源和提示按逻辑模块组织,或复用现有的服务器组件。FastMCP 通过两种方法支持服务器组合:

import_server :一次性复制组件并添加前缀(静态组合)。mount :创建实时链接,主服务器在运行时将请求委托给子服务器(动态组合)。导入vs挂载

选择导入还是挂载取决于您的具体用例和需求。

| 特性 | 导入 | 挂载 | | ---

| 方法 | FastMCP.import_server(server, prefix=None) | FastMCP.mount(server, prefix=None) | | 组合类型 | 一次性复制(静态) | 实时链接(动态) | | 更新同步 | 子服务器的变更不会反映到主服务器 | 子服务器的变更立即反映到主服务器 | | 性能 | 快速 — 无运行时委托开销 | 较慢 — 受最慢挂载服务器影响 | | 前缀 | 可选 — 省略则保留原名称 | 可选 — 省略则保留原名称 | | 适用场景 | 打包最终组件、性能敏感场景 | 运行时模块化组合 |

导入

import_server 方法将一个 FastMCP 实例( 子服务器 )中的所有组件(工具、资源、模板、提示)复制到另一个实例( 主服务器 )中。可选提供 prefix 以避免命名冲突。若未提供前缀,组件将按原样导入。当多个服务器使用相同前缀(或无前缀)导入时,最后导入的服务器组件将覆盖先前导入的同名组件。

from fastmcp import FastMCPimport asyncioweather_mcp = FastMCP(name="WeatherService")@weather_mcp.tool def get_forecast(city: str) -> dict: """获取天气预报。""" return {"city": city, "forecast": "Sunny"}@weather_mcp.resource("data://cities/supported") def list_supported_cities -> list[str]: """列出支持天气查询的城市。""" return ["London", "Paris", "Tokyo"]main_mcp = FastMCP(name="MainApp")async def setup: await main_mcp.import_server(weather_mcp, prefix="weather")if __name__ == "__main__": asyncio.run(setup) main_mcp.run导入的工作原理

当你调用 await main_mcp.import_server(subserver, prefix={whatever}) 时:

工具 : subserver 的所有工具被添加到 main_mcp ,名称前缀为 {prefix}_ 。subserver.tool(name="my_tool") 变为 main_mcp.tool(name="{prefix}_my_tool") 。资源 :所有资源的 URI 和名称均被添加前缀。URI: subserver.resource(uri="data://info") 变为 main_mcp.resource(uri="data://{prefix}/info") 。名称: resource.name 变为 "{prefix}_{resource.name}" 。资源模板 :模板的前缀规则与资源类似。URI: subserver.resource(uri="data://{id}") 变为 main_mcp.resource(uri="data://{prefix}/{id}") 。名称: template.name 变为 "{prefix}_{template.name}" 。提示 :所有提示的名称被添加前缀 {prefix}_ 。subserver.prompt(name="my_prompt") 变为 main_mcp.prompt(name="{prefix}_my_prompt") 。

请注意,import_server 执行的是 一次性复制 。在导入 之后 对 subserver 所做的更改 不会 反映在 main_mcp 中。subserver 的 lifespan 上下文也 不会 由主服务器执行。

prefix 参数是可选的。如果省略,组件将按原样导入,不进行修改,这样组件将保留其原始名称。当导入多个具有相同前缀或无前缀的服务器时, 最后导入 的服务器的组件将优先。

mount 方法在 main_mcp 服务器与 subserver 之间创建一个 实时链接 。它不复制组件,而是在运行时将匹配可选 prefix 的组件请求 委托 给 subserver 处理。若未提供前缀,则子服务器的组件可通过原始名称直接访问。当多个服务器使用相同前缀(或无前缀)挂载时,对于冲突的组件名称,最后挂载的服务器将优先。

import asynciofrom fastmcp import FastMCP, Clientdynamic_mcp = FastMCP(name="DynamicService")@dynamic_mcp.tool def initial_tool: """初始工具演示。""" return "Initial Tool Exists"main_mcp = FastMCP(name="MainAppLive") main_mcp.mount(dynamic_mcp, prefix="dynamic")@dynamic_mcp.tool def added_later: """挂载后添加的工具。""" return "Tool Added Dynamically!"async def test_dynamic_mount: tools = await main_mcp.get_tools print("可用工具:", list(tools.keys)) async with Client(main_mcp) as client: result = await client.call_tool("dynamic_added_later") print("结果:", result.data)if __name__ == "__main__": asyncio.run(test_dynamic_mount)

配置挂载后:

实时链接 :父服务器与挂载的服务器建立连接。动态更新 :对挂载服务器的更改在通过父服务器访问时立即生效。前缀访问 :父服务器使用前缀将请求路由到挂载的服务器。委托 :对匹配前缀的组件的请求在运行时委托给挂载的服务器处理。

命名工具、资源、模板和提示的前缀规则与 import_server 相同。这包括为资源和模板的 URI/键及名称添加前缀,以便在多服务器配置中更好地识别。

由于“实时链接”的存在,父服务器上的 list_tools 等操作会受到最慢挂载服务器速度的影响。特别是,基于 HTTP 的挂载服务器可能引入显著延迟(300-400ms,而本地工具仅需 1-2ms),并且这种减速会影响整个服务器,而不仅仅是与 HTTP 代理工具的交互。如果性能至关重要,通过 import_server 导入工具可能是更合适的解决方案,因为它在启动时一次性复制组件,而不是在运行时委托请求。

来源:墨码行者

相关推荐