当前位置: 首页 > news >正文

【RabbitMQ】可靠性传输

概述

作为消息中间件来说,最重要的任务就是收发消息。因此我们在收发消息的过程中,就要考虑消息是否会丢失的问题。结果是必然的,假设我们没有采取任何措施,那么消息一定会丢失。对于一些不那么重要的业务来说,消息丢失几条是无所谓的,例如使用消息中间件来做一个注册成功的业务,那丢失几条是无所谓的;但是,如果是一些比较重要的业务来说,消息一条也不能丢失。所以,我们就要考虑消息是在哪个阶段丢失的,应如何避免消息丢失。

如上图,消息丢失大概分为三种情况: 

1. 生产者到Broker的过程产生问题:由于应用程序故障、网络抖动等原因,生产者并没有成功向Broker发送消息。

2. Broker本身的问题:生产者成功将消息发送给了Broker,但是Broker没有把消息保存好,导致消息丢失。

3. Broker到消费者的过程产生问题:Broker发送消息到消费者,消费者在消费消息时,由于没有处理好,导致Broker将消费失败的消息从队列中删除了。

RabbitMQ针对这三种消息可能丢失的情形进行考虑,做出了不同的应对:

1. 针对生产者到Broker的问题,RabbitMQ推出了发送方确认机制,或者说是发布确认模式。

2. 针对Broker自身的问题,RabbitMQ推出了持久化的机制,例如针对交换机、队列以及消息的持久化。

3. 针对Broker到消费者的问题,RabbitMQ推出了消息确认机制。

发送方确认

当消息从生产者发送出去之后,消息有没有成功的到达Broker之中,这是第一个消息可能丢失的情况。并且,由于Broker内部也分成了Exchange和Queue两部分,即使消息成功到达了交换机,但是有没有到达队列之中,这也是需要考虑的点。

对于该问题,RabbitMQ提出了两种解决方案:

  • 事务
  • 发送方确认机制

在该篇文章中,主要来介绍发送方确认机制。原因则是因为使用事务比较消耗性能,因此日常开发中使用的并不多。针对刚才提到了交换机和队列之间的问题,RabbitMQ也是全部考虑到了,所以有两个方式来控制消息的可靠性传递:

  • confirm确认模式
  • return退回模式

confirm确认模式

spring:rabbitmq:host: 43.138.108.125port: 5672username: adminpassword: adminvirtual-host: mq-springboot-testpublisher-confirm-type: correlated # 表示发送方确认模式的confirm机制,correlated表示异步确认,还有一种同步确认,不管同步确认可能造成阻塞
// 可靠性传输的发送方确认
@Configuration
public class ConfirmConfig {@Bean("confirmQueue")public Queue confirmQueue() {return QueueBuilder.durable(Constants.CONFIRM_QUEUE).build();}@Bean("confirmExchange")public Exchange confirmExchange() {return ExchangeBuilder.directExchange(Constants.CONFIRM_EXCHANGE).durable(true).build();}@Bean("confirmQueueBind")public Binding confirmQueueBind(@Qualifier("confirmExchange") Exchange exchange,@Qualifier("confirmQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("confirm").noargs();}}
@RestController
@RequestMapping("/confirm")
public class ConfirmController {@Resourceprivate RabbitTemplate rabbitTemplate;@RequestMappingpublic void confirmQueue() {// 异步调用方法this.rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean b, String s) {System.out.println("执行了消息确认机制中的confirm机制");if(b) {System.out.println("交换机接收到了消息,消息id为:" + correlationData.getId());} else {System.out.println("交换机没有接收到消息,原因为:" + s);System.out.println("处理具体业务,选择重发或者其他");}}});// 定义一个全局id,区分不同消息,防止ack时出现错误String uuid = UUID.randomUUID().toString().replace("-", "");CorrelationData correlationData = new CorrelationData(uuid);this.rabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE,"confirm", "hello 发送确认模式", correlationData);}}
@Configuration
public class ConfirmListener {@RabbitListener(queues = Constants.CONFIRM_QUEUE)public void confirmListener(String msg) {System.out.println("接收到消息:" + msg);}}

 启动程序之后,当生产者发送消息之后,就会出现如下内容:

但是,如果再次发送消息,就会直接报错:

 原因是因为Spring的Bean默认是单例,而RabbitTemplate对象同样支持一个回调,所以就出现了如上错误。想要解决上述办法的话,可以将Bean的作用域设置成多例模式。

@Component
public class RabbitTemplateConfig {@Bean// 这样做是有问题的,但是并不知道问题出在哪里,后续进行解决
//    @Scope(value = "prototype", proxyMode = ScopedProxyMode.TARGET_CLASS)public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {return new RabbitTemplate(connectionFactory);}}

 由于使用上述方法并没有跑通程序,因此我又使用了如下方法:

@Component
public class RabbitTemplateConfig {@Bean("rabbitTemplate")// 这样做是有问题的,但是并不知道问题出在哪里,后续进行解决
//    @Scope(value = "prototype", proxyMode = ScopedProxyMode.TARGET_CLASS)public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {return new RabbitTemplate(connectionFactory);}@Bean("rabbitTemplateConfirm")public RabbitTemplate rabbitTemplateConfirm(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);// 执行回调方法rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean b, String s) {System.out.println("执行了消息确认机制中的confirm机制");if(b) {System.out.println("交换机接收到了消息,消息id为:" + correlationData.getId());} else {System.out.println("交换机没有接收到消息,原因为:" + s);System.out.println("处理具体业务,选择重发或者其他");}}});return rabbitTemplate;}}
@RestController
@RequestMapping("/confirm")
public class ConfirmController {@Resource(name = "rabbitTemplateConfirm")private RabbitTemplate rabbitTemplate;@RequestMappingpublic void confirmQueue() {// 定义一个全局id,区分不同消息,防止ack时出现错误String uuid = UUID.randomUUID().toString().replace("-", "");CorrelationData correlationData = new CorrelationData(uuid);this.rabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE,"confirm", "hello 发送确认模式", correlationData);}}

使用如上代码之后,成功跑通程序,获取到如下结果:

在confirm模式中,只要是消息没有到达交换机,那就会出现错误结果。但是,如果正确到达交换机,即使没有正确到达队列,回调函数也会成功执行到正确结果那部分。测试如下:

1. 当给出交换机的名称错误时,不会到达交换机,返回错误结果

交换机名称故意写错,原本是confirm.exchange​

 2. 交换机正确,但是给出的路由键错误,导致到不了相应的交换机

路由键故意写错,原本是confirm

在上述两个例子中证明了即使使用了confirm确认模式,消息也有可能在从交换机到队列的过程中出错。因此我们也需要在交换机和队列的传输过程中增加一个保障,那就是return退回模式。 

return退回模式 

如上描述,消息到达交换机之后,会根据路由规则进行匹配,把消息放到队列中。交换机到队列的过程,如果消息无法被任何队列消费(可能是队列不存在,也可能是路由键没有匹配的队列),可以选择把消息退回给发送方。

当我们写一个回调方法,对消息进行处理,就是return退回模式。

spring:rabbitmq:host: 43.138.108.125port: 5672username: adminpassword: adminvirtual-host: mq-springboot-testpublisher-confirm-type: correlated # 表示发送方确认模式的confirm机制,correlated表示异步确认,还有一种同步确认,不管同步确认可能造成阻塞publisher-returns: true # 表示发送方确认模式的return模式,true表示开启template:mandatory: true # true表示交换机无法进行路由消息时,会将消息返回给生产者,false表示无法进行路由时,直接丢弃
@Component
public class RabbitTemplateConfig {@Bean("rabbitTemplate")// 这样做是有问题的,但是并不知道问题出在哪里,后续进行解决
//    @Scope(value = "prototype", proxyMode = ScopedProxyMode.TARGET_CLASS)public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {return new RabbitTemplate(connectionFactory);}@Bean("rabbitTemplateConfirm")public RabbitTemplate rabbitTemplateConfirm(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);// 执行confirm机制回调方法rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean b, String s) {System.out.println("执行了消息确认机制中的confirm机制");if(b) {System.out.println("交换机接收到了消息,消息id为:" + correlationData.getId());} else {System.out.println("交换机没有接收到消息,原因为:" + s);System.out.println("处理具体业务,选择重发或者其他");}}});// 执行了return机制回调方法rabbitTemplate.setMandatory(true);rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {System.out.println("执行了消息确认机制中的return模式");System.out.println("消息进行退回,退回的消息是:" + new String(returnedMessage.getMessage().getBody()));}});return rabbitTemplate;}}

将发送方确认机制的两种模式完成之后,再写一个交换机正确,路由键不正确的例子,就会出现如下结果: 

 综上所述,将生产者到Broker的消息丢失问题给解决了。总的来说,包含了两个维度,一个是从生产者到交换机,一个是从交换机到队列。

持久化

持久化是RabbitMQ的可靠性保证机制之一,它保证的是RabbitMQ内部的可靠性。

持久化分为三个部分:交换机的持久化、队列的持久化、消息的持久化。

交换机的持久化

其实,交换机的持久化早就在使用了,只不过是没有进行介绍而已。在JavaSDK中,通过声明交换机时的一个参数来实现;在SpringBoot中,也是通过声明交换机时的一个参数来实现(durable)。交换机设置成持久化之后,交换机的属性就会在服务器内部保存,当MQ的服务器发生意外宕机之后,重启服务器后不需要重新去建立交换机,持久化后的交换机会自动建立。

如果交换机不设置持久化,那么MQ服务器在重启之后,相关的交换机元数据就会消失。因此,对于一个长期使用的交换机来说,必然是要将其设置成持久化的。

队列的持久化

和交换机相同,队列的持久化也早就在使用了。同样,也是通过设置参数durable实现的。

不同的是,队列的持久化比交换机的持久化还稍微重要些。设想,队列不进行持久化,当重启队列之后,队列元数据就会被删除。既然队列都被删除了,那消息肯定也都没了。这对于生产环境的机子来说是比较难搞的事情,因此队列要进行持久化。

消息的持久化

队列持久化之后,消息不进行持久化,那重启之后,照样没数据,还不如不持久化队列呢。所以,继交换机持久化、队列持久化之后,消息也要进行持久化。

在SpringBoot中,消息的持久化就是设置MessageProperties中的deliveyMode为PERSISTENT即可,如下述代码:

@Configuration
public class DurableConfig {@Bean("durableQueue")public Queue durableQueue() {return QueueBuilder.durable(Constants.DURABLE_QUEUE).build(); // 队列持久化}@Bean("durableExchange")public Exchange durableExchange() {return ExchangeBuilder.directExchange(Constants.DURABLE_EXCHANGE).durable(true).build(); // 交换机持久化}@Bean("durableQueueBind")public Binding durableQueueBind(@Qualifier("durableExchange") Exchange exchange,@Qualifier("durableQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("durable").noargs();}}
@RestController
@RequestMapping("/durable")
public class DurableController {@Resourcepublic RabbitTemplate rabbitTemplate;@RequestMappingpublic void durableQueue() {String body = "hello 持久化";Message msg = new Message(body.getBytes(StandardCharsets.UTF_8), new MessageProperties());msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT); // 消息持久化this.rabbitTemplate.convertAndSend(Constants.DURABLE_EXCHANGE, "durable", msg);System.out.println("持久化发送消息成功!");}}
@Configuration
public class DurableLister {@RabbitListener(queues = Constants.DURABLE_QUEUE)public void durableListener(String msg) {System.out.println("持久化消息消费成功:" + msg);}}

消息确认

消息确认机制是保证可靠性传输的最后一个机制,保障的是从Broker到消费者的整个过程。

RabbitMQ向消费者发送消息之后,就会把这条消息删除。但是,如果消费者处理消息异常,就会造成消息丢失。为了保障消息从队列顺利到达消费者,RabbitMQ提出了消息确认机制。

消费者在订阅队列是,可以指定autoAck参数,根据这个参数设置,消息确认机制可以分成以下两种:

  • 手动确认:当autoAck等于false时,RabbitMQ会等待消费者显示地调用Basic.Ack命令,恢复确认信号后才能从内存(或者磁盘中)删除消息。这种模式适合于消息可靠性要求较高的场景。
  • 自动确认:当autoAck等于true时,RabbitMQ会自动把发送出去的消息设置为确认,然后从内存(或者硬盘中)删除消息,而不管消费者是否真正消费了这条消息。这种模式适合于消息可靠性要求不高的场景。

当消费者指定需要手动确认时,队列中的消息就分成了两个部分:

  • 等待投递给消费者的消息
  • 已经投递给消费者,但是还没有收到消费者确认信号的消息

如果RabbitMQ一直没有收到消费者的确认信号,并且消费此消息的消费者已经断开连接,则RabbitMQ会安排该消息重新进入队列,等待投递给下一个消费者,当然也有可能还是原来那个消费者。

手动确认

消费者在收到消息之后,可以选择确认,也可以选择直接拒绝或者跳过。RabbitMQ也提供了不同的确认应答方式,消费者可以调用与其对应的channel相关方法,共有三种:

肯定确认

channel,basiAck(long deliveryTag, boolean multiple),表示RabbitMQ已经知道该消息处理成功,可以将其丢弃了。

deliveryTag表示消息的唯一标识,是一个单调递增的64位的长整型值。deliveryTag是每个信道独立维护的,所以在每个信道上都是唯一的。当消费者确认一条消息时,必须使用对应的信道上进行确认。

multiple表示是否批量确认,在某些情况下,为了减少网络流量,可以对一系列的deliveryTag进行批量确认。值为true则会一次性确认所有小于或等于指定deliveryTag的消息。值为false时则只会确认当前指定的deliveryTag的消息。

deliveryTagRabbitMQ中消息确认机制的一个重要组成部分,他确保了消息传递的可靠性和顺序性。

否定确认

channle.basicReject(long deliveryTag, boolean requeue),消费者可以调用该消息告诉RabbbitMQ拒绝该消息。

requeue表示拒绝后这条消息如何处理。值为true时,会重新将该消息存入队列,以便可以发送给下一个订阅的消费者。值为false时,则会把消息从队列中删除,而不会把他发送给新的消费者。

否定确认

channel.basicNack(long deliveryTag, boolean multiple, boolean requeue),该方法和第二个方法含义相同,唯一的区别是该方法可以批量处理。

代码案例

主要展示SpringBoot模式下的代码书写。

Spring-AMQP对于消息确认机制提供了三种策略:

  1. AcknowledgeMode.NONE:表示消息一旦投递给消费者,不管消费者是否处理成功该消息,RabbitMQ都会自动确认,从队列中移除该消息。如果消费者处理消息失败,消息可能会丢失。
  2. AcknowledgeMode.AUTO(默认):表示消息投递给消费者,如果处理过程中抛出了异常,则不会确认该消息;但是如果没有发生异常,该消息就会自动确认。
  3. AcknowledgeMode.MANUAL:表示手动确认模式,消费者必须在成功处理消息之后调用basicAck来确认消息。如果消息未被确认,RabbitMQ会认为消息未处理成功,并且会在消费者可用时重新投递该消息,这种模式提高了消息处理的可靠性,因为即使消费者处理消息后失败,消息也不会丢失,而是可以被重新处理。
spring:rabbitmq:host: 43.138.108.125port: 5672username: adminpassword: adminvirtual-host: mq-springboot-testpublisher-confirm-type: correlated # 表示发送方确认模式的confirm机制,correlated表示异步确认,还有一种同步确认,不管同步确认可能造成阻塞publisher-returns: true # 表示发送方确认模式的return模式,true表示开启template:mandatory: true # true表示交换机无法进行路由消息时,会将消息返回给生产者,false表示无法进行路由时,直接丢弃listener:simple:acknowledge-mode: auto # 消息确认机制,三种策略可选
@Configuration
public class AckConfig {@Bean("ackQueue")public Queue ackQueue() {return QueueBuilder.durable(Constants.ACK_QUEUE).build();}@Bean("ackExchange")public Exchange ackExchange() {return ExchangeBuilder.directExchange(Constants.ACK_EXCHANGE).durable(true).build();}@Bean("ackQueueBind")public Binding ackQueueBind(@Qualifier("ackExchange") Exchange exchange,@Qualifier("ackQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("ack").noargs();}}
@RestController
@RequestMapping("/ack")
public class AckController {@Resourcepublic RabbitTemplate rabbitTemplate;@RequestMappingpublic void ackQueue() {this.rabbitTemplate.convertAndSend(Constants.ACK_EXCHANGE, "ack", "hello ack");System.out.println("消息确认机制生产者发送成功");}}

当使用策略为NONE时,发现即使出现异常,消息也会从消费者中删除:

@Configuration
@RabbitListener(queues = Constants.ACK_QUEUE) // 当类中所有的消费者都指向一个队列时,就可以放在类上
public class AckListener {@RabbitHandler // RabbitListener注解放在类上时,就需要使用该注解放在方法上public void ackListener(String msg) {System.out.println("接收到消息:" + msg);int a = 3 / 0;System.out.println("自制异常,用来感受auto");}}

当使用策略为AUTO时,发现出现异常之后,会一直重试,并且在开源界面中也会出现未确认消息一条:

@Configuration
@RabbitListener(queues = Constants.ACK_QUEUE) // 当类中所有的消费者都指向一个队列时,就可以放在类上
public class AckListener {@RabbitHandler // RabbitListener注解放在类上时,就需要使用该注解放在方法上public void ackListener(String msg) {System.out.println("接收到消息:" + msg);int a = 3 / 0;System.out.println("自制异常,用来感受auto");}}

当使用策略为MANUAL,就需要在消费者中修改代码:

@Configuration
@RabbitListener(queues = Constants.ACK_QUEUE) // 当类中所有的消费者都指向一个队列时,就可以放在类上
public class AckListener {@RabbitHandler // RabbitListener注解放在类上时,就需要使用该注解放在方法上public void ackListener(Message msg, Channel channel) throws IOException {try {System.out.println("接收到消息为:" + msg);/*** 第一个参数是deliveryTag* 第二个参数是是否批量处理*/channel.basicAck(msg.getMessageProperties().getDeliveryTag(), true);} catch (Exception e) {/*** 第一个参数是deliveryTag* 第二个参数是requeue*/channel.basicReject(msg.getMessageProperties().getDeliveryTag(), true);/*** 第一个参数是deliveryTag* 第二个参数是是否批量处理* 第三个参数是requeue*/// channel.basicNack(msg.getMessageProperties().getDeliveryTag(), true, true);}}}

总结

在该篇文章中,主要描述了RabbitMQ的保障可靠性传输的三个策略,这三个策略保障了消息从生产者产生到消费者消费整个过程的可靠性,使得RabbitMQ的性能更好。但是,并不能完全保障消息不丢失,或者说,没有一个消息队列可以保障消息不丢失,例如持久化时,是不会直接持久化到硬盘,而是持久化到缓存中,经过几条消息的沉淀之后,再持久化到硬盘,所以在这个过程中一点宕机,那么消息也是会丢失的。总的来说,这三条策略还是尽可能的保障了消息传输的可靠性。


http://www.mrgr.cn/news/29120.html

相关文章:

  • 部分动态铜皮的孤岛无法删除。报错
  • 如何在运行时传递回调: 解锁动态事件处理
  • 电磁阀,线性电磁阀信号驱动隔离变送器
  • Java | Leetcode Java题解之第414题第三大的数
  • [强化你的LangChain工具创建技能:从基础到进阶]
  • 20240918 每日AI必读资讯
  • npm安装时候报错certificate has expired
  • 从HTML到LangChain:如何高效加载和解析HTML文件
  • 在Ubuntu 18.04上安装R的方法
  • 『功能项目』靠近Npc显示可对话标识【60】
  • 在Ubuntu上安装Rails和nginx与Passenger的方法
  • C++第七节课 运算符重载
  • Linux:用户账号管理和组账号管理
  • stm32开发之串口空闲中断和环形数组的最简单的组合使用
  • 【ShuQiHere】算法分析:揭开效率与复杂度的神秘面纱
  • goctl安装失败
  • oracle 11g SYSAUX表空间清理
  • 408算法题leetcode--第七天
  • Java中的OOM与SOF:详解内存溢出与栈溢出
  • 计算机视觉中的图像ROI区域提取与应用