摘要:async def build_async_engine_client_from_engine_args( engine_args: AsyncEngineArgs, disable_frontend_multiprocessing: bool = False
在API Server启动过程中,会调用build_async_engine_client方法创建EngineCore客户端,即AsyncLLM,用于与EngineCore通信(V1将Engine的调度器和模型执行器核心功能拆分出来,也就是EngineCore,独立执行循环),进程间通过ZeroMQ通信;
async with build_async_engine_client(args) as engine_client: ......是一个异步上下文管理器,用于根据传入的命令行参数(args)创建并管理AsyncLLM Engine Client。它确保在的使用过程中,所有资源都能被正确初始化,并在退出时清理。async def build_async_engine_client( args: Namespace) -> AsyncIterator[EngineClient]: # Context manager to handle engine_client lifecycle # Ensures everything is shutdown and cleaned up on error/exit # 解析并生成engine配置对象 engine_args = AsyncEngineArgs.from_cli_args(args) async with build_async_engine_client_from_engine_args( engine_args, args.disable_frontend_multiprocessing) as engine: yield enginebuild_async_engine_client_from_engine_args据engine_args和多进程模式选项,创建并返回一个async def build_async_engine_client_from_engine_args( engine_args: AsyncEngineArgs, disable_frontend_multiprocessing: bool = False,) -> AsyncIterator[EngineClient]: """ Create EngineClient, either: - in-process using the AsyncLLMEngine Directly - multiprocess using AsyncLLMEngine RPC Returns the Client or None if the creation failed. """ # Create the EngineConfig (determines if we can use V1). usage_context = UsageContext.OPENAI_API_SERVER vllm_config = engine_args.create_engine_config(usage_context=usage_context) # V1 AsyncLLM. # 根据v0/v1配置选择启用模式,如果是V1, 则直接使用 AsyncLLM 引擎,并在退出上下文时调用 async_llm.shutdown 释放资源 if envs.VLLM_USE_V1: if disable_frontend_multiprocessing: logger.warning( "V1 is enabled, but got --disable-frontend-multiprocessing. " "To disable frontend multiprocessing, set VLLM_USE_V1=0.") from vllm.v1.engine.async_llm import AsyncLLM async_llm: Optional[AsyncLLM] = None try: async_llm = AsyncLLM.from_vllm_config( vllm_config=vllm_config, usage_context=usage_context, disable_log_Requests=engine_args.disable_log_requests, disable_log_stats=engine_args.disable_log_stats) yield async_llm finally: if async_llm: async_llm.shutdown # V0 AsyncLLM. # 如果禁用了前端多进程模式或者启动了PP,选择该模式 elif (MQLLMEngineClient.is_unsupported_config(vllm_config) or disable_frontend_multiprocessing): engine_client: Optional[EngineClient] = None try: engine_client = AsyncLLMEngine.from_vllm_config( vllm_config=vllm_config, usage_context=usage_context, disable_log_requests=engine_args.disable_log_requests, disable_log_stats=engine_args.disable_log_stats) yield engine_client finally: if engine_client and hasattr(engine_client, "shutdown"): engine_client.shutdown # V0MQLLMEngine. # V0 多进程模式 else: if "PROMETHEUS_MULTIPROC_DIR" not in os.environ: # Make TemporaryDirectory for prometheus multiprocessing # Note: global TemporaryDirectory will be automatically # cleaned up upon exit. global prometheus_multiproc_dir prometheus_multiproc_dir = tempfile.TemporaryDirectory os.environ[ "PROMETHEUS_MULTIPROC_DIR"] = prometheus_multiproc_dir.name else: logger.warning( "Found PROMETHEUS_MULTIPROC_DIR was set by user. " "This directory must be wiped between vLLM runs or " "you will find inaccurate metrics. Unset the variable " "and vLLM will properly handle cleanup.") # Select random path for IPC. # 选择IPC路径 ipc_path = get_open_zmq_ipc_path logger.info("Multiprocessing frontend to use %s for IPC Path.", ipc_path) # Start RPCServer in separate process (holds the LLMEngine). # the current process might have CUDA context, # so we need to spawn a new process context = multiprocessing.get_context("spawn") # Ensure we can serialize transformer config before spawning maybe_register_config_serialize_by_value # The Process can raise an exception during startup, which may # not actually result in an exitcode being reported. As a result # we use a shared variable to communicate the information. engine_alive = multiprocessing.Value('b', True, lock=False) # 启动一个独立进程(run_mp_engine) engine_process = context.Process( target=run_mp_engine, args=(vllm_config, UsageContext.OPENAI_API_SERVER, ipc_path, engine_args.disable_log_stats, engine_args.disable_log_requests, engine_alive)) engine_process.start engine_pid = engine_process.pid assert engine_pid is not None, "Engine process failed to start." logger.info("Started engine process with PID %d", engine_pid) def _cleanup_ipc_path: socket_path = ipc_path.replace("ipc://", "") if os.path.exists(socket_path): os.remove(socket_path) # Ensure we clean up the local IPC socket file on exit. atexit.register(_cleanup_ipc_path) # Build RPCClient, which conforms to EngineClient Protocol. build_client = partial(MQLLMEngineClient, ipc_path, vllm_config, engine_pid) mq_engine_client = await asyncio.get_running_loop.run_in_executor( None, build_client) try: while True: try: await mq_engine_client.setup break except TimeoutError: if (not engine_process.is_alive or not engine_alive.value): raise RuntimeError( "Engine process failed to start. See stack " "trace for the root cause.") from None yield mq_engine_client # type: ignore[misc] finally: # Ensure rpc server process was terminated engine_process.terminate # Close all open connections to the backend mq_engine_client.close # Wait for engine process to join engine_process.join(4) if engine_process.exitcode is None: # Kill if taking longer than 5 seconds to stop engine_process.kill # Lazy import for prometheus multiprocessing. # We need to set PROMETHEUS_MULTIPROC_DIR environment variable # before prometheus_client is imported. # See https://prometheus.github.io/client_python/multiprocess/ from prometheus_client import multiprocess multiprocess.mark_process_dead(engine_process.pid) 在获取到AsyncLLM engine client之后,执行一系列的初始化操作,包括EngineCore等;在上节中提到,在API Server收到生成请求之后,会调用engine client 的generate生成结果;
generate是AsyncLLM类的重要实现,用于处理生成请求,它是异步推理引擎的主要入口点,负责接收用户输入(如 prompt 和采样参数),将请求添加到引擎核心(EngineCore),并通过异步生成器逐步返回推理结果。
如果输出处理器尚未启动,则启动一个后台任务(_run_output_handler)。输出处理器负责从引擎核心获取推理结果,并将其推送到对应的异步队列中if self.output_handler is None: self.output_handler = asyncio.create_task( self._run_output_handler)调用add_request方法,将请求添加到Enginecore。返回一个异步队列(asyncio.Queue),用于存储该请求的推理结果。q = await self.add_request( request_id, prompt, sampling_params, lora_request=lora_request, trace_headers=trace_headers, prompt_adapter_request=prompt_adapter_request, priority=priority,)add_request将用户输入的请求(如 prompt 和采样参数)转换为引擎核心(EngineCore)可以处理的格式,并将其注册到输出处理器和EngineCore中,以便后续推理。async def add_request( self, request_id: str, prompt: PromptType, params: Union[SamplingParams, PoolingParams], arrival_time: Optional[float] = None, lora_request: Optional[LoRARequest] = None, trace_headers: Optional[Mapping[str, str]] = None, prompt_adapter_request: Optional[PromptAdapterRequest] = None, priority: int = 0,) -> asyncio.Queue[RequestOutput]: """Add new request to the AsyncLLM.""" # Create a new output queue for the request. queue: asyncio.Queue[RequestOutput] = asyncio.Queue # Convert Input --> Request. # 调用 Processor 的 process_inputs 方法,将用户输入(如 prompt 和 params)转换为 EngineCoreRequest 类型的请求。 #EngineCoreRequest 是引擎核心(EngineCore)可以直接处理的请求格式,包含所有必要的信息。 request = self.processor.process_inputs(request_id, prompt, params, arrival_time, lora_request, trace_headers, prompt_adapter_request, priority) # 如果 params 是 SamplingParams 类型,则检查其 n 值(表示需要生成的候选数量)。 # 如果 n == 1,表示只需要一个候选结果;否则,需要生成多个候选结果。 n = params.n if isinstance(params, SamplingParams) else 1 # 如果 n == 1,直接调用 _add_request 方法,将请求添加到输出处理器和引擎核心中。 # 返回创建的异步队列(queue)。 if n == 1: await self._add_request(request, None, 0, queue) return queue # Fan out child requests (for n>1). parent_request = ParentRequest(request_id, params) for idx in range(n): request_id, params = parent_request.get_child_info(idx) child_request = request if idx == n - 1 else copy(request) child_request.request_id = request_id child_request.sampling_params = params await self._add_request(child_request, parent_request, idx, queue) return queue如果 n > 1,创建一个 ParentRequest 对象,用于管理多个子请求。
遍历每个候选请求的索引 idx:调用 parent_request.get_child_info(idx) 获取子请求的 request_id 和采样参数。如果是最后一个子请求,直接使用原始请求;否则,复制原始请求。更新子请求的 request_id 和采样参数。调用 _add_request 方法,将子请求添加到输出处理器和引擎核心中。
_add_request用于将请求注册到输出处理器和EngineCore中async def _add_request(self, request: EngineCoreRequest, parent_req: Optional[ParentRequest], index: int, queue: asyncio.Queue[RequestOutput]): # Add the request to OutputProcessor (this process). # 将请求添加到 OutputProcessor,以便处理推理结果并推送到异步队列。 self.output_processor.add_request(request, parent_req, index, queue) # Add the EngineCoreRequest to EngineCore (separate process). # 异步将请求添加到 EngineCore,以便执行实际的推理任务 await self.engine_core.add_request_async(request) if self.log_requests: logger.info("Added request %s.", request.request_id)add_request_async是AsyncMPClient类中的一个异步方法,用于将推理请求异步发送到EngineCore。它是异步推理流程的关键部分,负责将请求序列化并通过 ZeroMQ(ZMQ)套接字发送到运行在后台进程中的EngineCoreasync def add_request_async(self, request: EngineCoreRequest) -> None: # NOTE: text prompt is not needed in the core engine as it has been # tokenized. request.prompt = None await self._send_input(EngineCoreRequestType.ADD, request)_send_input用于将请求或命令发送到Engine coreasync def _send_input(self, request_type: EngineCoreRequestType, request: Any) -> None: # 将请求类型(request_type.value)和序列化后的请求数据打包为消息 msg = (request_type.value, self.encoder.encode(request)) # input_socket是ZMQ创建的socket,在MPClient客户端初始化时创建;使用 ZMQ 的 send_multipart 方法将消息发送到输入套接字(input_socket) await self.input_socket.send_multipart(msg, copy=False) # 检查输出队列(outputs_queue)是否已初始化。如果未初始化,调用 _start_output_queue_task 方法启动一个异步任务,用于处理引擎核心的响应。 # 确保在发送请求后,系统能够接收并处理引擎核心的响应 if self.outputs_queue is None: await self._start_output_queue_task outputs_queue),用于存储引擎核心的响应。同时启动一个异步任务(process_outputs_socket),从 ZMQ 输出套接字中接收响应并将其放入输出队列。在其中也会创建output_socket最后从异步队列中获取推理结果, 如果队列中有多个结果(如流式输出),将其合并。之后检查结果是否完成(out.finished),如果完成则退出循环。最后使用yield将结果逐步返回给调用者finished = Falsewhile not finished: # Note: drain queue without await if possible (avoids # task switching under load which helps performance). out = q.get_nowait if not q.empty else await q.get # Coalesce any additional queued outputs while not q.empty: next_out = q.get_nowait if sampling_params.output_kind == RequestOutputKind.DELTA: out.add(next_out) else: out = next_out # Note: both OutputProcessor and EngineCore handle their # own request cleanup based on finished. finished = out.finished yield out
在下节中,将深入探讨EngineCore的实现细节
来源:鼠meme
免责声明:本站系转载,并不代表本网赞同其观点和对其真实性负责。如涉及作品内容、版权和其它问题,请在30日内与本站联系,我们将在第一时间删除内容!