当前位置: 首页 > news >正文

电商项目-数据同步解决方案(一)

一、 canal简介

        canal是阿里旗下的开源项目,其内部是基于java开发。主要作用是用于监控数据库内部数据的改变。从而获得新增数据,或者修改的数据。canal主要支持mysql数据库。

        canal是应对阿里巴巴存在杭州和美国的双机房部署,存在跨机房同步的业务需求而提出的。

        阿里系公司开始逐步的尝试基于数据库的日志解析,获取增量变更进行同步,由此衍生出了增量订阅&消费的业务。

工作原理图:

工作原理:

  1. canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议
  2. mysql master收到dump请求,开始推送binary log给slave(也就是canal)
  3. canal解析binary log对象(原始为byte流)

二 、搭建环境

2.1 mysql开启binlog模式

(1)查看当前mysql是否开启binlog模式。

SHOW VARIABLES LIKE '%log_bin%'

如果log_bin的值为OFF是未开启,为ON是已开启。

(2)修改/etc/my.cnf 需要开启binlog模式。

[mysqld]
log-bin=mysql-bin
binlog-format=ROW
server_id=1

修改完成之后,重启mysqld的服务。

(3) 进入mysql

mysql -h localhost -u root -p

(4)创建账号 用于测试使用

使用root账号创建用户并授予权限

create user canal@'%' IDENTIFIED by 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT,SUPER ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
2.2 canal服务端安装与配置

(1)下载地址canal

Release v1.1.7 · alibaba/canal · GitHub

(2)下载之后 上传到linux系统中,解压缩到指定的目录/usr/local/canal

解压缩之后的目录结构:

(3)修改 exmaple下的实例配置

vi conf/example/instance.properties

修改下面的几个参数。


canal.instance.Master.address = 192.16.200.128:3306canal.instance.dbUsername= canal
canal.instance.dbPassword = canal

(3)指定读取位置

进入mysql中执行下面语句查看binlog所在位置

mysql> show master status;
 

如果file中binlog文件不为 mysql-bin.000001 可以重置mysql

mysql> reset master;

查看canal配置文件

vim /usr/local/canal/conf/example/meta.dat

找到对应的binlog信息更改一致即可

"journalName":"mysql-bin.000001","position":120,"

注意:如果不一致,可能导致以下错误

2023-06-18 16:35:20.918 [New I/O server worker #1-2] ERROR 
c.a.otter.canal.server.netty.handler.SessionHandler - something goes wrong with channel:
[id: 0x7f2e9be3, /192.168.200.56:52225 => /192.168.200.128:11111], 
exception=java.io.IOException: Connection reset by peer

(4)启动服务:

[root@localhost canal]# ./bin/startup.sh

(5)查看日志:

cat /usr/local/canal/logs/canal/canal.log

三 、数据监控微服务搭建

       

         当用户执行数据库的操作的时候,binlog 日志会被canal捕获到,并解析出数据。我们就可以将解析出来的数据进行相应的逻辑处理。

        在数据监控微服务中,基于canal完成mysql数据库的监控。当发现mysql内部有数据改变的时候会把相关数据提取出来,接着完成后续的数据同步操作。

3.1 微服务搭建

步骤一:创建模块shangcheng_canal,pom引入canla 和RabbitMQ的依赖

<dependencies><dependency><groupId>com.xpand</groupId><artifactId>starter-canal</artifactId><version>0.0.1-SNAPSHOT</version></dependency><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit</artifactId></dependency></dependencies>

步骤二:创建包com.shangcheng.canal ,在包下创建启动类

@SpringBootApplication
@EnableCanalClient //声明当前的服务是canal的客户端
public class CanalApplication {
​public static void main(String[] args) {SpringApplication.run(CanalApplication.class,args);}
}

步骤三:创建配置文件application.properties

canal.client.instances.example.host=192.168.200.128
canal.client.instances.example.port=11111
canal.client.instances.example.batchSize=1000
spring.rabbitmq.host=192.168.200.128

步骤四:创建com.changgou.canal.listener包,在包下创建监听类

@CanalEventListener //声明当前的类是canal的监听类
public class BusinessListener {
​@Autowiredprivate RabbitTemplate rabbitTemplate;
​/**** @param eventType 当前操作数据库的类型* @param rowData 当前操作数据库的数据*/@ListenPoint(schema = "shangcheng_business",table = "tb_ad")public void adUpdate(CanalEntry.EventType eventType,CanalEntry.RowData rowData){System.out.println("广告表数据发生改变");//获取改变之前的数据rowData.getBeforeColumnsList().forEach((c)-> System.out.println("改变前的数据:"+c.getName()+"::"+c.getValue()));
​//获取改变之后的数据rowData.getAfterColumnsList().forEach((c)-> System.out.println("改变之后的数据:"+c.getName()+"::"+c.getValue()));}
}

步骤五:测试:启动数据监控微服务,修改shangcheng_business的tb_ad表,观察控制台输出。


http://www.mrgr.cn/news/81094.html

相关文章:

  • Android adb查看某个进程的总线程数
  • 【腾讯云产品最佳实践】小白萌新 ES Serverless 之初体验——好用!
  • tryhackme-Pre Security-HTTP in Detail(HTTP的详细内容)
  • nodejs:nodejs的技巧有哪些
  • 力学笃行(二)Qt 示例程序运行
  • CCF-GESP 等级考试 2023年6月认证C++一级真题解析
  • 「Mac畅玩鸿蒙与硬件46」UI互动应用篇23 - 自定义天气预报组件
  • 数据库原理学习——存储过程详解
  • AtCoder Beginner Contest 385(A~F)题解
  • 【微服务】SpringBoot 整合Redis实现延时任务处理使用详解
  • kafka理解记录
  • Java重要面试名词整理(二):SpringMyBatis
  • SMMU软件指南SMMU编程之虚拟机结构和缓存
  • List接口
  • 机器学习(Machine Learning)的安全问题
  • 以太网详解(三)FPGA以太网IP配置(Quartus平台)
  • C++的封装(十四):《设计模式》这本书
  • Kafka快速扫描
  • Redis存在安全漏洞
  • EasyPoi 使用$fe:模板语法生成Word动态行
  • [react 3种方法] 获取ant组件ref用ts如何定义?
  • 麒麟操作系统服务架构保姆级教程(三)ssh远程连接
  • en3d 部署笔记
  • 数据可视化echarts学习笔记
  • 【老白学 Java】HashSet 应用 - 卡拉 OK(五)
  • 第1章 命题逻辑