当前位置: 首页 > news >正文

【RabbitMQ】消息分发、事务

消息分发

概念

RabbitMQ队列拥有多个消费者时,队列会把收到的消息分派给不同的消费者。每条消息只会发送给订阅该队列订阅列表里的一个消费者。这种方式非常适合扩展,如果现在负载加重,那么只需要创建更多的消费者来消费处理消息即可。

默认情况下,RabbitMQ是以轮询的方法进行分发的,而不管消费者是否已经消费并且已经确认了该消息。这种方式是不大合理的。试想一下,如果某些消费者消费速度慢,而某些消费者消费速度快,就可能导致某些消费者消息积压,某些消费者空闲,进而应用整体的吞吐量下降。

在工作模式一文中,书写RPC模式的代码时,已经写了一行代码channel.basicQos(1),来限制当前信道上的消费者所能保持的最大未确认消息的数量是1。所以,我们只需要使用此方法来限制每一个消费者的消息数量就可以避免上述情况发生。

比如,消费端调用了channel.basicQos(5),RabbitMQ就会为该消费者计数,发送一条消息计数加一,消费一条消息计数减一。当到达了设定的上限之后,RabbitMQ就不会再向该消费者发送消息了,知道消费者确认了某条消息之后,才会继续发送。

当channel.basicQos(int prefetchCount)中的形参个数为0时,表示的是没有上限。

应用场景

  1. 限流
  2. 非公平分发(负载均衡)

限流

在学习消息分发之前,当消息到达队列之后,如果有对应的消费者存在,那么队列就会一股脑把所有消息全部发送过去,从而造成瞬间压力,进而可能造成服务宕机,产生严重的影响。因此我们就要进行限流,限制消费者接收消息的数量。

限流通过设置prefetchCount参数,同时也必须要设置消息应答方式为手动应答。

spring:rabbitmq:host: 43.138.108.125port: 5672username: adminpassword: adminvirtual-host: mq-springboot-testlistener:simple:acknowledge-mode: manual # 消息确认机制为手动确认prefetch: 5 # 最多拉取5条消息
@Configuration
public class QosConfig {@Bean("qosQueue")public Queue qosQueue() {return QueueBuilder.durable(Constants.QOS_QUEUE).build();}@Bean("qosExchange")public Exchange qosExchange() {return ExchangeBuilder.directExchange(Constants.QOS_EXCHANGE).durable(true).build();}@Bean("qosQueueBind")public Binding qosQueueBind(@Qualifier("qosQueue") Queue queue, @Qualifier("qosExchange") Exchange exchange) {return BindingBuilder.bind(queue).to(exchange).with("qos").noargs();}}
@RestController
@RequestMapping("/qos")
public class QosController {@Resourceprivate RabbitTemplate rabbitTemplate;@RequestMappingpublic void qosQueue() {for (int i = 0; i < 10; i++) {this.rabbitTemplate.convertAndSend(Constants.QOS_EXCHANGE, "qos", "hello qos " + i);System.out.println("第" + i + "次发送消息成功!");}}}
@Configuration
public class QosListener {@RabbitListener(queues = Constants.QOS_QUEUE)public void qosListener(String msg, Channel channel, Message message) throws IOException {System.out.println("接收的消息为:" + msg);// channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}}

 

启动程序之后,可以看到出现如上结果,明显看到,我们发送了10条信息,但是由于限流的原因,当消费者接收了5条消息之后,并且没有去应答,因此程序就不再继续接收消息,而是等待这5条消息应答之后,才会去继续接收消息。

负载均衡

在有两个消费者的情况下,一个消费者处理任务非常快,一个消费者处理任务非常慢,就会造成一个消费者会一直很忙,而另一个消费者会很闲。这是因为RabbitMQ只是在消息进入队列时进行分派消息,他不考虑消费者未确认消息的数量。我们可以使用prefetch=1的方式来进行设置,告诉RabbitMQ一次只给一个消费者一条消息。在消费者处理并确认该消息之前,都不向其发送新的消息。这样做就可以使得有消息时,所有消费者都处理忙碌的状态。

实现负载均衡功能的代码和实现限流的代码类似,只需要将配置文件中的prefetch修改为1即可。

事务

RabbitMQ也实现了事务机制,允许开发者确保消息的接收和发送是原子性的,要么全部成功,要把全部失败。

@Component
public class RabbitTemplateConfig {@Bean("transactionRabbitTemplate")public RabbitTemplate transactionRabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setChannelTransacted(true); // 开启事务return rabbitTemplate;}}
@Configuration
public class TransactionConfig {@Bean("transactionQueue")public Queue transactionQueue() {return QueueBuilder.durable(Constants.TRANSACTION_QUEUE).build();}@Bean("transactionExchange")public Exchange transactionExchange() {return ExchangeBuilder.directExchange(Constants.TRANSACTION_EXCHANGE).durable(true).build();}@Bean("transactionQueueBind")public Binding transactionQueueBind(@Qualifier("transactionQueue") Queue queue,@Qualifier("transactionExchange") Exchange exchange) {return BindingBuilder.bind(queue).to(exchange).with("transaction").noargs();}}
@RestController
@RequestMapping("/transaction")
public class TransactionController {@Resource(name = "transactionRabbitTemplate")private RabbitTemplate rabbitTemplate;@Transactional@RequestMappingpublic void transactionQueue() {System.out.println("发送成功");this.rabbitTemplate.convertAndSend(Constants.TRANSACTION_EXCHANGE, "transaction", "hello transaction");int i = 1 / 0;this.rabbitTemplate.convertAndSend(Constants.TRANSACTION_EXCHANGE, "transaction", "hello transaction");}}

RabbitMQ和Redis中的事务相对来说,都是比较简单的,并不和MySQL,包含那么多的性质。因此,在对事务的介绍中,并没有大幅度进行介绍。 


http://www.mrgr.cn/news/33091.html

相关文章:

  • macOS 设置固定IP
  • from sklearn.preprocessing import Imputer.处理缺失数据的工具
  • Visual Studio 2022 安装
  • Word VBA如何间隔选中多个(非连续)段落
  • 【FreeRL】MAPPO的简单复现
  • MySQL中字段类型和Java对象中的数据类型对应关系
  • 【MySQL】 索引
  • JVM 调优篇9 调优案例6- cpu使用过载解决办法【超赞】
  • 京东-第2题-撞车
  • 机器学习——Stacking
  • Zotero——导出已标注好的文件的方法
  • 在Python中,类是用于定义对象的蓝图或模板,而对象则是根据类创建的具体实例
  • AWS 管理控制台
  • 中国IT产业的新机遇与挑战
  • JavaScript ---案例(统计字符出现次数)
  • 【Java】接口interface【主线学习笔记】
  • 【大模型实战篇】关于Bert的一些实操回顾以及clip-as-service的介绍
  • [Python数据拟合与可视化]:使用线性、多项式、指数和高斯模型拟合数据
  • gbase8s数据库常见的索引扫描方式
  • GAMES101(作业4~5)
  • Spring中的Web Service消费者集成(应该被淘汰的技术)
  • 类和对象(上)
  • 一些音频文件转Wav
  • BUUCTF逆向wp [WUSTCTF2020]Cr0ssfun
  • 【笔记】第三节 组织与性能
  • 计算机毕业设计 数字化农家乐管理平台的设计与实现 Java实战项目 附源码+文档+视频讲解