当前位置: 首页 > news >正文

【RabbitMQ】06-消费者的可靠性

在这里插入图片描述
在这里插入图片描述

1. 消费者确认机制

没有ack,mq就会一直保留消息。

spring:rabbitmq:listener:simple:acknowledge-mode: auto # 自动ack

2. 失败重试机制

当消费者出现异常后,消息会不断requeue(重入队)到队列,再重新发送给消费者。如果消费者再次执行依然出错,消息会再次requeue到队列,再次投递,直到消息处理成功为止。
极端情况就是消费者一直无法执行成功,那么消息requeue就会无限循环,导致mq的消息处理飙升,带来不必要的压力。

spring:rabbitmq:listener:simple:retry:enabled: true # 开启消费者失败重试initial-interval: 1000ms # 初识的失败等待时长为1秒multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-intervalmax-attempts: 3 # 最大重试次数stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

重启consumer服务,重复之前的测试。可以发现:

  • 消费者在失败后消息没有重新回到MQ无限重新投递,而是在本地重试了3次
  • 本地重试3次以后,抛出了AmqpRejectAndDontRequeueException异常。查看RabbitMQ控制台,发现消息被删除了,说明最后SpringAMQP返回的是reject

结论:

  • 开启本地重试时,消息处理过程中抛出异常,不会requeue到队列,而是在消费者本地重试
  • 重试达到最大次数后,Spring会返回reject,消息会被丢弃

3. 失败处理策略

在之前的测试中,本地测试达到最大重试次数后,消息会被丢弃。这在某些对于消息可靠性要求较高的业务场景下,显然不太合适了。
因此Spring允许我们自定义重试次数耗尽后的消息处理策略,这个策略是由MessageRecovery接口来定义的,它有3个不同实现:

  • RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式
  • ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机

比较优雅的一种处理方案是RepublishMessageRecoverer,失败后将消息投递到一个指定的,专门存放异常消息的队列,后续由人工集中处理。
在这里插入图片描述
代码:

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
@ConditionalOnProperty(name = "spring.rabbitmq.listener.simple.retry.enabled", havingValue = "true")
public class ErrorMessageConfiguration {
//    @RabbitListener(bindings = @QueueBinding(
//            value = @Queue(name = "error.queue"),
//            exchange = @Exchange(name = "error.direct", type = ExchangeTypes.DIRECT),
//            key = {"error"}
//    ))
//    public void bings(Object msg){
//        System.out.println("异常"+msg.toString());
//    }@Beanpublic DirectExchange errorMessageExchange(){return new DirectExchange("error.direct");}@Beanpublic Queue errorQueue(){return new Queue("error.queue", true);}@Beanpublic Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");}@Beanpublic MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate) {return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");}
}

http://www.mrgr.cn/news/70414.html

相关文章:

  • Varjo:垂直起降机混合现实培训解决方案
  • MongoDB相关问题
  • linux从0到1——shell编程9
  • uniapp 自定义popup 弹窗 简单封装(微信小程序)
  • [ 渗透测试面试篇-2 ] 针对大规模资产的攻击思路
  • LSA详情与特殊区域
  • 【前端】手写一个简单的分页器
  • 如何解决亚马逊商家IP问题:静态住宅IP的优势与选择指南
  • 1547. 切棍子的最小成本-cangjie
  • 网络、子网
  • 实验室信息管理系统源码,医院LIS系统源码,C/S结构,C#语言开发,适合上项目。
  • vxe-vxe-colgroup后端返回数据 对数据进行处理 动态合并分组表头(v-if控制表格渲染(数据请求完成后渲染))
  • ROS2在自定义服务接口中的常数调用(python)
  • c++如何绑定一个类与类内成员的关系
  • AES加密原理
  • Docker使用docker-compose一键部署nacos、Mysql、redis
  • 分段式爬虫和数据采集的有趣话题
  • c++基础30字符
  • 【前端学习笔记】JavaScript学习一【变量与数据类型】
  • 体育数据API纳米篮球数据API:网球数据接口文档API示例③
  • 多态之魂:C++中的优雅与力量
  • 位运算_常见位运算总结
  • C语言 函数
  • mysql:解决windows启动失败无报错(或长时间未响应)
  • c++11(一)
  • 怎么查域名的交易价格?