Flink CDC实时同步mysql数据
官方参考资料:
https://nightlies.apache.org/flink/flink-cdc-docs-master/zh/docs/connectors/flink-sources/mysql-cdc/
Apache Flink 的 Change Data Capture (CDC) 是一种用于捕获数据库变化(如插入、更新和删除操作)的技术。Flink CDC Connector 允许你使用 Flink 从 MySQL 等数据库中读取变化数据,并处理这些流式数据。以下是如何在 Flink 中配置和使用 CDC Connector 读取 MySQL 数据的步骤:
前提条件
- MySQL 数据库:确保你已经有一个 MySQL 数据库,并且知道数据库的连接信息(如主机名、端口、用户名、密码、数据库名)。
- Flink 环境:你需要在本地或集群上配置好 Flink 环境。
- MySQL Binlog:确保 MySQL 数据库启用了 Binlog(Binary Logging),因为 Flink CDC 依赖于 Binlog 来捕获数据变化。
支持的数据库
依赖
Maven dependency
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<!-- 请使用已发布的版本依赖,snapshot 版本的依赖需要本地自行编译。 -->
<version>3.3-SNAPSHOT</version>
</dependency>
SQL Client JAR
下载链接仅在已发布版本可用,请在文档网站左下角选择浏览已发布的版本。
下载 flink-sql-connector-mysql-cdc-3.3-SNAPSHOT.jar 到 <FLINK_HOME>/lib/ 目录下。
由于 MySQL Connector 采用的 GPLv2 协议与 Flink CDC 项目不兼容,我们无法在 jar 包中提供 MySQL 连接器。 您可能需要手动配置以下依赖:
配置 MySQL Binlog
修改MySQL 配置文件(my.cnf或my.ini):
[mysqld]
server-id = 1
log-bin = mysql-bin
binlog-format = ROW
配置 MySQL 服务器
先创建一个 MySQL 用户,并授权。
- 创建 MySQL 用户:
mysql> CREATE USER 'user'@'localhost' IDENTIFIED BY 'password';
- 向用户授予所需的权限:
mysql> GRANT SELECT, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user' IDENTIFIED BY 'password';
注意: 在 scan.incremental.snapshot.enabled 参数已启用时(默认情况下已启用)时,不再需要授予 reload 权限。
- 刷新用户权限:
mysql> FLUSH PRIVILEGES;
创建 MySQL CDC 表
MySQL CDC 表可以定义如下:
-- 每 3 秒做一次 checkpoint,用于测试,生产配置建议5到10分钟
Flink SQL> SET 'execution.checkpointing.interval' = '3s';
-- 在 Flink SQL中注册 MySQL 表 'orders'
Flink SQL> CREATE TABLE orders (
order_id INT,
order_date TIMESTAMP(0),
customer_name STRING,
price DECIMAL(10, 5),
product_id INT,
order_status BOOLEAN,
PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'database-name' = 'mydb',
'table-name' = 'orders');
-- 从订单表读取全量数据(快照)和增量数据(binlog)
Flink SQL> SELECT * FROM orders;
关于无主键表
从2.4.0 版本开始支持无主键表,使用无主键表必须设置 scan.incremental.snapshot.chunk.key-column,且只能选择非空类型的一个字段。
在使用无主键表时,需要注意以下两种情况。
- 配置 scan.incremental.snapshot.chunk.key-column 时,如果表中存在索引,请尽量使用索引中的列来加快 select 速度。
- 无主键表的处理语义由 scan.incremental.snapshot.chunk.key-column 指定的列的行为决定:
- 如果指定的列不存在更新操作,此时可以保证 Exactly once 语义。
- 如果指定的列存在更新操作,此时只能保证 At least once 语义。但可以结合下游,通过指定下游主键,结合幂等性操作来保证数据的正确性。