摘要:本文假定您已具备人工智能(AI)的基本概念知识,拥有API设计的实践经验,并且熟悉构建生产级系统的经典挑战。同时,您需要能够熟练阅读Python代码,并理解分布式系统原理。
本文约7000字,建议阅读15分钟
本文用 Python 举例说明了给商业系统接入外部 AI 服务的教程。
图:Unsplash(作者:Jonas Gerlach)
阅读前提
本文假定您已具备人工智能(AI)的基本概念知识,拥有API设计的实践经验,并且熟悉构建生产级系统的经典挑战。同时,您需要能够熟练阅读Python代码,并理解分布式系统原理。
免责声明
我要先说明的是:本文探讨的是解决这一特定问题的其中一种方法,绝非唯一方案 , 更谈不上是 “绝对正确” 的方案。本文的初衷是提供一种思路,通过一组特定的技术选择来展示 “方案的可行路径”。这样一来,你就可以结合自己的想法、研究成果和实践经验,最终找到一个符合自身具体需求的解决方案。
诉求
你的电商平台需要智能商品推荐功能,金融科技应用需要欺诈检测能力,客服平台则需要智能路由机制。这些需求的共同之处是什么?答案是:它们都需要通过稳定可靠、可扩展的 API来提供 AI 能力 —— 而这些 API 每天要处理数百万次请求。
作为技术架构师,我们越来越常面临这样的需求:将 AI 服务集成到现有系统中,同时不能影响系统的可靠性、性能与可维护性。这里的挑战不只是 “让 AI 能用”,而是要让 AI 在生产环境中规模化运行,并达到与其他关键系统组件同等的可靠性要求。
现实情况是,尽管通过 API 和各类服务获取 AI 能力已越来越容易,但要将其成功集成到商业系统中,需要做出合理的架构决策 —— 而很多团队在这一步容易出错。接下来,我们就来探讨如何构建能 “稳定交付业务价值” 的 AI 驱动系统。
宏观视角:AI集成架构
在深入探讨实现细节之前,我们先可视化呈现我们将要构建的完整架构。下图展示了 AI 服务将如何集成到商业系统中,其中包含了我们后续会落地的所有可靠性设计模式:
我们的目标架构
再次说明,本内容仅为示例,旨在阐释问题本身、设计思路,以及其中一种可行的解决方案。
商业背景: AI 作为服务层
假设某现代电商平台希望集成 AI 驱动的商品推荐功能,其所需的 AI 能力可能来自多种渠道:既可以是 AWS Personalize、Google Recommendations AI 这类云服务商提供的能力,也可以是通过 API 封装的自定义模型。但无论来源如何,在架构设计中,你必须将这些 AI 服务视为潜在不可靠、高延迟的组件—— 它们可能出现故障、模型漂移,甚至完全不可用。
我们将通过迭代方式构建这一系统:首先从一个简单的商品推荐 API 入手,逐步将其演进为一个稳定可靠、可投入生产的系统。
从简单开始:基础AI集成
我们从一个简单直接的集成入手。我们将首先搭建一个简单的推荐服务:调用外部AI API,然后逐步为其添加生产系统所需的鲁棒性。
# Version 1: Basic AI API integration
import Requests
from typing importList, Dict, Optional
import logging
class ProductRecommendationService:
def __init__(self, ai_service_url: str, api_key: str):
self.ai_service_url = ai_service_url
self.api_key = api_key
self.logger = logging.getLogger(__name__)
def get_recommendations(self, user_id: str, product_context: Dict) -> List[str]:
"""Get product recommendations for a user"""
payload = {
'user_id': user_id,
'context': product_context
}
headers = {
'Authorization': f'Bearer {self.api_key}',
'Content-Type': 'application/json'
}
try:
response = requests.post(
f"{self.ai_service_url}/recommendations",
json=payload,
headers=headers,
timeout=5.0
)
response.raise_for_status
data = response.json
return data.get('product_ids', )
except Exception as e:
self.logger.error(f"AI service call failed: {e}")
return # Return empty list on failure
功能说明: 这个基础实现通过发起一个简单的HTTP请求来调用AI服务并返回推荐结果。它仅通过返回空列表来处理基本错误,但缺乏可投入生产环境的功能,例如重试(retry)机制、缓存(caching)策略或合理的降级(fallback)方案。该实现适用于初期开发和测试阶段。
虽然这个基础实现可以满足初始开发需求,但它并不具备生产环境所要求的稳定性和可靠性。接下来,我们将在这一基础上进行迭代优化。
增强韧性:断路器(Circuit Breakers)与降级策略(Fallbacks)
生产级系统需要能优雅应对 AI 服务故障。在接下来的迭代中,我们将加入断路器(circuit breaker)模式和降级(fallback)策略。
# Version 2: Adding resilience patterns
import time
from enum import Enum
from dataclasses import dataclass
from typing importList, Dict, Optional, Callable
class CircuitState(Enum):
CLOSED = "closed"# Normal operation
OPEN = "open"# Failing, requests blocked
HALF_OPEN = "half_open"# Testing if service recovered
@dataclass
class CircuitBreakerConfig:
failure_threshold: int = 5
recovery_timeout: int = 60
success_threshold: int = 2
class CircuitBreaker:
def __init__(self, config: CircuitBreakerConfig):
self.config = config
self.state = CircuitState.CLOSED
self.failure_count = 0
self.success_count = 0
self.last_failure_time = 0
def call(self, func: Callable, *args, **kwargs):
"""Execute function with circuit breaker protection"""
if self.state == CircuitState.OPEN:
if time.time - self.last_failure_time > self.config.recovery_timeout:
self.state = CircuitState.HALF_OPEN
else:
raise Exception("Circuit breaker is open")
try:
result = func(*args, **kwargs)
self._on_success
return result
except Exception as e:
self._on_failure
raise e
def _on_success(self):
if self.state == CircuitState.HALF_OPEN:
self.success_count += 1
if self.success_count >= self.config.success_threshold:
else:
def _on_failure(self):
self.failure_count += 1
self.last_failure_time = time.time
if self.failure_count >= self.config.failure_threshold:
self.state = CircuitState.OPEN
class EnhancedRecommendationService(ProductRecommendationService):
def __init__(self, ai_service_url: str, api_key: str):
super.__init__(ai_service_url, api_key)
self.circuit_breaker = CircuitBreaker(CircuitBreakerConfig)
self.fallback_recommendations = {} # Cache popular products
def get_recommendations(self, user_id: str, product_context: Dict) -> List[str]:
"""Get recommendations with circuit breaker and fallback"""
try:
return self.circuit_breaker.call(
self._call_ai_service, user_id, product_context
)
self.logger.warning(f"AI service unavailable: {e}")
return self._get_fallback_recommendations(user_id, product_context)
def _call_ai_service(self, user_id: str, product_context: Dict) -> List[str]:
"""Make the actual AI service call"""
# Same logic as before, but extracted for circuit breaker
payload = {
'user_id': user_id,
'context': product_context
}
headers = {
'Authorization': f'Bearer {self.api_key}',
'Content-Type': 'application/json'
}
response = requests.post(
f"{self.ai_service_url}/recommendations",
json=payload,
headers=headers,
timeout=5.0
)
response.raise_for_status
data = response.json
return data.get('product_ids', )
def _get_fallback_recommendations(self, user_id: str,
product_context: Dict) -> List[str]:
"""Provide fallback recommendations when AI service is unavailable"""
category = product_context.get('category', 'general')
# Return cached popular products for the category
if category in self.fallback_recommendations:
return self.fallback_recommendations[category][:10]
# Default fallback - in practice, this would be loaded from database
return ['product_1', 'product_2', 'product_3', 'product_4', 'product_5']
def update_fallback_cache(self, category_recommendations: Dict[str, List[str]]):
"""Update fallback recommendations cache"""
self.fallback_recommendations.update(category_recommendations)
功能说明: 这一增强版新增了断路器模式(circuit breaker pattern),当 AI 服务出现故障时,该模式会自动停止对故障 AI 服务的调用,以防止级联故障(cascade failures)。当 AI 服务不可用时,系统会从缓存(Cached)的热门商品中获取数据,提供降级推荐结果(fallback recommendations)。此外,断路器包含三种状态:closed(正常运行)、open(拦截请求)和half-open(测试服务恢复情况)。通过这一设计,可避免外部依赖服务故障导致你的系统不堪重负。
性能优化:缓存机制(Caching)与异步处理(Async Processing)
AI 服务的延迟通常高于传统 API。在接下来的迭代中,我们将加入智能缓存与异步处理机制(asynchronous processing),以提升性能。
# Version 3: Adding performance optimisations
import asyncio
import aiohttp
import hashlib
import json
from typing importList, Dict, Optional, Union
import time
class CacheEntry:
def __init__(self, data: List[str], ttl_seconds: int = 3600):
self.data = data
self.created_at = time.time
self.ttl_seconds = ttl_seconds
def is_expired(self) -> bool:
return time.time - self.created_at > self.ttl_seconds
class OptimisedRecommendationService(EnhancedRecommendationService):
def __init__(self, ai_service_url: str, api_key: str,
cache_ttl: int = 3600, max_cache_size: int = 10000):
super.__init__(ai_service_url, api_key)
self.cache = {}
self.cache_ttl = cache_ttl
self.max_cache_size = max_cache_size
self.pending_requests = {} # Prevent duplicate concurrent requests
def _create_cache_key(self, user_id: str, product_context: Dict) -> str:
"""Create deterministic cache key"""
cache_data = {
'user_id': user_id,
'context': sorted(product_context.items)
}
return hashlib.md5(
json.dumps(cache_data, sort_keys=True).encode
).hexdigest
asyncdef get_recommendations_async(self, user_id: str,
product_context: Dict) -> List[str]:
"""Async version with caching and request deduplication"""
cache_key = self._create_cache_key(user_id, product_context)
# Check cache first
if cache_key in self.cache:
entry = self.cache[cache_key]
ifnot entry.is_expired:
self.logger.debug("Cache hit for recommendations")
return entry.data
else:
del self.cache[cache_key]
# Check if request is already pending
if cache_key in self.pending_requests:
self.logger.debug("Request already pending, waiting for result")
returnawait self.pending_requests[cache_key]
# Create new request future
request_future = asyncio.create_task(
self._fetch_recommendations_async(user_id, product_context, cache_key)
)
self.pending_requests[cache_key] = request_future
try:
result = await request_future
return result
finally:
# Clean up pending request
del self.pending_requests[cache_key]
asyncdef _fetch_recommendations_async(self, user_id: str,
product_context: Dict,
cache_key: str) -> List[str]:
"""Fetch recommendations from AI service asynchronously"""
try:
recommendations = await self.circuit_breaker.call(
self._call_ai_service_async, user_id, product_context
)
# Cache the result
self._cache_result(cache_key, recommendations)
return recommendations
except Exception as e:
self.logger.warning(f"AI service call failed: {e}")
return self._get_fallback_recommendations(user_id, product_context)
asyncdef _call_ai_service_async(self, user_id: str,
"""Async AI service call"""
payload = {
'user_id': user_id,
'context': product_context
}
headers = {
'Authorization': f'Bearer {self.api_key}',
'Content-Type': 'application/json'
}
asyncwith aiohttp.ClientSession as session:
asyncwith session.post(
f"{self.ai_service_url}/recommendations",
json=payload,
headers=headers,
timeout=aiohttp.ClientTimeout(total=5.0)
) as response:
response.raise_for_status
data = await response.json
return data.get('product_ids', )
def _cache_result(self, cache_key: str, recommendations: List[str]):
"""Cache recommendations with TTL and size management"""
# Implement LRU eviction if cache is too large
iflen(self.cache) >= self.max_cache_size:
# Remove oldest entries (simplified LRU)
oldest_keys = sorted(
self.cache.keys,
key=lambda k: self.cache[k].created_at
)[:len(self.cache) // 4] # Remove 25% of cache
for key in oldest_keys:
del self.cache[key]
self.cache[cache_key] = CacheEntry(recommendations, self.cache_ttl)
# Synchronous wrapper for backwards compatibility
def get_recommendations(self, user_id: str, product_context: Dict) -> List[str]:
"""Synchronous wrapper around async implementation"""
try:
loop = asyncio.get_event_loop
except RuntimeError:
loop = asyncio.new_event_loop
asyncio.set_event_loop(loop)
return loop.run_until_complete(
self.get_recommendations_async(user_id, product_context)
)
功能说明: 这一版本通过智能缓存与异步处理实现了性能优化。它能避免对同一 AI 服务的重复并发请求,采用基于 TTL(Time To Live)的缓存策略并结合 LRU(Least Recently Used)淘汰机制,同时提供异步(async)和同步(sync)两种接口。
缓存层大幅减少了 AI 接口的调用量(和成本),同时缩短了响应时间。异步实现则支持并发处理多个请求,且不会造成阻塞。
监控与可观测性:超越响应时间
AI 驱动的API接口需要一种超越传统指标的监控方案。在最终迭代中,我们针对 AI 特有的问题,新增了一套全面的可观测性指标。
# Version 4: Complete production-ready implementation
import time
from dataclasses import dataclass, field
from typing importList, Dict, Optional, Any
from collections import defaultdict, deque
import statistics
@dataclass
class AIMetrics:
"""Track AI service performance metrics"""
total_requests: int = 0
successful_requests: int = 0
cached_requests: int = 0
fallback_requests: int = 0
avg_response_time_ms: float = 0.0
ai_service_errors: int = 0
circuit_breaker_opens: int = 0
def success_rate(self) -> float:
return self.successful_requests / max(self.total_requests, 1)
def cache_hit_rate(self) -> float:
return self.cached_requests / max(self.total_requests, 1)
def fallback_rate(self) -> float:
return self.fallback_requests / max(self.total_requests, 1)
class ProductionRecommendationService(OptimisedRecommendationService):
def __init__(self, ai_service_url: str, api_key: str,
cache_ttl: int = 3600, max_cache_size: int = 10000):
super.__init__(ai_service_url, api_key, cache_ttl, max_cache_size)
self.metrics = AIMetrics
self.response_times = deque(maxlen=1000) # Keep last 1000 response times
self.error_history = deque(maxlen=100) # Keep last 100 errors
asyncdef get_recommendations_async(self, user_id: str,
product_context: Dict) -> List[str]:
"""Production-ready recommendations with full monitoring"""
start_time = time.time
cache_key = self._create_cache_key(user_id, product_context)
try:
self.metrics.total_requests += 1
# Check cache first
if cache_key in self.cache:
entry = self.cache[cache_key]
ifnot entry.is_expired:
self.metrics.cached_requests += 1
self.metrics.successful_requests += 1
self._record_response_time(start_time)
self.logger.debug("Cache hit for recommendations")
return entry.data
else:
del self.cache[cache_key]
# Handle concurrent requests
if cache_key in self.pending_requests:
result = await self.pending_requests[cache_key]
return result
# Create and execute new request
request_future = asyncio.create_task(
self._fetch_with_monitoring(user_id, product_context, cache_key, start_time)
)
self.pending_requests[cache_key] = request_future
try:
returnawait request_future
finally:
del self.pending_requests[cache_key]
except Exception as e:
self._record_error(e, user_id, product_context)
raise
asyncdef _fetch_with_monitoring(self, user_id: str, product_context: Dict,
cache_key: str, start_time: float) -> List[str]:
"""Fetch recommendations with comprehensive monitoring"""
try:
recommendations = await self.circuit_breaker.call(
self._call_ai_service_async, user_id, product_context
)
# Success path
self._cache_result(cache_key, recommendations)
# Log successful AI interaction for quality monitoring
self._log_ai_interaction(user_id, product_context, recommendations, True)
return recommendations
# Fallback path
self.metrics.fallback_requests += 1
self.metrics.ai_service_errors += 1
if self.circuit_breaker.state == CircuitState.OPEN:
self.metrics.circuit_breaker_opens += 1
fallback_recommendations = self._get_fallback_recommendations(
user_id, product_context
)
# Log fallback usage
self._log_ai_interaction(user_id, product_context, fallback_recommendations, False)
return fallback_recommendations
def _record_response_time(self, start_time: float):
"""Record response time for performance monitoring"""
response_time_ms = (time.time - start_time) * 1000
self.response_times.append(response_time_ms)
# Update rolling average
if self.response_times:
self.metrics.avg_response_time_ms = statistics.mean(self.response_times)
def _record_error(self, error: Exception, user_id: str, context: Dict):
"""Record error details for debugging and alerting"""
error_record = {
'timestamp': time.time,
'error_type': type(error).__name__,
'error_message': str(error),
'user_id': user_id,
'context': context,
'circuit_breaker_state': self.circuit_breaker.state.value
}
self.error_history.append(error_record)
self.logger.error(f"AI service error: {error_record}")
def _log_ai_interaction(self, user_id: str, context: Dict,
recommendations: List[str], from_ai: bool):
"""Log AI interactions for quality monitoring and debugging"""
interaction = {
'user_id': user_id,
'context': context,
'recommendations': recommendations,
'from_ai_service': from_ai,
'recommendation_count': len(recommendations)
}
# In production, this might go to a dedicated logging system
# for AI quality monitoring and A/B testing analysis
self.logger.info(f"AI interaction: {interaction}")
def get_health_status(self) -> Dict[str, Any]:
"""Provide comprehensive health status for monitoring systems"""
return {
'service_status': 'healthy'if self.metrics.success_rate > 0.95else'degraded',
'metrics': {
'total_requests': self.metrics.total_requests,
'success_rate': round(self.metrics.success_rate, 3),
'cache_hit_rate': round(self.metrics.cache_hit_rate, 3),
'fallback_rate': round(self.metrics.fallback_rate, 3),
'avg_response_time_ms': round(self.metrics.avg_response_time_ms, 2),
'ai_service_errors': self.metrics.ai_service_errors,
'circuit_breaker_state': self.circuit_breaker.state.value,
'cache_size': len(self.cache)
},
'recent_errors': list(self.error_history)[-5:], # Last 5 errors
'performance_percentiles': self._calculate_percentiles
}
def _calculate_percentiles(self) -> Dict[str, float]:
"""Calculate response time percentiles for SLA monitoring"""
ifnot self.response_times:
return {}
sorted_times = sorted(self.response_times)
n = len(sorted_times)
return {
'p50': sorted_times[int(n * 0.5)],
'p95': sorted_times[int(n * 0.95)],
'p99': sorted_times[int(n * 0.99)]
}
def reset_metrics(self):
"""Reset metrics - useful for testing or periodic resets"""
self.response_times.clear
self.error_history.clear
功能说明: 这一生产级版本新增了全面的监控与可观测性功能。它会追踪缓存命中率(cache hit rates)、降级策略使用率(fallback usage)、AI 服务可用性等 AI 特有关键指标;同时,该服务会记录所有 AI 交互过程以用于质量监控,并提供详细的健康状态端点(health status endpoints)。
性能分位数可辅助服务等级协议(SLA/Service Level Agreement)的监控,而错误追踪则有助于问题调试。对于大规模运营 AI 服务、理解其业务影响而言,这样的可观测性水平至关重要。
AI 驱动的 API 的部署模式
与传统服务不同,基于AI的API能从渐进式发布和A/B测试中获益。建议通过功能开关(Feature Flags)将不同用户群体路由至不同的AI服务提供商或模型版本。
AI 驱动的API 架构
API 设计考量
通过 API 开放 AI 能力时,可考虑以下模式:
针对高负载任务的异步处理
部分 AI 操作本身是耗时的。在设计 API 时,应同时支持同步与异步两种处理模式:
# API endpoint structure for async AI processing
from fastapi import FastAPI, BackgroundTasks
import uuid
app = FastAPI
recommendation_service = ProductionRecommendationService(
ai_service_url="https://api.ai-provider.com",
api_key="your-api-key"
)
@app.get("/recommendations/{user_id}")
asyncdef get_recommendations_sync(user_id: str, category: str = "general"):
"""Synchronous recommendations endpoint"""
context = {"category": category}
recommendations = await recommendation_service.get_recommendations_async(
user_id, context
)
return {"recommendations": recommendations}
@app.post("/recommendations/async")
asyncdef request_recommendations_async(user_id: str, context: dict,
background_tasks: BackgroundTasks):
"""Asynchronous recommendations endpoint for complex processing"""
request_id = str(uuid.uuid4)
# Store request
# await store_request(request_id, user_id, context)
# Process in background
background_tasks.add_task(
process_async_recommendations, request_id, user_id, context
)
return {"request_id": request_id, "status": "processing"}
@app.get("/recommendations/status/{request_id}")
asyncdef get_async_status(request_id: str):
"""Check status of async recommendation request"""
# In practice, check status in database/cache
return {"status": "completed", "recommendations": ["product_1", "product_2"]}
功能说明: 这些 API 端点展示了 AI 集成的同步与异步模式。同步端点会返回即时响应,适用于实时用户交互场景;而异步端点则用于处理可能耗时较长的复杂任务。异步模式借助后台任务与状态查询机制实现,非常适合处理高负载 AI 任务 —— 这类任务若采用同步请求,极易出现超时问题。
这种设计方案为不同的使用场景与用户体验需求提供了灵活性。
生产环境下的 AI API 监控
传统 API 监控无法满足 AI 驱动型服务的需求。需部署监控系统,追踪以下维度的指标:
·AI 服务可用性与响应时间
·缓存命中率及有效性
·降级策略使用模式
·模型性能指标
·外部 AI 服务的成本指标
·AI 决策对业务的影响
常见问题及规避方法
过度依赖外部服务: AI 服务提供商可能会调整定价、停用旧模型,或遭遇服务中断。对于核心系统,务必部署降级策略,并考虑采用多服务商接入方案。
缓存策略不完善:AI API 调用往往在时间和成本上都代价较高。需部署智能缓存机制,同时综合考虑用户场景、数据时效性及业务规则。
忽视成本影响:AI 服务通常按请求次数或计算单元计费。需密切监控成本消耗,并针对不同用户层级设置请求配额。
错误处理机制薄弱:AI 服务可能出现特殊故障 —— 如请求超时、配额耗尽、模型临时不可用等。设计错误处理逻辑时,需在提供明确反馈与保障系统稳定性之间找到平衡。
技术架构师的核心问题
将 AI 能力集成到系统时,需考虑以下方面:
集成策略问题
·你将采用外部 AI 服务、部署自研模型,还是混合模式?
·如何应对供应商锁定(Vendor Lock-in)问题,以及服务迁移场景?
·针对不同用户层级,你将采用何种策略管理 API 成本?
性能与可扩展性问题
·你的系统延迟需求是什么?AI 服务调用会对这些需求产生怎样的影响?
·对于可能耗尽 AI 服务配额的流量峰值,你将如何应对?
·结合你的业务使用模式,哪些缓存策略更为适用?
可靠性问题
·当 AI 服务不可用时,你将部署何种降级策略?
·如何检测AI 服务性能下降,并制定相应的应对方案?
·针对不同类型的 AI 服务故障,你将采用何种处理思路?
业务逻辑问题
·AI 决策将如何与现有业务规则融合?
·针对当前业务场景,AI 服务的准确率需达到何种水平才符合要求?
·如何衡量 AI 集成对业务的实际影响?
这些只是我临时想到的内容
总结
要将 AI 成功集成到商业系统中,需将 AI 能力视为可能发生故障、产生变更或陷入不可用状态的外部依赖。最成功的集成方案,必然是将稳健的工程实践与贴合业务的逻辑设计相结合的产物。
技术架构师需牢记的核心要点:
从设计初期就考虑故障应对。AI 服务本身的可靠性低于传统 API,因此需部署中断器、降级策略及优雅降级模式(graceful degradation patterns)。实施智能缓存。AI API 调用通常成本高且耗时久,需设计缓存策略,在数据新鲜度(freshness)、系统性能与成本控制之间找到平衡。开展全面监控。传统 API 监控无法满足需求 —— 需额外追踪 AI 特有指标,如缓存命中率、降级策略使用及单次请求成本。为演进做好规划。AI 服务发展迅速。设计集成方案时,需考虑应对服务提供商变更、模型更新以及功能弃用等情况,确保服务不中断。我们的目标并非构建最复杂精密的AI集成方案——而是打造能够持续交付商业价值,同时兼具可维护性、成本效益与扩展性的系统。
通过践行这些架构原则,您可以顺利将 AI 能力集成到商业系统中。
原文标题:
Architecture of Al-Driven Systems: WhatEvery TechnicalArchitect Should Know
原文链接:
来源:数据派THU一点号