100000#告警阈值//自定义分区策略"/>

Kafka百万消息积压救火指南:半小时从瘫痪到通畅

B站影视 电影资讯 2025-07-21 05:12 1

摘要:# Prometheus + Grafana监控模板kafka_consumer_lag{group="order-group"} > 100000 # 告警阈值// 自定义分区策略:确保订单号相同消息进入同一分区public class OrderParti

凌晨三点,服务告警炸了锅。订单服务堆积了120万条Kafka消息,下游系统彻底瘫痪——这是我经历过最惊心动魄的救火战役。

核心思路:先恢复业务,再深挖根因。别一上来就查代码!

# 查看积压量 (关键指标!)bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \--group my-group --describe | grep -E 'TOPIC|LAG'# 输出示例:# TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG# orders 0 150000 300000 150000

行动指南

LAG持续增长 → 消费者处理能力不足特定分区LAG高 → 分区热点问题# 监控生产速率bin/kafka-producer-perf-test.sh --topic orders \--throughput -1 --num-records 100000 --record-size 1024 \--producer-props bootstrap.servers=localhost:9092

异常信号:生产者TPS突然飙升(如从1k/s到10k/s)

// Spring Boot中动态调整并发度(关键代码)@KafkaListener(topics = "orders", groupId = "order-group")public class OrderConsumer {// 使用线程池动态控制并发private final ExecutorService processor = Executors.newFixedThreadPool(4); @KafkaHandlerpublic void handle(OrderMessage message) {processor.submit( -> {// 业务处理逻辑 (确保线程安全!)processOrder(message);});}// 动态调整线程池大小 (通过API触发)public void scaleConsumers(int newSize) {((ThreadPoolExecutor) processor).setCorePoolSize(newSize);}}

操作步骤

# 将orders主题分区从8扩容到16(注意:仅对新数据有效!)bin/kafka-topics.sh --alter --topic orders \--partitions 16 --bootstrap-server localhost:9092

必须配合

立即重启消费者组,触发重平衡新消费者加入分担负载

血泪经验:分区扩容后,必须同步增加消费者实例数,否则可能适得其反!

# Prometheus + Grafana监控模板kafka_consumer_lag{group="order-group"} > 100000 # 告警阈值// 自定义分区策略:确保订单号相同消息进入同一分区public class OrderPartitioner implements Partitioner {@Overridepublic int partition(String topic, Object key, byte keyBytes, Object value, byte valueBytes, Cluster cluster) {List partitions = cluster.partitionsForTopic(topic);return Math.abs(key.hashCode) % partitions.size; }}# Spring Kafka配置死信队列spring:kafka:listener:dead-letter-topic: orders.DLT # 自动创建死信主题@Beanpublic ConsumerFactory consumerFactory {Map props = new HashMap;props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50); // 单次拉取条数props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 5242880); // 5MB/次return new DefaultKafkaConsumerFactory(props);}# 模拟百万消息压测(实战必备!)kafka-producer-perf-test --topic load-test \--num-records 1000000 --throughput 50000 --record-size 1024

真实案例:某电商大促期间,通过自动扩容策略,在3分钟内消化了350万条积压消息,避免千万级损失。

永远记住:Kafka积压不是技术问题,是系统预警。每一次救火后,必须做三件事:

分析火焰图定位慢处理 检查业务逻辑是否阻塞(如数据库锁) 用生产镜像做全链路压测

来源:马士兵教育

相关推荐