当前位置: 首页 > news >正文

RocketMQ 主题与队列的协同作用解析(既然队列存储在不同的集群中,那要主题有什么用呢?)---管理命令、配置安装(主题、消息、队列与 Broker 的关系解析)

目录

 学习之前呢需要会使用linux的基础命令

一.RocketMQ 主题与队列的协同作用解析

1. ‌主题是消息的逻辑分类与路由标识‌

2. ‌主题实现消息的订阅与消费隔离‌

3. ‌主题为运维提供统一管理入口‌

4. ‌主题与队列的协同模型‌

实际场景示例

小结

二.RocketMQ管理命令 

1、updateTopic

2、deleteTopic

3、topicList

4、topicStatus

5、cleanUnusedTopic

三.RocketMQ安装与配置 

1、首先修改环境变量 

2、解压文件

3、启动NameServer 

4、启动broker 

 很常见的问题​编辑

四.主题、消息、队列与 Broker 的关系解析 

‌1. 主题(Topic)‌

‌2. 消息(Message)‌

‌3. 队列(Queue/Partition/MessageQueue)‌

‌4. Broker‌

‌四者关系总结‌


 学习之前呢需要会使用linux的基础命令

一.RocketMQ 主题与队列的协同作用解析

在 RocketMQ 中,‌主题(Topic)‌与‌队列(Queue)‌的协同设计实现了消息系统的逻辑抽象与物理存储分离。虽然队列实际存储在不同集群的 Broker 节点上,但主题作为逻辑层面的核心单元,仍具有不可替代的作用:

1. ‌主题是消息的逻辑分类与路由标识
  • 逻辑聚合‌:主题将同一类业务消息聚合为逻辑单元(例如订单消息归入 OrderTopic),生产者只需关注发送到哪个主题,无需感知底层队列的物理分布。
  • 路由依据‌:消息通过主题名称路由到对应集群的队列中。例如,生产者发送到 PaymentTopic 的消息会被自动分配到该主题关联的队列(可能分布在多个集群)。
  • 跨集群协同‌:若队列分布在多个集群,主题的元数据(如队列分布规则)由 NameServer 统一管理,确保生产者/消费者能准确找到目标队列。

2. ‌主题实现消息的订阅与消费隔离
  • 订阅关系管理‌:消费者通过订阅主题来接收消息,而非直接绑定队列。即使队列分布在多个集群,消费者组仍可通过主题名称统一订阅,RocketMQ 自动完成队列分配和负载均衡。
  • 权限控制‌:主题支持独立的权限配置(如读写权限),实现不同业务消息的访问隔离。例如,财务系统仅能访问 FinanceTopic,与订单系统隔离。
  • Tag 过滤扩展‌:在主题基础上,通过 Tag 进一步细分消息类型(如 OrderTopic:TagA),消费者可选择性订阅特定 Tag,减少无关消息处理。

3. ‌主题为运维提供统一管理入口
  • 队列动态扩展‌:当集群扩容时,通过调整主题的队列数量(如从 8 队列增至 16 队列),新队列可自动分配到新增集群的 Broker,无需修改业务代码。
  • 监控与告警‌:主题级别的监控指标(如消息堆积量、消费 TPS)便于快速定位问题。例如,InventoryTopic 的消费延迟告警可直接关联到库存业务。
  • 数据生命周期‌:主题支持独立的消息保留策略(如保存 3 天),不同业务可按需配置,避免全局策略的局限性。
4. ‌主题与队列的协同模型
维度主题(Topic)队列(Queue)
定位逻辑分类单元,定义消息身份与权限物理存储单元,实际承载消息数据
扩展性通过增加队列数量实现横向扩展依赖 Broker 集群扩容提升单队列容量
消费模式支持集群消费(单队列单消费者)或广播消费仅作为存储载体,不直接决定消费模式
运维操作配置权限、Tag 规则、保留策略等调整副本数、存储路径、故障迁移等

实际场景示例

  • 场景‌:电商系统的订单消息(OrderTopic)需要支持每秒 10 万条吞吐量。
  • 实现‌:
    1. 创建 OrderTopic 并设置 32 个队列。
    2. 将队列分布在 4 个集群的 Broker 节点上(每个集群 8 队列)。
    3. 消费者组订阅 OrderTopic,RocketMQ 自动将 32 个队列均分给消费者实例。
    4. 当某个集群故障时,NameServer 将故障队列的路由指向其他集群的副本队列,保障高可用

小结

主题的核心价值在于‌逻辑抽象‌,它屏蔽了底层队列的物理复杂性,同时提供业务层面的统一管理入口。队列的分布式存储解决的是‌性能与可靠性问题‌,而主题则定义了消息的‌业务语义与协作规则‌,两者共同构成 RocketMQ 高并发、高可用的基石。

二.RocketMQ管理命令 

1、updateTopic

作用:修改或创建一个Topic

命令:mqadmin updateTopic -b | -c [-h] [-n ] [-o ] [-p ] [-r ] [-s ] -t [-u ] [-w ]

参数:

  • -n: name server地址列表
  • -c: cluster 名称,表示topic 建在该集群
  • -t: 设置topic名称
  • -h: 打印help信息
  • -o: 设置topic是否为有序的 取值:true、false(默认)
  • -p: 设置topic的权限

示例: 

 mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t testtopic

2、deleteTopic

作用:从broker和nameserver删除topic 

命令:mqadmin deleteTopic -c [-h] [-n ] -t

参数:

  • -n: name server地址列表
  • -c: cluster 名称,表示topic 建在该集群
  • -t: 设置topic名称
  • -h: 打印help信息

示例

mqadmin deleteTopic -n localhost:9876 -c DefaultCluster -t testtopic

3、topicList

作用:从nameserver列出所有topic

命令:mqadmin topicList [-c] [-h] [-n ]

参数:

  • -n: name server地址列表
  • -c: cluster 名称,表示topic 建在该集群
  • -h: 打印help信息

示例

mqadmin topicList -n localhost:9876

4、topicStatus

作用:检查topic的状态信息

命令:mqadmin topicStatus [-h] [-n ] -t

参数:

  • -n: name server地址列表
  • -c: cluster 名称,表示topic 建在该集群
  • -t: 设置topic名称
  • -h: 打印help信息

 示例

mqadmin topicStatus -n localhost:9876 -t testtopic

5、cleanUnusedTopic

作用:清理未使用的topic

命令:mqadmin cleanUnusedTopic [-b ] [-c ] [-h] [-n ]

参数:

  • -n: name server地址列表
  • -b: broker地址
  • -c: 集群名称
  • -h: 打印help信息

6关闭namesrv和broker服务

mqshutdown namesrv

mqshutdown broker

三.RocketMQ安装与配置 

1、首先修改环境变量 

vim /etc/profile
# 文件末尾追加改信息
export ROCKETMQ_HOME=/usr/local/rocketmq
export PATH=$PATH:$ROCKETMQ_HOME/bin
# 生效环境变量
source /etc/profile

2、解压文件

unzip rocketmq-all-5.1.4-bin-release.zip

我们上传安装包至opt目录下,使用该命令将其解压至opt目录下,最后使用mv命令对其进行位置移动和改名-----/usr/local/rocketmq

# 解压到当前目录(/opt)
unzip rocketmq-all-5.0.0-bin-release.zip# 将解压后的目录移动到/usr/local
mv rocketmq-all-5.0.0-bin-release /usr/local/# 改名
mv rocketmq-all-5.0.0-bin-release rocketmq

 进入bin目录如下

3、启动NameServer 

nohup sh mqnamesrv &

 

4、启动broker 

nohup sh ./mqbroker -n localhost:9876 & 

使用jps命令查看后发现broker未启动

启动成功后的输出结果 

使用 cat nohup.out 查看日志如下

 很常见的问题

1、java.lang.IllegalAccessError: class org.apache.rocketmq.common.UtilAll (in unnamed module @0x58acb723) cannot access class sun.nio.ch.DirectBuffer (in module java.base) because module java.base does not export sun.nio.ch to unnamed module 

解决:添加启动参数

修改runbroker文件,添加红色参数 $JAVA ${JAVA_OPT} --add-exports=java.base/sun.nio.ch=ALL-UNNAMED $@

2、堆空间初始值太大也报错 

修改文件runbroker.sh(将光标位置修改为如下数字即可)

通过上述修改,将初始堆内存512M,最大堆内存设置为512M,新生代(Java中用于存储创建对象的部分)设置为256M,修改完成后便可以正常启动以及查看日志。 

四.主题、消息、队列与 Broker 的关系解析 

1. 主题(Topic)
  • 定义‌:主题是消息的逻辑分类标识,用于区分不同业务场景的消息类型(如订单、日志)。
  • 与消息的关系‌:每条消息必须属于且仅属于一个主题,生产者根据主题发送消息,消费者按主题订阅消息。
  • 物理实现‌:主题在物理上会被拆分为多个队列(如 Kafka 的 Partition、RocketMQ 的 MessageQueue),以实现分布式存储和并行处理。

2. 消息(Message)
  • 定义‌:消息是传输的最小数据单元,包含业务内容、元数据(如 Topic、Key)等信息。
  • 生命周期‌:生产者将消息发送到指定 Topic,消息被持久化到 Broker 的队列中,最终由消费者消费并确认。

3. 队列(Queue/Partition/MessageQueue)
  • 定义‌:队列是主题的物理存储单元,用于实现消息的顺序性、并行处理和高吞吐。
    • Kafka‌:称为 Partition,每个 Topic 被划分为多个 Partition,分布在不同 Broker 上。
    • RocketMQ‌:称为 MessageQueue,Topic 创建时需指定队列数量,队列与 Broker 绑定。
  • 与 Broker 的关系‌:每个队列实际存储在一个或多个 Broker 中,Broker 负责队列的读写、持久化和负载均衡。

4. Broker
  • 定义‌:Broker 是消息中间件的核心服务节点,负责消息存储、传输和集群管理。
  • 核心功能‌:
    • 接收生产者消息并持久化到队列中。
    • 处理消费者拉取消息的请求,保证消息顺序性和可靠性。
    • 通过主从复制(如 RocketMQ)、分区副本(如 Kafka)实现高可用。
    • 无状态(如 Pulsar Broker)或有状态(如 Kafka Broker)管理消息传输。

四者关系总结

  1. 层级结构‌:
    • 主题‌ → ‌队列‌(逻辑分类 → 物理存储单元)。
    • 队列‌ → ‌Broker‌(存储和处理的物理节点)。
  2. 数据流‌:
    • 生产者 → ‌消息‌ → ‌主题‌ → ‌队列‌ → Broker → 消费者。
  3. 分布式设计‌:
    • 主题通过多队列分布在不同 Broker 上,实现水平扩展和负载均衡。

宝宝们还有什么想知道到的可以评论!!! 


http://www.mrgr.cn/news/100246.html

相关文章:

  • 张 LLM提示词拓展16中方式,提示词
  • 14-DevOps-快速部署Kubernetes
  • 【2025 最新前沿 MCP 教程 01】模型上下文协议:AI 领域的 USB-C
  • YOLO12架构优化——引入多维协作注意力机制(MCAM)抑制背景干扰,强化多尺度与小目标检测性能
  • 【数据可视化-25】时尚零售销售数据集的机器学习可视化分析
  • 【深度强化学习 DRL 快速实践】异步优势演员评论员算法 (A3C)
  • MySQL数据库(基础篇)
  • 【计算机视觉】CV实战项目 - 深入解析基于HOG+SVM的行人检测系统:Pedestrian Detection
  • VScode远程连接服务器(免密登录)
  • 【数据可视化-24】巧克力销售数据的多维度可视化分析
  • Mysql日志undo redo binlog与更新一条数据的执行过程详解
  • 【深度强化学习 DRL 快速实践】Value-based 方法总结
  • RefFormer论文精读
  • 使用 Python 项目管理工具 uv 快速创建 MCP 服务(Cherry Studio、Trae 添加 MCP 服务)
  • 蓝耘平台介绍:算力赋能AI创新的智算云平台
  • (三) Trae 调试C++ 基本概念
  • 开发并发布一个属于自己的包(npm)
  • fps项目总结:生成武器子弹丧尸攻击
  • 从FP32到BF16,再到混合精度的全景解析
  • TortoiseGit使用图解