当前位置: 首页 > news >正文

RabbitMQ最全教程-Part2(高阶使用)

我们先来看一下这个网约车的场景

 我们需要实现派单服务,用户发送打车订单后需要进行进行派单,如果在指定时间内没有找到司机就会收到派单超时的通知,并且能够实时查看当前排队的抢单人数

 在这幅图中,我们今天来解决一下其中的三个关键问题:打车超时,打车排队,消息推送

一、延时任务

首先什么时候需要做延时任务?

  • 生成订单30分钟未支付,则自动取消
  • 生成订单60秒后,给用户发短信
  • 滴滴打车订单完成后,如果用户一直不评价,48小时后会将自动评价为5星。

 就像刚才的打车超时问题,如果再最大限度时间内还没有打到车,那么我们需要做一些操作,这个整个流程我们就可以称为延时任务

那如何解决延时任务呢?

最原始的方案可以对数据库进行轮询操作,当查询到某些任务的延时时间到了就触发执行

除此之外,我们可以用延时队列实现

1、延时队列

 JDK的延迟队列

该方案是利用JDK自带的DelayQueue来实现,这是一个无界阻塞队列,该队列只有在延迟期满的时候才能从中获取元素,放入DelayQueue中的对象,是必须实现Delayed接口的。

 由于JDK自带的延时队列是基于内存的,所以其执行效率非常高,延迟小

缺点:

服务器重启后,数据全部消失,怕宕机
集群扩展相当麻烦
因为内存条件限制的原因,比如下单未付款的订单数太多,那么很容易就出现OOM异常
代码复杂度较高

时间轮算法

 这样可以看出定时轮由个3个重要的属性参数,ticksPerWheel(一轮的tick数),tickDuration(一个tick的持续时间)以及 timeUnit(时间单位),例如当ticksPerWheel=60,tickDuration=1,timeUnit=秒,这就和现实中的始终的秒针走动完全类似了。

如果当前指针指在1上面,我有一个任务需要4秒以后执行,那么这个执行的线程回调或者消息将会被放在5上。那如果需要在20秒之后执行怎么办,由于这个环形结构槽数只到8,如果要20秒,指针需要多转2圈,位置是在2圈之后的5上面(20 % 8 + 1)

缺点参考JDK延迟队列

使用RabbitMQ实现延时任务

 RabbitMQ可以针对Queue和Message设置 x-message-ttl,来控制消息的生存时间,如果超时,则消息变为dead letter
RabbitMQ的Queue可以配置x-dead-letter-exchange 和x-dead-letter-routing-key(可选)两个参数,用来控制队列内出现了dead letter,则按照这两个参数重新路由。

这个方案称为死信队列

2、死信队列

什么是死信队列?

先从概念解释上搞清楚这个定义,死信,顾名思义就是无法被消费的消息,字面意思可以这样理解

一般来说,producer将消息投递到broker或者直接到queue里了,consumer从queue取出消息进行消费,但某些时候由于特定的原因导致queue中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信,自然就有了死信队列

所以死信队列就是存放死信消息的队列

死信交换机

死信如何被投放到死信队列,那肯定是要先交到一个死信交换机的手中。

那死信交换机只接收哪类消息呢?

  • 消费者对消息使用了basicReject或者basicNack回复,并且requeue参数设置为false,即不再将该消息重新在消费者间进行投递
  • 消息在队列中超时,RabbitMQ可以在单个消息或者队列中设置TTL属性
  • 队列中的消息已经超过其设置的最大消息个数

死信队列的实现

在 Spring Boot 配置类中定义死信交换器、普通交换器、普通队列和死信队列,并进行绑定。

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {// 死信交换器名称public static final String DEAD_LETTER_EXCHANGE = "dlx.exchange";// 死信队列名称public static final String DEAD_LETTER_QUEUE = "dlx.queue";// 死信路由键public static final String DEAD_LETTER_ROUTING_KEY = "dlx.routing.key";// 普通交换器名称public static final String NORMAL_EXCHANGE = "normal.exchange";// 普通队列名称public static final String NORMAL_QUEUE = "normal.queue";// 普通路由键public static final String NORMAL_ROUTING_KEY = "normal.routing.key";@Beanpublic DirectExchange deadLetterExchange() {return new DirectExchange(DEAD_LETTER_EXCHANGE);}@Beanpublic Queue deadLetterQueue() {return new Queue(DEAD_LETTER_QUEUE, true); // durable: true}@Beanpublic Binding deadLetterBinding() {return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with(DEAD_LETTER_ROUTING_KEY);}@Beanpublic DirectExchange normalExchange() {return new DirectExchange(NORMAL_EXCHANGE);}@Beanpublic Queue normalQueue() {Map<String, Object> args = new HashMap<>();args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);args.put("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY);args.put("x-message-ttl", 10000); // 消息存活时间,单位毫秒return new Queue(NORMAL_QUEUE, true, false, false, args); // durable: true, exclusive: false, autoDelete: false, arguments: args}@Beanpublic Binding normalBinding() {return BindingBuilder.bind(normalQueue()).to(normalExchange()).with(NORMAL_ROUTING_KEY);}
}

创建一个生产者类,用于发送消息到普通队列。

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;@Component
public class MessageProducer {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendMessage(String message) {rabbitTemplate.convertAndSend(RabbitMQConfig.NORMAL_EXCHANGE, RabbitMQConfig.NORMAL_ROUTING_KEY, message);System.out.println("Sent message: " + message);}
}

创建一个消费者类,用于消费普通队列中的消息。如果消息处理失败或达到最大重试次数,消息将被路由到死信队列。

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class MessageConsumer {@RabbitListener(queues = RabbitMQConfig.NORMAL_QUEUE)public void receiveMessage(String message) {try {// 模拟消息处理失败if (message.equals("fail")) {throw new RuntimeException("Message processing failed");}System.out.println("Received message: " + message);} catch (Exception e) {System.out.println("Failed to process message: " + message);throw e; // 重新抛出异常,使消息进入死信队列}}
}

基于RabbitMQ的死信队列实现延时任务

其关键点就是对于这个正常队列不要添加消费者,TTL一到,自动投递到死信队列,接下来再对死信队列中消息进行消费,实现一个延时任务的执行

二、消息可靠性保证

面试题:MQ如何保证消息不丢失?

这是一个很宽泛的问题,而不仅仅回答一个同步?异步?ACK?

我们要从生产端怎么保证消息可靠性的、broker怎么保证的、消费端怎么保证的?

生产者保证消息可靠性

1、失败通知

当生产者发送消息时,消息先到交换机。这时,交换机有两种情况,一是能正常到达queue,我们称为可路由,反过来,主要无法传到某个queue,那么就代表不可路由,这时候表示投递失败,会有一个失败通知反馈给生产者

在代码层面,需要在生产者添加监听器

yml配置

spring:rabbitmq:# 消息在未被队列收到的情况下返回publisher-returns: true
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.listener.api.ReturnedMessage;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);// 设置 Return 回调rabbitTemplate.setReturnCallback((returnedMessage) -> {System.out.println("Message returned: " + new String(returnedMessage.getBody()) +", replyCode: " + returnedMessage.getReplyCode() +", replyText: " + returnedMessage.getReplyText() +", exchange: " + returnedMessage.getExchange() +", routingKey: " + returnedMessage.getRoutingKey());});return rabbitTemplate;}}

2、发送方确认

在上面失败通知时,是发送在无法找到queue,这时除了失败通知,其实还会返回Nack;

 当可路由时,那就开始投放到真正的queue,但是这个投放一定会放入成功吗?肯定会有失败(比如:队列已满,不可再投放)

怎么样在代码层面开启发送方确认ack呢?

spring:rabbitmq:    # 开启消息确认机制publisher-confirm-type: correlated
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.ReturnedMessage;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);// 设置 发送方确认 回调rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {if (ack) {System.out.println("Message sent successfully: " + correlationData.getId());} else {System.out.println("Message sending failed: " + correlationData.getId() + ", cause: " + cause);}});// 设置 失败通知 回调rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {System.out.println("Message returned: " + message + ", replyCode: " + replyCode + ", replyText: " + replyText + ", exchange: " + exchange + ", routingKey: " + routingKey);});return rabbitTemplate;}
}

Broker保证消息可靠性

假设有现在一种情况,生产者已经成功将消息发送到了交换机,并且交换机也成功的将消息路由到了队列中,但是在消费者还未进行消费时,mq挂掉了,那么重启mq之后消息还会存在吗?如果消息不存在,那就造成了消息的丢失,也就不能保证消息的可靠性传输了。

解决办法就是开启RabbitMQ的持久化机制

在 Spring Boot 配置类中定义交换器、队列和绑定关系,并设置队列的持久化属性。

import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.Binding;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {public static final String EXCHANGE_NAME = "my.exchange";public static final String QUEUE_NAME = "my.queue";public static final String ROUTING_KEY = "my.routing.key";@Beanpublic DirectExchange directExchange() {return new DirectExchange(EXCHANGE_NAME, true, false); // durable: true, autoDelete: false}@Beanpublic Queue queue() {return new Queue(QUEUE_NAME, true); // durable: true}@Beanpublic Binding binding() {return BindingBuilder.bind(queue()).to(directExchange()).with(ROUTING_KEY);}
}

生产者:

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;@Component
public class MessageProducer {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendMessage(TaxiBO taxiBO) {rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, RabbitMQConfig.ROUTING_KEY, taxiBO, m -> {m.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT); // 设置消息持久化return m;}, new CorrelationData(UUID.randomUUID().toString()));System.out.println("Sent message: " + taxiBO.getAccountId());}
}

消费者保证消息可靠性

消费者接收到消息,但是还未处理或者还未处理完,此时消费者进程挂掉了,比如重启或者异常断电等,此时MQ认为消费者已经完成消息消费,就会从队列中删除消息,从而导致消息丢失。

该如何避免这种情况呢?这就要用到RabbitMQ提供的ack机制,RabbitMQ默认是自动ack的,此时需要将其修改为手动ack,也即自己的程序确定消息已经处理完成后,手动提交ack,此时如果再遇到消息未处理进程就挂掉的情况,由于没有提交ack,RabbitMQ就不会删除这条消息,而是会把这条消息发送给其他消费者处理,但是消息是不会丢的。

spring:rabbitmq:listener:simple:acknowledge-mode: manual  # 手动ack
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;import com.rabbitmq.client.Channel;import java.io.IOException;@Component
public class MessageConsumer {@Autowiredprivate RabbitTemplate rabbitTemplate;@RabbitListener(queues = RabbitMQConfig.QUEUE_NAME, acknowledgeMode = "manual")public void receiveMessage(Message message, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel) throws IOException {try {TaxiBO taxiBO = rabbitTemplate.getMessageConverter().fromMessage(message, TaxiBO.class);if (taxiBO != null) {System.out.println("Received message: " + taxiBO.getAccountId());// 处理消息的业务逻辑processMessage(taxiBO);// 手动确认消息channel.basicAck(deliveryTag, false);}} catch (Exception e) {e.printStackTrace();// 处理失败,可以选择重新入队或丢弃消息channel.basicNack(deliveryTag, false, true);}}private void processMessage(TaxiBO taxiBO) {// 模拟业务逻辑处理try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("Processed message: " + taxiBO.getAccountId());}
}


http://www.mrgr.cn/news/63957.html

相关文章:

  • k8s集群 ceph rbd 存储动态扩容
  • 机器人领域中的scaling law:通过复现斯坦福机器人UMI——探讨数据规模化定律(含UMI的复现关键)
  • 详解CRC校验原理以及FPGA实现
  • openpnp - 在openpnp中单独测试相机
  • 成都睿明智科技有限公司抖音电商服务的领航者
  • 使用 `tracert [options] <目标地址>` 命令的详细介绍
  • 嵌入式Linux系统中GPIO实验详解
  • vscode markdown-image 图片粘贴自动上传到本地目录设置
  • 什么品牌的护眼台灯比较好?五款护眼效果比较明显的护眼台灯
  • 对于图像的关键点数据提取openpose
  • 第三百零八节 Log4j教程 - Log4j日志到数据库
  • AWS RDS MySQL内存使用
  • Caffeine 手动策略缓存 put() 方法源码解析
  • Copilot功能
  • 单例模式的五种实现方式及优缺点
  • 从0开始学统计-什么是Z-score
  • 【国产MCU系列】-GD32F4开发环境搭建(基于Embedded Builder)
  • 自动化测试工具Ranorex Studio(十九)-其他编辑选项
  • HTML 基础标签——分组标签 <div>、<span> 和基础语义容器
  • magic-api简单使用六:删除接口(支持路径传参)
  • 从实验室到生活:超分子水凝胶湿电发电机的应用之路
  • 【语义分割|代码解析】CMTFNet-2: CNN and Multiscale Transformer Fusion Network 用于遥感图像分割!
  • 学生党百元预算如何选到高性价比头戴耳机?四款百元热门耳机推荐
  • 国密SM2 非对称加解密前后端工具
  • 在 openEuler 22.03 服务器上搭建 web 服务教程
  • 100种算法【Python版】第34篇——PageRank算法