使用RabbitMQ
一、MQ是什么
MQ全称 Message Queue(消息队列),是在消息的传输过程中保存消息的容器。多用于分布式系统之间进行通信,主要功能业务解耦。
二、市面上常见的MQ产品
RabbitMQ、RocketMQ(阿里的)、Kafka 、 ActiveMQ(很少用了)
三、为什么要用MQ
3.1、异步处理
场景说明:用户注册后,需要发注册邮件和注册短信,传统的做法有两种 1.串行的方式 2.并行的方式
串行化方式:将用户信息注册到数据库以后再发送邮件然后再发送短信,这三步完成以后才返回给客户端。但是邮件短信这种东西不是必须立马发送给用户的,这样就会导致我们使用串行化会很慢
并行化方式:就是用户信息注册到数据库以后,发送邮件的同时发送信息,虽然比串行化快一点,但是依旧是要等待发送完邮箱和短信才能返回给客户端,依旧不够快
使用消息队列:
使用消息队列,用户只管发送请求,而写入数据库到写入消息队列这段时间交给MQ来处理,起两个消费者关注消息队列去消费消息;这样我们发现用户的响应时间就等于写入数据库的时间+写入消息队列的时间(可以忽略不计),而消费消息队列是异步处理的
3.2、应用解耦
场景说明:双11是购物狂节,用户下单后,订单系统需要通知库存系统,传统的做法就是订单系统调用库存系统的接口.
订单系统只负责写入消息,库存系统只负责消费消息,大大提高了效率并且也将我们的订单模块和库存模块解耦了,哪怕库存系统宕机了也不会影响到订单系统下单,只需要修复库存系统重新去消费消息队列的消息即可;
3.3、流量削峰
场景说明: 秒杀活动,一般会因为流量过大,导致应用挂掉,为了解决这个问题,一般在应用前端加入消息队列。
1.可以控制活动人数,超过此一定阀值的订单直接丢弃.
2.可以缓解短时间的高流量压垮应用(应用程序按自己的最大处理能力获取订单)
四、交换机类型
1、Fanout Exchange(广播交换机):
扇型(广播)交换机,这个交换机没有路由键概念,就算你绑了路由键也是无视的。 这个交换机在接收到消息后,会直接转发到绑定到它上面的所有队列。
2、Direct Exchange(直连交换机):
直连型交换机,根据RoutingKey(路由键)路由到不同的队列
3、Topic Exchange (主题交换机):
主题交换机,这个交换机其实跟直连交换机流程差不多,但是它的特点就是在它的路由键和绑定键之间是有规则的。(开始计算)
简单地介绍下规则:
* (星号) 用来表示一个单词 (必须出现的)
# (井号) 用来表示任意数量(零个或多个)单词
通配的绑定键是跟队列进行绑定的,举个小例子
队列Q1 绑定键为 *.TT.* 队列Q2绑定键为 TT.#
如果一条消息携带的路由键为 A.TT.B,那么队列Q1将会收到;
如果一条消息携带的路由键为TT.AA.BB,那么队列Q2将会收到;
当一个队列的绑定键为 "#"(井号) 的时候,这个队列将会无视消息的路由键,接收所有的消息。
当 * (星号) 和 # (井号) 这两个特殊字符都未在绑定键中出现的时候,此时主题交换机就拥有的直连交换机的行为。
如果只有 # ,它就实现了扇形交换机的功能。
所以主题交换机也就实现了扇形交换机的功能,和直连交换机的功能
五、springboot整合RabbitMQ
1、使用前先引入依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit-test</artifactId><scope>test</scope></dependency>
2、配置RabbitMQ连接
在application.properties文件里配置
3、使用直连交换机(其他两个使用方式一样,无非是路由键是否使用以及路由键的配置规则*,#等)
消费者:
@Configuration
public class DirectConsumer {//注册一个队列@Bean //启动多次为什么不报错?启动的时候,它会根据这个名称Direct_Q01先去查找有没有这个队列,如果有什么都不做,如果没有创建一个新的public Queue queue(){return QueueBuilder.durable("Direct_Q01").maxLength(100).build();}//注册交换机@Beanpublic DirectExchange exchange(){//1.启动的时候,它会根据这个名称Direct_E01先去查找有没有这个交换机,如果有什么都不做,如果没有创建一个新的return ExchangeBuilder.directExchange("Direct_E01").build();}//绑定交换机与队列关系@Beanpublic Binding binding(Queue queue,DirectExchange exchange){return BindingBuilder.bind(queue).to(exchange).with("RK01");}//启动一个消费者@RabbitListener(queues = "Direct_Q01")public void receiveMessage(String msg){System.out.println("收到消息:"+msg);}}
注意,如果只设置了一个队列或者交换机,那么在rabbitmq的网页中是看不到的,因为需要有绑定关系,只有建立的绑定关系才能够看到相应的队列和交换机
生产者:
@Service
public class DirectProvider {@Autowiredprivate RabbitTemplate rabbitTemplate;public void send(Object message) {rabbitTemplate.convertAndSend("Direct_E01", "RK01", message);}
}
当然我们在实际开发中不可能只传递字符串,也有可能传递对象,这时候需要定义一个消息转换器
package com.by.config;import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMqConfig {@Beanpublic MessageConverter messageConverter(){return new Jackson2JsonMessageConverter();}
}
这个Bean 是一个 MessageConverter
类型的对象,这个Bean 将被自动注入到任何需要 MessageConverter
的地方,以确保所有消息处理都使用这个转换器;他的作用定义了一个消息转换器
messageConverter
Bean 的作用:消息序列化、消息反序列化、确保一致性(所有的消息都遵循统一的json格式)
六、交换机、队列、消费者之间的关系
一个交换机对应多个队列,每个队列对应一个消费者的时候:
一个队列对应多个消费者的时候:
七、死信交换机
什么是死信
死信就是消息在特定场景下的一种表现形式,这些场景包括:
1. 消息被拒绝访问,即消费者返回 basicNack 的信号时 或者拒绝basicReject
2. 消费者发生异常,超过重试次数 。 (其实spring框架调用的就是 basicNack)
3. 消息的Expiration 过期时长或队列TTL过期时间。.ttl(20*1000) 进入的是 先进业务队列的数据,超时之后送给死信交换机
4. 消息队列达到最大容量 .maxLength(5)
什么是死信队列
存储死信消息的队列
当有消息变成死信了,那么这个消息就会重新被死信交换机路由到指定的死信队列中去,我们可以通过对这个死信队列进行监听,从而手动的去对这一消息进行补偿。 人工干预
死信队列的使用
他的使用方式和正常的队列一样,需要注意的是需要把创建的死信队列和死信交换机绑定到正常的业务队列上
@Slf4j
@Configuration
public class DeadConsumer {@Bean//死信队列public Queue deadQueue() {return QueueBuilder.durable("dead-q").build();}@Beanpublic Exchange deadExchange() {return ExchangeBuilder.fanoutExchange("dead-e").autoDelete().build();}@Beanpublic Binding deadBinding() {return BindingBuilder.bind(deadQueue()).to(deadExchange()).with("").noargs();}@Beanpublic Queue fanoutQueue() {return QueueBuilder.durable("fanout-q").maxLength(10)//队列容积最大为10,超过的消息将发送到死信交换机.deadLetterExchange("dead-e").ttl(5000)//五秒没消费直接发送到死信交换机.build();}@Beanpublic Exchange fanoutExchange() {return ExchangeBuilder.fanoutExchange("fanout-e").durable(true).build();}@Beanpublic Binding fanoutBinding() {return BindingBuilder.bind(fanoutQueue()).to(fanoutExchange()).with("").noargs();}@RabbitListener(queues = "fanout-q")public void consume(Ordering ordering){log.debug("消费者->{}", JSONUtil.toJsonStr(ordering));
//消费者发生异常,不消费消息直接进入死信队列int i = 5/0;}}