当前位置: 首页 > news >正文

【RabbitMQ消息队列】详解(一)

初识RabbitMQ

RabbitMQ 是一个开源的消息代理软件,也被称为消息队列中间件,它遵循 AMQP(高级消息队列协议),并且支持多种其他消息协议。
在这里插入图片描述

核心概念

  • 生产者(Producer):创建消息并将其发送到 RabbitMQ 的应用程序。生产者并不关心消息会被发送到哪个队列,而是将消息发送到交换器(Exchange)。
  • 消费者(Consumer):从队列中获取消息并进行处理的应用程序。消费者监听特定的队列,一旦队列中有新消息,就会将其取出并处理。
  • 队列(Queue):是 RabbitMQ 内部用于存储消息的缓冲区。它是一个先进先出(FIFO)的数据结构,多个生产者可以向同一个队列发送消息,多个消费者也可以从同一个队列接收消息。
  • 交换器(Exchange):接收生产者发送的消息,并根据路由规则将消息路由到一个或多个队列。常见的交换器类型有直连交换器(Direct Exchange)、主题交换器(Topic Exchange)、扇形交换器(Fanout Exchange)和头交换器(Headers Exchange)。
  • 绑定(Binding):是交换器和队列之间的关联关系。通过绑定,交换器可以知道将消息路由到哪些队列。

工作模式

  • 简单模式(Simple Mode):一个生产者将消息发送到一个队列,一个消费者从该队列接收消息。
  • 工作队列模式(Work Queues):多个消费者从同一个队列中竞争获取消息,以实现任务的分发和负载均衡。
  • 发布 / 订阅模式(Publish/Subscribe):生产者将消息发送到扇形交换器,交换器将消息广播到所有绑定的队列,每个绑定的队列都有一个消费者接收消息。
  • 路由模式(Routing):生产者将消息发送到直连交换器,交换器根据消息的路由键将消息路由到绑定键与之匹配的队列。
  • 主题模式(Topics):生产者将消息发送到主题交换器,交换器根据消息的路由键和绑定键的匹配规则将消息路由到相应的队列。

优势

  • 解耦:生产者和消费者之间通过消息队列进行通信,它们不需要直接相互了解,从而降低了系统之间的耦合度。
  • 异步通信:生产者发送消息后可以继续执行其他任务,不需要等待消费者处理完消息,提高了系统的响应速度和吞吐量。
  • 流量削峰:在高并发场景下,消息队列可以作为缓冲区,将大量的请求暂存起来,避免后端服务因瞬间高流量而崩溃。
  • 可扩展性:可以通过增加消费者的数量来提高系统的处理能力,以应对不断增长的业务需求。

应用场景

  • 异步处理:例如用户注册时,将发送邮件、短信等通知的任务放入消息队列,由专门的消费者异步处理,提高注册接口的响应速度。
  • 系统解耦:在微服务架构中,各个服务之间通过消息队列进行通信,降低服务之间的耦合度,提高系统的可维护性和可扩展性。
  • 流量削峰:在电商系统的秒杀活动中,将用户的请求放入消息队列,后端服务按照一定的速率从队列中取出请求进行处理,避免系统因瞬间高流量而崩溃。
  • 日志收集:将各个服务产生的日志信息发送到消息队列,由专门的日志处理服务从队列中获取日志信息进行存储和分析。

与其他消息队列的比较

  • Kafka:Kafka 是一个高吞吐量的分布式消息系统,主要用于处理大规模的实时数据流。与 RabbitMQ 相比,Kafka 的吞吐量更高,更适合处理海量数据的实时传输和处理,但在消息的可靠性和灵活性方面相对较弱。
  • ActiveMQ:ActiveMQ 是一个老牌的消息队列中间件,支持多种消息协议,功能较为全面。与 RabbitMQ 相比,ActiveMQ 的性能相对较低,配置和管理也较为复杂。

RabbitMQ 是一个功能强大、性能稳定、易于使用的消息队列中间件,广泛应用于各种分布式系统和微服务架构中。

生产者

1. 导入 pika

import pika

pika 是 Python 里用于与 RabbitMQ 消息代理交互的库,借助它能够实现消息的发送与接收。

2. 建立与 RabbitMQ 服务器的连接

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

pika.ConnectionParameters('localhost'):创建一个连接参数对象,这里指定 RabbitMQ 服务器的地址为 localhost,也就是本地机器。要是你的 RabbitMQ 服务器部署在其他机器上,就得把 localhost 替换成对应的 IP 地址或者域名。
pika.BlockingConnection():构建一个阻塞式的连接对象。所谓阻塞式连接,意味着在连接建立之后,程序会暂停等待,直到操作完成。

3. 创建通道

channel = connection.channel()

在 RabbitMQ 里,通道是进行大部分操作的基础。通道是轻量级的连接,可在一个连接上创建多个通道,以此来提升效率。借助通道,你能够声明队列、发送和接收消息等。

4. 声明队列

channel.queue_declare(queue='queue1')

channel.queue_declare():这是一个方法,其用途是声明一个队列。如果指定的队列不存在,就会创建该队列;若队列已经存在,则不会有任何影响。
queue='queue1':指定队列的名称为 queue1

5. 发布消息到队列

channel.basic_publish(exchange='', routing_key='queue1', body='hello world!')

channel.basic_publish():这是一个用于发布消息的方法。
exchange='':指定交换器的名称。这里为空字符串,代表使用默认的交换器(也就是简单模式)。默认交换器会根据路由键把消息路由到对应的队列。
routing_key='queue1':指定路由键。由于使用的是默认交换器,所以消息会被路由到名称为 hello 的队列。
body='hello world!':指定要发送的消息内容,此处消息内容为 hello world!

import pika# 1.建立与 RabbitMQ 服务器的连接
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))# 2.创建通道
channel = connection.channel()# 3.声明队列
channel.queue_declare(queue='queue1')# 4.发布消息到队列
channel.basic_publish(exchange='', routing_key='queue1', body='hello world!')channel.close()connection.close()

消费者

1. 定义回调函数

def callback(ch, method, properties, body):print(" [x] Received %r" % body)

这是一个回调函数,当消费者从队列中接收到消息时,会调用这个函数。
函数的参数含义如下:
ch:通道对象,用于与 RabbitMQ 进行交互。
method:包含消息传递的元数据,如路由键、交换器等。
properties:消息的属性,如消息头、优先级等。
body:消息的实际内容,以字节形式表示

2.开始消费消息

channel.basic_consume(queue='queue1', auto_ack=True, on_message_callback=callback)

channel.basic_consume():该方法用于启动一个消费者,从指定队列中接收消息。
queue='queue1':指定要消费的队列名称为 queue1
auto_ack=True:表示自动确认消息。当消费者接收到消息后,会立即向 RabbitMQ 发送确认信号,告知 RabbitMQ 该消息已被处理,可以从队列中删除。
on_message_callback=callback:指定当接收到消息时要调用的回调函数。

3. 启动消息消费循环

channel.start_consuming()

channel.start_consuming():启动一个无限循环,持续从队列中接收消息,并调用回调函数处理接收到的消息。直到程序被手动终止(如按下 CTRL+C)。

import pika# 1.建立与 RabbitMQ 服务器的连接
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))# 2.创建通道
channel = connection.channel()# 3.声明队列
channel.queue_declare(queue='queue1')# 4.定义回调函数
def callback(ch, method, properties, body):print(" [x] Received %r" % body)# 5.开始消费消息
channel.basic_consume(queue='queue1', auto_ack=True, on_message_callback=callback)# 6.启动消息消费循环
channel.start_consuming()

1.手动确认消息

当把 auto_ack 参数设为 False 时,意味着消费者不会自动向 RabbitMQ 服务器确认消息已被处理。而是需要你在代码里手动发送确认信号,这能够保证消息在消费者真正处理完成后才会从队列中移除,避免因消费者处理消息时出错或崩溃而导致消息丢失。

import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()channel.queue_declare(queue='queue1')def callback(ch, method, properties, body):print(" [x] Received %r" % body)# 模拟消息处理try:# 这里可以添加实际的消息处理逻辑print("Processing the message...")# 手动确认消息ch.basic_ack(delivery_tag=method.delivery_tag)print(" [x] Message processed and acknowledged.")except Exception as e:print(f"Error processing message: {e}")# 可以选择拒绝消息并将其重新放回队列# ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)channel.basic_consume(queue='queue1',auto_ack=False,on_message_callback=callback)print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

2.持久化队列

channel.queue_declare(queue='queue1', durable=True) 这行代码的作用是声明一个持久化队列。
durable=True:此参数用于设置队列的持久化属性。当 durable 被设置为 True 时,表示这个队列是持久化的。
持久化队列的主要特点是,在 RabbitMQ 服务器重启后,队列不会丢失。即使服务器崩溃或重启,队列的定义和其中未被消费的消息都会被保留下来。
与之相对,如果 durableFalse(默认值),那么队列是非持久化的,在服务器重启后,该队列及其包含的消息都会被清除。

import pika# 建立与 RabbitMQ 服务器的连接
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()# 声明一个持久化队列
channel.queue_declare(queue='queue1', durable=True)# 发布持久化消息到队列
channel.basic_publish(exchange='',routing_key='queue1',body='hello world!',properties=pika.BasicProperties(delivery_mode=2  # 使消息持久化)
)print(" [x] Sent 'hello world!'")# 关闭连接
channel.close()
connection.close()

3.公平分发参数

import pika# 建立与 RabbitMQ 服务器的连接
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()# 声明独占队列
channel.queue_declare(queue='exclusive_queue', exclusive=True)def callback(ch, method, properties, body):print(" [x] Received %r" % body)# 开始消费消息
channel.basic_consume(queue='exclusive_queue',on_message_callback=callback,auto_ack=True)print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

prefetch_count 用于设置每个消费者在同一时间最多能处理的未确认消息数量。默认情况下,RabbitMQ 会采用轮询的方式将消息依次分发给各个消费者,不管消费者当前的处理能力如何。通过设置 prefetch_count,可以实现公平分发,让处理能力强的消费者能处理更多的消息。

4.独占队列参数

channel.queue_declare(queue='exclusive_queue', exclusive=True):声明一个名为 exclusive_queue 的独占队列。
独占队列通常用于临时存储只与当前连接相关的消息,当连接关闭时,队列会自动清理。


http://www.mrgr.cn/news/100496.html

相关文章:

  • 从视频中学习:从Humanoid-X、UH-1的自动打字幕,到首个人形VLA Humanoid-VLA(自监督数据增强且整合第一人称视角)
  • WPF常用技巧汇总 - Part 2
  • DeepSeek 多头潜在注意力(Multi-Head Latent Attention, MLA)技术
  • STM32F103_HAL库+寄存器学习笔记21 - CAN接收过滤器:CPU减负神器,提升系统效率的第一道防线
  • Drivestduio 代码笔记与理解
  • 论文检索相关网站
  • 大模型API密钥的环境变量配置(大模型API KEY管理)(将密钥存储在环境变量)(python-dotenv)(密钥管理)环境变量设置环境变量
  • 新能源汽车运动控制器核心芯片选型与优化:MCU、DCDC与CANFD协同设计
  • T检验、F检验及样本容量计算学习总结
  • Simulink与C的联合仿真调试
  • Servlet (简单的servlet的hello world程序)
  • 小白自学python第一天
  • 鸿蒙NEXT开发正则工具类RegexUtil(ArkTs)
  • 数据库3,
  • 深度解析:具身AI机器人领域最全资源指南(含人形机器人,多足机器人,灵巧手等精选资源)
  • AUTOSAR图解==>AUTOSAR_RS_FeatureModelExchangeFormat
  • 基于esp32实现键值对存储读写c程序例程
  • C++如何使用调试器(如GDB、LLDB)进行程序调试保姆级教程(2万字长文)
  • shell命令二
  • Centos 7 ssh连接速度慢(耗时20秒+)