RabbitMQ高级篇
目录
确保发送者的可靠
为什么需要确保发送者的可靠性
RabbitMQ 的发送者重连机制配置
springAMQP实现发送者确认
MQ的可靠性
为什么需要实现MQ的可靠性?
数据持久化
Lazy Queue
核心思想
总结RabbitMQ 如何保证消息的可靠性
持久化
Lazy Queue
消息确认机制
消费者的可靠性
消费者确认机制的核心原理
消费者确认机制编辑
none 模式(无确认)
manual 模式(手动确认)
auto 模式(自动确认)
失败重试机制
失败消息处理策略
测试不出来?如何解决?
业务幂处理
为什么会产生重复消费?
使用唯一标识符(Message ID)
业务判断
延迟消息
使用插件完成
确保发送者的可靠
为什么需要确保发送者的可靠性
避免消息丢失
- 在分布式系统中,如果发送者未确认消息是否被 RabbitMQ 接收,可能会因为网络波动、RabbitMQ 服务异常等原因导致消息丢失。
- 消息丢失会导致数据不一致或业务逻辑中断,尤其是在支付系统、订单处理等关键场景下。
保证系统的可用性和稳定性
- 可靠性确保了生产者和消息队列之间的通信稳定性,即使在系统压力较大时,消息仍然能够被妥善处理。
RabbitMQ 的发送者重连机制配置
实际应用场景
临时网络波动
- 当网络抖动时,短时间内可能会导致连接失败,通过重试机制可以自动恢复连接。
RabbitMQ 短暂不可用
- RabbitMQ 重启或服务瞬间不可用时,通过重试可以避免消息丢失。
spring:rabbitmq:connection-timeout: 1s # 设置连接超时时间template:retry:enabled: true # 启用发送者重试机制initial-interval: 1000ms # 初始重试间隔为 1000ms (1 秒)multiplier: 2 # 每次重试间隔时间加倍max-attempts: 3 # 最大重试 3 次
Spring 配置说明
-
spring.rabbitmq.connection-timeout
配置与 RabbitMQ 建立连接时的超时时间,单位为毫秒或秒。在网络波动的情况下,如果连接超时,会触发重试机制。 -
spring.rabbitmq.template.retry.enabled
开启 RabbitMQ 发送消息的重试功能。当发送消息失败时,系统会按照配置的重试规则重新发送。
重试参数()
当multiplier为2的时候,相当于是计网当中CMSA/CD的截断二进制退避算法
-
initial-interval
第一次重试的间隔时间,单位为毫秒(ms)。比如配置为1000ms
,表示第一次失败后等待 1 秒开始重试。 -
multiplier
每次重试间隔时间的倍增系数。例如配置为1
,说明每次重试的间隔时间是固定的;如果配置为2
,那么每次间隔时间会加倍。计算公式:
下一次重试间隔 = initial-interval × multiplier
springAMQP实现发送者确认
MQ的可靠性
为什么需要实现MQ的可靠性?
内存中消息会丢失:
- RabbitMQ 默认将消息存储在内存中(非持久化),以降低延迟。
- 如果 MQ 宕机,内存中的消息会被清空,从而丢失。
内存空间有限,可能导致消息积压:
- 如果消费者处理过慢或不可用,内存中的消息无法被及时消费,消息会积压,占用 RabbitMQ 的内存。
- 当内存使用达到限制时,RabbitMQ 可能会触发流控(Flow Control),甚至出现系统阻塞或崩溃。RabbitMQ 会将这些消息被动地写入磁盘,以缓解内存压力。当队列中消息积压严重时,RabbitMQ 被迫将大量消息写入磁盘(被动持久化),磁盘的读写速度可能无法跟上队列的处理速度,从而引发 队列阻塞。
- 为了解决这个问题才有了同步持久化策略,不会导致消息队列阻塞
同步持久化策略:
- 消息到达时立即写入磁盘,消除了被动写入导致的延迟和瓶颈。
- 优点:可靠性高,不会因磁盘写入延迟导致队列阻塞。
- 缺点:性能略低,需要优化磁盘性能来减小延迟。
数据持久化
在 Spring AMQP 中,默认的可靠性措施已经覆盖了 队列持久化 和 自动重连,你只需要在以下两种情况下手动配置:
确保消息可靠传递和存储:
- 需要显式声明消息为持久化(
PERSISTENT
)。 - 需要注意的是,在java代码中我们需要自定义构建消息来发送临时消息。
public void sendPersistentMessage(String exchange, String routingKey, String messageContent) {// 创建持久化消息Message message = MessageBuilder.withBody(messageContent.getBytes()).setDeliveryMode(MessageProperties.DeliveryMode.PERSISTENT) // 设置为持久化.build();// 发送消息rabbitTemplate.send(exchange, routingKey, message);System.out.println("Sent persistent message: " + messageContent);}
Lazy Queue
Lazy Queue 是 RabbitMQ 的一种队列模式,主要用于优化 消息持久化策略 的缺点和高消息积压场景下的性能问题。它将消息尽可能存储在磁盘上,而不是内存中,从而减少内存占用和避免内存不足导致的队列阻塞。
Lazy Queue 设计的目标
Lazy Queue 是为了解决上述问题的一种优化方案,其目标是最大限度地减少内存占用,优先使用磁盘存储消息,从而解决持久化策略的缺点。
核心思想
消息直接写入磁盘:
- Lazy Queue 会在消息到达时直接将其存储在磁盘上,而不是先存储在内存中。
- 只有在需要消费时,才会将消息从磁盘加载到内存中。
延迟加载消息:
- 消息只有在消费者需要处理时才会被加载到内存中,而不是全部预加载。
- 这减少了内存占用,同时避免了磁盘与内存之间频繁切换的问题。
总结RabbitMQ 如何保证消息的可靠性
持久化
- 确保消息(需要手动配置)、队列和交换机在 RabbitMQ 重启后依然存在。
- 优点:保障消息不会因服务中断而丢失。
- 缺点:磁盘 IO 开销增加,可能影响性能。
Lazy Queue
- 解决持久化策略中内存占用高的问题,消息优先存储在磁盘中。
- 优点:减少内存消耗,适合消息积压场景。
- 缺点:磁盘 IO 性能略低于内存操作,不适合高吞吐实时场景。
消息确认机制
- 保证消息从生产者到 RabbitMQ 的可靠传递。
- 优点:生产者可以通过 ACK 确认消息是否成功到达 RabbitMQ。一般是可以成功到达除是程序员配置错误。因此我们可以根据实际情况选择。
- 缺点:增加网络和确认延迟。
消费者的可靠性
消费者确认机制的核心原理
消费者确认机制的目的是确保消息被成功消费后,RabbitMQ 可以安全地将消息从队列中移除。如果消息处理失败,则可以选择重新投递或丢弃,以实现可靠的消息消费。
确认机制的三种状态
ACK(Acknowledgement)
- 描述:消息已成功处理,通知 RabbitMQ 可以安全删除消息。
- 结果:RabbitMQ 将消息从队列中移除。
NACK(Negative Acknowledgement)
- 描述:消息处理失败,消费者通知 RabbitMQ 需要重新投递消息。
- 结果:消息重新进入队列,供其他消费者或当前消费者再次处理。
REJECT
- 描述:消息处理失败,但消费者明确表示拒绝消息。例如:格式问题
- 结果:消息从队列中删除,不会重新投递。
消费者确认机制
在 RabbitMQ 的消费者确认机制中,消费者需要通知 RabbitMQ 消息是否被成功处理。Spring AMQP 通过 acknowledge-mode
提供了三种消息确认模式:
none 模式(无确认)
特点:
- 消息推送到消费者后,RabbitMQ 会立即将其标记为已确认,无论消息是否被成功处理。
- 极不安全,因为如果消费者出现异常或宕机,消息可能会丢失。
适用场景:不推荐使用,除非对消息丢失完全没有影响。
manual 模式(手动确认)
特点:
- 消费者需要手动调用 RabbitMQ 提供的 API(如
basicAck
、basicNack
、basicReject
)来确认消息是否被成功处理。 - 优点:可以精确控制消息的确认时机,适合对可靠性要求高的场景。
实现方式:
- 在
@RabbitListener
方法中捕获异常,根据业务逻辑决定发送ACK
、NACK
或REJECT
。
auto 模式(自动确认)
特点:
- Spring AMQP 默认使用 AOP 对消息处理方法进行代理,消息处理成功后会自动发送
ACK
。 - 如果方法抛出异常,Spring AMQP 会自动发送
NACK
或REJECT
。 -
适用场景:适合消息可靠性要求不高、处理逻辑较简单的场景。
实现方式:无需手动确认,Spring 会根据方法执行结果自动处理。
spring:rabbitmq:host: # 你的虚拟机IPport: 5672 # 端口virtual-host: /hmall # 虚拟主机username: hmall # 用户名password: 123 # 密码listener:simple:prefetch: 1 # 控制消费者预取的消息数量,处理完一条再处理acknowledge-mode: auto
消费者处理失败时,消息被 NACK 并重新入队,RabbitMQ 会不断尝试将消息重新投递给消费者,导致消息反复处理失败,最终对 RabbitMQ 和服务产生巨大的负担。因此我们可以通过设置失败重试机制,限制消息重试次数
失败重试机制
Spring AMQP 提供了 重试机制,可以限制消费者的最大重试次数。如果消息在多次重试后仍然失败,可以转移到死信队列或记录到日志中。
如果重试了最大次数了之后还是不能成功会直接把消息丢弃了,因此我们还需要配置重试失败处理策略。
失败消息处理策略
Spring AMQP 提供了三种常见的失败消息处理策略:
RejectAndDontRequeueRecoverer
(默认策略):丢弃消息。ImmediateRequeueMessageRecoverer
:消息重新入队。RepublishMessageRecoverer
:将消息转移到指定的交换机。原来的队列中不会再有这个消息。
选择建议:
- 非关键场景:使用默认的
RejectAndDontRequeueRecoverer
。 - 临时错误场景:使用
ImmediateRequeueMessageRecoverer
,重新入队。 - 关键场景:使用
RepublishMessageRecoverer
,将消息投递到死信队列或专用的交换机。
配置消费者重试机制
spring:rabbitmq:listener:simple:retry:enabled: truemax-attempts: 5 # 最大重试次数initial-interval: 1000ms # 初始重试间隔multiplier: 2.0 # 每次重试间隔倍增max-interval: 10000ms # 最大重试间隔
结合 MessageRecoverer
在重试失败时,指定使用的消息恢复策略:
@Configuration
public class ErrorMessageConfiguration {@Beanpublic DirectExchange errorExchange() {return new DirectExchange("error.direct"); // 声明一个 Direct 类型的交换机,名称为 "error.direct"}@Beanpublic Queue errorQueue() {return new Queue("error.queue",true); // 声明一个队列,名称为 "error.queue"}@Beanpublic Binding errorQueueBinding(Queue errorQueue, DirectExchange errorExchange) {return BindingBuilder.bind(errorQueue).to(errorExchange).with ("error"); // 将队列 "error.queue" 和交换机 "error.direct" 绑定,路由键为 "error"}@Beanpublic MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate) {System.out.println("执行了消息恢复器!!!");return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");// 配置一个消息恢复器,将消费失败的消息重试多次后投递到 "error.direct" 交换机,路由键为 "error"}
}
测试不出来?如何解决?
如果你和代码写的一样或者代码没有明显的错误,但是就是测试不出来。
可以选择将simple.queue删除之后,在重新新建并且重新发送消息。
在 RabbitMQ 中,Purge Messages
只会清空处于 Ready 状态(也就是未被消费者接收或未投递中的消息)。如果有消息已经分配给消费者,处于 Unacked 状态,RabbitMQ 并不会通过 Purge
操作将它们从队列里移除。也就是说,Purge
并不能清除还在进行中的消费或处于 Unacked 状态的消息。
因此,当你点击 Purge Messages 时,如果队列里大部分消息都处于 Unacked 状态(可能由于消费者一直没 ACK),Purge 操作其实没有把这些 Unacked 消息移除。它们依然存在,消费者再次重连或者容器重启时,RabbitMQ 还会继续投递。
反之,当你删除并重新创建这个队列时,RabbitMQ 会彻底销毁所有该队列关联的消息(不管它们是 Ready 还是 Unacked),然后新建一个空队列,之前的消息自然就不复存在,手动删队列再建”可以成功清空消息,而 Purge 不行的现象。
业务幂处理
幂等性 是指在分布式系统中,某一操作无论被执行一次还是多次,其产生的效果是相同的。在使用 RabbitMQ 或其他消息队列时,消费者可能会因重试、重复投递等原因处理相同的消息多次。为了保证系统数据的一致性和可靠性,需要解决幂等性问题。
为什么会产生重复消费?
RabbitMQ 和其他消息队列可能会因为以下原因导致消息重复消费:
消息重试机制:
- 消费者处理失败时,消息会被重新投递。
- 例如:消息超时、消费者抛出异常。
网络问题:
- 如果 RabbitMQ 没有收到消费者的
ACK
,会认为消息未被成功处理,并重新投递。
消息重复发送:
- 生产者在发送消息时,如果未正确处理确认机制,可能会发送相同的消息多次。
使用唯一标识符(Message ID)
生产端设置唯一消息 ID:
- 在生产消息时,为每条消息生成一个唯一的 ID(通常是 UUID)。
- 该
Message ID
会作为消息的属性,和消息一起发送到 RabbitMQ。
消费端接收唯一消息 ID:
- 消费者在处理消息时,从消息的属性中提取
Message ID
。 - 利用这个
Message ID
在数据库或缓存(Redis)中查询是否已处理过该消息。
去重逻辑:
- 如果
Message ID
已存在,说明消息是重复的,直接忽略不处理。 - 如果
Message ID
不存在,说明消息是新的,正常处理,并将Message ID
标记为已处理。 - 如果消息只需短期去重(如几天内不重复):使用 Redis,并设置合理的过期时间。
- 如果需要长期保存处理状态:使用 MySQL 或其他关系型数据库,结合唯一约束实现幂等性。
配置消息转换器
通过 Jackson2JsonMessageConverter
设置消息的 Message ID
:
作用:
Jackson2JsonMessageConverter
会在消息体序列化为 JSON 时自动生成一个唯一的Message ID
,并附加到消息属性中。- 如果消息体已经包含一个业务 ID(如订单号),也可以使用该 ID 作为去重标识。
@Bean
public MessageConverter messageConverter() {Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();jjmc.setCreateMessageIds(true); // 自动生成消息 IDreturn jjmc;
}
消费者代码
在消费者中,通过 MessageProperties
获取消息的唯一 ID,并处理去重逻辑:
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(Message message) {// 获取消息的唯一 IDString messageId = message.getMessageProperties().getMessageId();String body = new String(message.getBody());log.info("监听simple.queue的消息. ID: [{}]", messageId);log.info("监听simple.queue的消息: [{}]", body);// 模拟处理逻辑throw new RuntimeException("我要故意出错!");// 存入数据库或者Redis当中
}
业务判断
核心思路
根据业务的唯一标识符(如 orderId
)查询业务状态:
- 消费者处理消息时,通过
orderId
查询订单信息。
判断订单状态是否已被处理:
- 如果订单状态表明该订单已经处理过(如状态为已支付),则直接忽略当前消息。
- 如果订单状态表明该订单尚未处理,则正常处理消息。
更新业务状态:
- 在消息成功处理后,将订单状态标记为已处理(如更新为“已支付”状态)。
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "trade.pay.success.queue", durable = "true"),exchange = @Exchange(name = "pay.direct"),key = "pay.success"
))
public void listenPaySuccess(Long orderId) {// 1. 查询订单Order order = orderService.getById(orderId);// 2. 判断订单状态,是否为未支付if (order == null || order.getStatus() != 1) {// 如果订单不存在,或者状态不为“待支付”(1),则直接忽略return;}// 3. 标记订单状态为已支付orderService.markOrderPaySuccess(orderId);
}
基于业务判断实现幂等性的主要局限性是强依赖于业务状态字段的设计,并且在动态数据、高并发场景下可能难以准确实现幂等性控制。例如,怎么保证库存的幂等性?这种业务判断不能保证库存这种动态数据的变化。因此,在实际项目中,业务判断通常作为一种基础手段,结合唯一消息 ID、分布式锁、缓存或事务机制共同实现幂等性,才能保证系统的可靠性和高效性。
延迟消息
为什么需要消息延迟方案?
在一般的交易服务流程中,当用户下单后,会进入支付环节。如果支付成功,理论上支付服务会通过消息队列(MQ)将消息发送给交易服务,以通知交易服务更新订单状态为“已支付”。
现在思考这样一种场景:
如果支付服务因某种未知原因无法向交易服务发送消息,可能会导致以下两个问题:
-
数据不一致
支付服务已经扣款,但交易服务无法及时感知支付结果,从而无法更新订单状态为“已支付”。此时,用户的支付记录和订单状态不匹配,导致系统数据不一致。 -
系统资源被占用
交易服务在等待支付服务的消息确认时,订单的状态可能会一直停留在“待支付”或“支付中”,导致对应的库存被锁定。其他用户在尝试购买该商品时,可能会因为库存不足而无法完成下单,从而影响系统的正常运行与用户体验。
所以交易服务需要过一段时间之后去查询支付服务的支付状态。因此需要消息延迟。
如何去查询支付状态呢?
交易服务通过 FeignClient
调用支付服务的接口,查询支付状态。
具体实现:day07-MQ高级 - 飞书云文档 (feishu.cn)
FeignClient的详细讲解:
springCloud特色知识记录(基于黑马教程2024年)-CSDN博客
消息延迟方案在分布式系统中起到以下重要作用:
延迟执行任务:
- 在一些业务场景中,需要对某些任务进行延迟处理。例如,在电商系统中,如果用户下单后未在一定时间内支付,则自动取消订单。
使用插件完成
day07-MQ高级 - 飞书云文档 (feishu.cn)
根骨以上文档的——DelayExchange插件的安装可以安装并且启动插件
官方文档:
Scheduling Messages with RabbitMQ | RabbitMQ
注意:如果你使用的MQ的版本要和插件的版本不冲突
测试生产者(发送消息)
在 RabbitMQ 的延迟队列中,延迟时间的核心是通过 消息属性 来实现的,具体来说是设置 x-delay
属性,定义消息在队列中延迟的时间。
@Testvoid testSendDelayMessageByPlugin(){rabbitTemplate.convertAndSend("delay.direct","hi","hello",message ->{message.getMessageProperties().setDelay(10000);return message;});}
rabbitTemplate.convertAndSend
:
参数:
- RabbitTemplate 是 Spring 提供的用于与 RabbitMQ 交互的工具类。
convertAndSend
方法用于向指定的交换机发送消息。delay.direct
:指定目标交换机的名称。hi
:指定路由键,用于匹配队列。hello
:发送的消息内容。message -> {...}
:消息后处理器,用于对消息属性进行修改。
设置延迟时间:
message.getMessageProperties().setDelay(10000)
:
- 通过
MessageProperties
的setDelay
方法设置延迟时间(单位为毫秒)。 - 在此示例中,延迟时间设置为 10000 毫秒(即 10 秒)。
返回修改后的消息:
- Lambda 表达式返回处理后的
Message
对象,将其发送到指定的交换机和队列。
测试消费者(接收消息)
@RabbitListener(bindings = @QueueBinding(value = @Queue(name="delay.queue",durable = "true"),exchange = @Exchange(name = "delay.direct",delayed = "true"),key = {"hi"}
))
public void listenDelayQueue(String message){log.info("listenDelayQueue接收到信息【{}】",message );
}
监听延迟队列:通过 @RabbitListener
注解,Spring Boot 应用可以监听名为 delay.queue
的队列,处理其中的消息。
绑定延迟交换机:
- 使用
@QueueBinding
将队列delay.queue
绑定到延迟交换机delay.direct
。 - 指定路由键为
"hi"
,只有匹配此路由键的消息会进入队列。 delayed="true"
:启用延迟特性,使交换机支持消息延迟。
接收并处理消息:
- 方法
listenDelayQueue
会在有消息到达队列时被触发,消费消息并打印日志。
使用场景示例
订单超时取消:
- 用户下单后,消息进入延迟队列,设置延迟时间为支付时间限制(如 30 分钟)。若支付完成,移除消息;否则到时间后取消订单。
延迟通知:
- 在一段时间后提醒用户未完成的操作(如未完成注册或填写资料)。
自动处理过期内容:
- 处理定期失效的资源(如清理过期缓存、关闭未使用的会话等)。