当前位置: 首页 > news >正文

黑马商城微服务复习(6)

MQ高级

  • 1. 消息可靠性
  • 2. 发送者的可靠性
    • 1. 发送者问题
    • 2. 生产者重试机制
    • 3. 生产者确认机制
    • 4. MQ可靠性
    • 5. 消费者的可靠性
  • 3. 延迟消息
    • 1. 定义
    • 2. 死信交换机

1. 消息可靠性

在这里插入图片描述

  • 发送消息时丢失:
    • 生产者发送消息时连接MQ失败
    • 生产者发送消息到达MQ后未找到Exchange
    • 生产者发送消息到达MQ的Exchange后,未找到合适的Queue
    • 消息到达MQ后,处理消息的进程发生异常
  • MQ导致消息丢失:
    • 消息到达MQ,保存到队列后,尚未消费就突然宕机
  • 消费者处理消息时:
    • 消息接收后尚未处理突然宕机
    • 消息接收后处理过程中抛出异常

2. 发送者的可靠性

1. 发送者问题

发送消息时丢失:

  • 生产者发送消息时连接MQ失败
  • 生产者发送消息到达MQ后未找到Exchange
  • 生产者发送消息到达MQ的Exchange后,未找到合适的Queue
  • 消息到达MQ后,处理消息的进程发生异常

2. 生产者重试机制

首先第一种情况,就是生产者发送消息时,出现了网络故障,导致与MQ的连接中断。
为了解决这个问题,SpringAMQP提供的消息发送时的重试机制。即:当RabbitTemplate与MQ连接超时后,多次重试。
修改publisher模块的application.yaml文件,添加下面的内容:

spring:rabbitmq:connection-timeout: 1s # 设置MQ的连接超时时间template:retry:enabled: true # 开启超时重试机制initial-interval: 1000ms # 失败后的初始等待时间multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multipliermax-attempts: 3 # 最大重试次数

3. 生产者确认机制

一般情况下,只要生产者与MQ之间的网路连接顺畅,基本不会出现发送消息丢失的情况,因此大多数情况下我们无需考虑这种问题。
不过,在少数情况下,也会出现消息发送到MQ之后丢失的现象,比如:

  • MQ内部处理消息的进程发生了异常

  • 生产者发送消息到达MQ后未找到Exchange

  • 生产者发送消息到达MQ的Exchange后,未找到合适的Queue,因此无法路由。

  • 针对上述情况,RabbitMQ提供了生产者消息确认机制,包括Publisher Confirm和Publisher Return两种。在开启确认机制的情况下,当生产者发送消息给MQ后,MQ会根据消息处理的情况返回不同的回执。
    在这里插入图片描述

  • 当消息投递到MQ,但是路由失败时,通过Publisher Return返回异常信息,同时返回ack的确认信息,代表投递成功

  • 临时消息投递到了MQ,并且入队成功,返回ACK,告知投递成功

  • 持久消息投递到了MQ,并且入队完成持久化,返回ACK ,告知投递成功

  • 其它情况都会返回NACK,告知投递失败
    3.1 使用方法

  1. 在发送者模块配置
    在这里插入图片描述
    2.在发送者中添加配置类
    每个RabbitTemplate只能配置一个ReturnCallback,因此我们可以在配置类中统一设置。我们在publisher模块定义一个配置类:
    在这里插入图片描述
    在这里插入图片描述
  2. 定义ConfirmCallback
    由于每个消息发送时的处理逻辑不一定相同,因此ConfirmCallback需要在每次发消息时定义。具体来说,是在调用RabbitTemplate中的convertAndSend方法时,多传递一个参数:
    将来MQ的回执就会通过这个Future来返回,我们可以提前给CorrelationData中的Future添加回调函数来处理消息回执

4. MQ可靠性

消息到达MQ以后,如果MQ不能及时保存,也会导致消息丢失,所以MQ的可靠性也非常重要。
1. 数据持久化
为了提升性能,默认情况下MQ的数据都是在内存存储的临时数据,重启后就会消失。为了保证数据的可靠性,必须配置数据持久化,包括:

  • 交换机持久化
  • 队列持久化
  • 消息持久化

**交换机持久化:**在控制台的Exchanges页面,添加交换机时可以配置交换机的Durability参数:
在这里插入图片描述
设置为Durable就是持久化模式,Transient就是临时模式。
2. 队列持久化
在控制台的Queues页面,添加队列时,同样可以配置队列的Durability参数:
在这里插入图片描述

  1. 消息持久化
    在控制台发送消息的时候,可以添加很多参数,而消息的持久化是要配置一个properties:
    在这里插入图片描述
    2. LazyQueue
    一般RabbitMQ将消息存到内存,但是有时候会造成消息积压。
  • 消费者宕机或出现网络故障
  • 消息发送量激增,超过了消费者处理速度
  • 消费者处理业务发生阻塞
    一旦出现消息堆积问题,RabbitMQ的内存占用就会越来越高,直到触发内存预警上限。
    为了解决这个问题,从RabbitMQ的3.6.0版本开始,就增加了Lazy Queues的模式,也就是惰性队列。惰性队列的特征如下:
  • 接收到消息后直接存入磁盘而非内存
  • 消费者要消费消息时才会从磁盘中读取并加载到内存(也就是懒加载)
  • 支持数百万条的消息存储

具体操作

  1. 添加队列时,设置Lazy模式在这里插入图片描述
    代码配置:在这里插入图片描述

5. 消费者的可靠性

当RabbitMQ向消费者投递消息以后,需要知道消费者的处理状态如何。所以需要消费者确认机制

  • ack:成功处理消息,RabbitMQ从队列中删除该消息
  • nack:消息处理失败,RabbitMQ需要再次投递消息
  • reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息
    1. 消费者确认机制
    在这里插入图片描述
    在这里插入图片描述
    2. 失败重试机制
    当消费者出现异常后,消息会不断requeue(重入队)到队列,再重新发送给消费者。如果消费者再次执行依然出错,消息会再次requeue到队列,再次投递,直到消息处理成功为止。
    极端情况就是消费者一直无法执行成功,那么消息requeue就会无限循环,导致mq的消息处理飙升,带来不必要的压力:
    Spring又提供了消费者失败重试机制:在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列。
    在这里插入图片描述
  • RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式
  • ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机

比较优雅的一种处理方案是RepublishMessageRecoverer,失败后将消息投递到一个指定的,专门存放异常消息的队列,后续由人工集中处理。

主要过程:
1)在consumer服务中定义处理失败消息的交换机和队列

@Bean
public DirectExchange errorMessageExchange(){return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){return new Queue("error.queue", true);
}
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
}

2)定义一个RepublishMessageRecoverer,关联队列和交换机

@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}

完整代码:

package com.itheima.consumer.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.context.annotation.Bean;@Configuration
@ConditionalOnProperty(name = "spring.rabbitmq.listener.simple.retry.enabled", havingValue = "true")
public class ErrorMessageConfig {@Beanpublic DirectExchange errorMessageExchange(){return new DirectExchange("error.direct");}@Beanpublic Queue errorQueue(){return new Queue("error.queue", true);}@Beanpublic Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");}@Beanpublic MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");}
}

3. 业务幂等性问题
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
例如我们当前案例中,处理消息的业务逻辑是把订单状态从未支付修改为已支付。因此我们就可以在执行业务时判断订单状态是否是未支付,如果不是则证明订单已经被处理过,无需重复处理。
在这里插入图片描述
在这里插入图片描述

3. 延迟消息

1. 定义

对于一些库存有限的商品,为了更好的用户体验,通常都会在用户下单时立刻扣减商品库存。
但是这样就存在一个问题,假如用户下单后一直不付款,就会一直占有库存资源,导致其他客户无法正常交易,最终导致商户利益受损!
因此,电商中通常的做法就是:对于超过一定时间未支付的订单,应该立刻取消订单并释放占用的库存。
例如,订单支付超时时间为30分钟,则我们应该在用户下单后的第30分钟检查订单支付状态,如果发现未支付,应该立刻取消订单,释放库存。
在这里插入图片描述

2. 死信交换机

在这里插入图片描述

在这里插入图片描述
1. DelayExchange插件
基于死信队列虽然可以实现延迟消息,但是太麻烦了。因此RabbitMQ社区提供了一个延迟消息插件来实现相同的效果。
在这里插入图片描述
在这里插入图片描述
3. 取消超时订单
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述


http://www.mrgr.cn/news/79905.html

相关文章:

  • java判断文件是否存在
  • [工具升级问题] 钉钉(linux版)升级带来的小麻烦
  • 网络安全论坛
  • 单元测试
  • python脚本将多个txt文件的内容合并为一个
  • 悬赏任务源码(悬赏发布web+APP+小程序)开发附源码
  • MVC配置文件及位置
  • 【C语言】浮点数的原理、整型如何转换成浮点数
  • 计算机组成原理复习
  • 【漏洞复现】CVE-2022-26619 CVE-2022-32994 Arbitrary File Upload
  • 多发电站实现光伏发电预测的统一管理模式
  • CSDN原力值说明
  • mac 安装CosyVoice (cpu版本)
  • 通用定时器之输出比较的功能
  • 0001.简易酒店管理系统后台
  • MOTR: End-to-End Multiple-Object Tracking with Transformer
  • PyQt5入门(四)--------下拉选择框控件(comboBox)
  • 【Neo4J】neo4j docker容器下的备份与恢复
  • 微信小程序web-view 嵌套h5界面 实现文件预览效果
  • 餐饮平台数仓建模案例
  • Spann3R:基于DUSt3R的密集捕获数据增量式重建方法
  • day11 性能测试(4)——Jmeter使用(黑马的完结,课程不全)直连数据库+逻辑控制器+定时器
  • 分布式事物XA、BASE、TCC、SAGA、AT
  • 解决 MyBatis 中空字符串与数字比较引发的条件判断错误
  • ubuntu 安装 docker详细教程
  • 第十九章程序清单合集——Java语言程序设计进阶篇(黑皮书)