SpringBoot集成MQ,四种交换机的实例
RabbitMQ交换机(Exchange)的核心作用
在RabbitMQ中,交换机 是消息路由的核心组件,负责接收生产者发送的消息,并根据规则(如路由键、头信息等)将消息分发到对应的队列中。
不同交换机类型决定了消息的路由逻辑,使用不同的交换机在不同的场景下可以提高消息系统的高可用性。
1. 直连交换机(Direct Exchange)
路由机制
- 精确匹配路由键(Routing Key):消息会被发送到与
Routing Key
完全匹配 的队列。 - 典型场景:一对一或一对多的精确消息分发。
应用场景
- 任务分发:如订单处理系统,根据订单类型(如
order.payment
、order.shipping
)分发到不同队列。 - 日志分类:将不同级别的日志(
log.error
、log.info
)路由到对应的处理服务。
使用直连交换机实现消息发送和接收
1.创建一个SpringBoot项目,在yml文件配置如下:
server:port: 8021
spring: application:name: rabbitmq-provider#配置rabbitMq 服务器rabbitmq:host: 127.0.0.1port: 5672username: guestpassword: guest
2.初始化队列和交换机,并进行绑定
package com.atguigu.demomq;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** 功能:* 作者:程序员ZXY* 日期:2025/3/8 下午1:55*/
@Configuration
public class DirectRabbitConfig {@Beanpublic Queue TestDirectQueue(){return new Queue("TestDirectQueue",true);}@BeanDirectExchange TestDirectExchange(){return new DirectExchange("TestDirectExchange",true,false);}@BeanBinding bindingDirect(){return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectRouting");}
}
3.实现sendDirectMessage发送消息请求,由生产者发送到MQ,TestDirectRouting作为Key,用于精确转发。
package com.atguigu.demomq;import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;/*** 功能:* 作者:程序员ZXY* 日期:2025/3/8 下午2:12*/
@RestController
public class SendMessageController {@Autowiredprivate RabbitTemplate rabbitTemplate;@GetMapping("/sendDirectMessage")public String sendDirectMessage() {String messageId = String.valueOf(UUID.randomUUID());String messageData = "Hello MQ!";String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));Map<String,Object> map=new HashMap<>();map.put("messageId",messageId);map.put("messageData",messageData);map.put("createTime",createTime);//将消息携带绑定键值:TestDirectRouting 发送到交换机TestDirectExchangerabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectRouting", map);return "OK";}
}
4.此时就可以启动项目发送消息了,使用PostMan发送消息,返回OK说明发送成功
5.进入http://localhost:15672/,可以看到消息发送成功,我这里是请求了两次(也就是发了两条消息)。
6.接下来写消费者的消费过程,新创建一个SpringBoot项目,在yml文件配置如下
server:port: 8022
spring:application:name: rabbitmq-provider#配置rabbitMq 服务器rabbitmq:host: 127.0.0.1port: 5672username: guestpassword: guest
7.消费者配置类,同样TestDirectRouting用于唯一识别Key
package com.atguigu.demomq2;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** 功能:* 作者:程序员ZXY* 日期:2025/3/8 下午 */
@Configuration
public class DirectRabbitConfig {@Beanpublic Queue TestDirectQueue() {return new Queue("TestDirectQueue",true);}@BeanDirectExchange TestDirectExchange() {return new DirectExchange("TestDirectExchange");}@BeanBinding bindingDirect() {return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectRouting");}
}
8.消费者 接收消息@RabbitListener(queues = "TestDirectQueue")用于监听指定队列发送的消息
package com.atguigu.demomq2;import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.util.Map;@Component
@RabbitListener(queues = "TestDirectQueue")
public class DirectReceiver {@RabbitHandlerpublic void process(Map testMessage) {System.out.println("DirectReceiver消费者收到消息 : " + testMessage.toString());}}
9.启动消费者,成功接收消息
10.查看MQ控制台,消息成功被消费
2. 扇出交换机(Fanout Exchange)
路由机制(一个交换机转发到多个队列)
- 广播模式:忽略
Routing Key
,将消息发送到所有绑定的队列。 - 典型场景:消息的全局通知或并行处理。
应用场景
- 实时通知系统:如用户注册成功后,同时发送邮件、短信、更新缓存。
- 日志广播:多个服务订阅同一日志源,各自独立处理。
使用扇出交换机实现消息发送和接收
1.扇出交换机配置
package com.atguigu.demomq;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class FanoutExchangeConfig {// 定义扇出交换机@Beanpublic FanoutExchange fanoutExchange() {return new FanoutExchange("fanout.user.register", true, false);}// 定义邮件队列@Beanpublic Queue emailQueue() {return new Queue("fanout.user.email", true);}// 定义短信队列@Beanpublic Queue smsQueue() {return new Queue("fanout.user.sms", true);}// 绑定所有队列到扇出交换机(无需路由键)@Beanpublic Binding emailBinding() {return BindingBuilder.bind(emailQueue()).to(fanoutExchange());}@Beanpublic Binding smsBinding() {return BindingBuilder.bind(smsQueue()).to(fanoutExchange());}
}
2.生产者
package com.atguigu.demomq;import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
public class FanoutUserService {@Autowiredprivate RabbitTemplate rabbitTemplate;@GetMapping("/sendFanoutMessage")public String sendRegisterBroadcast() {rabbitTemplate.convertAndSend("fanout.user.register", "", // 扇出交换机忽略路由键"message MQ");return "OK Fan";}
}
3.消费者
package com.atguigu.demomq2;import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class FanoutNotificationConsumer {@RabbitListener(queues = "fanout.user.email")public void handleEmail(String message) {System.out.println("[Email] Received: " + message);}@RabbitListener(queues = "fanout.user.sms")public void handleSms(String message) {System.out.println("[SMS] Received: " + message);}
}
4.请求并查看消费结果
可以看到一个交换机完成消费两条消息
3. 主题交换机(Topic Exchange)
路由机制
- 模式匹配路由键:使用
*
(匹配一个单词)和#
(匹配多个单词)通配符。 - 典型场景:灵活的多条件消息路由。
应用场景
- 新闻订阅系统:用户订阅特定主题(如
news.sports.*
、news.tech.#
)。 - 设备状态监控:根据设备类型和区域路由消息(如
sensor.temperature.room1
)。
1.配置主题交换机
package com.atguigu.demomq;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class TopicExchangeConfig {// 定义主题交换机@Beanpublic TopicExchange topicExchange() {return new TopicExchange("topic.news", true, false);}// 定义体育新闻队列@Beanpublic Queue sportsQueue() {return new Queue("topic.news.sports", true);}// 定义科技新闻队列@Beanpublic Queue techQueue() {return new Queue("topic.news.tech", true);}// 绑定体育队列:匹配 news.sports.*@Beanpublic Binding sportsBinding() {return BindingBuilder.bind(sportsQueue()).to(topicExchange()).with("news.sports.*");}// 绑定科技队列:匹配 news.tech.#@Beanpublic Binding techBinding() {return BindingBuilder.bind(techQueue()).to(topicExchange()).with("news.tech.#");}
}
2.生产者
package com.atguigu.demomq;import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
public class TopicNewsService {@Autowiredprivate RabbitTemplate rabbitTemplate;@GetMapping("/sendTopicMessage1")public String sendSportsNews() {rabbitTemplate.convertAndSend("topic.news", "news.sports.football","* message:news.sports.football");return "*OK";}@GetMapping("/sendTopicMessage2")public String sendTechNews() {rabbitTemplate.convertAndSend("topic.news", "news.tech.ai.abc.123456","# message:news.tech.ai.abc.123456");return "#OK";}
}
3. 消费者
package com.atguigu.demomq2;import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class TopicNewsConsumer {@RabbitListener(queues = "topic.news.sports")public void handleSports(String message) {System.out.println("[Sports] Received: " + message);}@RabbitListener(queues = "topic.news.tech")public void handleTech(String message) {System.out.println("[Tech] Received: " + message);}
}
4.发送请求
可以看到消息成功消费,第一个为*通配符,第二个为#通配符
4. 头交换机(Headers Exchange)
路由机制( 我的理解是一种基于 多条件组合 的消息路由机制)
- 基于消息头(Headers)匹配:忽略
Routing Key
,通过键值对(Headers)匹配队列绑定的条件。 - 匹配规则:
x-match
参数设为all
(需全部匹配)或any
(匹配任意一个)。
应用场景
- 复杂路由逻辑:如根据消息的版本号、语言等元数据路由。
- 多维度过滤:如同时匹配用户类型(
user_type: vip
)和地理位置(region: asia
)。
1.头交换机配置
package com.atguigu.demomq;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.HeadersExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;@Configuration
public class HeadersExchangeConfig {// 定义头交换机@Beanpublic HeadersExchange headersExchange() {return new HeadersExchange("headers.user", true, false);}// 定义VIP用户队列@Beanpublic Queue vipQueue() {return new Queue("headers.user.vip", true);}// 绑定VIP队列,要求同时匹配 userType=vip 和 region=asia@Beanpublic Binding vipBinding() {Map<String, Object> headers = new HashMap<>();headers.put("userType", "vip");headers.put("region", "asia");return BindingBuilder.bind(vipQueue()).to(headersExchange()).whereAll(headers).match(); // whereAll 表示需全部匹配}
}
2.生产者
package com.atguigu.demomq;import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
public class HeaderUserVipService {@Autowiredprivate RabbitTemplate rabbitTemplate;@GetMapping("/sendHeaderMessage")public String sendVipMessage() {MessageProperties props = new MessageProperties();props.setHeader("userType", "vip");props.setHeader("region", "asia");Message msg = new Message("HeaderMessage".getBytes(), props);rabbitTemplate.send("headers.user", "", msg);return "OK";}
}
3.消费者
package com.atguigu.demomq2;import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class HeaderUserVipConsumer {@RabbitListener(queues = "headers.user.vip")public void handleVip(Message message) {String body = new String(message.getBody());System.out.println("[VIP] Received: " + body);}
}
4.PostMan测试
这里仅消费交换机初始化时满足所有设定条件的消息,我们可以测试一下不满足条件时发送消息
消费者不消费消息
总结
需要代码自己进行测试的 可以Git自取
git@gitee.com:myselfzxy/mq-producer.git
git@gitee.com:myselfzxy/mq-customer.git