当前位置: 首页 > news >正文

rabbitMQ消息重复问题怎么解决的?

RabbitMQ 消息重复的问题通常发生在消息的传递过程中,特别是在网络异常、生产者或消费者宕机等场景下,消息可能被重复消费。这种重复是由于 RabbitMQ 的 "至少一次投递" 保证机制引起的。为了解决消息重复的问题,可以采取以下几种常见的方法:

1. 使用消息唯一ID(幂等性)

确保消息的处理是幂等的,即无论同一条消息被消费多少次,结果都是相同的。可以通过给每条消息分配一个全局唯一的ID(messageId),在消费者侧进行去重处理。

  • 具体步骤

    1. 在消息生产者端生成一个唯一ID,放入消息的属性中,如UUID。
    2. 消费者接收到消息后,先检查该 messageId 是否已经处理过,通常可以存储到数据库或缓存(如 Redis)中。
    3. 如果 messageId 已经处理过,直接忽略该消息;否则,进行正常的业务处理并记录这个ID,防止重复消费。
// 消费者逻辑
String messageId = properties.getMessageId();  // 获取消息唯一IDif (redis.exists(messageId)) {// 已处理过该消息,直接忽略return;
}// 处理消息
processMessage(body);// 处理完毕后,将 messageId 存入 Redis,标记该消息已处理
redis.set(messageId, "processed");

2. 消费者端手动 ACK

RabbitMQ 支持手动ACK模式,这样可以确保消息在消费者处理成功后才发送ACK给RabbitMQ,避免消息重复处理。

  • 如果在消息处理过程中发生异常,消费者不发送ACK,而是通过 NACKReject 将消息重新投递。

  • 通过这种方式可以避免因为消息处理失败而导致消息重复消费。

channel.basicConsume(queueName, false, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {try {// 处理消息processMessage(body);// 成功处理后,发送ACKchannel.basicAck(envelope.getDeliveryTag(), false);} catch (Exception e) {// 处理失败,NACK并重新投递channel.basicNack(envelope.getDeliveryTag(), false, true);}}
});

3. 消息去重表(持久化处理)

在数据库中维护一个 消息去重表,存储每条消息的唯一ID及其处理状态。每次接收到消息时,先检查该ID是否已经存在于去重表中,决定是否处理该消息。

  • 流程
    1. 消费者收到消息后,查询数据库是否存在该 messageId
    2. 如果存在且状态为 "已处理",则直接丢弃消息。
    3. 如果不存在或状态为 "未处理",则执行处理逻辑,并在成功后更新消息状态为 "已处理"。

4. 消息投递次数限制

RabbitMQ 支持消息重新投递,但是可以设置最大重试次数,避免消息被无止境地重复投递。

  • 通过 RabbitMQ 的 死信队列(DLX) 配置,当消息达到最大重试次数后,可以将消息投递到死信队列进行后续的人工处理,防止不断重复消费。

# 配置消息的TTL,超时后投递到死信队列
x-message-ttl: 60000  # 消息存活时间,单位毫秒
x-max-length: 5  # 最大重试次数

5. 幂等操作

在业务逻辑设计中,确保操作的幂等性。例如,在执行数据库插入、更新等操作时,使用唯一索引、条件更新等手段来防止重复执行某些操作。

  • 常见的幂等性设计
    • 数据库插入:通过唯一索引避免重复插入同一条记录。
    • 数据库更新:使用条件更新(如 UPDATE WHERE)确保只在特定条件满足时更新,避免重复更新。

6. 结合事务

可以通过 RabbitMQ 的 事务模式 或者 事务消息(带确认的消息) 机制,确保消息在生产、投递、消费环节的完整性,防止因为部分失败而导致重复消息。

  • 事务模式的弊端:性能开销大,适合关键性业务场景。

总结

RabbitMQ 消息重复问题的核心是通过消息去重、消费者ACK机制、幂等性设计等手段,确保消息即使重复发送或处理也不会对系统带来不良影响。主要解决方案包括:

  1. 消息唯一ID:通过唯一ID防止重复处理。
  2. 手动ACK机制:确保消息在成功处理后才确认。
  3. 去重表:通过数据库记录消息处理状态。
  4. 限次重试:通过设置最大重试次数,防止无限重复消费。
  5. 幂等设计:确保业务操作可以多次重复执行而结果一致。

http://www.mrgr.cn/news/53817.html

相关文章:

  • windows下pycharm社区版2024下载与安装(包含新建第一个工程)
  • ant design vue TimePicker时间选择器不点击确认也可以设置值
  • webstorm 编辑器配置及配置迁移
  • process.exit 作用
  • 雷达系统分析与设计-MATLAB
  • 1. DLT645协议解析
  • 同济子豪兄--图的基本表示【斯坦福CS224W图机器学习】
  • 面试:了解 ThreadLocal 内存泄漏需要满足的 2 个条件吗?
  • 大话设计模式解读08-外观模式
  • python 函数
  • 嘉兴自闭症咨询全托机构:全面支持孩子成长的专业团队
  • 如何让审批更加的省钱?
  • 什么是DevOps,如何才能获取DevOps相关实践
  • 石墨烯磁表面等离子体
  • 对接金蝶云星空存货档案到MES系统的详细步骤及javajs动态脚本拉取的实现
  • 【C++初阶】一文讲通默认成员函数~类和对象(中)
  • Java项目-基于springboot框架的社区疫情防控平台系统项目实战(附源码+文档)
  • 【MySQL】设置二进制日志文件自动过期,从根源上解决占满磁盘的问题:通过修改 binlog_expire_logs_seconds 配置项
  • 使用C语言实现一个任务调度系统
  • 现代数字信号处理I-P4 CRLB+LMMSE 学习笔记
  • Olap数据处理
  • 智慧社区Web平台:Spring Boot技术实现
  • 高级SQL技巧:掌握数据分析与优化的艺术
  • 自由学习记录(10)
  • 【win11】终端/命令提示符/powershell美化
  • ProteinMPNN中EncLayer类介绍