RabbitMQ最全教程-Part2(高阶使用)
我们先来看一下这个网约车的场景
我们需要实现派单服务,用户发送打车订单后需要进行进行派单,如果在指定时间内没有找到司机就会收到派单超时的通知,并且能够实时查看当前排队的抢单人数
在这幅图中,我们今天来解决一下其中的三个关键问题:打车超时,打车排队,消息推送
一、延时任务
首先什么时候需要做延时任务?
- 生成订单30分钟未支付,则自动取消
- 生成订单60秒后,给用户发短信
- 滴滴打车订单完成后,如果用户一直不评价,48小时后会将自动评价为5星。
就像刚才的打车超时问题,如果再最大限度时间内还没有打到车,那么我们需要做一些操作,这个整个流程我们就可以称为延时任务
那如何解决延时任务呢?
最原始的方案可以对数据库进行轮询操作,当查询到某些任务的延时时间到了就触发执行
除此之外,我们可以用延时队列实现
1、延时队列
JDK的延迟队列
该方案是利用JDK自带的DelayQueue来实现,这是一个无界阻塞队列,该队列只有在延迟期满的时候才能从中获取元素,放入DelayQueue中的对象,是必须实现Delayed接口的。
由于JDK自带的延时队列是基于内存的,所以其执行效率非常高,延迟小
缺点:
服务器重启后,数据全部消失,怕宕机
集群扩展相当麻烦
因为内存条件限制的原因,比如下单未付款的订单数太多,那么很容易就出现OOM异常
代码复杂度较高
时间轮算法
这样可以看出定时轮由个3个重要的属性参数,ticksPerWheel(一轮的tick数),tickDuration(一个tick的持续时间)以及 timeUnit(时间单位),例如当ticksPerWheel=60,tickDuration=1,timeUnit=秒,这就和现实中的始终的秒针走动完全类似了。
如果当前指针指在1上面,我有一个任务需要4秒以后执行,那么这个执行的线程回调或者消息将会被放在5上。那如果需要在20秒之后执行怎么办,由于这个环形结构槽数只到8,如果要20秒,指针需要多转2圈,位置是在2圈之后的5上面(20 % 8 + 1)
缺点参考JDK延迟队列
使用RabbitMQ实现延时任务
RabbitMQ可以针对Queue和Message设置 x-message-ttl,来控制消息的生存时间,如果超时,则消息变为dead letter
RabbitMQ的Queue可以配置x-dead-letter-exchange 和x-dead-letter-routing-key(可选)两个参数,用来控制队列内出现了dead letter,则按照这两个参数重新路由。
这个方案称为死信队列
2、死信队列
什么是死信队列?
先从概念解释上搞清楚这个定义,死信,顾名思义就是无法被消费的消息,字面意思可以这样理解
一般来说,producer将消息投递到broker或者直接到queue里了,consumer从queue取出消息进行消费,但某些时候由于特定的原因导致queue中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信,自然就有了死信队列
所以死信队列就是存放死信消息的队列
死信交换机
死信如何被投放到死信队列,那肯定是要先交到一个死信交换机的手中。
那死信交换机只接收哪类消息呢?
- 消费者对消息使用了
basicReject
或者basicNack
回复,并且requeue
参数设置为false
,即不再将该消息重新在消费者间进行投递 - 消息在队列中超时,RabbitMQ可以在单个消息或者队列中设置
TTL
属性 - 队列中的消息已经超过其设置的最大消息个数
死信队列的实现
在 Spring Boot 配置类中定义死信交换器、普通交换器、普通队列和死信队列,并进行绑定。
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {// 死信交换器名称public static final String DEAD_LETTER_EXCHANGE = "dlx.exchange";// 死信队列名称public static final String DEAD_LETTER_QUEUE = "dlx.queue";// 死信路由键public static final String DEAD_LETTER_ROUTING_KEY = "dlx.routing.key";// 普通交换器名称public static final String NORMAL_EXCHANGE = "normal.exchange";// 普通队列名称public static final String NORMAL_QUEUE = "normal.queue";// 普通路由键public static final String NORMAL_ROUTING_KEY = "normal.routing.key";@Beanpublic DirectExchange deadLetterExchange() {return new DirectExchange(DEAD_LETTER_EXCHANGE);}@Beanpublic Queue deadLetterQueue() {return new Queue(DEAD_LETTER_QUEUE, true); // durable: true}@Beanpublic Binding deadLetterBinding() {return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with(DEAD_LETTER_ROUTING_KEY);}@Beanpublic DirectExchange normalExchange() {return new DirectExchange(NORMAL_EXCHANGE);}@Beanpublic Queue normalQueue() {Map<String, Object> args = new HashMap<>();args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);args.put("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY);args.put("x-message-ttl", 10000); // 消息存活时间,单位毫秒return new Queue(NORMAL_QUEUE, true, false, false, args); // durable: true, exclusive: false, autoDelete: false, arguments: args}@Beanpublic Binding normalBinding() {return BindingBuilder.bind(normalQueue()).to(normalExchange()).with(NORMAL_ROUTING_KEY);}
}
创建一个生产者类,用于发送消息到普通队列。
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;@Component
public class MessageProducer {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendMessage(String message) {rabbitTemplate.convertAndSend(RabbitMQConfig.NORMAL_EXCHANGE, RabbitMQConfig.NORMAL_ROUTING_KEY, message);System.out.println("Sent message: " + message);}
}
创建一个消费者类,用于消费普通队列中的消息。如果消息处理失败或达到最大重试次数,消息将被路由到死信队列。
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class MessageConsumer {@RabbitListener(queues = RabbitMQConfig.NORMAL_QUEUE)public void receiveMessage(String message) {try {// 模拟消息处理失败if (message.equals("fail")) {throw new RuntimeException("Message processing failed");}System.out.println("Received message: " + message);} catch (Exception e) {System.out.println("Failed to process message: " + message);throw e; // 重新抛出异常,使消息进入死信队列}}
}
基于RabbitMQ的死信队列实现延时任务
其关键点就是对于这个正常队列不要添加消费者,TTL一到,自动投递到死信队列,接下来再对死信队列中消息进行消费,实现一个延时任务的执行
二、消息可靠性保证
面试题:MQ如何保证消息不丢失?
这是一个很宽泛的问题,而不仅仅回答一个同步?异步?ACK?
我们要从生产端怎么保证消息可靠性的、broker怎么保证的、消费端怎么保证的?
生产者保证消息可靠性
1、失败通知
当生产者发送消息时,消息先到交换机。这时,交换机有两种情况,一是能正常到达queue,我们称为可路由,反过来,主要无法传到某个queue,那么就代表不可路由,这时候表示投递失败,会有一个失败通知反馈给生产者
在代码层面,需要在生产者添加监听器
yml配置
spring:rabbitmq:# 消息在未被队列收到的情况下返回publisher-returns: true
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.listener.api.ReturnedMessage;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);// 设置 Return 回调rabbitTemplate.setReturnCallback((returnedMessage) -> {System.out.println("Message returned: " + new String(returnedMessage.getBody()) +", replyCode: " + returnedMessage.getReplyCode() +", replyText: " + returnedMessage.getReplyText() +", exchange: " + returnedMessage.getExchange() +", routingKey: " + returnedMessage.getRoutingKey());});return rabbitTemplate;}}
2、发送方确认
在上面失败通知时,是发送在无法找到queue,这时除了失败通知,其实还会返回Nack;
当可路由时,那就开始投放到真正的queue,但是这个投放一定会放入成功吗?肯定会有失败(比如:队列已满,不可再投放)
怎么样在代码层面开启发送方确认ack呢?
spring:rabbitmq: # 开启消息确认机制publisher-confirm-type: correlated
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.ReturnedMessage;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);// 设置 发送方确认 回调rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {if (ack) {System.out.println("Message sent successfully: " + correlationData.getId());} else {System.out.println("Message sending failed: " + correlationData.getId() + ", cause: " + cause);}});// 设置 失败通知 回调rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {System.out.println("Message returned: " + message + ", replyCode: " + replyCode + ", replyText: " + replyText + ", exchange: " + exchange + ", routingKey: " + routingKey);});return rabbitTemplate;}
}
Broker保证消息可靠性
假设有现在一种情况,生产者已经成功将消息发送到了交换机,并且交换机也成功的将消息路由到了队列中,但是在消费者还未进行消费时,mq挂掉了,那么重启mq之后消息还会存在吗?如果消息不存在,那就造成了消息的丢失,也就不能保证消息的可靠性传输了。
解决办法就是开启RabbitMQ的持久化机制
在 Spring Boot 配置类中定义交换器、队列和绑定关系,并设置队列的持久化属性。
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.Binding;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {public static final String EXCHANGE_NAME = "my.exchange";public static final String QUEUE_NAME = "my.queue";public static final String ROUTING_KEY = "my.routing.key";@Beanpublic DirectExchange directExchange() {return new DirectExchange(EXCHANGE_NAME, true, false); // durable: true, autoDelete: false}@Beanpublic Queue queue() {return new Queue(QUEUE_NAME, true); // durable: true}@Beanpublic Binding binding() {return BindingBuilder.bind(queue()).to(directExchange()).with(ROUTING_KEY);}
}
生产者:
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;@Component
public class MessageProducer {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendMessage(TaxiBO taxiBO) {rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, RabbitMQConfig.ROUTING_KEY, taxiBO, m -> {m.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT); // 设置消息持久化return m;}, new CorrelationData(UUID.randomUUID().toString()));System.out.println("Sent message: " + taxiBO.getAccountId());}
}
消费者保证消息可靠性
消费者接收到消息,但是还未处理或者还未处理完,此时消费者进程挂掉了,比如重启或者异常断电等,此时MQ认为消费者已经完成消息消费,就会从队列中删除消息,从而导致消息丢失。
该如何避免这种情况呢?这就要用到RabbitMQ提供的ack机制,RabbitMQ默认是自动ack的,此时需要将其修改为手动ack,也即自己的程序确定消息已经处理完成后,手动提交ack,此时如果再遇到消息未处理进程就挂掉的情况,由于没有提交ack,RabbitMQ就不会删除这条消息,而是会把这条消息发送给其他消费者处理,但是消息是不会丢的。
spring:rabbitmq:listener:simple:acknowledge-mode: manual # 手动ack
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;import com.rabbitmq.client.Channel;import java.io.IOException;@Component
public class MessageConsumer {@Autowiredprivate RabbitTemplate rabbitTemplate;@RabbitListener(queues = RabbitMQConfig.QUEUE_NAME, acknowledgeMode = "manual")public void receiveMessage(Message message, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel) throws IOException {try {TaxiBO taxiBO = rabbitTemplate.getMessageConverter().fromMessage(message, TaxiBO.class);if (taxiBO != null) {System.out.println("Received message: " + taxiBO.getAccountId());// 处理消息的业务逻辑processMessage(taxiBO);// 手动确认消息channel.basicAck(deliveryTag, false);}} catch (Exception e) {e.printStackTrace();// 处理失败,可以选择重新入队或丢弃消息channel.basicNack(deliveryTag, false, true);}}private void processMessage(TaxiBO taxiBO) {// 模拟业务逻辑处理try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("Processed message: " + taxiBO.getAccountId());}
}