当前位置: 首页 > news >正文

RabbitMQ如何保证消息不丢失?

在 RabbitMQ 中,为了保证消息不丢失,通常采取以下几个关键机制:

1. 消息持久化(Message Durability)

  • 持久化队列:在声明队列时,将其设置为 durable,这样即使 RabbitMQ 服务器宕机重启,队列依然会存在。

channel.queueDeclare("queue_name", true, false, false, null);

true 表示队列持久化 

  • 持久化消息:消息在发送到队列时,可以将消息设置为持久化(persistent),即使 RabbitMQ 重启,消息也不会丢失。

AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().deliveryMode(2) // 2 表示持久化消息.build();
channel.basicPublish(exchangeName, routingKey, properties, messageBody);

注意:持久化并不能完全保证消息不丢失,它只能在服务器宕机后恢复消息。但如果消息已经到达队列但尚未写入磁盘,仍可能丢失。

2. Publisher Confirm 机制

  • 生产者确认模式:当消息发送到 RabbitMQ 时,生产者可以开启 Confirm 模式,这样 RabbitMQ 会在收到消息并确认处理(包括持久化)后,发送一个 acknowledgment(确认)给生产者。生产者收到这个确认后,才认为消息成功发送。

    • 开启 Confirm 模式:

channel.confirmSelect();

        生产者可以通过监听 confirmCallback,判断消息是否成功到达 RabbitMQ。

        同步确认

channel.waitForConfirmsOrDie(); // 同步等待确认

        异步确认

channel.addConfirmListener((deliveryTag, multiple) -> {// 消息发送成功,deliveryTag 是消息的唯一标识
}, (deliveryTag, multiple) -> {// 消息发送失败
});

3. Consumer ACK 机制

  • 手动确认消费:消费者可以通过 手动 ACK 机制确认消息已经被消费。只有在消费者返回 ACK 后,RabbitMQ 才会从队列中删除该消息。

    • 默认情况下,RabbitMQ 会自动确认消息。如果消费者未能处理消息,消息可能会丢失。因此,可以设置为手动确认消息:

channel.basicConsume(queueName, false, consumerTag, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {// 消费消息// 手动确认消息channel.basicAck(envelope.getDeliveryTag(), false);}
});

注意:如果消费者在处理消息时异常退出,并且未发送 ACK,RabbitMQ 会将消息重新投递给其他消费者,从而保证消息不会丢失。

4. 死信队列(Dead-Letter Queue,DLQ)

  • 消息处理失败后的再处理:当消息被多次拒绝或无法被处理时,可以将其发送到一个 死信队列,以便后续手动处理或者重新投递。

    • 死信队列通常与消费者的 拒绝机制basicRejectbasicNack)配合使用。

channel.basicNack(envelope.getDeliveryTag(), false, false); // 拒绝消息,并不重新投递

             配置死信队列:

Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", "dlx_exchange"); // 设置死信交换机
channel.queueDeclare("queue_name", true, false, false, args);

5. 集群与镜像队列

  • 集群(Clustering):RabbitMQ 可以部署成一个集群,多个节点之间共享消息。如果一个节点宕机,其他节点可以接管它的任务,从而避免消息丢失。

  • 镜像队列(Mirrored Queues):在 RabbitMQ 集群中,镜像队列可以将消息复制到多个节点上,即使某个节点发生故障,消息仍然存在于其他节点。这样可以有效防止单节点故障导致的消息丢失。

    • 配置镜像队列时,可以通过 policy 设置队列镜像到多个节点:
rabbitmqctl set_policy ha-all "^queue_name$" '{"ha-mode":"all"}'

6. 合理的重试机制

  • 当消费者处理消息失败时,适当的重试机制可以确保消息不会立即丢失,而是通过延迟重试或者重新进入队列的方式再处理。

小结

  • 消息持久化 确保 RabbitMQ 宕机时消息依然存在。
  • Publisher Confirm 机制 确保消息从生产者到 RabbitMQ 的传递不会丢失。
  • Consumer ACK 机制 确保消费者正确处理消息后再确认,避免消息在消费阶段丢失。
  • 死信队列 可以捕获无法处理的消息,防止丢失。
  • 镜像队列集群 提高了高可用性,防止节点宕机导致消息丢失。

http://www.mrgr.cn/news/53668.html

相关文章:

  • 【算法系列-栈与队列】栈与队列的双向模拟
  • 如何高效解锁业务数据价值:多云时代应该怎么构建新一代数据平台架构
  • 【前端布局 如何适配不同分辨率 vue2】
  • 基于neo4j的糖尿病知识图谱数据
  • expressjs 如何记录操作日志
  • 什么是 SQL 注入攻击?如何防止 SQL 注入?
  • FFmpeg源码:av_sat_add64_c、av_sat_sub64_c函数分析
  • 外星人木乃伊---我的收藏
  • 《月光下的约定》
  • CVTE Android面试题及参考答案(100道题)
  • 使用exe4j打包jar包生成exe文件,GUI应用详细使用教程
  • 【YOLOv10改进[损失函数]】使用结合InnerIoU和Focaler的各种损失函数助力YOLOv10更优秀
  • 智慧钢厂可视化平台:钢铁生产的数字化转型
  • sentinel原理源码分析系列(六)-统计指标
  • 活着就好20241019
  • linux安装mysql数据库(最完整的yum源安装)
  • leetcode hot100 之【LeetCode 42. 接雨水】 java实现
  • day-69 使二进制数组全部等于 1 的最少操作次数 II
  • 微调小型Llama 3.2(十亿参数)模型取代GPT-4o
  • 微信定时消息发送 Python脚本神器
  • 11 django管理系统 - 管理员管理 - 分页复习(REVIEW)
  • 数字化转型中从企业架构到业务一致性:实现合规与战略目标的数字化转型路径
  • Leetcode 1135. 最低成本连通所有城市
  • [Godot4] 水底气泡的 gdshader
  • 引领企业数字化转型的核心驱动力:微服务架构与物联网
  • 【多模态】CLIP模型技术学习