Canal数据同步
Canal 是阿里巴巴开源的一个用于数据库增量订阅和消费的中间件,常用于数据同步和实时数据处理。
数据同步要求两个库里面有相同的数据库名、和数据库结构
Canal环境搭建
canal的原理是基于mysql binlog技术,所以这里一定需要开启mysql的binlog写入功能
开启mysql服务
: service mysql start
(或者 systemctl start mysqld.service
)
检查binlog功能是否有开启
show variables like 'log_bin';
如果显示状态为OFF表示该功能未开启,开启
binlog功能
- 修改mysql的配置文件my.cnf
vi /etc/my.cnf
追加内容:
log-bin=mysql-bin #binlog文件名
binlog_format=ROW #选择row模式
service_id=1 #mysql实例id,不能和canal的slaveId重复
- 重启 mysql
service mysql restart
- 登录 mysql 客户端,查看 log_bin 变量
show variables like 'log_bin
在mysql里面添加以下的相关用户和权限
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
GRANT SHOW VIEW, SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
GRANT ALL PRIVILEGES ON *.* TO 'root'@'%'IDENTIFIED BY '00000' WITH GRANT OPTION
FLUSH PRIVILEGES;
下载安装Canal服务
下载地址:
https://github.com/alibaba/canal/releases
下载之后,放到目录中,解压文件
cd /usr/local/canal
canal.deployer-1.1.4.tar.gz
tar zxvf canal.deployer-1.1.4.tar.gz
修改配置文件
vi conf/example/instance.properties
当前linux系统的数据库ip
#需要改成自己的数据库信息
canal.instance.master.address= 。。:3306#需要改成自己的数据库有权限的用户名与密码
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal#需要改成同步的数据库表规则,例如只是同步一下表
canal.instance.filter.regex=.*\\..* #所有有库所有表都同步
#canal.instance.filter.regex=db.table
进入bin目录下启动
sh bin/startup.sh
引入依赖
<dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId></dependency>
创建application.properties配置文件
# 服务端口号
server.port=10000# 服务名
spring.application.name=canal-client# 环境设置:dev,test,prod
spring.profiles.active=dev# mysql数据库连接
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3306/db?serverTimezone=GMT%2B8
spring.datasource.username=root
spring.datasource.password=12345678
编写canal客户端类
@Component
public class CanalClient {//sql队列//下面判断拼接后的sql会加入这个队列private Queue<String> SQL_QUEUE = new ConcurrentLinkedQueue<>();@Resourceprivate DataSource dataSource;/*** canal入库方法*///指定要监听的ip地址的canal端口号,默认开启为11111public void run() {CanalConnector connector = CanalConnectors.newSingleConnector(newInetSocketAddress("192.168.1.1",11111), "example", "", ""); //服务器的 IP 地址和端口,以及需要订阅的 instance 名称int batchSize = 1000;//每次拉取更新数据的批大小为 1000 条。try {connector.connect();connector.subscribe(".*\\..*");connector.rollback(); //rollback 方法用于在新的一轮拉取之前,清除之前的状态。try {while (true) {//尝试从master那边拉去数据batchSize条记录,有多少取多少//监控上面ip的数据库是否变化Message message = connector.getWithoutAck(batchSize);//拉取更新数据long batchId = message.getId();int size = message.getEntries().size();if (batchId == -1 || size == 0) {//没变化就睡Thread.sleep(1000);} else {//有变化就同步dataHandle(message.getEntries());}connector.ack(batchId);//当队列里面堆积的sql大于一定数值的时候就模拟执行//队列里如果有sql语句就执行if (SQL_QUEUE.size() >= 1) {executeQueueSql();}}} catch (InterruptedException e) {e.printStackTrace();} catch (InvalidProtocolBufferException e) {e.printStackTrace();}} finally {connector.disconnect();}}/*** 模拟执行队列里面的sql语句*/public void executeQueueSql() {int size = SQL_QUEUE.size();for (int i = 0; i < size; i++) {String sql = SQL_QUEUE.poll();System.out.println("[sql]----> " + sql);this.execute(sql.toString());}}/*** 数据处理** @param entrys*/private void dataHandle(List<Entry> entrys) throwsInvalidProtocolBufferException {for (Entry entry : entrys) {if (EntryType.ROWDATA == entry.getEntryType()) {RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());EventType eventType = rowChange.getEventType();//判断当前是什么操作:删除、更新、插入if (eventType == EventType.DELETE) {saveDeleteSql(entry);} else if (eventType == EventType.UPDATE) {saveUpdateSql(entry);} else if (eventType == EventType.INSERT) {saveInsertSql(entry);}}}}//保存更新语句private void saveUpdateSql(Entry entry) {try {RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());List<RowData> rowDatasList = rowChange.getRowDatasList();for (RowData rowData : rowDatasList) {List<Column> newColumnList = rowData.getAfterColumnsList();StringBuffer sql = new StringBuffer("update " +entry.getHeader().getTableName() + " set ");for (int i = 0; i < newColumnList.size(); i++) {sql.append(" " + newColumnList.get(i).getName()+ " = '" + newColumnList.get(i).getValue() + "'");if (i != newColumnList.size() - 1) {sql.append(",");}}sql.append(" where ");List<Column> oldColumnList = rowData.getBeforeColumnsList();for (Column column : oldColumnList) {if (column.getIsKey()) {//暂时只支持单一主键sql.append(column.getName() + "=" + column.getValue());break;}}SQL_QUEUE.add(sql.toString());}} catch (InvalidProtocolBufferException e) {e.printStackTrace();}}//保存删除语句private void saveDeleteSql(Entry entry) {try {RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());List<RowData> rowDatasList = rowChange.getRowDatasList();for (RowData rowData : rowDatasList) {List<Column> columnList = rowData.getBeforeColumnsList();StringBuffer sql = new StringBuffer("delete from " +entry.getHeader().getTableName() + " where ");for (Column column : columnList) {if (column.getIsKey()) {//暂时只支持单一主键sql.append(column.getName() + "=" + column.getValue());break;}}SQL_QUEUE.add(sql.toString());}} catch (InvalidProtocolBufferException e) {e.printStackTrace();}}//保存插入语句private void saveInsertSql(Entry entry) {try {RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());List<RowData> rowDatasList = rowChange.getRowDatasList();for (RowData rowData : rowDatasList) {List<Column> columnList = rowData.getAfterColumnsList();StringBuffer sql = new StringBuffer("insert into " +entry.getHeader().getTableName() + " (");for (int i = 0; i < columnList.size(); i++) {sql.append(columnList.get(i).getName());if (i != columnList.size() - 1) {sql.append(",");}}sql.append(") VALUES (");for (int i = 0; i < columnList.size(); i++) {sql.append("'" + columnList.get(i).getValue() + "'");if (i != columnList.size() - 1) {sql.append(",");}}sql.append(")");SQL_QUEUE.add(sql.toString());}} catch (InvalidProtocolBufferException e) {e.printStackTrace();}}//入库//与数据库连接,并执行队列里面取出的sql语句public void execute(String sql) {Connection con = null;try {if (null == sql) return;con = dataSource.getConnection();QueryRunner qr = new QueryRunner();int row = qr.execute(con, sql);System.out.println("update: " + row);} catch (SQLException e) {e.printStackTrace();} finally {DbUtils.closeQuietly(con);}}}
创建启动类
@SpringBootApplication
public class CanalApplication implements CommandLineRunner {@Autowiredprivate CanalClient canalClient;public static void main(String[] args) {SpringApplication.run(CanalApplication.class,args);}//只要程序在执行状态,他就会一直执行里面的方法@Overridepublic void run(String... args) throws Exception {//项目启动,执行canal客户端监听canalClient.run();}
}
启动linux的canal、启动java代码
如果是云服务器,记得给11111端口放行