当前位置: 首页 > news >正文

【MQ】RabbitMQ 高可用延时功能的探究

延迟消息如果使用延时交换机来实现,如果数据量过大,就会很占 CPU 资源,轻则时间误差大,重则 RabbitMQ 宕机

一、针对一个 RabbitMQ 节点

(1)利用队列+ ttl,将延迟消息根据 delay 的时间进行分级

比如两小时、一天,这两个级别:

  1. 小于两天的消息直接放在延迟交换机
  2. 两天到一天的交给 ttl 为两小时的队列
  3. 其余的交给 ttl 为一天的队列

而两个 ttl 队列绑定同一个死信交换机,死信被咱们的服务处理后重新发送(delay - ttl,或者消息本身携带时间的信息,重新计算 delay),再根据新的 delay 判断前往哪里

这里可行的原因是,队列 + ttl 的 CPU 消耗只用于队列头的那一条消息,在这里就可以保存很多消息

对于这个问题,RabbitMQ 这个插件更适合于 delay 比较均匀的情况,一般到这里就结束了,但是有个坑点,一个热点时间的延迟消息会一起进入延迟交换机

(2)对于热点时间的消息处理

可以在发送到延迟交换机之前,先监控其数量(可以用 Redis 实现一个计数器),控制在一个可控的量级,如 10W,在范围内则发送,否则

  1. 存储起来,可以是 Mysql,也可以是 Redis,或者利用 RabbitMQ 的优先级队列,但是要主动的获取消息而不是监听
  2. 周期性的扫描(五分钟左右),监控 RabbitMQ 延时交换机的健康值,可以加入就加入(delay 小的先加入)(也可以 delay 相同的同一种消息合并成一个,与监听器约定一下这种情况)
  3. 但可能因为没及时扫描,导致新 delay ≤ 0,因此可能产生误差,我们可以将 delay 小于周期间隔的
    1. 接受误差的范围内的不给予处理
    2. 直接本地延时任务、交给 Redis 做延时队列分担多出来的这一份数据量(反正就是天女散花,将这个任务分担给其他服务)

如果是一个热点时间,消息的 delay 都很接近,但其实经过多次周期性的扫描,这个热点时间的消息大部分都可以进入延时交换机,再用 Redis 在最后接近热点时间,“冲刺”的时候助 RabbitMQ 一臂之力

以上是单机情况下我能想到的最好方案了,世界上没有绝对完美的东西,你可能描述一个极端的场景依旧可能出现问题,因此你可以解决其本质原因 “就是一个 MQ 处理不过来”

对于这种大数据量的情况,这一业务独享一个 RabbitMQ,减少 CPU 在其他业务的损耗

二、多个 RabbitMQ 节点相互扶持

因为集群针对的是队列中消息的同步,并不会同步延时交换机暂存的消息,这样不同的 RabbitMQ 去维护各自的延迟交换机就行了,让各个节点的延时交换机的数据量处在安全值范围内,之后到达队列再消息同步

你可能说,那宕机了,岂不是丢失了🤣,我们这样子做就是为了多个节点相互扶持,单机不宕机啊,同步了的话那每个节点都岌岌可危

  1. 旧的延时消息持久化了,有保证可靠性,重启即可
  2. 新的延时消息前往存活的节点

你可以维护一个表,就是延时消息表,设立时间误差进行周期性的扫描,将过期未删除记录的消息重新发送,作为一个兜底**(可以用任务调度平台去控制这些任务吧,如果 MQ 都很健康,其实也没必要一直扫描)**

但这种很极端,因为这样子做的话,我们就相当于不相信 MQ,一棒子打死所有人,但其实消息到 MQ 后,我们就只能全身心相信 MQ 持久化了我们的消息

我们不如直接重启这个挂掉的 MQ

注意:记得给每个节点安装延时交换机的插件

三、ttl 消息 + 优先级队列

RabbitMQ 还可以实现优先级队列

暂时不考虑 CPU 的损耗,我认为在定时方面可以忽略不记
优先级队列还不支持策略定义(但除了一些特殊需要,我们平时也用不到)

因此,如果将消息的 max - ttl 作为优先级,放入优先级队列里,是可以实现队列头为最小的 ttl,最先过期的会排在前面

定义队列的时候,设置 args “x-max-priority” 为 max,max 最大可以取 255,所以其优先级数量只可以为 1-255 个,一些甚至只能是 2-10 个

也就是说 ttl 的可选访问很小(固定只能 1-255 这个整数范围,只能控制时间单位改变其值域)

这个方法实现的延迟功能,取值有限,只能在特殊场景下使用

四、其他

RabbitMQ 的解决方案并不是绝对完美的,如果有特殊要求可以考虑其他的技术栈


http://www.mrgr.cn/news/90111.html

相关文章:

  • Python 脚本实现数据可视化
  • Ubuntu 下 nginx-1.24.0 源码分析 - NGX_HAVE_GETTIMEZONE 宏
  • 原生鸿蒙版小艺APP接入DeepSeek-R1,为HarmonyOS应用开发注入新活力
  • pytest-xdist 进行多进程并发测试
  • k8s部署elasticsearch
  • MATLAB | 基于Theil-Sen斜率和Mann-Kendall检验的栅格数据趋势分析
  • 0 Rust与Qt集成实践指南(CXX-Qt)
  • 使用Redis实现业务信息缓存(缓存详解,缓存更新策略,缓存三大问题)
  • 【学Rust写CAD】5 三维转换矩阵解析及应用示例
  • MySQL数据库 - 阶段性体系总结
  • SQL自学,mysql从入门到精通 --- 第 1 天,系统环境搭建,mysql部署
  • 9.JVM-方法区
  • Java/Kotlin 使用 Chrome 无头浏览器
  • 免费windows pdf编辑工具Epdf
  • 【centos安装mysql数据库】详细版
  • SQL自学,mysql从入门到精通 --- 第 15天,数据导入、导出
  • QT-常见问题
  • 数据结构-find()-判断字符串s1中是否包含字符串s2
  • VirtualBox中Ubuntu 22.04网卡配置以及解决过程中遇到的问题
  • 【C++学习篇】C++11第二期学习
  • Docker 1. 基础使用
  • vue3 -- 基于el-statistic实现动态数字滚动效果并封装卡片组件
  • jupyterLab插件开发
  • 保姆级教程Docker部署Zookeeper模式的Kafka镜像
  • android 动态库加载机制
  • Itext源代码阅读(2) -- PdfReader