从安装到实战:Spring Boot与RabbitMQ的终极整合指南
Docker环境中部署Rabbitmq
# 拉取最新的rabbitmq的镜像
docker pull rabbitmq# 拉取指定版本的rabbitmq镜像
docker pull rabbitmq:版本号#启动rabbitmq镜像服务
docker run -d -p 15672:15672 -p 5672:5672 \--restart=always \-e RABBITMQ_DEFAULT_VHOST=my_vhost \-e RABBITMQ_DEFAULT_USER=admin \-e RABBITMQ_DEFAULT_PASS=admin \--hostname myRabbit \--name rabbitmq\rabbitmq:latest# 参数说明:
# -d:表示在后台运行容器;
# -p:将主机的端口 15673(Web访问端口号)对应当前rabbitmq容器中的 15672 端口,将主机的5674(应用访问端口)端口映射到# # rabbitmq中的5672端口;
# --restart=alawys:设置开机自启动
# -e:指定环境变量:
# RABBITMQ_DEFAULT_VHOST:默认虚拟机名;
# RABBITMQ_DEFAULT_USER:默认的用户名;
# RABBITMQ_DEFAULT_PASS:默认的用户密码;
# --hostname:指定主机名(RabbitMQ 的一个重要注意事项是它根据所谓的 节点名称 存储数据,默认为主机名);
# --name rabbitmq:设置容器名称;
启动RabbitMQ访问的Web客户端
#方法一:进入容器开启
# 1. 进入rabbitmq容器
docker exec -it 容器名/容器id /bin/bash
# 2. 开启web客户端
rabbitmq-plugins enable rabbitmq_management#方法二:直接开启
docker exec -it 容器名/容器id rabbitmq-plugins enable rabbitmq_management
访问web页面
处理Stats in management UI are disabled on this node问题
# 1. 进入启动rabbitmq的容器中 docker exec -it 容器名称/容器id /bin/bashdocker exec -it rabbitmq /bin/bash
# 2. 切换到rabbitmq的配置文件目录cd /etc/rabbitmq/conf.d/
# 3. 修改配置文件- management_agent.disable_metrics_collector.confecho management_agent.disable_metrics_collector = false > management_agent.disable_metrics_collector.conf
# 4. 查看配置文件cat management_agent.disable_metrics_collector.conf
# 5. 退出容器exit
# 6. 重启容器 docker restart 容器名称/容器iddocker restart rabbitmq
# 7. 再次进入rabbitmq的web客户端页面,点击各个页面就不会出现那个提示,
RabbitMQ详解
工作流程:
- 生产者将消息推送到交换机上 交换机根据路由规则将消息路由到一个或者多个队列
- 消费者从队列中拉取推送消息,然后进行处理。
- 消费者在成功处理消息后,会向MQ 发送确认(ack),表示消息成功消费。
- 如果消费者未能处理消息(崩溃),MQ 重新将消息放回队列,待其他消费者处理。
核心概念:
生产者 (Producer):
- 负责创建并发送消息到 RabbitMQ。生产者通常不直接将消息发送到队列,而是发送到交换机。
交换机 (Exchange):
- 负责接收来自生产者的消息,并根据绑定规则将消息路由到相应的队列
交换机有不同类型:
- Direct Exchange: 根据精确的路由键进行匹配,精确匹配队列。
- Fanout Exchange: 将消息广播到所有绑定的队列,发布订阅模式。
- Topic Exchange: 通过模式匹配路由键来路由消息,更为灵活的路由匹配规则。
- Headers Exchange: 使用消息头属性进行路由,很少使用。
队列 (Queue):
- 消息被存储在队列中等待消费者处理。
- 队列按照先进先出(FIFO)的顺序存储和传递消息。
消费者 (Consumer):
- 从队列中接收并处理消息。消费者可以是独立的应用程序或服务。
- 消费者需要确认已成功处理消息,以便 RabbitMQ 可以将该消息从队列中移除。
绑定 (Binding):
- 是交换机和队列之间的链接,定义了交换机如何将消息路由到队列。
- 绑定可以包含特定的路由键,帮助交换机确定目标队列。
连接 (Connection) 和通道 (Channel):
- 连接是应用程序与 RabbitMQ 之间的 TCP 连接。
- 通道是在连接内的虚拟连接,轻量级,建议在应用中使用通道而非直接操作连接。
虚拟主机 (Virtual Host, vhost):
- 用于多租户和隔离,可以在同一实例中定义多个虚拟主机,每个虚拟主机可以包含自己的交换机、队列等。
用户权限 (Permissions):
- RabbitMQ 中的用户权限控制访问级别,可以设定哪些用户可以访问哪些虚拟主机及其资源。
web页面介绍:
Overview
Connections
- Name 连接名 点击连接名, 还可以查看详细的信息~
- User name 当前连接登录MQ 的用户
- State 当前连接的状态,running 运行 idle 空闲
- SSL|TLS 是否使用的是 SSL|TLS协议
- Peotocol AMQP 0-9-1 指的是AMQP 的协议版本号
- Channels 当前连接创建通道的 通道总数
- From client 每秒发出的消息数
- To client 每秒接收的消息数
Channels
记录各个连接的信道:
一个连接IP 可以有多个信道 多个通道通过多线程实现,不相互干扰 我们在 信道中创建:队列 交换机 …
生产者的通道一般使用完之后会立马关闭,消费者是一直监听的… - Channel 通道名称
- User Name 该通道,创建者 用户名
- Model 通道的确认模式 C confirm模式 T 表示事务
- State 通道当前的状态 running 运行 idie 空闲
- Unconfirmed 待确认的消息数
- Prefetch 预先载入
- Prefetch 表示每个消费者最大的能承受的未确认消息数目
简单来说就是用来指定一个消费者一次可以从 RabbitMQ 中获取多少条消息并缓存在消费者中,
一旦消费者的缓冲区满了,RabbitMQ 将会停止投递新的消息到该消费者中直到它发出有消息被 ack 了
消费者负责不断处理消息,不断 ack,然后只要 UnAcked 数少于 Prefetch * consumer 数目,RabbitMQ 就不断将消息投递过去 - Unacker 待 ack 的消息数
- publish 消息生产者发送消息的 速率
- confirm 消息生产者确认消息的 速率
- unroutable drop 表示消息,未被接收,且已经删除的消息.
- deliver / get 消息消费者获取消息的 速率
- ack 消息消费者 ack 消息的速率. MQ 的 ACK机制:100%消息消费!
Exchange
Queue
- Name 表示消息队列的名称
- Type 消息队列的类型…
- Features:表示消息队列的特性,D 表示消息队列持久化
- State:表示当前队列的状态,running 表示运行中;idle 表示空闲
- Ready:表示待消费的消息总数
- Unacked:表示待应答的消息总数
- Total:表示消息总数 Ready+Unacked
- incoming:表示消息进入的速率
- deliver/get:表示获取消息的速率
- ack:表示消息应答的速
SpringBoot整合RabbitMQ
我们启动两个springboot项目一个作为生产者推送消息,一个作为消费者消费消息。
生产者
引入依赖
<!--rabbitmq--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency>
添加配置
spring.application.name=provider
server.port=8080
spring.rabbitmq.host=192.168.253.166
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=my_vhost
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
注意虚拟主机不是必须的,如果未创建可以不加此配置。
消费者
引入依赖
<!--rabbitmq--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency>
添加配置
spring.application.name=consumer
server.port=8081
spring.rabbitmq.host=192.168.253.166
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=my_vhost
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
三种常见的交换机样例
direct exchange(直连型交换机)
创建DirectRabbitConfig.java
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @Description : 直连交换机**/
@Configuration
public class DirectRabbitConfig {//Direct交换机 起名:TestDirectExchange@BeanDirectExchange TestDirectExchange() {// durable:是否持久化,默认是false,如果交换机被设置为 true,则该交换机在重启后也会存在,但需要注意的是,它不能保证未持久化的消息也会被保留。// autoDelete:是否自动删除,当没有生产者或者消费者使用此交换机,该交换机会自动删除。return new DirectExchange("TestDirectExchange", true, false);}@BeanDirectExchange lonelyDirectExchange() {return new DirectExchange("lonelyDirectExchange");}//队列 起名:TestDirectQueue@Beanpublic Queue TestDirectQueue() {// durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效// exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable// autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。//一般设置一下队列的持久化就好,其余两个就是默认falsereturn new Queue("TestDirectQueue", true);}//绑定 将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting@BeanBinding bindingDirect() {return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectRouting");}}
写个推送消息的方法
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;/*** @Description :推送**/
@RestController
public class SendMessageController {@AutowiredRabbitTemplate rabbitTemplate; //使用RabbitTemplate,这提供了接收/发送等等方法@GetMapping("/sendDirectMessage")public String sendDirectMessage() {String messageId = String.valueOf(UUID.randomUUID());String messageData = "test message, hello!";String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));Map<String,Object> map=new HashMap<>();map.put("messageId",messageId);map.put("messageData",messageData);map.put("createTime",createTime);//将消息携带绑定键值:TestDirectRouting 发送到交换机TestDirectExchangerabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectRouting", map);return "ok";}}
调用sendDirectMessage方法,在rabbitMq管理页面看看,消息已经成功推送到所定义的直连交换机上了。
接下来,我们监听这个队列,在消费者程序中,消费掉这条消息。创建直接交换机监听类DirectReceiver。
@Component
@RabbitListener(queues = "TestDirectQueue")//监听的队列名称 TestDirectQueue
public class DirectReceiver {@RabbitHandlerpublic void process(Map testMessage) {System.out.println("DirectReceiver消费者收到消息 : " + testMessage.toString());}}
可以看到,该条消息已被消费掉。
直连交换机是一对一,现在配置多台监听绑定到同一个直连交互的同一个队列,看看如何消费的呢?
结果是实现了轮询的方式对消息进行消费,而且不存在重复消费。
Topic Exchange 主题交换机。
在生产者程序中创建TopicRabbitConfig.java:
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @Description :主题交换机**/@Configuration
public class TopicRabbitConfig {//绑定键public final static String dog = "topic.dog";//队列public final static String dogQueue = "dogQueue";public final static String catQueue = "catQueue";@Beanpublic Queue firstQueue() {return new Queue(TopicRabbitConfig.dogQueue);}@Beanpublic Queue secondQueue() {return new Queue(TopicRabbitConfig.catQueue);}@BeanTopicExchange exchange() {return new TopicExchange("topicExchange");}//将firstQueue和topicExchange绑定,而且绑定的键值为topic.dog//这样只要是消息携带的路由键是topic.man,才会分发到该队列@BeanBinding bindingExchangeMessage() {return BindingBuilder.bind(firstQueue()).to(exchange()).with(dog);}//将secondQueue和topicExchange绑定,而且绑定的键值为用上通配路由键规则topic.#//注意第二个队列topic.cat绑定了绑定键的统配规则此时为topic.#//这样只要是消息携带的路由键是以topic.开头,都会分发到该队列@BeanBinding bindingExchangeMessage2() {return BindingBuilder.bind(secondQueue()).to(exchange()).with("topic.#");}}
然后添加多2个接口,用于推送消息到主题交换机:
@GetMapping("/sendTopicMessage1")public String sendTopicMessage1() {String messageId = String.valueOf(UUID.randomUUID());String messageData = "message: dog dog ";String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));Map<String, Object> manMap = new HashMap<>();manMap.put("messageId", messageId);manMap.put("messageData", messageData);manMap.put("createTime", createTime);rabbitTemplate.convertAndSend("topicExchange", "topic.dog", manMap);return "ok";}@GetMapping("/sendTopicMessage2")public String sendTopicMessage2() {String messageId = String.valueOf(UUID.randomUUID());String messageData = "message: cat cat ";String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));Map<String, Object> womanMap = new HashMap<>();womanMap.put("messageId", messageId);womanMap.put("messageData", messageData);womanMap.put("createTime", createTime);rabbitTemplate.convertAndSend("topicExchange", "topic.cat", womanMap);return "ok";}
接着在消费者程序中,添加两个监听。
此时队列dogQueue的绑定建为topic.dog 则只能监听绑定键为topic.dog的消息。
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Map;@Component
@RabbitListener(queues = "dogQueue")
public class TopicDogReceiver {@RabbitHandlerpublic void process(Map testMessage) {System.out.println("TopicDogReceiver消费者收到消息 : " + testMessage.toString());}
}
注意此时队列catQueue实际上 通配规则为topic.# 即前缀为topic. 消息 此队列都能监听到。
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Map;@Component
@RabbitListener(queues = "catQueue")
public class TopicCatReceiver {@RabbitHandlerpublic void process(Map testMessage) {System.out.println("TopicCatReceiver消费者收到消息 : " + testMessage.toString());}
}
现在调用sendTopicMessage1发送方法,可以看到,两个队列监听均消费到了消息。
现在调用sendTopicMessage2发送方法,可以看到,只有队列catQueue接收到了消息
Fanout Exchang 扇型交换机。
在生产者项目上创建FanoutRabbitConfig.java:
扇形交换机不使用路由键来决定消息的接收者。它只需将消息发送给所有与之绑定的队列,因此在创建绑定时不需要指定路由键
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class FanoutRabbitConfig {/*** 创建三个队列 :fanout.A fanout.B fanout.C* 将三个队列都绑定在交换机 fanoutExchange 上* 因为是扇型交换机, 路由键无需配置,配置也不起作用*/@Beanpublic Queue queueA() {return new Queue("fanout.A");}@Beanpublic Queue queueB() {return new Queue("fanout.B");}@Beanpublic Queue queueC() {return new Queue("fanout.C");}@BeanFanoutExchange fanoutExchange() {return new FanoutExchange("fanoutExchange");}@BeanBinding bindingExchangeA() {return BindingBuilder.bind(queueA()).to(fanoutExchange());}@BeanBinding bindingExchangeB() {return BindingBuilder.bind(queueB()).to(fanoutExchange());}@BeanBinding bindingExchangeC() {return BindingBuilder.bind(queueC()).to(fanoutExchange());}
}
写一个方法用来发送消息
扇形不需要输入绑定键,输入也不起作用,所以此处是null。
@GetMapping("/sendFanoutMessage")public String sendFanoutMessage() {String messageId = String.valueOf(UUID.randomUUID());String messageData = "message: testFanoutMessage ";String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));Map<String, Object> map = new HashMap<>();map.put("messageId", messageId);map.put("messageData", messageData);map.put("createTime", createTime);rabbitTemplate.convertAndSend("fanoutExchange", null, map);return "ok";}
接着在消费者程序上,监听三个队列。
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Map;
@Component
@RabbitListener(queues = "fanout.A")
public class FanoutReceiverA {@RabbitHandlerpublic void process(Map testMessage) {System.out.println("FanoutReceiverA消费者收到消息 : " +testMessage.toString());}}
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Map;
@Component
@RabbitListener(queues = "fanout.B")
public class FanoutReceiverB {@RabbitHandlerpublic void process(Map testMessage) {System.out.println("FanoutReceiverB消费者收到消息 : " +testMessage.toString());}}
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Map;@Component
@RabbitListener(queues = "fanout.C")
public class FanoutReceiverC {@RabbitHandlerpublic void process(Map testMessage) {System.out.println("FanoutReceiverC消费者收到消息 : " +testMessage.toString());}}
调用/sendFanoutMessage 方法
可以看到只要发送到 fanoutExchange 这个扇型交换机的消息, 三个队列都绑定这个交换机,所以三个消息接收类都监听到了这条消息
消息回调
接下来测试消息的回调,其实就是消息确认(生产者推送消息成功,消费者接收消息成功)。
生产者推送消息成功
在生产者程序中 开启消息确认的配置项
spring.rabbitmq.publisher-confirm-type=correlated
spring.rabbitmq.publisher-returns=true
配置回调函数
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitConfig {@Beanpublic RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate = new RabbitTemplate();rabbitTemplate.setConnectionFactory(connectionFactory);
// Mandatory 设置:
// 当 Mandatory 被设置为 true 时,如果消息发送到交换机但没有找到匹配的队列(即无法路由到任何队列),RabbitMQ 会将该消息返回给生产者,并触发 ReturnCallback。
// 如果 Mandatory 设置为 false,在这种情况下,消息将被丢弃,不会触发 ReturnCallback。rabbitTemplate.setMandatory(true);// ConfirmCallback 在消息成功发送到 RabbitMQ 服务器并被确认时触发。
// 如果设置了 Mandatory 为 true(如你的代码中所做),即使消息未能路由到任何队列,也会触发这个回调,只是 ack 参数会为 false。rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println("ConfirmCallback: "+"相关数据:"+correlationData);System.out.println("ConfirmCallback: "+"确认情况:"+ack);System.out.println("ConfirmCallback: "+"原因:"+cause);}});// ReturnCallback 在消息发送到交换机但未能路由到任何队列时触发。rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {System.out.println("ReturnCallback: "+"消息:"+message);System.out.println("ReturnCallback: "+"回应码:"+replyCode);System.out.println("ReturnCallback: "+"回应信息:"+replyText);System.out.println("ReturnCallback: "+"交换机:"+exchange);System.out.println("ReturnCallback: "+"路由键:"+routingKey);}});return rabbitTemplate;}}
上面写了两个回调函数,一个叫 ConfirmCallback ,一个叫 RetrunCallback;
现在我们验证两种回调函数都是在什么情况触发?
一共有的四种情况
- 消息推送到server,但是在server里找不到交换机
- 消息推送到server,找到交换机了,但是没找到队列
- 消息推送到sever,交换机和队列啥都没找到
- 消息推送成功
一、消息推送到server,但是在server里找不到交换机
测试接口写个不存在交换机的
@GetMapping("/TestUnExistentMessageAck")public String TestUnExistentMessageAck() {String messageId = String.valueOf(UUID.randomUUID());String messageData = "message: non-existent-exchange test message ";String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));Map<String, Object> map = new HashMap<>();map.put("messageId", messageId);map.put("messageData", messageData);map.put("createTime", createTime);rabbitTemplate.convertAndSend("non-existent-exchange", "TestDirectRouting", map);return "ok";}
结论: 这种情况触发的是 ConfirmCallback 回调函数。
二、消息推送到server,找到交换机了,但是没找到队列
现在使用我们之前创建的lonelyDirectExchange,绑定队列
写个测试接口,把消息推送到‘lonelyDirectExchange’的交换机上(这个交换机是没有任何队列配置的):
@GetMapping("/TestLonelyMessageAck2")public String TestLonelyMessageAck2() {String messageId = String.valueOf(UUID.randomUUID());String messageData = "message: lonelyDirectExchange test message ";String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));Map<String, Object> map = new HashMap<>();map.put("messageId", messageId);map.put("messageData", messageData);map.put("createTime", createTime);rabbitTemplate.convertAndSend("lonelyDirectExchange", "TestDirectRouting", map);return "ok";}
可以看到,两个函数都被调用了。
这种情况下,消息成功推送到服务器,因此 ConfirmCallback 的确认情况为 true。同时,消息已成功发送到交换机,但由于找不到队列,导致路由失败,触发了 ReturnCallback 的错误码 NO_ROUTE。
结论: 在这种情况下,ConfirmCallback 和 ReturnCallback 都会被触发。
三、消息推送到sever,交换机和队列啥都没找到
和第一种情况类似,不在赘述。
结论: 这种情况触发的是 ConfirmCallback 回调函数。
四、消息推送成功
调用之前正常的接口即可。
结论: ④这种情况触发的是 ConfirmCallback 回调函数。
消费者推送消息成功
和生产者的消息确认机制不同,消费者本来就是在监听消息,符合条件的消息就会消费下来。
所以,消息接收的确认机制主要存在三种模式:
- 自动确认: 默认模式,RabbitMQ 在消息发送后立即认为已处理,存在丢失风险。
对于丢失风险:一般使用try catch捕捉异常后,打印日志用于追踪数据,这样找出对应数据再做后续处理。 - 根据情况确认: 消费者可以根据业务逻辑选择性地确认或拒绝消息。(不做介绍)
- 手动确认: 消费者显式调用确认方法,消息只有成功处理后才会标记为已完成,这是一种常用且安全的模式。
在RabbitMQ中,消费者需要手动调用basic.ack
、basic.nack
或basic.reject
来确认消息投递的结果:
- basic.ack:表示消息已被成功处理。
- basic.nack:否定确认,消息未被正确处理。可以选择是否重新入列。
- basic.reject:与
basic.nack
类似,但只能拒绝单条消息,也可选择是否重新入列。
使用basic.reject
和basic.nack
时,如果选择将消息重新入列(参数设为true
),需谨慎操作。因为这样可能导致消息不断在队列中循环(消费-入列-消费),最终导致消息积压。如果消息确实不再需要处理,应该避免重新入列。
手动确认测试
在消费者程序里,新建MessageListenerConfig.java。
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class MessageListenerConfig {@Autowiredprivate CachingConnectionFactory connectionFactory;@Autowiredprivate MyAckReceiver myAckReceiver;//消息接收处理类@Beanpublic SimpleMessageListenerContainer simpleMessageListenerContainer() {SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);container.setConcurrentConsumers(1);container.setMaxConcurrentConsumers(1);container.setAcknowledgeMode(AcknowledgeMode.MANUAL); // RabbitMQ默认是自动确认,这里改为手动确认消息//设置一个队列container.setQueueNames("TestDirectQueue");//如果同时设置多个如下: 前提是队列都是必须已经创建存在的// container.setQueueNames("TestDirectQueue","TestDirectQueue2","TestDirectQueue3");//另一种设置队列的方法,如果使用这种情况,那么要设置多个,就使用addQueues//container.setQueues(new Queue("TestDirectQueue",true));//container.addQueues(new Queue("TestDirectQueue2",true));//container.addQueues(new Queue("TestDirectQueue3",true));container.setMessageListener(myAckReceiver);return container;}}
对应的手动确认消息监听类,MyAckReceiver.java(手动确认模式需要实现 ChannelAwareMessageListener)
关掉之前的监听队列 ,以免造成多个同类型监听器都监听同一个队列。
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;import java.io.ByteArrayInputStream;
import java.io.ObjectInputStream;
import java.util.Map;@Componentpublic class MyAckReceiver implements ChannelAwareMessageListener {@Overridepublic void onMessage(Message message, Channel channel) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {byte[] body = message.getBody();ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(body));Map<String,String> msgMap = (Map<String,String>) ois.readObject();String messageId = msgMap.get("messageId");String messageData = msgMap.get("messageData");String createTime = msgMap.get("createTime");ois.close();System.out.println(" MyAckReceiver messageId:"+messageId+" messageData:"+messageData+" createTime:"+createTime);System.out.println("消费的主题消息来自:"+message.getMessageProperties().getConsumerQueue());channel.basicAck(deliveryTag, true); //第二个参数,手动确认可以被批处理,当该参数为 true 时,则可以一次性确认 delivery_tag 小于等于传入值的所有消息
// channel.basicReject(deliveryTag, true);//第二个参数,true会重新放回队列,所以需要自己根据业务逻辑判断什么时候使用拒绝} catch (Exception e) {channel.basicReject(deliveryTag, false);e.printStackTrace();}}}
调用接口/sendDirectMessage, 给直连交换机TestDirectExchange 的队列TestDirectQueue 推送一条消息,可以看到监听器正常消费了下来:
如果想同时监听多个队列变为手动模式。
1、MessageListenerConfig类SimpleMessageListenerContainer方法中添加多个监听队列。
2、MyAckReceiver类中onMessage匹配对应队列,并且处理对应队列的业务。
3、可以看到已经成功消费两个队列的消息
以上就是关于rabbitmq的整合。