解决MQ消息丢失问题的5种方案

B站影视 港台电影 2025-10-28 21:04 4

摘要:// RabbitMQ生产者确认配置@Beanpublic RabbitTemplate rabbitTemplate {RabbitTemplate template = new RabbitTemplate(connectionFactory);templ

今天我们来聊聊一个让很多开发者头疼的话题——MQ消息丢失问题。

有些小伙伴在工作中,一提到消息队列就觉得很简单,但真正遇到线上消息丢失时,排查起来却让人抓狂。

其实,我在实际工作中,也遇到过MQ消息丢失的情况。

今天这篇文章,专门跟大家一起聊聊这个话题,希望对你会有所帮助。

在深入解决方案之前,我们先搞清楚消息在哪几个环节可能丢失:

自动确认模式下处理异常消费者宕机处理中断手动确认但忘记确认

理解了问题根源,接下来我们看5种实用的解决方案。

生产者发送消息后等待Broker确认,确保消息成功到达。

这是防止消息丢失的第一道防线。

// RabbitMQ生产者确认配置@Beanpublic RabbitTemplate rabbitTemplate {RabbitTemplate template = new RabbitTemplate(connectionFactory);template.setConfirmCallback((correlationData, ack, cause) -> {if (ack) {// 消息成功到达BrokermessageStatusService.markConfirmed(correlationData.getId);} else {// 发送失败,触发重试retryService.scheduleRetry(correlationData.getId);}});return template;}// 可靠发送方法public void sendReliable(String exchange, String routingKey, Object message) {String messageId = generateId;// 先落库保存发送状态messageStatusService.saveSendingStatus(messageId, message);// 发送持久化消息rabbitTemplate.convertAndSend(exchange, routingKey, message, msg -> {msg.getMessageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);msg.getMessageProperties.setMessageId(messageId);return msg;}, new CorrelationData(messageId));}

将消息保存到磁盘,确保Broker重启后消息不丢失。

这是防止Broker端消息丢失的关键。

// 持久化队列配置@Beanpublic Queue orderQueue {return QueueBuilder.durable("order.queue") // 队列持久化.deadLetterExchange("order.dlx") // 死信交换机.build;}// 发送持久化消息public void sendPersistentMessage(Object message) {rabbitTemplate.convertAndSend("order.exchange", "order.create", message, msg -> {msg.getMessageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT); // 消息持久化return msg;});}// Kafka持久化配置@Beanpublic ProducerFactory producerFactory {Map props = new HashMap;props.put(ProducerConfig.ACKS_CONFIG, "all"); // 所有副本确认props.put(ProducerConfig.RETRIES_CONFIG, 3); // 重试次数props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // 幂等性returnnew DefaultKafkaProducerFactory(props);}优缺点

优点:

有效防止Broker重启导致的消息丢失配置简单,效果明显

缺点:

磁盘IO影响性能需要足够的磁盘空间

消费者处理完消息后手动向Broker发送确认,Broker收到确认后才删除消息。

这是保证消息不丢失的最后一道防线。

关键实现// 手动确认消费者@RabbitListener(queues = "order.queue")public void handleMessage(Order order, Message message, Channel channel) {long deliveryTag = message.getMessageProperties.getDeliveryTag;try {// 业务处理orderService.processOrder(order);// 手动确认channel.basicAck(deliveryTag, false);log.info("消息处理完成: {}", order.getOrderId);} catch (Exception e) {log.error("消息处理失败: {}", order.getOrderId, e);// 处理失败,重新入队channel.basicNack(deliveryTag, false, true);}}// 消费者容器配置@Beanpublic SimpleRabbitListenerContainerFactory containerFactory {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory;factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); // 手动确认factory.setPrefetchCount(10); // 预取数量factory.setConcurrentConsumers(3); // 并发消费者return factory;}注意事项

通过事务保证本地业务操作和消息发送的原子性,要么都成功,要么都失败。

关键实现// 本地事务表方案@Transactionalpublic void createOrder(Order order) {// 1. 保存订单到数据库orderRepository.save(order);// 2. 保存消息到本地消息表LocalMessage localMessage = new LocalMessage;localMessage.setBusinessId(order.getOrderId);localMessage.setContent(JSON.toJSONString(order));localMessage.setStatus(MessageStatus.PENDING);localMessageRepository.save(localMessage);// 3. 事务提交,本地业务和消息存储保持一致性}// 定时任务扫描并发送消息@Scheduled(fixedDelay = 5000)public void sendPendingMessages {List pendingMessages = localMessageRepository.findByStatus(MessageStatus.PENDING);for (LocalMessage message : pendingMessages) {try {// 发送消息到MQrabbitTemplate.convertAndSend("order.exchange", "order.create", message.getContent);// 更新消息状态为已发送message.setStatus(MessageStatus.SENT);localMessageRepository.save(message);} catch (Exception e) {log.error("发送消息失败: {}", message.getId, e);}}}// RocketMQ事务消息public void sendTransactionMessage(Order order) {TransactionMQProducer producer = new TransactionMQProducer("order_producer");// 发送事务消息Message msg = new Message("order_topic", "create", JSON.toJSONBytes(order));TransactionSendResult result = producer.sendMessageInTransaction(msg, null);if (result.getLocalTransactionState == LocalTransactionState.COMMIT_MESSAGE) {log.info("事务消息提交成功");}}适用场景

通过重试机制处理临时故障,通过死信队列处理最终无法消费的消息。

关键实现// 重试队列配置@Beanpublic Queue orderQueue {return QueueBuilder.durable("order.queue").withArgument("x-dead-letter-exchange", "order.dlx") // 死信交换机.withArgument("x-dead-letter-routing-key", "order.dead").withArgument("x-message-ttl", 60000) // 60秒后进入死信.build;}// 死信队列配置@Beanpublic Queue orderDeadLetterQueue {return QueueBuilder.durable("order.dead.queue").build;}// 消费者重试逻辑@RabbitListener(queues = "order.queue")public void handleMessageWithRetry(Order order, Message message, Channel channel) {long deliveryTag = message.getMessageProperties.getDeliveryTag;try {orderService.processOrder(order);channel.basicAck(deliveryTag, false);} catch (TemporaryException e) {// 临时异常,重新入队重试channel.basicNack(deliveryTag, false, true);} catch (PermanentException e) {// 永久异常,直接确认进入死信队列channel.basicAck(deliveryTag, false);log.error("消息进入死信队列: {}", order.getOrderId, e);}}// 死信队列消费者@RabbitListener(queues = "order.dead.queue")public void handleDeadLetterMessage(Order order) {log.warn("处理死信消息: {}", order.getOrderId);// 发送告警、记录日志、人工处理等alertService.sendAlert("死信消息告警", order.toString);}方案可靠性性能影响复杂度适用场景生产者确认高中低所有需要可靠发送的场景消息持久化中中低Broker重启保护消费者确认高低中确保消息被成功处理事务消息最高高高强一致性要求的业务重试+死信高低中处理临时故障和最终死信

初创项目/简单业务:

生产者确认 + 消息持久化 + 消费者确认满足大部分场景,实现简单

电商/交易系统:

生产者确认 + 事务消息 + 重试机制保证数据一致性,处理复杂业务

大数据/日志处理:

消息持久化 + 消费者确认允许少量丢失,追求吞吐量

金融/支付系统:

消息丢失问题是消息队列使用中的常见挑战,通过今天介绍的5种方案,我们可以构建一个可靠的消息系统:

生产者确认机制 - 保证消息成功发送到Broker消息持久化机制 - 防止Broker重启导致消息丢失消费者确认机制 - 确保消息被成功处理事务消息机制 - 保证业务和消息的一致性重试与死信队列 - 处理异常情况和最终死信

有些小伙伴可能会问:"我需要全部使用这些方案吗?

"我的建议是:根据业务需求选择合适的组合

对于关键业务,建议至少使用前三种方案;对于普通业务,可以根据实际情况适当简化。

记住,没有完美的方案,只有最适合的方案。

来源:架构师之道

相关推荐