RocketMQ场景使用
1.消息丢失
有这么一个场景,就是订单支付完成之后,订单系统会进行发送消息给RocketMQ集群,下游会有积分系统进行监听这个消息,进行消费然后给用户发放积分。在下面的这个场景中,通过查询日志发现了订单系统发送订单支付消息的日志。别的就没有了。
那我们进行分析
1. 发送订单支付消息给RocketMQ集群会出现消息丢失的情况的吗?
会出现消息丢失的情况,可能出现的原因:在发送的过程中,出现了网络抖动,导致了消息没有发送的MQ的集群上;MQ集群收到了消息,但是在进行保存的时候,由于自身的一些bug或者其他原因导致数据没有存储上也是可能的等。
2.消息到达了MQ,MQ会自己导致数据丢失吗?
答案也是可能的,举个例子,我们发送消息到MQ集群上,此时消息还是在os cache中,还是没有写入到磁盘,此时Broker机器发生了宕机,那么消息就会出现丢失的情况。
3.消息到达了MQ,并且MQ将数据写入了磁盘,数据还会丢失吗?
答案还是有可能的,比方说磁盘发生了故障,导致数据的丢失。
4.消费者拿到了消息,消息就不会丢失了吗?
答案还是有可能会丢失的,假如我们的消费者拿到了消息,此时还没有进行消费,此时系统会有自动提交offset的能力,自动提交了offset,同时咱们的消息者宕机了,那么再次重启之后,这条消息就会出现丢失的情况。
解决方案:
针对情况一:我们可以采用RocketMQ的事务消息机制来保证消息发送的可靠性。
步骤一 都失败了,那就没必要进行更新订单状态可以直接返回了;
步骤二失败了 此时就会有步骤五算是一个补偿机制来进行反查状态;
步骤三失败了,会触发步骤四进行RollBack;
步骤四失败了此时就会有步骤五算是一个补偿机制来进行反查状态;
针对情况二,情况三:我们采用同步刷盘和Raft协议主从同步的策略
当我们的Broker接收到消息之后,会先写入到os cache中,此时还没写入到磁盘中,这个时候如果机器宕机了,消息就会丢失了。需要将默认的刷盘机制由异步刷盘改为同步刷盘,同时采用Broker集群的策略,一个Master broker写入成功,一个slave broker写入成功才认为是写入成功。这样的话就算Master Broker宕机了,slave Broker上也有消息,不会出现消息的丢失的情况。
针对情况四:我们采用消息消费完成之后,再进行提交offset。
2.消息重复消费的问题
还是上述的场景,可能会出现消息消费重复的问题。
情况一:生产者进行重复发送了两次消息。
情况二:消费者进行消费了两次消息,在进行第一次消费的时候,代码逻辑处理完成了,但还未来得及进行offset提交,此时机器宕机或者重启,那么下次进行拉取消息的时候,还是会把消息再次拉取一遍,再次进行消费。
针对情况一,我们的解决办法:
1.在进行发送的消息的时候,从MQ中进行查询一下消息是否存在MQ中,如果MQ中存在,那么就没必要进行再次的发送
2.基于Redis缓存的幂等控制:当你成功发送一条消息到MQ之后,就要在Redis缓存中存放一个消息的状态信息。
3.基于业务的判断逻辑:比方说你的业务里有一些操作,是发送完成消息之后就要做一些状态的变更,那么就可以根据状态的变更来做。
其实针对这种发送端来保证发送的幂等性,在我们的实际的开发中并不常用,因为我们基本上都能接受消息在发送端的重复,我们要进行保证的是消息在消费端的幂等性,这种方式跟在发送端保证消息发送的幂等性基本类似。这里就不再赘述。
3.消费者宕机的情况
正常情况下我们进行消费RocketMQ的消息,消费成功的话会进行返回CONSUME_SUCCESS,但当我们消费消息有异常的时候,我们就不会返回CONSUME_SUCCESS而是返回CONSUME_LA TER状态。那RocketMQ收到这个状态之后,会把这个消息放入到重试队列中去,过一段时间,重试队列中的消息会给我们,让我们进行再次消费,这个也是有次数的限制,并不会一直让我们进行重试,默认重试的次数是16次。如果超过了16次还是没成功呢,此时消息就会进入到死信队列中,针对死信队列中的消息要进行如何的处理,就要看我们的业务场景来说,如果需要处理死信队列中数据,我们可以订阅死信队列,然后开启消费。
4.消息乱序的情况
在进行消费消息的时候,会出现消息乱序的情况,我们举一个例子,我们有一个场景是要进行数据同步,在进行数据同步的时候,进行监听的数据库的binlog消息,一般情况下,都是先有insert的binlog,然后才会有update的binlog消息,但是在实际场景中出现了先有update的binlog,然后再有insert的binlog的情况。
出现这种的情况的原因:因为Topic会有多个MessageQueue,咱们再进行发送消息的时候会将消息发送到不同的MessageQueue上,当有两个消费者来进行拉取消息的时候,就会出现update的binlog先进行消费,insert的binlog后进行消费的场景。
那出现这种情况的原因是不在同一个MessageQueue中,那么我们将针对同一个表的binlog消息都写到同一个MessageQueue文件中,那么这个问题是不是就会解决了。答案是未必的,因为在进行消息消费的时候可能会出现失败的情况,如果出现了失败的情况就要进入重试,那么这个时候消息还是会乱序的,那么此时如果针对这种情况的话就不能返回CONSUME_LA TER状态,而是应该返回SUSPEND_CURRENT_QUEUE_A_MOUNT状态,这个状态的意思是等一会,一会再来进行处理这批消息。
5.通过延迟消息解决订单超时未支付的问题
在我们订单系统中,会有下面的一个场景,就是用户下完订单之后不进行支付,那么我们就需要开启一个后台线程不断的进行扫描订单数据库中未支付的订单信息,如果发现订单超过一定时间未进行支付,那么就取消订单。
这种实现方式其实不是很友好,一是订单数据库中的数据很多的时候,你就需要分好几台机器进行扫描订单,这种方式效率并不是很好;二是扫描的时候,会有很多订单都还没到时间了也会进行扫描一次,这种方式并不好。