当前位置: 首页 > news >正文

《RabbitMQ篇》消息应答和发布确认

消息应答

消息应答机制:消费者接收信息并处理完之后,告诉rabbitmq该信息已经处理,rabbitmq可以把该信息删除了.

消息自动重新入队:如果处理某个消息的消费者异常关闭了,没有发送ACK确认,rabbitmq会将其重新入队,分发给其他消费者处理。

自动应答

消息发送后即被认为传送成功。该模式弊端,消费者不断的接收消息,如果处理不及时,导致内存耗尽,系统杀死消费者进程。所以适用于消费者可以高效处理信息的情况下。

手动应答:

Channel.basicAck(long deliveryTag, boolean multiple):

确认一条或多条收到的消息。对于经过确认的消息,RabbitMQ 认为该消息成功的处理,可以将其丢弃了。

在手动应答情况下,如果消费者没有关闭,但就是不发送ACK确认,这条消息就会一直在MQ中显示unacked,不会重新分发给其他消费者。

deliveryTag:对应消息的标识。

multiple:如果为True,则应用多消息。

multiple该参数用于开启批量应答:比如说 channel 上有传送 tag 的消息 5,6,7,8 当前 tag 是 8 那么此时 5-8 的还未应答的消息都会被确认收到消息应答。

Channel.basicNack(long deliveryTag, boolean multiple, boolean requeue):

拒绝一条或多条收到的消息。

deliveryTag:对应消息的标识。

multiple:如果为True,则应用多消息。

requeue:如果为True,拒绝的消息将会重新加入队列

重新入队的消息会分发给其他消费者,不重新入队的消息将会丢失/死信。

Channel.basicReject(long deliveryTag, boolean requeue):

拒绝一条消息。与 Channel.basicNack 相比少一个参数Multiple。

发布确认

生产者把消息发送到MQ后,需要MQ返回一个ACK给生产者,这样生产者才知道自己的消息成功发出去了。

实现过程:生产者将信道设置成confirm模式,所有在该信道的消息都会被指派一个唯一的id,消息到达匹配的队列后,broker发送一个确认给生产者,这样生产者就知道消息已经发布到MQ了

RabbitMQ默认没有开启发布确认模式的,需要使用Channel.confirmSelect() 开启发布确认。

Channel.waitForCinfirms():等待Broker确认或拒绝自上次调用以来发布的所有消息。

单个发布确认

单个确认发布:发布一个确认一个,才能接着发第二个。

优缺点:可以确认哪个消息失败,适用于吞吐量不大的程序。

public class ConfirmMessage {//消息的个数public static final int MESSAGE_COUNT = 1000; public static void main(String[] args) throws Exception {publishMessageIndividually();//发布1000个单独确认消息,耗时:546ms}public static void publishMessageIndividually() throws Exception {Channel channel = RabbitMQUtil.getChannel();//队列的声明String queueName = UUID.randomUUID().toString();channel.queueDeclare(queueName,false,true,false,null);//开启发布确认channel.confirmSelect();//开始时间long begin = System.currentTimeMillis();//批量发消息for (int i = 1; i <= MESSAGE_COUNT; i++) {String message = i+"";channel.basicPublish("",queueName,null,message.getBytes());//单个确认boolean flag = channel.waitForConfirms();if(flag){// TODO:消息发送成功的处理逻辑} else {// TODO: 消息发送失败的处理逻辑}}//结束时间long end = System.currentTimeMillis();System.out.println("发布"+MESSAGE_COUNT+"个单独确认消息,耗时:"+(end-begin)+"ms");}
}

批量发布确认

批量发布确认:以批次发布,一批次可以有很多消息,待批次发布完,才确认。

优缺点:速度比单个快,但是不能确认哪个消息失败。

public class ConfirmMessage {//消息的个数public static final int MESSAGE_COUNT = 1000; public static void main(String[] args) throws Exception {publishMessageBatch(); //发布1000个批量确认消息,耗时:117ms}public static void publishMessageBatch() throws Exception {Channel channel = RabbitMQUtil.getChannel();//队列的声明String queueName = UUID.randomUUID().toString();channel.queueDeclare(queueName,false,true,false,null);//开启发布确认channel.confirmSelect();//开始时间long begin = System.currentTimeMillis();int batchSize = 10;//批量发消息for (int i = 1; i <= MESSAGE_COUNT; i++) {String message = i+"";channel.basicPublish("",queueName,null,message.getBytes());//批量确认if(i % batchSize == 0) {boolean flag = channel.waitForConfirms();if(flag) {// TODO:消息发送成功的处理逻辑} else {// TODO: 消息发送失败的处理逻辑}}}//结束时间long end = System.currentTimeMillis();System.out.println("发布"+MESSAGE_COUNT+"个批量确认消息,耗时:"+(end-begin)+"ms");}
}

异步发布确认

异步确认发布:每个信息都有自己的标识,一直发布即可,发布成功与否,都会有回调信息。

优缺点:最快,又能确认哪个消息发布失败。

public class ConfirmMessage {//消息的个数public static final int MESSAGE_COUNT = 1000;public static void main(String[] args) throws Exception {publishMessageAsync(); //发布1000个异步确认消息,耗时:61ms}public static void publishMessageAsync() throws Exception {Channel channel = RabbitMQUtil.getChannel();//队列的声明String queueName = UUID.randomUUID().toString();channel.queueDeclare(queueName,false,true,false,null);//开启发布确认channel.confirmSelect();//开始时间long begin = System.currentTimeMillis();//异步确认ConfirmCallback confirmAckCallback = (deliveryTag, multiple) -> {// TODO:消息发送成功的处理逻辑};ConfirmCallback confirmNackCallback = (deliveryTag, multiple) -> {// TODO: 消息发送失败的处理逻辑};channel.addConfirmListener(confirmAckCallback, confirmNackCallback);//批量发消息for (int i = 1; i <= MESSAGE_COUNT; i++) {String message = i+"";channel.basicPublish("",queueName,null,message.getBytes());}//结束时间long end = System.currentTimeMillis();System.out.println("发布"+MESSAGE_COUNT+"个异步确认消息,耗时:"+(end-begin)+"ms");}
}

消息应答和发布确认的区别

消息应答属于消费者,消费完消息告诉MQ已经消费成功。

发布确认属于生产者,生产消息发送到MQ,MQ告诉生产者已经收到消息。


http://www.mrgr.cn/news/45257.html

相关文章:

  • 昆虫分类与检测系统源码分享
  • Machine Learning Specialization 学习笔记(5)
  • 处理器中的几种hazard
  • 幽门螺杆菌筛查在健康管理中的意义
  • 本地知识库+本地大模型,借助RAGFlow搭建医院医疗问诊助手,纯本地,超实用!
  • 全面整理人工智能(AI)学习路线图及资源推荐,非常详细收藏我这一篇就够了
  • 基于PHP+MySQL组合开发的720VR全景在线制作网站源码系统 带完整的安装代码包以及搭建部署教程
  • 自动化测试 | 下载谷歌驱动
  • 投资 -风口/政策大事记
  • 想走?可以!先买票——迭代器模式
  • 在 MySQL 中通过自定义哈希分片实现大规模数据的多线程并行处理20241008
  • 【Canvas与标牌】盾形银底红带Best Quality Premium标牌
  • JS 介绍/书写位置/输入输出语法
  • 一款开源Ai语音合成TTS工具:Fish Speech
  • SQL进阶技巧:如何优化NULL值引发的数据倾斜问题?
  • Dubbo超时设置与动态调整解决方案
  • Spring Boot实现License生成与校验详解
  • 省市区json记录
  • 上交2024最新-《动手学大模型》实战教程及ppt分享!
  • 什么是源代码加密?十种方法教你软件开发源代码加密