MQTT从入门到精通之MQTT进阶
MQTT进阶
1 QoS介绍
1.1 QoS简介
使用MQTT协议的设备大部分都是运行在网络受限
的环境下,而只依靠底层的TCP传输协议,并不能完全保证消息的可靠到达。
MQTT提供了QoS机制,其核心是设计了多种消息交互机制来提供不同的服务质量
,来满足用户在各种场景下对消息可靠性的要求。
MQTT 定义了三个 QoS 等级,分别为:
1、QoS 0,最多交付一次 -----> 可能丢失消息
2、QoS 1,至少交付一次 -----> 可以保证收到消息,但消息可能重复
3、QoS 2,只交付一次 -----> 可以保证消息既不丢失也不重复
QoS等级是由发布者在PUBLISH报文中指定的
,大部分情况下Broker向订阅者转发消息时都会维持原始的 QoS 不变。不过也有一些例外的情况,根据订阅者的订阅要求,
消息的 QoS 等级可能会在转发的时候发生降级。
例如,订阅者在订阅时要求Broker可以向其转发的消息的最大QoS等级为QoS 1,那么后续所有QoS 2消息都会降级至QoS 1转发给此订阅者,而所有QoS 0和QoS 1消息
则会保持原始的QoS等级转发。
1.2 QoS 0原理介绍
1.2.1 通讯原理说明
QoS 0 是最低的 QoS 等级。QoS 0 消息即发即弃,不需要等待确认,不需要存储和重传,因此对于接收方来说,永远都不需要担心收到重复的消息。
涉及到的相关报文:
1.2.2 QoS 0消息丢失原因
当我们使用 QoS 0 传递消息时,消息的可靠性完全依赖于底层的 TCP 协议。
而 TCP 只能保证在连接稳定不关闭的情况下消息的可靠到达
,一旦出现连接关闭、重置,
仍有可能丢失当前处于网络链路或操作系统底层缓冲区中的消息。这也是 QoS 0 消息最主要的丢失场景。
1.3 QoS 1原理介绍
1.3.1 通讯原理说明
为了保证消息到达,QoS 1 加入了应答与重传机制,发送方只有在收到接收方的 PUBACK 报文以后,才能认为消息投递成功,在此之前,发送方需要存储该 PUBLISH 报
文以便下次重传。
QoS 1需要在 PUBLISH 报文中设置 Packet ID,而作为响应的 PUBACK 报文,则会使用与 PUBLISH 报文相同的 Packet ID,以便发送方收到后删除正确PUBLISH
报文缓存。
涉及到的相关报文:
1.3.2 QoS 1 消息会重复原因
对于发送方来说,没收到 PUBACK 报文分为以下两种情况:
1、PUBLISH 未到达接收方
2、PUBLISH 已经到达接收方,接收方的 PUBACK 报文还未到达发送方
在第一种情况下,发送方虽然重传了 PUBLISH 报文,但是对于接收方来说,实际上仍然仅收到了一次消息。
在第二种情况下,在发送方重传时,接收方已经收到过了这个 PUBLISH 报文,这就导致接收方将收到重复的消息。
重传 PUBLISH 报文的时候,PUBLISH 中的 DUP 标志会被设置为 1,用以表示这是一个重传的报文。
1.4 QoS 2原理介绍
1.4.1 通讯原理说明
QoS 2 解决了 QoS 0、1 消息可能丢失或者重复的问题,但相应地,它也带来了最复杂的交互流程和最高的开销。每一次的 QoS 2 消息投递,都要求发送方与接收方
进行至少两次请求/响应流程。
流程说明:
1、首先,发送方存储并发送 QoS 为 2 的 PUBLISH 报文以启动一次 QoS 2 消息的传输,然后等待接收方回复 PUBREC 报文。这一部分与 QoS 1 基本一致,只是响
应报文从 PUBACK 变成了 PUBREC。
2、当发送方收到 PUBREC 报文,即可确认对端已经收到了 PUBLISH 报文,发送方将不再需要重传这个报文,并且也不能再重传这个报文。所以此时发送方可以删除本地
存储的 PUBLISH 报文,然后发送一个 PUBREL 报文,通知对端自己准备将本次使用的 Packet ID 标记为可用了。与 PUBLISH 报文一样,我们需要确保 PUBREL
报文到达对端,所以也需要一个响应报文,并且这个 PUBREL 报文需要被存储下来以便后续重传。
3、当接收方收到 PUBREL 报文,也可以确认在这一次的传输流程中不会再有重传的 PUBLISH 报文到达,因此回复 PUBCOMP 报文表示自己也准备好将当前的 Packet
ID 用于新的消息了。
4、当发送方收到 PUBCOMP 报文,这一次的 QoS 2 消息传输就算正式完成了。在这之后,发送方可以再次使用当前的 Packet ID 发送新的消息,而接收方再次收到使
用这个 Packet ID 的 PUBLISH 报文时,也会将它视为一个全新的消息。
涉及到的报文:
1.4.2 QoS 2消息不会重复原因
消息不丢失原因:与 QoS 1 相同
消息不会重复原因:
快速回顾一下 QoS 1 消息无法避免重复的原因:
当我们使用 QoS 1 消息时,对接收方来说,回复完 PUBACK 这个响应报文以后 Packet ID 就重新可用了,也不管响应是否确实已经到达了发送方。所以就无法得知
之后到达的,携带了相同 Packet ID 的 PUBLISH 报文,到底是发送方因为没有收到响应而重传的,还是发送方因为收到了响应所以重新使用了这个 Packet ID 发
送了一个全新的消息。
所以,消息去重的关键就在于,通信双方如何正确地同步释放 Packet ID,换句话说,不管发送方是重传消息还是发布新消息,一定是和对端达成共识了的。而 QoS 2
中增加的 PUBREL 流程,正是提供了帮助通信双方协商 Packet ID 何时可以重用的能力。
QoS 2 规定,发送方只有在收到 PUBREC 报文之前可以重传 PUBLISH 报文。一旦收到 PUBREC 报文并发出 PUBREL 报文,发送方就进入了 Packet ID 释放流
程,不可以再使用当前 Packet ID 重传 PUBLISH 报文。同时,在收到对端回复的 PUBCOMP 报文确认双方都完成 Packet ID 释放之前,也不可以使用当前
Packet ID 发送新的消息。
因此,对于接收方来说,能够以 PUBREL 报文为界限,凡是在 PUBREL 报文之前到达的 PUBLISH 报文,都必然是重复的消息;而凡是在 PUBREL 报文之后到达的
PUBLISH 报文,都必然是全新的消息。一旦有了这个前提,我们就能够在协议层面完成 QoS 2 消息的去重。
1.5 不同QoS的适用场景
1.5.1 QoS 0
QoS 0 的缺点是可能会丢失消息,消息丢失的频率依赖于你所处的网络环境,并且可能使你错过断开连接期间的消息,不过优点是投递的效率较高。
所以我们通常选择使用 QoS 0 传输一些高频且不那么重要的数据
,比如传感器数据
,周期性更新,即使遗漏几个周期的数据也可以接受。
1.5.2 QoS 1
QoS 1 可以保证消息到达,所以适合传输一些较为重要的数据
,比如下达关键指令、更新重要的有实时性要求的状态
等。但因为 QoS 1 还可能会导致消息重复,所以当
我们选择使用 QoS 1 时,还需要能够处理消息的重复,或者能够允许消息的重复。
消息重复带来的危害:
如果我们不对 QoS 1 进行去重处理,我们可能会遭遇这种情况,发布方以 1、2 的顺序发布消息,但最终订阅方接收到的消息顺序可能是 1、2、1、2。如果 1 表示开
灯指令,2 表示关灯指令,我想大部分用户都不会接受自己仅仅进行了开灯然后关灯的操作,结果灯在开和关的状态来回变化。
1.5.3 QoS 2
QoS 2 既可以保证消息到达,也可以保证消息不会重复,但传输成本最高。如果我们不愿意自行实现去重方案,并且能够接受 QoS 2 带来的额外开销,那么 QoS 2 将是
一个合适的选择。通常我们会在金融、航空
等行业场景下会更多地见到 QoS 2 的使用。
2 主题详解
MQTT 主题本质上是一个 UTF-8 编码的字符串,是 MQTT 协议进行消息路由的基础。MQTT 主题类似 URL 路径,使用斜杠 /
进行分层:
chat/room/1
test/10/temperature
test/+/temperature
test/#
为了避免歧义且易于理解,通常不建议主题以 /
开头或结尾,例如 /chat
或 chat/
。
MQTT 主题不需要提前创建。MQTT 客户端在订阅或发布时即自动的创建了主题,开发者无需再关心主题的创建,并且也不需要手动删除主题。
2.1 主题通配符
MQTT 主题通配符包含单层通配符 +
及多层通配符 #
,主要用于客户端一次订阅多个主题
。
2.1.1 单层通配符
加号 (“+”) 是用于单个主题层级匹配的通配符
。在使用单层通配符时,单层通配符必须占据整个层级,例如:
+ 有效
test/+ 有效
test/+/temperature 有效
test+ 无效(没有占据整个层级)
如果客户端订阅了主题 test/+/temperature
,将会收到以下主题的消息:
test/1/temperature
test/2/temperature
...
test/n/temperature
但是不会匹配以下主题:
test/temperature
test/bedroom/1/temperature
2.1.2 多层通配符
井字符号(“#”)是用于匹配主题中任意层级的通配符
。多层通配符表示它的父级和任意数量的子层级,在使用多层通配符时,它必须占据整个层级并且必须是主题的最后
一个字符
,例如:
# 有效,匹配所有主题
test/# 有效
test/bedroom# 无效(没有占据整个层级)
test/#/temperature 无效(不是主题最后一个字符)
如果客户端订阅主题 test/#
,它将会收到以下主题的消息:
test
test/temperature
test/1/temperature
2.2 系统主题
以 $SYS/
开头的主题为系统主题,系统主题主要用于获取 MQTT 服务器自身运行状态、消息统计、客户端上下线事件等数据
。目前,MQTT 协议暂未明确规定 $SYS/
主题标准,但大多数 MQTT 服务器都遵循该标准建议。
例如,EMQX 服务器支持通过以下主题获取集群状态:
主题 | 说明 |
---|---|
$SYS/brokers | EMQX 集群节点列表 |
$SYS/brokers/emqx@127.0.0.1/version | EMQX 版本 |
$SYS/brokers/emqx@127.0.0.1/uptime | EMQX 运行时间 |
$SYS/brokers/emqx@127.0.0.1/datetime | EMQX 系统时间 |
EMQX 还支持客户端上下线事件、收发流量、消息收发、系统监控等丰富的系统主题,用户可通过订阅 $SYS/#
主题获取所有系统主题消息。
系统主题文档:https://www.emqx.io/docs/zh/v5.0/observability/mqtt-system-topics.html?__hstc=3614191.d3e87b0aa86919c8c8f01e199a29f583.1715423024096.1717117142732.1717123186833.30&__hssc=3614191.2.1717123186833&__hsfp=3042319173#%E5%AE%A2%E6%88%B7%E7%AB%AF%E4%B8%8A%E4%B8%8B%E7%BA%BF%E4%BA%8B%E4%BB%B6
比如订阅客户端上下线事件主题:
$SYS/brokers/emqx@172.17.0.4/clients/+/connected # 订阅客户端上线的主题
$SYS/brokers/emqx@172.17.0.4/clients/+/disconnected # 订阅客户端下线的主题
注意:监听系统主题需要在broker端开通对应的访问权限
3 会话介绍
3.1 MQTT会话
MQTT客户端和MQTT服务器之间的连接被称为会话。每个MQTT客户端都可以启动一个或多个会话,通过会话可以实现客户端和服务器之间的消息传递。
3.2 常见配置参数
3.2.1 Clean Start
Clean Start作用:用于指示客户端在和服务器建立连接的时候应该尝试恢复之前的会话还是直接创建全新的会话。
常见取值以及含义:
0:服务端存在一个关联此客户端标识符(Client ID)的会话,服务端必须基于此会话的状态恢复与客户端的通信(之前的订阅信息会再次绑定,并且会接收到客户端断开
时,发布者所发布的消息)。如果不存在任何关联此客户端标识符的会话,服务端必须创建一个新的会话。
1:客户端和服务端必须丢弃任何已存在的会话,并开始一个新的会话。
3.2.2 Session Expiry Interval
Session Expiry Interval 决定了会话状态数据在服务端的存储时长。
常见取值:
- 没有指定此属性或者设置为 0,表示会话将在网络连接断开时立即结束。
- 设置为一个大于 0 的值,则表示会话将在网络连接断开的多少秒之后过期。
- 设置为 0xFFFFFFFF,即 Session Expiry Interval 属性能够设置的最大值时,表示会话数据永不过期。
4、服务端使用 Client ID
来唯一地标识每个会话,如果客户端想要在连接时复用之前的会话,那么必须使用与此前一致的 Client ID。
3.3 会话演示
具体步骤:
1、在MQTTX中设置关闭自动重订阅功能
2、创建一个名为 sub
的客户端连接,将 MQTT Version 设置为 5.0,开启 Clean Start,Session Expiry Interval 设置为 300 秒,然后链接到MQTT的服
务端,并订阅主题 mqttx_290c747e/test
:
3、创建一个名为 pub
的客户端连接向主题 mqttx_290c747e/test
发布消息,消息内容可以随意设置,我们将看到 sub
客户端收到这些消息。这时我们断开 sub
客户端的连接,然后继续通过 pub
客户端发布消息:
4、接下来,我们将 sub
客户端的 Clean Start 选项关闭,并保持 Session Expiry Interval 为 300 秒,然后再次连接。我们将看到 sub
客户端陆续收到我
们在它离线期间发布的消息:
以上就是 MQTT 会话为离线客户端缓存消息的能力。
4 消息详解
4.1 保留消息
4.1.1 保留消息简介
普通消息:普通消息在发送之前其所对应的主题如果不存在订阅者,普通消息MQTT服务器会直接将其丢弃。
保留消息:保留消息可以保留在 MQTT 服务器中。任何新的订阅者订阅与该保留消息中的主题匹配的主题时,都会立即接收到该消息,即使这个消息是在它们订阅主题之前
发布的。
如下图,当客户端订阅主题时,如果服务端存在该主题匹配的保留消息,则该保留消息将被立即发送给该客户端。
保留消息的常见使用场景:
1、智能家居设备的状态只有在变更时才会上报,但是控制端需要在上线后就能获取到设备的状态;
2、传感器上报数据的间隔太长,但是订阅者需要在订阅后立即获取到最新的数据;
3、传感器的版本号、序列号等不会经常变更的属性,可在上线后发布一条保留消息告知后续的所有订阅者;
4.1.2 保留消息使用
发布保留消息
在发布消息的时候将Retained 标记被设置为 true
,则该消息即是 MQTT 中的保留消息(Retained Message)。
注意事项:
1、可以通过Dashboard查看保留消息
2、MQTT 服务器会为每个主题存储最新一条保留消息
3、在保留消息发布前订阅主题,将不会收到保留消息。需要待保留消息发布后,重新订阅该主题,才会收到保留消息。
保留消息的存储方式
保留消息的存储方式:内存存储(默认存储类型)、磁盘存储
5、保留消息虽然存储在服务端中,但它并不属于会话的一部分。也就是说,即便发布这个保留消息的会话已结束,保留消息也不会被删除。
保留消息的删除方式
1、客户端往某个主题发送一个 Payload 为空的保留消息,服务端就会删除这个主题下的保留消息;
2、在 MQTT 服务器上删除,比如 EMQX MQTT 服务器提供了在 Dashboard 上删除保留消息的功能;
3、MQTT 5.0 新增了消息过期间隔属性,发布时可使用该属性设置消息的过期时间,将会在过期时间后自动被删除。
4.2 消息过期间隔
4.2.1 消息过期间隔简介
MQTT 可以通过Session Expiry Interval
为离线客户端缓存尚未发送的消息,然后在客户端恢复连接时发送。但如果客户端离线时间较长,可能有一些寿命较短的消
息已经没有必要必须发送给客户端了,继续发送这些过期的消息,只会浪费网络带宽和客户端资源。
举例:以联网汽车为例,我们可以向车辆发送建议车速
使它能够在绿灯期间通过路口,这类消息通常仅在车辆到达下一个路口之前有效,生命周期非常短暂。
消息过期间隔是 MQTT 5.0 引入的一个新特性,它允许发布端为有时效性的消息设置一个过期间隔,如果该消息在服务端中停留超过了这个指定的间隔,那么服务端将
不会再将它分发给订阅端。默认情况下,消息中不会包含消息过期间隔,这表示该消息永远不会过期。
注意:如果客户端在发布消息时设置了过期间隔,那么服务端在转发这个消息时也会包含过期间隔,但过期间隔的值会被更新为服务端接收到的值减去该消息在服务端停留的
时间。这可以避免消息的时效性在传递的过程中丢失,特别是在桥接到另一个MQTT 服务器的时候
。
4.2.2 消息过期间演示
具体步骤如下所示:
1、创建一个 pub
的客户端链接,链接到MQTT服务端
2、新建一个名为 sub
的客户端连接用于订阅,并将 Session Expiry Interval 设置为 300 秒
表示这个会话状态的数据在服务端保存300s。
3、连接成功后,我们订阅主题 mqttx_a9d86661/demo
,使用 Client ID 作为主题前缀可以有效避免主题重复:
4、订阅成功后,我们断开 sub
客户端与服务器的连接,然后切换到 pub
客户端,向主题 mqttx_a9d86661/demo
发布以下两条 Message Expiry Interval 分
别为 5 秒和 60 秒的消息:
5、发布完成后,切换到 sub
客户端,将 Clean Session 设置为 false 表示想要恢复之前的会话,然后等待至少 5 秒再重新连接。我们将看到 sub
只收到了过
期时间为 60 秒的消息,因为此时另一条消息已经过期:
4.3 遗嘱消息
4.3.1 遗嘱消息简介
在现实世界中,一个人可以制定一份遗嘱,声明在他去世后应该如何分配他的财产以及应该采取什么行动。在他去世后,遗嘱执行人会将这份遗嘱公开,并执行遗嘱中的指示
在 MQTT中,客户端可以在连接时在服务端中注册一个遗嘱消息
,与普通消息类似,我们可以设置遗嘱消息的主题、有效载荷等等。当该客户端意外断开连接,服务端就会
向其他订阅了相应主题的客户端发送此遗嘱消息
。这些接收者也因此可以及时地采取行动,例如向用户发送通知、切换备用设备等等。
作用:借助于遗嘱消息可以感知到客户端是意外断开
4.3.2 遗嘱消息原理
遗嘱消息指定时机
遗嘱消息在客户端发起连接时指定
,它和 Client ID、Clean Start 这些字段一起包含在客户端发送的 CONNECT 报文中。
与普通消息一样,我们可以为遗嘱消息设置主题(Will Topic)、保留消息标识位(Will Retain)、属性(Will Properties)、QoS(Will QoS)和有效载荷
(Will Payload)。
这些字段的用法与它们在普通消息中时完全相同,只是遗嘱消息可用的属性与普通应用消息略有不同,下表列出了它们的具体区别:
遗嘱消息只是多了一个专属属性:Will Delay Interval。
Will Delay Interval
作用:这个属性决定了服务端将在网络连接关闭后延迟多久发布遗嘱消息,并以秒为单位。
默认情况下,服务端总是在网络连接意外关闭时立即发布遗嘱消息。但是很多时候,网络连接的中断是短暂的,所以客户端往往能够重新连接并继续之前的会话。这导致遗嘱
消息可能被频繁地且无意义地发送。
如果没有指定 Will Delay Interval 或者将其设置为 0,服务端将仍然在网络连接关闭时立即发布遗嘱消息。
但如果将 Will Delay Interval 设置为一个大于 0 的值,并且客户端能够在 Will Delay Interval 到期前恢复连接,那么该遗嘱消息将不会被发布。
遗嘱消息与会话
遗嘱消息是会话状态的一部分,当会话结束,遗嘱消息也无法继续单独存在。但是在遗嘱消息延迟发布期间,会话可能过期,也可能因为客户端在新的连接中设置Clean
Start 为 1 所以服务端需要丢弃之前的会话。为了避免丢失遗嘱,此时服务端必须发布该遗嘱消息,即便 Will Delay Interval 还没有到期。所以服务端最终何时发
布遗嘱消息,取决于 Will Delay Interval 到期和会话结束这两种情况谁先发生。
4.3.3 遗嘱消息演示
具体步骤:
1、建立链接的时候指定遗嘱消息
在这个连接中我们指定了一个主题为 mqttx_8189c0fc/status
,Payload 为 offline 的遗嘱消息,并且将 Will Delay Interval 设置为 5 秒,Session
Expiry Interval 设置为 300 秒:遗嘱消息主题使用 Client ID 作为前缀可以有效避免主题重复:
2、建立新的客户端连接到服务端,并且监听遗嘱消息主题
3、正常关闭第一个链接,不会发送遗嘱消息
4、选择第一个链接右键新建窗口,在新的窗口中进行连接,然后将新的窗口关闭,等待5s就会在原有窗口的第二个链接中获取到遗嘱消息。
4.4 延迟发布
4.4.1 延迟发布简介
延迟发布简介:MQTT服务端收到发布者发布的消息以后,延迟一段时间以后再把消息转发给订阅者
延迟发布的使用场景:
1、农业智能化管理:在智能农业中,可能需要在特定时间启动灌溉系统或调节温室环境。
通过MQTT延迟发布,可以预先设定好指令发布时间,如在清晨或傍晚自动发送开
启灌溉的信号,确保水资源的有效利用且不对作物生长周期造成干扰。
2、能源管理与自动控制:智能家居或智能建筑中的照明、供暖、通风系统可以根据居民生活习惯或节能策略,利用延迟发布在预设时间自动调整,如在居民到家前半小时开
启空调或在离开家后一定时间关闭所有非必要电器,以达到节能减排的目的。
3、公共设施维护:城市中的公共照明、广告牌等设施可能需要在特定时间统一开关,以节省能源或遵守当地法规。通过MQTT延迟发布功能,可以安排在夜间自动发送开关指
令,无需人工干预,简化运维流程。
4、医疗健康监护:在远程医疗监护中,设备可能需要在一天中的特定时间收集患者数据或发送提醒,如定时提醒患者服药或在固定时间收集心率、血压等生理参数,以优化
患者护理计划。
延迟发布主题格式:$delayed/{DelayInterval}/{TopicName}
d e l a y e d : 使用 ‘ delayed: 使用 ` delayed:使用‘delay` 作为主题前缀的消息都将被视为需要延迟发布的消息
DelayInterval: 延迟发布的时间间隔,单位为妙,允许的最大间隔是 4294967 秒。如果 {DelayInterval}
无法被解析为一个整型数字,EMQX 将丢弃该消
息,客户端不会收到任何信息。
TopicName:MQTT消息的主题名称
举例说明:
$delayed/15/x/y:15 秒后将 MQTT 消息发布到主题 x/y。$delayed/60/a/b:1 分钟后将 MQTT 消息发布到 a/b。
4.4.2 延迟发布演示
具体步骤如下所示:
1、通过 Dashboard 配置延迟发布
打开 EMQX Dashboard,在左侧导航菜单中,点击管理-> 延迟发布。
在延迟发布页面,您可以进行以下配置:
- 启用: 启用或禁用延迟发布。默认情况下,已启用。
- 最大延迟消息数:可以指定延迟消息的最大数量。
2、在MQTTX中建立sub的客户端连接,并且添加订阅主题为delay/msg
3、在在MQTTX中建立pub的客户端连接,并且向主题 $delayed/10/delay/msg
4、等待 10 秒钟。您将看到客户端 sub
在 10 秒后接收到延迟消息。
4.5 用户属性
4.5.1 用户属性简介
用户属性简介:
1、MQTT 5.0版本引入的一个新特性。
2、它允许在Publish、Subscribe、Connect、Disconnect等报文中携带附加信息。
3、类似于http协议的请求头
用户属性的应用场景:
1、日志记录:在发布(PUBLISH)和订阅(SUBSCRIBE)报文中加入用户属性,可以帮助记录操作者信息、操作时间、原因说明等,便于后续的审计跟踪和问题排查。
2、消息分类与标记:用户属性可以用来给消息添加标签或分类信息,如消息类型等,使得接收方能根据这些属性对消息进行过滤、排序或特殊处理。
4.5.2 用户属性演示
最为常见的使用场景就是在发布消息的时候指定用户属性。
具体步骤:
1、创建sub
的客户端连接,并且订阅userprop/demo
主题
2、创建pub
的客户端连接,发送消息指定用户属性
3、观察sub
的客户端,可以看到获取到消息的内容同时也获取到了用户指定的属性
5 订阅详解
5.1 订阅选项
5.1.1 订阅选项简介
订阅的组成:
1、主题过滤器:决定了服务端将向我们转发哪些主题下的消息
2、订阅选项:是允许我们进一步定制服务端的转发行为
MQTT 5.0提供了4个订阅选项:QoS、No Local、Retain As Published、Retain Handling
5.1.2 QoS
QoS订阅选项简介
QoS 是最常用的一个订阅选项,它表示服务端在向订阅端发送消息时可以使用的最大QoS等级。
情况1:服务端支持的最大 QoS < 客户端订阅时请求的最大QoS
服务端将无法满足客户端的要求,这时服务端就会通过订阅的响应报文(SUBACK)告知订阅端最终授予的最大 QoS 等级,订阅端可以自行评估是否接受并继续通信。
情况2:订阅时请求的最大QoS < 消息发布时的QoS
为了尽可能地投递消息,服务端不会忽略这些消息,而是会在转发时对这些消息的 QoS 进行降级处理。
QoS订阅选项演示
具体步骤如下所示:
1、创建sub客户端连接,并且订阅sub/qos/demo
主题, 并指定QOS为0
2、订阅成功以后,通过sub客户端连接向sub/qos/demo
主题发布消息,并且指定QOS的登记为1
结果:sub客户端收到的消息的QOS值为0
5.1.3 No Local
No Local订阅选项简介
No Local取值:
1、0(默认值):服务端可以
将消息转发给发布这个消息的客户端
2、1:服务端不可以
将消息转发给发布这个消息的客户端
这个订阅选项尝尝被用在桥接场景
中,桥接本质上是两个 MQTT Server 建立了一个 MQTT 连接,然后相互订阅一些主题,Server 将客户端的消息转发给另一个
Server,而另一个 Server 则可以将消息继续转发给它的客户端。
在桥接的场景中,如果没有将No Local订阅选项的值设置为1,那么此时会形成转发风暴
。
举例:假设两个 MQTT Server 分别是 Server A 和 Server B,它们分别向对方订阅了#
主题。现在,Server A 将一些来自客户端的消息转发给了 Server B,而
当 Server B 查找匹配的订阅时,Server A 也会位于其中。如果 Server B 将消息转发给了 Server A,那么同样 Server A 在收到消息后又会把它们再次转发
给 Server B,这样就陷入了无休止的转发风暴。
而如果 Server A 和 Server B 在订阅 #
主题的同时,将 No Local 选项设置为 1,就可以完美地避免这个问题。
No Local订阅选项演示
具体步骤:
1、创建sub客户端连接,并订阅sub/local/demo
主题,并且设置No Local订阅选项为1
2、使用sub客户端连接向sub/local/demo
主题发布消息
结果:当前的客户端连接没有收到该主题中的消息
5.1.4 Retain As Published
Retain As Published订阅选项简介
Retain As Published取值:
1、0(默认值):服务端在向此订阅转发应用消息时需要清除
消息中的 Retain 标识不变
2、1:服务端在向此订阅转发应用消息时需要保持
消息中的 Retain 标识不变
应用场景:桥接场景
桥接场景下带来了一些问题。我们继续沿用前面的设定,当 Server A 将保留消息转发给 Server B 时,由于消息中的 Retain 标识已经被清除,Server B 将不会
知道这原本是一条保留消息,自然不会再存储它。这就导致了保留消息无法跨桥接使用。
那么在 MQTT 5.0 中,我们可以让桥接的服务端在订阅时将 Retain As Published 选项设置为 1,来解决这个问题。
Retain As Published订阅选项演示
具体步骤如下:
1、创建sub客户端连接,分别订阅主题sub/rap/demo01
和sub/rap/demo02
, 并且将Retain As Published设置为 0 和 1
2、通过sub客户端连接分别向sub/rap/demo01
和sub/rap/demo02
主题发布保留消息
5.1.5 Retain Handling
Retain Handling订阅选项简介
作用:Retain Handling 这个订阅选项被用来向服务端指示当订阅建立时,是否需要发送保留消息
。
Retain Handling常见取值:
1、0(默认值):表示只要订阅建立,就发送保留消息;
2、1:表示只有建立全新的订阅而不是重复订阅时,才发送保留消息;
3、2:表示订阅建立时不要发送保留消息;
Retain Handling订阅选项演示
具体步骤如下所示:
情况1:Retain Handling值设置为0
1、开启客户端的自动重订阅功能
2、创建sub客户端连接(Clean Start值设置为1,并且将Session Expiry Interval设置为300),并且向sub/rh/demo
主题中发布一个保留消息
3、在sub客户端连接中,订阅sub/rh/demo
主题,并且将Retain Handling的值设置为0
结果:只要订阅成功了,那么此时立马会收到保留消息
4、关闭客户端连接,设置客户端的Clean Start值设置为0
表示需要复用之间的会话
注意:只要是重新订阅成功了,那么此时就会收到保留消息
情况2:Retain Handling设置为1
删除sub客户端连接中的订阅,重新订阅sub/rh/demo
主题,并且将Retain Handling的值设置为1
,新建立的订阅是可以获取到保留消息的。
关闭当前连接,重新建立连接【会自动复用之前的订阅】,此时无法获取到保留消息。
情况3: Retain Handling设置为2
删除sub客户端连接中的订阅,重新订阅sub/rh/demo
主题,并且将Retain Handling的值设置为2
结果:即使订阅成功了,那么此时也不会收到保留消息
5.2 共享订阅
5.2.1 共享订阅简介
在普通的订阅中,我们每发布一条消息,所有匹配的订阅端都会收到该消息的副本。当某个订阅端的消费速度无法跟上消息的生产速度时,我们没有办法将其中一部分消息分
流到其他订阅端中来分担压力。这使订阅端容易成为整个消息系统的性能瓶颈。
MQTT 5.0 引入了共享订阅特性,它使得 MQTT 服务端可以在使用特定订阅的客户端之间均衡地分配消息负载。
这表示,当我们有两个客户端共享一个订阅时,那么每个
匹配该订阅的消息都只会有一个副本投递给其中一个客户端。
共享订阅不仅为消费端带来了极佳的水平扩展能力,使我们可以应对更高的吞吐量
,还为其带来了高可用性
,即使共享订阅组中的一个客户端断开连接或发生故障,其他客
户端仍然可以继续处理消息,在必要时还可以接管原先流向该客户端的消息流。
5.2.2 共享订阅分类
启用共享订阅:为一组订阅者的原始主题添加指定前缀
共享订阅分类:
前缀格式 | 示例 | 前缀 | 真实主题名 |
---|---|---|---|
带群组格式 | $share/abc/t/1 | $share/abc | t/1 |
不带群组格式 | $queue/t/1 | $queue/ | t/1 |
带群组的共享订阅
您可以通过在原始主题前添加 $share/<group-name>
前缀为分组的订阅者启用共享订阅。组名可以是任意字符串。EMQX 同时将消息转发给不同的组,属于同一组的订
阅者可以使用负载均衡接收消息。
例如,如果订阅者 s1
、s2
和 s3
是组 g1
的成员,订阅者 s4
和 s5
是组 g2
的成员,而所有订阅者都订阅了原始主题 t1
。共享订阅的主题是
$share/g1/t1
和 $share/g2/t1
。当 EMQX 发布消息 msg1
到原始主题 t1
时:
- EMQX 将
msg1
发送给g1
和g2
两个组。 s1
、s2
、s3
中的一个订阅者将接收msg1
。s4
和s5
中的一个订阅者将接收msg1
。
不带群组的共享订阅
以 $queue/
为前缀的共享订阅是不带群组的共享订阅。它是 $share
订阅的一种特例。您可以将其理解为所有订阅者都在一个订阅组中:
5.2.3 共享订阅演示
1、创建4个客户端连接分别是sub1
、sub2
、sub3
、sub4
,其中s1和s2属于同一个共享订阅组g1, s3和s4属于同一个共享订阅组g2
订阅的主题如下所示:
sub1: $share/g1/t/1
sub2: $share/g1/t/1
sub3: $share/g2/t/1
sub4: $share/g2/t/1
2、创建pub客户端连接,并且向t/1主题发布两条消息观测结果
默认的负载均衡算法:轮询
3、删除sub1、sub2、sub3、sub4的订阅信息,重新添加$queue/t/1
订阅
4、通过pub客户端向t/1主题发布消息观测结果
5.2.5 负载均衡算法
可以通过Dashboard进行负载均衡算法的配置【管理====>MATT配置】:
大致可以分为:
1、随机(Random),在共享订阅组内随机选择一个会话发送消息。
2、轮询(Round Robin),在共享订阅组内按顺序选择一个会话发送消息,循环往复。
3、哈希(Hash),基于某个字段的哈希结果来分配。
4、粘性(Sticky),在共享订阅组内随机选择一个会话发送消息,此后保持这一选择,直到该会话结束再重复这一过程。
6、本地优先(Local),随机选择,但优先选择与消息的发布者处于同一节点的会话,如果不存在这样的会话,则退化为普通的随机策略。
5.3 排它订阅
5.3.1 排它订阅简介
排它订阅允许对主题进行互斥订阅,一个主题同一时刻仅被允许存在一个订阅者,在当前订阅者未取消订阅前,其他订阅者都将无法订阅对应主题。
要进行排它订阅,您需要为主题名称添加$exclusive/
前缀,如以下表格中的示例:
示例 | 前缀 | 真实主题名 |
---|---|---|
$exclusive/t/1 | $exclusive/ | t/1 |
当某个客户端 A 订阅 $exclusive/t/1
后,其他客户端再订阅 $exclusive/t/1
时都会失败,直到 A 取消了对 $exclusive/t/1
的订阅为止。
注意: 排它订阅必须使用 $exclusive/
前缀,在上面的示例中,其他客户端依然可以通过 t/1
成功进行订阅。
订阅失败的常见错误码:
5.3.2 排它订阅演示
默认情况下排它订阅是关闭的。
具体步骤:
1、创建sub1客户端连接,并且添加$exclusive/t/1
订阅
2、在Dashboard开启排它订阅配置【管理====>MATT配置】
3、在sub2客户端连接中重新添加$exclusive/t/1
订阅
4、创建sub2客户端连接,并且添加$exclusive/t/1
订阅
5、创建sub2客户端连接,并且添加t/1
订阅,此时订阅成功
5.4 自动订阅
自动订阅能够给 EMQX 设置多个规则,在设备成功连接后按照规则为其订阅指定主题,不需要额外发起订阅。
5.4.1 配置自动订阅规则
通过 Dashboard 配置自动订阅规则:【管理 ====> MQTT高级特性 =====> 自动订阅 ====> 添加】
5.4.2 演示自动订阅使用
具体步骤:
1、创建pub客户端连接,作为发布者
2、创建sub客户端连接,作为订阅者
3、在pub客户端连接中向a/1主题发布消息