SpringBoot整合Canal+RabbitMQ监听数据变更
需求
在现代分布式系统中,实时获取数据库的变更信息是一个常见的需求。例如,在电商系统中,当订单表发生更新时,可能需要同步这些变更到搜索服务、缓存服务或者通知其他微服务。传统的解决方案包括定时轮询数据库或通过触发器将变更写入消息队列等方法,但这些方案要么效率低下,要么实现复杂。而使用 Canal + RabbitMQ 可以提供一种高效且可靠的方式来捕获 MySQL 数据库的变更,并将其发送到 RabbitMQ 中供其他服务消费。
Canal 是阿里巴巴开源的一个用于增量订阅和消费 MySQL 数据库 Binlog 的工具,它模拟 MySQL 主从复制机制,无需侵入业务逻辑即可捕获数据库变更。RabbitMQ 是一个流行的开源消息代理,支持多种协议并提供了丰富的特性来确保消息传递的可靠性。结合这两者,可以构建一个强大的实时数据变更监听和处理系统。
步骤
- 环境搭建
- 整合SpringBoot与Canal实现客户端
- Canal整合RabbitMQ
- SpringBoot整合RabbitMQ
环境搭建
1. 安装MySQL
确保你有一个正在运行的 MySQL 实例,并启用了 binlog 日志记录功能。这是 Canal 捕获数据库变更的基础。
# 修改 MySQL 配置文件 my.cnf 或 my.ini
[mysqld]
server-id=1
log-bin=mysql-bin
binlog-format=ROW
重启 MySQL 服务使配置生效。
2. 安装Canal Server
下载最新版本的 Canal Server 并解压到合适的位置。根据官方文档进行必要的配置,特别是 instance.properties
文件中的数据库连接信息。
3. 安装RabbitMQ
可以通过 Docker 快速安装 RabbitMQ:
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:management
访问 http://localhost:15672 登录管理界面,默认用户名/密码为 guest/guest。
整合SpringBoot与Canal实现客户端
创建SpringBoot项目
使用 Spring Initializr 创建一个新的 Spring Boot 项目,添加 Web, JPA, 和 AMQP(用于后续整合 RabbitMQ)依赖。
引入Canal依赖
在 pom.xml
中添加 Canal Client 的依赖:
<dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.5</version>
</dependency>
编写Canal客户端代码
创建一个 Canal 客户端类,用来监听 MySQL 数据库的变化,并将变更事件转发给 RabbitMQ。
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.rabbitmq.client.Channel;public class CanalClient {private final CanalConnector connector;private final Channel channel;public CanalClient(CanalConnector connector, Channel channel) {this.connector = connector;this.channel = channel;}public void start() {// Canal 连接配置connector.connect();connector.subscribe(".*\\..*"); // 订阅所有数据库和表connector