彻底说清 Human-in-the-Loop【下】:分布式环境下的挑战与应对

B站影视 内地电影 2025-08-31 05:45 1

摘要:上篇(彻底说清 Human-in-the-Loop:企业级 Agent 系统的关键挑战与LangGraph解法【上】)介绍了单机模式下的Human-in-the-Loop(HITL)Agent原理与实现。本文继续探讨如何将HITL Agent扩展到分布式环境,

上篇(彻底说清 Human-in-the-Loop:企业级 Agent 系统的关键挑战与LangGraph解法【上】)介绍了单机模式下的Human-in-the-Loop(HITL)Agent原理与实现。本文继续探讨如何将HITL Agent扩展到分布式环境,通过API支持远程调用和并发访问(代码地址见文末):

# 单机模式:直接调用agent = create_react_agent(model, tools, checkpointer)result = agent.invoke({"messages": messages})# 中断处理是同步的if "__interrupt__" in result: user_response = input("请选择:yes/no/edit") # 直接恢复执行 result = agent.invoke(Command(resume=response_data))

很显然,这种单个用户且无法扩展的实现,一般只能用来做测试与演示。如果我们需要将LangGraph开发的Agent部署到生产环境,需要考虑将Agent部署成独立的服务。即使不考虑MCP/A2A这样的新型集成架构与协议,采用普通的HTTP API模式也会面临诸多挑战。

首先是两个最基本的工作模式转变:

从单一上下文到多个API接口

中断机制要求一次HITL Agent的交互过程必然要拆分到多个HTTP请求完成。

HITL Agent的中断工作模式要求系统保持会话与状态,因此需要隔离的会话管理机制来应对潜在的多用户访问。

可能的实现架构如下:

首先,我们简化一些环节以更多关注在HITL本身:

不考虑流式(Streaming)过程输出不考虑Agent的异步执行,采用同步API接口一个用户当前只能有一个活动的Agent会话

基于这个前提,主要的设计要点包括:

我们以上篇中的工具调用HITL Agent为例,将原来的应用拆分成服务端与客户端分别实现。

【服务端API】

基于FastAPI构建,提供如下接口服务,其中核心的是invoke与resume接口,对应Agent启动与恢复:

【关键类型与Schema】

Sessions

保存服务端会话。采用简单的Dict类型,定义如下:

sessions: Dict[str, Dict[str, Union[CompiledGraph, str]]] = { # "user_id": { # "agent": CompiledGraph, # create_react_agent 返回的智能体实例 # "session_id": str # UUID 字符串 # }}

一个用户(user_id)同时只有一个活动的会话(session_id)和智能体(agent)。

AgentRequest

代表客户端发起的Agent调用请求:

# 客户端发起的智能体请求class AgentRequest(BaseModel): user_id: str query: str system_message: Optional[str] = "你会使用工具来帮助用户。如果工具使用被拒绝,请提示用户。"

其中设置了user_id代表发起请求的客户端,因为现在是一个多用户环境。

注意,这里仅是为了简单模拟。在真实应用中通常不会直接传输user_id,一般会基于携带的安全token在服务端获取(首先做用户认证)。

AgentResponse

代表服务端Agent给客户端的响应:

# 智能体给予的响应class AgentResponse(BaseModel): # 一次会话的唯一标识符 session_id: str # 三个状态:interrupted, completed, error status: str #error时的提示消息 message: Optional[str] = None #completed时的结果消息 result: Optional[Dict[str, Any]] = None #interrupted时的中断消息 interrupt_data: Optional[Dict[str, Any]] = None

为了方便处理,这里把Agent返回客户端的响应用统一的AgentResponse表示,并在服务端根据实际Agent调用的情况构造状态(status)和返回内容:

如果Agent正常返回,且结果中包含了__interrupt__,则取出结果中的interrupt_data返回,状态为interrupted如果Agent正常返回,且结果中未包含__interrupt__,则将结果直接返回,设置状态为completed其他异常情况,则将错误信息放入message返回,并设置状态为error

InterruptResponse

这代表客户端对中断进行处理后给予的反馈结果:

# 客户端给予的反馈响应class InterruptResponse(BaseModel): user_id: str session_id: str # 响应类型:accept, reject, edit response_type: str # 如果是edit类型,可能需要额外的参数 args: Optional[Dict[str, Any]] = None

对于工具调用的反馈,通常有三种类型:accpet(允许调用)、reject(不允许调用)、edit(调整参数,此时args中携带修改后的调用参数)。

【两个关键接口】

工具与Agent本身与本地模式并无不同,这里直接看两个关键接口:

invoke_agent

不同的地方主要在于增加session与agent实例管理的动作:

针对新的请求创建session和agent实例,并在后续交互过程中重用它们。session_id同时可以用作agent运行的thread_id,用来实现agent内部检查点。

大致实现如下(部分):

@app.post("/agent/invoke", response_model=AgentResponse)def invoke_agent(request: AgentRequest): """启动智能体处理用户请求 - 同步版本,等待执行完成或中断""" user_id = request.user_id if user_id notin sessions: # 只在创建新会话时生成新的会话ID session_id = str(uuid.uuid4) agent = create_tavily_search_agent(user_id) sessions[user_id] = { "agent": agent, "session_id": session_id } else: # 使用现有会话的ID和agent agent = sessions[user_id]["agent"] session_id = sessions[user_id]["session_id"] # 初始化智能体输入 messages = ... config = {"configurable": {"thread_id": session_id}} try: result = agent.invoke(messages,config) return process_agent_result(session_id, result) ...

resume_agent

借助user_id取出服务端保存的agent实例,并恢复运行即可:

@app.post("/agent/resume", response_model=AgentResponse)def resume_agent(response: InterruptResponse): """恢复被中断的智能体执行 - 同步版本,等待执行完成或再次中断""" user_id = response.user_id client_session_id = response.session_id # 检查用户会话是否存在 if user_id notin sessions: raise HTTPException(status_code=404, detail=f"用户会话 {user_id} 不存在") # 检查会话ID是否匹配,由于未实现过期机制,也可以省略 server_session_id = sessions[user_id]["session_id"] if server_session_id != client_session_id: raise HTTPException(status_code=400, detail="会话ID不匹配,可能是过期的请求") # 获取智能体和配置 agent = sessions[user_id]["agent"] # 构造响应数据 command_data = { "type": response.response_type } # 如果提供了参数,添加到响应数据中 if response.args: command_data["args"] = response.args try: # 先恢复智能体执行 result = agent.invoke(Command(resume=command_data), config={"configurable": {"thread_id": server_session_id}}) # 再处理结果 return process_agent_result(server_session_id, result)......

这里的process_agent_result是一个辅助函数,用来对agent返回结果做统一处理,其逻辑参考AgentReponse的返回规则。

最后再实现一个删除会话接口,让客户端可以根据需要来管理会话生命周期:

@app.delete("/agent/session/{user_id}")def delete_agent_session(user_id: str): ... del sessions[user_id] ...

【客户端实现】

完成了服务端,客户端的实现在流程上与本地模式并无太多不同。只是原来的调用转化成对invoke_agent与resume_agent两个API的调用。

核心处理如下所示:

def main:...获取用户ID,模拟不同的客户端 user_id = ... while True: try: # 获取用户查询问题 query = ... # 调用智能体 response = invoke_agent(user_id, query) # 处理智能体响应 process_agent_response(response, user_id) ...

process_agent_response函数中,根据invoke接口调用的响应状态进行分类处理,特别是interrupted状态(其他状态就直接结束)的处理:

进入人类交互,请求确认或者拒绝等。交互结束后调用resume_agent接口恢复运行。用resum的返回结果递归调用process_agent_response,以支持可能再次发生的中断,直到返回completed或者error状态。

这里不再展示详细实现。直接看客户端运行效果:

我们在agent流程结束后调用了会话清理(delete)接口以开启新的session,实际应用中可根据自身需要灵活控制。

我们成功地将HITL Agent从单机模式扩展到了远程API模式,实现了多用户访问。但是在实际使用中也引入了更多的故障点,特别是会话的脆弱性(比如Web客户端不小心刷新了页面)

【主要挑战】

这里主要的问题是:

一旦客户端意外退出,正在进行的会话就会断开,状态处于“未知”。

但是服务端的agent不是有checkpointer吗?

尽管checkpointer持久了Agent运行的步骤与State,但其更多用于agent恢复运行时的“现场重建”,但在上面的情况下,你甚至无法知道要不要“恢复运行”。比如,agent在运行到某个较慢的节点时,客户端忽然断开;此时再次连接后,由于服务端很可能还在运行中(running),并不能简单的调用resume来恢复。

【优化方案】

考虑两种优化方案:

同步+增强的会话状态管理

延续当前的实现模式。但是需要在此基础上增强服务端的会话状态管理,可以用来后续查询与恢复。

异步+任务管理

改造现有的同步Agent调用为异步模式。即:不管是invoke_agent还是resum_agent的运行都启动异步任务来运行,接口本身则立即返回task_id。后续客户端随时可以通过task_id来查询服务端任务的状态与响应,因此也就无惧会话断开。

这种模式适用于大规模并发模式,或者agent运行任务时间过长(比如做深度研究)。

【方案实现】

这里我们延续之前的demo,介绍第一种方式。

首先在之前的基础上增强会话管理:增加status以及最后的响应内容等信息:

# 增强的会话存储结构sessions = { "user_id": { "agent": agent_instance, "session_id": "uuid", "status": "idle|running|interrupted|completed|error", # 新增状态跟踪 "last_response": AgentResponse, # 保存最后响应 "last_query": str, # 保存最后查询 "last_updated": timestamp # 时间戳 }}

接下来只需要在每次Agent会话状态变化时,对其进行保存即可。

比如,在开始运行agent之前:

...sessions[user_id]["status"] = "running"sessions[user_id]["last_query"] = request.querysessions[user_id]["last_response"] = Nonesessions[user_id]["last_updated"] = time.time

而在返回客户端之前,将返回信息保存起来(注意此时客户端可能已经断开):

......sessions[user_id]["status"] = response.statussessions[user_id]["last_response"] = responsesessions[user_id]["last_updated"] = time.time

此外,在agent恢复、错误发生等环节,都注意更新状态。

在服务端实现更完整的状态保存,并提供查询接口后,客户端也需要做相应的处理:在每次启动后根据user_id查询服务端状态,一旦发现有未完成的agent会话在进行,就可以根据其status与last_response做相应处理。比如:

一个处于running状态的agent,客户端可以继续等待其状态变化;而一个处于interrupted状态的agent,则可以进入人类交互过程。

这里不再详细介绍客户端实现。

【效果演示】

首先启动优化后的服务端。然后运行客户端查询,开始执行后立刻强行退出:

然后再次启动客户端,输入相同的user_id,可以看到服务端agent状态已经运行到“interrupted”状态:

此时客户端会自动显示interrupt信息,并请求用户进行交互与反馈,流程得以继续:

接着,我们在agent恢复运行后,再次强行中断:

再次启动客户端用同一用户进入。可以看到服务端agent当前处于“running”状态:

此时,客户端会进入等待状态,直到服务端agent状态变成completed:

最后显示completed状态的最终响应结果:

整个过程可以看到,无论是在interrupt中断发生,还是在正常运行过程中,即使发生会话异常,最后都可以借助状态查询与恢复机制,让流程得以继续,使用的可靠性与体验得到了极大的提升!

当然在实际应用中,还有一种更复杂的情况是服务端的故障恢复。特别是在这样“有状态”的服务端下,在关键应用中会面临几个普遍的问题:

单点故障:服务重启后所有会话丢失无法简单的水平扩展:负载均衡会导致会话找不到资源浪费:每个服务实例都要维护完整会话与状态

虽然LangGraph的checkpointer将agent运行轨迹保存在PostgreSQL中,但我们的会话元数据(session映射、用户状态、中断信息)仍然存储在内存中,这使得服务端重启后用户完全无法恢复之前的会话。

一个可能的解决方案是:

对服务端内存会话数据做持久化,比如借助Redis/rdbms;并在每次服务启动时,根据Redis中信息进行会话恢复。当然,实际应用中可以采用“懒”加载机制延后恢复。而服务端agent实例则借助checkpointer做状态恢复和运行。

这种方式不仅解决了服务端故障恢复的问题,还可以为系统的水平扩展和高可用部署奠定了基础。当然也需要更加精细化的控制与处理。

以上我们介绍了分布式环境下HITL Agent的关键设计,特别是网络环境下的故障恢复。毕竟在真实的生产环境中,服务重启、网络抖动、硬件故障都有可能发生,一个能够优雅地从各种故障中恢复的系统,才是真正可用的企业级系统。尽管增加了架构复杂度,但这种复杂度换来的是系统可靠性的飞跃。

本文演示代码参考:

来源:有趣的科技君

相关推荐