摘要:com.alibaba.cspsentine
限流(Rate Limiting)是通过控制单位时间内系统能够处理的请求数量,来保护系统免受过载流量冲击的技术手段。
// 限流的现实比喻public class RateLimitingAnalogy {/*** 交通系统 vs 限流系统*/public class TrafficSystemComparison {// 交通信号灯 → 限流控制器// 高速公路收费站 → 令牌桶算法// 车流量统计 → 请求计数器// 交通管制 → 限流策略}/*** 没有限流的风险*/public class WithoutRateLimiting {// 1. 资源耗尽:突发流量耗尽CPU、内存、数据库连接// 2. 服务雪崩:一个服务崩溃引发连锁反应// 3. 安全风险:DDoS攻击导致服务不可用// 4. 用户体验:系统响应缓慢或完全不可用}}
算法原理优点缺点适用场景固定窗口固定时间窗口计数实现简单临界问题、不够平滑简单限流滑动窗口多个子窗口统计相对精确实现复杂精确控制漏桶算法恒定速率处理流量整形无法应对突发平滑流量令牌桶算法按速率生成令牌允许突发实现复杂大部分场景自适应限流动态调整阈值智能调节算法复杂动态环境
com.alibaba.cspsentinel-core1.8.6com.alibaba.cloudspring-cloud-starter-alibaba-sentinel2022.0.0.0com.alibaba.cspsentinel-datasource-nacos1.8.6org.springframework.bootspring-boot-starter-weborg.springframework.bootspring-boot-starter-actuatororg.springframework.bootspring-boot-starter-aop# application.yml Sentinel配置spring:application:name: order-servicecloud:sentinel:enabled: trueeager: truetransport:dashboard: 192.168.1.100:8080 # Sentinel控制台port: 8719# 限流规则数据源datasource:flow-rule:nacos:server-addr: 192.168.1.100:8848dataId: ${spring.application.name}-flow-rulesgroupId: SENTINEL_GROUPrule-type: flownamespace: sentinel-config# 日志配置logging:level:com.alibaba.csp.sentinel: INFOorg.springframework.cloud.alibaba.sentinel: DEBUG# 监控端点management:endpoints:web:exposure:include: sentinel,health,metricsendpoint:sentinel:enabled: true// 订单服务限流控制器@RestController@RequestMapping("/api/orders")@Slf4jpublic class OrderRateLimitController {@Autowiredprivate OrderService orderService;/*** 创建订单 - QPS限流*/@PostMapping@SentinelResource(value = "createOrder",blockHandler = "createOrderBlockHandler",fallback = "createOrderFallback",blockHandlerClass = OrderRateLimitHandlers.class)public ResponseEntity createOrder(@requestBody @Valid OrderCreateRequest request) {log.info("创建订单请求: 用户{}, 商品{}, 数量{}", request.getUserId, request.getProductId, request.getQuantity);OrderDTO order = orderService.createOrder(request);return ResponseEntity.ok(order);}/*** 查询订单 - 线程数限流*/@GetMapping("/{orderId}")@SentinelResource(value = "getOrderDetail",blockHandler = "getOrderBlockHandler",fallback = "getOrderFallback")public ResponseEntity getOrder(@PathVariable String orderId) {log.info("查询订单详情: {}", orderId);OrderDTO order = orderService.getOrderDetail(orderId);return ResponseEntity.ok(order);}/*** 订单列表 - 关联限流*/@GetMapping@SentinelResource(value = "listOrders",blockHandler = "listOrdersBlockHandler")public ResponseEntity> listOrders(@RequestParam(defaultValue = "1") int page,@RequestParam(defaultValue = "20") int size) {log.info("查询订单列表: 第{}页, 每页{}条", page, size);PageResult orders = orderService.listOrders(page, size);return ResponseEntity.ok(orders);}// 流控异常处理public ResponseEntity getOrderBlockHandler(String orderId, BlockException ex) {log.warn("查询订单触发流控, 订单ID: {}, 规则: {}", orderId, ex.getRule);RateLimitResponse response = RateLimitResponse.builder.code(429).message("系统繁忙,请稍后重试").retryAfter(5) // 5秒后重试.build;return ResponseEntity.status(HttpStatus.TOO_MANY_REQUESTS).body(null);}public ResponseEntity getOrderFallback(String orderId, Throwable ex) {log.error("查询订单服务降级, 订单ID: {}", orderId, ex);return ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE).body(null);}public ResponseEntity> listOrdersBlockHandler(Integer page, Integer size, BlockException ex) {log.warn("查询订单列表触发流控, 页码: {}, 规则: {}", page, ex.getRule);// 返回空结果而不是错误,提升用户体验PageResult emptyResult = PageResult.empty;return ResponseEntity.ok(emptyResult);}}// 统一的限流异常处理类@Slf4jpublic class OrderRateLimitHandlers {/*** 创建订单流控处理*/public static ResponseEntity createOrderBlockHandler(OrderCreateRequest request, BlockException ex) {log.warn("创建订单触发流控, 用户: {}, 规则: {}", request.getUserId, ex.getRule);RateLimitResponse response = RateLimitResponse.builder.code(429).message("当前下单人数过多,请稍后重试").retryAfter(10).suggestTime(LocalDateTime.now.plusSeconds(30)).build;return ResponseEntity.status(HttpStatus.TOO_MANY_REQUESTS).body(null);}/*** 创建订单降级处理*/public static ResponseEntity createOrderFallback(OrderCreateRequest request, Throwable ex) {log.error("创建订单服务降级, 请求: {}", request, ex);// 根据异常类型提供不同的降级策略if (ex instanceof TimeoutException) {return ResponseEntity.status(HttpStatus.REQUEST_TIMEOUT).body(null);} else if (ex instanceof ServiceUnavailableException) {return ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE).body(null);} else {return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(null);}}}// 限流响应封装@Data@Builder@AllArgsConstructor@NoArgsConstructorpublic class RateLimitResponse {private int code;private String message;private Integer retryAfter; // 重试等待时间(秒)private LocalDateTime suggestTime; // 建议重试时间private String requestId;@Builder.Defaultprivate long timestamp = System.currentTimeMillis;}// Sentinel限流规则配置@Component@Slf4jpublic class SentinelRateLimitConfig {/*** 初始化限流规则*/@PostConstructpublic void initFlowRules {List rules = new ArrayList;// 1. 创建订单 - QPS限流,预热模式FlowRule createOrderRule = new FlowRule;createOrderRule.setResource("createOrder");createOrderRule.setGrade(RuleConstant.FLOW_GRADE_QPS);createOrderRule.setCount(100); // 阈值100 QPScreateOrderRule.setControlBehavior(RuleConstant.CONTROL_BEHAVIOR_WARM_UP);createOrderRule.setWarmUpPeriodSec(10); // 预热10秒createOrderRule.setLimitApp(RuleConstant.LIMIT_APP_DEFAULT);rules.add(createOrderRule);// 2. 查询订单 - QPS限流,排队等待FlowRule getOrderRule = new FlowRule;getOrderRule.setResource("getOrderDetail");getOrderRule.setGrade(RuleConstant.FLOW_GRADE_QPS);getOrderRule.setCount(500); // 阈值500 QPSgetOrderRule.setControlBehavior(RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER);getOrderRule.setMaxQueueingTimeMs(1000); // 最大排队1秒rules.add(getOrderRule);// 3. 订单列表 - 线程数限流FlowRule listOrdersRule = new FlowRule;listOrdersRule.setResource("listOrders");listOrdersRule.setGrade(RuleConstant.FLOW_GRADE_THREAD);listOrdersRule.setCount(50); // 最大50个并发线程rules.add(listOrdersRule);// 4. 关联限流 - 创建订单和查询订单关联FlowRule relationRule = new FlowRule;relationRule.setResource("createOrder");relationRule.setGrade(RuleConstant.FLOW_GRADE_QPS);relationRule.setCount(50);relationRule.setRefResource("getOrderDetail");relationRule.setStrategy(RuleConstant.STRATEGY_RELATE);rules.add(relationRule);FlowRuleManager.loadRules(rules);log.info("初始化Sentinel限流规则完成,规则数量: {}", rules.size);}/*** 热点参数限流规则*/@PostConstructpublic void initParamFlowRules {List paramRules = new ArrayList;// 商品详情热点参数限流ParamFlowRule productRule = new ParamFlowRule("getProductDetail").setParamIdx(0) // 第一个参数(商品ID).setCount(10) // 单商品阈值10 QPS.setGrade(RuleConstant.FLOW_GRADE_QPS);// 特殊商品独立限流ParamFlowItem hotProduct1 = new ParamFlowItem.setObject("HOT_PRODUCT_001").setClassType(String.class.getName).setCount(1); // 爆款商品限流1 QPSParamFlowItem hotProduct2 = new ParamFlowItem.setObject("HOT_PRODUCT_002").setClassType(String.class.getName).setCount(1);productRule.setParamFlowItemList(Arrays.asList(hotProduct1, hotProduct2));paramRules.add(productRule);ParamFlowRuleManager.loadRules(paramRules);log.info("初始化热点参数限流规则完成");}/*** 集群流控规则*/@PostConstructpublic void initClusterFlowRules {// 集群流控配置ClusterFlowConfig clusterConfig = new ClusterFlowConfig;clusterConfig.setFlowId(1L);clusterConfig.setThresholdType(1);clusterConfig.setFallbackToLocalWhenFail(true);// 全局限流规则FlowRule clusterRule = new FlowRule;clusterRule.setResource("globalCreateOrder");clusterRule.setGrade(RuleConstant.FLOW_GRADE_QPS);clusterRule.setCount(1000); // 集群总阈值1000 QPSclusterRule.setClusterMode(true);clusterRule.setClusterConfig(clusterConfig);FlowRuleManager.loadRules(Collections.singletonList(clusterRule));log.info("初始化集群流控规则完成");}}// 动态规则管理器@Service@Slf4jpublic class DynamicRateLimitManager {@Autowiredprivate NacosConfigManager nacosConfigManager;/*** 动态更新限流规则*/public void updateFlowRule(String resource, double qpsThreshold) {List rules = FlowRuleManager.getRules;// 查找现有规则Optional existingRule = rules.stream.filter(rule -> rule.getResource.equals(resource)).findFirst;if (existingRule.isPresent) {// 更新现有规则FlowRule rule = existingRule.get;rule.setCount(qpsThreshold);log.info("更新限流规则: {} -> {} QPS", resource, qpsThreshold);} else {// 创建新规则FlowRule newRule = new FlowRule;newRule.setResource(resource);newRule.setGrade(RuleConstant.FLOW_GRADE_QPS);newRule.setCount(qpsThreshold);rules.add(newRule);log.info("创建限流规则: {} -> {} QPS", resource, qpsThreshold);}FlowRuleManager.loadRules(rules);}/*** 根据系统负载动态调整限流阈值*/@Scheduled(fixedRate = 60000) // 每分钟调整一次public void adaptiveRateLimit {double systemLoad = getSystemLoadAverage;double cpuUsage = getCpuUsage;// 根据系统负载调整限流阈值if (systemLoad > 4.0 || cpuUsage > 0.8) {// 高负载时降低限流阈值updateFlowRule("createOrder", 50);updateFlowRule("getOrderDetail", 200);log.warn("系统高负载,降低限流阈值");} else if (systemLoad // 令牌桶限流器实现@Component@Slf4jpublic class TokenBucketRateLimiter {private final Map buckets = new ConcurrentHashMap;private final ScheduledExecutorService refillScheduler = Executors.newScheduledThreadPool(1);/*** 令牌桶配置*/@Data@AllArgsConstructorpublic static class TokenBucketConfig {private int capacity; // 桶容量private int refillTokens; // 每次补充的令牌数private long refillIntervalMs; // 补充间隔(毫秒)}/*** 令牌桶实例*/private static class TokenBucket {private final String key;private final AtomicInteger tokens;private final int capacity;private final int refillTokens;private final long refillIntervalMs;private volatile long lastRefillTime;public TokenBucket(String key, TokenBucketConfig config) {this.key = key;this.capacity = config.getCapacity;this.refillTokens = config.getRefillTokens;this.refillIntervalMs = config.getRefillIntervalMs;this.tokens = new AtomicInteger(capacity);this.lastRefillTime = System.currentTimeMillis;}public boolean tryAcquire(int tokensRequired) {refill;while (true) {int currentTokens = tokens.get;if (currentTokens
refillIntervalMs) {int refillCount = (int) (timeSinceLastRefill / refillIntervalMs) * refillTokens;tokens.updateAndGet(current -> Math.min(capacity, current + refillCount));lastRefillTime = currentTime;log.debug("补充令牌: {} -> {}", key, tokens.get);}}public int getAvailableTokens {refill;return tokens.get;}}public TokenBucketRateLimiter {// 启动定时清理任务refillScheduler.scheduleAtFixedRate(this::cleanupIdleBuckets, 1, 1, TimeUnit.HOURS);}/*** 尝试获取令牌*/public boolean tryAcquire(String key) {return tryAcquire(key, 1);}/*** 尝试获取指定数量的令牌*/public boolean tryAcquire(String key, int tokensRequired) {TokenBucket bucket = buckets.computeIfAbsent(key, k -> new TokenBucket(k, new TokenBucketConfig(100, 10, 1000))); // 默认配置boolean acquired = bucket.tryAcquire(tokensRequired);if (!acquired) {log.debug("令牌桶限流: {}, 可用令牌: {}", key, bucket.getAvailableTokens);}return acquired;}/*** 创建自定义令牌桶*/public void createBucket(String key, TokenBucketConfig config) {TokenBucket bucket = new TokenBucket(key, config);buckets.put(key, bucket);log.info("创建令牌桶: {}, 容量: {}", key, config.getCapacity);}/*** 获取可用令牌数量*/public int getAvailableTokens(String key) {TokenBucket bucket = buckets.get(key);return bucket != null ? bucket.getAvailableTokens : 0;}/*** 清理空闲的令牌桶*/private void cleanupIdleBuckets {long now = System.currentTimeMillis;Iterator> iterator = buckets.entrySet.iterator;while (iterator.hasNext) {Map.Entry entry = iterator.next;// 可以基于最后使用时间等策略清理// 这里简化实现}log.info("令牌桶清理完成,当前桶数量: {}", buckets.size);}@PreDestroypublic void destroy {refillScheduler.shutdown;}}// 令牌桶限流注解@Target(ElementType.METHOD)@Retention(RetentionPolicy.RUNTIME)public @interface TokenBucketRateLimit {String key; // 限流键int capacity default 100; // 桶容量int refillTokens default 10; // 每次补充令牌数long refillIntervalMs default 1000; // 补充间隔毫秒int tokensRequired default 1; // 每次请求需要的令牌数String message default "系统繁忙,请稍后重试";}// 令牌桶限流切面@Aspect@Component@Slf4jpublic class TokenBucketRateLimitAspect {@Autowiredprivate TokenBucketRateLimiter rateLimiter;/*** 令牌桶限流切面*/@Around("@annotation(rateLimit)")public Object rateLimit(ProceedingJoinPoint joinPoint, TokenBucketRateLimit rateLimit) throws Throwable {String key = buildRateLimitKey(joinPoint, rateLimit.key);// 初始化令牌桶TokenBucketRateLimiter.TokenBucketConfig config = new TokenBucketRateLimiter.TokenBucketConfig(rateLimit.capacity,rateLimit.refillTokens,rateLimit.refillIntervalMs);rateLimiter.createBucket(key, config);// 尝试获取令牌if (!rateLimiter.tryAcquire(key, rateLimit.tokensRequired)) {log.warn("令牌桶限流触发: {}", key);throw new RateLimitException(rateLimit.message);}return joinPoint.proceed;}private String buildRateLimitKey(ProceedingJoinPoint joinPoint, String keyTemplate) {// 解析SpEL表达式构建限流键MethodSignature signature = (MethodSignature) joinPoint.getSignature;Method method = signature.getMethod;Object args = joinPoint.getArgs;ExpressionParser parser = new SpelExpressionParser;Expression expression = parser.parseExpression(keyTemplate);EvaluationContext context = new StandardEvaluationContext;context.setVariable("args", args);context.setVariable("method", method);// 添加参数到上下文Parameter parameters = method.getParameters;for (int i = 0; i // 滑动窗口限流器@Component@Slf4jpublic class SlidingWindowRateLimiter {private final Map windows = new ConcurrentHashMap;private final ScheduledExecutorService windowRotator = Executors.newScheduledThreadPool(1);/*** 滑动窗口配置*/@Data@AllArgsConstructorpublic static class SlidingWindowConfig {private int windowSizeMs; // 窗口大小(毫秒)private int numberOfWindows; // 子窗口数量private int threshold; // 阈值}/*** 滑动窗口实例*/private static class SlidingWindow {private final String key;private final long windowSizeMs;private final int numberOfWindows;private final int threshold;private final long subWindowSizeMs;private final AtomicReferenceArray buckets;private volatile long currentWindowStart;public SlidingWindow(String key, SlidingWindowConfig config) {this.key = key;this.windowSizeMs = config.getWindowSizeMs;this.numberOfWindows = config.getNumberOfWindows;this.threshold = config.getThreshold;this.subWindowSizeMs = windowSizeMs / numberOfWindows;this.buckets = new AtomicReferenceArray(numberOfWindows);this.currentWindowStart = System.currentTimeMillis / subWindowSizeMs * subWindowSizeMs;// 初始化桶for (int i = 0; i = threshold) {log.debug("滑动窗口限流: {}, 当前请求数: {}/{}", key, totalRequests, threshold);return false;}// 记录当前请求int subWindowIndex = (int) ((currentSubWindowStart - currentWindowStart) / subWindowSizeMs);if (subWindowIndex >= 0 && subWindowIndex = numberOfWindows) {// 重置所有桶for (int i = 0; i
0) {// 滑动窗口:清除过期的桶,创建新的桶for (int i = 0; i
new SlidingWindow(k, new SlidingWindowConfig(60000, 6, 100))); // 1分钟6个窗口,阈值100return window.tryAcquire;}/*** 创建自定义滑动窗口*/public void createWindow(String key, SlidingWindowConfig config) {SlidingWindow window = new SlidingWindow(key, config);windows.put(key, window);log.info("创建滑动窗口: {}, 窗口大小: {}ms, 阈值: {}", key, config.getWindowSizeMs, config.getThreshold);}/*** 获取当前窗口计数*/public int getCurrentCount(String key) {SlidingWindow window = windows.get(key);return window != null ? window.getCurrentCount : 0;}/*** 维护窗口,清理长时间不使用的窗口*/private void maintainWindows {long now = System.currentTimeMillis;Iterator> iterator = windows.entrySet.iterator;int removedCount = 0;while (iterator.hasNext) {Map.Entry entry = iterator.next;// 可以根据最后使用时间等策略清理// 这里简化实现,不实际清理}if (removedCount > 0) {log.info("滑动窗口维护完成,清理 {} 个窗口", removedCount);}}@PreDestroypublic void destroy {windowRotator.shutdown;}}// Redis分布式限流器@Component@Slf4jpublic class RedisDistributedRateLimiter {@Autowiredprivate RedisTemplate redisTemplate;private final String RATE_LIMIT_KEY_PREFIX = "rate_limit:";/*** 固定窗口限流 - 基于Redis*/public boolean fixedWindowAcquire(String key, int threshold, long windowSizeMs) {String redisKey = RATE_LIMIT_KEY_PREFIX + key;long currentTime = System.currentTimeMillis;long windowStart = currentTime / windowSizeMs * windowSizeMs;String windowKey = redisKey + ":" + windowStart;// 使用Lua脚本保证原子性String luaScript = """local key = KEYS[1]local threshold = tonumber(ARGV[1])local windowSize = tonumber(ARGV[2])local currentTime = tonumber(ARGV[3])local count = redis.call('GET', key)if count == false thenredis.call('SET', key, 1, 'PX', windowSize)return 1elseif tonumber(count)
script = RedisScript.of(luaScript, Long.class);Long result = redisTemplate.execute(script, Collections.singletonList(windowKey), threshold, windowSizeMs, currentTime);return result != null && result == 1;}/*** 滑动窗口限流 - 基于Redis*/public boolean slidingWindowAcquire(String key, int threshold, long windowSizeMs, int numberOfWindows) {String redisKey = RATE_LIMIT_KEY_PREFIX + key;long currentTime = System.currentTimeMillis;long subWindowSizeMs = windowSizeMs / numberOfWindows;long currentSubWindowStart = currentTime / subWindowSizeMs * subWindowSizeMs;// Lua脚本实现滑动窗口String luaScript = """local key = KEYS[1]local threshold = tonumber(ARGV[1])local windowSize = tonumber(ARGV[2])local subWindowSize = tonumber(ARGV[3])local currentTime = tonumber(ARGV[4])local numWindows = tonumber(ARGV[5])-- 删除过期的子窗口local expiredBefore = currentTime - windowSizelocal subWindowStart = expiredBefore - (expiredBefore % subWindowSize)while subWindowStart = threshold thenreturn 0end-- 记录当前请求local currentSubKey = key .. ':' .. (currentTime - (currentTime % subWindowSize))redis.call('INCR', currentSubKey)redis.call('PEXPIRE', currentSubKey, windowSize + subWindowSize)return 1""";RedisScript script = RedisScript.of(luaScript, Long.class);Long result = redisTemplate.execute(script, Collections.singletonList(redisKey), threshold, windowSizeMs, subWindowSizeMs, currentTime, numberOfWindows);return result != null && result == 1;}/*** 令牌桶限流 - 基于Redis*/public boolean tokenBucketAcquire(String key, int capacity, int refillTokens, long refillIntervalMs) {String redisKey = RATE_LIMIT_KEY_PREFIX + "token_bucket:" + key;long currentTime = System.currentTimeMillis;String luaScript = """local key = KEYS[1]local capacity = tonumber(ARGV[1])local refillTokens = tonumber(ARGV[2])local refillInterval = tonumber(ARGV[3])local currentTime = tonumber(ARGV[4])local tokensRequired = tonumber(ARGV[5])local bucket = redis.call('HMGET', key, 'tokens', 'lastRefillTime')local tokens = tonumber(bucket[1] or capacity)local lastRefillTime = tonumber(bucket[2] or currentTime)-- 补充令牌local timeSinceLastRefill = currentTime - lastRefillTimeif timeSinceLastRefill > refillInterval thenlocal refillCount = math.floor(timeSinceLastRefill / refillInterval) * refillTokenstokens = math.min(capacity, tokens + refillCount)lastRefillTime = currentTimeend-- 检查是否有足够令牌if tokens
script = RedisScript.of(luaScript, Long.class);Long result = redisTemplate.execute(script, Collections.singletonList(redisKey), capacity, refillTokens, refillIntervalMs, currentTime, 1);boolean acquired = result != null && result == 1;if (!acquired) {log.debug("Redis令牌桶限流: {}", key);}return acquired;}/*** 获取限流统计信息*/public RateLimitStats getRateLimitStats(String key) {String redisKey = RATE_LIMIT_KEY_PREFIX + key;// 实现获取统计信息的逻辑return new RateLimitStats;}@Datapublic static class RateLimitStats {private String key;private long currentCount;private long threshold;private long resetTime;private double utilization; // 使用率}}// 分布式限流配置@Configurationpublic class RedisRateLimitConfig {@Beanpublic RedisTemplate redisTemplate(RedisConnectionFactory factory) {RedisTemplate template = new RedisTemplate;template.setConnectionFactory(factory);// 使用String序列化template.setKeySerializer(new StringRedisSerializer);template.setValueSerializer(new GenericJackson2JsonRedisSerializer);template.setHashKeySerializer(new StringRedisSerializer);template.setHashValueSerializer(new GenericJackson2JsonRedisSerializer);template.afterPropertiesSet;return template;}@Beanpublic RedisDistributedRateLimiter redisDistributedRateLimiter {return new RedisDistributedRateLimiter;}}// 限流监控服务@Component@Slf4jpublic class RateLimitMonitor {@Autowiredprivate MeterRegistry meterRegistry;private final Map metricsMap = new ConcurrentHashMap;private final ScheduledExecutorService monitorScheduler = Executors.newScheduledThreadPool(1);/*** 限流指标数据*/@Datapublic static class RateLimitMetrics {private String resource;private long totalRequests;private long limitedRequests;private long lastLimitedTime;private double limitRate; // 限流比例private long windowStartTime;private int currentQps;public void recordRequest(boolean limited) {totalRequests++;if (limited) {limitedRequests++;lastLimitedTime = System.currentTimeMillis;}limitRate = totalRequests > 0 ? (double) limitedRequests / totalRequests : 0.0;}public void resetWindow {windowStartTime = System.currentTimeMillis;currentQps = 0;}}public RateLimitMonitor {// 启动监控任务monitorScheduler.scheduleAtFixedRate(this::collectMetrics, 30, 30, TimeUnit.SECONDS);}/*** 记录限流事件*/public void recordRateLimitEvent(String resource, boolean limited) {RateLimitMetrics metrics = metricsMap.computeIfAbsent(resource, k -> new RateLimitMetrics);metrics.setResource(resource);metrics.recordRequest(limited);// 记录到监控系统Counter.builder("ratelimit.requests").tag("resource", resource).tag("limited", String.valueOf(limited)).register(meterRegistry).increment;}/*** 收集监控指标*/private void collectMetrics {for (Map.Entry entry : metricsMap.entrySet) {String resource = entry.getKey;RateLimitMetrics metrics = entry.getValue;// 记录限流比例Gauge.builder("ratelimit.rate").tag("resource", resource).register(meterRegistry, metrics, m -> m.getLimitRate);// 检查异常情况checkAnomalies(resource, metrics);}}/*** 检查限流异常*/private void checkAnomalies(String resource, RateLimitMetrics metrics) {// 高限流比例告警if (metrics.getLimitRate > 0.1) { // 10%限流比例log.warn("资源 {} 限流比例过高: {:.2f}%", resource, metrics.getLimitRate * 100);sendAlert("限流比例告警", String.format("资源 %s 限流比例过高: %.2f%%", resource, metrics.getLimitRate * 100));}// 持续限警告警if (metrics.getLimitedRequests > 100 && System.currentTimeMillis - metrics.getLastLimitedTime (metricsMap));// 计算总体统计long totalRequests = metricsMap.values.stream.mapToLong(RateLimitMetrics::getTotalRequests).sum;long totalLimited = metricsMap.values.stream.mapToLong(RateLimitMetrics::getLimitedRequests).sum;report.setTotalRequests(totalRequests);report.setTotalLimitedRequests(totalLimited);report.setOverallLimitRate(totalRequests > 0 ? (double) totalLimited / totalRequests : 0.0);// 找出限流最严重的资源Optional> worstResource = metricsMap.entrySet.stream.max(Comparator.comparingDouble(entry -> entry.getValue.getLimitRate));worstResource.ifPresent(entry -> {report.setWorstResource(entry.getKey);report.setWorstLimitRate(entry.getValue.getLimitRate);});return report;}/*** 获取资源详情*/public RateLimitMetrics getResourceMetrics(String resource) {return metricsMap.get(resource);}private void sendAlert(String title, String message) {// 发送告警通知log.warn("发送限流告警: {} - {}", title, message);// 集成告警系统:邮件、钉钉、企业微信等// alertService.sendAlert(title, message);}@PreDestroypublic void destroy {monitorScheduler.shutdown;}@Datapublic static class RateLimitReport {private Instant timestamp;private Map metrics;private long totalRequests;private long totalLimitedRequests;private double overallLimitRate;private String worstResource;private double worstLimitRate;private List recommendations;}}// 限流监控控制器@RestController@RequestMapping("/monitor/ratelimit")@Slf4jpublic class RateLimitMonitorController {@Autowiredprivate RateLimitMonitor rateLimitMonitor;/*** 获取限流概览*/@GetMapping("/overview")public ResponseEntity getOverview {RateLimitMonitor.RateLimitReport report = rateLimitMonitor.generateReport;return ResponseEntity.ok(report);}/*** 获取资源详情*/@GetMapping("/resources/{resource}")public ResponseEntity getResourceMetrics(@PathVariable String resource) {RateLimitMonitor.RateLimitMetrics metrics = rateLimitMonitor.getResourceMetrics(resource);if (metrics == null) {return ResponseEntity.notFound.build;}return ResponseEntity.ok(metrics);}/*** 手动触发限流规则调整*/@PostMapping("/resources/{resource}/adjust")public ResponseEntity> adjustRateLimit(@PathVariable String resource,@RequestParam int newThreshold) {log.info("手动调整限流阈值: {} -> {}", resource, newThreshold);// 调用动态限流管理器调整阈值// dynamicRateLimitManager.updateThreshold(resource, newThreshold);Map result = new HashMap;result.put("resource", resource);result.put("newThreshold", newThreshold);result.put("timestamp", Instant.now);result.put("status", "success");return ResponseEntity.ok(result);}}// 限流健康检查@Componentpublic class RateLimitHealthIndicator implements HealthIndicator {@Autowiredprivate RateLimitMonitor rateLimitMonitor;@Overridepublic Health health {try {RateLimitMonitor.RateLimitReport report = rateLimitMonitor.generateReport;// 检查限流健康状态if (report.getOverallLimitRate > 0.3) {return Health.down.withDetail("message", "限流比例过高").withDetail("limitRate", report.getOverallLimitRate).withDetail("worstResource", report.getWorstResource).build;} else if (report.getOverallLimitRate > 0.1) {return Health.up.withDetail("message", "限流比例正常").withDetail("limitRate", report.getOverallLimitRate).withDetail("totalRequests", report.getTotalRequests).build;} else {return Health.up.withDetail("message", "系统运行良好").withDetail("limitRate", report.getOverallLimitRate).build;}} catch (Exception e) {return Health.down(e).withDetail("message", "限流监控异常").build;}}}总结限流是微服务架构中保护系统稳定性的关键技术。通过本文的实战指南,我们掌握了:
核心限流算法:
固定窗口计数器:简单高效,但有临界问题滑动窗口计数器:更精确的控制,解决临界问题漏桶算法:恒定速率处理,适合流量整形令牌桶算法:允许突发流量,适用大部分场景应用层限流:Sentinel注解、自定义限流器中间件限流:Redis分布式限流网关层限流:统一入口控制建立多级限流防御体系实施动态限流阈值调整建立完善的监控告警机制设计优雅的限流响应策略限流不是简单的技术开关,而是需要根据业务特点、系统容量和用户体验进行精细化设计的系统工程。正确的限流实践能够为微服务架构提供坚实的稳定性保障。
来源:一枚后端工程师
免责声明:本站系转载,并不代表本网赞同其观点和对其真实性负责。如涉及作品内容、版权和其它问题,请在30日内与本站联系,我们将在第一时间删除内容!