RabbitMQ 消息队列
1. 消息队列是什么?
当用户注册成功后,就发送邮件。当邮件发送成功了,接口才会提示注册成功信息。但由于发送邮件,依赖于其他厂商的服务,有可能他们的接口会非常耗时。那么用户就一直要等着邮件发送成功了,才提示注册成功。因为耗时很长,这样就会造成很糟糕的体验。
然后大家想一想,我们到底有没有必要等着邮件发送完成呢?就好比大家去邮局寄信,当你把信丢入邮筒里,你需要一直等着信送到别人手里,然后你再离开吗?
当然不需要,只要将信丢入邮筒,我们人就可以离开了。因为你知道,在将来的某个时候,你的信就会被寄到收件人手里。
消息队列
,就好比是这里寄信的过程!消息队列
由三部分构成:
- 生产者(Producer):创建并发送消息,相当于去邮局寄信的人。
- 队列(Queue):用来存放消息的,将消息从生产者传递到消费者,就相当于邮局了。邮局收到你的任务后,它去处理是需要排队的。按照先进先出的原则,需要等其他先来的任务处理完了,才会处理你的任务。
- 消费者(Consumer):接收并处理消息的人。当邮局把信送到他手里了,收到消息了,最终处理任务的人。
还是以我们注册功能为例。当用户注册成功了,我们就通知RabbitMQ,你去发个邮件,然后就直接提示用户注册成功。我们完全没有必要等待邮件发送完成。
接着再写一个程序去接收消息,当它收到消息了,就去执行发邮件任务了。
所以,生产者、消费者,都是我们要写的程序。一个负责提交任务,另一个负责处理任务。互相之间通信,就靠中间的RabbitMQ。
2. 安装
2.1. 使用 Docker
当然,我最推荐的还是用Docker
,因为太方便了,大家打开docker-compose.yml
。增加RabbitMQ
的配置,注意缩进非常重要,一定要和之前的MySQL和Redis的配置对齐
services:mysql:# ....redis:# ....rabbitmq:image: rabbitmq:4.0-managementports:- "5672:5672"- "15672:15672"volumes:- ./data/rabbitmq:/var/lib/rabbitmq # 持久化数据environment:RABBITMQ_DEFAULT_USER: adminRABBITMQ_DEFAULT_PASS: xw
- 我们这里使用的
RabbitMQ
版本号是:4.0
。 - 将RabbitMQ运行的默认端口
5672
和15672
,映射到本机上。 - 其中
15672
是RabbitMQ自带的管理后台端口号
。 - 接着将
RabbitMQ
的用来持久化数据的备份数据
,映射
到项目根目录的data/rabbitmq
目录中。 - 最下面的是
RabbitMQ
用来连接的账号密码,大家可以根据需要自己设置。
完成后,命令行,再次运行
docker-compose up -d
这样Docker,就会自动下载,并启动好RabbitMQ。
2.2. 直接安装
对于部分Windows
上用不了Docker
的同学,那就有点麻烦了。因为既要安装Erlang
,又要安装RabbitMQ
。这里附上下载地址:
-
Erlang下载地址:https://www.erlang.org/downloads
- -
RabbitMQ下载地址:https://www.rabbitmq.com/docs/install-windows#downloads
-
对于这种安装方式,如果在安装过程出现了各种问题,请来讨论群中和我讨论。总而言之,我还是推荐大家尽可能,想办法将Docker
跑通。
2.3.1 安装完Erlang
和RabbitMQ
后,添加环境变量
打开环境变量设置窗口: 右键点击 “
此电脑
”,选择 “属性
”。 在弹出的窗口中,点击 “高级系统设置
”。 在 “系统属性
” 窗口中,点击 “环境变量
” 按钮。 添加RabbitMQ
命令路径到系统的 Path 变量
: 在 “系统变量
” 列表中找到 “Path
” 变量,选中它
并点击 “编辑
” 按钮。 在弹出的 “编辑环境变量
” 窗口中,点击 “新建
”,然后将RabbitMQ 的 sbin目录路径
添加进去,
例如C:\Program Files\RabbitMQServer\rabbitmq_server-<version>\sbin
。注意要将 替换为你实际安装的 RabbitMQ 版本号。 点击 “确定
” 保存设置,依次关闭所有窗口具体选择你自己安装的路径
>。
2.3.2 通过命令行(管理员)安装rabbitmq_management 插件
RabbitMQ 的管理界面依赖于rabbitmq_management 插件
rabbitmq-plugins enable rabbitmq_management
2.3.3 通过命令行(管理员)运行
rabbitmq-service start
2.3. RabbitMQ 管理后台
启动好后,稍等个几秒钟,用浏览器访问:http://127.0.0.1:15672/
,就能进入RabbitMQ
的管理界面了
以游客模式登录 账号:guest 密码:guest
2.3.2.1 新增账号密码
输入刚才docker-compose.yml
里的设置账号密码,就能进去了。里面的东西很多,先不用管它,一会儿再来看。
3. 官方案例解析
学习任何东西之前,最好的办法,无疑就是看官方文档了。
直接看JavaScript
章节的Hello World
部分。很明显,用之前先要装包
npm i amqplib
amqplib官方文档
3.1. 生产者:send.js
在public
目录里,新建一个send.js
,代码大家从讲义文档直接复制过来。
amqplib.connect(‘amqp://admin:xw@localhost
’) 参数详解
字段 | 注解 |
---|---|
amqp:// | 表示使用 AMQP 协议进行连接 |
admin | 连接到 RabbitMQ 服务器时使用的用户名,用于身份验证 |
xw | 是与用户名对应的密码,用于验证用户身份。用户名和密码通过冒号 : 分隔 |
localhost | 指定了 RabbitMQ 服务器所在的主机地址,这里表示服务器运行在本地机器上。如果 RabbitMQ 服务器部署在其他机器上,需要将 localhost 替换为实际的 IP 地址或域名 |
const amqplib = require('amqplib');(async () => {try {const connection = await amqplib.connect('amqp://admin:xw@localhost');// 创建一个通道。通道是进行通信的基本单位,通过通道可以发送和接收消息const channel = await connection.createChannel();// 队列的名字是:helloconst queue = 'hello';// 要发送的消息内容是:你好,xw!const msg = '你好,xw!';// 创建一个队列。如果队列不存在,则创建一个队列。如果队列已经存在,则不会创建新的队列// durable: 表示队列是否持久化。如果设置为true,即重启后队列不会消失await channel.assertQueue(queue, { durable: true });// 发送消息到队列// queue: 要发送消息的队列的名字// content: 要发送的消息内容// persistent: true,消息持久化,确保消息在 RabbitMQ 重启后仍然存在。channel.sendToQueue(queue, Buffer.from(msg), { persistent: true });// 打印发送提示信息console.log('[x] 发送了:%s', msg);// 500ms 后关闭连接setTimeout(() => {connection.close();process.exit(0);}, 500);} catch (error) {console.log(error);}
})();
我们一点点分析下,它这里都写了些什么东西。
1、连接 const connection = await amqplib.connect(‘amqp://admin:clwy1234@localhost’)
2、创建一个通道。通道是进行通信的基本单位,只有通过通道,才可以发送和接收消息
3、我们定义了队列的名字是:hello。并定义了要发送的消息内容是:你好,xw!
4、创建了一个队列。但如果队列已经存在,那就直接使用,不会创建新的了。这里要把队列的名字放进去,也就是hello
这里还有个参数:durable: false
,它表示队列是否持久化
false:不持久化,true:队列的信息会被写入磁盘,也就是docker-compose里设置的./data/rabbitmq,当RabbitMQ服务重启后,队列会被自动恢复
。
5、channel.sendToQueue
,很明显,这就是将消息发送到队列了。
接着做了个console.log提示,可以显示在命令行里,便于我们观察。
最后面,等500ms后,将连接断开
看完后,我们就先将它跑起来,看看到底有个什么效果。用cd命令,进入public目录中。然后直接用node运行这个文件
cd public
node send.js
打开RabbitMQ的管理后台:http://127.0.0.1:15672/
里面的菜单,依次是:
- 概览:Overview
- 连接:Connections
- 通道:Channels
- 交换机:Exchanges
- 队列和流:Queues and Streams
- 管理用户:Admin
由于代码里,在500ms之后,就将连接关闭了。所以连接和通道里,都看不到东西。大家点击队列这里,里面可以看到hello队列了。这正是我们刚才创建的队列名字。
点进去看,里面可以看到队列的一些详细内容。
-
上面有一些统计信息。一共收到了一条信息,准备好的有几条,在内存中的又几条,占有了多少空间、多少内存什么的。
-
再往下看,注意这里有个
发送消息(Publish message)
和获取消息(Get message)
。 -
我们先点
获取消息
,这里就能看到刚才发送的:你好,xw
4.2. receive.js
刚才是生产者的代码
,我们接着继续看消费者的代码
。在public目录中新建receive.js
,
const amqplib = require('amqplib');(async () => {try {// 连接到 RabbitMQconst connection = await amqplib.connect('amqp://admin:clwy1234@localhost');// 创建一个通道。通道是进行通信的基本单位,通过通道可以发送和接收消息const channel = await connection.createChannel();// 队列的名字是:helloconst queue = 'hello';// 创建一个队列。如果队列不存在,则创建一个队列。如果队列已经存在,则不会创建新的队列// durable: 表示队列是否持久化。如果设置为true,即重启后队列不会消失await channel.assertQueue(queue, { durable: true });// 打印等待接收消息的提示信息console.log(' [*] 等待接收消息在 %s 队列中. 按 CTRL+C 退出', queue);// 当接收到消息channel.consume(queue, (msg) => {// 打印接收到的消息内容console.log('[x] 接收到了:%s', msg.content.toString());// 如果不是自动确认,需要手动确认消息// channel.ack(msg);}, {// noAck: 表示是否自动确认消息,设置为true表示自动确认,设置为false表示手动确认// 如果设置为false,需要手动确认消息,否则消息会被重复消费。例如:channel.ack(msg)noAck: true});} catch (error) {console.log(error);}
})();
我们来运行一下,
node receive.js
再回去刷新页面,也多刷新个几次。因为刚才的两条消息都已经处理完成了,所以现在的准备好的(Ready)
和在内存中的(In memory)
都是0
了。
再点击连接(Connections)
,因为我们这次的消费者代码
,并没有关闭连接
,所以一直连在RabbitMQ上
。
点击通道(Channels)
,里面也能看到创建的通道。
4. noAck 自动、手动确认消息
关于自动确认消息,大家可以改为
noAck: false
ack(确认) | 当消费者成功处理完一条消息后,会向 RabbitMQ 服务器发送一个确认信号(ack),告知服务器该消息已经被成功消费,服务器可以将该消息从队列中移除 |
---|---|
nack(否定确认) | 当消费者处理消息失败或者遇到某些异常情况时,会向 RabbitMQ 服务器发送一个否定确认信号(nack),表示消息处理未成功。根据不同的配置,服务器会对该消息进行不同的处理,比如重新入队、丢弃等。 |
channel.nack(message, multiple, requeue); 参数
参数 | 注释 |
---|---|
message | 该参数代表需要进行否定确认的消息对象,也就是消费者从队列中接收到的消息。这个消息对象包含了消息的内容、属性等信息 |
multiple | 这是一个布尔类型的参数,用于指定是否批量否定确认消息。当 multiple 为 true 时,表示会将当前消费者已经接收到但还未确认(包括 ack 和 nack)的所有消息进行否定确认。这在需要一次性处理多个未确认消息时很有用,可以提高处理效率。当 multiple 为 false 时,仅对当前传入的这一条消息进行否定确认。 |
requeue | 同样是布尔类型的参数,用于控制消息在被否定确认后的去向。当 requeue 为 true 时,消息会被重新放回原队列中,通常会被放置在队列的尾部,之后可能会再次被分发给其他消费者或者当前消费者(取决于 RabbitMQ 的分发策略)。这种方式适用于消息处理失败是由于临时原因导致的情况,例如网络抖动、数据库短暂不可用等,让消息有机会被重新处理。当 requeue 为 false 时,消息不会被重新放回原队列,而是会被丢弃或者根据死信队列(Dead Letter Queue,DLQ)的配置进行处理。死信队列用于存储那些无法被正常消费的消息,方便后续分析和处理 |
这样就需要手动确认消息了,我们先不确认。将接收消息的终端重启一下,重新发送一次,可以看到收到信息了。
按CTRL+C
停止后,再次接收消息,发现还能接收到这个消息。这就错了,按道理消息被处理完成后,就不应该重复收到了。
这就是因为,由于我们没有手动确认,所以RabbitMQ
,认为消息一直没有被处理成功,就会不断的发过来,让你处理。
加上
// 当接收到消息
channel.consume(queue, function (msg) {console.log('[x] 接收到了:%s', msg.content.toString());// 如果不是自动确认,需要手动确认消息channel.ack(msg);}, {noAck: false
});
CTRL+C
停止后,重新连接,再接收一次。现在的代码里,已经将消息确认了。再次按CTRL+C
停止,再连接上去,就不会收到重复的消息
了。
那么ack
设计的目的,是让你有更高的灵活度。因为有些任务的执行,有可能失败,需要重试。我这里有个代码示例,大家可以参考一下。
// 模拟处理逻辑
channel.consume(queue, function (msg) {try {// 假设处理成功,手动确认消息channel.ack(msg);} catch (error) {// 处理失败,可以选择拒绝消息,将消息从队列中删除// nack:否定确认):当消费者处理消息失败或者遇到某些异常情况时,会向 RabbitMQ 服务器发送一个否定确认信号(nack),表示消息处理未成功。根据不同的配置,服务器会对该消息进行不同的处理,比如重新入队、丢弃等。channel.nack(msg, false, false);console.error(error);}
}, {noAck: false
});
总结
1、消息队列的组成
基础的消息队列,由三部分组成:生产者、队列和消费者。
名称 | 说明 |
---|---|
生产者 | 将消息发送到队列 |
队列 | 临时存储消息,按照先进先出的原则,在生产者和消费者之间传递消息 |
消费者 | 监听队列,收到消息后进行处理。 |
2、RabbitMQ 的方法
方法 | 说明 |
---|---|
const connection = amqplib.connect | 连接到 RabbitMQ |
connection.close | 关闭连接 |
const channel = connection.createChannel | 创建通道 |
channel.assertQueue | 创建队列 |
channel.sendToQueue | 发送消息 |
channel.consume | 接收消息 |
channel.ack | 确认消息,消息被成功处理。 |
channel.nack | 拒绝消息,消息处理失败。 |
3、nack的参数
参数 | 注释 |
---|---|
message | 该参数代表需要进行否定确认的消息对象,也就是消费者从队列中接收到的消息。这个消息对象包含了消息的内容、属性等信息 |
multiple | 这是一个布尔类型的参数,用于指定是否批量否定确认消息。当 multiple 为 true 时,表示会将当前消费者已经接收到但还未确认(包括 ack 和 nack)的所有消息进行否定确认。这在需要一次性处理多个未确认消息时很有用,可以提高处理效率。当 multiple 为 false 时,仅对当前传入的这一条消息进行否定确认。 |
requeue | 同样是布尔类型的参数,用于控制消息在被否定确认后的去向。当 requeue 为 true 时,消息会被重新放回原队列中,通常会被放置在队列的尾部,之后可能会再次被分发给其他消费者或者当前消费者(取决于 RabbitMQ 的分发策略)。这种方式适用于消息处理失败是由于临时原因导致的情况,例如网络抖动、数据库短暂不可用等,让消息有机会被重新处理。当 requeue 为 false 时,消息不会被重新放回原队列,而是会被丢弃或者根据死信队列(Dead Letter Queue,DLQ)的配置进行处理。死信队列用于存储那些无法被正常消费的消息,方便后续分析和处理 |
4、 RabbitMQ 的参数
字段 | 说明 |
---|---|
durable: true | 队列持久化,队列信息将被写入磁盘,在RabbitMQ服务重启后仍然存在。但是队列已经创建过后,这个参数就没法改了,可以删掉队列,重新建一个新的。 |
persistent: true | 消息持久化,消息将会被写入磁盘,在RabbitMQ服务重启后仍然存在。 |
noAck: true | 自动确认消息,只要收到了就会自动确认。如果设置成false,则需要手动确认。 |
使用RabbitMQ发送邮件