当前位置: 首页 > news >正文

谷粒商城のRabbitMQ高级篇最终一致性解决方案。

文章目录

  • 前言
  • 一、延迟&死信队列
    • 1、死信队列
    • 2、延迟队列
  • 二、库存解锁的分布式事务最终一致性
    • 2.1、队列架构设计
    • 2、业务设计
  • 三、订单关单的分布式事务最终一致性
    • 2.1、队列架构设计
    • 2、业务设计
  • 3、消息丢失、积压、重复消费解决方案
    • 3.1、消息丢失
    • 3.2、消息重复
    • 3.3、消息积压


前言

  本篇介绍RabbitMQ的延迟队列死信队列,以及利用这些实现订单关单,库存解锁的分布式事务最终一致性解决方案。
  对应视频P292-P300

一、延迟&死信队列

1、死信队列

  死信队列是一种处理"不合格"消息的机制,当消息在原队列中无法被正常消费时,会被转发到另一个队列(死信队列)供后续分析或处理。
  死信产生的原因:

  • 启动手动ACK时,消息被拒收,并且没有重新放回队列。
  • 队列设置了TTL,消息到期后仍然没有被消费。
  • 队列中的消息达到了最大长度,最早入队的消息会被移除。

  死信队列本质上就是普通队列,在构造时的Map<String, Object> arguments参数传递了x-dead-letter-exchange(死信交换机),x-dead-letter-routing-key(死信路由键),该队列中的消息一旦满足上面的条件,就会根据路由键将消息发到指定的交换机上。
在这里插入图片描述

2、延迟队列

  延迟队列用于将消息延迟一段时间后再投递到指定的目标队列。其实现方式通常是通过死信队列结合TTL过期时间。需要在定义死信队列时,Map<String, Object> arguments参数额外传递一个x-message-ttl(过期时间)。如果需要设计一套延迟队列,通常如下:
在这里插入图片描述

二、库存解锁的分布式事务最终一致性

  在前篇中提到,单体事务的@Transcational注解无法控制远程调用的服务的回滚,同时使用seata的AT模式对于下单这样的高并发场景性能损失大。根据BASE理论的最终一致性,可以有如下的方案:

2.1、队列架构设计

在这里插入图片描述
  需要定义两个队列,一个交换机,两个绑定关系:

@Configuration
public class MyMQConfig {//    /**
//     * 队列,交换机是懒加载的,只有第一次监听消息发现不存在的时候才会创建
//     * @param message
//     * @param channel
//     */
//    @RabbitListener
//    public void listener(Message message, Channel channel){
//
//    }/*** 创建交换机* @return*/@Beanpublic Exchange stockEventExchange() {return new TopicExchange("stock-event-exchange", true, false);}/*** 延迟队列 50分钟* @return*/@Beanpublic Queue stockDelayQueue() {HashMap<String, Object> map = new HashMap<>();//死信交换机map.put("x-dead-letter-exchange", "stock-event-exchange");//路由键map.put("x-dead-letter-routing-key", "stock.release");//过期时间map.put("x-message-ttl", 120000);return new Queue("stock.delay.queue", true, false, false, map);}/*** 解锁库存消息队列* @return*/@Beanpublic Queue stockReleaseQueue() {return new Queue("stock.release.stock.queue", true, false, false);}/*** 将延迟队列绑定到交换机* @return*/@Beanpublic Binding delayQueueToExchange() {return new Binding("stock.delay.queue",Binding.DestinationType.QUEUE,"stock-event-exchange","stock.locked",null);}/*** 解锁库存消息队列绑定到交换机* @return*/@Beanpublic Binding stockReleaseToExchange() {return new Binding("stock.release.stock.queue",Binding.DestinationType.QUEUE,"stock-event-exchange","stock.release.#",null);}
}

2、业务设计

  根据上面的绑定关系:

  1. 库存服务在锁定库存后,向stock-event-exchange中使用stock.locked路由键发送一条消息。
  2. 交换机根据路由键找到stock.delay.queue队列。
  3. stock.delay.queue队列中的消息在40分钟后到期,带着stock.release路由键重新回到交换机。
  4. 交换机根据路由键,将消息转发到stock.release.stock.queue队列。
  5. 库存服务监听stock.release.stock.queue队列。

  库存服务监听到stock.release.stock.queue队列的消息后,应该进行判断:

  • 40分钟前锁定的库存工作单是否还存在?如果不存在,说明是库存服务出现问题,库存和订单都进行了回滚,此时无需解锁库存。
  • 根据orderSn查询订单表,可能会有两种情况:
      1. 订单表不存在,说明是下单和扣库存之后又出现了异常,订单回滚了,库存没有回滚,需要解锁库存。
      2. 订单表存在,就要判定状态,如果状态是已取消才需要解锁库存。

  业务实现关键代码:
  类上加@RabbitListener(queues = "stock.release.stock.queue")注解,监听指定的队列。

    @Override@Transactional(rollbackFor = Exception.class)public void stockNum(StockLockVO vo) {log.info("Seata全局事务id=================>{}", RootContext.getXID());String orderSn = vo.getOrderSn();List<CartItem> cartItems = vo.getCartItems();ArrayList<HasStock> hasStocks = new ArrayList<>();//锁定库存之前先创建一个工作单WareOrderTaskEntity wareOrderTaskEntity = new WareOrderTaskEntity();wareOrderTaskEntity.setOrderSn(orderSn);wareOrderTaskDao.insert(wareOrderTaskEntity);for (CartItem cartItem : cartItems) {HasStock hasStock = new HasStock();Long skuId = cartItem.getSkuId();//找到这个skuId在哪个仓库有库存List<Long> wareIds = wareSkuDao.selectWareIdBySkuId(skuId);hasStock.setSkuId(skuId);hasStock.setWareIds(wareIds);hasStock.setCount(cartItem.getCount());hasStocks.add(hasStock);}for (HasStock hasStock : hasStocks) {boolean singleLocked = false;Long skuId = hasStock.getSkuId();Integer count = hasStock.getCount();List<Long> wareIds = hasStock.getWareIds();if (CollectionUtils.isEmpty(wareIds)) {throw new NoStockException(skuId);}//依次锁定库存,这个仓库库存锁定完了,还需要继续锁定,就锁定下一个仓库的for (Long wareId : wareIds) {int resCount = wareSkuDao.stockLock(skuId, wareId, count);if (resCount > 0) {singleLocked = true;//锁定成功,创建库存工作单详情WareOrderTaskDetailEntity wareOrderTaskDetailEntity = new WareOrderTaskDetailEntity();wareOrderTaskDetailEntity.setWareId(wareId);wareOrderTaskDetailEntity.setTaskId(wareOrderTaskEntity.getId());wareOrderTaskDetailEntity.setSkuNum(hasStock.getCount());wareOrderTaskDetailEntity.setSkuId(skuId);wareOrderTaskDetailEntity.setLockStatus(1);wareOrderTaskDetailDao.insert(wareOrderTaskDetailEntity);//向消息队列发送延迟消息 用于后续判断是否需要解锁库存StockLockedTO stockLockedTO = new StockLockedTO();stockLockedTO.setId(wareOrderTaskEntity.getId());stockLockedTO.setDetailId(wareOrderTaskDetailEntity.getId());rabbitTemplate.convertAndSend("stock-event-exchange", "stock.locked", stockLockedTO);break;}}//某个skuId所有仓库都没有锁住if (!singleLocked) {throw new NoStockException(skuId);}}}

  监听stock.release.stock.queue队列,进行库存解锁的逻辑:

    /*** 监听stock.release.stock.queue队列,进行库存解锁*/@RabbitHandlerpublic void stockRelease(StockLockedTO stockLockedTO, Message message, Channel channel) throws IOException {log.info("订单已关闭,准备被动解锁库存");Long id = stockLockedTO.getId();Long detailId = stockLockedTO.getDetailId();//首先用上面两个id查询wareOrderTaskDao和wareOrderTaskDetailDao//如果两张表都不存在,说明是锁库存的时候就报错了,不需要进行处理,订单也跟着回滚了WareOrderTaskEntity wareOrderTaskEntity = wareOrderTaskDao.selectById(id);WareOrderTaskDetailEntity wareOrderTaskDetailEntity = wareOrderTaskDetailDao.selectById(detailId);if (!ObjectUtils.isEmpty(wareOrderTaskDetailEntity) && !ObjectUtils.isEmpty(wareOrderTaskEntity)) {//根据orderSn查询订单表(远程调用订单服务),如果订单表不存在,说明是下单和扣库存之后又出现了异常,订单回滚了,这里也要解锁库存OrderEntity order = null;try {order = orderRemotesServiceClient.getOrder(wareOrderTaskEntity.getOrderSn());} catch (Exception e) {//远程调用失败,拒收消息channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);}Long wareId = wareOrderTaskDetailEntity.getWareId();Long skuId = wareOrderTaskDetailEntity.getSkuId();Integer skuNum = wareOrderTaskDetailEntity.getSkuNum();if (ObjectUtils.isEmpty(order)) {//必须解锁库存this.doStockRelease(wareId, skuId, skuNum,detailId);}if (order.getStatus() == 4) {this.doStockRelease(wareId, skuId, skuNum,detailId);}//手动ackchannel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}//  //如果两张表都不存在,说明是锁库存的时候就报错了,不需要进行处理,订单也跟着回滚了else {channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}}

三、订单关单的分布式事务最终一致性

2.1、队列架构设计

  需要定义两个队列,一个交换机,三个绑定关系在这里插入图片描述

  这一套的绑定关系中,与库存解锁最大的不同在于,订单关单后,还需要通过交换机主动给库存服务的stock.release.stock.queue
队列发送一条消息作为兜底,主动要求库存服务判断是否需要关单。
  原因在于,最初的设计中,订单延迟30分钟关单,库存服务延迟40分钟判断是否需要解锁库存。而订单服务因为各种原因,没能及时消费消息修改订单状态,此时库存解锁就先于关单执行,去查订单的状态还是新建状态,库存服务就不会解锁,并且将消息消费完成。订单服务在这个时候去改状态,库存就无法再次解锁了。

@Configuration
public class MyMQConfig {/*** 创建交换机* @return*/@Beanpublic Exchange orderEventExchange() {return new TopicExchange("order-event-exchange", true, false);}/*** 定义死信队列,消息到达三十分钟后重新回到order-event-exchange交换机* @return*/@Beanpublic Queue orderDelayQueue() {HashMap<String, Object> map = new HashMap<>();//死信交换机map.put("x-dead-letter-exchange", "order-event-exchange");//路由键map.put("x-dead-letter-routing-key", "order.release.order");//过期时间map.put("x-message-ttl", 60000);return new Queue("order.delay.queue", true, false, false,map);}/*** 定义订单释放队列* @return*/@Beanpublic Queue orderReleaseOrderQueue() {return new Queue("order.release.order.queue", true, false, false);}/*** 将order.delay.queue队列和order-event-exchange交换机绑定,路由键order.create.order* @return*/@Beanpublic Binding orderCreateOrderBinding() {return new Binding("order.delay.queue",Binding.DestinationType.QUEUE,"order-event-exchange","order.create.order",null);}/*** 将order.release.order.queue队列和order-event-exchange交换机绑定,路由键order.release.order* @return*/@Beanpublic Binding orderReleaseOrderBinding() {return new Binding("order.release.order.queue",Binding.DestinationType.QUEUE,"order-event-exchange","order.release.order",null);}/*** 用于订单关单后,将消息通过order-event-exchange,路由键order.release.other.#,发送给stock.release.order.queue* @return*/@Beanpublic Binding orderReleaseOtherOrderBinding() {return new Binding("stock.release.stock.queue",Binding.DestinationType.QUEUE,"order-event-exchange","order.release.other.#",null);}
}

2、业务设计

  根据上面的绑定关系:

  1. 订单服务下单完成后,向order-event-exchange中使用order.created.order路由键发送一条消息。
  2. 交换机根据路由键找到order.delay.queue队列。
  3. order.delay.queue队列中的消息在40分钟后到期,带着order.release.order路由键重新回到交换机。
  4. 交换机根据路由键,将消息转发到order.release.order.queue队列。
  5. 订单服务监听order.release.order.queue队列。
  6. 订单服务进行判断是否需要关单,如果需要关单,关单完成后再次向order-event-exchange使用order.release.other.#路由键发送一条消息。
  7. 库存服务监听stock.release.stock.queue队列,再次判断是否应该解锁库存。

  订单服务关键代码:
  类上加@RabbitListener(queues = "order.release.order.queue")注解,监听指定队列。

    /*** 提交订单* @param dto   前端传递的订单信息* @return*/@Override@Transactional(rollbackFor = Exception.class)//这里的事务仅仅能保证订单数据入表,不能保证远程查库存
//    @GlobalTransactionalpublic SubmitOrderResponseVO submitOrder(SubmitOrderDTO dto) {//... 创建订单,锁定库存业务			//发送消息到order-event-exchange交换机 30分钟后从order.release.order.queue 队列中取消息,判断状态,是否需要关单this.releaseOrder(vo.getOrder());return vo;}private void releaseOrder(OrderEntity order) {rabbitTemplate.convertAndSend("order-event-exchange", "order.create.order", order);}@Overridepublic void doReleaseOrder(OrderEntity order) {Integer status = order.getStatus();//需要关单if (Objects.equals(status, OrderStatusEnum.CREATE_NEW.getCode())) {OrderEntity orderEntityForUpd = new OrderEntity();orderEntityForUpd.setId(order.getId());orderEntityForUpd.setStatus(OrderStatusEnum.CANCLED.getCode());updateById(orderEntityForUpd);//order-event-exchange发消息到 stock.release.order.queue 库存服务监听//为了防止一种极端情况,就是订单关单由于系统卡顿,一直无法进行关单,库存消息优先到期,判断状态一直解锁不了库存//所以在关单之后主动给库存服务发一个消息进行兜底rabbitTemplate.convertAndSend("order-event-exchange", "order.release.other", order);}}

  这里写一个类专门去监听消息:

@Slf4j
@Component
@RabbitListener(queues = "order.release.order.queue")
public class MyRabbitListener {@Autowiredprivate OrderService orderService;@RabbitHandlerpublic void releaseOrder(OrderEntity order, Message message, Channel channel) throws IOException {log.info("接收到关单请求,订单号:{}",order.getOrderSn());try {orderService.doReleaseOrder(order);channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} catch (Exception e) {channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);}}
}

  库存服务中,进行处理:

    /*** 监听 关单后 主动发送给队列的消息* @param order* @param message* @param channel* @throws IOException*/@RabbitHandlerpublic void stockRelease(OrderEntity order, Message message, Channel channel) throws IOException {log.info("订单已关闭,准备主动解锁库存");try {//再次判断工作单详情表的状态,筛选出是1的订单进行关单String orderSn = order.getOrderSn();WareOrderTaskEntity wareOrderTaskEntity = wareOrderTaskDao.selectOne(new QueryWrapper<WareOrderTaskEntity>().eq("order_sn", orderSn));Long id = wareOrderTaskEntity.getId();List<WareOrderTaskDetailEntity> wareOrderTaskDetailEntities = wareOrderTaskDetailDao.selectList(new QueryWrapper<WareOrderTaskDetailEntity>().eq("task_id", id));List<WareOrderTaskDetailEntity> collect = wareOrderTaskDetailEntities.stream().filter(wareOrderTaskDetailEntity -> wareOrderTaskDetailEntity.getLockStatus() == 1).collect(Collectors.toList());if (!CollectionUtils.isEmpty(collect)) {for (WareOrderTaskDetailEntity wareOrderTaskDetailEntity : collect) {//解锁doStockRelease(wareOrderTaskDetailEntity.getWareId(), wareOrderTaskDetailEntity.getSkuId(), wareOrderTaskDetailEntity.getSkuNum(),id);}}//手动确认channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} catch (Exception e) {//重新放回队列channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);}}

3、消息丢失、积压、重复消费解决方案

3.1、消息丢失

  关于消息丢失,在RabbitMQ基础篇中已经提到,可以通过回调函数,以及手动ACK的方式解决。除此之外,在进行业务代码编写时,还需要加入重试机制,利用try-catch捕获异常,拒绝签收并重新放回队列。但是不应该无限重试。合理的做法是创建一个消息日志表,记录消息的信息和状态,并且定期扫描数据库对未发送成功的消息进行重发。

3.2、消息重复

  可能会存在这样一种情况,消息已经成功消费,并执行了业务逻辑,在手动ACK的时候系统宕机或出现了异常(执行业务代码和手动ACK不是原子性操作,分为了两步。),这时消息会重新回到ready状态,重新投递。这样就类似于重复提交,可以在业务代码中加上状态位条件校验,或者每条消息附带一个唯一标识(messageId),在处理前检查该消息是否已经处理过,避免重复操作。

3.3、消息积压

  RabbitMQ队列中的消息堆积过多,消费者无法及时处理,导致系统性能下降或消息延迟。如果业务允许,可以对消息进行批量处理(Batch Processing),一次消费多条消息,减少每条消息的处理开销。同时对于不重要或时效性强的消息,如果积压严重,可以设置消息的TTL或使用死信队列,将过期或处理失败的消息转移或丢弃。也可以增加消费者实例来并行处理消息,缓解单个消费者处理慢的问题,或对于耗时较长的操作,可以将其改为异步处理,快速从队列中取出消息,避免队列积压。


下一篇:秒杀服务。


http://www.mrgr.cn/news/54603.html

相关文章:

  • 第十八课:Python学习之多态
  • 平面声波——一维Helmhotz波动方程
  • 基于微博评论的自然语言处理情感分析
  • Ajax:跨域、防抖和节流、HTTP协议
  • PostgreSQL的学习心得和知识总结(一百五十五)|[performance]优化期间将 WHERE 子句中的 IN VALUES 替换为 ANY
  • 常用DateUtils工具类
  • Sourceforge下载镜像选择方法
  • Git Push(TODO)
  • Widget结构(一)
  • Mac M1 修改设置默认 PHP 版本
  • 晶体生长中位错的作用
  • 【你也能从零基础学会网站开发】浅谈一下SQL Server 2000中的日期和时间数据类型
  • Java 数组新手教程一口气讲完!(≧∀≦)ゞ
  • c++ 桶排序(看这一篇就够了)
  • 域渗透之内网渗透 frp内网穿透 环境部署 软件下载地址 实现内网服务访问 端口映射 一步步实现效果 以及Ngrok示例场景讲解
  • 嵌入式开发介绍以及项目示例
  • 相对强弱指标(RSI, Relative Strength Index)
  • IT运维的365天--017 如何在两台Linux服务器之间快速传输文件夹(同时设置免密)
  • 少儿Scratch图形化编程案例100课——005公鸡捉虫
  • 【人工智能-初级】第10章 用Python从零构建简单的神经网络
  • 能够免费剪辑音频的工具有哪些?试试这4款!
  • JS闭包的特性和应用场景
  • Kubernetes GPU 调度和 Device Plugin、CDI、NFD、GPU Operator 概述
  • FastDFS单节点部署
  • 《欢乐饭米粒儿》第九季热播中,今晚精彩继续!
  • PUBG报错:吃鸡请重新安装软件MSVCP140.dll的必备修复方法