当前位置: 首页 > news >正文

Flink CDC(SQL Client)连接 MySQL 数据库教程

Flink CDC(SQL Client)连接 MySQL 数据库教程

这篇文章将指导如何使用 Flink CDC 连接到 MySQL 数据库,并捕获数据变更。我们将逐步完成以下操作:

1. 检查 Binlog 是否启用

首先,您需要确保 MySQL 的 Binlog 功能已经启用,因为 Flink CDC 依赖于 Binlog 来捕获数据变更。

-- 检查 Binlog 是否启用
SHOW VARIABLES LIKE 'log_bin';

如果 log_bin 返回的不是 ON 或者一个文件名,那么 Binlog 没有启用。

2. 检查 Binlog 格式

-- 检查 Binlog 格式
SHOW VARIABLES LIKE 'binlog_format';

确保 binlog_formatROW。如果不是,您需要修改配置以启用 Binlog 并设置正确的格式。

3. 开启 Binlog 并配置相关参数

如果 log_bin 的值为 OFF,这意味着 MySQL 的二进制日志(Binlog)功能没有开启。以下是开启 Binlog 并配置相关参数的步骤:

3.1 编辑 MySQL 配置文件

找到 MySQL 的配置文件 /etc/my.cn/

[mysqld] 部分添加或修改以下配置:

[mysqld]
log_bin = mysql-bin
binlog_format = ROW
server_id = 1
  • log_bin 设置 Binlog 的日志文件名前缀。
  • binlog_format 设置为 ROW,这是 Flink CDC 所需的格式。
  • server_id 设置为一个唯一的整数,用于标识 MySQL 服务器。

3.2 重启 MySQL 服务

保存配置文件后,重启 MySQL 服务以使更改生效。重启命令取决于您的操作系统:

  • 对于 Linux/Unix:

    sudo systemctl restart mysql
    

    或者

    sudo service mysql restart
    

4. 创建 CDC 用户

创建一个具有适当权限的 MySQL 用户,以便 Flink CDC 可以连接到 MySQL 数据库并监控数据变化:

CREATE USER 'flinkcdc'@'%' IDENTIFIED BY 'FlinkCDC_123456';
GRANT SELECT, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'flinkcdc' IDENTIFIED BY 'FlinkCDC_123456';
FLUSH PRIVILEGES;

5. 创建 MySQL 表和插入示例数据

以下是一个名为 users 的表,包含 id(主键)、name(姓名)和 age(年龄)三个字段的创建语句:

-- 创建数据库(如果不存在)
CREATE DATABASE IF NOT EXISTS cdc;
USE cdc;-- 创建表
CREATE TABLE users (id INT AUTO_INCREMENT PRIMARY KEY,name VARCHAR(50) NOT NULL,age INT NOT NULL
) ENGINE=InnoDB;-- 插入示例数据
INSERT INTO users (name, age) VALUES ('Alice', 30);
INSERT INTO users (name, age) VALUES ('Bob', 25);
INSERT INTO users (name, age) VALUES ('Charlie', 35);
INSERT INTO users (name, age) VALUES ('David', 40);
INSERT INTO users (name, age) VALUES ('Eve', 22);

6. 下载 Flink CDC JAR 包

下载 Flink CDC JAR 包并放到 Flink 安装目录下的 lib 目录中。您可以从 Maven 中央仓库下载 flink-sql-connector-mysql-cdc 2.3.0 版本:

Flink SQL Connector for MySQL CDC

请注意,官网提示下载的那个包缺少一些依赖在执行时会报错,需要下载 20M 以上的 JAR 包。

[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassNotFoundException: com.ververica.cdc.debezium.utils.ResolvedSchemaUtils

7. 启动 Flink 集群

需要先启动 Flink 集群,否则后面会提示无法连接:[ERROR] Could not execute SQL statement. Reason:java.net.ConnectException: 拒绝连接

start-cluster.sh

8. 在 Flink SQL Client 中执行

打开 sql-client.sh 执行以下命令:

CREATE TABLE mysql_source (id INT NOT NULL,name STRING,age INT,PRIMARY KEY (id) NOT ENFORCED
) WITH ('connector' = 'mysql-cdc','hostname' = '192.168.56.152','port' = '3306','username' = 'flinkcdc','password' = 'FlinkCDC_123456','database-name' = 'cdc','table-name' = 'users'
);SELECT * FROM mysql_source;

通过以上步骤,您应该能够成功使用 Flink CDC 连接到 MySQL 数据库并捕获数据变更。如果在执行过程中遇到任何问题,请检查配置和网络设置,确保所有服务正常运行。


http://www.mrgr.cn/news/71943.html

相关文章:

  • idea 删除本地分支后,弹窗 delete tracked brank
  • C#实现在windows上实现指定句柄窗口的指定窗口坐标点击鼠标左键和右键的详细情况
  • SQL的基本CRUD操作
  • 量化交易系统开发-实时行情自动化交易-3.4.2.2.Okex交易数据
  • 17RAL_Visual-Inertial Monocular SLAM with Map Reuse
  • 客户手机号收集小程序有什么用
  • 数据结构中的抽象数据类型、逻辑结构、存储结构等到底是什么?
  • Linux学习笔记之shell快速入门及相关变量
  • PYNQ 框架 - 中断(INTR)驱动
  • 阿里巴巴通义灵码推出Lingma SWE-GPT:开源模型的性能新标杆
  • 音视频入门基础:MPEG2-TS专题(4)——使用工具分析MPEG2-TS传输流
  • JavaScript案例-轮播图
  • LeetCode【0019】删除链表的倒数第N个结点
  • 论文3—《基于YOLOv5s的农田垃圾轻量化检测方法》文献阅读分析报告
  • 我是如何一步步学习深度学习模型PyThorch
  • 信息收集系列(二):ASN分析及域名收集
  • LLM - 使用 LLaMA-Factory 微调大模型 Qwen2-VL SFT(LoRA) 图像数据集 教程 (2)
  • Python 正则表达式使用指南
  • WSL与Ubuntu系统--使用Linux
  • 渗透测试---网络基础之HTTP协议与内外网划分
  • 实战指南:理解 ThreadLocal 原理并用于Java 多线程上下文管理
  • Ngxin隐藏服务名称和版本号(源码部署和Docker部署)
  • 【最少刷题数——二分】
  • Java Review - 线程池原理源码解析
  • Ubuntu linux 命令总结
  • 如何理解DDoS安全防护在企业安全防护中的作用