当前位置: 首页 > news >正文

Canal数据同步

Canal 是阿里巴巴开源的一个用于数据库增量订阅和消费的中间件,常用于数据同步和实时数据处理。

数据同步要求两个库里面有相同的数据库名、和数据库结构

Canal环境搭建

canal的原理是基于mysql binlog技术,所以这里一定需要开启mysql的binlog写入功能

开启mysql服务: service mysql start (或者 systemctl start mysqld.service

检查binlog功能是否有开启

show variables like 'log_bin';

如果显示状态为OFF表示该功能未开启,开启

binlog功能

  • 修改mysql的配置文件my.cnf
vi /etc/my.cnf
追加内容:
log-bin=mysql-bin   #binlog文件名
binlog_format=ROW   #选择row模式
service_id=1        #mysql实例id,不能和canal的slaveId重复
  • 重启 mysql
service mysql restart
  • 登录 mysql 客户端,查看 log_bin 变量
show variables like 'log_bin

在mysql里面添加以下的相关用户和权限

CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
GRANT SHOW VIEW, SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
GRANT ALL PRIVILEGES ON *.* TO 'root'@'%'IDENTIFIED BY '00000' WITH GRANT OPTION
FLUSH PRIVILEGES;

下载安装Canal服务

下载地址:

https://github.com/alibaba/canal/releases

下载之后,放到目录中,解压文件

cd /usr/local/canal
canal.deployer-1.1.4.tar.gz
tar zxvf canal.deployer-1.1.4.tar.gz

修改配置文件

vi conf/example/instance.properties

当前linux系统的数据库ip

#需要改成自己的数据库信息
canal.instance.master.address= 。。:3306#需要改成自己的数据库有权限的用户名与密码
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal#需要改成同步的数据库表规则,例如只是同步一下表
canal.instance.filter.regex=.*\\..*                 #所有有库所有表都同步
#canal.instance.filter.regex=db.table

进入bin目录下启动

sh bin/startup.sh

引入依赖

  <dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId></dependency>

创建application.properties配置文件

# 服务端口号
server.port=10000# 服务名
spring.application.name=canal-client# 环境设置:dev,test,prod
spring.profiles.active=dev# mysql数据库连接
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3306/db?serverTimezone=GMT%2B8
spring.datasource.username=root
spring.datasource.password=12345678

编写canal客户端类


@Component
public class CanalClient {//sql队列//下面判断拼接后的sql会加入这个队列private Queue<String> SQL_QUEUE = new ConcurrentLinkedQueue<>();@Resourceprivate DataSource dataSource;/*** canal入库方法*///指定要监听的ip地址的canal端口号,默认开启为11111public void run() {CanalConnector connector = CanalConnectors.newSingleConnector(newInetSocketAddress("192.168.1.1",11111), "example", "", ""); //服务器的 IP 地址和端口,以及需要订阅的 instance 名称int batchSize = 1000;//每次拉取更新数据的批大小为 1000 条。try {connector.connect();connector.subscribe(".*\\..*");connector.rollback(); //rollback 方法用于在新的一轮拉取之前,清除之前的状态。try {while (true) {//尝试从master那边拉去数据batchSize条记录,有多少取多少//监控上面ip的数据库是否变化Message message = connector.getWithoutAck(batchSize);//拉取更新数据long batchId = message.getId();int size = message.getEntries().size();if (batchId == -1 || size == 0) {//没变化就睡Thread.sleep(1000);} else {//有变化就同步dataHandle(message.getEntries());}connector.ack(batchId);//当队列里面堆积的sql大于一定数值的时候就模拟执行//队列里如果有sql语句就执行if (SQL_QUEUE.size() >= 1) {executeQueueSql();}}} catch (InterruptedException e) {e.printStackTrace();} catch (InvalidProtocolBufferException e) {e.printStackTrace();}} finally {connector.disconnect();}}/*** 模拟执行队列里面的sql语句*/public void executeQueueSql() {int size = SQL_QUEUE.size();for (int i = 0; i < size; i++) {String sql = SQL_QUEUE.poll();System.out.println("[sql]----> " + sql);this.execute(sql.toString());}}/*** 数据处理** @param entrys*/private void dataHandle(List<Entry> entrys) throwsInvalidProtocolBufferException {for (Entry entry : entrys) {if (EntryType.ROWDATA == entry.getEntryType()) {RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());EventType eventType = rowChange.getEventType();//判断当前是什么操作:删除、更新、插入if (eventType == EventType.DELETE) {saveDeleteSql(entry);} else if (eventType == EventType.UPDATE) {saveUpdateSql(entry);} else if (eventType == EventType.INSERT) {saveInsertSql(entry);}}}}//保存更新语句private void saveUpdateSql(Entry entry) {try {RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());List<RowData> rowDatasList = rowChange.getRowDatasList();for (RowData rowData : rowDatasList) {List<Column> newColumnList = rowData.getAfterColumnsList();StringBuffer sql = new StringBuffer("update " +entry.getHeader().getTableName() + " set ");for (int i = 0; i < newColumnList.size(); i++) {sql.append(" " + newColumnList.get(i).getName()+ " = '" + newColumnList.get(i).getValue() + "'");if (i != newColumnList.size() - 1) {sql.append(",");}}sql.append(" where ");List<Column> oldColumnList = rowData.getBeforeColumnsList();for (Column column : oldColumnList) {if (column.getIsKey()) {//暂时只支持单一主键sql.append(column.getName() + "=" + column.getValue());break;}}SQL_QUEUE.add(sql.toString());}} catch (InvalidProtocolBufferException e) {e.printStackTrace();}}//保存删除语句private void saveDeleteSql(Entry entry) {try {RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());List<RowData> rowDatasList = rowChange.getRowDatasList();for (RowData rowData : rowDatasList) {List<Column> columnList = rowData.getBeforeColumnsList();StringBuffer sql = new StringBuffer("delete from " +entry.getHeader().getTableName() + " where ");for (Column column : columnList) {if (column.getIsKey()) {//暂时只支持单一主键sql.append(column.getName() + "=" + column.getValue());break;}}SQL_QUEUE.add(sql.toString());}} catch (InvalidProtocolBufferException e) {e.printStackTrace();}}//保存插入语句private void saveInsertSql(Entry entry) {try {RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());List<RowData> rowDatasList = rowChange.getRowDatasList();for (RowData rowData : rowDatasList) {List<Column> columnList = rowData.getAfterColumnsList();StringBuffer sql = new StringBuffer("insert into " +entry.getHeader().getTableName() + " (");for (int i = 0; i < columnList.size(); i++) {sql.append(columnList.get(i).getName());if (i != columnList.size() - 1) {sql.append(",");}}sql.append(") VALUES (");for (int i = 0; i < columnList.size(); i++) {sql.append("'" + columnList.get(i).getValue() + "'");if (i != columnList.size() - 1) {sql.append(",");}}sql.append(")");SQL_QUEUE.add(sql.toString());}} catch (InvalidProtocolBufferException e) {e.printStackTrace();}}//入库//与数据库连接,并执行队列里面取出的sql语句public void execute(String sql) {Connection con = null;try {if (null == sql) return;con = dataSource.getConnection();QueryRunner qr = new QueryRunner();int row = qr.execute(con, sql);System.out.println("update: " + row);} catch (SQLException e) {e.printStackTrace();} finally {DbUtils.closeQuietly(con);}}}

创建启动类

@SpringBootApplication
public class CanalApplication implements CommandLineRunner {@Autowiredprivate CanalClient canalClient;public static void main(String[] args) {SpringApplication.run(CanalApplication.class,args);}//只要程序在执行状态,他就会一直执行里面的方法@Overridepublic void run(String... args) throws Exception {//项目启动,执行canal客户端监听canalClient.run();}
}

启动linux的canal、启动java代码

如果是云服务器,记得给11111端口放行


http://www.mrgr.cn/news/55535.html

相关文章:

  • RHCSA复习题
  • 基于Multisim8路彩灯循环控制电路设计与仿真
  • Java抽象类
  • Maxwell 的容错和恢复机制 详解
  • 华为配置 之 Console线路配置
  • MySQL同步到ES的方案选型
  • 变换器交流模型建模方法
  • CCF-BDCI大数据与计算智能大赛TOP4-京东生鲜
  • 同济子豪兄--随机游走的艺术-图嵌入表示学习【斯坦福CS224W图机器学习】
  • 梦熊十三连测题解
  • 英语语法学习框架(考研)
  • STM32启动文件浅析
  • JavaScript 中的防抖和节流(简易版)
  • VSCode编译器改为中文
  • C语言中的内存函数
  • 没有网络限制!超简单本地部署 Llama3 的方法
  • MySQL-30.索引-介绍
  • Rat工具:vshell 4.9.3简单使用介绍
  • C++实现循环队列和链式队列操作(实验5--作业)
  • J1:ResNet-50算法实战与解析(鸟类识别)
  • webpack 老项目升级记录:node-sass 规定的 node v8 提升至支持 node v22
  • Selenium自动化测试全攻略:从入门到精通
  • Anchor DETR论文笔记
  • Telink 2.4G proprietary protocol 泰凌2.4G私有协议
  • Windows下安装并使用 NVM(Node Version Manager)
  • 材料研究与应用