摘要:上篇(彻底说清 Human-in-the-Loop:企业级 Agent 系统的关键挑战与LangGraph解法【上】)介绍了单机模式下的Human-in-the-Loop(HITL)Agent原理与实现。本文继续探讨如何将HITL Agent扩展到分布式环境,
上篇(彻底说清 Human-in-the-Loop:企业级 Agent 系统的关键挑战与LangGraph解法【上】)介绍了单机模式下的Human-in-the-Loop(HITL)Agent原理与实现。本文继续探讨如何将HITL Agent扩展到分布式环境,通过API支持远程调用和并发访问(代码地址见文末):
# 单机模式:直接调用agent = create_react_agent(model, tools, checkpointer)result = agent.invoke({"messages": messages})# 中断处理是同步的if "__interrupt__" in result: user_response = input("请选择:yes/no/edit") # 直接恢复执行 result = agent.invoke(Command(resume=response_data))很显然,这种单个用户且无法扩展的实现,一般只能用来做测试与演示。如果我们需要将LangGraph开发的Agent部署到生产环境,需要考虑将Agent部署成独立的服务。即使不考虑MCP/A2A这样的新型集成架构与协议,采用普通的HTTP API模式也会面临诸多挑战。
首先是两个最基本的工作模式转变:
从单一上下文到多个API接口中断机制要求一次HITL Agent的交互过程必然要拆分到多个HTTP请求完成。
HITL Agent的中断工作模式要求系统保持会话与状态,因此需要隔离的会话管理机制来应对潜在的多用户访问。
可能的实现架构如下:
首先,我们简化一些环节以更多关注在HITL本身:
不考虑流式(Streaming)过程输出不考虑Agent的异步执行,采用同步API接口一个用户当前只能有一个活动的Agent会话基于这个前提,主要的设计要点包括:
我们以上篇中的工具调用HITL Agent为例,将原来的应用拆分成服务端与客户端分别实现。
【服务端API】
基于FastAPI构建,提供如下接口服务,其中核心的是invoke与resume接口,对应Agent启动与恢复:
【关键类型与Schema】
Sessions
保存服务端会话。采用简单的Dict类型,定义如下:
sessions: Dict[str, Dict[str, Union[CompiledGraph, str]]] = { # "user_id": { # "agent": CompiledGraph, # create_react_agent 返回的智能体实例 # "session_id": str # UUID 字符串 # }}一个用户(user_id)同时只有一个活动的会话(session_id)和智能体(agent)。
AgentRequest
代表客户端发起的Agent调用请求:
# 客户端发起的智能体请求class AgentRequest(BaseModel): user_id: str query: str system_message: Optional[str] = "你会使用工具来帮助用户。如果工具使用被拒绝,请提示用户。"其中设置了user_id代表发起请求的客户端,因为现在是一个多用户环境。
注意,这里仅是为了简单模拟。在真实应用中通常不会直接传输user_id,一般会基于携带的安全token在服务端获取(首先做用户认证)。
AgentResponse
代表服务端Agent给客户端的响应:
# 智能体给予的响应class AgentResponse(BaseModel): # 一次会话的唯一标识符 session_id: str # 三个状态:interrupted, completed, error status: str #error时的提示消息 message: Optional[str] = None #completed时的结果消息 result: Optional[Dict[str, Any]] = None #interrupted时的中断消息 interrupt_data: Optional[Dict[str, Any]] = None为了方便处理,这里把Agent返回客户端的响应用统一的AgentResponse表示,并在服务端根据实际Agent调用的情况构造状态(status)和返回内容:
如果Agent正常返回,且结果中包含了__interrupt__,则取出结果中的interrupt_data返回,状态为interrupted如果Agent正常返回,且结果中未包含__interrupt__,则将结果直接返回,设置状态为completed其他异常情况,则将错误信息放入message返回,并设置状态为errorInterruptResponse
这代表客户端对中断进行处理后给予的反馈结果:
# 客户端给予的反馈响应class InterruptResponse(BaseModel): user_id: str session_id: str # 响应类型:accept, reject, edit response_type: str # 如果是edit类型,可能需要额外的参数 args: Optional[Dict[str, Any]] = None对于工具调用的反馈,通常有三种类型:accpet(允许调用)、reject(不允许调用)、edit(调整参数,此时args中携带修改后的调用参数)。
【两个关键接口】
工具与Agent本身与本地模式并无不同,这里直接看两个关键接口:
invoke_agent
不同的地方主要在于增加session与agent实例管理的动作:
针对新的请求创建session和agent实例,并在后续交互过程中重用它们。session_id同时可以用作agent运行的thread_id,用来实现agent内部检查点。大致实现如下(部分):
@app.post("/agent/invoke", response_model=AgentResponse)def invoke_agent(request: AgentRequest): """启动智能体处理用户请求 - 同步版本,等待执行完成或中断""" user_id = request.user_id if user_id notin sessions: # 只在创建新会话时生成新的会话ID session_id = str(uuid.uuid4) agent = create_tavily_search_agent(user_id) sessions[user_id] = { "agent": agent, "session_id": session_id } else: # 使用现有会话的ID和agent agent = sessions[user_id]["agent"] session_id = sessions[user_id]["session_id"] # 初始化智能体输入 messages = ... config = {"configurable": {"thread_id": session_id}} try: result = agent.invoke(messages,config) return process_agent_result(session_id, result) ...resume_agent
借助user_id取出服务端保存的agent实例,并恢复运行即可:
@app.post("/agent/resume", response_model=AgentResponse)def resume_agent(response: InterruptResponse): """恢复被中断的智能体执行 - 同步版本,等待执行完成或再次中断""" user_id = response.user_id client_session_id = response.session_id # 检查用户会话是否存在 if user_id notin sessions: raise HTTPException(status_code=404, detail=f"用户会话 {user_id} 不存在") # 检查会话ID是否匹配,由于未实现过期机制,也可以省略 server_session_id = sessions[user_id]["session_id"] if server_session_id != client_session_id: raise HTTPException(status_code=400, detail="会话ID不匹配,可能是过期的请求") # 获取智能体和配置 agent = sessions[user_id]["agent"] # 构造响应数据 command_data = { "type": response.response_type } # 如果提供了参数,添加到响应数据中 if response.args: command_data["args"] = response.args try: # 先恢复智能体执行 result = agent.invoke(Command(resume=command_data), config={"configurable": {"thread_id": server_session_id}}) # 再处理结果 return process_agent_result(server_session_id, result)......这里的process_agent_result是一个辅助函数,用来对agent返回结果做统一处理,其逻辑参考AgentReponse的返回规则。
最后再实现一个删除会话接口,让客户端可以根据需要来管理会话生命周期:
@app.delete("/agent/session/{user_id}")def delete_agent_session(user_id: str): ... del sessions[user_id] ...【客户端实现】
完成了服务端,客户端的实现在流程上与本地模式并无太多不同。只是原来的调用转化成对invoke_agent与resume_agent两个API的调用。
核心处理如下所示:
def main:...获取用户ID,模拟不同的客户端 user_id = ... while True: try: # 获取用户查询问题 query = ... # 调用智能体 response = invoke_agent(user_id, query) # 处理智能体响应 process_agent_response(response, user_id) ...在process_agent_response函数中,根据invoke接口调用的响应状态进行分类处理,特别是interrupted状态(其他状态就直接结束)的处理:
进入人类交互,请求确认或者拒绝等。交互结束后调用resume_agent接口恢复运行。用resum的返回结果递归调用process_agent_response,以支持可能再次发生的中断,直到返回completed或者error状态。这里不再展示详细实现。直接看客户端运行效果:
我们在agent流程结束后调用了会话清理(delete)接口以开启新的session,实际应用中可根据自身需要灵活控制。
我们成功地将HITL Agent从单机模式扩展到了远程API模式,实现了多用户访问。但是在实际使用中也引入了更多的故障点,特别是会话的脆弱性(比如Web客户端不小心刷新了页面)。
【主要挑战】
这里主要的问题是:
一旦客户端意外退出,正在进行的会话就会断开,状态处于“未知”。
但是服务端的agent不是有checkpointer吗?
尽管checkpointer持久了Agent运行的步骤与State,但其更多用于agent恢复运行时的“现场重建”,但在上面的情况下,你甚至无法知道要不要“恢复运行”。比如,agent在运行到某个较慢的节点时,客户端忽然断开;此时再次连接后,由于服务端很可能还在运行中(running),并不能简单的调用resume来恢复。
【优化方案】
考虑两种优化方案:
同步+增强的会话状态管理延续当前的实现模式。但是需要在此基础上增强服务端的会话状态管理,可以用来后续查询与恢复。
异步+任务管理改造现有的同步Agent调用为异步模式。即:不管是invoke_agent还是resum_agent的运行都启动异步任务来运行,接口本身则立即返回task_id。后续客户端随时可以通过task_id来查询服务端任务的状态与响应,因此也就无惧会话断开。
这种模式适用于大规模并发模式,或者agent运行任务时间过长(比如做深度研究)。
【方案实现】
这里我们延续之前的demo,介绍第一种方式。
首先在之前的基础上增强会话管理:增加status以及最后的响应内容等信息:
# 增强的会话存储结构sessions = { "user_id": { "agent": agent_instance, "session_id": "uuid", "status": "idle|running|interrupted|completed|error", # 新增状态跟踪 "last_response": AgentResponse, # 保存最后响应 "last_query": str, # 保存最后查询 "last_updated": timestamp # 时间戳 }}接下来只需要在每次Agent会话状态变化时,对其进行保存即可。
比如,在开始运行agent之前:
...sessions[user_id]["status"] = "running"sessions[user_id]["last_query"] = request.querysessions[user_id]["last_response"] = Nonesessions[user_id]["last_updated"] = time.time而在返回客户端之前,将返回信息保存起来(注意此时客户端可能已经断开):
......sessions[user_id]["status"] = response.statussessions[user_id]["last_response"] = responsesessions[user_id]["last_updated"] = time.time此外,在agent恢复、错误发生等环节,都注意更新状态。
在服务端实现更完整的状态保存,并提供查询接口后,客户端也需要做相应的处理:在每次启动后根据user_id查询服务端状态,一旦发现有未完成的agent会话在进行,就可以根据其status与last_response做相应处理。比如:
一个处于running状态的agent,客户端可以继续等待其状态变化;而一个处于interrupted状态的agent,则可以进入人类交互过程。
这里不再详细介绍客户端实现。
【效果演示】
首先启动优化后的服务端。然后运行客户端查询,开始执行后立刻强行退出:
然后再次启动客户端,输入相同的user_id,可以看到服务端agent状态已经运行到“interrupted”状态:
此时客户端会自动显示interrupt信息,并请求用户进行交互与反馈,流程得以继续:
接着,我们在agent恢复运行后,再次强行中断:
再次启动客户端用同一用户进入。可以看到服务端agent当前处于“running”状态:
此时,客户端会进入等待状态,直到服务端agent状态变成completed:
最后显示completed状态的最终响应结果:
整个过程可以看到,无论是在interrupt中断发生,还是在正常运行过程中,即使发生会话异常,最后都可以借助状态查询与恢复机制,让流程得以继续,使用的可靠性与体验得到了极大的提升!
当然在实际应用中,还有一种更复杂的情况是服务端的故障恢复。特别是在这样“有状态”的服务端下,在关键应用中会面临几个普遍的问题:
单点故障:服务重启后所有会话丢失无法简单的水平扩展:负载均衡会导致会话找不到资源浪费:每个服务实例都要维护完整会话与状态虽然LangGraph的checkpointer将agent运行轨迹保存在PostgreSQL中,但我们的会话元数据(session映射、用户状态、中断信息)仍然存储在内存中,这使得服务端重启后用户完全无法恢复之前的会话。
一个可能的解决方案是:
对服务端内存会话数据做持久化,比如借助Redis/rdbms;并在每次服务启动时,根据Redis中信息进行会话恢复。当然,实际应用中可以采用“懒”加载机制延后恢复。而服务端agent实例则借助checkpointer做状态恢复和运行。
这种方式不仅解决了服务端故障恢复的问题,还可以为系统的水平扩展和高可用部署奠定了基础。当然也需要更加精细化的控制与处理。
以上我们介绍了分布式环境下HITL Agent的关键设计,特别是网络环境下的故障恢复。毕竟在真实的生产环境中,服务重启、网络抖动、硬件故障都有可能发生,一个能够优雅地从各种故障中恢复的系统,才是真正可用的企业级系统。尽管增加了架构复杂度,但这种复杂度换来的是系统可靠性的飞跃。
本文演示代码参考:
来源:有趣的科技君