当前位置: 首页 > news >正文

Spring AMQP-保证消费者消息的可靠性

为什么要保证消息的可靠性?

当MQ向消费者发送一个消息之后需要得到消费者的状态,因为消息并不一定就真的被消费者给消费了,可能在消费的过程中出现了一些意外,比如

1. 网络问题

2. 消息转换有问题

3. 消费者本身的业务处理有问题


消费者确认机制

消费者消息处理状态:

  • ack:消息成功接收,并且成功被处理,MQ将此消息删除
  • nack:消息处理失败,需要MQ重新发送消息
  • reject:消息处理失败并且拒绝该消息,MQ将此消息删除

由于消息回执的处理代码比较统一,因此SpringAMQP帮我们实现了消息确认。并允许我们通过配置文件设置ACK处理方式,有三种模式:

  • none:不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ删除。非常不安全,不建议使用

  • manual:手动模式。需要自己在业务代码中调用api,发送ackreject,存在业务入侵,但更灵活

  • auto:自动模式。SpringAMQP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack. 当业务出现异常时,根据异常判断返回不同结果:

    • 如果是业务异常,会自动返回nack

    • 如果是消息处理或校验异常,自动返回reject;


在配置文件中通过下面的配置即可设置ACK的处理方式

spring:rabbitmq:listener:simple:acknowledge-mode: c # none 默认 auto 自动确认 manual 手动确认 

消费者重试机制

消费者接收了一个消息,但是在处理的过程中出现异常了,那么AMQP会不断的重试,直到把资源占完然后崩掉,这个时候就必须要设置重试机制,限制重试的次数,避免无限制重试。

spring:rabbitmq:listener:simple:retry:enabled: true # 开启消费者失败重试initial-interval: 1000ms # 初识的失败等待时长为1秒multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-intervalmax-attempts: 3 # 最大重试次数stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

测试

先别配置重试机制,然后在需要在接收消息的地方手动抛出一个异常,查看控制台就会看见消费者在尝试不断的获取消息,但是一直获取不到无限制的重试

  @RabbitListener(bindings = @QueueBinding(value = @Queue(name = "fanout.hamll.query1", // 队列名称durable = "true"), // 是否持久exchange = @Exchange(name = "fanout.hamll", type = ExchangeTypes.FANOUT) // 交换机名称))public void query2(String message) {System.err.println("fanout.hamll.query1 消息内容:" + message); throw  new RuntimeException("故意的错误"); // 抛出异常}

配置好重试之后到了三次就会直接停止,这样子就很好的减少了系统资源的消耗


业务的幂等性判断

什么是幂等性?

在Java领域,幂等性是指同一个请求,不管发送多少次执行的结构都是一样的。

比如支付和交易,支付成功之后通知交易服务修改状态。在交易服务需要查询订单并判断订单的状态,这样子不管同一个订单重复发起多少次请求,都不会对业务的结果造成影响。


MQ保证消息的幂等性

MQ中的幂等是说,不管消息是否被重复消费,都不会对业务造成影响、处理的结果都是一致的。


MQ实现业务幂等性

为每个消息都创建一个唯一的MessageId在操作的时候将其存入数据库,然后在进行判断消息是否存在,存在就直接跳过业务的处理,不存在就继续操作。

    @Beanpublic MessageConverter messageConverter(){// 1.定义消息转换器Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息jackson2JsonMessageConverter.setCreateMessageIds(true);return jackson2JsonMessageConverter;}

业务实现幂等性

在业务的操作中,比如支付和交易服务,支付成功之后会通知交易服务修改订单的状态,而在交易服务应该做判断,判断该订单的状态是否未未支付。如果是未支付就继续处理接下来的业务,否则就直接结束。

package com.hmall.trade.listener;import com.hmall.trade.domain.po.Order;
import com.hmall.trade.service.IOrderService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class PayStatusListener {@AutowiredIOrderService orderService;@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "trade.pay.success.queue", durable = "true"),exchange = @Exchange(value = "pay.direct", type = ExchangeTypes.DIRECT),key = "pay.success"))public void paySuccess(Long orderId) {log.info("支付成功,订单号:{}", orderId);//查询当前订单 判断幂等性Order order = orderService.getById(orderId);//判断状态以及对象是否存在if (order == null || order.getStatus() != 1) {return;}orderService.markOrderPaySuccess(orderId);}
}


http://www.mrgr.cn/news/83121.html

相关文章:

  • 微服务-Nacos(注册中心)
  • HTML 音频(Audio)
  • Ubuntu上安装Apache Spark
  • Maven的基本使用
  • 基于ROS先验地图的机器人自主定位与导航SLAM
  • Express 加 sqlite3 写一个简单博客
  • 通俗易懂之线性回归时序预测PyTorch实践
  • 在 Ubuntu 22.04 上部署 AppArmor 应用安全教程
  • 现场展示deepseek VS openAI o1模型大对比
  • 论文笔记:FDTI: Fine-grained Deep Traffic Inference with Roadnet-enriched Graph
  • STM32供电参考设计
  • Windows下安装最新版的OpenSSL,并解决OpenSSL不是当前版本的问题,或者安装不正确的问题
  • 如何在 Ubuntu 22.04 上配置 Logrotate 高级教程
  • SpringBoot操作spark处理hdfs文件
  • 机器学习之随机森林算法实现和特征重要性排名可视化
  • B树及其Java实现详解
  • 《Spring Framework实战》7:4.1.2.容器概述
  • 【Rust自学】11.1. 编写和运行测试
  • 如何使用vue引入three.js
  • 人工智能的发展领域之GPU加速计算的应用概述、架构介绍与教学过程
  • 7ZIP 常见使用问题解决办法
  • B+树的原理及实现
  • SpringBoot日常:集成Kafka
  • Python —— 常用的字符串方法
  • JavaSE
  • UnityRenderStreaming使用记录(五)